Mercurial > hg > bnpparibas
view src/bnpparibas.py @ 3:1311f6533978
Load configuration from file specified on the command line
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Wed, 25 Feb 2015 01:44:50 +0100 |
parents | ad577744dd8e |
children | b4c2db70bbf2 |
line wrap: on
line source
import email import imp import os.path import re import smtplib import sqlite3 import subprocess import textwrap from collections import namedtuple, defaultdict from contextlib import contextmanager from datetime import datetime from decimal import Decimal from email.mime.text import MIMEText from email.utils import format_datetime, localtime, parseaddr from io import BytesIO from itertools import product, islice from urllib.parse import urljoin import bs4 import click import requests from PIL import Image # message template MESSAGE = """\ From: {sender:} Subject: {subject:} Date: {date:} Message-Id: {id:} {body:} """ # transaction template HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount') TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}' # as defined in bnpbaribas web app CATEGORIES = { '1': 'Alimentation', '7': 'Logement', '8': 'Loisirs', '9': 'Transport', '12': 'Opérations bancaires', '13': 'Non défini', '14': 'Multimédia', '20': 'Energies', '22': 'Retrait', '23': 'Sorties', 'R58': 'Non défini', } # euro symbol EURO = b'\xe2\x82\xac'.decode('utf-8') # load configuration def loadconfig(filename): module = imp.new_module('config') module.__file__ = filename try: with open(filename) as fd: exec(compile(fd.read(), filename, 'exec'), module.__dict__) except IOError as e: e.strerror = 'Unable to load configuration file (%s)' % e.strerror raise config = {} for key in dir(module): if key.isupper(): config[key] = getattr(module, key) return config # GPG encrypted text is ascii and as such does not require encoding # but its decrypted form is utf-8 and therefore the charset header # must be set accordingly. define an appropriate charset object email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST, body_enc=None, output_charset='utf-8') Message = namedtuple('Message', 'id read icon sender subject date validity'.split()) class Transaction: def __init__(self, tid, date, descr, debit, credit, category): self.id = tid self.date = date self.descr = descr self.debit = debit self.credit = credit self.category = category def __str__(self): # there does not seem to be an easy way to format Decimal # objects with a leading sign in both the positive and # negative value cases so do it manually d = vars(self) if d['debit']: d['amount'] = '-' + str(d['debit']) if d['credit']: d['amount'] = '+' + str(d['credit']) return TRANSACTION.format(**d) def imslice(image): for y, x in product(range(0, 5), range(0, 5)): yield image.crop((27 * x + 1, 27 * y + 1, 27 * (x + 1), 27 * (y + 1))) def imdecode(image): # load reference keypad keypad = Image.open(os.path.join(os.path.dirname(__file__), 'keypad.png')).convert('L') keypad = [ keypad.crop((26 * i, 0, 26 * (i + 1), 26)) for i in range(10) ] immap = {} for n, tile in enumerate(imslice(image)): # skip tiles with background only if tile.getextrema()[0] > 0: continue # compare to reference tiles for d in range(0, 10): if tile == keypad[d]: immap[d] = n + 1 break if sorted(immap.keys()) != list(range(10)): raise ValueError('keypad decode failed') return immap def amountparse(value): # empty if value == '\xa0': return None m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S) if m is None: raise ValueError(repr(value)) # euro currency = m.group(2) if currency != EURO: raise ValueError(repr(currency)) return Decimal(m.group(1).replace('.', '').replace(',', '.')) class Site: def __init__(self): self.url = 'https://www.secure.bnpparibas.net' self.req = requests.Session() def login(self, user, passwd): # login page url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion') r = self.req.get(url, params={'type': 'homeconnex'}) r.raise_for_status() # login form soup = bs4.BeautifulSoup(r.text) form = soup.find('form', attrs={'name': 'logincanalnet'}) # extract relevant data action = form['action'] data = { field['name']: field['value'] for field in form('input') } # keyboard image url tag = soup.find(attrs={'id': 'secret-nbr-keyboard'}) for prop in tag['style'].split(';'): match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop) if match: src = match.group(1) break # download keyboard image r = self.req.get(urljoin(self.url, src)) image = Image.open(BytesIO(r.content)).convert('L') # decode digits position passwdmap = imdecode(image) # encode password passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd)) # username and password data['ch1'] = user data['ch5'] = passwdenc # post r = self.req.post(urljoin(self.url, action), data=data) r.raise_for_status() # redirection m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text) dest = m.group(1) r = self.req.get(dest) r.raise_for_status() # check for errors soup = bs4.BeautifulSoup(r.text) err = soup.find(attrs={'class': 'TitreErreur'}) if err: raise ValueError(err.text) def recent(self, contract): data = { 'BeginDate': '', 'Categs': '', 'Contracts': '', 'EndDate': '', 'OpTypes': '', 'cboFlowName': 'flow/iastatement', 'contractId': contract, 'contractIds': '', 'entryDashboard': '', 'execution': 'e6s1', 'externalIAId': 'IAStatements', 'g1Style': 'expand', 'g1Type': '', 'g2Style': 'collapse', 'g2Type': '', 'g3Style': 'collapse', 'g3Type': '', 'g4Style': 'collapse', 'g4Type': '', 'groupId': '-2', 'groupSelected': '-2', 'gt': 'homepage:basic-theme', 'pageId': 'releveoperations', 'pastOrPendingOperations': '1', 'sendEUD': 'true', 'step': 'STAMENTS', } url = urljoin(self.url, '/banque/portail/particulier/FicheA') r = self.req.post(url, data=data) r.raise_for_status() text = r.text # the html is so broken beautifulsoup does not understand it text = text.replace( '<th class="thTitre" style="width:7%">Pointage </td>', '<th class="thTitre" style="width:7%">Pointage </th>') s = bs4.BeautifulSoup(text) # extract transactions table = s.find('table', id='tableCompte') rows = table.find_all('tr') for row in rows: fields = row.find_all('td') if not fields: # skip headers row continue id = int(fields[0].input['id'].lstrip('_')) date = datetime.strptime(fields[1].text, '%d/%m/%Y') descr = fields[2].text.strip() debit = amountparse(fields[3].text) credit = amountparse(fields[4].text) category = fields[5].text.strip() categoryid = fields[6].span['class'][2][4:] yield Transaction(id, date, descr, debit, credit, categoryid) def messages(self): data = { 'identifiant': 'BmmFicheListerMessagesRecus_20100607022434', 'type': 'fiche', } url = urljoin(self.url, '/banque/portail/particulier/Fiche') r = self.req.post(url, data=data) r.raise_for_status() s = bs4.BeautifulSoup(r.text) # messages list table = s.find('table', id='listeMessages') for row in table.find_all('tr', recursive=False): # skip headers and separators if 'entete' in row['class']: continue # skip separators if 'sep' in row['class']: continue # skip footer if 'actions_bas' in row['class']: continue fields = row.find_all('td') icon = fields[1].img['src'] sender = fields[2].text.strip() subject = fields[4].a.text.strip() date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick']) mid = m.group(1) read = m.group(2) == 'false' yield Message(mid, read, icon, sender, subject, date, validity) def message(self, mid): data = { 'etape': 'boiteReception', 'idMessage': mid, 'identifiant': 'BmmFicheLireMessage_20100607022346', 'maxPagination': 2, 'minPagination': 1, 'nbElementParPage': 20, 'nbEltPagination': 5, 'nbPages': 2, 'newMsg': 'false', 'pagination': 1, 'type': 'fiche', 'typeAction': '', } url = urljoin(self.url, '/banque/portail/particulier/Fiche') r = self.req.post(url, data=data) r.raise_for_status() # fix badly broken html text = r.text.replace('<br>', '<br/>').replace('</br>', '') s = bs4.BeautifulSoup(text) envelope = s.find('div', attrs={'class': 'enveloppe'}) rows = envelope.find_all('tr') fields = rows[1].find_all('td') # the messages list present a truncated sender sender = fields[0].text.strip() # not used subject = fields[1].text.strip() date = fields[2].text.strip() content = s.find('div', attrs={'class': 'txtMessage'}) # clean up text for t in content.find_all('style'): t.extract() for t in content.find_all('script'): t.extract() for t in content.find_all(id='info_pro'): t.extract() for t in content.find_all('br'): t.replace_with('\n\n') for t in content.find_all('b'): if t.string: t.replace_with('*%s*' % t.string.strip()) for t in content.find_all('li'): t.replace_with('- %s\n\n' % t.text.strip()) # format nicely text = re.sub(' +', ' ', content.text) text = re.sub(r'\s+([\.:])', r'\1', text) pars = [] for p in re.split('\n\n+', text): p = p.strip() if p: pars.append('\n'.join(textwrap.wrap(p, 72))) body = '\n\n'.join(pars) return sender, body def transactions(self): data = {'ch_memo': 'NON', 'ch_rop_cpt_0': 'FR7630004001640000242975804', 'ch_rop_dat': 'tous', 'ch_rop_dat_deb': '', 'ch_rop_dat_fin': '', 'ch_rop_fmt_dat': 'JJMMAAAA', 'ch_rop_fmt_fic': 'RTEXC', 'ch_rop_fmt_sep': 'PT', 'ch_rop_mon': 'EUR', 'x': '55', 'y': '7'} r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data) r.raise_for_status() s = bs4.BeautifulSoup(r.text) path = s.find('a')['href'] r = self.req.get(urljoin(self.url, path)) r.raise_for_status() return r.text class Mailer: def __init__(self, config): self.server = config.get('SMTPSERVER', 'localhost') self.port = config.get('SMTPPORT', 25) self.starttls = config.get('SMTPSTARTTLS', False) self.username = config.get('SMTPUSER', '') self.password = config.get('SMTPPASSWD', '') @contextmanager def connect(self): smtp = smtplib.SMTP(self.server, self.port) if self.starttls: smtp.starttls() if self.username: smtp.login(self.username, self.password) yield smtp smtp.quit() def send(self, message, fromaddr=None, toaddr=None): if not fromaddr: fromaddr = message['From'] if not toaddr: toaddr = message['To'] with self.connect() as conn: conn.sendmail(fromaddr, toaddr, str(message)) class GPG: def __init__(self, homedir): self.homedir = homedir def encrypt(self, message, sender, recipient): sender = parseaddr(sender)[1] recipient = parseaddr(recipient)[1] cmd = [ "gpg", "--homedir", self.homedir, "--batch", "--yes", "--no-options", "--armor", "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"] p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) encdata = p.communicate(input=message.encode('utf-8'))[0].decode('ascii') return encdata @click.command() @click.argument('filename') def main(filename): # load configuration config = loadconfig(filename) bnp = Site() bnp.login(config['USERNAME'], config['PASSWORD']) db = sqlite3.connect(config['DATABASE']) db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''') db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''') mailer = Mailer(config) encrypt = GPG(config['GNUPGHOME']).encrypt ## unread messages messages = filter(lambda x: not x.read, bnp.messages()) for m in sorted(messages, key=lambda x: x.date): curs = db.cursor() curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, )) if curs.fetchone()[0]: # already handled continue # retrieve complete sender and message body sender, body = bnp.message(m.id) # compose and send message body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body) message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') message['Subject'] = 'BNP Paribas message' message['From'] = config['MAILFROM'] message['To'] = config['MAILTO'] message['Date'] = format_datetime(localtime(m.date)) mailer.send(message) curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, )) db.commit() ## transactions transactions = bnp.recent(config['CONTRACT']) curs = db.cursor() lines = [] for t in transactions: curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, )) if curs.fetchone()[0]: # already handled continue lines.append(str(t)) curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, )) if lines: lines.insert(0, HEADER) lines.insert(1, '-' * len(HEADER)) body = '\n'.join(lines) message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') message['Subject'] = 'BNP Paribas update' message['From'] = config['MAILFROM'] message['To'] = config['MAILTO'] message['Date'] = format_datetime(localtime()) mailer.send(message) db.commit() if __name__ == '__main__': main()