Mercurial > hg > bnpparibas
diff bnpparibas.py @ 7:90f4e0bd0c2d
Almost complete rewrite to adapt to the new BNP Paribas website
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Mon, 11 Jan 2016 19:01:00 +0100 |
parents | 13a8bc43bc09 |
children | 225885e803b4 |
line wrap: on
line diff
--- a/bnpparibas.py Mon Jan 11 18:57:25 2016 +0100 +++ b/bnpparibas.py Mon Jan 11 19:01:00 2016 +0100 @@ -1,383 +1,87 @@ -import email +import cgi import imp +import itertools +import json import os.path -import re +import requests import smtplib import sqlite3 -import subprocess import textwrap +import time -from collections import namedtuple, defaultdict from contextlib import contextmanager -from datetime import datetime -from decimal import Decimal from email.mime.text import MIMEText from email.utils import format_datetime, localtime, parseaddr from io import BytesIO -from itertools import product, islice +from pprint import pprint from urllib.parse import urljoin -import bs4 -import click -import requests - +from bs4 import BeautifulSoup +from html2text import HTML2Text from PIL import Image +URL = 'https://mabanque.bnpparibas/' + # message template MESSAGE = """\ -From: {sender:} -Subject: {subject:} -Date: {date:} -Message-Id: {id:} +From: {message.sender:} +Subject: {message.subject:} +Date: {message.date:} -{body:} +{message.body:} """ -# transaction template -HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount') -TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}' - -# as defined in bnpbaribas web app -CATEGORIES = { - '1': 'Alimentation', - '7': 'Logement', - '8': 'Loisirs', - '9': 'Transport', - '12': 'Opérations bancaires', - '13': 'Non défini', - '14': 'Multimédia', - '20': 'Energies', - '22': 'Retrait', - '23': 'Sorties', - 'R58': 'Non défini', -} - -# euro symbol -EURO = b'\xe2\x82\xac'.decode('utf-8') - +# transactions table row template +TRANSACTION = """{xact.id:14d} {xact.date:10s} {xact.description:54s} {xact.amount:+8.2f}""" -# load configuration -def loadconfig(filename): - module = imp.new_module('config') - module.__file__ = filename - try: - with open(filename) as fd: - exec(compile(fd.read(), filename, 'exec'), module.__dict__) - except IOError as e: - e.strerror = 'Unable to load configuration file (%s)' % e.strerror - raise - config = {} - for key in dir(module): - if key.isupper(): - config[key] = getattr(module, key) - return config - - -# GPG encrypted text is ascii and as such does not require encoding -# but its decrypted form is utf-8 and therefore the charset header -# must be set accordingly. define an appropriate charset object -email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST, - body_enc=None, output_charset='utf-8') - - -Message = namedtuple('Message', 'id read icon sender subject date validity'.split()) - - -class Transaction: - def __init__(self, tid, date, descr, debit, credit, category): - self.id = tid - self.date = date - self.descr = descr - self.debit = debit - self.credit = credit - self.category = category - - def __str__(self): - # there does not seem to be an easy way to format Decimal - # objects with a leading sign in both the positive and - # negative value cases so do it manually - d = vars(self) - if d['debit']: - d['amount'] = '-' + str(d['debit']) - if d['credit']: - d['amount'] = '+' + str(d['credit']) - return TRANSACTION.format(**d) - - -def imslice(image): - for y, x in product(range(0, 5), range(0, 5)): - yield image.crop((27 * x + 1, 27 * y + 1, 27 * (x + 1), 27 * (y + 1))) - +# transactions table header +HEADER = """{:14s} {:10s} {:54s} {:>8s}""".format('Id', 'Date', 'Description', 'Amount') -def imdecode(image): - # load reference keypad - keypad = Image.open(os.path.join(os.path.dirname(__file__), 'keypad.png')).convert('L') - keypad = [ keypad.crop((26 * i, 0, 26 * (i + 1), 26)) for i in range(10) ] - immap = {} - for n, tile in enumerate(imslice(image)): - # skip tiles with background only - if tile.getextrema()[0] > 0: - continue - # compare to reference tiles - for d in range(0, 10): - if tile == keypad[d]: - immap[d] = n + 1 - break - if sorted(immap.keys()) != list(range(10)): - raise ValueError('keypad decode failed') - return immap - - -def amountparse(value): - # empty - if value == '\xa0': - return None - m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S) - if m is None: - raise ValueError(repr(value)) - # euro - currency = m.group(2) - if currency != EURO: - raise ValueError(repr(currency)) - return Decimal(m.group(1).replace('.', '').replace(',', '.')) - - -class Site: - def __init__(self): - self.url = 'https://www.secure.bnpparibas.net' - self.req = requests.Session() +# transactions table footer +FOOTER = """{:14s} {:10s} {:54s} {{balance:8.2f}}""".format('', '', 'BALANCE') - def login(self, user, passwd): - # login page - url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion') - r = self.req.get(url, params={'type': 'homeconnex'}) - r.raise_for_status() - # login form - soup = bs4.BeautifulSoup(r.text) - form = soup.find('form', attrs={'name': 'logincanalnet'}) - # extract relevant data - action = form['action'] - data = { field['name']: field['value'] for field in form('input') } - - # keyboard image url - tag = soup.find(attrs={'id': 'secret-nbr-keyboard'}) - for prop in tag['style'].split(';'): - match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop) - if match: - src = match.group(1) - break - # download keyboard image - r = self.req.get(urljoin(self.url, src)) - image = Image.open(BytesIO(r.content)).convert('L') - # decode digits position - passwdmap = imdecode(image) - - # encode password - passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd)) - - # username and password - data['ch1'] = user - data['ch5'] = passwdenc - - # post - r = self.req.post(urljoin(self.url, action), data=data) - r.raise_for_status() - # redirection - m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text) - dest = m.group(1) - r = self.req.get(dest) - r.raise_for_status() - - # check for errors - soup = bs4.BeautifulSoup(r.text) - err = soup.find(attrs={'class': 'TitreErreur'}) - if err: - raise ValueError(err.text) +# transactions table horizontal separator +SEP = """-""" * len(HEADER) - def recent(self, contract): - data = { - 'BeginDate': '', - 'Categs': '', - 'Contracts': '', - 'EndDate': '', - 'OpTypes': '', - 'cboFlowName': 'flow/iastatement', - 'contractId': contract, - 'contractIds': '', - 'entryDashboard': '', - 'execution': 'e6s1', - 'externalIAId': 'IAStatements', - 'g1Style': 'expand', - 'g1Type': '', - 'g2Style': 'collapse', - 'g2Type': '', - 'g3Style': 'collapse', - 'g3Type': '', - 'g4Style': 'collapse', - 'g4Type': '', - 'groupId': '-2', - 'groupSelected': '-2', - 'gt': 'homepage:basic-theme', - 'pageId': 'releveoperations', - 'pastOrPendingOperations': '1', - 'sendEUD': 'true', - 'step': 'STAMENTS', } - - url = urljoin(self.url, '/banque/portail/particulier/FicheA') - r = self.req.post(url, data=data) - r.raise_for_status() - text = r.text - - # the html is so broken beautifulsoup does not understand it - text = text.replace( - '<th class="thTitre" style="width:7%">Pointage </td>', - '<th class="thTitre" style="width:7%">Pointage </th>') - s = bs4.BeautifulSoup(text) - - # extract transactions - table = s.find('table', id='tableCompte') - rows = table.find_all('tr') - for row in rows: - fields = row.find_all('td') - if not fields: - # skip headers row - continue - id = int(fields[0].input['id'].lstrip('_')) - date = datetime.strptime(fields[1].text, '%d/%m/%Y') - descr = fields[2].text.strip() - debit = amountparse(fields[3].text) - credit = amountparse(fields[4].text) - category = fields[5].text.strip() - categoryid = fields[6].span['class'][2][4:] - yield Transaction(id, date, descr, debit, credit, categoryid) +def loadconf(filename): + module = imp.new_module('conf') + module.__file__ = filename + with open(filename) as fd: + exec(compile(fd.read(), filename, 'exec'), module.__dict__) + conf = {} + for key in dir(module): + if key.isupper(): + conf[key] = getattr(module, key) + return conf - def messages(self): - data = { - 'identifiant': 'BmmFicheListerMessagesRecus_20100607022434', - 'type': 'fiche', } - - url = urljoin(self.url, '/banque/portail/particulier/Fiche') - r = self.req.post(url, data=data) - r.raise_for_status() - s = bs4.BeautifulSoup(r.text) - - # messages list - table = s.find('table', id='listeMessages') - for row in table.find_all('tr', recursive=False): - # skip headers and separators - if 'entete' in row['class']: - continue - # skip separators - if 'sep' in row['class']: - continue - # skip footer - if 'actions_bas' in row['class']: - continue - fields = row.find_all('td') - icon = fields[1].img['src'] - sender = fields[2].text.strip() - subject = fields[4].a.text.strip() - date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') - validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') - m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick']) - mid = m.group(1) - read = m.group(2) == 'false' - yield Message(mid, read, icon, sender, subject, date, validity) +def wrap(p, indent): + return textwrap.fill(p, 72, initial_indent=indent, subsequent_indent=indent) - def message(self, mid): - data = { - 'etape': 'boiteReception', - 'idMessage': mid, - 'identifiant': 'BmmFicheLireMessage_20100607022346', - 'maxPagination': 2, - 'minPagination': 1, - 'nbElementParPage': 20, - 'nbEltPagination': 5, - 'nbPages': 2, - 'newMsg': 'false', - 'pagination': 1, - 'type': 'fiche', - 'typeAction': '', } - - url = urljoin(self.url, '/banque/portail/particulier/Fiche') - r = self.req.post(url, data=data) - r.raise_for_status() - # fix badly broken html - text = r.text.replace('<br>', '<br/>').replace('</br>', '') - s = bs4.BeautifulSoup(text) - - envelope = s.find('div', attrs={'class': 'enveloppe'}) - rows = envelope.find_all('tr') - fields = rows[1].find_all('td') - # the messages list present a truncated sender - sender = fields[0].text.strip() - # not used - subject = fields[1].text.strip() - date = fields[2].text.strip() - - content = s.find('div', attrs={'class': 'txtMessage'}) - # clean up text - for t in content.find_all('style'): - t.extract() - for t in content.find_all('script'): - t.extract() - for t in content.find_all(id='info_pro'): - t.extract() - for t in content.find_all('br'): - t.replace_with('\n\n') - for t in content.find_all('b'): - if t.string: - t.replace_with('*%s*' % t.string.strip()) - for t in content.find_all('li'): - t.replace_with('- %s\n\n' % t.text.strip()) - # format nicely - text = re.sub(' +', ' ', content.text) - text = re.sub(r'\s+([\.:])', r'\1', text) - pars = [] - for p in re.split('\n\n+', text): - p = p.strip() - if p: - pars.append('\n'.join(textwrap.wrap(p, 72))) - body = '\n\n'.join(pars) - return sender, body - - - def transactions(self): - data = {'ch_memo': 'NON', - 'ch_rop_cpt_0': 'FR7630004001640000242975804', - 'ch_rop_dat': 'tous', - 'ch_rop_dat_deb': '', - 'ch_rop_dat_fin': '', - 'ch_rop_fmt_dat': 'JJMMAAAA', - 'ch_rop_fmt_fic': 'RTEXC', - 'ch_rop_fmt_sep': 'PT', - 'ch_rop_mon': 'EUR', - 'x': '55', - 'y': '7'} - r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data) - r.raise_for_status() - s = bs4.BeautifulSoup(r.text) - path = s.find('a')['href'] - r = self.req.get(urljoin(self.url, path)) - r.raise_for_status() - return r.text +def html2text(html): + # the html2text module does an ok job, but it can be improved in + # the quality of the transformation and in the coding style + conv = HTML2Text() + conv.ignore_links = True + conv.ignore_images = True + conv.ignore_emphasis = True + return conv.handle(html) class Mailer: - def __init__(self, config): - self.server = config.get('SMTPSERVER', 'localhost') - self.port = config.get('SMTPPORT', 25) - self.starttls = config.get('SMTPSTARTTLS', False) - self.username = config.get('SMTPUSER', '') - self.password = config.get('SMTPPASSWD', '') + def __init__(self, host='localhost', port=25, starttls=True, username=None, password=None): + self.host = host + self.port = port + self.starttls = starttls + self.username = username + self.password = password @contextmanager def connect(self): - smtp = smtplib.SMTP(self.server, self.port) + smtp = smtplib.SMTP(self.host, self.port) if self.starttls: smtp.starttls() if self.username: @@ -397,12 +101,13 @@ class GPG: def __init__(self, homedir): self.homedir = homedir - + def encrypt(self, message, sender, recipient): sender = parseaddr(sender)[1] recipient = parseaddr(recipient)[1] - cmd = [ "gpg", "--homedir", self.homedir, "--batch", "--yes", "--no-options", "--armor", - "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"] + cmd = [ "gpg", "--homedir", self.homedir, "--sign", "--encrypt" + "--batch", "--no-options", "--yes", "--armor", + "--local-user", sender, "--recipient", recipient, ] p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) encdata, err = p.communicate(input=message.encode('utf-8')) if p.returncode: @@ -410,72 +115,276 @@ return encdata.decode('ascii') -@click.command() -@click.argument('filename') -def main(filename): - # load configuration - config = loadconfig(filename) - - bnp = Site() - bnp.login(config['USERNAME'], config['PASSWORD']) +class Transaction: + __slots__ = 'id', 'date', 'category', 'description', 'amount', 'currency' + + def __init__(self, id, date, category, description, amount, currency): + self.id = id + self.date = date + self.category = category + self.description = description + self.amount = amount + self.currency = currency + + @classmethod + def fromjson(cls, x): + data = {'id': int(x['idOperation']), + 'date': x['dateOperation'], + 'category': int(x['idCategorie']), + 'description': x['libelleOperation'].strip().replace('VIREMENT', 'VIR'), + 'amount': x['montant']['montant'], + 'currency': x['montant']['currency'], } + return cls(**data) + + def __str__(self): + return TRANSACTION.format(xact=self) + + +class Message: + __slots__ = 'id', 'date', 'subject', 'sender', 'content', 'quoted' + + def __init__(self, id, date, subject, sender, content, quoted): + self.id = id + self.date = date + self.subject = subject + self.sender = sender + self.content = content + self.quoted = quoted + + @classmethod + def fromjson(cls, x): + data = {'id': x['msg']['id'], + 'date': x['msg']['id'], + 'subject': x['msg']['objet'], + 'sender': x['msg']['emetteur']['nom'], + 'content': x['msg']['contenu'], + 'quoted': None, } + quoted = x.get('msgAttache') + if quoted: + data['quoted'] = quoted['contenu'] + return cls(**data) + + @staticmethod + def normalize(txt, indent=''): + if '<div' in txt: + txt = txt.replace('<br></br>','<br/><br/>') + return html2text(txt) + + txt = txt.replace(r'<br/>', '\n') + parts = [] + for p in txt.split('\n'): + p = p.strip() + if p: + p = wrap(p, indent) + else: + p = indent + parts.append(p) + return '\n'.join(parts) + + @property + def body(self): + body = self.normalize(self.content) + if self.quoted is not None: + body = body + '\n\n' + self.normalize(self.quoted, '> ') + return body + + def __str__(self): + return MESSAGE.format(message=self) + + +class Keypad: + def __init__(self, data): + # reference keypad + fname = os.path.join(os.path.dirname(__file__), 'keypad.jpeg') + reference = Image.open(fname).convert('L') + symbols = [ 8, 4, 1, 6, 3, 7, 9, 0, 5, 2 ] + self.keypad = dict(zip(symbols, self.imslice(reference))) + + # decode keypad + image = Image.open(BytesIO(data)).convert('L') + self.keymap = {} + for n, tile in enumerate(self.imslice(image), 1): + # compare to reference tiles + for sym, key in self.keypad.items(): + if tile == key: + self.keymap[sym] = n + break - db = sqlite3.connect(config['DATABASE']) + # verify + if sorted(self.keymap.keys()) != list(range(10)): + raise ValueError('keypad decode failed') + + @staticmethod + def imslice(image): + for j, i in itertools.product(range(2), range(5)): + yield image.crop((83 * i, 80 * j, 83 * (i + 1), 80 * (j + 1))) + + def encode(self, passwd): + return ''.join('%02d' % self.keymap[d] for d in map(int, passwd)) + + +class BNPParibas: + def __init__(self): + self.session = requests.Session() + self.session.headers.update({'X-Requested-With': 'XMLHttpRequest'}) + + @staticmethod + def validate(response): + response.raise_for_status() + ctype, params = cgi.parse_header(response.headers.get('content-type')) + if ctype == 'application/json': + data = response.json() + # the status code may sometime be represented as string not int + code = data.get('codeRetour', -1) + if int(code) != 0: + raise requests.HTTPError() + return data + + def login(self, username, password): + url = urljoin(URL, 'identification-wspl-pres/identification') + r = self.session.get(url, params={'timestamp': int(time.time())}) + v = self.validate(r) + + keypadid = v['data']['grille']['idGrille'] + authtemplate = v['data']['authTemplate'] + + url = urljoin(URL, 'identification-wspl-pres/grille/' + keypadid) + r = self.session.get(url) + r.raise_for_status() + + keypad = Keypad(r.content) + + # fill authentication template + auth = authtemplate + auth = auth.replace('{{ idTelematique }}', username) + auth = auth.replace('{{ password }}', keypad.encode(password)) + auth = auth.replace('{{ clientele }}', '') + + url = urljoin(URL, 'SEEA-pa01/devServer/seeaserver') + r = self.session.post(url, data={'AUTH': auth}) + v = self.validate(r) + return v['data'] + + def info(self): + url = urljoin(URL, 'serviceinfosclient-wspl/rpc/InfosClient') + r = self.session.get(url, params={'modeAppel': 0}) + v = self.validate(r) + return v['data'] + + def recent(self): + url = urljoin(URL, 'udc-wspl/rest/getlstcpt') + r = self.session.get(url) + v = self.validate(r) + account = v['data']['infoUdc']['familleCompte'][0]['compte'][0] + + url = urljoin(URL, 'rop-wspl/rest/releveOp') + data = json.dumps({'ibanCrypte': account['key'], + 'pastOrPending': 1, 'triAV': 0, + 'startDate': None, # ddmmyyyy + 'endDate': None}) + headers = {'Content-Type': 'application/json'} + r = self.session.post(url, headers=headers, data=data) + v = self.validate(r) + return v['data'] + + def messages(self): + url = urljoin(URL, 'bmm-wspl/recupMsgRecu') + r = self.session.get(url, params={'nbMessagesParPage': 200, 'index': 0}) + v = self.validate(r) + return v['data'] + + def message(self, mid): + # required to set some cookies required by the next call + url = urljoin(URL, 'fr/connexion/mes-outils/messagerie') + r = self.session.get(url) + self.validate(r) + + url = urljoin(URL, 'bmm-wspl/recupMsg') + r = self.session.get(url, params={'identifiant': mid}) + v = self.validate(r) + return v['data'] + + +def main(conffile): + conf = loadconf(conffile) + + db = sqlite3.connect(conf['DATABASE']) db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''') db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''') - mailer = Mailer(config) - encrypt = GPG(config['GNUPGHOME']).encrypt + sendmail = Mailer(host=conf['SMTPHOST'], + port=conf['SMTPPORT'], + starttls=conf['SMTPSTARTTLS'], + username=conf['SMTPUSER'], + password=conf['SMTPPASSWD']).send + + encrypt = GPG(conf['GNUPGHOME']).encrypt + + remote = BNPParibas() + remote.login(conf['USERNAME'], conf['PASSWORD']) + + ## transactions + recent = remote.recent() + data = recent['listerOperations']['compte'] + transactions = [ Transaction.fromjson(x) for x in data['operationPassee'] ] + balance = data['soldeDispo'] + + curs = db.cursor() + unseen = [] + for t in transactions: + curs.execute('''SELECT COUNT(*) FROM transactions WHERE id = ?''', (t.id, )) + if not curs.fetchone()[0]: + # not seen before + unseen.append(t) - ## unread messages - messages = filter(lambda x: not x.read, bnp.messages()) - for m in sorted(messages, key=lambda x: x.date): + lines = [] + lines.append(HEADER) + lines.append(SEP) + for t in unseen: + lines.append(str(t)) + lines.append(SEP) + lines.append(FOOTER.format(balance=balance)) + body = '\n'.join(lines) + + message = MIMEText(encrypt(body, conf['MAILFROM'], conf['MAILTO'])) + message['Subject'] = 'BNP Paribas Account update' + message['From'] = conf['MAILFROM'] + message['To'] = conf['MAILTO'] + message['Date'] = format_datetime(localtime()) + + sendmail(message) + + curs.executemany('''INSERT INTO transactions (id) VALUES (?)''', ((x.id, ) for x in unseen)) + db.commit() + + ## messages + data = remote.info() + info = data['abonnement'] + nnew = info['nombreMessageBMMNonLus'] + info['nombreMessageBilatNonLus'] + + data = remote.messages() + for m in data['messages']: + curs = db.cursor() - curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, )) + curs.execute('''SELECT COUNT(*) FROM messages WHERE id = ?), 0)''', (m['id'], )) if curs.fetchone()[0]: # already handled continue - # retrieve complete sender and message body - sender, body = bnp.message(m.id) - - # compose and send message - body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body) - message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') - message['Subject'] = 'BNP Paribas message' - message['From'] = config['MAILFROM'] - message['To'] = config['MAILTO'] - message['Date'] = format_datetime(localtime(m.date)) - mailer.send(message) - - curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, )) - db.commit() + body = Message.fromjson(remote.message(m['id'])) - - ## transactions - transactions = bnp.recent(config['CONTRACT']) - curs = db.cursor() - lines = [] - for t in transactions: - curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, )) - if curs.fetchone()[0]: - # already handled - continue - lines.append(str(t)) - curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, )) - - if lines: - lines.insert(0, HEADER) - lines.insert(1, '-' * len(HEADER)) - body = '\n'.join(lines) - message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') - message['Subject'] = 'BNP Paribas update' - message['From'] = config['MAILFROM'] - message['To'] = config['MAILTO'] + message = MIMEText(encrypt(str(body), conf['MAILFROM'], conf['MAILTO'])) + message['Subject'] = 'BNP Paribas Message' + message['From'] = conf['MAILFROM'] + message['To'] = conf['MAILTO'] message['Date'] = format_datetime(localtime()) - mailer.send(message) + + sendmail(message) - db.commit() + curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m['id'], )) + db.commit() if __name__ == '__main__': - main() + import sys + main(sys.argv[1])