Mercurial > hg > bnpparibas
changeset 6:13a8bc43bc09
Simplify package structure
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Mon, 11 Jan 2016 18:57:25 +0100 |
parents | a47012c9db15 |
children | 90f4e0bd0c2d |
files | bnpparibas.py keypad.png src/bnpparibas.py src/keypad.png |
diffstat | 4 files changed, 481 insertions(+), 481 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bnpparibas.py Mon Jan 11 18:57:25 2016 +0100 @@ -0,0 +1,481 @@ +import email +import imp +import os.path +import re +import smtplib +import sqlite3 +import subprocess +import textwrap + +from collections import namedtuple, defaultdict +from contextlib import contextmanager +from datetime import datetime +from decimal import Decimal +from email.mime.text import MIMEText +from email.utils import format_datetime, localtime, parseaddr +from io import BytesIO +from itertools import product, islice +from urllib.parse import urljoin + +import bs4 +import click +import requests + +from PIL import Image + + +# message template +MESSAGE = """\ +From: {sender:} +Subject: {subject:} +Date: {date:} +Message-Id: {id:} + +{body:} +""" + +# transaction template +HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount') +TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}' + +# as defined in bnpbaribas web app +CATEGORIES = { + '1': 'Alimentation', + '7': 'Logement', + '8': 'Loisirs', + '9': 'Transport', + '12': 'Opérations bancaires', + '13': 'Non défini', + '14': 'Multimédia', + '20': 'Energies', + '22': 'Retrait', + '23': 'Sorties', + 'R58': 'Non défini', +} + +# euro symbol +EURO = b'\xe2\x82\xac'.decode('utf-8') + + +# load configuration +def loadconfig(filename): + module = imp.new_module('config') + module.__file__ = filename + try: + with open(filename) as fd: + exec(compile(fd.read(), filename, 'exec'), module.__dict__) + except IOError as e: + e.strerror = 'Unable to load configuration file (%s)' % e.strerror + raise + config = {} + for key in dir(module): + if key.isupper(): + config[key] = getattr(module, key) + return config + + +# GPG encrypted text is ascii and as such does not require encoding +# but its decrypted form is utf-8 and therefore the charset header +# must be set accordingly. define an appropriate charset object +email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST, + body_enc=None, output_charset='utf-8') + + +Message = namedtuple('Message', 'id read icon sender subject date validity'.split()) + + +class Transaction: + def __init__(self, tid, date, descr, debit, credit, category): + self.id = tid + self.date = date + self.descr = descr + self.debit = debit + self.credit = credit + self.category = category + + def __str__(self): + # there does not seem to be an easy way to format Decimal + # objects with a leading sign in both the positive and + # negative value cases so do it manually + d = vars(self) + if d['debit']: + d['amount'] = '-' + str(d['debit']) + if d['credit']: + d['amount'] = '+' + str(d['credit']) + return TRANSACTION.format(**d) + + +def imslice(image): + for y, x in product(range(0, 5), range(0, 5)): + yield image.crop((27 * x + 1, 27 * y + 1, 27 * (x + 1), 27 * (y + 1))) + + +def imdecode(image): + # load reference keypad + keypad = Image.open(os.path.join(os.path.dirname(__file__), 'keypad.png')).convert('L') + keypad = [ keypad.crop((26 * i, 0, 26 * (i + 1), 26)) for i in range(10) ] + immap = {} + for n, tile in enumerate(imslice(image)): + # skip tiles with background only + if tile.getextrema()[0] > 0: + continue + # compare to reference tiles + for d in range(0, 10): + if tile == keypad[d]: + immap[d] = n + 1 + break + if sorted(immap.keys()) != list(range(10)): + raise ValueError('keypad decode failed') + return immap + + +def amountparse(value): + # empty + if value == '\xa0': + return None + m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S) + if m is None: + raise ValueError(repr(value)) + # euro + currency = m.group(2) + if currency != EURO: + raise ValueError(repr(currency)) + return Decimal(m.group(1).replace('.', '').replace(',', '.')) + + +class Site: + def __init__(self): + self.url = 'https://www.secure.bnpparibas.net' + self.req = requests.Session() + + def login(self, user, passwd): + # login page + url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion') + r = self.req.get(url, params={'type': 'homeconnex'}) + r.raise_for_status() + # login form + soup = bs4.BeautifulSoup(r.text) + form = soup.find('form', attrs={'name': 'logincanalnet'}) + # extract relevant data + action = form['action'] + data = { field['name']: field['value'] for field in form('input') } + + # keyboard image url + tag = soup.find(attrs={'id': 'secret-nbr-keyboard'}) + for prop in tag['style'].split(';'): + match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop) + if match: + src = match.group(1) + break + # download keyboard image + r = self.req.get(urljoin(self.url, src)) + image = Image.open(BytesIO(r.content)).convert('L') + # decode digits position + passwdmap = imdecode(image) + + # encode password + passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd)) + + # username and password + data['ch1'] = user + data['ch5'] = passwdenc + + # post + r = self.req.post(urljoin(self.url, action), data=data) + r.raise_for_status() + # redirection + m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text) + dest = m.group(1) + r = self.req.get(dest) + r.raise_for_status() + + # check for errors + soup = bs4.BeautifulSoup(r.text) + err = soup.find(attrs={'class': 'TitreErreur'}) + if err: + raise ValueError(err.text) + + + def recent(self, contract): + data = { + 'BeginDate': '', + 'Categs': '', + 'Contracts': '', + 'EndDate': '', + 'OpTypes': '', + 'cboFlowName': 'flow/iastatement', + 'contractId': contract, + 'contractIds': '', + 'entryDashboard': '', + 'execution': 'e6s1', + 'externalIAId': 'IAStatements', + 'g1Style': 'expand', + 'g1Type': '', + 'g2Style': 'collapse', + 'g2Type': '', + 'g3Style': 'collapse', + 'g3Type': '', + 'g4Style': 'collapse', + 'g4Type': '', + 'groupId': '-2', + 'groupSelected': '-2', + 'gt': 'homepage:basic-theme', + 'pageId': 'releveoperations', + 'pastOrPendingOperations': '1', + 'sendEUD': 'true', + 'step': 'STAMENTS', } + + url = urljoin(self.url, '/banque/portail/particulier/FicheA') + r = self.req.post(url, data=data) + r.raise_for_status() + text = r.text + + # the html is so broken beautifulsoup does not understand it + text = text.replace( + '<th class="thTitre" style="width:7%">Pointage </td>', + '<th class="thTitre" style="width:7%">Pointage </th>') + s = bs4.BeautifulSoup(text) + + # extract transactions + table = s.find('table', id='tableCompte') + rows = table.find_all('tr') + for row in rows: + fields = row.find_all('td') + if not fields: + # skip headers row + continue + id = int(fields[0].input['id'].lstrip('_')) + date = datetime.strptime(fields[1].text, '%d/%m/%Y') + descr = fields[2].text.strip() + debit = amountparse(fields[3].text) + credit = amountparse(fields[4].text) + category = fields[5].text.strip() + categoryid = fields[6].span['class'][2][4:] + yield Transaction(id, date, descr, debit, credit, categoryid) + + + def messages(self): + data = { + 'identifiant': 'BmmFicheListerMessagesRecus_20100607022434', + 'type': 'fiche', } + + url = urljoin(self.url, '/banque/portail/particulier/Fiche') + r = self.req.post(url, data=data) + r.raise_for_status() + s = bs4.BeautifulSoup(r.text) + + # messages list + table = s.find('table', id='listeMessages') + for row in table.find_all('tr', recursive=False): + # skip headers and separators + if 'entete' in row['class']: + continue + # skip separators + if 'sep' in row['class']: + continue + # skip footer + if 'actions_bas' in row['class']: + continue + fields = row.find_all('td') + icon = fields[1].img['src'] + sender = fields[2].text.strip() + subject = fields[4].a.text.strip() + date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') + validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') + m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick']) + mid = m.group(1) + read = m.group(2) == 'false' + yield Message(mid, read, icon, sender, subject, date, validity) + + + def message(self, mid): + data = { + 'etape': 'boiteReception', + 'idMessage': mid, + 'identifiant': 'BmmFicheLireMessage_20100607022346', + 'maxPagination': 2, + 'minPagination': 1, + 'nbElementParPage': 20, + 'nbEltPagination': 5, + 'nbPages': 2, + 'newMsg': 'false', + 'pagination': 1, + 'type': 'fiche', + 'typeAction': '', } + + url = urljoin(self.url, '/banque/portail/particulier/Fiche') + r = self.req.post(url, data=data) + r.raise_for_status() + # fix badly broken html + text = r.text.replace('<br>', '<br/>').replace('</br>', '') + s = bs4.BeautifulSoup(text) + + envelope = s.find('div', attrs={'class': 'enveloppe'}) + rows = envelope.find_all('tr') + fields = rows[1].find_all('td') + # the messages list present a truncated sender + sender = fields[0].text.strip() + # not used + subject = fields[1].text.strip() + date = fields[2].text.strip() + + content = s.find('div', attrs={'class': 'txtMessage'}) + # clean up text + for t in content.find_all('style'): + t.extract() + for t in content.find_all('script'): + t.extract() + for t in content.find_all(id='info_pro'): + t.extract() + for t in content.find_all('br'): + t.replace_with('\n\n') + for t in content.find_all('b'): + if t.string: + t.replace_with('*%s*' % t.string.strip()) + for t in content.find_all('li'): + t.replace_with('- %s\n\n' % t.text.strip()) + # format nicely + text = re.sub(' +', ' ', content.text) + text = re.sub(r'\s+([\.:])', r'\1', text) + pars = [] + for p in re.split('\n\n+', text): + p = p.strip() + if p: + pars.append('\n'.join(textwrap.wrap(p, 72))) + body = '\n\n'.join(pars) + return sender, body + + + def transactions(self): + data = {'ch_memo': 'NON', + 'ch_rop_cpt_0': 'FR7630004001640000242975804', + 'ch_rop_dat': 'tous', + 'ch_rop_dat_deb': '', + 'ch_rop_dat_fin': '', + 'ch_rop_fmt_dat': 'JJMMAAAA', + 'ch_rop_fmt_fic': 'RTEXC', + 'ch_rop_fmt_sep': 'PT', + 'ch_rop_mon': 'EUR', + 'x': '55', + 'y': '7'} + r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data) + r.raise_for_status() + s = bs4.BeautifulSoup(r.text) + path = s.find('a')['href'] + r = self.req.get(urljoin(self.url, path)) + r.raise_for_status() + return r.text + + +class Mailer: + def __init__(self, config): + self.server = config.get('SMTPSERVER', 'localhost') + self.port = config.get('SMTPPORT', 25) + self.starttls = config.get('SMTPSTARTTLS', False) + self.username = config.get('SMTPUSER', '') + self.password = config.get('SMTPPASSWD', '') + + @contextmanager + def connect(self): + smtp = smtplib.SMTP(self.server, self.port) + if self.starttls: + smtp.starttls() + if self.username: + smtp.login(self.username, self.password) + yield smtp + smtp.quit() + + def send(self, message, fromaddr=None, toaddr=None): + if not fromaddr: + fromaddr = message['From'] + if not toaddr: + toaddr = message['To'] + with self.connect() as conn: + conn.sendmail(fromaddr, toaddr, str(message)) + + +class GPG: + def __init__(self, homedir): + self.homedir = homedir + + def encrypt(self, message, sender, recipient): + sender = parseaddr(sender)[1] + recipient = parseaddr(recipient)[1] + cmd = [ "gpg", "--homedir", self.homedir, "--batch", "--yes", "--no-options", "--armor", + "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"] + p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + encdata, err = p.communicate(input=message.encode('utf-8')) + if p.returncode: + raise RuntimeError(p.returncode, err) + return encdata.decode('ascii') + + +@click.command() +@click.argument('filename') +def main(filename): + # load configuration + config = loadconfig(filename) + + bnp = Site() + bnp.login(config['USERNAME'], config['PASSWORD']) + + db = sqlite3.connect(config['DATABASE']) + db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''') + db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''') + + mailer = Mailer(config) + encrypt = GPG(config['GNUPGHOME']).encrypt + + ## unread messages + messages = filter(lambda x: not x.read, bnp.messages()) + for m in sorted(messages, key=lambda x: x.date): + curs = db.cursor() + curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, )) + if curs.fetchone()[0]: + # already handled + continue + + # retrieve complete sender and message body + sender, body = bnp.message(m.id) + + # compose and send message + body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body) + message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') + message['Subject'] = 'BNP Paribas message' + message['From'] = config['MAILFROM'] + message['To'] = config['MAILTO'] + message['Date'] = format_datetime(localtime(m.date)) + mailer.send(message) + + curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, )) + db.commit() + + + ## transactions + transactions = bnp.recent(config['CONTRACT']) + curs = db.cursor() + lines = [] + for t in transactions: + curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, )) + if curs.fetchone()[0]: + # already handled + continue + lines.append(str(t)) + curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, )) + + if lines: + lines.insert(0, HEADER) + lines.insert(1, '-' * len(HEADER)) + body = '\n'.join(lines) + message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') + message['Subject'] = 'BNP Paribas update' + message['From'] = config['MAILFROM'] + message['To'] = config['MAILTO'] + message['Date'] = format_datetime(localtime()) + mailer.send(message) + + db.commit() + + +if __name__ == '__main__': + main()
--- a/src/bnpparibas.py Wed Mar 25 01:42:21 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,481 +0,0 @@ -import email -import imp -import os.path -import re -import smtplib -import sqlite3 -import subprocess -import textwrap - -from collections import namedtuple, defaultdict -from contextlib import contextmanager -from datetime import datetime -from decimal import Decimal -from email.mime.text import MIMEText -from email.utils import format_datetime, localtime, parseaddr -from io import BytesIO -from itertools import product, islice -from urllib.parse import urljoin - -import bs4 -import click -import requests - -from PIL import Image - - -# message template -MESSAGE = """\ -From: {sender:} -Subject: {subject:} -Date: {date:} -Message-Id: {id:} - -{body:} -""" - -# transaction template -HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount') -TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}' - -# as defined in bnpbaribas web app -CATEGORIES = { - '1': 'Alimentation', - '7': 'Logement', - '8': 'Loisirs', - '9': 'Transport', - '12': 'Opérations bancaires', - '13': 'Non défini', - '14': 'Multimédia', - '20': 'Energies', - '22': 'Retrait', - '23': 'Sorties', - 'R58': 'Non défini', -} - -# euro symbol -EURO = b'\xe2\x82\xac'.decode('utf-8') - - -# load configuration -def loadconfig(filename): - module = imp.new_module('config') - module.__file__ = filename - try: - with open(filename) as fd: - exec(compile(fd.read(), filename, 'exec'), module.__dict__) - except IOError as e: - e.strerror = 'Unable to load configuration file (%s)' % e.strerror - raise - config = {} - for key in dir(module): - if key.isupper(): - config[key] = getattr(module, key) - return config - - -# GPG encrypted text is ascii and as such does not require encoding -# but its decrypted form is utf-8 and therefore the charset header -# must be set accordingly. define an appropriate charset object -email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST, - body_enc=None, output_charset='utf-8') - - -Message = namedtuple('Message', 'id read icon sender subject date validity'.split()) - - -class Transaction: - def __init__(self, tid, date, descr, debit, credit, category): - self.id = tid - self.date = date - self.descr = descr - self.debit = debit - self.credit = credit - self.category = category - - def __str__(self): - # there does not seem to be an easy way to format Decimal - # objects with a leading sign in both the positive and - # negative value cases so do it manually - d = vars(self) - if d['debit']: - d['amount'] = '-' + str(d['debit']) - if d['credit']: - d['amount'] = '+' + str(d['credit']) - return TRANSACTION.format(**d) - - -def imslice(image): - for y, x in product(range(0, 5), range(0, 5)): - yield image.crop((27 * x + 1, 27 * y + 1, 27 * (x + 1), 27 * (y + 1))) - - -def imdecode(image): - # load reference keypad - keypad = Image.open(os.path.join(os.path.dirname(__file__), 'keypad.png')).convert('L') - keypad = [ keypad.crop((26 * i, 0, 26 * (i + 1), 26)) for i in range(10) ] - immap = {} - for n, tile in enumerate(imslice(image)): - # skip tiles with background only - if tile.getextrema()[0] > 0: - continue - # compare to reference tiles - for d in range(0, 10): - if tile == keypad[d]: - immap[d] = n + 1 - break - if sorted(immap.keys()) != list(range(10)): - raise ValueError('keypad decode failed') - return immap - - -def amountparse(value): - # empty - if value == '\xa0': - return None - m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S) - if m is None: - raise ValueError(repr(value)) - # euro - currency = m.group(2) - if currency != EURO: - raise ValueError(repr(currency)) - return Decimal(m.group(1).replace('.', '').replace(',', '.')) - - -class Site: - def __init__(self): - self.url = 'https://www.secure.bnpparibas.net' - self.req = requests.Session() - - def login(self, user, passwd): - # login page - url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion') - r = self.req.get(url, params={'type': 'homeconnex'}) - r.raise_for_status() - # login form - soup = bs4.BeautifulSoup(r.text) - form = soup.find('form', attrs={'name': 'logincanalnet'}) - # extract relevant data - action = form['action'] - data = { field['name']: field['value'] for field in form('input') } - - # keyboard image url - tag = soup.find(attrs={'id': 'secret-nbr-keyboard'}) - for prop in tag['style'].split(';'): - match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop) - if match: - src = match.group(1) - break - # download keyboard image - r = self.req.get(urljoin(self.url, src)) - image = Image.open(BytesIO(r.content)).convert('L') - # decode digits position - passwdmap = imdecode(image) - - # encode password - passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd)) - - # username and password - data['ch1'] = user - data['ch5'] = passwdenc - - # post - r = self.req.post(urljoin(self.url, action), data=data) - r.raise_for_status() - # redirection - m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text) - dest = m.group(1) - r = self.req.get(dest) - r.raise_for_status() - - # check for errors - soup = bs4.BeautifulSoup(r.text) - err = soup.find(attrs={'class': 'TitreErreur'}) - if err: - raise ValueError(err.text) - - - def recent(self, contract): - data = { - 'BeginDate': '', - 'Categs': '', - 'Contracts': '', - 'EndDate': '', - 'OpTypes': '', - 'cboFlowName': 'flow/iastatement', - 'contractId': contract, - 'contractIds': '', - 'entryDashboard': '', - 'execution': 'e6s1', - 'externalIAId': 'IAStatements', - 'g1Style': 'expand', - 'g1Type': '', - 'g2Style': 'collapse', - 'g2Type': '', - 'g3Style': 'collapse', - 'g3Type': '', - 'g4Style': 'collapse', - 'g4Type': '', - 'groupId': '-2', - 'groupSelected': '-2', - 'gt': 'homepage:basic-theme', - 'pageId': 'releveoperations', - 'pastOrPendingOperations': '1', - 'sendEUD': 'true', - 'step': 'STAMENTS', } - - url = urljoin(self.url, '/banque/portail/particulier/FicheA') - r = self.req.post(url, data=data) - r.raise_for_status() - text = r.text - - # the html is so broken beautifulsoup does not understand it - text = text.replace( - '<th class="thTitre" style="width:7%">Pointage </td>', - '<th class="thTitre" style="width:7%">Pointage </th>') - s = bs4.BeautifulSoup(text) - - # extract transactions - table = s.find('table', id='tableCompte') - rows = table.find_all('tr') - for row in rows: - fields = row.find_all('td') - if not fields: - # skip headers row - continue - id = int(fields[0].input['id'].lstrip('_')) - date = datetime.strptime(fields[1].text, '%d/%m/%Y') - descr = fields[2].text.strip() - debit = amountparse(fields[3].text) - credit = amountparse(fields[4].text) - category = fields[5].text.strip() - categoryid = fields[6].span['class'][2][4:] - yield Transaction(id, date, descr, debit, credit, categoryid) - - - def messages(self): - data = { - 'identifiant': 'BmmFicheListerMessagesRecus_20100607022434', - 'type': 'fiche', } - - url = urljoin(self.url, '/banque/portail/particulier/Fiche') - r = self.req.post(url, data=data) - r.raise_for_status() - s = bs4.BeautifulSoup(r.text) - - # messages list - table = s.find('table', id='listeMessages') - for row in table.find_all('tr', recursive=False): - # skip headers and separators - if 'entete' in row['class']: - continue - # skip separators - if 'sep' in row['class']: - continue - # skip footer - if 'actions_bas' in row['class']: - continue - fields = row.find_all('td') - icon = fields[1].img['src'] - sender = fields[2].text.strip() - subject = fields[4].a.text.strip() - date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') - validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') - m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick']) - mid = m.group(1) - read = m.group(2) == 'false' - yield Message(mid, read, icon, sender, subject, date, validity) - - - def message(self, mid): - data = { - 'etape': 'boiteReception', - 'idMessage': mid, - 'identifiant': 'BmmFicheLireMessage_20100607022346', - 'maxPagination': 2, - 'minPagination': 1, - 'nbElementParPage': 20, - 'nbEltPagination': 5, - 'nbPages': 2, - 'newMsg': 'false', - 'pagination': 1, - 'type': 'fiche', - 'typeAction': '', } - - url = urljoin(self.url, '/banque/portail/particulier/Fiche') - r = self.req.post(url, data=data) - r.raise_for_status() - # fix badly broken html - text = r.text.replace('<br>', '<br/>').replace('</br>', '') - s = bs4.BeautifulSoup(text) - - envelope = s.find('div', attrs={'class': 'enveloppe'}) - rows = envelope.find_all('tr') - fields = rows[1].find_all('td') - # the messages list present a truncated sender - sender = fields[0].text.strip() - # not used - subject = fields[1].text.strip() - date = fields[2].text.strip() - - content = s.find('div', attrs={'class': 'txtMessage'}) - # clean up text - for t in content.find_all('style'): - t.extract() - for t in content.find_all('script'): - t.extract() - for t in content.find_all(id='info_pro'): - t.extract() - for t in content.find_all('br'): - t.replace_with('\n\n') - for t in content.find_all('b'): - if t.string: - t.replace_with('*%s*' % t.string.strip()) - for t in content.find_all('li'): - t.replace_with('- %s\n\n' % t.text.strip()) - # format nicely - text = re.sub(' +', ' ', content.text) - text = re.sub(r'\s+([\.:])', r'\1', text) - pars = [] - for p in re.split('\n\n+', text): - p = p.strip() - if p: - pars.append('\n'.join(textwrap.wrap(p, 72))) - body = '\n\n'.join(pars) - return sender, body - - - def transactions(self): - data = {'ch_memo': 'NON', - 'ch_rop_cpt_0': 'FR7630004001640000242975804', - 'ch_rop_dat': 'tous', - 'ch_rop_dat_deb': '', - 'ch_rop_dat_fin': '', - 'ch_rop_fmt_dat': 'JJMMAAAA', - 'ch_rop_fmt_fic': 'RTEXC', - 'ch_rop_fmt_sep': 'PT', - 'ch_rop_mon': 'EUR', - 'x': '55', - 'y': '7'} - r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data) - r.raise_for_status() - s = bs4.BeautifulSoup(r.text) - path = s.find('a')['href'] - r = self.req.get(urljoin(self.url, path)) - r.raise_for_status() - return r.text - - -class Mailer: - def __init__(self, config): - self.server = config.get('SMTPSERVER', 'localhost') - self.port = config.get('SMTPPORT', 25) - self.starttls = config.get('SMTPSTARTTLS', False) - self.username = config.get('SMTPUSER', '') - self.password = config.get('SMTPPASSWD', '') - - @contextmanager - def connect(self): - smtp = smtplib.SMTP(self.server, self.port) - if self.starttls: - smtp.starttls() - if self.username: - smtp.login(self.username, self.password) - yield smtp - smtp.quit() - - def send(self, message, fromaddr=None, toaddr=None): - if not fromaddr: - fromaddr = message['From'] - if not toaddr: - toaddr = message['To'] - with self.connect() as conn: - conn.sendmail(fromaddr, toaddr, str(message)) - - -class GPG: - def __init__(self, homedir): - self.homedir = homedir - - def encrypt(self, message, sender, recipient): - sender = parseaddr(sender)[1] - recipient = parseaddr(recipient)[1] - cmd = [ "gpg", "--homedir", self.homedir, "--batch", "--yes", "--no-options", "--armor", - "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"] - p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - encdata, err = p.communicate(input=message.encode('utf-8')) - if p.returncode: - raise RuntimeError(p.returncode, err) - return encdata.decode('ascii') - - -@click.command() -@click.argument('filename') -def main(filename): - # load configuration - config = loadconfig(filename) - - bnp = Site() - bnp.login(config['USERNAME'], config['PASSWORD']) - - db = sqlite3.connect(config['DATABASE']) - db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''') - db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''') - - mailer = Mailer(config) - encrypt = GPG(config['GNUPGHOME']).encrypt - - ## unread messages - messages = filter(lambda x: not x.read, bnp.messages()) - for m in sorted(messages, key=lambda x: x.date): - curs = db.cursor() - curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, )) - if curs.fetchone()[0]: - # already handled - continue - - # retrieve complete sender and message body - sender, body = bnp.message(m.id) - - # compose and send message - body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body) - message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') - message['Subject'] = 'BNP Paribas message' - message['From'] = config['MAILFROM'] - message['To'] = config['MAILTO'] - message['Date'] = format_datetime(localtime(m.date)) - mailer.send(message) - - curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, )) - db.commit() - - - ## transactions - transactions = bnp.recent(config['CONTRACT']) - curs = db.cursor() - lines = [] - for t in transactions: - curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, )) - if curs.fetchone()[0]: - # already handled - continue - lines.append(str(t)) - curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, )) - - if lines: - lines.insert(0, HEADER) - lines.insert(1, '-' * len(HEADER)) - body = '\n'.join(lines) - message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit') - message['Subject'] = 'BNP Paribas update' - message['From'] = config['MAILFROM'] - message['To'] = config['MAILTO'] - message['Date'] = format_datetime(localtime()) - mailer.send(message) - - db.commit() - - -if __name__ == '__main__': - main()