Mercurial > hg > bnpparibas
diff src/bnpparibas.py @ 0:02ec4a9ab0f0
Import
author | Daniele Nicolodi <daniele.nicolodi@obspm.fr> |
---|---|
date | Tue, 24 Feb 2015 15:50:21 +0100 (2015-02-24) |
parents | |
children | ad577744dd8e |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/bnpparibas.py Tue Feb 24 15:50:21 2015 +0100 @@ -0,0 +1,455 @@ +import email +import os.path +import re +import smtplib +import sqlite3 +import subprocess +import textwrap + +from collections import namedtuple, defaultdict +from contextlib import contextmanager +from datetime import datetime +from decimal import Decimal +from email.mime.text import MIMEText +from email.utils import format_datetime, localtime, parseaddr +from io import BytesIO +from itertools import product, islice +from urllib.parse import urljoin + +import bs4 +import numpy as np +import requests + +from PIL import Image + +DB = 'bnpparibas.sqlite' + +# message template +MESSAGE = """\ +From: {sender:} +Subject: {subject:} +Date: {date:} +Message-Id: {id:} + +{body:} +""" + +# transaction template +HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount') +TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}' + +# as defined in bnpbaribas web app +CATEGORIES = { + '1': 'Alimentation', + '7': 'Logement', + '8': 'Loisirs', + '9': 'Transport', + '12': 'Opérations bancaires', + '13': 'Non défini', + '14': 'Multimédia', + '20': 'Energies', + '22': 'Retrait', + '23': 'Sorties', + 'R58': 'Non défini', +} + +# euro symbol +EURO = b'\xe2\x82\xac'.decode('utf-8') + + +# load configuration +from config import * + + +# GPG encrypted text is ascii and as such does not require encoding +# but its decrypted form is utf-8 and therefore the charset header +# must be set accordingly. define an appropriate charset object +email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST, + body_enc=None, output_charset='utf-8') + + +Message = namedtuple('Message', 'id read icon sender subject date validity'.split()) + + +class Transaction: + def __init__(self, tid, date, descr, debit, credit, category): + self.id = tid + self.date = date + self.descr = descr + self.debit = debit + self.credit = credit + self.category = category + + def __str__(self): + # there does not seem to be an easy way to format Decimal + # objects with a leading sign in both the positive and + # negative value cases so do it manually + d = vars(self) + if d['debit']: + d['amount'] = '-' + str(d['debit']) + if d['credit']: + d['amount'] = '+' + str(d['credit']) + return TRANSACTION.format(**d) + + +def imslice(image): + for x, y in product(range(0, 5), range(0, 5)): + islice = image[27*x+1:27*(x+1), 27*y+1:27*(y+1), 0] + yield islice + + +def imdecode(image): + keypad = np.load(os.path.join(os.path.dirname(__file__), 'keypad.npy')) + immap = {} + for n, islice in enumerate(imslice(image)): + # skip empty tiles + if np.mean(islice) > 248.0: + continue + # compare to reference tiles + for d in range(0, 10): + delta = np.sum(islice - keypad[d]) + if delta < 100: + print(delta) + immap[d] = n + 1 + return immap + + +def amountparse(value): + # empty + if value == '\xa0': + return None + m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S) + if m is None: + raise ValueError(repr(value)) + # euro + currency = m.group(2) + if currency != EURO: + raise ValueError(repr(currency)) + return Decimal(m.group(1).replace('.', '').replace(',', '.')) + + +class Site: + def __init__(self): + self.url = 'https://www.secure.bnpparibas.net' + self.req = requests.Session() + + def login(self, user, passwd): + # login page + url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion') + r = self.req.get(url, params={'type': 'homeconnex'}) + r.raise_for_status() + # login form + soup = bs4.BeautifulSoup(r.text) + form = soup.find('form', attrs={'name': 'logincanalnet'}) + # extract relevant data + action = form['action'] + data = { field['name']: field['value'] for field in form('input') } + + # keyboard image url + src = '' + tag = soup.find(attrs={'id': 'secret-nbr-keyboard'}) + for prop in tag['style'].split(';'): + match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop) + if match: + src = match.group(1) + break + # download keyboard image + r = self.req.get(urljoin(self.url, src)) + image = np.array(Image.open(BytesIO(r.content)).convert('RGB')) + # decode digits position + passwdmap = imdecode(image) + + # encode password + passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd)) + + # username and password + data['ch1'] = user + data['ch5'] = passwdenc + + # post + r = self.req.post(urljoin(self.url, action), data=data) + r.raise_for_status() + # redirection + m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text) + dest = m.group(1) + r = self.req.get(dest) + r.raise_for_status() + + # check for errors + soup = bs4.BeautifulSoup(r.text) + err = soup.find(attrs={'class': 'TitreErreur'}) + if err: + raise ValueError(err.text) + + + def recent(self): + data = { + 'BeginDate': '', + 'Categs': '', + 'Contracts': '', + 'EndDate': '', + 'OpTypes': '', + 'cboFlowName': 'flow/iastatement', + 'contractId': CONTRACT, + 'contractIds': '', + 'entryDashboard': '', + 'execution': 'e6s1', + 'externalIAId': 'IAStatements', + 'g1Style': 'expand', + 'g1Type': '', + 'g2Style': 'collapse', + 'g2Type': '', + 'g3Style': 'collapse', + 'g3Type': '', + 'g4Style': 'collapse', + 'g4Type': '', + 'groupId': '-2', + 'groupSelected': '-2', + 'gt': 'homepage:basic-theme', + 'pageId': 'releveoperations', + 'pastOrPendingOperations': '1', + 'sendEUD': 'true', + 'step': 'STAMENTS', } + + url = urljoin(self.url, '/banque/portail/particulier/FicheA') + r = self.req.post(url, data=data) + r.raise_for_status() + text = r.text + + # the html is so broken beautifulsoup does not understand it + text = text.replace( + '<th class="thTitre" style="width:7%">Pointage </td>', + '<th class="thTitre" style="width:7%">Pointage </th>') + s = bs4.BeautifulSoup(text) + + # extract transactions + table = s.find('table', id='tableCompte') + rows = table.find_all('tr') + for row in rows: + fields = row.find_all('td') + if not fields: + # skip headers row + continue + id = int(fields[0].input['id'].lstrip('_')) + date = datetime.strptime(fields[1].text, '%d/%m/%Y') + descr = fields[2].text.strip() + debit = amountparse(fields[3].text) + credit = amountparse(fields[4].text) + category = fields[5].text.strip() + categoryid = fields[6].span['class'][2][4:] + yield Transaction(id, date, descr, debit, credit, categoryid) + + + def messages(self): + data = { + 'identifiant': 'BmmFicheListerMessagesRecus_20100607022434', + 'type': 'fiche', } + + url = urljoin(self.url, '/banque/portail/particulier/Fiche') + r = self.req.post(url, data=data) + r.raise_for_status() + s = bs4.BeautifulSoup(r.text) + + # messages list + table = s.find('table', id='listeMessages') + for row in table.find_all('tr', recursive=False): + # skip headers and separators + if 'entete' in row['class']: + continue + # skip separators + if 'sep' in row['class']: + continue + # skip footer + if 'actions_bas' in row['class']: + continue + fields = row.find_all('td') + icon = fields[1].img['src'] + sender = fields[2].text.strip() + subject = fields[4].a.text.strip() + date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') + validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') + m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick']) + mid = m.group(1) + read = m.group(2) == 'false' + yield Message(mid, read, icon, sender, subject, date, validity) + + + def message(self, mid): + data = { + 'etape': 'boiteReception', + 'idMessage': mid, + 'identifiant': 'BmmFicheLireMessage_20100607022346', + 'maxPagination': 2, + 'minPagination': 1, + 'nbElementParPage': 20, + 'nbEltPagination': 5, + 'nbPages': 2, + 'newMsg': 'false', + 'pagination': 1, + 'type': 'fiche', + 'typeAction': '', } + + url = urljoin(self.url, '/banque/portail/particulier/Fiche') + r = self.req.post(url, data=data) + r.raise_for_status() + # fix badly broken html + text = r.text.replace('<br>', '<br/>').replace('</br>', '') + s = bs4.BeautifulSoup(text) + + envelope = s.find('div', attrs={'class': 'enveloppe'}) + rows = envelope.find_all('tr') + fields = rows[1].find_all('td') + # the messages list present a truncated sender + sender = fields[0].text.strip() + # not used + subject = fields[1].text.strip() + date = fields[2].text.strip() + + content = s.find('div', attrs={'class': 'txtMessage'}) + # clean up text + for t in content.find_all('style'): + t.extract() + for t in content.find_all('script'): + t.extract() + for t in content.find_all(id='info_pro'): + t.extract() + for t in content.find_all('br'): + t.replace_with('\n\n') + for t in content.find_all('b'): + if t.string: + t.replace_with('*%s*' % t.string.strip()) + for t in content.find_all('li'): + t.replace_with('- %s\n\n' % t.text.strip()) + # format nicely + text = re.sub(' +', ' ', content.text) + text = re.sub(r'\s+([\.:])', r'\1', text) + pars = [] + for p in re.split('\n\n+', text): + p = p.strip() + if p: + pars.append('\n'.join(textwrap.wrap(p, 72))) + body = '\n\n'.join(pars) + return sender, body + + + def transactions(self): + data = {'ch_memo': 'NON', + 'ch_rop_cpt_0': 'FR7630004001640000242975804', + 'ch_rop_dat': 'tous', + 'ch_rop_dat_deb': '', + 'ch_rop_dat_fin': '', + 'ch_rop_fmt_dat': 'JJMMAAAA', + 'ch_rop_fmt_fic': 'RTEXC', + 'ch_rop_fmt_sep': 'PT', + 'ch_rop_mon': 'EUR', + 'x': '55', + 'y': '7'} + r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data) + r.raise_for_status() + s = bs4.BeautifulSoup(r.text) + path = s.find('a')['href'] + r = self.req.get(urljoin(self.url, path)) + r.raise_for_status() + return r.text + + +class Mailer: + def __init__(self): + self.server = SMTPSERVER + self.port = SMTPPORT + self.starttls = SMTPSTARTTLS + self.username = SMTPUSER + self.password = SMTPPASSWD + + @contextmanager + def connect(self): + smtp = smtplib.SMTP(self.server, self.port) + if self.starttls: + smtp.starttls() + if self.username: + smtp.login(self.username, self.password) + yield smtp + smtp.quit() + + def send(self, message, fromaddr=None, toaddr=None): + if not fromaddr: + fromaddr = message['From'] + if not toaddr: + toaddr = message['To'] + with self.connect() as conn: + conn.sendmail(fromaddr, toaddr, str(message)) + + +def encrypt(message, sender, recipient): + sender = parseaddr(sender)[1] + recipient = parseaddr(recipient)[1] + cmd = [ "gpg", "--homedir", GNUPGHOME, "--batch", "--yes", "--no-options", "--armor", + "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"] + p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + encdata = p.communicate(input=message.encode('utf-8'))[0].decode('ascii') + return encdata + + +def main(): + bnp = Site() + bnp.login(USERNAME, PASSWORD) + + db = sqlite3.connect(DB) + db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''') + db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''') + + mailer = Mailer() + + ## unread messages + messages = filter(lambda x: not x.read, bnp.messages()) + for m in sorted(messages, key=lambda x: x.date): + curs = db.cursor() + curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, )) + if curs.fetchone()[0]: + # already handled + continue + + # retrieve complete sender and message body + sender, body = bnp.message(m.id) + + # compose and send message + body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body) + message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit') + message['Subject'] = 'BNP Paribas message' + message['From'] = MAILFROM + message['To'] = MAILTO + message['Date'] = format_datetime(localtime(m.date)) + mailer.send(message) + + curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, )) + db.commit() + + + ## transactions + transactions = bnp.recent() + curs = db.cursor() + lines = [] + for t in transactions: + curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, )) + if curs.fetchone()[0]: + # already handled + continue + lines.append(str(t)) + curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, )) + + if lines: + lines.insert(0, HEADER) + lines.insert(1, '-' * len(HEADER)) + body = '\n'.join(lines) + message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit') + message['Subject'] = 'BNP Paribas update' + message['From'] = MAILFROM + message['To'] = MAILTO + message['Date'] = format_datetime(localtime()) + mailer.send(message) + + db.commit() + + +if __name__ == '__main__': + main()