view bnpparibas.py @ 14:0a3509a12762

Implement download of montly records and add command line options
author Daniele Nicolodi <daniele@grinta.net>
date Tue, 12 Jan 2016 02:22:59 +0100
parents 37ce0dc68cad
children af2e222f2dad
line wrap: on
line source

import cgi
import email
import imp
import itertools
import json
import os.path
import smtplib
import sqlite3
import subprocess
import sys
import textwrap
import time

from contextlib import contextmanager
from datetime import datetime
from email.mime.text import MIMEText
from email.utils import format_datetime, localtime, parseaddr
from io import BytesIO
from pprint import pprint
from urllib.parse import urljoin

import requests
import click

from html2text import HTML2Text
from PIL import Image


URL = 'https://mabanque.bnpparibas/'

# message template
MESSAGE = """\
From: {message.sender:}
Subject: {message.subject:}
Date: {message.date:%a, %d %b %Y %H:%M:%S}

{message.body:}
"""

# transactions table row template
TRANSACTION = """{xact.id:14d}  {xact.date:10s}  {xact.description:54s}  {xact.amount:+8.2f}"""

# transactions table header
HEADER = """{:14s}  {:10s}  {:54s}  {:>8s}""".format('Id', 'Date', 'Description', 'Amount')

# transactions table footer
FOOTER = """{:14s}  {:10s}  {:54s}  {{balance:8.2f}}""".format('', '', 'BALANCE')

# transactions table horizontal separator
SEP = """-""" * len(HEADER)


# GPG encrypted text is ascii and as such does not require encoding
# but its decrypted form is utf-8 and therefore the charset header
# must be set accordingly. define an appropriate charset object
email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
                          body_enc=None, output_charset='utf-8')


def loadconf(filename):
    module = imp.new_module('conf')
    module.__file__ = filename
    with open(filename) as fd:
        exec(compile(fd.read(), filename, 'exec'), module.__dict__)
    conf = {}
    for key in dir(module):
        if key.isupper():
            conf[key] = getattr(module, key)

    conf['DATADIR'] = os.path.dirname(filename)
    for key in 'DATABASE', 'GNUPGHOME':
        # if path is not absolute, it is interpreted as relative
        # to the location of the configuration file
        if not os.path.isabs(conf[key]):
            conf[key] = os.path.join(conf['DATADIR'], conf[key])

    return conf


def wrap(p, indent):
    return textwrap.fill(p, 72, initial_indent=indent, subsequent_indent=indent)


def html2text(html):
    # the html2text module does an ok job, but it can be improved in
    # the quality of the transformation and in the coding style
    conv = HTML2Text()
    conv.ignore_links = True
    conv.ignore_images = True
    conv.ignore_emphasis = True
    return conv.handle(html)


class Mailer:
    def __init__(self, host='localhost', port=25, starttls=True, username=None, password=None):
        self.host = host
        self.port = port
        self.starttls = starttls
        self.username = username
        self.password = password

    @contextmanager
    def connect(self):
        smtp = smtplib.SMTP(self.host, self.port)
        if self.starttls:
            smtp.starttls()
        if self.username:
            smtp.login(self.username, self.password)
        yield smtp
        smtp.quit()

    def send(self, message, fromaddr=None, toaddr=None):
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))


class GPG:
    def __init__(self, homedir):
        self.homedir = homedir

    def encrypt(self, message, sender, recipient):
        sender = parseaddr(sender)[1]
        recipient = parseaddr(recipient)[1]
        cmd = [ "gpg2", "--homedir", self.homedir, "--sign", "--encrypt",
                "--batch", "--no-options", "--yes", "--armor",
                "--local-user", sender, "--recipient", recipient, ]
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        encdata, err = p.communicate(input=message.encode('utf-8'))
        if p.returncode:
            raise RuntimeError(p.returncode, err)
        return encdata.decode('ascii')


class Transaction:
    __slots__ = 'id', 'date', 'category', 'description', 'amount', 'currency'

    def __init__(self, id, date, category, description, amount, currency):
        self.id = id
        self.date = date
        self.category = category
        self.description = description
        self.amount = amount
        self.currency = currency

    @classmethod
    def fromjson(cls, x):
        data = {'id': int(x['idOperation']),
                'date': x['dateOperation'],
                'category': int(x['idCategorie']),
                'description': x['libelleOperation'].strip().replace('VIREMENT', 'VIR'),
                'amount': x['montant']['montant'],
                'currency': x['montant']['currency'], }
        return cls(**data)

    def __str__(self):
        return TRANSACTION.format(xact=self)


class Message:
    __slots__ = 'id', 'date', 'subject', 'sender', 'content', 'quoted'

    def __init__(self, id, date, subject, sender, content, quoted):
        self.id = id
        self.date = date
        self.subject = subject
        self.sender = sender
        self.content = content
        self.quoted = quoted

    @classmethod
    def fromjson(cls, x):
        data = {'id': x['msg']['id'],
                'date': datetime.strptime(x['msg']['id'], '%Y-%m-%d-%H.%M.%S.%f'),
                'subject': x['msg']['objet'],
                'sender': x['msg']['emetteur']['nom'],
                'content': x['msg']['contenu'],
                'quoted': None, }
        quoted = x.get('msgAttache')
        if quoted:
            data['quoted'] = quoted['contenu']
        return cls(**data)

    @staticmethod
    def normalize(txt, indent=''):
        if '<div' in txt:
            txt = txt.replace('<br></br>','<br/><br/>')
            return html2text(txt)

        txt = txt.replace(r'<br/>', '\n')
        parts = []
        for p in txt.split('\n'):
            p = p.strip()
            if p:
                p = wrap(p, indent)
            else:
                p = indent
            parts.append(p)
        return '\n'.join(parts)

    @property
    def body(self):
        body = self.normalize(self.content)
        if self.quoted is not None:
            body = body + '\n\n' + self.normalize(self.quoted, '> ')
        return body

    def __str__(self):
        return MESSAGE.format(message=self)


class Keypad:
    def __init__(self, data):
        # reference keypad
        fname = os.path.join(os.path.dirname(__file__), 'keypad.jpeg')
        reference = Image.open(fname).convert('L')
        symbols = [ 8, 4, 1, 6, 3, 7, 9, 0, 5, 2 ]
        self.keypad = dict(zip(symbols, self.imslice(reference)))

        # decode keypad
        image = Image.open(BytesIO(data)).convert('L')
        self.keymap = {}
        for n, tile in enumerate(self.imslice(image), 1):
            # compare to reference tiles
            for sym, key in self.keypad.items():
                if tile == key:
                    self.keymap[sym] = n
                    break

        # verify
        if sorted(self.keymap.keys()) != list(range(10)):
            raise ValueError('keypad decode failed')

    @staticmethod
    def imslice(image):
        for j, i in itertools.product(range(2), range(5)):
            yield image.crop((83 * i, 80 * j, 83 * (i + 1), 80 * (j + 1)))

    def encode(self, passwd):
        return ''.join('%02d' % self.keymap[d] for d in map(int, passwd))


class BNPParibas:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'X-Requested-With': 'XMLHttpRequest'})

    @staticmethod
    def validate(response):
        response.raise_for_status()
        ctype, params = cgi.parse_header(response.headers.get('content-type'))
        if ctype == 'application/json':
            data = response.json()
            # the status code may sometime be represented as string not int
            code = data.get('codeRetour', -1)
            if int(code) != 0:
                raise requests.HTTPError()
            return data

    def login(self, username, password):
        url = urljoin(URL, 'identification-wspl-pres/identification')
        r = self.session.get(url, params={'timestamp': int(time.time())})
        v = self.validate(r)

        keypadid = v['data']['grille']['idGrille']
        authtemplate = v['data']['authTemplate']

        url = urljoin(URL, 'identification-wspl-pres/grille/' + keypadid)
        r = self.session.get(url)
        r.raise_for_status()

        keypad = Keypad(r.content)

        # fill authentication template
        auth = authtemplate
        auth = auth.replace('{{ idTelematique }}', username)
        auth = auth.replace('{{ password }}', keypad.encode(password))
        auth = auth.replace('{{ clientele }}', '')

        url = urljoin(URL, 'SEEA-pa01/devServer/seeaserver')
        r = self.session.post(url, data={'AUTH': auth})
        v = self.validate(r)
        return v['data']

    def info(self):
        url = urljoin(URL, 'serviceinfosclient-wspl/rpc/InfosClient')
        r = self.session.get(url, params={'modeAppel': 0})
        v = self.validate(r)
        return v['data']

    def recent(self):
        url = urljoin(URL, 'udc-wspl/rest/getlstcpt')
        r = self.session.get(url)
        v = self.validate(r)
        account = v['data']['infoUdc']['familleCompte'][0]['compte'][0]

        url = urljoin(URL, 'rop-wspl/rest/releveOp')
        data = json.dumps({'ibanCrypte': account['key'],
                           'pastOrPending': 1, 'triAV': 0,
                           'startDate': None, # ddmmyyyy
                           'endDate': None})
        headers = {'Content-Type': 'application/json'}
        r = self.session.post(url, headers=headers, data=data)
        v = self.validate(r)
        return v['data']

    def messages(self):
        url = urljoin(URL, 'bmm-wspl/recupMsgRecu')
        r = self.session.get(url, params={'nbMessagesParPage': 200, 'index': 0})
        v = self.validate(r)
        return v['data']

    def message(self, mid):
        # required to set some cookies required by the next call
        url = urljoin(URL, 'fr/connexion/mes-outils/messagerie')
        r = self.session.get(url)
        self.validate(r)

        url = urljoin(URL, 'bmm-wspl/recupMsg')
        r = self.session.get(url, params={'identifiant': mid})
        v = self.validate(r)
        return v['data']

    def records(self):
        # required to set some cookies required by the next call
        url = urljoin(URL, 'fr/connexion/virements-services/releves-en-ligne')
        r = self.session.get(url)
        self.validate(r)

        url = urljoin(URL, 'demat-wspl/rest/initialisationDemat')
        r = self.session.get(url)
        v = self.validate(r)

        for branch in v['data']['initialisationDemat']['arbres']:
            for leave in branch.get('arbre', ( )):
                if leave['typeDoc'] == 'RELEV':
                    data = leave
                    break

        self.iban = data['ibans'][0]['ibanCrypte']
        query = {'famDoc': data['codeFamille'],
                 'idTypeDocument': data['idBranche'],
                 'listeIbanCrypte': [ self.iban, ],
                 'typeCpt': data['typeCompte'],
                 'typeDoc': data['typeDoc'],
                 'typeFamille': 'R001'} # ???

        url = urljoin(URL, 'demat-wspl/rest/consultationDemat')
        data = json.dumps(query)
        headers = {'Content-Type': 'application/json'}
        r = self.session.post(url, headers=headers, data=data)
        v = self.validate(r)

        years = v['data']['consultationDemat']['listeCompte'][0]['listeAnnee']
        query['codeProduit'] = ''
        documents = []

        url = urljoin(URL, 'demat-wspl/rest/rechercheDemat')
        for year in years:
            query['anneeSelectionnee'] = year
            data = json.dumps(query)
            r = self.session.post(url, headers=headers, data=data)
            v = self.validate(r)
            documents += v['data']['consultationDemat']['listeCompte'][0]['listeDocument']

        return documents

    def document(self, x):
        url = urljoin(URL, 'demat-wspl/rest/consultationDocumentDemat')
        params = {'consulted': x['consulted'],
                  'familleDoc': x['famDoc'],
                  'ibanCrypte': self.iban,
                  'idDocument': x['idDoc'],
                  'idLocalisation': 'undefined',
                  'typeCpt': x['typeCompte'],
                  'typeDoc': x['typeDoc'],
                  'viDocDocument': x['viDocDocument'],
                  'typeFamille': 'R001'}
        r = self.session.get(url, params=params)
        self.validate(r)
        return r.content


def transactions(conf):
    db = sqlite3.connect(conf['DATABASE'])
    db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

    sendmail = Mailer(host=conf['SMTPHOST'],
                      port=conf['SMTPPORT'],
                      starttls=conf['SMTPSTARTTLS'],
                      username=conf['SMTPUSER'],
                      password=conf['SMTPPASSWD']).send

    encrypt = GPG(conf['GNUPGHOME']).encrypt

    remote = BNPParibas()
    remote.login(conf['USERNAME'], conf['PASSWORD'])

    recent = remote.recent()
    data = recent['listerOperations']['compte']
    transactions = [ Transaction.fromjson(x) for x in data['operationPassee'] ]
    balance = data['soldeDispo']

    curs = db.cursor()
    unseen = []
    for t in transactions:
        curs.execute('''SELECT COUNT(*) FROM transactions WHERE id = ?''', (t.id, ))
        if not curs.fetchone()[0]:
            # not seen before
            unseen.append(t)

    if unseen:
        lines = []
        lines.append(HEADER)
        lines.append(SEP)
        for t in unseen:
            lines.append(str(t))
        lines.append(SEP)
        lines.append(FOOTER.format(balance=balance))

        text = '\n'.join(lines)
        payload = encrypt(text, conf['MAILFROM'], conf['MAILTO'])

        message = MIMEText(payload, _charset='utf8 7bit')
        message['Subject'] = 'BNP Paribas Account update'
        message['From'] = conf['MAILFROM']
        message['To'] = conf['MAILTO']
        message['Date'] = format_datetime(localtime())

        sendmail(message)

    curs.executemany('''INSERT INTO transactions (id) VALUES (?)''', ((x.id, ) for x in unseen))
    db.commit()


def messages(conf):
    db = sqlite3.connect(conf['DATABASE'])
    db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')

    sendmail = Mailer(host=conf['SMTPHOST'],
                      port=conf['SMTPPORT'],
                      starttls=conf['SMTPSTARTTLS'],
                      username=conf['SMTPUSER'],
                      password=conf['SMTPPASSWD']).send

    encrypt = GPG(conf['GNUPGHOME']).encrypt

    remote = BNPParibas()
    remote.login(conf['USERNAME'], conf['PASSWORD'])

    data = remote.info()
    info = data['abonnement']
    nnew = info['nombreMessageBMMNonLus'] + info['nombreMessageBilatNonLus']

    data = remote.messages()
    for m in data['messages']:

        curs = db.cursor()
        curs.execute('''SELECT COUNT(*) FROM messages WHERE id = ?''', (m['id'], ))
        if curs.fetchone()[0]:
            # already handled
            continue

        text = Message.fromjson(remote.message(m['id']))
        payload = encrypt(str(text), conf['MAILFROM'], conf['MAILTO'])

        message = MIMEText(payload, _charset='utf8 7bit')
        message['Subject'] = 'BNP Paribas Message'
        message['From'] = conf['MAILFROM']
        message['To'] = conf['MAILTO']
        message['Date'] = format_datetime(localtime())

        sendmail(message)

        curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m['id'], ))
        db.commit()


def records(conf):
    db = sqlite3.connect(conf['DATABASE'])
    db.execute('''CREATE TABLE IF NOT EXISTS records (id TEXT PRIMARY KEY)''')

    remote = BNPParibas()
    data = remote.login(conf['USERNAME'], conf['PASSWORD'])

    records = remote.records()
    for r in records:

        curs = db.cursor()
        curs.execute('''SELECT COUNT(*) FROM records WHERE id = ?''', (r['idDoc'], ))
        if curs.fetchone()[0]:
            # already handled
            continue

        data = remote.document(r)
        date = datetime.strptime(r['dateDoc'], '%d/%m/%Y').strftime('bnpparibas-%Y%m%d.pdf')
        filename = os.path.join(conf['DATADIR'], 'data', date)
        if conf.get('VERBOSE'):
            print(r['idDoc'], filename)
        with open(filename, 'wb') as fd:
            fd.write(data)

        curs.execute('''INSERT INTO records (id) VALUES (?)''', (r['idDoc'], ))
        db.commit()


@click.command()
@click.argument('conffile')
@click.option('--transactions', 'what', multiple=True, flag_value='transactions', help='Email new transactions.')
@click.option('--messages', 'what', multiple=True, flag_value='messages', help='Email new messages.')
@click.option('--records', 'what', multiple=True, flag_value='records', help='Download new montly records.')
@click.option('--verbose', is_flag=True, help='Verbose output.')
def main(conffile, what, verbose):

    actions = {'transactions': transactions,
               'messages': messages,
               'records': records}

    conf = loadconf(conffile)
    if verbose:
        conf['VERBOSE'] = True

    for x in what:
        action = actions.get(x)
        action(conf)


if __name__ == '__main__':
    main()