Mercurial > hg > bnpparibas
diff bnpparibas.py @ 6:13a8bc43bc09
Simplify package structure
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Mon, 11 Jan 2016 18:57:25 +0100 |
parents | src/bnpparibas.py@a47012c9db15 |
children | 90f4e0bd0c2d |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bnpparibas.py Mon Jan 11 18:57:25 2016 +0100 @@ -0,0 +1,481 @@ +import email +import imp +import os.path +import re +import smtplib +import sqlite3 +import subprocess +import textwrap + +from collections import namedtuple, defaultdict +from contextlib import contextmanager +from datetime import datetime +from decimal import Decimal +from email.mime.text import MIMEText +from email.utils import format_datetime, localtime, parseaddr +from io import BytesIO +from itertools import product, islice +from urllib.parse import urljoin + +import bs4 +import click +import requests + +from PIL import Image + + +# message template +MESSAGE = """\ +From: {sender:} +Subject: {subject:} +Date: {date:} +Message-Id: {id:} + +{body:} +""" + +# transaction template +HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount') +TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}' + +# as defined in bnpbaribas web app +CATEGORIES = { + '1': 'Alimentation', + '7': 'Logement', + '8': 'Loisirs', + '9': 'Transport', + '12': 'Opérations bancaires', + '13': 'Non défini', + '14': 'Multimédia', + '20': 'Energies', + '22': 'Retrait', + '23': 'Sorties', + 'R58': 'Non défini', +} + +# euro symbol +EURO = b'\xe2\x82\xac'.decode('utf-8') + + +# load configuration +def loadconfig(filename): + module = imp.new_module('config') + module.__file__ = filename + try: + with open(filename) as fd: + exec(compile(fd.read(), filename, 'exec'), module.__dict__) + except IOError as e: + e.strerror = 'Unable to load configuration file (%s)' % e.strerror + raise + config = {} + for key in dir(module): + if key.isupper(): + config[key] = getattr(module, key) + return config + + +# GPG encrypted text is ascii and as such does not require encoding +# but its decrypted form is utf-8 and therefore the charset header +# must be set accordingly. define an appropriate charset object +email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST, + body_enc=None, output_charset='utf-8') + + +Message = namedtuple('Message', 'id read icon sender subject date validity'.split()) + + +class Transaction: + def __init__(self, tid, date, descr, debit, credit, category): + self.id = tid + self.date = date + self.descr = descr + self.debit = debit + self.credit = credit + self.category = category + + def __str__(self): + # there does not seem to be an easy way to format Decimal + # objects with a leading sign in both the positive and + # negative value cases so do it manually + d = vars(self) + if d['debit']: + d['amount'] = '-' + str(d['debit']) + if d['credit']: + d['amount'] = '+' + str(d['credit']) + return TRANSACTION.format(**d) + + +def imslice(image): + for y, x in product(range(0, 5), range(0, 5)): + yield image.crop((27 * x + 1, 27 * y + 1, 27 * (x + 1), 27 * (y + 1))) + + +def imdecode(image): + # load reference keypad + keypad = Image.open(os.path.join(os.path.dirname(__file__), 'keypad.png')).convert('L') + keypad = [ keypad.crop((26 * i, 0, 26 * (i + 1), 26)) for i in range(10) ] + immap = {} + for n, tile in enumerate(imslice(image)): + # skip tiles with background only + if tile.getextrema()[0] > 0: + continue + # compare to reference tiles + for d in range(0, 10): + if tile == keypad[d]: + immap[d] = n + 1 + break + if sorted(immap.keys()) != list(range(10)): + raise ValueError('keypad decode failed') + return immap + + +def amountparse(value): + # empty + if value == '\xa0': + return None + m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S) + if m is None: + raise ValueError(repr(value)) + # euro + currency = m.group(2) + if currency != EURO: + raise ValueError(repr(currency)) + return Decimal(m.group(1).replace('.', '').replace(',', '.')) + + +class Site: + def __init__(self): + self.url = 'https://www.secure.bnpparibas.net' + self.req = requests.Session() + + def login(self, user, passwd): + # login page + url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion') + r = self.req.get(url, params={'type': 'homeconnex'}) + r.raise_for_status() + # login form + soup = bs4.BeautifulSoup(r.text) + form = soup.find('form', attrs={'name': 'logincanalnet'}) + # extract relevant data + action = form['action'] + data = { field['name']: field['value'] for field in form('input') } + + # keyboard image url + tag = soup.find(attrs={'id': 'secret-nbr-keyboard'}) + for prop in tag['style'].split(';'): + match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop) + if match: + src = match.group(1) + break + # download keyboard image + r = self.req.get(urljoin(self.url, src)) + image = Image.open(BytesIO(r.content)).convert('L') + # decode digits position + passwdmap = imdecode(image) + + # encode password + passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd)) + + # username and password + data['ch1'] = user + data['ch5'] = passwdenc + + # post + r = self.req.post(urljoin(self.url, action), data=data) + r.raise_for_status() + # redirection + m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text) + dest = m.group(1) + r = self.req.get(dest) + r.raise_for_status() + + # check for errors + soup = bs4.BeautifulSoup(r.text) + err = soup.find(attrs={'class': 'TitreErreur'}) + if err: + raise ValueError(err.text) + + + def recent(self, contract): + data = { + 'BeginDate': '', + 'Categs': '', + 'Contracts': '', + 'EndDate': '', + 'OpTypes': '', + 'cboFlowName': 'flow/iastatement', + 'contractId': contract, + 'contractIds': '', + 'entryDashboard': '', + 'execution': 'e6s1', + 'externalIAId': 'IAStatements', + 'g1Style': 'expand', + 'g1Type': '', + 'g2Style': 'collapse', + 'g2Type': '', + 'g3Style': 'collapse', + 'g3Type': '', + 'g4Style': 'collapse', + 'g4Type': '', + 'groupId': '-2', + 'groupSelected': '-2', + 'gt': 'homepage:basic-theme', + 'pageId': 'releveoperations', + 'pastOrPendingOperations': '1', + 'sendEUD': 'true', + 'step': 'STAMENTS', } + + url = urljoin(self.url, '/banque/portail/particulier/FicheA') + r = self.req.post(url, data=data) + r.raise_for_status() + text = r.text + + # the html is so broken beautifulsoup does not understand it + text = text.replace( + '<th class="thTitre" style="width:7%">Pointage </td>', + '<th class="thTitre" style="width:7%">Pointage </th>') + s = bs4.BeautifulSoup(text) + + # extract transactions + table = s.find('table', id='tableCompte') + rows = table.find_all('tr') + for row in rows: + fields = row.find_all('td') + if not fields: + # skip headers row + continue + id = int(fields[0].input['id'].lstrip('_')) + date = datetime.strptime(fields[1].text, '%d/%m/%Y') + descr = fields[2].text.strip() + debit = amountparse(fields[3].text) + credit = amountparse(fields[4].text) + category = fields[5].text.strip() + categoryid = fields[6].span['class'][2][4:] + yield Transaction(id, date, descr, debit, credit, categoryid) + + + def messages(self): + data = { + 'identifiant': 'BmmFicheListerMessagesRecus_20100607022434', + 'type': 'fiche', } + + url = urljoin(self.url, '/banque/portail/particulier/Fiche') + r = self.req.post(url, data=data) + r.raise_for_status() + s = bs4.BeautifulSoup(r.text) + + # messages list + table = s.find('table', id='listeMessages') + for row in table.find_all('tr', recursive=False): + # skip headers and separators + if 'entete' in row['class']: + continue + # skip separators + if 'sep' in row['class']: + continue + # skip footer + if 'actions_bas' in row['class']: + continue + fields = row.find_all('td') + icon = fields[1].img['src'] + sender = fields[2].text.strip() + subject = fields[4].a.text.strip() + date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') + validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') + m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick']) + mid = m.group(1) + read = m.group(2) == 'false' + yield Message(mid, read, icon, sender, subject, date, validity) + + + def message(self, mid): + data = { + 'etape': 'boiteReception', + 'idMessage': mid, + 'identifiant': 'BmmFicheLireMessage_20100607022346', + 'maxPagination': 2, + 'minPagination': 1, + 'nbElementParPage': 20, + 'nbEltPagination': 5, + 'nbPages': 2, + 'newMsg': 'false', + 'pagination': 1, + 'type': 'fiche', + 'typeAction': '', } + + url = urljoin(self.url, '/banque/portail/particulier/Fiche') + r = self.req.post(url, data=data) + r.raise_for_status() + # fix badly broken html + text = r.text.replace('<br>', '<br/>').replace('</br>', '') + s = bs4.BeautifulSoup(text) + + envelope = s.find('div', attrs={'class': 'enveloppe'}) + rows = envelope.find_all('tr') + fields = rows[1].find_all('td') + # the messages list present a truncated sender + sender = fields[0].text.strip() + # not used + subject = fields[1].text.strip() + date = fields[2].text.strip() + + content = s.find('div', attrs={'class': 'txtMessage'}) + # clean up text + for t in content.find_all('style'): + t.extract() + for t in content.find_all('script'): + t.extract() + for t in content.find_all(id='info_pro'): + t.extract() + for t in content.find_all('br'): + t.replace_with('\n\n') + for t in content.find_all('b'): + if t.string: + t.replace_with('*%s*' % t.string.strip()) + for t in content.find_all('li'): + t.replace_with('- %s\n\n' % t.text.strip()) + # format nicely + text = re.sub(' +', ' ', content.text) + text = re.sub(r'\s+([\.:])', r'\1', text) + pars = [] + for p in re.split('\n\n+', text): + p = p.strip() + if p: + pars.append('\n'.join(textwrap.wrap(p, 72))) + body = '\n\n'.join(pars) + return sender, body + + + def transactions(self): + data = {'ch_memo': 'NON', + 'ch_rop_cpt_0': 'FR7630004001640000242975804', + 'ch_rop_dat': 'tous', + 'ch_rop_dat_deb': '', + 'ch_rop_dat_fin': '', + 'ch_rop_fmt_dat': 'JJMMAAAA', + 'ch_rop_fmt_fic': 'RTEXC', + 'ch_rop_fmt_sep': 'PT', + 'ch_rop_mon': 'EUR', + 'x': '55', + 'y': '7'} + r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data) + r.raise_for_status() + s = bs4.BeautifulSoup(r.text) + path = s.find('a')['href'] + r = self.req.get(urljoin(self.url, path)) + r.raise_for_status() + return r.text + + +class Mailer: + def __init__(self, config): + self.server = config.get('SMTPSERVER', 'localhost') + self.port = config.get('SMTPPORT', 25) + self.starttls = config.get('SMTPSTARTTLS', False) + self.username = config.get('SMTPUSER', '') + self.password = config.get('SMTPPASSWD', '') + + @contextmanager + def connect(self): + smtp = smtplib.SMTP(self.server, self.port) + if self.starttls: + smtp.starttls() + if self.username: + smtp.login(self.username, self.password) + yield smtp + smtp.quit() + + def send(self, message, fromaddr=None, toaddr=None): + if not fromaddr: + fromaddr = message['From'] + if not toaddr: + toaddr = message['To'] + with self.connect() as conn: + conn.sendmail(fromaddr, toaddr, str(message)) + + +class GPG: + def __init__(self, homedir): + self.homedir = homedir + + def encrypt(self, message, sender, recipient): + sender = parseaddr(sender)[1] + recipient = parseaddr(recipient)[1] + cmd = [ "gpg", "--homedir", self.homedir, "--batch", "--yes", "--no-options", "--armor", + "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"] + p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + encdata, err = p.communicate(input=message.encode('utf-8')) + if p.returncode: + raise RuntimeError(p.returncode, err) + return encdata.decode('ascii') + + +@click.command() +@click.argument('filename') +def main(filename): + # load configuration + config = loadconfig(filename) + + bnp = Site() + bnp.login(config['USERNAME'], config['PASSWORD']) + + db = sqlite3.connect(config['DATABASE']) + db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''') + db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''') + + mailer = Mailer(config) + encrypt = GPG(config['GNUPGHOME']).encrypt + + ## unread messages + messages = filter(lambda x: not x.read, bnp.messages()) + for m in sorted(messages, key=lambda x: x.date): + curs = db.cursor() + curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, )) + if curs.fetchone()[0]: + # already handled + continue + + # retrieve complete sender and message body + sender, body = bnp.message(m.id) + + # compose and send message + body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body) + message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') + message['Subject'] = 'BNP Paribas message' + message['From'] = config['MAILFROM'] + message['To'] = config['MAILTO'] + message['Date'] = format_datetime(localtime(m.date)) + mailer.send(message) + + curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, )) + db.commit() + + + ## transactions + transactions = bnp.recent(config['CONTRACT']) + curs = db.cursor() + lines = [] + for t in transactions: + curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, )) + if curs.fetchone()[0]: + # already handled + continue + lines.append(str(t)) + curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, )) + + if lines: + lines.insert(0, HEADER) + lines.insert(1, '-' * len(HEADER)) + body = '\n'.join(lines) + message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') + message['Subject'] = 'BNP Paribas update' + message['From'] = config['MAILFROM'] + message['To'] = config['MAILTO'] + message['Date'] = format_datetime(localtime()) + mailer.send(message) + + db.commit() + + +if __name__ == '__main__': + main()