Mercurial > hg > bnpparibas
comparison src/bnpparibas.py @ 0:02ec4a9ab0f0
Import
author | Daniele Nicolodi <daniele.nicolodi@obspm.fr> |
---|---|
date | Tue, 24 Feb 2015 15:50:21 +0100 |
parents | |
children | ad577744dd8e |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:02ec4a9ab0f0 |
---|---|
1 import email | |
2 import os.path | |
3 import re | |
4 import smtplib | |
5 import sqlite3 | |
6 import subprocess | |
7 import textwrap | |
8 | |
9 from collections import namedtuple, defaultdict | |
10 from contextlib import contextmanager | |
11 from datetime import datetime | |
12 from decimal import Decimal | |
13 from email.mime.text import MIMEText | |
14 from email.utils import format_datetime, localtime, parseaddr | |
15 from io import BytesIO | |
16 from itertools import product, islice | |
17 from urllib.parse import urljoin | |
18 | |
19 import bs4 | |
20 import numpy as np | |
21 import requests | |
22 | |
23 from PIL import Image | |
24 | |
25 DB = 'bnpparibas.sqlite' | |
26 | |
27 # message template | |
28 MESSAGE = """\ | |
29 From: {sender:} | |
30 Subject: {subject:} | |
31 Date: {date:} | |
32 Message-Id: {id:} | |
33 | |
34 {body:} | |
35 """ | |
36 | |
37 # transaction template | |
38 HEADER = '{:14s} {:10s} {:59s} {:>8s}'.format('Id', 'Date', 'Description', 'Amount') | |
39 TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}' | |
40 | |
41 # as defined in bnpbaribas web app | |
42 CATEGORIES = { | |
43 '1': 'Alimentation', | |
44 '7': 'Logement', | |
45 '8': 'Loisirs', | |
46 '9': 'Transport', | |
47 '12': 'Opérations bancaires', | |
48 '13': 'Non défini', | |
49 '14': 'Multimédia', | |
50 '20': 'Energies', | |
51 '22': 'Retrait', | |
52 '23': 'Sorties', | |
53 'R58': 'Non défini', | |
54 } | |
55 | |
56 # euro symbol | |
57 EURO = b'\xe2\x82\xac'.decode('utf-8') | |
58 | |
59 | |
60 # load configuration | |
61 from config import * | |
62 | |
63 | |
64 # GPG encrypted text is ascii and as such does not require encoding | |
65 # but its decrypted form is utf-8 and therefore the charset header | |
66 # must be set accordingly. define an appropriate charset object | |
67 email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST, | |
68 body_enc=None, output_charset='utf-8') | |
69 | |
70 | |
71 Message = namedtuple('Message', 'id read icon sender subject date validity'.split()) | |
72 | |
73 | |
74 class Transaction: | |
75 def __init__(self, tid, date, descr, debit, credit, category): | |
76 self.id = tid | |
77 self.date = date | |
78 self.descr = descr | |
79 self.debit = debit | |
80 self.credit = credit | |
81 self.category = category | |
82 | |
83 def __str__(self): | |
84 # there does not seem to be an easy way to format Decimal | |
85 # objects with a leading sign in both the positive and | |
86 # negative value cases so do it manually | |
87 d = vars(self) | |
88 if d['debit']: | |
89 d['amount'] = '-' + str(d['debit']) | |
90 if d['credit']: | |
91 d['amount'] = '+' + str(d['credit']) | |
92 return TRANSACTION.format(**d) | |
93 | |
94 | |
95 def imslice(image): | |
96 for x, y in product(range(0, 5), range(0, 5)): | |
97 islice = image[27*x+1:27*(x+1), 27*y+1:27*(y+1), 0] | |
98 yield islice | |
99 | |
100 | |
101 def imdecode(image): | |
102 keypad = np.load(os.path.join(os.path.dirname(__file__), 'keypad.npy')) | |
103 immap = {} | |
104 for n, islice in enumerate(imslice(image)): | |
105 # skip empty tiles | |
106 if np.mean(islice) > 248.0: | |
107 continue | |
108 # compare to reference tiles | |
109 for d in range(0, 10): | |
110 delta = np.sum(islice - keypad[d]) | |
111 if delta < 100: | |
112 print(delta) | |
113 immap[d] = n + 1 | |
114 return immap | |
115 | |
116 | |
117 def amountparse(value): | |
118 # empty | |
119 if value == '\xa0': | |
120 return None | |
121 m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S) | |
122 if m is None: | |
123 raise ValueError(repr(value)) | |
124 # euro | |
125 currency = m.group(2) | |
126 if currency != EURO: | |
127 raise ValueError(repr(currency)) | |
128 return Decimal(m.group(1).replace('.', '').replace(',', '.')) | |
129 | |
130 | |
131 class Site: | |
132 def __init__(self): | |
133 self.url = 'https://www.secure.bnpparibas.net' | |
134 self.req = requests.Session() | |
135 | |
136 def login(self, user, passwd): | |
137 # login page | |
138 url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion') | |
139 r = self.req.get(url, params={'type': 'homeconnex'}) | |
140 r.raise_for_status() | |
141 # login form | |
142 soup = bs4.BeautifulSoup(r.text) | |
143 form = soup.find('form', attrs={'name': 'logincanalnet'}) | |
144 # extract relevant data | |
145 action = form['action'] | |
146 data = { field['name']: field['value'] for field in form('input') } | |
147 | |
148 # keyboard image url | |
149 src = '' | |
150 tag = soup.find(attrs={'id': 'secret-nbr-keyboard'}) | |
151 for prop in tag['style'].split(';'): | |
152 match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop) | |
153 if match: | |
154 src = match.group(1) | |
155 break | |
156 # download keyboard image | |
157 r = self.req.get(urljoin(self.url, src)) | |
158 image = np.array(Image.open(BytesIO(r.content)).convert('RGB')) | |
159 # decode digits position | |
160 passwdmap = imdecode(image) | |
161 | |
162 # encode password | |
163 passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd)) | |
164 | |
165 # username and password | |
166 data['ch1'] = user | |
167 data['ch5'] = passwdenc | |
168 | |
169 # post | |
170 r = self.req.post(urljoin(self.url, action), data=data) | |
171 r.raise_for_status() | |
172 # redirection | |
173 m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text) | |
174 dest = m.group(1) | |
175 r = self.req.get(dest) | |
176 r.raise_for_status() | |
177 | |
178 # check for errors | |
179 soup = bs4.BeautifulSoup(r.text) | |
180 err = soup.find(attrs={'class': 'TitreErreur'}) | |
181 if err: | |
182 raise ValueError(err.text) | |
183 | |
184 | |
185 def recent(self): | |
186 data = { | |
187 'BeginDate': '', | |
188 'Categs': '', | |
189 'Contracts': '', | |
190 'EndDate': '', | |
191 'OpTypes': '', | |
192 'cboFlowName': 'flow/iastatement', | |
193 'contractId': CONTRACT, | |
194 'contractIds': '', | |
195 'entryDashboard': '', | |
196 'execution': 'e6s1', | |
197 'externalIAId': 'IAStatements', | |
198 'g1Style': 'expand', | |
199 'g1Type': '', | |
200 'g2Style': 'collapse', | |
201 'g2Type': '', | |
202 'g3Style': 'collapse', | |
203 'g3Type': '', | |
204 'g4Style': 'collapse', | |
205 'g4Type': '', | |
206 'groupId': '-2', | |
207 'groupSelected': '-2', | |
208 'gt': 'homepage:basic-theme', | |
209 'pageId': 'releveoperations', | |
210 'pastOrPendingOperations': '1', | |
211 'sendEUD': 'true', | |
212 'step': 'STAMENTS', } | |
213 | |
214 url = urljoin(self.url, '/banque/portail/particulier/FicheA') | |
215 r = self.req.post(url, data=data) | |
216 r.raise_for_status() | |
217 text = r.text | |
218 | |
219 # the html is so broken beautifulsoup does not understand it | |
220 text = text.replace( | |
221 '<th class="thTitre" style="width:7%">Pointage </td>', | |
222 '<th class="thTitre" style="width:7%">Pointage </th>') | |
223 s = bs4.BeautifulSoup(text) | |
224 | |
225 # extract transactions | |
226 table = s.find('table', id='tableCompte') | |
227 rows = table.find_all('tr') | |
228 for row in rows: | |
229 fields = row.find_all('td') | |
230 if not fields: | |
231 # skip headers row | |
232 continue | |
233 id = int(fields[0].input['id'].lstrip('_')) | |
234 date = datetime.strptime(fields[1].text, '%d/%m/%Y') | |
235 descr = fields[2].text.strip() | |
236 debit = amountparse(fields[3].text) | |
237 credit = amountparse(fields[4].text) | |
238 category = fields[5].text.strip() | |
239 categoryid = fields[6].span['class'][2][4:] | |
240 yield Transaction(id, date, descr, debit, credit, categoryid) | |
241 | |
242 | |
243 def messages(self): | |
244 data = { | |
245 'identifiant': 'BmmFicheListerMessagesRecus_20100607022434', | |
246 'type': 'fiche', } | |
247 | |
248 url = urljoin(self.url, '/banque/portail/particulier/Fiche') | |
249 r = self.req.post(url, data=data) | |
250 r.raise_for_status() | |
251 s = bs4.BeautifulSoup(r.text) | |
252 | |
253 # messages list | |
254 table = s.find('table', id='listeMessages') | |
255 for row in table.find_all('tr', recursive=False): | |
256 # skip headers and separators | |
257 if 'entete' in row['class']: | |
258 continue | |
259 # skip separators | |
260 if 'sep' in row['class']: | |
261 continue | |
262 # skip footer | |
263 if 'actions_bas' in row['class']: | |
264 continue | |
265 fields = row.find_all('td') | |
266 icon = fields[1].img['src'] | |
267 sender = fields[2].text.strip() | |
268 subject = fields[4].a.text.strip() | |
269 date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') | |
270 validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec') | |
271 m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick']) | |
272 mid = m.group(1) | |
273 read = m.group(2) == 'false' | |
274 yield Message(mid, read, icon, sender, subject, date, validity) | |
275 | |
276 | |
277 def message(self, mid): | |
278 data = { | |
279 'etape': 'boiteReception', | |
280 'idMessage': mid, | |
281 'identifiant': 'BmmFicheLireMessage_20100607022346', | |
282 'maxPagination': 2, | |
283 'minPagination': 1, | |
284 'nbElementParPage': 20, | |
285 'nbEltPagination': 5, | |
286 'nbPages': 2, | |
287 'newMsg': 'false', | |
288 'pagination': 1, | |
289 'type': 'fiche', | |
290 'typeAction': '', } | |
291 | |
292 url = urljoin(self.url, '/banque/portail/particulier/Fiche') | |
293 r = self.req.post(url, data=data) | |
294 r.raise_for_status() | |
295 # fix badly broken html | |
296 text = r.text.replace('<br>', '<br/>').replace('</br>', '') | |
297 s = bs4.BeautifulSoup(text) | |
298 | |
299 envelope = s.find('div', attrs={'class': 'enveloppe'}) | |
300 rows = envelope.find_all('tr') | |
301 fields = rows[1].find_all('td') | |
302 # the messages list present a truncated sender | |
303 sender = fields[0].text.strip() | |
304 # not used | |
305 subject = fields[1].text.strip() | |
306 date = fields[2].text.strip() | |
307 | |
308 content = s.find('div', attrs={'class': 'txtMessage'}) | |
309 # clean up text | |
310 for t in content.find_all('style'): | |
311 t.extract() | |
312 for t in content.find_all('script'): | |
313 t.extract() | |
314 for t in content.find_all(id='info_pro'): | |
315 t.extract() | |
316 for t in content.find_all('br'): | |
317 t.replace_with('\n\n') | |
318 for t in content.find_all('b'): | |
319 if t.string: | |
320 t.replace_with('*%s*' % t.string.strip()) | |
321 for t in content.find_all('li'): | |
322 t.replace_with('- %s\n\n' % t.text.strip()) | |
323 # format nicely | |
324 text = re.sub(' +', ' ', content.text) | |
325 text = re.sub(r'\s+([\.:])', r'\1', text) | |
326 pars = [] | |
327 for p in re.split('\n\n+', text): | |
328 p = p.strip() | |
329 if p: | |
330 pars.append('\n'.join(textwrap.wrap(p, 72))) | |
331 body = '\n\n'.join(pars) | |
332 return sender, body | |
333 | |
334 | |
335 def transactions(self): | |
336 data = {'ch_memo': 'NON', | |
337 'ch_rop_cpt_0': 'FR7630004001640000242975804', | |
338 'ch_rop_dat': 'tous', | |
339 'ch_rop_dat_deb': '', | |
340 'ch_rop_dat_fin': '', | |
341 'ch_rop_fmt_dat': 'JJMMAAAA', | |
342 'ch_rop_fmt_fic': 'RTEXC', | |
343 'ch_rop_fmt_sep': 'PT', | |
344 'ch_rop_mon': 'EUR', | |
345 'x': '55', | |
346 'y': '7'} | |
347 r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data) | |
348 r.raise_for_status() | |
349 s = bs4.BeautifulSoup(r.text) | |
350 path = s.find('a')['href'] | |
351 r = self.req.get(urljoin(self.url, path)) | |
352 r.raise_for_status() | |
353 return r.text | |
354 | |
355 | |
356 class Mailer: | |
357 def __init__(self): | |
358 self.server = SMTPSERVER | |
359 self.port = SMTPPORT | |
360 self.starttls = SMTPSTARTTLS | |
361 self.username = SMTPUSER | |
362 self.password = SMTPPASSWD | |
363 | |
364 @contextmanager | |
365 def connect(self): | |
366 smtp = smtplib.SMTP(self.server, self.port) | |
367 if self.starttls: | |
368 smtp.starttls() | |
369 if self.username: | |
370 smtp.login(self.username, self.password) | |
371 yield smtp | |
372 smtp.quit() | |
373 | |
374 def send(self, message, fromaddr=None, toaddr=None): | |
375 if not fromaddr: | |
376 fromaddr = message['From'] | |
377 if not toaddr: | |
378 toaddr = message['To'] | |
379 with self.connect() as conn: | |
380 conn.sendmail(fromaddr, toaddr, str(message)) | |
381 | |
382 | |
383 def encrypt(message, sender, recipient): | |
384 sender = parseaddr(sender)[1] | |
385 recipient = parseaddr(recipient)[1] | |
386 cmd = [ "gpg", "--homedir", GNUPGHOME, "--batch", "--yes", "--no-options", "--armor", | |
387 "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"] | |
388 p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
389 encdata = p.communicate(input=message.encode('utf-8'))[0].decode('ascii') | |
390 return encdata | |
391 | |
392 | |
393 def main(): | |
394 bnp = Site() | |
395 bnp.login(USERNAME, PASSWORD) | |
396 | |
397 db = sqlite3.connect(DB) | |
398 db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''') | |
399 db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''') | |
400 | |
401 mailer = Mailer() | |
402 | |
403 ## unread messages | |
404 messages = filter(lambda x: not x.read, bnp.messages()) | |
405 for m in sorted(messages, key=lambda x: x.date): | |
406 curs = db.cursor() | |
407 curs.execute('''SELECT IFNULL((SELECT id FROM messages WHERE id = ?), 0)''', (m.id, )) | |
408 if curs.fetchone()[0]: | |
409 # already handled | |
410 continue | |
411 | |
412 # retrieve complete sender and message body | |
413 sender, body = bnp.message(m.id) | |
414 | |
415 # compose and send message | |
416 body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body) | |
417 message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit') | |
418 message['Subject'] = 'BNP Paribas message' | |
419 message['From'] = MAILFROM | |
420 message['To'] = MAILTO | |
421 message['Date'] = format_datetime(localtime(m.date)) | |
422 mailer.send(message) | |
423 | |
424 curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, )) | |
425 db.commit() | |
426 | |
427 | |
428 ## transactions | |
429 transactions = bnp.recent() | |
430 curs = db.cursor() | |
431 lines = [] | |
432 for t in transactions: | |
433 curs.execute('''SELECT IFNULL((SELECT id FROM transactions WHERE id = ?), 0)''', (t.id, )) | |
434 if curs.fetchone()[0]: | |
435 # already handled | |
436 continue | |
437 lines.append(str(t)) | |
438 curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, )) | |
439 | |
440 if lines: | |
441 lines.insert(0, HEADER) | |
442 lines.insert(1, '-' * len(HEADER)) | |
443 body = '\n'.join(lines) | |
444 message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit') | |
445 message['Subject'] = 'BNP Paribas update' | |
446 message['From'] = MAILFROM | |
447 message['To'] = MAILTO | |
448 message['Date'] = format_datetime(localtime()) | |
449 mailer.send(message) | |
450 | |
451 db.commit() | |
452 | |
453 | |
454 if __name__ == '__main__': | |
455 main() |