Mercurial > hg > bnpparibas
view src/bnpparibas.py @ 2:ad577744dd8e
Drop dependency on numpy
author | Daniele Nicolodi <daniele.nicolodi@obspm.fr> |
---|---|
date | Tue, 24 Feb 2015 17:23:39 +0100 |
parents | 02ec4a9ab0f0 |
children | 1311f6533978 |
line wrap: on
line source
import email
import email.charset
import os.path
import re
import smtplib
import sqlite3
import subprocess
import textwrap

from collections import namedtuple, defaultdict
from contextlib import closing, contextmanager
from datetime import datetime
from decimal import Decimal
from email.mime.text import MIMEText
from email.utils import format_datetime, localtime, parseaddr
from io import BytesIO
from itertools import product, islice
from urllib.parse import urljoin

import bs4
import requests
from PIL import Image

DB = 'bnpparibas.sqlite'

# message template
# NOTE(review): the blank line between the headers and the body is
# assumed -- confirm against the original layout.
MESSAGE = """\
From: {sender:}
Subject: {subject:}
Date: {date:}
Message-Id: {id:}

{body:}
"""

# transaction template
HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount')
TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}'

# as defined in bnpbaribas web app
CATEGORIES = {
    '1': 'Alimentation',
    '7': 'Logement',
    '8': 'Loisirs',
    '9': 'Transport',
    '12': 'Opérations bancaires',
    '13': 'Non défini',
    '14': 'Multimédia',
    '20': 'Energies',
    '22': 'Retrait',
    '23': 'Sorties',
    'R58': 'Non défini',
}

# euro symbol
EURO = b'\xe2\x82\xac'.decode('utf-8')

# load configuration. The star import is kept on purpose: it comes
# after the defaults above, so any name also defined in config
# replaces the value set here.
from config import *

# GPG encrypted text is ascii and as such does not require encoding
# but its decrypted form is utf-8 and therefore the charset header
# must be set accordingly. define an appropriate charset object
email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
                          body_enc=None, output_charset='utf-8')

Message = namedtuple('Message', 'id read icon sender subject date validity'.split())


class Transaction:
    """One account operation as scraped by Site.recent().

    Exactly the constructor arguments are stored as attributes:
    id, date, descr, debit, credit and category.
    """

    def __init__(self, tid, date, descr, debit, credit, category):
        self.id = tid
        self.date = date
        self.descr = descr
        self.debit = debit
        self.credit = credit
        self.category = category

    def __str__(self):
        """Return the transaction formatted with the TRANSACTION template."""
        # there does not seem to be an easy way to format Decimal
        # objects with a leading sign in both the positive and
        # negative value cases so do it manually
        amount = ''
        if self.debit:
            amount = '-' + str(self.debit)
        if self.credit:
            amount = '+' + str(self.credit)
        # format from a copy: vars(self) is the live instance dict and
        # writing 'amount' into it would add a stray attribute
        fields = dict(vars(self))
        fields['amount'] = amount
        return TRANSACTION.format(**fields)


def imslice(image):
    """Yield the 25 tiles of the 5x5 keypad image, row by row.

    Tiles are cut on a 27 pixel pitch, leaving out the first row and
    column of each cell, which gives 26x26 pixel crops.
    """
    for y, x in product(range(0, 5), range(0, 5)):
        yield image.crop((27 * x + 1, 27 * y + 1, 27 * (x + 1), 27 * (y + 1)))


def imdecode(image):
    """Locate the ten digits in the keypad image.

    Returns a dict mapping each digit 0-9 to the 1-based index of the
    tile (in imslice() order) that shows it.  Raises ValueError if any
    digit cannot be found.
    """
    # load reference keypad
    reference = Image.open(os.path.join(os.path.dirname(__file__), 'keypad.png')).convert('L')
    keypad = [reference.crop((26 * i, 0, 26 * (i + 1), 26)) for i in range(10)]
    immap = {}
    for n, tile in enumerate(imslice(image)):
        # skip tiles with background only
        if tile.getextrema()[0] > 0:
            continue
        # compare to reference tiles
        for d in range(0, 10):
            if tile == keypad[d]:
                immap[d] = n + 1
                break
    if sorted(immap) != list(range(10)):
        raise ValueError('keypad decode failed')
    return immap


def amountparse(value):
    """Parse an amount cell such as ' 1.234,56 <euro> ' into a Decimal.

    Returns None for an empty cell (a lone non-breaking space).  Raises
    ValueError if the text does not look like an amount or if the
    currency is not the euro.
    """
    # empty
    if value == '\xa0':
        return None
    # any number of dot separated thousands groups, then a decimal comma
    m = re.match(r'\s+((?:\d+\.)*\d+,\d+)\s+([^\s]+)\s+$', value, re.U | re.S)
    if m is None:
        raise ValueError(repr(value))
    # euro
    currency = m.group(2)
    if currency != EURO:
        raise ValueError(repr(currency))
    return Decimal(m.group(1).replace('.', '').replace(',', '.'))


class Site:
    """Scraper for the BNP Paribas web banking site.

    All requests go through one requests.Session so that the cookies
    obtained by login() are reused by the other methods.
    """

    def __init__(self):
        self.url = 'https://www.secure.bnpparibas.net'
        self.req = requests.Session()

    def login(self, user, passwd):
        """Authenticate the session.

        passwd must be a string of digits: each digit is replaced by
        the two-digit position of its key on the scrambled keypad
        image served with the login form.

        Raises ValueError if the login page does not have the expected
        structure or if the site reports an error, and whatever
        raise_for_status() raises on HTTP errors.
        """
        # login page
        url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion')
        r = self.req.get(url, params={'type': 'homeconnex'})
        r.raise_for_status()

        # login form
        soup = bs4.BeautifulSoup(r.text)
        form = soup.find('form', attrs={'name': 'logincanalnet'})
        if form is None:
            raise ValueError('login form not found')

        # extract relevant data
        action = form['action']
        data = {field['name']: field['value'] for field in form('input')}

        # keyboard image url
        src = ''
        tag = soup.find(attrs={'id': 'secret-nbr-keyboard'})
        if tag is not None:
            for prop in tag['style'].split(';'):
                match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop)
                if match:
                    src = match.group(1)
                    break
        if not src:
            raise ValueError('keypad image not found')

        # download keyboard image
        r = self.req.get(urljoin(self.url, src))
        r.raise_for_status()
        image = Image.open(BytesIO(r.content)).convert('L')

        # decode digits position
        passwdmap = imdecode(image)

        # encode password
        passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd))

        # username and password
        data['ch1'] = user
        data['ch5'] = passwdenc

        # post
        r = self.req.post(urljoin(self.url, action), data=data)
        r.raise_for_status()

        # redirection
        m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text)
        if m is None:
            raise ValueError('login redirection not found')
        r = self.req.get(m.group(1))
        r.raise_for_status()

        # check for errors
        soup = bs4.BeautifulSoup(r.text)
        err = soup.find(attrs={'class': 'TitreErreur'})
        if err:
            raise ValueError(err.text)

    def recent(self):
        """Yield a Transaction for each row of the recent operations table.

        The category attribute of the yielded objects is the category
        identifier taken from the CSS class of the row's last parsed
        cell, not the category label.
        """
        data = {
            'BeginDate': '',
            'Categs': '',
            'Contracts': '',
            'EndDate': '',
            'OpTypes': '',
            'cboFlowName': 'flow/iastatement',
            'contractId': CONTRACT,
            'contractIds': '',
            'entryDashboard': '',
            'execution': 'e6s1',
            'externalIAId': 'IAStatements',
            'g1Style': 'expand',
            'g1Type': '',
            'g2Style': 'collapse',
            'g2Type': '',
            'g3Style': 'collapse',
            'g3Type': '',
            'g4Style': 'collapse',
            'g4Type': '',
            'groupId': '-2',
            'groupSelected': '-2',
            'gt': 'homepage:basic-theme',
            'pageId': 'releveoperations',
            'pastOrPendingOperations': '1',
            'sendEUD': 'true',
            'step': 'STAMENTS',
        }
        url = urljoin(self.url, '/banque/portail/particulier/FicheA')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        text = r.text

        # the html is so broken beautifulsoup does not understand it
        text = text.replace(
            '<th class="thTitre" style="width:7%">Pointage </td>',
            '<th class="thTitre" style="width:7%">Pointage </th>')
        s = bs4.BeautifulSoup(text)

        # extract transactions
        table = s.find('table', id='tableCompte')
        for row in table.find_all('tr'):
            fields = row.find_all('td')
            if not fields:
                # skip headers row
                continue
            tid = int(fields[0].input['id'].lstrip('_'))
            date = datetime.strptime(fields[1].text, '%d/%m/%Y')
            descr = fields[2].text.strip()
            debit = amountparse(fields[3].text)
            credit = amountparse(fields[4].text)
            # third CSS class of the span, minus its 4 characters prefix
            categoryid = fields[6].span['class'][2][4:]
            yield Transaction(tid, date, descr, debit, credit, categoryid)

    def messages(self):
        """Yield a Message for each entry of the received messages list."""
        data = {
            'identifiant': 'BmmFicheListerMessagesRecus_20100607022434',
            'type': 'fiche',
        }
        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)

        # messages list
        table = s.find('table', id='listeMessages')
        for row in table.find_all('tr', recursive=False):
            # skip headers, separators and footer
            classes = row['class']
            if 'entete' in classes or 'sep' in classes or 'actions_bas' in classes:
                continue
            fields = row.find_all('td')
            icon = fields[1].img['src']
            sender = fields[2].text.strip()
            subject = fields[4].a.text.strip()
            date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''',
                         fields[4].a['onclick'])
            mid = m.group(1)
            read = m.group(2) == 'false'
            yield Message(mid, read, icon, sender, subject, date, validity)

    def message(self, mid):
        """Fetch message mid and return (sender, body).

        The body is the message content converted to plain text and
        re-wrapped to 72 columns, paragraphs separated by blank lines.
        """
        data = {
            'etape': 'boiteReception',
            'idMessage': mid,
            'identifiant': 'BmmFicheLireMessage_20100607022346',
            'maxPagination': 2,
            'minPagination': 1,
            'nbElementParPage': 20,
            'nbEltPagination': 5,
            'nbPages': 2,
            'newMsg': 'false',
            'pagination': 1,
            'type': 'fiche',
            'typeAction': '',
        }
        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()

        # fix badly broken html
        text = r.text.replace('<br>', '<br/>').replace('</br>', '')
        s = bs4.BeautifulSoup(text)
        envelope = s.find('div', attrs={'class': 'enveloppe'})
        rows = envelope.find_all('tr')
        fields = rows[1].find_all('td')

        # the messages list present a truncated sender. the subject
        # and date cells that follow it are not used
        sender = fields[0].text.strip()

        content = s.find('div', attrs={'class': 'txtMessage'})

        # clean up text
        for t in content.find_all('style'):
            t.extract()
        for t in content.find_all('script'):
            t.extract()
        for t in content.find_all(id='info_pro'):
            t.extract()
        for t in content.find_all('br'):
            t.replace_with('\n\n')
        for t in content.find_all('b'):
            if t.string:
                t.replace_with('*%s*' % t.string.strip())
        for t in content.find_all('li'):
            t.replace_with('- %s\n\n' % t.text.strip())

        # format nicely
        text = re.sub(' +', ' ', content.text)
        text = re.sub(r'\s+([\.:])', r'\1', text)
        pars = []
        for p in re.split('\n\n+', text):
            p = p.strip()
            if p:
                pars.append('\n'.join(textwrap.wrap(p, 72)))
        body = '\n\n'.join(pars)
        return sender, body

    def transactions(self):
        """Request the operations export and return the file as text."""
        data = {'ch_memo': 'NON',
                'ch_rop_cpt_0': 'FR7630004001640000242975804',
                'ch_rop_dat': 'tous',
                'ch_rop_dat_deb': '',
                'ch_rop_dat_fin': '',
                'ch_rop_fmt_dat': 'JJMMAAAA',
                'ch_rop_fmt_fic': 'RTEXC',
                'ch_rop_fmt_sep': 'PT',
                'ch_rop_mon': 'EUR',
                'x': '55',
                'y': '7'}
        r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data)
        r.raise_for_status()
        # the confirmation page links to the generated file
        s = bs4.BeautifulSoup(r.text)
        path = s.find('a')['href']
        r = self.req.get(urljoin(self.url, path))
        r.raise_for_status()
        return r.text


class Mailer:
    """Send messages through the SMTP server defined in config."""

    def __init__(self):
        self.server = SMTPSERVER
        self.port = SMTPPORT
        self.starttls = SMTPSTARTTLS
        self.username = SMTPUSER
        self.password = SMTPPASSWD

    @contextmanager
    def connect(self):
        """Context manager yielding a connected smtplib.SMTP object.

        The connection is shut down when the with block exits, also
        when the block, STARTTLS or the login raise.
        """
        smtp = smtplib.SMTP(self.server, self.port)
        try:
            if self.starttls:
                smtp.starttls()
            if self.username:
                smtp.login(self.username, self.password)
            yield smtp
        finally:
            try:
                smtp.quit()
            except smtplib.SMTPServerDisconnected:
                # the server is already gone: just release the socket
                # and do not mask the exception that got us here
                smtp.close()

    def send(self, message, fromaddr=None, toaddr=None):
        """Send message, taking missing addresses from its headers."""
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))


def encrypt(message, sender, recipient):
    """Return message signed by sender and encrypted for recipient.

    sender and recipient may be full 'Name <address>' strings: only
    the address part is handed to gpg.  The result is ascii armored
    text.  Raises subprocess.CalledProcessError, with gpg's stderr as
    output, if gpg exits with a non zero status.
    """
    sender = parseaddr(sender)[1]
    recipient = parseaddr(recipient)[1]
    cmd = ["gpg",
           "--homedir", GNUPGHOME,
           "--batch", "--yes", "--no-options", "--armor",
           "--local-user", sender,
           "--recipient", recipient,
           "--sign", "--encrypt"]
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate(input=message.encode('utf-8'))
    if p.returncode != 0:
        # do not let an empty mail go out and be recorded as handled
        raise subprocess.CalledProcessError(p.returncode, cmd, output=err)
    return out.decode('ascii')


def _compose(body, subject, date):
    """Return the mail carrying body, GPG encrypted, from MAILFROM to MAILTO."""
    message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit')
    message['Subject'] = subject
    message['From'] = MAILFROM
    message['To'] = MAILTO
    message['Date'] = format_datetime(date)
    return message


def _forward_messages(bnp, db, mailer):
    """Mail each unread message not yet recorded in the messages table."""
    unread = (m for m in bnp.messages() if not m.read)
    for m in sorted(unread, key=lambda x: x.date):
        curs = db.cursor()
        curs.execute('''SELECT 1 FROM messages WHERE id = ?''', (m.id, ))
        if curs.fetchone() is not None:
            # already handled
            continue

        # retrieve complete sender and message body
        sender, body = bnp.message(m.id)

        # compose and send message
        body = MESSAGE.format(id=m.id, sender=sender, date=m.date,
                              subject=m.subject, body=body)
        mailer.send(_compose(body, 'BNP Paribas message', localtime(m.date)))

        # record the message only once it has been sent
        curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, ))
        db.commit()


def _forward_transactions(bnp, db, mailer):
    """Mail the transactions not yet recorded in the transactions table."""
    curs = db.cursor()
    lines = []
    for t in bnp.recent():
        curs.execute('''SELECT 1 FROM transactions WHERE id = ?''', (t.id, ))
        if curs.fetchone() is not None:
            # already handled
            continue
        lines.append(str(t))
        curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, ))

    if lines:
        lines.insert(0, HEADER)
        lines.insert(1, '-' * len(HEADER))
        body = '\n'.join(lines)
        mailer.send(_compose(body, 'BNP Paribas update', localtime()))

    # commit after the mail went out: if sending raises, the inserts
    # above are never committed
    db.commit()


def main():
    """Forward new bank messages and transactions by encrypted mail."""
    bnp = Site()
    bnp.login(USERNAME, PASSWORD)

    with closing(sqlite3.connect(DB)) as db:
        db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
        db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

        mailer = Mailer()

        ## unread messages
        _forward_messages(bnp, db, mailer)

        ## transactions
        _forward_transactions(bnp, db, mailer)


if __name__ == '__main__':
    main()