view bnpparibas.py @ 12:4747393db602

Decode messages send time and provide a nicer representation in emails
author Daniele Nicolodi <daniele@grinta.net>
date Mon, 11 Jan 2016 21:15:53 +0100
parents f3d6d73a7184
children 37ce0dc68cad
line wrap: on
line source

import cgi
import email
import imp
import itertools
import json
import os.path
import requests
import smtplib
import sqlite3
import subprocess
import sys
import textwrap
import time

from contextlib import contextmanager
from datetime import datetime
from email.mime.text import MIMEText
from email.utils import format_datetime, localtime, parseaddr
from io import BytesIO
from pprint import pprint
from urllib.parse import urljoin

from html2text import HTML2Text
from PIL import Image


URL = 'https://mabanque.bnpparibas/'

# message template
MESSAGE = """\
From: {message.sender:}
Subject: {message.subject:}
Date: {message.date:%a, %d %b %Y %H:%M:%S}

{message.body:}
"""

# transactions table row template
TRANSACTION = """{xact.id:14d}  {xact.date:10s}  {xact.description:54s}  {xact.amount:+8.2f}"""

# transactions table header
HEADER = """{:14s}  {:10s}  {:54s}  {:>8s}""".format('Id', 'Date', 'Description', 'Amount')

# transactions table footer
FOOTER = """{:14s}  {:10s}  {:54s}  {{balance:8.2f}}""".format('', '', 'BALANCE')

# transactions table horizontal separator
SEP = """-""" * len(HEADER)


# GPG encrypted text is ascii and as such does not require encoding
# but its decrypted form is utf-8 and therefore the charset header
# must be set accordingly. define an appropriate charset object
email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
                          body_enc=None, output_charset='utf-8')


def loadconf(filename):
    module = imp.new_module('conf')
    module.__file__ = filename
    with open(filename) as fd:
        exec(compile(fd.read(), filename, 'exec'), module.__dict__)
    conf = {}
    for key in dir(module):
        if key.isupper():
            conf[key] = getattr(module, key)
    return conf


def wrap(p, indent):
    return textwrap.fill(p, 72, initial_indent=indent, subsequent_indent=indent)


def html2text(html):
    # the html2text module does an ok job, but it can be improved in
    # the quality of the transformation and in the coding style
    conv = HTML2Text()
    conv.ignore_links = True
    conv.ignore_images = True
    conv.ignore_emphasis = True
    return conv.handle(html)


class Mailer:
    def __init__(self, host='localhost', port=25, starttls=True, username=None, password=None):
        self.host = host
        self.port = port
        self.starttls = starttls
        self.username = username
        self.password = password

    @contextmanager
    def connect(self):
        smtp = smtplib.SMTP(self.host, self.port)
        if self.starttls:
            smtp.starttls()
        if self.username:
            smtp.login(self.username, self.password)
        yield smtp
        smtp.quit()

    def send(self, message, fromaddr=None, toaddr=None):
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))


class GPG:
    def __init__(self, homedir):
        self.homedir = homedir

    def encrypt(self, message, sender, recipient):
        sender = parseaddr(sender)[1]
        recipient = parseaddr(recipient)[1]
        cmd = [ "gpg2", "--homedir", self.homedir, "--sign", "--encrypt",
                "--batch", "--no-options", "--yes", "--armor",
                "--local-user", sender, "--recipient", recipient, ]
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        encdata, err = p.communicate(input=message.encode('utf-8'))
        if p.returncode:
            raise RuntimeError(p.returncode, err)
        return encdata.decode('ascii')


class Transaction:
    __slots__ = 'id', 'date', 'category', 'description', 'amount', 'currency'

    def __init__(self, id, date, category, description, amount, currency):
        self.id = id
        self.date = date
        self.category = category
        self.description = description
        self.amount = amount
        self.currency = currency

    @classmethod
    def fromjson(cls, x):
        data = {'id': int(x['idOperation']),
                'date': x['dateOperation'],
                'category': int(x['idCategorie']),
                'description': x['libelleOperation'].strip().replace('VIREMENT', 'VIR'),
                'amount': x['montant']['montant'],
                'currency': x['montant']['currency'], }
        return cls(**data)

    def __str__(self):
        return TRANSACTION.format(xact=self)


class Message:
    __slots__ = 'id', 'date', 'subject', 'sender', 'content', 'quoted'

    def __init__(self, id, date, subject, sender, content, quoted):
        self.id = id
        self.date = date
        self.subject = subject
        self.sender = sender
        self.content = content
        self.quoted = quoted

    @classmethod
    def fromjson(cls, x):
        data = {'id': x['msg']['id'],
                'date': datetime.strptime(x['msg']['id'], '%Y-%m-%d-%H.%M.%S.%f'),
                'subject': x['msg']['objet'],
                'sender': x['msg']['emetteur']['nom'],
                'content': x['msg']['contenu'],
                'quoted': None, }
        quoted = x.get('msgAttache')
        if quoted:
            data['quoted'] = quoted['contenu']
        return cls(**data)

    @staticmethod
    def normalize(txt, indent=''):
        if '<div' in txt:
            txt = txt.replace('<br></br>','<br/><br/>')
            return html2text(txt)

        txt = txt.replace(r'<br/>', '\n')
        parts = []
        for p in txt.split('\n'):
            p = p.strip()
            if p:
                p = wrap(p, indent)
            else:
                p = indent
            parts.append(p)
        return '\n'.join(parts)

    @property
    def body(self):
        body = self.normalize(self.content)
        if self.quoted is not None:
            body = body + '\n\n' + self.normalize(self.quoted, '> ')
        return body

    def __str__(self):
        return MESSAGE.format(message=self)


class Keypad:
    def __init__(self, data):
        # reference keypad
        fname = os.path.join(os.path.dirname(__file__), 'keypad.jpeg')
        reference = Image.open(fname).convert('L')
        symbols = [ 8, 4, 1, 6, 3, 7, 9, 0, 5, 2 ]
        self.keypad = dict(zip(symbols, self.imslice(reference)))

        # decode keypad
        image = Image.open(BytesIO(data)).convert('L')
        self.keymap = {}
        for n, tile in enumerate(self.imslice(image), 1):
            # compare to reference tiles
            for sym, key in self.keypad.items():
                if tile == key:
                    self.keymap[sym] = n
                    break

        # verify
        if sorted(self.keymap.keys()) != list(range(10)):
            raise ValueError('keypad decode failed')

    @staticmethod
    def imslice(image):
        for j, i in itertools.product(range(2), range(5)):
            yield image.crop((83 * i, 80 * j, 83 * (i + 1), 80 * (j + 1)))

    def encode(self, passwd):
        return ''.join('%02d' % self.keymap[d] for d in map(int, passwd))


class BNPParibas:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'X-Requested-With': 'XMLHttpRequest'})

    @staticmethod
    def validate(response):
        response.raise_for_status()
        ctype, params = cgi.parse_header(response.headers.get('content-type'))
        if ctype == 'application/json':
            data = response.json()
            # the status code may sometime be represented as string not int
            code = data.get('codeRetour', -1)
            if int(code) != 0:
                raise requests.HTTPError()
            return data

    def login(self, username, password):
        url = urljoin(URL, 'identification-wspl-pres/identification')
        r = self.session.get(url, params={'timestamp': int(time.time())})
        v = self.validate(r)

        keypadid = v['data']['grille']['idGrille']
        authtemplate = v['data']['authTemplate']

        url = urljoin(URL, 'identification-wspl-pres/grille/' + keypadid)
        r = self.session.get(url)
        r.raise_for_status()

        keypad = Keypad(r.content)

        # fill authentication template
        auth = authtemplate
        auth = auth.replace('{{ idTelematique }}', username)
        auth = auth.replace('{{ password }}', keypad.encode(password))
        auth = auth.replace('{{ clientele }}', '')

        url = urljoin(URL, 'SEEA-pa01/devServer/seeaserver')
        r = self.session.post(url, data={'AUTH': auth})
        v = self.validate(r)
        return v['data']

    def info(self):
        url = urljoin(URL, 'serviceinfosclient-wspl/rpc/InfosClient')
        r = self.session.get(url, params={'modeAppel': 0})
        v = self.validate(r)
        return v['data']

    def recent(self):
        url = urljoin(URL, 'udc-wspl/rest/getlstcpt')
        r = self.session.get(url)
        v = self.validate(r)
        account = v['data']['infoUdc']['familleCompte'][0]['compte'][0]

        url = urljoin(URL, 'rop-wspl/rest/releveOp')
        data = json.dumps({'ibanCrypte': account['key'],
                           'pastOrPending': 1, 'triAV': 0,
                           'startDate': None, # ddmmyyyy
                           'endDate': None})
        headers = {'Content-Type': 'application/json'}
        r = self.session.post(url, headers=headers, data=data)
        v = self.validate(r)
        return v['data']

    def messages(self):
        url = urljoin(URL, 'bmm-wspl/recupMsgRecu')
        r = self.session.get(url, params={'nbMessagesParPage': 200, 'index': 0})
        v = self.validate(r)
        return v['data']

    def message(self, mid):
        # required to set some cookies required by the next call
        url = urljoin(URL, 'fr/connexion/mes-outils/messagerie')
        r = self.session.get(url)
        self.validate(r)

        url = urljoin(URL, 'bmm-wspl/recupMsg')
        r = self.session.get(url, params={'identifiant': mid})
        v = self.validate(r)
        return v['data']


def main():
    conffile = sys.argv[1]
    conf = loadconf(conffile)

    db = sqlite3.connect(conf['DATABASE'])
    db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
    db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

    sendmail = Mailer(host=conf['SMTPHOST'],
                      port=conf['SMTPPORT'],
                      starttls=conf['SMTPSTARTTLS'],
                      username=conf['SMTPUSER'],
                      password=conf['SMTPPASSWD']).send

    encrypt = GPG(conf['GNUPGHOME']).encrypt

    remote = BNPParibas()
    remote.login(conf['USERNAME'], conf['PASSWORD'])

    ## transactions
    recent = remote.recent()
    data = recent['listerOperations']['compte']
    transactions = [ Transaction.fromjson(x) for x in data['operationPassee'] ]
    balance = data['soldeDispo']

    curs = db.cursor()
    unseen = []
    for t in transactions:
        curs.execute('''SELECT COUNT(*) FROM transactions WHERE id = ?''', (t.id, ))
        if not curs.fetchone()[0]:
            # not seen before
            unseen.append(t)

    if unseen:
        lines = []
        lines.append(HEADER)
        lines.append(SEP)
        for t in unseen:
            lines.append(str(t))
        lines.append(SEP)
        lines.append(FOOTER.format(balance=balance))

        text = '\n'.join(lines)
        payload = encrypt(text, conf['MAILFROM'], conf['MAILTO'])

        message = MIMEText(payload, _charset='utf8 7bit')
        message['Subject'] = 'BNP Paribas Account update'
        message['From'] = conf['MAILFROM']
        message['To'] = conf['MAILTO']
        message['Date'] = format_datetime(localtime())

        sendmail(message)

    curs.executemany('''INSERT INTO transactions (id) VALUES (?)''', ((x.id, ) for x in unseen))
    db.commit()

    ## messages
    data = remote.info()
    info = data['abonnement']
    nnew = info['nombreMessageBMMNonLus'] + info['nombreMessageBilatNonLus']

    data = remote.messages()
    for m in data['messages']:

        curs = db.cursor()
        curs.execute('''SELECT COUNT(*) FROM messages WHERE id = ?''', (m['id'], ))
        if curs.fetchone()[0]:
            # already handled
            continue

        text = Message.fromjson(remote.message(m['id']))
        payload = encrypt(str(text), conf['MAILFROM'], conf['MAILTO'])

        message = MIMEText(payload, _charset='utf8 7bit')
        message['Subject'] = 'BNP Paribas Message'
        message['From'] = conf['MAILFROM']
        message['To'] = conf['MAILTO']
        message['Date'] = format_datetime(localtime())

        sendmail(message)

        curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m['id'], ))
        db.commit()


if __name__ == '__main__':
    main()