view bnpparibas.py @ 6:13a8bc43bc09

Simplify package structure
author Daniele Nicolodi <daniele@grinta.net>
date Mon, 11 Jan 2016 18:57:25 +0100
parents src/bnpparibas.py@a47012c9db15
children 90f4e0bd0c2d
line wrap: on
line source

import email
import imp
import os.path
import re
import smtplib
import sqlite3
import subprocess
import textwrap

from collections import namedtuple, defaultdict
from contextlib import contextmanager
from datetime import datetime
from decimal import Decimal
from email.mime.text import MIMEText
from email.utils import format_datetime, localtime, parseaddr
from io import BytesIO
from itertools import product, islice
from urllib.parse import urljoin

import bs4
import click
import requests

from PIL import Image


# Plain text template used to render a bank message before it is
# encrypted and mailed: a few mail-like header lines, a blank line and
# then the message body. Filled in with str.format() keyword arguments
# sender, subject, date, id and body.
MESSAGE = """\
From: {sender:}
Subject: {subject:}
Date: {date:}
Message-Id: {id:}

{body:}
"""

# Templates for the plain text transactions table. HEADER is the
# ready-made line of column titles; TRANSACTION renders one row and
# expects the keyword arguments id, date (anything supporting a
# strftime-style format spec), descr and amount (an already formatted
# string, right aligned in 8 columns).
HEADER = '{:14s}  {:10s}  {:59s}  {:>8s}'.format('Id', 'Date', 'Description', 'Amount')
TRANSACTION = '{id:}  {date:%d/%m/%Y}  {descr:59s}  {amount:>8s}'

# Transaction category labels keyed by category identifier, as defined
# in the BNP Paribas web app. Keys are strings (one of them is not
# numeric) and two identifiers share the 'Non défini' label.
# NOTE(review): nothing in the visible code reads this table; it
# presumably documents the identifiers returned as Transaction.category.
CATEGORIES = {
      '1': 'Alimentation',
      '7': 'Logement',
      '8': 'Loisirs',
      '9': 'Transport',
     '12': 'Opérations bancaires',
     '13': 'Non défini',
     '14': 'Multimédia',
     '20': 'Energies',
     '22': 'Retrait',
     '23': 'Sorties',
    'R58': 'Non défini',
}

# euro currency sign (U+20AC), spelled as an escape to keep the source ascii
EURO = '\u20ac'


# load configuration
def loadconfig(filename):
    """Execute a Python configuration file and return its settings.

    The file is compiled and executed inside a fresh module object named
    ``config`` whose ``__file__`` is set to `filename`. Every name in
    the resulting namespace for which ``str.isupper()`` is true is
    collected into the returned dictionary.

    The file is executed as code, so it must come from a trusted
    location.

    Raises IOError (an alias of OSError) when the file cannot be read;
    the error's ``strerror`` is prefixed with a friendlier description.
    """
    # types.ModuleType is what the deprecated imp.new_module() returned;
    # the imp module is gone from Python 3.12 on. Imported locally so
    # this function does not depend on the file level imports.
    from types import ModuleType

    module = ModuleType('config')
    module.__file__ = filename
    try:
        with open(filename) as fd:
            exec(compile(fd.read(), filename, 'exec'), module.__dict__)
    except IOError as e:
        e.strerror = 'Unable to load configuration file (%s)' % e.strerror
        raise
    # only upper case names are considered settings
    return {key: getattr(module, key) for key in dir(module) if key.isupper()}


# GPG armored (encrypted) text is plain ASCII and as such does not need
# any transfer encoding, but once decrypted the text is UTF-8, so the
# charset header must still announce utf-8. Register a dedicated charset
# alias, 'utf8 7bit', that declares utf-8 as output charset while
# leaving the body unencoded (body_enc=None).
# NOTE(review): only `import email` is visible at file level; this line
# presumably relies on email.charset having been loaded as a side effect
# of importing email.mime.text - confirm.
email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
                          body_enc=None, output_charset='utf-8')


# one entry of the received messages list
Message = namedtuple(
    'Message',
    ['id', 'read', 'icon', 'sender', 'subject', 'date', 'validity'],
)


class Transaction:
    """One account transaction.

    Attributes:
      id: transaction identifier.
      date: transaction date; must support the ``%d/%m/%Y`` format spec.
      descr: textual description.
      debit: amount debited, or None when the transaction is not a debit.
      credit: amount credited, or None when the transaction is not a credit.
      category: category identifier.
    """

    def __init__(self, tid, date, descr, debit, credit, category):
        self.id = tid
        self.date = date
        self.descr = descr
        self.debit = debit
        self.credit = credit
        self.category = category

    def __str__(self):
        """Render the transaction as one row of the TRANSACTION template.

        The amount is shown with an explicit sign: ``+`` for a credit
        and ``-`` for a debit. When both are set the credit is shown.
        The amount column is left empty when neither is set.
        """
        # there does not seem to be an easy way to format Decimal
        # objects with a leading sign in both the positive and
        # negative value cases so do it manually. Compare against None
        # rather than relying on truthiness: a zero amount is falsy but
        # it is still an amount
        if self.credit is not None:
            amount = '+' + str(self.credit)
        elif self.debit is not None:
            amount = '-' + str(self.debit)
        else:
            amount = ''
        # work on a copy: vars() returns the live instance dictionary
        # and formatting must not add attributes to the object
        fields = dict(vars(self))
        fields['amount'] = amount
        return TRANSACTION.format(**fields)
        
        
def imslice(image):
    """Yield the 25 tiles of a 5 x 5 grid cut out of `image`, row by row.

    Grid cells are 27 pixels apart in both directions; each crop box
    starts one pixel inside its cell on the left and top edges.
    """
    pitch = 27
    for row in range(5):
        top = pitch * row + 1
        bottom = pitch * (row + 1)
        for col in range(5):
            left = pitch * col + 1
            right = pitch * (col + 1)
            yield image.crop((left, top, right, bottom))


def imdecode(image):
    """Locate the ten digits on the scrambled keypad `image`.

    Every tile produced by imslice() is compared against the reference
    digit tiles stored in ``keypad.png`` next to this file. Returns a
    dictionary mapping each digit to the 1-based position of its tile.

    Raises ValueError unless all ten digits have been located.
    """
    # reference keypad: ten 26 x 26 digit tiles side by side
    here = os.path.dirname(__file__)
    reference = Image.open(os.path.join(here, 'keypad.png')).convert('L')
    digits = [reference.crop((26 * k, 0, 26 * (k + 1), 26)) for k in range(10)]

    positions = {}
    for index, tile in enumerate(imslice(image)):
        # tiles whose darkest pixel is not black hold background only
        if tile.getextrema()[0] > 0:
            continue
        # first reference tile identical to this one, if any
        found = next((d for d, ref in enumerate(digits) if tile == ref), None)
        if found is not None:
            positions[found] = index + 1

    if sorted(positions) != list(range(10)):
        raise ValueError('keypad decode failed')
    return positions


def amountparse(value):
    """Parse the text of an amount table cell into a Decimal.

    The expected text is an amount surrounded by whitespace and followed
    by the euro sign, written with a comma as decimal separator and
    optional dots as thousands separators, like `` 1.234,56 EUR-sign ``.
    Any number of thousands groups is accepted.

    Returns None for an empty cell (a lone non-breaking space).

    Raises ValueError, carrying the repr of the offending text, when the
    cell does not look like an amount or the currency is not the euro.
    """
    # empty
    if value == '\xa0':
        return None
    m = re.match(r'\s+((?:\d+\.)*\d+,\d+)\s+([^\s]+)\s+$', value)
    if m is None:
        raise ValueError(repr(value))
    # euro
    currency = m.group(2)
    if currency != EURO:
        raise ValueError(repr(currency))
    # drop thousands separators, then switch to a decimal point
    return Decimal(m.group(1).replace('.', '').replace(',', '.'))


class Site:
    """Scraping client for the BNP Paribas web banking site.

    All requests go through a single requests session, so the cookies
    obtained by login() are reused by the other methods.

    NOTE(review): BeautifulSoup is invoked without naming a parser, so
    the result depends on which parsers are installed; the HTML fix-ups
    below were presumably tuned for the parser in use at the time.
    """

    def __init__(self):
        self.url = 'https://www.secure.bnpparibas.net'
        self.req = requests.Session()

    def login(self, user, passwd):
        """Authenticate the session.

        The login page shows a scrambled keypad image. The password,
        a string of digits, is sent as the two digit position of each
        digit on that keypad, as decoded by imdecode().

        Raises ValueError when the keypad image or the post-login
        redirection cannot be found in the returned pages, or when the
        landing page contains an error title. HTTP errors are raised by
        the session's raise_for_status().
        """
        # login page
        url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion')
        r = self.req.get(url, params={'type': 'homeconnex'})
        r.raise_for_status()
        # login form
        soup = bs4.BeautifulSoup(r.text)
        form = soup.find('form', attrs={'name': 'logincanalnet'})
        # extract relevant data
        action = form['action']
        data = {field['name']: field['value'] for field in form('input')}

        # keyboard image url
        tag = soup.find(attrs={'id': 'secret-nbr-keyboard'})
        src = None
        for prop in tag['style'].split(';'):
            match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop)
            if match:
                src = match.group(1)
                break
        if src is None:
            raise ValueError('keypad image not found in login page')
        # download keyboard image
        r = self.req.get(urljoin(self.url, src))
        r.raise_for_status()
        image = Image.open(BytesIO(r.content)).convert('L')
        # decode digits position
        passwdmap = imdecode(image)

        # encode password
        passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd))

        # username and password
        data['ch1'] = user
        data['ch5'] = passwdenc

        # post
        r = self.req.post(urljoin(self.url, action), data=data)
        r.raise_for_status()
        # the response redirects through javascript: follow it by hand
        m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text)
        if m is None:
            raise ValueError('login redirection not found')
        r = self.req.get(m.group(1))
        r.raise_for_status()

        # check for errors
        soup = bs4.BeautifulSoup(r.text)
        err = soup.find(attrs={'class': 'TitreErreur'})
        if err:
            raise ValueError(err.text)

    def recent(self, contract):
        """Yield a Transaction for each row of the statement of `contract`.

        `contract` is sent as the ``contractId`` form field. Rows are
        read from the ``tableCompte`` table of the returned page.
        """
        data = {
            'BeginDate': '',
            'Categs': '',
            'Contracts': '',
            'EndDate': '',
            'OpTypes': '',
            'cboFlowName': 'flow/iastatement',
            'contractId': contract,
            'contractIds': '',
            'entryDashboard': '',
            'execution': 'e6s1',
            'externalIAId': 'IAStatements',
            'g1Style': 'expand',
            'g1Type': '',
            'g2Style': 'collapse',
            'g2Type': '',
            'g3Style': 'collapse',
            'g3Type': '',
            'g4Style': 'collapse',
            'g4Type': '',
            'groupId': '-2',
            'groupSelected': '-2',
            'gt': 'homepage:basic-theme',
            'pageId': 'releveoperations',
            'pastOrPendingOperations': '1',
            'sendEUD': 'true',
            'step': 'STAMENTS', }

        url = urljoin(self.url, '/banque/portail/particulier/FicheA')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        text = r.text

        # the html is so broken beautifulsoup does not understand it
        text = text.replace(
            '<th class="thTitre" style="width:7%">Pointage </td>',
            '<th class="thTitre" style="width:7%">Pointage </th>')
        s = bs4.BeautifulSoup(text)

        # extract transactions
        table = s.find('table', id='tableCompte')
        for row in table.find_all('tr'):
            fields = row.find_all('td')
            if not fields:
                # skip headers row
                continue
            # the identifier is the id of the row checkbox, less the
            # leading underscores
            tid = int(fields[0].input['id'].lstrip('_'))
            date = datetime.strptime(fields[1].text, '%d/%m/%Y')
            descr = fields[2].text.strip()
            debit = amountparse(fields[3].text)
            credit = amountparse(fields[4].text)
            # column 5 holds the category label, which is not used: the
            # category identifier is taken from the third css class of
            # the span in column 6, with its first four characters removed
            categoryid = fields[6].span['class'][2][4:]
            yield Transaction(tid, date, descr, debit, credit, categoryid)

    def messages(self):
        """Yield a Message for each entry of the received messages list.

        The ``read`` field is true when the last argument of the entry's
        ``onclick`` handler is ``'false'``.
        """
        data = {
            'identifiant': 'BmmFicheListerMessagesRecus_20100607022434',
            'type': 'fiche', }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)

        # messages list
        table = s.find('table', id='listeMessages')
        for row in table.find_all('tr', recursive=False):
            # skip headers, separators and footer
            classes = row['class']
            if 'entete' in classes or 'sep' in classes or 'actions_bas' in classes:
                continue
            fields = row.find_all('td')
            icon = fields[1].img['src']
            sender = fields[2].text.strip()
            subject = fields[4].a.text.strip()
            date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            # raw string: the pattern contains regex escapes that are
            # not valid string escapes
            m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick'])
            mid = m.group(1)
            read = m.group(2) == 'false'
            yield Message(mid, read, icon, sender, subject, date, validity)

    def message(self, mid):
        """Retrieve the message identified by `mid`.

        Returns a ``(sender, body)`` tuple where `sender` is the sender
        as shown in the message envelope and `body` is the message
        content converted to plain text paragraphs wrapped at 72 columns.
        """
        data = {
            'etape': 'boiteReception',
            'idMessage': mid,
            'identifiant': 'BmmFicheLireMessage_20100607022346',
            'maxPagination': 2,
            'minPagination': 1,
            'nbElementParPage': 20,
            'nbEltPagination': 5,
            'nbPages': 2,
            'newMsg': 'false',
            'pagination': 1,
            'type': 'fiche',
            'typeAction': '', }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        # fix badly broken html
        text = r.text.replace('<br>', '<br/>').replace('</br>', '')
        s = bs4.BeautifulSoup(text)

        envelope = s.find('div', attrs={'class': 'enveloppe'})
        rows = envelope.find_all('tr')
        fields = rows[1].find_all('td')
        # the messages list present a truncated sender. the following
        # cells, subject and date, are not used
        sender = fields[0].text.strip()

        content = s.find('div', attrs={'class': 'txtMessage'})
        # clean up text
        for t in content.find_all('style'):
            t.extract()
        for t in content.find_all('script'):
            t.extract()
        for t in content.find_all(id='info_pro'):
            t.extract()
        for t in content.find_all('br'):
            t.replace_with('\n\n')
        for t in content.find_all('b'):
            if t.string:
                t.replace_with('*%s*' % t.string.strip())
        for t in content.find_all('li'):
            t.replace_with('- %s\n\n' % t.text.strip())
        # format nicely: collapse runs of spaces, remove whitespace
        # before periods and colons, rewrap each paragraph
        text = re.sub(' +', ' ', content.text)
        text = re.sub(r'\s+([\.:])', r'\1', text)
        pars = []
        for p in re.split('\n\n+', text):
            p = p.strip()
            if p:
                pars.append('\n'.join(textwrap.wrap(p, 72)))
        body = '\n\n'.join(pars)
        return sender, body

    def transactions(self, account='FR7630004001640000242975804'):
        """Download the transactions export for `account`.

        `account` is sent as the ``ch_rop_cpt_0`` form field and
        defaults to the account previously hard-coded here. Returns the
        text of the document linked from the confirmation page.
        """
        data = {'ch_memo': 'NON',
                'ch_rop_cpt_0': account,
                'ch_rop_dat': 'tous',
                'ch_rop_dat_deb': '',
                'ch_rop_dat_fin': '',
                'ch_rop_fmt_dat': 'JJMMAAAA',
                'ch_rop_fmt_fic': 'RTEXC',
                'ch_rop_fmt_sep': 'PT',
                'ch_rop_mon': 'EUR',
                'x': '55',
                'y': '7'}
        r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)
        # the confirmation page links to the actual export
        path = s.find('a')['href']
        r = self.req.get(urljoin(self.url, path))
        r.raise_for_status()
        return r.text


class Mailer:
    """Send mail through an SMTP server described by a configuration mapping.

    Recognized configuration keys and their defaults:
      SMTPSERVER ('localhost'), SMTPPORT (25), SMTPSTARTTLS (False),
      SMTPUSER ('') and SMTPPASSWD ('').
    """

    def __init__(self, config):
        self.server = config.get('SMTPSERVER', 'localhost')
        self.port = config.get('SMTPPORT', 25)
        self.starttls = config.get('SMTPSTARTTLS', False)
        self.username = config.get('SMTPUSER', '')
        self.password = config.get('SMTPPASSWD', '')

    @contextmanager
    def connect(self):
        """Context manager yielding a connected smtplib.SMTP object.

        STARTTLS is negotiated when enabled and login is performed when
        a user name is configured.
        """
        # use the SMTP object as context manager so that the session is
        # terminated and the socket is closed also when the with body,
        # starttls() or login() raise
        with smtplib.SMTP(self.server, self.port) as smtp:
            if self.starttls:
                smtp.starttls()
            if self.username:
                smtp.login(self.username, self.password)
            yield smtp

    def send(self, message, fromaddr=None, toaddr=None):
        """Send `message`, an email message object.

        The envelope sender and recipient default to the message
        ``From`` and ``To`` headers.
        """
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))


class GPG:
    """Minimal wrapper around the ``gpg`` command line program."""

    def __init__(self, homedir):
        # directory handed to gpg through --homedir
        self.homedir = homedir

    def encrypt(self, message, sender, recipient):
        """Sign `message` as `sender` and encrypt it for `recipient`.

        `sender` and `recipient` may be full mail addresses including a
        display name: only the bare address part is handed to gpg.
        `message` is a string and is sent to gpg encoded as UTF-8.

        Returns the ASCII armored output as a string. Raises
        RuntimeError, carrying the exit status and the captured standard
        error, when gpg exits with a non-zero status.
        """
        _, signer = parseaddr(sender)
        _, target = parseaddr(recipient)
        argv = ["gpg", "--homedir", self.homedir]
        argv += ["--batch", "--yes", "--no-options", "--armor"]
        argv += ["--local-user", signer, "--recipient", target]
        argv += ["--sign", "--encrypt"]
        pipe = subprocess.PIPE
        proc = subprocess.Popen(argv, stdin=pipe, stdout=pipe, stderr=pipe)
        output, errors = proc.communicate(input=message.encode('utf-8'))
        if proc.returncode != 0:
            raise RuntimeError(proc.returncode, errors)
        return output.decode('ascii')


# Command line entry point: log into the bank site, then mail, GPG
# encrypted, every unread message and every recent transaction that has
# not been reported before. What has been reported already is tracked in
# an sqlite database. This is a comment and not a docstring on purpose:
# click would otherwise show it as the command help text.
@click.command()
@click.argument('filename')
def main(filename):
    # load configuration
    config = loadconfig(filename)

    bnp = Site()
    bnp.login(config['USERNAME'], config['PASSWORD'])

    db = sqlite3.connect(config['DATABASE'])
    db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
    db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

    mailer = Mailer(config)
    encrypt = GPG(config['GNUPGHOME']).encrypt

    def notify(subject, text, when=None):
        # encrypt the text, wrap it into a mail message and send it.
        # the Date header is the local time form of `when`, or the
        # current time when it is not given
        message = MIMEText(encrypt(text, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit')
        message['Subject'] = subject
        message['From'] = config['MAILFROM']
        message['To'] = config['MAILTO']
        message['Date'] = format_datetime(localtime(when))
        mailer.send(message)

    ## unread messages, oldest first
    unread = [msg for msg in bnp.messages() if not msg.read]
    unread.sort(key=lambda msg: msg.date)
    for msg in unread:
        curs = db.cursor()
        curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (msg.id, ))
        known, = curs.fetchone()
        if known:
            # already handled
            continue

        # retrieve complete sender and message body
        sender, text = bnp.message(msg.id)

        # compose and send message
        text = MESSAGE.format(id=msg.id, sender=sender, date=msg.date, subject=msg.subject, body=text)
        notify('BNP Paribas message', text, msg.date)

        # record the message only once it has been sent
        curs.execute('''INSERT INTO messages (id) VALUES (?)''', (msg.id, ))
        db.commit()

    ## transactions
    transactions = bnp.recent(config['CONTRACT'])
    curs = db.cursor()
    rows = []
    for t in transactions:
        curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, ))
        known, = curs.fetchone()
        if known:
            # already handled
            continue
        rows.append(str(t))
        curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, ))

    if rows:
        # column titles, a ruler and then one line per transaction
        report = '\n'.join([HEADER, '-' * len(HEADER)] + rows)
        notify('BNP Paribas update', report)

    # new transactions are recorded only after the report has been sent
    db.commit()


# run the command line interface when executed as a script; main is a
# click command and reads its argument from the command line
if __name__ == '__main__':
    main()