diff src/bnpparibas.py @ 0:02ec4a9ab0f0

Import
author Daniele Nicolodi <daniele.nicolodi@obspm.fr>
date Tue, 24 Feb 2015 15:50:21 +0100 (2015-02-24)
parents
children ad577744dd8e
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/bnpparibas.py	Tue Feb 24 15:50:21 2015 +0100
@@ -0,0 +1,455 @@
+import email
+import os.path
+import re
+import smtplib
+import sqlite3
+import subprocess
+import textwrap
+
+from collections import namedtuple, defaultdict
+from contextlib import contextmanager
+from datetime import datetime
+from decimal import Decimal
+from email.mime.text import MIMEText
+from email.utils import format_datetime, localtime, parseaddr
+from io import BytesIO
+from itertools import product, islice
+from urllib.parse import urljoin
+
+import bs4
+import numpy as np
+import requests
+
+from PIL import Image
+
+# file name of the SQLite database in which main() records the ids of
+# the messages and transactions that have already been reported
+DB = 'bnpparibas.sqlite'
+
+# message template: plain text rendering of one bank message, filled
+# in by main() with the sender, subject, date, id and body fields
+MESSAGE = """\
+From: {sender:}
+Subject: {subject:}
+Date: {date:}
+Message-Id: {id:}
+
+{body:}
+"""
+
+# transaction template: HEADER is the column titles line and
+# TRANSACTION formats one row from the attributes of a Transaction
+# object plus a pre-formatted signed 'amount' string
+HEADER = '{:14s}  {:10s}  {:59s}  {:>8s}'.format('Id', 'Date', 'Description', 'Amount')
+TRANSACTION = '{id:}  {date:%d/%m/%Y}  {descr:59s}  {amount:>8s}'
+
+# category identifiers and names as defined in the BNP Paribas web app
+CATEGORIES = {
+      '1': 'Alimentation',
+      '7': 'Logement',
+      '8': 'Loisirs',
+      '9': 'Transport',
+     '12': 'Opérations bancaires',
+     '13': 'Non défini',
+     '14': 'Multimédia',
+     '20': 'Energies',
+     '22': 'Retrait',
+     '23': 'Sorties',
+    'R58': 'Non défini',
+}
+
+# euro symbol, decoded from its utf-8 byte sequence
+EURO = b'\xe2\x82\xac'.decode('utf-8')
+
+
+# load configuration. the code below uses CONTRACT, USERNAME, PASSWORD,
+# MAILFROM, MAILTO, GNUPGHOME, SMTPSERVER, SMTPPORT, SMTPSTARTTLS,
+# SMTPUSER and SMTPPASSWD without defining them, so they are presumably
+# provided by the config module -- TODO confirm
+from config import *
+
+
+# GPG encrypted text is ascii and as such does not require encoding
+# but its decrypted form is utf-8 and therefore the charset header
+# must be set accordingly. define an appropriate charset object
+email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
+                          body_enc=None, output_charset='utf-8')
+
+
+# one row of the received messages list, as yielded by Site.messages()
+Message = namedtuple('Message', 'id read icon sender subject date validity'.split())
+
+
+class Transaction:
+    """A single account transaction.
+
+    The constructor arguments are stored unchanged as the ``id``,
+    ``date``, ``descr``, ``debit``, ``credit`` and ``category``
+    attributes.
+    """
+
+    def __init__(self, tid, date, descr, debit, credit, category):
+        self.id = tid
+        self.date = date
+        self.descr = descr
+        self.debit = debit
+        self.credit = credit
+        self.category = category
+
+    def __str__(self):
+        """Return the transaction formatted with the TRANSACTION template.
+
+        The amount column is the debit prefixed with ``-`` or the
+        credit prefixed with ``+``; if both are set the credit wins.
+        It is left empty when neither is set.
+        """
+        # there does not seem to be an easy way to format Decimal
+        # objects with a leading sign in both the positive and
+        # negative value cases so do it manually
+        #
+        # vars() returns the live instance dictionary: work on a copy
+        # so that formatting does not add attributes to the object
+        fields = dict(vars(self))
+        fields['amount'] = ''
+        if self.debit:
+            fields['amount'] = '-' + str(self.debit)
+        if self.credit:
+            fields['amount'] = '+' + str(self.credit)
+        return TRANSACTION.format(**fields)
+        
+        
+def imslice(image, size=27, count=5):
+    """Yield the tiles of a keypad image, first axis outermost.
+
+    The image is treated as a ``count`` x ``count`` grid of cells of
+    ``size`` pixels. For each cell the first row and the first column
+    are dropped, so each tile is ``size - 1`` pixels wide and high.
+    Only channel 0 of the last axis is returned.
+
+    image: array indexable as image[i, j, channel]
+    size: cell pitch in pixels
+    count: number of cells along each axis
+    """
+    for i, j in product(range(count), range(count)):
+        yield image[size*i+1:size*(i+1), size*j+1:size*(j+1), 0]
+
+
+def imdecode(image):
+    """Locate the digits in a keypad image.
+
+    Every non-empty tile produced by imslice() is compared with the
+    reference tiles loaded from the ``keypad.npy`` file that sits next
+    to this module (presumably one tile per digit, indexed by digit --
+    TODO confirm against the file).
+
+    Returns a dictionary mapping each recognised digit to the 1-based
+    index of the tile that shows it, in imslice() order.
+    """
+    keypad = np.load(os.path.join(os.path.dirname(__file__), 'keypad.npy'))
+    immap = {}
+    for n, tile in enumerate(imslice(image)):
+        # skip empty tiles
+        if np.mean(tile) > 248.0:
+            continue
+        # compare to reference tiles
+        for d in range(0, 10):
+            # NOTE(review): the difference is summed as is, not as an
+            # absolute value, so the result depends on the dtype of
+            # the two arrays -- confirm against keypad.npy
+            delta = np.sum(tile - keypad[d])
+            if delta < 100:
+                immap[d] = n + 1
+    return immap
+
+
+def amountparse(value):
+    """Parse the text of an amount cell into a Decimal.
+
+    The expected format is whitespace, the amount with ``.`` as
+    thousands separator and ``,`` as decimal separator, whitespace,
+    the currency symbol and trailing whitespace.
+
+    Returns None for an empty cell (a lone non-breaking space).
+    Raises ValueError if the text does not have the expected format
+    or if the currency is not the euro.
+    """
+    # empty
+    if value == '\xa0':
+        return None
+    # accept any number of thousands groups, not just one
+    m = re.match(r'\s+((?:\d+\.)*\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S)
+    if m is None:
+        raise ValueError(repr(value))
+    # euro
+    currency = m.group(2)
+    if currency != EURO:
+        raise ValueError(repr(currency))
+    return Decimal(m.group(1).replace('.', '').replace(',', '.'))
+
+
+class Site:
+    """Scraper for the BNP Paribas personal banking web site.
+
+    All the requests are issued through a single requests session so
+    that what is set up by login() is reused by the other methods.
+    """
+
+    def __init__(self):
+        self.url = 'https://www.secure.bnpparibas.net'
+        self.req = requests.Session()
+
+    def login(self, user, passwd):
+        """Log in with the given user name and numeric password.
+
+        The password is not sent as typed: the login page shows a
+        keypad image and each digit of the password is sent as the
+        two digits position of the corresponding tile in that image.
+
+        Raises ValueError if the keypad image or the redirection that
+        follows the login cannot be found in the pages, or if the
+        site reports an error.
+        """
+        # login page
+        url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion')
+        r = self.req.get(url, params={'type': 'homeconnex'})
+        r.raise_for_status()
+        # login form
+        soup = bs4.BeautifulSoup(r.text)
+        form = soup.find('form', attrs={'name': 'logincanalnet'})
+        # extract relevant data
+        action = form['action']
+        data = { field['name']: field['value'] for field in form('input') }
+
+        # keyboard image url
+        src = ''
+        tag = soup.find(attrs={'id': 'secret-nbr-keyboard'})
+        for prop in tag['style'].split(';'):
+            match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop)
+            if match:
+                src = match.group(1)
+                break
+        if not src:
+            raise ValueError('keyboard image url not found')
+        # download keyboard image
+        r = self.req.get(urljoin(self.url, src))
+        r.raise_for_status()
+        image = np.array(Image.open(BytesIO(r.content)).convert('RGB'))
+        # decode digits position
+        passwdmap = imdecode(image)
+
+        # encode password
+        passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd))
+
+        # username and password
+        data['ch1'] = user
+        data['ch5'] = passwdenc
+
+        # post
+        r = self.req.post(urljoin(self.url, action), data=data)
+        r.raise_for_status()
+        # redirection
+        m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text)
+        if m is None:
+            raise ValueError('login redirection not found')
+        dest = m.group(1)
+        r = self.req.get(dest)
+        r.raise_for_status()
+
+        # check for errors
+        soup = bs4.BeautifulSoup(r.text)
+        err = soup.find(attrs={'class': 'TitreErreur'})
+        if err:
+            raise ValueError(err.text)
+
+
+    def recent(self):
+        """Yield the transactions listed in the account statement page.
+
+        The account is selected by the CONTRACT configuration value.
+        Each row of the ``tableCompte`` table is returned as a
+        Transaction object whose category is the category identifier
+        extracted from the class of the category cell.
+        """
+        data = {
+            'BeginDate': '',
+            'Categs': '',
+            'Contracts': '',
+            'EndDate': '',
+            'OpTypes': '',
+            'cboFlowName': 'flow/iastatement',
+            'contractId': CONTRACT,
+            'contractIds': '',
+            'entryDashboard': '',
+            'execution': 'e6s1',
+            'externalIAId': 'IAStatements',
+            'g1Style': 'expand',
+            'g1Type': '',
+            'g2Style': 'collapse',
+            'g2Type': '',
+            'g3Style': 'collapse',
+            'g3Type': '',
+            'g4Style': 'collapse',
+            'g4Type': '',
+            'groupId': '-2',
+            'groupSelected': '-2',
+            'gt': 'homepage:basic-theme',
+            'pageId': 'releveoperations',
+            'pastOrPendingOperations': '1',
+            'sendEUD': 'true',
+            'step': 'STAMENTS', }
+
+        url = urljoin(self.url, '/banque/portail/particulier/FicheA')
+        r = self.req.post(url, data=data)
+        r.raise_for_status()
+        text = r.text
+
+        # the html is so broken beautifulsoup does not understand it
+        text = text.replace(
+            '<th class="thTitre" style="width:7%">Pointage </td>',
+            '<th class="thTitre" style="width:7%">Pointage </th>')
+        s = bs4.BeautifulSoup(text)
+
+        # extract transactions
+        table = s.find('table', id='tableCompte')
+        for row in table.find_all('tr'):
+            fields = row.find_all('td')
+            if not fields:
+                # skip headers row
+                continue
+            tid = int(fields[0].input['id'].lstrip('_'))
+            date = datetime.strptime(fields[1].text, '%d/%m/%Y')
+            descr = fields[2].text.strip()
+            debit = amountparse(fields[3].text)
+            credit = amountparse(fields[4].text)
+            # the category name in fields[5] is not used: the category
+            # id is the third class name stripped of its first four
+            # characters
+            categoryid = fields[6].span['class'][2][4:]
+            yield Transaction(tid, date, descr, debit, credit, categoryid)
+
+
+    def messages(self):
+        """Yield the entries of the received messages list.
+
+        Each entry is a Message tuple. Its ``read`` field is true when
+        the flag found in the link that opens the message is
+        ``false``.
+
+        Raises ValueError if that link does not have the expected form.
+        """
+        data = {
+            'identifiant': 'BmmFicheListerMessagesRecus_20100607022434',
+            'type': 'fiche', }
+
+        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
+        r = self.req.post(url, data=data)
+        r.raise_for_status()
+        s = bs4.BeautifulSoup(r.text)
+
+        # messages list
+        table = s.find('table', id='listeMessages')
+        for row in table.find_all('tr', recursive=False):
+            # skip headers, separators and footer
+            classes = row['class']
+            if any(name in classes for name in ('entete', 'sep', 'actions_bas')):
+                continue
+            fields = row.find_all('td')
+            icon = fields[1].img['src']
+            sender = fields[2].text.strip()
+            subject = fields[4].a.text.strip()
+            date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
+            validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
+            onclick = fields[4].a['onclick']
+            m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', onclick)
+            if m is None:
+                raise ValueError(repr(onclick))
+            mid = m.group(1)
+            read = m.group(2) == 'false'
+            yield Message(mid, read, icon, sender, subject, date, validity)
+
+
+    def message(self, mid):
+        """Retrieve the message with the given id.
+
+        Returns the (sender, body) tuple where sender is the complete
+        sender as shown in the message envelope and body is the text
+        of the message with the markup removed and wrapped at 72
+        columns.
+        """
+        data = {
+            'etape': 'boiteReception',
+            'idMessage': mid,
+            'identifiant': 'BmmFicheLireMessage_20100607022346',
+            'maxPagination': 2,
+            'minPagination': 1,
+            'nbElementParPage': 20,
+            'nbEltPagination': 5,
+            'nbPages': 2,
+            'newMsg': 'false',
+            'pagination': 1,
+            'type': 'fiche',
+            'typeAction': '', }
+
+        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
+        r = self.req.post(url, data=data)
+        r.raise_for_status()
+        # fix badly broken html
+        text = r.text.replace('<br>', '<br/>').replace('</br>', '')
+        s = bs4.BeautifulSoup(text)
+
+        envelope = s.find('div', attrs={'class': 'enveloppe'})
+        rows = envelope.find_all('tr')
+        fields = rows[1].find_all('td')
+        # the messages list presents a truncated sender. the subject
+        # and date cells that follow are not needed
+        sender = fields[0].text.strip()
+
+        content = s.find('div', attrs={'class': 'txtMessage'})
+        # clean up text
+        for t in content.find_all('style'):
+            t.extract()
+        for t in content.find_all('script'):
+            t.extract()
+        for t in content.find_all(id='info_pro'):
+            t.extract()
+        for t in content.find_all('br'):
+            t.replace_with('\n\n')
+        for t in content.find_all('b'):
+            if t.string:
+                t.replace_with('*%s*' % t.string.strip())
+        for t in content.find_all('li'):
+            t.replace_with('- %s\n\n' % t.text.strip())
+        # format nicely
+        text = re.sub(' +', ' ', content.text)
+        text = re.sub(r'\s+([\.:])', r'\1', text)
+        pars = []
+        for p in re.split('\n\n+', text):
+            p = p.strip()
+            if p:
+                pars.append('\n'.join(textwrap.wrap(p, 72)))
+        body = '\n\n'.join(pars)
+        return sender, body
+
+
+    def transactions(self, account='FR7630004001640000242975804'):
+        """Download the transactions export for the given account.
+
+        account: value sent as the ``ch_rop_cpt_0`` form field; the
+            default is the account number this method always used.
+
+        Returns the text of the document linked from the response.
+        """
+        data = {'ch_memo': 'NON',
+                'ch_rop_cpt_0': account,
+                'ch_rop_dat': 'tous',
+                'ch_rop_dat_deb': '',
+                'ch_rop_dat_fin': '',
+                'ch_rop_fmt_dat': 'JJMMAAAA',
+                'ch_rop_fmt_fic': 'RTEXC',
+                'ch_rop_fmt_sep': 'PT',
+                'ch_rop_mon': 'EUR',
+                'x': '55',
+                'y': '7'}
+        r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data)
+        r.raise_for_status()
+        s = bs4.BeautifulSoup(r.text)
+        path = s.find('a')['href']
+        r = self.req.get(urljoin(self.url, path))
+        r.raise_for_status()
+        return r.text
+
+
+class Mailer:
+    """Send messages through the SMTP server set in the configuration."""
+
+    def __init__(self):
+        self.server = SMTPSERVER
+        self.port = SMTPPORT
+        self.starttls = SMTPSTARTTLS
+        self.username = SMTPUSER
+        self.password = SMTPPASSWD
+
+    @contextmanager
+    def connect(self):
+        """Context manager yielding a connection to the SMTP server.
+
+        STARTTLS is negotiated and the login is performed only when
+        the corresponding settings are set. The connection is shut
+        down when the context is left, also when an error occurs.
+        """
+        smtp = smtplib.SMTP(self.server, self.port)
+        try:
+            if self.starttls:
+                smtp.starttls()
+            if self.username:
+                smtp.login(self.username, self.password)
+            yield smtp
+        finally:
+            try:
+                smtp.quit()
+            except smtplib.SMTPServerDisconnected:
+                # the connection is already gone: release the socket
+                # without hiding the error that is being propagated
+                smtp.close()
+
+    def send(self, message, fromaddr=None, toaddr=None):
+        """Send a message.
+
+        message: email message object; it is sent as str(message)
+        fromaddr: envelope sender, defaults to the From header
+        toaddr: envelope recipient, defaults to the To header
+        """
+        if not fromaddr:
+            fromaddr = message['From']
+        if not toaddr:
+            toaddr = message['To']
+        with self.connect() as conn:
+            conn.sendmail(fromaddr, toaddr, str(message))
+
+
+def encrypt(message, sender, recipient):
+    """Sign and encrypt a text with GnuPG.
+
+    message: text to encrypt, it is passed to gpg encoded as utf-8
+    sender: address whose key signs the message
+    recipient: address whose key the message is encrypted to
+
+    Only the address part of sender and recipient is used. The gpg
+    home directory is the GNUPGHOME configuration value.
+
+    Returns the ascii armored output of gpg as a string.
+    Raises RuntimeError if gpg exits with a non zero status.
+    """
+    sender = parseaddr(sender)[1]
+    recipient = parseaddr(recipient)[1]
+    cmd = [ "gpg", "--homedir", GNUPGHOME, "--batch", "--yes", "--no-options", "--armor",
+            "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"]
+    p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    out, err = p.communicate(input=message.encode('utf-8'))
+    if p.returncode != 0:
+        # do not let an empty or truncated message be sent
+        raise RuntimeError('gpg exited with status %d: %s' % (
+            p.returncode, err.decode('utf-8', 'replace').strip()))
+    return out.decode('ascii')
+
+
+def _notify(mailer, subject, body, date=None):
+    """Encrypt body and mail it from MAILFROM to MAILTO.
+
+    date is the datetime for the Date header, the current time is
+    used if it is not given.
+    """
+    message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit')
+    message['Subject'] = subject
+    message['From'] = MAILFROM
+    message['To'] = MAILTO
+    message['Date'] = format_datetime(localtime(date))
+    mailer.send(message)
+
+
+def main():
+    """Mail the unread messages and the new transactions.
+
+    Each unread message that is not yet recorded in the database is
+    sent as a separate encrypted mail and recorded right after. The
+    transactions that are not yet recorded are sent together in one
+    encrypted mail and recorded once that mail has been sent.
+    """
+    bnp = Site()
+    bnp.login(USERNAME, PASSWORD)
+
+    db = sqlite3.connect(DB)
+    try:
+        db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
+        db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')
+
+        mailer = Mailer()
+
+        ## unread messages
+        unread = (m for m in bnp.messages() if not m.read)
+        for m in sorted(unread, key=lambda x: x.date):
+            curs = db.cursor()
+            curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, ))
+            if curs.fetchone()[0]:
+                # already handled
+                continue
+
+            # retrieve complete sender and message body
+            sender, body = bnp.message(m.id)
+
+            # compose and send message
+            body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body)
+            _notify(mailer, 'BNP Paribas message', body, m.date)
+
+            curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, ))
+            db.commit()
+
+        ## transactions
+        curs = db.cursor()
+        lines = []
+        for t in bnp.recent():
+            curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, ))
+            if curs.fetchone()[0]:
+                # already handled
+                continue
+            lines.append(str(t))
+            curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, ))
+
+        if lines:
+            lines.insert(0, HEADER)
+            lines.insert(1, '-' * len(HEADER))
+            _notify(mailer, 'BNP Paribas update', '\n'.join(lines))
+
+        # the new transaction ids are committed only after the mail
+        # has been sent
+        db.commit()
+    finally:
+        # changes that have not been committed are discarded
+        db.close()
+
+
+# run main() only when the module is executed as a script
+if __name__ == '__main__':
+    main()