diff --git a/opwen_email_server/actions.py b/opwen_email_server/actions.py index 2feec0f3..a1171bf8 100644 --- a/opwen_email_server/actions.py +++ b/opwen_email_server/actions.py @@ -1,6 +1,8 @@ from abc import ABC from hashlib import sha256 +from typing import Any from typing import Callable +from typing import Dict from typing import Iterable from typing import Tuple from typing import Union @@ -11,6 +13,7 @@ from opwen_email_server.constants import mailbox from opwen_email_server.constants import sync from opwen_email_server.services.auth import AzureAuth +from opwen_email_server.services.auth import NoAuth from opwen_email_server.services.sendgrid import SendSendgridEmail from opwen_email_server.services.storage import AzureObjectsStorage from opwen_email_server.services.storage import AzureObjectStorage @@ -212,7 +215,8 @@ def _decode_attachments(cls, email: dict) -> dict: class ReceiveInboundEmail(_Action): - def __init__(self, auth: AzureAuth, raw_email_storage: AzureTextStorage, next_task: Callable[[str], None]): + def __init__(self, auth: Union[AzureAuth, NoAuth], raw_email_storage: AzureTextStorage, next_task: Callable[[str], + None]): self._auth = auth self._raw_email_storage = raw_email_storage @@ -238,6 +242,50 @@ def _new_email_id(cls, email: str) -> str: return sha256(email.encode('utf-8')).hexdigest() +class ProcessServiceEmail(_Action): + def __init__(self, + raw_email_storage: AzureTextStorage, + email_storage: AzureObjectStorage, + next_task: Callable[[str], None], + registry: Dict[str, Any], + email_parser: Callable[[dict], dict] = None): + + self._raw_email_storage = raw_email_storage + self._email_storage = email_storage + self._next_task = next_task + self._registry = registry + self._email_parser = email_parser or MimeEmailParser() + + def _action(self, resource_id): # type: ignore + try: + mime_email = self._raw_email_storage.fetch_text(resource_id) + except ObjectDoesNotExistError: + self.log_warning('Inbound email %s does not exist', resource_id) + return 'skipped', 202 + + email = self._email_parser(mime_email) + + for address in email.get('to', []): + try: + mailer_service = self._registry[address] + except KeyError: + self.log_warning('Skipping unknown mailer service: %s', address) + continue + + formatted_email = mailer_service(email) + + formatted_email_id = new_email_id(formatted_email) + formatted_email['_uid'] = formatted_email_id + + self._email_storage.store_email(formatted_email, formatted_email_id) + + self._next_task(formatted_email_id) + + self._raw_email_storage.delete(resource_id) + self.log_event(events.EMAILS_FORMATTED_FOR_CLIENT) # noqa: E501 # yapf: disable + return 'OK', 200 + + class DownloadClientEmails(_Action): def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage, email_storage: AzureObjectStorage, pending_storage: AzureTextStorage): diff --git a/opwen_email_server/config.py b/opwen_email_server/config.py index 58f3a4c2..b1e538b1 100644 --- a/opwen_email_server/config.py +++ b/opwen_email_server/config.py @@ -34,6 +34,7 @@ REGISTER_CLIENT_QUEUE = f'register{resource_suffix}' INBOUND_STORE_QUEUE = f'inbound{resource_suffix}' WRITTEN_STORE_QUEUE = f'written{resource_suffix}' +PROCESS_SERVICE_QUEUE = f'service{resource_suffix}' SEND_QUEUE = f'send{resource_suffix}' MAILBOX_RECEIVED_QUEUE = f'mailboxreceived{resource_suffix}' MAILBOX_SENT_QUEUE = f'mailboxsent{resource_suffix}' diff --git a/opwen_email_server/constants/events.py b/opwen_email_server/constants/events.py index 3d91ae53..70aa94a8 100644 --- a/opwen_email_server/constants/events.py +++ b/opwen_email_server/constants/events.py @@ -11,6 +11,7 @@ MISSING_SCOPES = 'missing_scopes' # type: Final UNKNOWN_COMPRESSION_FORMAT = 'unknown_compression_format' # type: Final EMAILS_DELIVERED_TO_CLIENT = 'emails_delivered_to_client' # type: Final +EMAILS_FORMATTED_FOR_CLIENT = 'emails_formatted_for_client' # type: Final EMAILS_RECEIVED_FROM_CLIENT = 'emails_received_from_client' # type: Final EMAIL_RECEIVED_FOR_CLIENT = 'email_received_for_client' # type: Final EMAIL_DELIVERED_FROM_CLIENT = 'email_delivered_from_client' # type: Final diff --git a/opwen_email_server/integration/azure.py b/opwen_email_server/integration/azure.py index f7a06d6f..13e0105e 100644 --- a/opwen_email_server/integration/azure.py +++ b/opwen_email_server/integration/azure.py @@ -1,5 +1,6 @@ from opwen_email_server import config from opwen_email_server.services.auth import AzureAuth +from opwen_email_server.services.auth import NoAuth from opwen_email_server.services.storage import AzureFileStorage from opwen_email_server.services.storage import AzureObjectsStorage from opwen_email_server.services.storage import AzureObjectStorage @@ -7,6 +8,10 @@ from opwen_email_server.utils.collections import singleton +def get_no_auth() -> NoAuth: + return NoAuth() + + @singleton def get_auth() -> AzureAuth: return AzureAuth(storage=AzureObjectStorage( diff --git a/opwen_email_server/integration/celery.py b/opwen_email_server/integration/celery.py index 875df993..2c9ef56e 100644 --- a/opwen_email_server/integration/celery.py +++ b/opwen_email_server/integration/celery.py @@ -3,6 +3,7 @@ from opwen_email_server import config from opwen_email_server.actions import IndexReceivedEmailForMailbox from opwen_email_server.actions import IndexSentEmailForMailbox +from opwen_email_server.actions import ProcessServiceEmail from opwen_email_server.actions import RegisterClient from opwen_email_server.actions import SendOutboundEmails from opwen_email_server.actions import StoreInboundEmails @@ -14,6 +15,7 @@ from opwen_email_server.integration.azure import get_pending_storage from opwen_email_server.integration.azure import get_raw_email_storage from opwen_email_server.integration.azure import get_user_storage +from opwen_email_server.mailers import REGISTRY from opwen_email_server.services.dns import SetupMxRecords from opwen_email_server.services.sendgrid import SendSendgridEmail from opwen_email_server.services.sendgrid import SetupSendgridMailbox @@ -101,6 +103,18 @@ def send(resource_id: str) -> None: action(resource_id) +@celery.task(ignore_result=True) +def process_service_email(resource_id: str) -> None: + action = ProcessServiceEmail( + raw_email_storage=get_raw_email_storage(), + email_storage=get_email_storage(), + registry=REGISTRY, + next_task=send_and_index_email, + ) + + action(resource_id) + + def _fqn(task): return f'{__name__}.{task.__name__}' @@ -109,6 +123,7 @@ def _fqn(task): _fqn(register_client): {'queue': config.REGISTER_CLIENT_QUEUE}, _fqn(index_received_email_for_mailbox): {'queue': config.MAILBOX_RECEIVED_QUEUE}, _fqn(index_sent_email_for_mailbox): {'queue': config.MAILBOX_SENT_QUEUE}, + _fqn(process_service_email): {'queue': config.PROCESS_SERVICE_QUEUE}, _fqn(inbound_store): {'queue': config.INBOUND_STORE_QUEUE}, _fqn(written_store): {'queue': config.WRITTEN_STORE_QUEUE}, _fqn(send): {'queue': config.SEND_QUEUE} diff --git a/opwen_email_server/integration/connexion.py b/opwen_email_server/integration/connexion.py index 16aaac71..cddbfac3 100644 --- a/opwen_email_server/integration/connexion.py +++ b/opwen_email_server/integration/connexion.py @@ -13,10 +13,12 @@ from opwen_email_server.integration.azure import get_client_storage from opwen_email_server.integration.azure import get_email_storage from opwen_email_server.integration.azure import get_mailbox_storage +from opwen_email_server.integration.azure import get_no_auth from opwen_email_server.integration.azure import get_pending_storage from opwen_email_server.integration.azure import get_raw_email_storage from opwen_email_server.integration.azure import get_user_storage from opwen_email_server.integration.celery import inbound_store +from opwen_email_server.integration.celery import process_service_email from opwen_email_server.integration.celery import register_client from opwen_email_server.integration.celery import written_store from opwen_email_server.services.auth import AnyOfBasicAuth @@ -31,6 +33,12 @@ next_task=inbound_store.delay, ) +receive_service_email = ReceiveInboundEmail( + auth=get_no_auth(), + raw_email_storage=get_raw_email_storage(), + next_task=process_service_email, +) + client_write = UploadClientEmails( auth=get_auth(), next_task=written_store.delay, diff --git a/opwen_email_server/mailers/__init__.py b/opwen_email_server/mailers/__init__.py new file mode 100644 index 00000000..9a27f58f --- /dev/null +++ b/opwen_email_server/mailers/__init__.py @@ -0,0 +1,6 @@ +from opwen_email_server.mailers.echo import ECHO_ADDRESS +from opwen_email_server.mailers.echo import EchoEmailFormatter + +REGISTRY = { + ECHO_ADDRESS: EchoEmailFormatter(), +} diff --git a/opwen_email_server/mailers/echo.py b/opwen_email_server/mailers/echo.py new file mode 100644 index 00000000..ca2d6e7c --- /dev/null +++ b/opwen_email_server/mailers/echo.py @@ -0,0 +1,10 @@ +from opwen_email_server.utils.log import LogMixin + +ECHO_ADDRESS = 'echo@bot.lokole.ca' + + +class EchoEmailFormatter(LogMixin): + def __call__(self, email: dict) -> dict: + email['to'] = [email['from']] + email['from'] = ECHO_ADDRESS + return email diff --git a/opwen_email_server/services/auth.py b/opwen_email_server/services/auth.py index a896ab67..f6c8cae1 100644 --- a/opwen_email_server/services/auth.py +++ b/opwen_email_server/services/auth.py @@ -188,3 +188,11 @@ def _domain_file(cls, domain: str) -> str: @classmethod def _client_id_file(cls, client_id: str) -> str: return f'client_id/{client_id}' + + +class NoAuth(LogMixin): + def is_owner(self, domain: str, username: str) -> bool: + return True + + def domain_for(self, client_id: str) -> str: + return 'service' diff --git a/opwen_email_server/swagger/email-receive.yaml b/opwen_email_server/swagger/email-receive.yaml index b9d749c2..cbbab32b 100644 --- a/opwen_email_server/swagger/email-receive.yaml +++ b/opwen_email_server/swagger/email-receive.yaml @@ -36,6 +36,34 @@ paths: 403: description: Request from unregistered client. + /service: + + post: + operationId: opwen_email_server.integration.connexion.receive_service_email + summary: Webhook listening for request for services emails via Sendgrid. + consumes: + - multipart/form-data + parameters: + - $ref: '#/parameters/SendgridEmail' + - $ref: '#/parameters/SendgridDkim' + - $ref: '#/parameters/SendgridTo' + - $ref: '#/parameters/SendgridCc' + - $ref: '#/parameters/SendgridFrom' + - $ref: '#/parameters/SendgridText' + - $ref: '#/parameters/SendgridSenderIp' + - $ref: '#/parameters/SendgridSpamReport' + - $ref: '#/parameters/SendgridEnvelope' + - $ref: '#/parameters/SendgridSubject' + - $ref: '#/parameters/SendgridSpamScore' + - $ref: '#/parameters/SendgridCharsets' + - $ref: '#/parameters/SendgridSpf' + responses: + 200: + description: The email was succesfully received. No need to retry it. + 403: + description: Error in receiving email. + + parameters: ClientId: diff --git a/tests/opwen_email_server/test_actions.py b/tests/opwen_email_server/test_actions.py index 7c697cde..ccd85c8b 100644 --- a/tests/opwen_email_server/test_actions.py +++ b/tests/opwen_email_server/test_actions.py @@ -318,6 +318,47 @@ def _execute_action(self, *args, **kwargs): return action(*args, **kwargs) +class ProcessServiceEmailTests(TestCase): + def setUp(self): + self.raw_email_storage = Mock() + self.email_storage = Mock() + self.next_task = MagicMock() + self.registry = {'service@lokole.ca': (lambda email: email)} + self.email_parser = MagicMock() + + def test_202(self): + resource_id = 'eb93fde9-0cc6-4339-b7d6-f6e838e78f1c' + self.raw_email_storage.fetch_text.side_effect = throw(ObjectDoesNotExistError(None, None, None)) + + _, status = self._execute_action(resource_id) + self.assertEqual(status, 202) + + def test_200(self): + resource_id = 'eb93fde9-0cc6-4339-b7d6-f6e838e78f1c' + email = 'some-mime' + parsed_email = { + 'to': ['service@lokole.ca', 'foo@test.com'], 'from': 'user@lokole.ca', 'sent_at': '2020-02-01 21:17' + } + self.raw_email_storage.return_value = email + self.email_parser.return_value = parsed_email + + _, status = self._execute_action(resource_id) + self.assertEqual(status, 200) + self.email_storage.store_email.assert_called_once() + self.next_task.assert_called_once() + + def _execute_action(self, *args, **kwargs): + action = actions.ProcessServiceEmail( + raw_email_storage=self.raw_email_storage, + email_storage=self.email_storage, + next_task=self.next_task, + registry=self.registry, + email_parser=self.email_parser, + ) + + return action(*args, **kwargs) + + class DownloadClientEmailsTests(TestCase): def setUp(self): self.auth = Mock()