view src/bnpparibas.py @ 2:ad577744dd8e

Drop dependency on numpy
author Daniele Nicolodi <daniele.nicolodi@obspm.fr>
date Tue, 24 Feb 2015 17:23:39 +0100
parents 02ec4a9ab0f0
children 1311f6533978
line wrap: on
line source

import email
import os.path
import re
import smtplib
import sqlite3
import subprocess
import textwrap

from collections import namedtuple, defaultdict
from contextlib import contextmanager
from datetime import datetime
from decimal import Decimal
from email.mime.text import MIMEText
from email.utils import format_datetime, localtime, parseaddr
from io import BytesIO
from itertools import product, islice
from urllib.parse import urljoin

import bs4
import requests

from PIL import Image

# sqlite database file opened by main()
DB = 'bnpparibas.sqlite'

# plain text layout of a bank message, filled in with str.format()
MESSAGE = (
    'From: {sender:}\n'
    'Subject: {subject:}\n'
    'Date: {date:}\n'
    'Message-Id: {id:}\n'
    '\n'
    '{body:}\n'
)

# transactions table: column titles and the template for one row,
# columns are separated by two spaces
HEADER = '  '.join([
    'Id'.ljust(14),
    'Date'.ljust(10),
    'Description'.ljust(59),
    'Amount'.rjust(8),
])
TRANSACTION = '  '.join([
    '{id:}',
    '{date:%d/%m/%Y}',
    '{descr:59s}',
    '{amount:>8s}',
])

# category identifiers and names as defined in the bnpparibas web app
CATEGORIES = {
    '1': 'Alimentation',
    '7': 'Logement',
    '8': 'Loisirs',
    '9': 'Transport',
    '12': 'Opérations bancaires',
    '13': 'Non défini',
    '14': 'Multimédia',
    '20': 'Energies',
    '22': 'Retrait',
    '23': 'Sorties',
    'R58': 'Non défini',
}

# euro symbol
EURO = '\u20ac'


# load configuration
from config import *


# The armored GPG output is plain ascii, so the mail body needs no
# transfer encoding; the text it decrypts to is utf-8 though, and the
# charset header has to say so. Register a charset with exactly these
# properties: no body encoding and utf-8 as output charset.
email.charset.add_charset(
    'utf8 7bit',
    email.charset.SHORTEST,
    None,
    'utf-8',
)


# one entry of the received messages list, as yielded by Site.messages()
Message = namedtuple(
    'Message',
    ['id', 'read', 'icon', 'sender', 'subject', 'date', 'validity'])


class Transaction:
    """One account operation.

    The constructor arguments are stored unchanged as the attributes
    `id`, `date`, `descr`, `debit`, `credit` and `category`.
    """

    def __init__(self, tid, date, descr, debit, credit, category):
        self.id = tid
        self.date = date
        self.descr = descr
        self.debit = debit
        self.credit = credit
        self.category = category

    def _amount(self):
        """Return the amount as a string with an explicit leading sign.

        A credit is rendered with a leading ``+`` and takes precedence
        over a debit, which is rendered with a leading ``-``. The empty
        string is returned when neither is set.
        """
        # the sign is derived from which attribute is set rather than
        # from the value itself, so it is prepended manually
        if self.credit:
            return '+' + str(self.credit)
        if self.debit:
            return '-' + str(self.debit)
        return ''

    def __str__(self):
        """Format the transaction as one row of the transactions table."""
        # vars() returns the live instance dictionary: work on a copy
        # so that formatting does not add an attribute to the object
        fields = dict(vars(self), amount=self._amount())
        return TRANSACTION.format(**fields)
        
        
def imslice(image):
    """Yield the 25 tiles of a 5 by 5 grid cut out of `image`.

    Tiles are produced row by row, from left to right. The grid has a
    pitch of 27 pixels and every crop box starts one pixel inside its
    cell. `image` only needs a `crop()` method taking a
    ``(left, upper, right, lower)`` box.
    """
    pitch = 27
    for row in range(5):
        upper, lower = pitch * row + 1, pitch * (row + 1)
        for col in range(5):
            left, right = pitch * col + 1, pitch * (col + 1)
            yield image.crop((left, upper, right, lower))


def imdecode(image):
    """Locate the ten digits on the virtual keypad `image`.

    Each tile produced by `imslice()` is compared with the reference
    digits stored in ``keypad.png`` next to this module. Returns a
    dictionary mapping every digit to the one-based position of the
    tile showing it.

    Raises ValueError unless all ten digits have been found.
    """
    # reference digits: ten tiles, 26 pixels wide, side by side
    path = os.path.join(os.path.dirname(__file__), 'keypad.png')
    reference = Image.open(path).convert('L')
    digits = [ reference.crop((26 * k, 0, 26 * (k + 1), 26)) for k in range(10) ]

    positions = {}
    for position, tile in enumerate(imslice(image), start=1):
        # a tile whose darkest pixel is not zero is background only
        darkest = tile.getextrema()[0]
        if darkest > 0:
            continue
        # first reference digit equal to the tile, if any
        found = next((digit for digit, glyph in enumerate(digits) if tile == glyph), None)
        if found is not None:
            positions[found] = position

    if set(positions) != set(range(10)):
        raise ValueError('keypad decode failed')
    return positions


def amountparse(value):
    """Parse the text of an amount cell of the transactions table.

    The expected layout is an amount using ``.`` as thousands separator
    and ``,`` as decimal mark, followed by the currency symbol, with
    whitespace before, between and after them. A cell containing only
    a no-break space is an empty cell.

    Returns the amount as a `Decimal`, or None for an empty cell.

    Raises ValueError, carrying the repr of the offending text, when
    the layout is not recognized or the currency is not the euro.
    """
    # empty
    if value == '\xa0':
        return None
    # accept any number of thousands groups so that amounts of one
    # million and more are parsed too
    m = re.match(r'\s+((?:\d+\.)*\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S)
    if m is None:
        raise ValueError(repr(value))
    # euro
    currency = m.group(2)
    if currency != EURO:
        raise ValueError(repr(currency))
    # drop the thousands separators and switch to a decimal point
    return Decimal(m.group(1).replace('.', '').replace(',', '.'))


class Site:
    """Scraper for the BNP Paribas online banking web site.

    All requests are issued through the single `requests.Session`
    created by the constructor. main() calls `login()` before anything
    else; the other methods presumably rely on the session it sets up.
    Every method raises the exception of `raise_for_status()` when the
    site answers with an HTTP error status.
    """

    def __init__(self):
        self.url = 'https://www.secure.bnpparibas.net'
        self.req = requests.Session()

    def login(self, user, passwd):
        """Log into the web site.

        `passwd` is a string of decimal digits. It is not sent as is:
        each digit is replaced by the two digits position of the key
        showing it on the keypad image served with the login page.

        Raises ValueError when the keypad image or the redirection
        following the login cannot be found, when the keypad cannot be
        decoded, or with the text reported by the site when it answers
        with an error page.
        """
        # login page
        url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion')
        r = self.req.get(url, params={'type': 'homeconnex'})
        r.raise_for_status()
        # login form
        soup = bs4.BeautifulSoup(r.text)
        form = soup.find('form', attrs={'name': 'logincanalnet'})
        # extract relevant data
        action = form['action']
        data = { field['name']: field['value'] for field in form('input') }

        # keyboard image url
        src = None
        tag = soup.find(attrs={'id': 'secret-nbr-keyboard'})
        for prop in tag['style'].split(';'):
            match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop)
            if match:
                src = match.group(1)
                break
        if src is None:
            # fail here rather than downloading the site root and
            # trying to read it as an image
            raise ValueError('keypad image url not found')
        # download keyboard image
        r = self.req.get(urljoin(self.url, src))
        r.raise_for_status()
        image = Image.open(BytesIO(r.content)).convert('L')
        # decode digits position
        passwdmap = imdecode(image)

        # encode password
        passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd))

        # username and password
        data['ch1'] = user
        data['ch5'] = passwdenc

        # post
        r = self.req.post(urljoin(self.url, action), data=data)
        r.raise_for_status()
        # redirection: it is done in javascript, follow it by hand
        m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text)
        if m is None:
            raise ValueError('login redirection not found')
        dest = m.group(1)
        r = self.req.get(dest)
        r.raise_for_status()

        # check for errors
        soup = bs4.BeautifulSoup(r.text)
        err = soup.find(attrs={'class': 'TitreErreur'})
        if err:
            raise ValueError(err.text)


    def recent(self):
        """Yield a `Transaction` for each row of the operations table.

        The table is requested for the contract named by CONTRACT in
        the configuration. The `category` attribute of the yielded
        objects holds the category identifier taken from the CSS class
        of the category cell (presumably one of the CATEGORIES keys).
        """
        data = {
            'BeginDate': '',
            'Categs': '',
            'Contracts': '',
            'EndDate': '',
            'OpTypes': '',
            'cboFlowName': 'flow/iastatement',
            'contractId': CONTRACT,
            'contractIds': '',
            'entryDashboard': '',
            'execution': 'e6s1',
            'externalIAId': 'IAStatements',
            'g1Style': 'expand',
            'g1Type': '',
            'g2Style': 'collapse',
            'g2Type': '',
            'g3Style': 'collapse',
            'g3Type': '',
            'g4Style': 'collapse',
            'g4Type': '',
            'groupId': '-2',
            'groupSelected': '-2',
            'gt': 'homepage:basic-theme',
            'pageId': 'releveoperations',
            'pastOrPendingOperations': '1',
            'sendEUD': 'true',
            'step': 'STAMENTS', }

        url = urljoin(self.url, '/banque/portail/particulier/FicheA')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        text = r.text

        # the html is so broken beautifulsoup does not understand it
        text = text.replace(
            '<th class="thTitre" style="width:7%">Pointage </td>',
            '<th class="thTitre" style="width:7%">Pointage </th>')
        s = bs4.BeautifulSoup(text)

        # extract transactions
        table = s.find('table', id='tableCompte')
        rows = table.find_all('tr')
        for row in rows:
            fields = row.find_all('td')
            if not fields:
                # skip headers row
                continue
            # the identifier is the id of the input element in the
            # first cell, stripped of its leading underscores
            tid = int(fields[0].input['id'].lstrip('_'))
            date = datetime.strptime(fields[1].text, '%d/%m/%Y')
            descr = fields[2].text.strip()
            debit = amountparse(fields[3].text)
            credit = amountparse(fields[4].text)
            # third css class of the span, minus its first four characters
            categoryid = fields[6].span['class'][2][4:]
            yield Transaction(tid, date, descr, debit, credit, categoryid)


    def messages(self):
        """Yield a `Message` for each entry of the received messages list.

        The `read` field is true when the last argument of the onclick
        handler of the entry is ``'false'``.
        """
        data = {
            'identifiant': 'BmmFicheListerMessagesRecus_20100607022434',
            'type': 'fiche', }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)

        # messages list
        table = s.find('table', id='listeMessages')
        for row in table.find_all('tr', recursive=False):
            # skip headers
            if 'entete' in row['class']:
                continue
            # skip separators
            if 'sep' in row['class']:
                continue
            # skip footer
            if 'actions_bas' in row['class']:
                continue
            fields = row.find_all('td')
            icon = fields[1].img['src']
            sender = fields[2].text.strip()
            subject = fields[4].a.text.strip()
            date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            # message identifier and flag are arguments of the onclick handler
            m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick'])
            mid = m.group(1)
            read = m.group(2) == 'false'
            yield Message(mid, read, icon, sender, subject, date, validity)


    def message(self, mid):
        """Retrieve the message with identifier `mid`.

        Returns a ``(sender, body)`` tuple. `sender` is the complete
        sender name and `body` is the message text converted from html
        to plain text paragraphs wrapped at 72 columns.
        """
        data = {
            'etape': 'boiteReception',
            'idMessage': mid,
            'identifiant': 'BmmFicheLireMessage_20100607022346',
            'maxPagination': 2,
            'minPagination': 1,
            'nbElementParPage': 20,
            'nbEltPagination': 5,
            'nbPages': 2,
            'newMsg': 'false',
            'pagination': 1,
            'type': 'fiche',
            'typeAction': '', }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        # fix badly broken html
        text = r.text.replace('<br>', '<br/>').replace('</br>', '')
        s = bs4.BeautifulSoup(text)

        envelope = s.find('div', attrs={'class': 'enveloppe'})
        rows = envelope.find_all('tr')
        fields = rows[1].find_all('td')
        # the messages list present a truncated sender; the subject and
        # date cells following it are not needed
        sender = fields[0].text.strip()

        content = s.find('div', attrs={'class': 'txtMessage'})
        # clean up text
        for t in content.find_all('style'):
            t.extract()
        for t in content.find_all('script'):
            t.extract()
        for t in content.find_all(id='info_pro'):
            t.extract()
        # line breaks and list items become paragraph separators
        for t in content.find_all('br'):
            t.replace_with('\n\n')
        for t in content.find_all('b'):
            if t.string:
                t.replace_with('*%s*' % t.string.strip())
        for t in content.find_all('li'):
            t.replace_with('- %s\n\n' % t.text.strip())
        # format nicely: collapse runs of spaces and remove whitespace
        # in front of full stops and colons
        text = re.sub(' +', ' ', content.text)
        text = re.sub(r'\s+([\.:])', r'\1', text)
        pars = []
        for p in re.split('\n\n+', text):
            p = p.strip()
            if p:
                pars.append('\n'.join(textwrap.wrap(p, 72)))
        body = '\n\n'.join(pars)
        return sender, body


    def transactions(self, account='FR7630004001640000242975804'):
        """Download the transactions export and return it as text.

        `account` is sent as the ``ch_rop_cpt_0`` field of the export
        form (presumably the IBAN of the account to export). The export
        is retrieved by following the first link found in the answer to
        the form.
        """
        data = {'ch_memo': 'NON',
                'ch_rop_cpt_0': account,
                'ch_rop_dat': 'tous',
                'ch_rop_dat_deb': '',
                'ch_rop_dat_fin': '',
                'ch_rop_fmt_dat': 'JJMMAAAA',
                'ch_rop_fmt_fic': 'RTEXC',
                'ch_rop_fmt_sep': 'PT',
                'ch_rop_mon': 'EUR',
                'x': '55',
                'y': '7'}
        r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)
        path = s.find('a')['href']
        r = self.req.get(urljoin(self.url, path))
        r.raise_for_status()
        return r.text


class Mailer:
    """Send mail through the SMTP server named in the configuration."""

    def __init__(self):
        # connection parameters come from the configuration module
        self.server = SMTPSERVER
        self.port = SMTPPORT
        self.starttls = SMTPSTARTTLS
        self.username = SMTPUSER
        self.password = SMTPPASSWD

    @contextmanager
    def connect(self):
        """Context manager yielding a connected `smtplib.SMTP` object.

        STARTTLS and authentication are performed when configured. The
        session is terminated with QUIT when the block completes. If
        the setup or the block raise, the connection is closed and the
        exception is propagated.
        """
        smtp = smtplib.SMTP(self.server, self.port)
        try:
            if self.starttls:
                smtp.starttls()
            if self.username:
                smtp.login(self.username, self.password)
            yield smtp
        except BaseException:
            # do not leak the socket. close() is used instead of quit()
            # so that a failing QUIT cannot hide the original error
            smtp.close()
            raise
        smtp.quit()

    def send(self, message, fromaddr=None, toaddr=None):
        """Send `message` over a new SMTP connection.

        `fromaddr` and `toaddr` default to the ``From`` and ``To``
        headers of the message.
        """
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))


def encrypt(message, sender, recipient):
    """Sign and encrypt `message` with gpg.

    `sender` and `recipient` may be complete ``Name <address>`` mail
    addresses: only the address part is used to select the signing key
    and the encryption key. The keyring is the one in GNUPGHOME.

    Returns the ascii armored output of gpg as a string.

    Raises RuntimeError, carrying the error output of gpg, when gpg
    exits with a non zero status.
    """
    sender = parseaddr(sender)[1]
    recipient = parseaddr(recipient)[1]
    cmd = [ "gpg", "--homedir", GNUPGHOME, "--batch", "--yes", "--no-options", "--armor",
            "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"]
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate(input=message.encode('utf-8'))
    if p.returncode != 0:
        # without this check a failure would go unnoticed and an
        # empty text would be returned to the caller
        raise RuntimeError('gpg failed with exit status %d: %s' % (
            p.returncode, err.decode('utf-8', 'replace').strip()))
    return out.decode('ascii')


def _notify(mailer, subject, body, date=None):
    """Encrypt `body` and mail it from MAILFROM to MAILTO.

    `date` is the naive datetime used for the Date header; the current
    time is used when it is None.
    """
    message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit')
    message['Subject'] = subject
    message['From'] = MAILFROM
    message['To'] = MAILTO
    message['Date'] = format_datetime(localtime(date))
    mailer.send(message)


def main():
    """Mail the unread messages and the new transactions.

    The identifiers of the messages and transactions already handled
    are recorded in the sqlite database DB so that each one is mailed
    only once. Each unread message is sent as a separate mail, oldest
    first. The new transactions are sent together as one table.
    """
    bnp = Site()
    bnp.login(USERNAME, PASSWORD)

    db = sqlite3.connect(DB)
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
        db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

        mailer = Mailer()

        ## unread messages
        unread = (m for m in bnp.messages() if not m.read)
        for m in sorted(unread, key=lambda x: x.date):
            curs = db.cursor()
            curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, ))
            if curs.fetchone()[0]:
                # already handled
                continue

            # retrieve complete sender and message body
            sender, body = bnp.message(m.id)

            # compose and send message
            body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body)
            _notify(mailer, 'BNP Paribas message', body, m.date)

            # record the message only once it has been sent
            curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, ))
            db.commit()

        ## transactions
        curs = db.cursor()
        lines = []
        for t in bnp.recent():
            curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, ))
            if curs.fetchone()[0]:
                # already handled
                continue
            lines.append(str(t))
            curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, ))

        if lines:
            lines.insert(0, HEADER)
            lines.insert(1, '-' * len(HEADER))
            _notify(mailer, 'BNP Paribas update', '\n'.join(lines))

        # the new transactions are committed only after the mail has
        # been sent: on failure they are reported again on the next run
        db.commit()
    finally:
        db.close()


# run the notification job only when executed as a script
if __name__ == '__main__':
    main()