# Source: bnpparibas.py @ 17:80648a7a119c (default, tip)
# Commit message: Update
# Author: Daniele Nicolodi <daniele@grinta.net>
# Date: Sun, 21 Jan 2018 01:29:18 +0000
# Parents: af2e222f2dad

import cgi
import email
import imp
import itertools
import json
import os.path
import smtplib
import sqlite3
import subprocess
import sys
import textwrap
import time

from datetime import datetime
from contextlib import contextmanager
from email.mime.text import MIMEText
from email.utils import format_datetime, localtime, parseaddr
from io import BytesIO
from pprint import pprint
from urllib.parse import urljoin

import requests
import click

from html2text import HTML2Text
from PIL import Image
from dateutil.relativedelta import relativedelta

# base URL of the online banking site: every endpoint path requested
# by the BNPParibas class is joined to it
URL = 'https://mabanque.bnpparibas/'

# message template: plain text rendering of a bank message with
# email-like headers. it is filled in with a Message instance passed
# as the ``message`` keyword argument
MESSAGE = """\
From: {message.sender:}
Subject: {message.subject:}
Date: {message.date:%a, %d %b %Y %H:%M:%S}

{message.body:}
"""

# transactions table row template. it is filled in with a Transaction
# instance passed as the ``xact`` keyword argument
TRANSACTION = """{xact.id:14d}  {xact.date:10s}  {xact.description:54s}  {xact.amount:+8.2f}"""

# transactions table header. the column widths match the ones of the
# row template above
HEADER = """{:14s}  {:10s}  {:54s}  {:>8s}""".format('Id', 'Date', 'Description', 'Amount')

# transactions table footer. the doubled braces survive this first
# format() call as a ``{balance:8.2f}`` placeholder that is filled in
# later with the account balance
FOOTER = """{:14s}  {:10s}  {:54s}  {{balance:8.2f}}""".format('', '', 'BALANCE')

# transactions table horizontal separator, as wide as the header
SEP = """-""" * len(HEADER)


# GPG encrypted text is ascii and as such does not require encoding
# but its decrypted form is utf-8 and therefore the charset header
# must be set accordingly. define an appropriate charset object:
# the 'utf8 7bit' charset leaves the message body unencoded
# (body_enc=None) while declaring utf-8 as its output charset
email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
                          body_enc=None, output_charset='utf-8')


def loadconf(filename):
    """Load the settings defined in a Python configuration file.

    The file is executed as Python code and every upper case name it
    defines becomes an entry of the returned dictionary. The directory
    containing the file is added as ``DATADIR`` and the ``DATABASE``
    and ``GNUPGHOME`` entries, when relative, are resolved against it.

    Raises ``KeyError`` if ``DATABASE`` or ``GNUPGHOME`` is not defined
    by the configuration file.
    """
    # local import: types.ModuleType is the replacement for
    # imp.new_module, the imp module having been deprecated and then
    # removed from the standard library in Python 3.12
    import types

    # NOTE: the configuration file is executed without any sandboxing
    # and must therefore come from a trusted source
    module = types.ModuleType('conf')
    module.__file__ = filename
    with open(filename) as fd:
        exec(compile(fd.read(), filename, 'exec'), module.__dict__)

    # only upper case names are settings, which leaves out dunder
    # attributes and whatever helper the configuration file defines
    conf = {key: getattr(module, key) for key in dir(module) if key.isupper()}

    conf['DATADIR'] = os.path.dirname(filename)
    for key in 'DATABASE', 'GNUPGHOME':
        # if path is not absolute, it is interpreted as relative
        # to the location of the configuration file
        if not os.path.isabs(conf[key]):
            conf[key] = os.path.join(conf['DATADIR'], conf[key])

    return conf


def wrap(p, indent):
    """Re-flow paragraph ``p`` to 72 columns, prefixing each line with ``indent``."""
    wrapper = textwrap.TextWrapper(width=72,
                                   initial_indent=indent,
                                   subsequent_indent=indent)
    return wrapper.fill(p)


def html2text(html):
    """Convert an HTML fragment to plain text.

    Links, images and emphasis markup are left out of the output.
    """
    # the html2text module does an ok job, but it can be improved in
    # the quality of the transformation and in the coding style
    converter = HTML2Text()
    for option in ('ignore_links', 'ignore_images', 'ignore_emphasis'):
        setattr(converter, option, True)
    text = converter.handle(html)
    return text


class Mailer:
    """Minimal SMTP client used to deliver the notification emails.

    ``host`` and ``port`` identify the SMTP server. When ``starttls``
    is true the connection is upgraded with STARTTLS, and when
    ``username`` is set the client authenticates with ``username``
    and ``password`` before sending.
    """

    def __init__(self, host='localhost', port=25, starttls=True, username=None, password=None):
        self.host = host
        self.port = port
        self.starttls = starttls
        self.username = username
        self.password = password

    @contextmanager
    def connect(self):
        """Context manager yielding a ready to use SMTP connection.

        The session is terminated on exit, also when STARTTLS, the
        authentication, or the body of the ``with`` block raises.
        """
        smtp = smtplib.SMTP(self.host, self.port)
        try:
            if self.starttls:
                smtp.starttls()
            if self.username:
                smtp.login(self.username, self.password)
            yield smtp
        finally:
            try:
                smtp.quit()
            except smtplib.SMTPServerDisconnected:
                # the server is already gone: there is nobody to say
                # goodbye to, just release the local resources
                smtp.close()

    def send(self, message, fromaddr=None, toaddr=None):
        """Send ``message`` over a new SMTP connection.

        The envelope sender and recipient default to the ``From`` and
        ``To`` headers of the message.
        """
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))


class GPG:
    """Thin wrapper around the ``gpg2`` command line tool."""

    def __init__(self, homedir):
        # directory passed to gpg2 through its --homedir option
        self.homedir = homedir

    def encrypt(self, message, sender, recipient):
        """Sign ``message`` as ``sender`` and encrypt it for ``recipient``.

        Only the address part of ``sender`` and ``recipient`` is handed
        to gpg2. Returns the ASCII armored output as a string and raises
        ``RuntimeError`` carrying the exit status and the standard error
        output when gpg2 fails.
        """
        signer = parseaddr(sender)[1]
        target = parseaddr(recipient)[1]

        argv = ["gpg2", "--homedir", self.homedir]
        argv += ["--sign", "--encrypt"]
        argv += ["--batch", "--no-options", "--yes", "--armor"]
        argv += ["--local-user", signer, "--recipient", target]

        pipe = subprocess.PIPE
        proc = subprocess.Popen(argv, stdin=pipe, stdout=pipe, stderr=pipe)
        out, errors = proc.communicate(input=message.encode('utf-8'))
        if proc.returncode:
            raise RuntimeError(proc.returncode, errors)
        return out.decode('ascii')


class Transaction:
    """A single account operation."""

    __slots__ = ('id', 'date', 'category', 'description', 'amount', 'currency')

    def __init__(self, id, date, category, description, amount, currency):
        self.id = id
        self.date = date
        self.category = category
        self.description = description
        self.amount = amount
        self.currency = currency

    @classmethod
    def fromjson(cls, x):
        """Build a transaction from one operation of the JSON statement."""
        ident = int(x['idOperation'])
        when = x['dateOperation']
        category = int(x['idCategorie'])
        # shorten the description to make it fit the table column
        label = x['libelleOperation'].strip().replace('VIREMENT', 'VIR')
        amount = x['montant']['montant']
        currency = x['montant']['currency']
        return cls(id=ident, date=when, category=category,
                   description=label, amount=amount, currency=currency)

    def __str__(self):
        # one row of the transactions table
        return TRANSACTION.format(xact=self)


class Message:
    """A message received through the bank messaging service."""

    __slots__ = 'id', 'date', 'subject', 'sender', 'content', 'quoted'

    def __init__(self, id, date, subject, sender, content, quoted):
        self.id = id
        self.date = date
        self.subject = subject
        self.sender = sender
        self.content = content
        self.quoted = quoted

    @classmethod
    def fromjson(cls, x):
        """Build a message from the JSON description of a single message.

        The message date is obtained parsing the message identifier,
        which is formatted as a timestamp. ``quoted`` is the content of
        the attached message, or ``None`` when there is none.
        """
        data = {'id': x['msg']['id'],
                # datetime is the class, not the module: it is imported
                # with ``from datetime import datetime``
                'date': datetime.strptime(x['msg']['id'], '%Y-%m-%d-%H.%M.%S.%f'),
                'subject': x['msg']['objet'],
                'sender': x['msg']['emetteur']['nom'],
                'content': x['msg']['contenu'],
                'quoted': None, }
        quoted = x.get('msgAttache')
        if quoted:
            data['quoted'] = quoted['contenu']
        return cls(**data)

    @staticmethod
    def normalize(txt, indent=''):
        """Return ``txt`` as plain text.

        Content containing ``<div`` elements is handed to the HTML to
        text converter. Anything else is split into paragraphs at line
        breaks and each paragraph is wrapped and prefixed with
        ``indent``.
        """
        if '<div' in txt:
            # NOTE: indent is not applied to converted HTML content
            txt = txt.replace('<br></br>','<br/><br/>')
            return html2text(txt)

        txt = txt.replace(r'<br/>', '\n')
        parts = []
        for p in txt.split('\n'):
            p = p.strip()
            if p:
                p = wrap(p, indent)
            else:
                # keep empty lines, which separate paragraphs, marking
                # them as part of the quoted text too
                p = indent
            parts.append(p)
        return '\n'.join(parts)

    @property
    def body(self):
        """Message text followed, if present, by the quoted message."""
        body = self.normalize(self.content)
        if self.quoted is not None:
            body = body + '\n\n' + self.normalize(self.quoted, '> ')
        return body

    def __str__(self):
        return MESSAGE.format(message=self)


class Keypad:
    """Decoder for the scrambled virtual keypad used to enter the password.

    The keypad is an image made of two rows of five tiles, each one
    showing a digit. The tiles are matched against the ones of a
    reference image stored next to this file to find out at which
    position each digit is located.
    """

    def __init__(self, data):
        # reference keypad: the digits are listed in the order in
        # which the tiles of the reference image are sliced
        here = os.path.dirname(__file__)
        reference = Image.open(os.path.join(here, 'keypad.jpeg')).convert('L')
        digits = [ 8, 4, 1, 6, 3, 7, 9, 0, 5, 2 ]
        self.keypad = dict(zip(digits, self.imslice(reference)))

        # decode keypad: map each digit to the position, counted from
        # one, of the tile identical to its reference tile
        scrambled = Image.open(BytesIO(data)).convert('L')
        self.keymap = {}
        for position, tile in enumerate(self.imslice(scrambled), 1):
            found = next((digit for digit, ref in self.keypad.items() if tile == ref), None)
            if found is not None:
                self.keymap[found] = position

        # verify that all the ten digits have been located
        if sorted(self.keymap.keys()) != list(range(10)):
            raise ValueError('keypad decode failed')

    @staticmethod
    def imslice(image):
        """Yield the ten 83x80 pixels tiles of ``image``, row by row."""
        width, height = 83, 80
        for row in range(2):
            for col in range(5):
                left, top = width * col, height * row
                yield image.crop((left, top, left + width, top + height))

    def encode(self, passwd):
        """Translate the digits of ``passwd`` into two digits tile positions."""
        return ''.join('{:02d}'.format(self.keymap[int(ch)]) for ch in passwd)


class BNPParibas:
    """Client for the web services behind the online banking site.

    All the requests go through a single ``requests`` session, which
    carries the cookies set during the authentication.
    """

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'X-Requested-With': 'XMLHttpRequest'})

    @staticmethod
    def validate(response):
        """Check ``response`` and return its decoded JSON payload.

        Raises ``requests.HTTPError`` for HTTP error status codes and
        for JSON payloads carrying a ``codeRetour`` different from
        zero. Returns ``None`` for responses that are not JSON.
        """
        response.raise_for_status()
        # keep only the media type, dropping parameters like the
        # charset. the header may be missing altogether and media
        # types are case insensitive
        ctype = response.headers.get('content-type') or ''
        ctype = ctype.split(';', 1)[0].strip().lower()
        if ctype != 'application/json':
            return None
        data = response.json()
        # the status code may sometime be represented as string not int
        code = data.get('codeRetour', -1)
        if int(code) != 0:
            raise requests.HTTPError('unexpected codeRetour: {!r}'.format(code), response=response)
        return data

    def login(self, username, password):
        """Authenticate the session and return the server reply data.

        ``password`` is the numeric password: it is sent encoded as
        the positions of its digits on the scrambled keypad.
        """
        url = urljoin(URL, 'identification-wspl-pres/identification')
        r = self.session.get(url, params={'timestamp': int(time.time())})
        v = self.validate(r)

        keypadid = v['data']['grille']['idGrille']
        authtemplate = v['data']['authTemplate']

        # the keypad is an image: check only the HTTP status
        url = urljoin(URL, 'identification-wspl-pres/grille/' + keypadid)
        r = self.session.get(url)
        r.raise_for_status()

        keypad = Keypad(r.content)

        # fill authentication template
        auth = authtemplate
        auth = auth.replace('{{ idTelematique }}', username)
        auth = auth.replace('{{ password }}', keypad.encode(password))
        auth = auth.replace('{{ clientele }}', '')

        url = urljoin(URL, 'SEEA-pa01/devServer/seeaserver')
        r = self.session.post(url, data={'AUTH': auth})
        v = self.validate(r)
        return v['data']

    def info(self):
        """Return the data of the customer information service."""
        url = urljoin(URL, 'serviceinfosclient-wspl/rpc/InfosClient')
        r = self.session.get(url, params={'modeAppel': 0})
        v = self.validate(r)
        return v['data']

    def recent(self, startdate, enddate):
        """Return the operations between ``startdate`` and ``enddate``.

        Only the first account of the first account family in the
        accounts list is considered.
        """
        url = urljoin(URL, 'udc-wspl/rest/getlstcpt')
        r = self.session.get(url)
        v = self.validate(r)
        account = v['data']['infoUdc']['familleCompte'][0]['compte'][0]

        url = urljoin(URL, 'rop-wspl/rest/releveOp')
        data = json.dumps({'ibanCrypte': account['key'],
                           'pastOrPending': 1, 'triAV': 0,
                           'startDate': '{:%d%m%Y}'.format(startdate), # ddmmyyyy
                           'endDate': '{:%d%m%Y}'.format(enddate)})
        headers = {'Content-Type': 'application/json'}
        r = self.session.post(url, headers=headers, data=data)
        v = self.validate(r)
        return v['data']

    def messages(self):
        """Return the list of received messages, up to 200 entries."""
        url = urljoin(URL, 'bmm-wspl/recupMsgRecu')
        r = self.session.get(url, params={'nbMessagesParPage': 200, 'index': 0})
        v = self.validate(r)
        return v['data']

    def message(self, mid):
        """Return the message with identifier ``mid``."""
        # required to set some cookies required by the next call
        url = urljoin(URL, 'fr/connexion/mes-outils/messagerie')
        r = self.session.get(url)
        self.validate(r)

        url = urljoin(URL, 'bmm-wspl/recupMsg')
        r = self.session.get(url, params={'identifiant': mid})
        v = self.validate(r)
        return v['data']

    def records(self):
        """Return the list of documents available for the checking accounts."""
        # NOTE(review): presumably needed, as in message(), to set
        # cookies required by the next call -- confirm
        url = urljoin(URL, 'fr/connexion/mes-outils/dematerialisation')
        r = self.session.get(url)
        self.validate(r)

        url = urljoin(URL, 'demat-wspl/rest/listerDocuments')
        r = self.session.get(url)
        self.validate(r)

        data = r.json()['data']
        documents = data['listerDocumentDemat']['mapReleves']['Comptes chèques']['listeDocument']
        return documents

    def document(self, x):
        """Return the raw content of document ``x``.

        ``x`` is one of the entries returned by ``records()``.
        """
        url = urljoin(URL, 'demat-wspl/rest/consultationDocumentDemat')
        params = {'consulted': x['consulted'],
                  'familleDoc': x['famDoc'],
                  'ibanCrypte': x['ibanCrypte'],
                  'idDocument': x['idDoc'],
                  'dateDocument': x['dateDoc'],
                  'typeCpt': x['typeCompte'],
                  'typeDoc': x['typeDoc'],
                  'viDocDocument': x['viDocDocument'],
                  'idLocalisation': x['idLocalisation'],
                  'ikpiPersonne': '',
                  'typeFamille': 'R001'}
        r = self.session.get(url, params=params)
        self.validate(r)
        return r.content


def transactions(conf):
    """Email the account operations not reported by a previous run.

    The operations are downloaded in two months windows, starting from
    the date of the most recent operation seen by the previous run.
    The ones whose identifier is not yet in the database are formatted
    as a table, encrypted, and mailed to ``conf['MAILTO']``. Their
    identifiers are recorded only after the email has been sent.
    """
    db = sqlite3.connect(conf['DATABASE'])
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')
        db.execute('''CREATE TABLE IF NOT EXISTS last (date TEXT)''')

        sendmail = Mailer(host=conf['SMTPHOST'],
                          port=conf['SMTPPORT'],
                          starttls=conf['SMTPSTARTTLS'],
                          username=conf['SMTPUSER'],
                          password=conf['SMTPPASSWD']).send

        encrypt = GPG(conf['GNUPGHOME']).encrypt

        remote = BNPParibas()
        remote.login(conf['USERNAME'], conf['PASSWORD'])

        # last date. the table has no uniqueness constraint, thus it
        # may contain one row per past run: the newest one is the
        # one that counts
        curs = db.cursor()
        curs.execute('''SELECT date FROM last ORDER BY rowid DESC LIMIT 1''')
        row = curs.fetchone()
        startdate = datetime.strptime(row[0], '%d-%m-%Y') if row else datetime(2016, 10, 1)

        operations = []
        balance = None
        while startdate <= datetime.now():
            enddate = startdate + relativedelta(months=2)
            recent = remote.recent(startdate, enddate)
            data = recent['listerOperations']['compte']
            operations += [ Transaction.fromjson(x) for x in data['operationPassee'] ]
            # the balance reported with the last window is the current one
            balance = data['soldeDispo']
            startdate = enddate

        operations.sort(key=lambda x: datetime.strptime(x.date, '%d-%m-%Y'))

        unseen = []
        handled = set()
        for t in operations:
            # consecutive windows share their boundary date, thus the
            # same operation may be returned twice: report it only once
            if t.id in handled:
                continue
            handled.add(t.id)
            curs.execute('''SELECT COUNT(*) FROM transactions WHERE id = ?''', (t.id, ))
            if not curs.fetchone()[0]:
                # not seen before
                unseen.append(t)

        if unseen:
            lines = [HEADER, SEP]
            lines.extend(str(t) for t in unseen)
            lines.append(SEP)
            lines.append(FOOTER.format(balance=balance))

            text = '\n'.join(lines)
            payload = encrypt(text, conf['MAILFROM'], conf['MAILTO'])

            message = MIMEText(payload, _charset='utf8 7bit')
            message['Subject'] = 'BNP Paribas Account update'
            message['From'] = conf['MAILFROM']
            message['To'] = conf['MAILTO']
            message['Date'] = format_datetime(localtime())

            sendmail(message)

        curs.executemany('''INSERT INTO transactions (id) VALUES (?)''', ((x.id, ) for x in unseen))
        if operations:
            # keep a single row: without a uniqueness constraint on
            # the table INSERT OR REPLACE would add a new row at
            # every run instead of updating the existing one
            curs.execute('''DELETE FROM last''')
            curs.execute('''INSERT INTO 'last' (date) VALUES (?)''', (operations[-1].date, ))
        db.commit()
    finally:
        db.close()


def messages(conf):
    """Email the bank messages not forwarded by a previous run.

    Each message whose identifier is not yet in the database is
    downloaded, encrypted, and mailed to ``conf['MAILTO']``. The
    identifier is recorded right after the email has been sent, so
    that a later failure does not cause it to be sent again.
    """
    db = sqlite3.connect(conf['DATABASE'])
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')

        sendmail = Mailer(host=conf['SMTPHOST'],
                          port=conf['SMTPPORT'],
                          starttls=conf['SMTPSTARTTLS'],
                          username=conf['SMTPUSER'],
                          password=conf['SMTPPASSWD']).send

        encrypt = GPG(conf['GNUPGHOME']).encrypt

        remote = BNPParibas()
        remote.login(conf['USERNAME'], conf['PASSWORD'])

        # NOTE(review): the reply, which includes the unread messages
        # counters, is not used. the request is kept in case the
        # following ones depend on it -- confirm and remove
        remote.info()

        data = remote.messages()
        curs = db.cursor()
        for m in data['messages']:

            curs.execute('''SELECT COUNT(*) FROM messages WHERE id = ?''', (m['id'], ))
            if curs.fetchone()[0]:
                # already handled
                continue

            text = Message.fromjson(remote.message(m['id']))
            payload = encrypt(str(text), conf['MAILFROM'], conf['MAILTO'])

            message = MIMEText(payload, _charset='utf8 7bit')
            message['Subject'] = 'BNP Paribas Message'
            message['From'] = conf['MAILFROM']
            message['To'] = conf['MAILTO']
            message['Date'] = format_datetime(localtime())

            sendmail(message)

            curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m['id'], ))
            db.commit()
    finally:
        db.close()


def records(conf):
    """Download the account documents not saved by a previous run.

    Each document whose identifier is not yet in the database is
    saved as ``bnpparibas-YYYYMMDD.pdf``, named after the document
    date, in the ``data`` directory next to the configuration file.
    The identifier is recorded right after the file has been written.
    """
    db = sqlite3.connect(conf['DATABASE'])
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS records (id TEXT PRIMARY KEY)''')

        remote = BNPParibas()
        remote.login(conf['USERNAME'], conf['PASSWORD'])

        # the destination directory may not exist on the first run
        destdir = os.path.join(conf['DATADIR'], 'data')
        os.makedirs(destdir, exist_ok=True)

        curs = db.cursor()
        for r in remote.records():

            curs.execute('''SELECT COUNT(*) FROM records WHERE id = ?''', (r['idDoc'], ))
            if curs.fetchone()[0]:
                # already handled
                continue

            data = remote.document(r)
            date = datetime.strptime(r['dateDoc'], '%d/%m/%Y').strftime('bnpparibas-%Y%m%d.pdf')
            filename = os.path.join(destdir, date)
            if conf.get('VERBOSE'):
                print(r['idDoc'], filename)
            with open(filename, 'wb') as fd:
                fd.write(data)

            curs.execute('''INSERT INTO records (id) VALUES (?)''', (r['idDoc'], ))
            db.commit()
    finally:
        db.close()


@click.command()
@click.argument('conffile')
@click.option('--transactions', 'what', multiple=True, flag_value='transactions', help='Email new transactions.')
@click.option('--messages', 'what', multiple=True, flag_value='messages', help='Email new messages.')
@click.option('--records', 'what', multiple=True, flag_value='records', help='Download new montly records.')
@click.option('--verbose', is_flag=True, help='Verbose output.')
def main(conffile, what, verbose):
    # command line entry point: load the configuration and run the
    # requested actions, in the order in which they were requested.
    # this is deliberately a comment and not a docstring, which click
    # would show as the command help text

    conf = loadconf(conffile)
    if verbose:
        conf['VERBOSE'] = True

    # action name, as set by the command line flags, to its handler
    dispatch = {
        'transactions': transactions,
        'messages': messages,
        'records': records,
    }

    for name in what:
        handler = dispatch.get(name)
        handler(conf)


# run the command line interface when executed as a script
if __name__ == '__main__':
    main()