changeset 85:ff2da2018071

Add email sending utility.
author Daniele Nicolodi <daniele@grinta.net>
date Sun, 21 Aug 2011 16:10:23 +0200
parents 5a3c91771914
children 5c567edc0e6c
files src/ltpdarepo/config.py src/ltpdarepo/mail.py src/ltpdarepo/tests/test_mail.py
diffstat 3 files changed, 162 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/src/ltpdarepo/config.py	Sun Aug 21 16:10:23 2011 +0200
+++ b/src/ltpdarepo/config.py	Sun Aug 21 16:10:23 2011 +0200
@@ -6,3 +6,13 @@
 USERNAME = 'root'
 PASSWORD = ''
 DATABASE = 'ltpda'
+
+# mailserver
+MAIL_SMTP_SERVER = 'localhost'
+MAIL_SMTP_PORT = 25
+MAIL_SMTP_USETLS = False
+MAIL_SMTP_USERNAME = ''
+MAIL_SMTP_PASSWORD = ''
+
+# administrator email
+ADMIN_EMAIL_ADDR = ''
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/ltpdarepo/mail.py	Sun Aug 21 16:10:23 2011 +0200
@@ -0,0 +1,86 @@
+import smtplib
+from email.utils import parseaddr, formataddr
+from collections import namedtuple
+from contextlib import contextmanager
+
+from flask import _request_ctx_stack, request, current_app as app
+
+Message = namedtuple('Message', ('fromaddr', 'toaddr', 'message'))
+
+
+class DUMMYTransport(object):
+    def __init__(self, app):
+        self.isdummy = True
+
+    @contextmanager
+    def connect(self):
+        yield Dummy()
+
+
+class Dummy(object):
+    def __init__(self):
+        self.sentmail = []
+        ctx = _request_ctx_stack.top
+        if ctx is not None:
+            if not hasattr(ctx, 'sentmail'):
+                ctx.sentmail = []
+            self.sentmail = ctx.sentmail
+
+    def sendmail(self, fromaddr, toaddr, message):
+        self.sentmail.append(Message(fromaddr, toaddr, message))
+
+
+class SMTPTransport(object):
+    def __init__(self, app):
+        self.server   = app.config.get('MAIL_SMTP_SERVER', 'localhost')
+        self.port     = app.config.get('MAIL_SMTP_PORT', 25)
+        self.usetls   = app.config.get('MAIL_SMTP_USETLS', False)
+        self.username = app.config.get('MAIL_SMTP_USERNAME')
+        self.password = app.config.get('MAIL_SMTP_PASSWORD')
+        self.debug    = app.config.get('MAIL_SMTP_DEBUG')
+
+    @contextmanager
+    def connect(self):
+        smtp = smtplib.SMTP(self.server, self.port)
+        if self.debug:
+            smtp.set_debuglevel(1)
+        if self.usetls:
+            smtp.starttls()
+        if self.username:
+            smtp.login(self.username, self.password)
+        yield smtp
+        smtp.quit()
+
+
+class Mailer(object):
+    def __init__(self, transport=None, testing=False):
+        self.app = app
+        if transport is None:
+            transport = SMTPTransport(app)
+            if testing or app.config.get('TESTING'):
+                transport = DUMMYTransport(app)
+        self.transport = transport
+
+    def send(self, message, fromaddr=None, toaddr=None):
+        if not fromaddr:
+            fromaddr = message['From']
+        if not toaddr:
+            toaddr = message['To']
+        with self.transport.connect() as conn:
+            conn.sendmail(fromaddr, toaddr, str(message))
+
+    @property
+    def admin_email_addr(self):
+        addr = self.app.config.get('ADMIN_EMAIL_ADDR')
+        if not addr:
+            addr = 'noreply@' + request.host
+        name, addr = parseaddr(addr)
+        if not name:
+            name = 'LTPDA Repository Administrator'
+        return formataddr((name, addr))
+
+    @property
+    def sentmail(self):
+        ctx = _request_ctx_stack.top
+        if ctx is not None:
+            return getattr(ctx, 'sentmail')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/ltpdarepo/tests/test_mail.py	Sun Aug 21 16:10:23 2011 +0200
@@ -0,0 +1,66 @@
+from email.mime.text import MIMEText
+
+from ltpdarepo.tests.utils import RequestContextTestCase
+
+
+class RequestTestCase(RequestContextTestCase):
+
+    @classmethod
+    def setUpClass(self):
+        from ltpdarepo.admin import wipe, install
+        wipe()
+        install()
+
+    @classmethod
+    def tearDownClass(self):
+        from ltpdarepo.admin import wipe
+        wipe()
+
+    def test_admin_email_addr(self):
+        from ltpdarepo.mail import Mailer
+        mailer = Mailer()
+        self.assertIn('LTPDA Repository Administrator', mailer.admin_email_addr)        
+
+    def test_mailer_send(self):
+        from ltpdarepo.mail import Mailer
+        mailer = Mailer()
+
+        # check that the dummy tranport is being used
+        self.assertTrue(mailer.transport.isdummy)
+
+        # compose message
+        message = MIMEText(u'Hello!')
+        message['Subject'] = 'Hello'
+        message['From'] = fromaddress = 'From Address <from@example.net>'
+        message['To'] = toaddress = 'To Address <to@exmple.net>'
+
+        # send
+        mailer.send(message)
+
+        # check
+        sent = mailer.sentmail[0]
+        self.assertEqual(sent.fromaddr, fromaddress)
+        self.assertEqual(sent.toaddr, toaddress)
+        self.assertIn('Subject: Hello', sent.message)
+        self.assertIn('Hello!', sent.message)
+
+    def test_mailer_send_multiple(self):
+        from ltpdarepo.mail import Mailer
+        mailer = Mailer()
+
+        # compose message
+        message = MIMEText(u'Hello!')
+        message['Subject'] = 'Hello'
+        message['From'] = 'From Address <from@example.net>'
+        message['To'] = 'To Address <to@exmple.net>'
+
+        # send
+        mailer.send(message)
+        mailer.send(message)
+
+        # send from another mailer instance
+        mailer = Mailer()
+        mailer.send(message)
+
+        # check that we collected all messages
+        self.assertEqual(len(mailer.sentmail), 3)