Skip to content

Commit

Permalink
Add template for email services (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamsclafani authored Apr 29, 2020
1 parent 1df9dad commit d05c0b2
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 1 deletion.
50 changes: 49 additions & 1 deletion opwen_email_server/actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from abc import ABC
from hashlib import sha256
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Tuple
from typing import Union
Expand All @@ -11,6 +13,7 @@
from opwen_email_server.constants import mailbox
from opwen_email_server.constants import sync
from opwen_email_server.services.auth import AzureAuth
from opwen_email_server.services.auth import NoAuth
from opwen_email_server.services.sendgrid import SendSendgridEmail
from opwen_email_server.services.storage import AzureObjectsStorage
from opwen_email_server.services.storage import AzureObjectStorage
Expand Down Expand Up @@ -212,7 +215,8 @@ def _decode_attachments(cls, email: dict) -> dict:


class ReceiveInboundEmail(_Action):
def __init__(self, auth: AzureAuth, raw_email_storage: AzureTextStorage, next_task: Callable[[str], None]):
def __init__(self, auth: Union[AzureAuth, NoAuth], raw_email_storage: AzureTextStorage, next_task: Callable[[str],
None]):

self._auth = auth
self._raw_email_storage = raw_email_storage
Expand All @@ -238,6 +242,50 @@ def _new_email_id(cls, email: str) -> str:
return sha256(email.encode('utf-8')).hexdigest()


class ProcessServiceEmail(_Action):
def __init__(self,
raw_email_storage: AzureTextStorage,
email_storage: AzureObjectStorage,
next_task: Callable[[str], None],
registry: Dict[str, Any],
email_parser: Callable[[dict], dict] = None):

self._raw_email_storage = raw_email_storage
self._email_storage = email_storage
self._next_task = next_task
self._registry = registry
self._email_parser = email_parser or MimeEmailParser()

def _action(self, resource_id): # type: ignore
try:
mime_email = self._raw_email_storage.fetch_text(resource_id)
except ObjectDoesNotExistError:
self.log_warning('Inbound email %s does not exist', resource_id)
return 'skipped', 202

email = self._email_parser(mime_email)

for address in email.get('to', []):
try:
mailer_service = self._registry[address]
except KeyError:
self.log_warning('Skipping unknown mailer service: %s', address)
continue

formatted_email = mailer_service(email)

formatted_email_id = new_email_id(formatted_email)
formatted_email['_uid'] = formatted_email_id

self._email_storage.store_email(formatted_email, formatted_email_id)

self._next_task(formatted_email_id)

self._raw_email_storage.delete(resource_id)
self.log_event(events.EMAILS_FORMATTED_FOR_CLIENT) # noqa: E501 # yapf: disable
return 'OK', 200


class DownloadClientEmails(_Action):
def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage, email_storage: AzureObjectStorage,
pending_storage: AzureTextStorage):
Expand Down
1 change: 1 addition & 0 deletions opwen_email_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
REGISTER_CLIENT_QUEUE = f'register{resource_suffix}'
INBOUND_STORE_QUEUE = f'inbound{resource_suffix}'
WRITTEN_STORE_QUEUE = f'written{resource_suffix}'
PROCESS_SERVICE_QUEUE = f'service{resource_suffix}'
SEND_QUEUE = f'send{resource_suffix}'
MAILBOX_RECEIVED_QUEUE = f'mailboxreceived{resource_suffix}'
MAILBOX_SENT_QUEUE = f'mailboxsent{resource_suffix}'
Expand Down
1 change: 1 addition & 0 deletions opwen_email_server/constants/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
MISSING_SCOPES = 'missing_scopes' # type: Final
UNKNOWN_COMPRESSION_FORMAT = 'unknown_compression_format' # type: Final
EMAILS_DELIVERED_TO_CLIENT = 'emails_delivered_to_client' # type: Final
EMAILS_FORMATTED_FOR_CLIENT = 'emails_formatted_for_client' # type: Final
EMAILS_RECEIVED_FROM_CLIENT = 'emails_received_from_client' # type: Final
EMAIL_RECEIVED_FOR_CLIENT = 'email_received_for_client' # type: Final
EMAIL_DELIVERED_FROM_CLIENT = 'email_delivered_from_client' # type: Final
Expand Down
5 changes: 5 additions & 0 deletions opwen_email_server/integration/azure.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from opwen_email_server import config
from opwen_email_server.services.auth import AzureAuth
from opwen_email_server.services.auth import NoAuth
from opwen_email_server.services.storage import AzureFileStorage
from opwen_email_server.services.storage import AzureObjectsStorage
from opwen_email_server.services.storage import AzureObjectStorage
from opwen_email_server.services.storage import AzureTextStorage
from opwen_email_server.utils.collections import singleton


def get_no_auth() -> NoAuth:
return NoAuth()


@singleton
def get_auth() -> AzureAuth:
return AzureAuth(storage=AzureObjectStorage(
Expand Down
15 changes: 15 additions & 0 deletions opwen_email_server/integration/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from opwen_email_server import config
from opwen_email_server.actions import IndexReceivedEmailForMailbox
from opwen_email_server.actions import IndexSentEmailForMailbox
from opwen_email_server.actions import ProcessServiceEmail
from opwen_email_server.actions import RegisterClient
from opwen_email_server.actions import SendOutboundEmails
from opwen_email_server.actions import StoreInboundEmails
Expand All @@ -14,6 +15,7 @@
from opwen_email_server.integration.azure import get_pending_storage
from opwen_email_server.integration.azure import get_raw_email_storage
from opwen_email_server.integration.azure import get_user_storage
from opwen_email_server.mailers import REGISTRY
from opwen_email_server.services.dns import SetupMxRecords
from opwen_email_server.services.sendgrid import SendSendgridEmail
from opwen_email_server.services.sendgrid import SetupSendgridMailbox
Expand Down Expand Up @@ -101,6 +103,18 @@ def send(resource_id: str) -> None:
action(resource_id)


@celery.task(ignore_result=True)
def process_service_email(resource_id: str) -> None:
action = ProcessServiceEmail(
raw_email_storage=get_raw_email_storage(),
email_storage=get_email_storage(),
registry=REGISTRY,
next_task=send_and_index_email,
)

action(resource_id)


def _fqn(task):
return f'{__name__}.{task.__name__}'

Expand All @@ -109,6 +123,7 @@ def _fqn(task):
_fqn(register_client): {'queue': config.REGISTER_CLIENT_QUEUE},
_fqn(index_received_email_for_mailbox): {'queue': config.MAILBOX_RECEIVED_QUEUE},
_fqn(index_sent_email_for_mailbox): {'queue': config.MAILBOX_SENT_QUEUE},
_fqn(process_service_email): {'queue': config.PROCESS_SERVICE_QUEUE},
_fqn(inbound_store): {'queue': config.INBOUND_STORE_QUEUE},
_fqn(written_store): {'queue': config.WRITTEN_STORE_QUEUE},
_fqn(send): {'queue': config.SEND_QUEUE}
Expand Down
8 changes: 8 additions & 0 deletions opwen_email_server/integration/connexion.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from opwen_email_server.integration.azure import get_client_storage
from opwen_email_server.integration.azure import get_email_storage
from opwen_email_server.integration.azure import get_mailbox_storage
from opwen_email_server.integration.azure import get_no_auth
from opwen_email_server.integration.azure import get_pending_storage
from opwen_email_server.integration.azure import get_raw_email_storage
from opwen_email_server.integration.azure import get_user_storage
from opwen_email_server.integration.celery import inbound_store
from opwen_email_server.integration.celery import process_service_email
from opwen_email_server.integration.celery import register_client
from opwen_email_server.integration.celery import written_store
from opwen_email_server.services.auth import AnyOfBasicAuth
Expand All @@ -31,6 +33,12 @@
next_task=inbound_store.delay,
)

receive_service_email = ReceiveInboundEmail(
auth=get_no_auth(),
raw_email_storage=get_raw_email_storage(),
next_task=process_service_email,
)

client_write = UploadClientEmails(
auth=get_auth(),
next_task=written_store.delay,
Expand Down
6 changes: 6 additions & 0 deletions opwen_email_server/mailers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from opwen_email_server.mailers.echo import ECHO_ADDRESS
from opwen_email_server.mailers.echo import EchoEmailFormatter

REGISTRY = {
ECHO_ADDRESS: EchoEmailFormatter(),
}
10 changes: 10 additions & 0 deletions opwen_email_server/mailers/echo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from opwen_email_server.utils.log import LogMixin

ECHO_ADDRESS = 'echo@bot.lokole.ca'


class EchoEmailFormatter(LogMixin):
def __call__(self, email: dict) -> dict:
email['to'] = [email['from']]
email['from'] = ECHO_ADDRESS
return email
8 changes: 8 additions & 0 deletions opwen_email_server/services/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,11 @@ def _domain_file(cls, domain: str) -> str:
@classmethod
def _client_id_file(cls, client_id: str) -> str:
return f'client_id/{client_id}'


class NoAuth(LogMixin):
def is_owner(self, domain: str, username: str) -> bool:
return True

def domain_for(self, client_id: str) -> str:
return 'service'
28 changes: 28 additions & 0 deletions opwen_email_server/swagger/email-receive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,34 @@ paths:
403:
description: Request from unregistered client.

/service:

post:
operationId: opwen_email_server.integration.connexion.receive_service_email
summary: Webhook listening for request for services emails via Sendgrid.
consumes:
- multipart/form-data
parameters:
- $ref: '#/parameters/SendgridEmail'
- $ref: '#/parameters/SendgridDkim'
- $ref: '#/parameters/SendgridTo'
- $ref: '#/parameters/SendgridCc'
- $ref: '#/parameters/SendgridFrom'
- $ref: '#/parameters/SendgridText'
- $ref: '#/parameters/SendgridSenderIp'
- $ref: '#/parameters/SendgridSpamReport'
- $ref: '#/parameters/SendgridEnvelope'
- $ref: '#/parameters/SendgridSubject'
- $ref: '#/parameters/SendgridSpamScore'
- $ref: '#/parameters/SendgridCharsets'
- $ref: '#/parameters/SendgridSpf'
responses:
200:
description: The email was succesfully received. No need to retry it.
403:
description: Error in receiving email.


parameters:

ClientId:
Expand Down
41 changes: 41 additions & 0 deletions tests/opwen_email_server/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,47 @@ def _execute_action(self, *args, **kwargs):
return action(*args, **kwargs)


class ProcessServiceEmailTests(TestCase):
def setUp(self):
self.raw_email_storage = Mock()
self.email_storage = Mock()
self.next_task = MagicMock()
self.registry = {'service@lokole.ca': (lambda email: email)}
self.email_parser = MagicMock()

def test_202(self):
resource_id = 'eb93fde9-0cc6-4339-b7d6-f6e838e78f1c'
self.raw_email_storage.fetch_text.side_effect = throw(ObjectDoesNotExistError(None, None, None))

_, status = self._execute_action(resource_id)
self.assertEqual(status, 202)

def test_200(self):
resource_id = 'eb93fde9-0cc6-4339-b7d6-f6e838e78f1c'
email = 'some-mime'
parsed_email = {
'to': ['service@lokole.ca', 'foo@test.com'], 'from': 'user@lokole.ca', 'sent_at': '2020-02-01 21:17'
}
self.raw_email_storage.return_value = email
self.email_parser.return_value = parsed_email

_, status = self._execute_action(resource_id)
self.assertEqual(status, 200)
self.email_storage.store_email.assert_called_once()
self.next_task.assert_called_once()

def _execute_action(self, *args, **kwargs):
action = actions.ProcessServiceEmail(
raw_email_storage=self.raw_email_storage,
email_storage=self.email_storage,
next_task=self.next_task,
registry=self.registry,
email_parser=self.email_parser,
)

return action(*args, **kwargs)


class DownloadClientEmailsTests(TestCase):
def setUp(self):
self.auth = Mock()
Expand Down

0 comments on commit d05c0b2

Please sign in to comment.