diff brinksmoney.py @ 0:72fab2710469 default tip

Import
author Daniele Nicolodi <daniele@grinta.net>
date Fri, 05 Aug 2016 23:16:31 -0600
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/brinksmoney.py	Fri Aug 05 23:16:31 2016 -0600
@@ -0,0 +1,283 @@
+import email
+import imp
+import itertools
+import json
+import os.path
+import smtplib
+import smtplib
+import sqlite3
+import sqlite3
+import subprocess
+
+import re
+import csv
+
+from contextlib import contextmanager
+from datetime import datetime, date, timedelta
+from email.mime.text import MIMEText
+from email.utils import format_datetime, localtime, parseaddr
+from base64 import b32encode
+from hashlib import sha1
+from urllib.parse import urljoin
+
+import click
+import requests
+
+
+URL = 'https://www.brinksmoney.com/'
+
+
+# transactions table row template
+TRANSACTION = """{0.shortid:}  {0.date:%d-%m-%Y}  {0.description:57s}  {0.amount:+8.2f}"""
+
+# transactions table header
+HEADER = """{:14s}  {:10s}  {:57s}  {:>8s}""".format('Id', 'Date', 'Description', 'Amount')
+
+# transactions table footer
+FOOTER = """{:14s}  {:10s}  {:57s}  {{balance:8.2f}}""".format('', '', 'BALANCE')
+
+# transactions table horizontal separator
+SEP = """-""" * len(HEADER)
+
+
+# GPG encrypted text is ascii and as such does not require encoding
+# but its decrypted form is utf-8 and therefore the charset header
+# must be set accordingly. define an appropriate charset object
+email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
+                          body_enc=None, output_charset='utf-8')
+
+
+def prevmonth(d):
+    year = d.year
+    month = d.month - 1
+    if month < 1:
+        year = yeat - 1
+    return date(year, month, 1)
+
+
+class Mailer:
+    def __init__(self, host='localhost', port=25, starttls=True, username=None, password=None):
+        self.host = host
+        self.port = port
+        self.starttls = starttls
+        self.username = username
+        self.password = password
+
+    @contextmanager
+    def connect(self):
+        smtp = smtplib.SMTP(self.host, self.port)
+        if self.starttls:
+            smtp.starttls()
+        if self.username:
+            smtp.login(self.username, self.password)
+        yield smtp
+        smtp.quit()
+
+    def send(self, message, fromaddr=None, toaddr=None):
+        if not fromaddr:
+            fromaddr = message['From']
+        if not toaddr:
+            toaddr = message['To']
+        with self.connect() as conn:
+            conn.sendmail(fromaddr, toaddr, str(message))
+
+
+class GPG:
+    def __init__(self, homedir):
+        self.homedir = homedir
+
+    def encrypt(self, message, sender, recipient):
+        sender = parseaddr(sender)[1]
+        recipient = parseaddr(recipient)[1]
+        cmd = [ "gpg2", "--homedir", self.homedir, "--sign", "--encrypt",
+                "--batch", "--no-options", "--yes", "--armor",
+                "--local-user", sender, "--recipient", recipient, ]
+        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        encdata, err = p.communicate(input=message.encode('utf-8'))
+        if p.returncode:
+            raise RuntimeError(p.returncode, err)
+        return encdata.decode('ascii')
+
+
+def normalize(s):
+    s = s.strip()    
+    m = re.match(r'Debit: (Signature|PIN) purchase from \d*\s+(.*)', s)
+    if m:
+        return m.group(2)
+    m = re.match(r'Debit: ATM Cash Withdrawal at \d*\s+(.*)', s)
+    if m:
+        return 'ATM ' + m.group(1)
+    m = re.match(r'Debit: (ATM Cash Withdrawal Fee .*)', s)
+    if m:
+        return m.group(1).upper()
+    m = re.match(r'Credit: Direct Deposit from (.+) for (.+)', s)
+    if m:
+        return m.group(2) + ' ' + m.group(1)
+    return s
+
+
+class Transaction(object):    
+    __slots__ = 'id', 'shortid', 'date', 'description', 'amount', 'balance'
+
+    def __init__(self, date, description, amount, balance):
+        r = repr((date, description, amount, balance))
+        self.id = b32encode(sha1(r.encode('utf8')).digest()).decode()
+        self.shortid = self.id[-14:]
+        self.date = date
+        self.description = description
+        self.amount = amount
+        self.balance = balance
+
+    @classmethod
+    def fromcsv(cls, x):        
+        if x[2]:
+            amount = -float(re.sub('[^\d.]', '', x[2]))
+        if x[3]:
+            amount = +float(re.sub('[^\d.]', '', x[3]))
+        data = {'date': datetime.strptime(x[0], '%m/%d/%Y %I:%M%p'),
+                'description': normalize(x[1]),
+                'amount': amount,
+                'balance': float(re.sub('[^\d.]', '', x[4]))}
+        return cls(**data)
+
+    def __str__(self):
+        return TRANSACTION.format(self)
+
+
+class BRINKSMoney(object):
+    def __init__(self):
+        self.session = requests.Session()
+
+    def login(self, username, password):
+        url = urljoin(URL, 'account/authenticate.m')
+        
+        # acquire session id
+        r = self.session.get(url)
+        r.raise_for_status()
+
+        # credentials
+        data = { 'blackBox': '',
+                 'identifier': username,
+                 'secret': password,
+                 'type': 'pwd',
+                 'login': 'Log In' }
+        r = self.session.post(url, data=data)
+        r.raise_for_status()
+
+    def transactions(self, year, month):
+        url = urljoin(URL, 'account/acctHistory.m')
+
+        months = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
+                  'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
+
+        datestr = '{} {}'.format(months[month - 1], year)
+
+        params = { 'selectedAcct': 'CARD',
+                   'CSV': 'true',
+                   'selectedMonth': datestr }
+
+        r = self.session.get(url, params=params)
+        r.raise_for_status()
+
+        lines = r.text.splitlines()
+
+        # skip header and footer
+        lines = lines[10:-9]
+
+        rows = csv.reader(lines, doublequote=False, strict=True)
+        transactions = [ Transaction.fromcsv(row) for row in rows ]
+
+        return transactions
+
+
+def transactions(conf):
+    db = sqlite3.connect(conf['DATABASE'])
+    db.execute('''CREATE TABLE IF NOT EXISTS transactions (id TEXT PRIMARY KEY)''')
+
+    sendmail = Mailer(host=conf['SMTPHOST'],
+                      port=conf['SMTPPORT'],
+                      starttls=conf['SMTPSTARTTLS'],
+                      username=conf['SMTPUSER'],
+                      password=conf['SMTPPASSWD']).send
+
+    encrypt = GPG(conf['GNUPGHOME']).encrypt
+
+    remote = BRINKSMoney()
+    remote.login(conf['USERNAME'], conf['PASSWORD'])
+
+    this = date.today().replace(day=1)
+    prev = prevmonth(this)
+    
+    recent = ( remote.transactions(this.year, this.month) +
+               remote.transactions(prev.year, prev.month) )
+
+    recent.sort(key=lambda x: x.date)
+               
+    balance = recent[-1].balance
+    
+    curs = db.cursor()
+    unseen = []
+    for t in recent:
+        curs.execute('''SELECT COUNT(*) FROM transactions WHERE id = ?''', (t.id, ))
+        if not curs.fetchone()[0]:
+            # not seen before
+            unseen.append(t)
+
+    if unseen:
+        lines = []
+        lines.append(HEADER)
+        lines.append(SEP)
+        for t in unseen:
+            lines.append(str(t))
+        lines.append(SEP)
+        lines.append(FOOTER.format(balance=balance))
+
+        text = '\n'.join(lines)
+        payload = encrypt(text, conf['MAILFROM'], conf['MAILTO'])
+
+        message = MIMEText(payload, _charset='utf8 7bit')
+        message['Subject'] = 'BRINKS Money Account update'
+        message['From'] = conf['MAILFROM']
+        message['To'] = conf['MAILTO']
+        message['Date'] = format_datetime(localtime())
+
+        sendmail(message)
+
+    curs.executemany('''INSERT INTO transactions (id) VALUES (?)''', ((x.id, ) for x in unseen))
+    db.commit()
+
+
+def loadconf(filename):
+    module = imp.new_module('conf')
+    module.__file__ = filename
+    with open(filename) as fd:
+        exec(compile(fd.read(), filename, 'exec'), module.__dict__)
+    conf = {}
+    for key in dir(module):
+        if key.isupper():
+            conf[key] = getattr(module, key)
+
+    conf['DATADIR'] = os.path.dirname(filename)
+    for key in 'DATABASE', 'GNUPGHOME':
+        # if path is not absolute, it is interpreted as relative
+        # to the location of the configuration file
+        if not os.path.isabs(conf[key]):
+            conf[key] = os.path.join(conf['DATADIR'], conf[key])
+
+    return conf
+
+            
+@click.command()
+@click.argument('conffile')
+@click.option('--verbose', is_flag=True, help='Verbose output.')
+def main(conffile, verbose):
+
+    conf = loadconf(conffile)
+    if verbose:
+        conf['VERBOSE'] = True
+
+    transactions(conf)
+
+
+if __name__ == '__main__':
+    main()