Mercurial > hg > bnpparibas
view bnpparibas.py @ 13:37ce0dc68cad
Consider configuration paths relative to the configuration file location
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Mon, 11 Jan 2016 21:39:40 +0100 |
parents | 4747393db602 |
children | 0a3509a12762 |
line wrap: on
line source
"""Mail GPG-encrypted notifications of new BNP Paribas transactions and messages.

The script is run with the path of a configuration file as its only
argument.  It logs into the bank web site, fetches the recent transactions
and the received messages, and mails (encrypted with GnuPG) those that are
not yet recorded in a local SQLite database.
"""

import email
import email.charset
import email.message
import itertools
import json
import os.path
import smtplib
import sqlite3
import subprocess
import sys
import textwrap
import time
import types
from contextlib import closing, contextmanager
from datetime import datetime
from email.mime.text import MIMEText
from email.utils import format_datetime, localtime, parseaddr
from io import BytesIO
from pprint import pprint
from urllib.parse import urljoin

import requests
from html2text import HTML2Text
from PIL import Image


URL = 'https://mabanque.bnpparibas/'

# message template
MESSAGE = """\
From: {message.sender:}
Subject: {message.subject:}
Date: {message.date:%a, %d %b %Y %H:%M:%S}

{message.body:}
"""

# transactions table row template
TRANSACTION = """{xact.id:14d} {xact.date:10s} {xact.description:54s} {xact.amount:+8.2f}"""

# transactions table header
HEADER = """{:14s} {:10s} {:54s} {:>8s}""".format('Id', 'Date', 'Description', 'Amount')

# transactions table footer
FOOTER = """{:14s} {:10s} {:54s} {{balance:8.2f}}""".format('', '', 'BALANCE')

# transactions table horizontal separator
SEP = """-""" * len(HEADER)

# GPG encrypted text is ascii and as such does not require encoding
# but its decrypted form is utf-8 and therefore the charset header
# must be set accordingly. define an appropriate charset object
email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
                          body_enc=None, output_charset='utf-8')


def loadconf(filename):
    """Execute the Python file *filename* and return its settings.

    The file is executed in a fresh module namespace; every name that is
    entirely upper case is collected into the returned dictionary.

    NOTE: the file is run with ``exec`` and must therefore be trusted.
    """
    # types.ModuleType is what the removed imp.new_module() returned
    module = types.ModuleType('conf')
    module.__file__ = filename
    with open(filename) as fd:
        exec(compile(fd.read(), filename, 'exec'), module.__dict__)
    return {key: getattr(module, key) for key in dir(module) if key.isupper()}


def wrap(p, indent):
    """Fill paragraph *p* to 72 columns, prefixing every line with *indent*."""
    return textwrap.fill(p, 72, initial_indent=indent, subsequent_indent=indent)


def html2text(html):
    """Convert *html* to plain text, dropping links, images and emphasis."""
    # the html2text module does an ok job, but it can be improved in
    # the quality of the transformation and in the coding style
    conv = HTML2Text()
    conv.ignore_links = True
    conv.ignore_images = True
    conv.ignore_emphasis = True
    return conv.handle(html)


class Mailer:
    """Send messages through an SMTP server."""

    def __init__(self, host='localhost', port=25, starttls=True,
                 username=None, password=None):
        self.host = host
        self.port = port
        self.starttls = starttls
        self.username = username
        self.password = password

    @contextmanager
    def connect(self):
        """Yield a connected (and optionally authenticated) SMTP client.

        The connection is closed when the ``with`` block exits, also when
        STARTTLS, the login, or the body of the block raises.
        """
        with smtplib.SMTP(self.host, self.port) as smtp:
            if self.starttls:
                smtp.starttls()
            if self.username:
                smtp.login(self.username, self.password)
            yield smtp

    def send(self, message, fromaddr=None, toaddr=None):
        """Send *message*.

        *fromaddr* and *toaddr* default to the ``From`` and ``To`` headers
        of the message.
        """
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))


class GPG:
    """Thin wrapper around the ``gpg2`` command line tool."""

    def __init__(self, homedir):
        self.homedir = homedir

    def encrypt(self, message, sender, recipient):
        """Sign *message* as *sender*, encrypt it for *recipient*.

        *sender* and *recipient* may be full ``Name <address>`` strings:
        only the address part is passed to gpg2.  Returns the ASCII
        armored output as a string.

        Raises RuntimeError(returncode, stderr) when gpg2 exits with a
        non-zero status.
        """
        sender = parseaddr(sender)[1]
        recipient = parseaddr(recipient)[1]
        cmd = [
            "gpg2",
            "--homedir", self.homedir,
            "--sign",
            "--encrypt",
            "--batch",
            "--no-options",
            "--yes",
            "--armor",
            "--local-user", sender,
            "--recipient", recipient,
        ]
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        encdata, err = p.communicate(input=message.encode('utf-8'))
        if p.returncode:
            raise RuntimeError(p.returncode, err)
        return encdata.decode('ascii')


class Transaction:
    """One account operation, formatted as a table row by ``str()``."""

    __slots__ = 'id', 'date', 'category', 'description', 'amount', 'currency'

    def __init__(self, id, date, category, description, amount, currency):
        self.id = id
        self.date = date
        self.category = category
        self.description = description
        self.amount = amount
        self.currency = currency

    @classmethod
    def fromjson(cls, x):
        """Build a Transaction from one decoded JSON operation record."""
        data = {
            'id': int(x['idOperation']),
            'date': x['dateOperation'],
            'category': int(x['idCategorie']),
            # shorten the description so that it better fits the table column
            'description': x['libelleOperation'].strip().replace('VIREMENT', 'VIR'),
            'amount': x['montant']['montant'],
            'currency': x['montant']['currency'],
        }
        return cls(**data)

    def __str__(self):
        return TRANSACTION.format(xact=self)


class Message:
    """One message received from the bank, formatted by ``str()``."""

    __slots__ = 'id', 'date', 'subject', 'sender', 'content', 'quoted'

    def __init__(self, id, date, subject, sender, content, quoted):
        self.id = id
        self.date = date
        self.subject = subject
        self.sender = sender
        self.content = content
        self.quoted = quoted

    @classmethod
    def fromjson(cls, x):
        """Build a Message from the decoded JSON of a single message.

        The message date is parsed from the message identifier.  The
        content of the attached message, when there is one, is stored
        in ``quoted``.
        """
        data = {
            'id': x['msg']['id'],
            'date': datetime.strptime(x['msg']['id'], '%Y-%m-%d-%H.%M.%S.%f'),
            'subject': x['msg']['objet'],
            'sender': x['msg']['emetteur']['nom'],
            'content': x['msg']['contenu'],
            'quoted': None,
        }
        quoted = x.get('msgAttache')
        if quoted:
            data['quoted'] = quoted['contenu']
        return cls(**data)

    @staticmethod
    def normalize(txt, indent=''):
        """Return *txt* as plain text.

        Text containing ``<div`` is converted with html2text().  Any other
        text is split into paragraphs at line breaks and ``<br/>`` tags and
        each paragraph is wrapped and prefixed with *indent*.
        """
        if '<div' in txt:
            # NOTE(review): indent is not applied on this code path
            txt = txt.replace('<br></br>', '<br/><br/>')
            return html2text(txt)
        txt = txt.replace(r'<br/>', '\n')
        parts = []
        for p in txt.split('\n'):
            p = p.strip()
            # empty paragraphs still carry the indent (quote) prefix
            parts.append(wrap(p, indent) if p else indent)
        return '\n'.join(parts)

    @property
    def body(self):
        """Normalized content, followed by the quoted message if any."""
        body = self.normalize(self.content)
        if self.quoted is not None:
            body = body + '\n\n' + self.normalize(self.quoted, '> ')
        return body

    def __str__(self):
        return MESSAGE.format(message=self)


class Keypad:
    """Decode the virtual keypad image used to enter the password.

    The keypad image is cut into 2 rows of 5 tiles and each tile is
    compared to the tiles of the reference image ``keypad.jpeg`` stored
    next to this file.
    """

    def __init__(self, data):
        """Decode the keypad image given as the bytes *data*.

        Raises ValueError when not all ten digits could be located.
        """
        # reference keypad
        fname = os.path.join(os.path.dirname(__file__), 'keypad.jpeg')
        reference = Image.open(fname).convert('L')
        # digits associated to the reference tiles, in imslice() order
        symbols = [8, 4, 1, 6, 3, 7, 9, 0, 5, 2]
        self.keypad = dict(zip(symbols, self.imslice(reference)))

        # decode keypad: map each digit to the 1-based index of its tile
        image = Image.open(BytesIO(data)).convert('L')
        self.keymap = {}
        for n, tile in enumerate(self.imslice(image), 1):
            # compare to reference tiles
            for sym, key in self.keypad.items():
                if tile == key:
                    self.keymap[sym] = n
                    break

        # verify
        if sorted(self.keymap) != list(range(10)):
            raise ValueError('keypad decode failed')

    @staticmethod
    def imslice(image):
        """Yield the ten 83x80 tiles of *image*, row by row."""
        for j, i in itertools.product(range(2), range(5)):
            yield image.crop((83 * i, 80 * j, 83 * (i + 1), 80 * (j + 1)))

    def encode(self, passwd):
        """Return *passwd* with each digit replaced by its two-digit tile index."""
        return ''.join('%02d' % self.keymap[d] for d in map(int, passwd))


class BNPParibas:
    """Client for the JSON services of the BNP Paribas web site."""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'X-Requested-With': 'XMLHttpRequest'})

    @staticmethod
    def validate(response):
        """Check *response* and return its decoded JSON payload.

        Returns None when the response is not ``application/json``, which
        includes responses without a Content-Type header.

        Raises requests.HTTPError on HTTP error status or when the
        ``codeRetour`` field of the payload is missing or not zero.
        """
        response.raise_for_status()
        # let the email package extract the media type from the header
        header = email.message.Message()
        header['Content-Type'] = response.headers.get('content-type', '')
        if header.get_content_type() != 'application/json':
            return None
        data = response.json()
        # the status code may sometime be represented as string not int
        code = data.get('codeRetour', -1)
        if int(code) != 0:
            raise requests.HTTPError(
                'unexpected codeRetour: {!r}'.format(code), response=response)
        return data

    def login(self, username, password):
        """Authenticate the session and return the ``data`` of the reply."""
        url = urljoin(URL, 'identification-wspl-pres/identification')
        r = self.session.get(url, params={'timestamp': int(time.time())})
        v = self.validate(r)
        keypadid = v['data']['grille']['idGrille']
        authtemplate = v['data']['authTemplate']

        # the password is entered as positions on a keypad image
        url = urljoin(URL, 'identification-wspl-pres/grille/' + keypadid)
        r = self.session.get(url)
        r.raise_for_status()
        keypad = Keypad(r.content)

        # fill authentication template
        auth = authtemplate
        auth = auth.replace('{{ idTelematique }}', username)
        auth = auth.replace('{{ password }}', keypad.encode(password))
        auth = auth.replace('{{ clientele }}', '')

        url = urljoin(URL, 'SEEA-pa01/devServer/seeaserver')
        r = self.session.post(url, data={'AUTH': auth})
        v = self.validate(r)
        return v['data']

    def info(self):
        """Return the ``data`` of the InfosClient service reply."""
        url = urljoin(URL, 'serviceinfosclient-wspl/rpc/InfosClient')
        r = self.session.get(url, params={'modeAppel': 0})
        v = self.validate(r)
        return v['data']

    def recent(self):
        """Return the ``data`` of the operations statement of the first account."""
        url = urljoin(URL, 'udc-wspl/rest/getlstcpt')
        r = self.session.get(url)
        v = self.validate(r)
        account = v['data']['infoUdc']['familleCompte'][0]['compte'][0]

        url = urljoin(URL, 'rop-wspl/rest/releveOp')
        data = json.dumps({
            'ibanCrypte': account['key'],
            'pastOrPending': 1,
            'triAV': 0,
            'startDate': None,  # ddmmyyyy
            'endDate': None,
        })
        headers = {'Content-Type': 'application/json'}
        r = self.session.post(url, headers=headers, data=data)
        v = self.validate(r)
        return v['data']

    def messages(self):
        """Return the ``data`` of the received messages listing."""
        url = urljoin(URL, 'bmm-wspl/recupMsgRecu')
        r = self.session.get(url, params={'nbMessagesParPage': 200, 'index': 0})
        v = self.validate(r)
        return v['data']

    def message(self, mid):
        """Return the ``data`` of the message with identifier *mid*."""
        # required to set some cookies required by the next call
        url = urljoin(URL, 'fr/connexion/mes-outils/messagerie')
        r = self.session.get(url)
        self.validate(r)

        url = urljoin(URL, 'bmm-wspl/recupMsg')
        r = self.session.get(url, params={'identifiant': mid})
        v = self.validate(r)
        return v['data']


def _encrypted_mail(subject, text, conf, encrypt):
    """Return a mail message carrying *text* encrypted for conf['MAILTO']."""
    payload = encrypt(text, conf['MAILFROM'], conf['MAILTO'])
    message = MIMEText(payload, _charset='utf8 7bit')
    message['Subject'] = subject
    message['From'] = conf['MAILFROM']
    message['To'] = conf['MAILTO']
    message['Date'] = format_datetime(localtime())
    return message


def _report_transactions(db, remote, conf, encrypt, sendmail):
    """Mail a table of the transactions not yet recorded in the database."""
    recent = remote.recent()
    data = recent['listerOperations']['compte']
    transactions = [Transaction.fromjson(x) for x in data['operationPassee']]
    balance = data['soldeDispo']

    curs = db.cursor()
    unseen = []
    for t in transactions:
        curs.execute('''SELECT COUNT(*) FROM transactions WHERE id = ?''', (t.id, ))
        if not curs.fetchone()[0]:
            # not seen before
            unseen.append(t)
    if not unseen:
        return

    lines = [HEADER, SEP]
    lines.extend(str(t) for t in unseen)
    lines.append(SEP)
    lines.append(FOOTER.format(balance=balance))
    text = '\n'.join(lines)

    sendmail(_encrypted_mail('BNP Paribas Account update', text, conf, encrypt))

    # record the transactions only once the mail has been sent
    curs.executemany('''INSERT INTO transactions (id) VALUES (?)''',
                     ((x.id, ) for x in unseen))
    db.commit()


def _forward_messages(db, remote, conf, encrypt, sendmail):
    """Mail each received message not yet recorded in the database."""
    # NOTE(review): the reply of this request is not used; the request is
    # kept in case the following ones depend on it -- confirm
    remote.info()

    data = remote.messages()
    curs = db.cursor()
    for m in data['messages']:
        curs.execute('''SELECT COUNT(*) FROM messages WHERE id = ?''', (m['id'], ))
        if curs.fetchone()[0]:
            # already handled
            continue

        text = Message.fromjson(remote.message(m['id']))
        sendmail(_encrypted_mail('BNP Paribas Message', str(text), conf, encrypt))

        # commit after each message so that a later failure does not
        # cause the messages already mailed to be sent again
        curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m['id'], ))
        db.commit()


def main():
    """Run the script: ``sys.argv[1]`` is the configuration file path."""
    if len(sys.argv) < 2:
        sys.exit('usage: {} CONFFILE'.format(sys.argv[0]))
    conffile = sys.argv[1]
    conf = loadconf(conffile)

    datadir = os.path.dirname(conffile)
    for key in 'DATABASE', 'GNUPGHOME':
        # if path is not absolute, it is interpreted as relative
        # to the location of the configuration file
        if not os.path.isabs(conf[key]):
            conf[key] = os.path.join(datadir, conf[key])

    with closing(sqlite3.connect(conf['DATABASE'])) as db:
        db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
        db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

        sendmail = Mailer(host=conf['SMTPHOST'],
                          port=conf['SMTPPORT'],
                          starttls=conf['SMTPSTARTTLS'],
                          username=conf['SMTPUSER'],
                          password=conf['SMTPPASSWD']).send
        encrypt = GPG(conf['GNUPGHOME']).encrypt

        remote = BNPParibas()
        remote.login(conf['USERNAME'], conf['PASSWORD'])

        ## transactions
        _report_transactions(db, remote, conf, encrypt, sendmail)

        ## messages
        _forward_messages(db, remote, conf, encrypt, sendmail)


if __name__ == '__main__':
    main()