# HG changeset patch # User Daniele Nicolodi # Date 1313935823 -7200 # Node ID ff2da2018071b6673d9a4a21810a7bb364dadb93 # Parent 5a3c9177191453f461c90cf97887dc441f71f6c5 Add email sending utility. diff -r 5a3c91771914 -r ff2da2018071 src/ltpdarepo/config.py --- a/src/ltpdarepo/config.py Sun Aug 21 16:10:23 2011 +0200 +++ b/src/ltpdarepo/config.py Sun Aug 21 16:10:23 2011 +0200 @@ -6,3 +6,13 @@ USERNAME = 'root' PASSWORD = '' DATABASE = 'ltpda' + +# mailserver +MAIL_SMTP_SERVER = 'localhost' +MAIL_SMTP_PORT = 25 +MAIL_SMTP_USETLS = False +MAIL_SMTP_USERNAME = '' +MAIL_SMTP_PASSWORD = '' + +# administrator email +ADMIN_EMAIL_ADDR = '' diff -r 5a3c91771914 -r ff2da2018071 src/ltpdarepo/mail.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/ltpdarepo/mail.py Sun Aug 21 16:10:23 2011 +0200 @@ -0,0 +1,86 @@ +import smtplib +from email.utils import parseaddr, formataddr +from collections import namedtuple +from contextlib import contextmanager + +from flask import _request_ctx_stack, request, current_app as app + +Message = namedtuple('Message', ('fromaddr', 'toaddr', 'message')) + + +class DUMMYTransport(object): + def __init__(self, app): + self.isdummy = True + + @contextmanager + def connect(self): + yield Dummy() + + +class Dummy(object): + def __init__(self): + self.sentmail = [] + ctx = _request_ctx_stack.top + if ctx is not None: + if not hasattr(ctx, 'sentmail'): + ctx.sentmail = [] + self.sentmail = ctx.sentmail + + def sendmail(self, fromaddr, toaddr, message): + self.sentmail.append(Message(fromaddr, toaddr, message)) + + +class SMTPTransport(object): + def __init__(self, app): + self.server = app.config.get('MAIL_SMTP_SERVER', 'localhost') + self.port = app.config.get('MAIL_SMTP_PORT', 25) + self.usetls = app.config.get('MAIL_SMTP_USETLS', False) + self.username = app.config.get('MAIL_SMTP_USERNAME') + self.password = app.config.get('MAIL_SMTP_PASSWORD') + self.debug = app.config.get('MAIL_SMTP_DEBUG') + + @contextmanager + def connect(self): + smtp = smtplib.SMTP(self.server, self.port) + if self.debug: + smtp.set_debuglevel(1) + if self.usetls: + smtp.starttls() + if self.username: + smtp.login(self.username, self.password) + yield smtp + smtp.quit() + + +class Mailer(object): + def __init__(self, transport=None, testing=False): + self.app = app + if transport is None: + transport = SMTPTransport(app) + if testing or app.config.get('TESTING'): + transport = DUMMYTransport(app) + self.transport = transport + + def send(self, message, fromaddr=None, toaddr=None): + if not fromaddr: + fromaddr = message['From'] + if not toaddr: + toaddr = message['To'] + with self.transport.connect() as conn: + conn.sendmail(fromaddr, toaddr, str(message)) + + @property + def admin_email_addr(self): + addr = self.app.config.get('ADMIN_EMAIL_ADDR') + if not addr: + addr = 'noreply@' + request.host + name, addr = parseaddr(addr) + if not name: + name = 'LTPDA Repository Administrator' + return formataddr((name, addr)) + + @property + def sentmail(self): + ctx = _request_ctx_stack.top + if ctx is not None: + return getattr(ctx, 'sentmail') diff -r 5a3c91771914 -r ff2da2018071 src/ltpdarepo/tests/test_mail.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/ltpdarepo/tests/test_mail.py Sun Aug 21 16:10:23 2011 +0200 @@ -0,0 +1,66 @@ +from email.mime.text import MIMEText + +from ltpdarepo.tests.utils import RequestContextTestCase + + +class RequestTestCase(RequestContextTestCase): + + @classmethod + def setUpClass(self): + from ltpdarepo.admin import wipe, install + wipe() + install() + + @classmethod + def tearDownClass(self): + from ltpdarepo.admin import wipe + wipe() + + def test_admin_email_addr(self): + from ltpdarepo.mail import Mailer + mailer = Mailer() + self.assertIn('LTPDA Repository Administrator', mailer.admin_email_addr) + + def test_mailer_send(self): + from ltpdarepo.mail import Mailer + mailer = Mailer() + + # check that the dummy tranport is being used + self.assertTrue(mailer.transport.isdummy) + + # compose message + message = MIMEText(u'Hello!') + message['Subject'] = 'Hello' + message['From'] = fromaddress = 'From Address ' + message['To'] = toaddress = 'To Address ' + + # send + mailer.send(message) + + # check + sent = mailer.sentmail[0] + self.assertEqual(sent.fromaddr, fromaddress) + self.assertEqual(sent.toaddr, toaddress) + self.assertIn('Subject: Hello', sent.message) + self.assertIn('Hello!', sent.message) + + def test_mailer_send_multiple(self): + from ltpdarepo.mail import Mailer + mailer = Mailer() + + # compose message + message = MIMEText(u'Hello!') + message['Subject'] = 'Hello' + message['From'] = 'From Address ' + message['To'] = 'To Address ' + + # send + mailer.send(message) + mailer.send(message) + + # send from another mailer instance + mailer = Mailer() + mailer.send(message) + + # check that we collected all messages + self.assertEqual(len(mailer.sentmail), 3)