comparison src/bnpparibas.py @ 0:02ec4a9ab0f0

Import
author Daniele Nicolodi <daniele.nicolodi@obspm.fr>
date Tue, 24 Feb 2015 15:50:21 +0100
parents
children ad577744dd8e
comparison
equal deleted inserted replaced
-1:000000000000 0:02ec4a9ab0f0
1 import email
2 import os.path
3 import re
4 import smtplib
5 import sqlite3
6 import subprocess
7 import textwrap
8
9 from collections import namedtuple, defaultdict
10 from contextlib import contextmanager
11 from datetime import datetime
12 from decimal import Decimal
13 from email.mime.text import MIMEText
14 from email.utils import format_datetime, localtime, parseaddr
15 from io import BytesIO
16 from itertools import product, islice
17 from urllib.parse import urljoin
18
19 import bs4
20 import numpy as np
21 import requests
22
23 from PIL import Image
24
# sqlite database file recording what has already been notified
DB = 'bnpparibas.sqlite'

# message template
MESSAGE = (
    'From: {sender:}\n'
    'Subject: {subject:}\n'
    'Date: {date:}\n'
    'Message-Id: {id:}\n'
    '\n'
    '{body:}\n'
)

# transaction template, the header columns have the same widths
HEADER = ' '.join((
    'Id'.ljust(14),
    'Date'.ljust(10),
    'Description'.ljust(59),
    'Amount'.rjust(8),
))
TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}'

# category identifiers as defined in the bnpparibas web app
CATEGORIES = {
    '1': 'Alimentation',
    '7': 'Logement',
    '8': 'Loisirs',
    '9': 'Transport',
    '12': 'Opérations bancaires',
    '13': 'Non défini',
    '14': 'Multimédia',
    '20': 'Energies',
    '22': 'Retrait',
    '23': 'Sorties',
    'R58': 'Non défini',
}

# euro symbol
EURO = '\u20ac'
58
59
60 # load configuration
61 from config import *
62
63
# The GPG armored text is plain ascii and must be sent without any
# further encoding, but what it decrypts to is utf-8, hence the
# charset header has to announce utf-8. Register a charset that
# combines the two.
email.charset.add_charset(
    'utf8 7bit',
    header_enc=email.charset.SHORTEST,
    body_enc=None,
    output_charset='utf-8',
)


# one entry of the messages list
Message = namedtuple(
    'Message',
    ['id', 'read', 'icon', 'sender', 'subject', 'date', 'validity'],
)
72
73
class Transaction:
    """One account transaction.

    ``debit`` and ``credit`` hold the amount of the operation, the one
    that does not apply is expected to be ``None``.
    """

    def __init__(self, tid, date, descr, debit, credit, category):
        self.id = tid
        self.date = date
        self.descr = descr
        self.debit = debit
        self.credit = credit
        self.category = category

    @property
    def amount(self):
        """Amount as a string with an explicit leading sign.

        Credits are prefixed with ``+`` and debits with ``-``. When
        both are set the credit wins, when neither is set the result
        is the empty string.
        """
        # there does not seem to be an easy way to format Decimal
        # objects with a leading sign in both the positive and
        # negative value cases so do it manually
        if self.credit:
            return '+' + str(self.credit)
        if self.debit:
            return '-' + str(self.debit)
        return ''

    def __str__(self):
        # vars() returns the instance dictionary itself: pass it along
        # as keyword arguments rather than storing the amount into it,
        # so that formatting does not modify the object
        return TRANSACTION.format(amount=self.amount, **vars(self))
93
94
def imslice(image):
    """Yield the 25 tiles of a 5x5 grid image, first axis outermost.

    Grid cells are 27 pixels apart. The first row and the first column
    of every cell are left out, which gives 26x26 tiles. Only channel
    0 of the image is returned.
    """
    pitch = 27
    for row in range(5):
        top = pitch * row + 1
        for col in range(5):
            left = pitch * col + 1
            yield image[top:top + pitch - 1, left:left + pitch - 1, 0]
99
100
def imdecode(image):
    """Find where each digit sits on the keypad image.

    Every non empty tile of the image is compared to the ten reference
    tiles loaded from the ``keypad.npy`` file located next to this
    module.

    Returns a dictionary mapping each recognized digit to the 1-based
    index of the tile showing it, in the order tiles are produced by
    ``imslice``.
    """
    keypad = np.load(os.path.join(os.path.dirname(__file__), 'keypad.npy'))
    immap = {}
    for n, tile in enumerate(imslice(image), 1):
        # skip empty tiles
        if np.mean(tile) > 248.0:
            continue
        # image data is unsigned: widen it before subtracting so that
        # negative differences do not wrap around
        pixels = tile.astype(np.int64)
        # compare to reference tiles
        for d in range(0, 10):
            # sum absolute differences, signed ones would cancel out
            delta = np.abs(pixels - keypad[d]).sum()
            if delta < 100:
                immap[d] = n
    return immap
115
116
def amountparse(value):
    """Parse the text of an amount cell.

    Amounts use a dot as thousands separator and a comma as decimal
    separator, and are followed by the currency symbol, the whole
    surrounded by whitespace.

    Returns ``None`` for an empty cell, which contains a lone
    non-breaking space, and the amount as a ``Decimal`` otherwise.

    Raises ``ValueError`` if the text is not an amount or if the
    currency is not the euro.
    """
    # empty
    if value == '\xa0':
        return None
    # accept any number of thousands groups, not only one
    m = re.match(r'\s+((?:\d+\.)*\d+,\d+)\s+([^\s]+)\s+$', value)
    if m is None:
        raise ValueError(repr(value))
    # euro
    currency = m.group(2)
    if currency != EURO:
        raise ValueError(repr(currency))
    return Decimal(m.group(1).replace('.', '').replace(',', '.'))
129
130
class Site:
    """Scraper for the BNP Paribas online banking web site.

    All requests are issued through a single ``requests`` session.
    """

    def __init__(self):
        self.url = 'https://www.secure.bnpparibas.net'
        self.req = requests.Session()

    @staticmethod
    def _keyboard_src(style):
        """Return the background image url found in a css style string.

        Raises ``ValueError`` if no declaration carries one.
        """
        for prop in style.split(';'):
            # declarations after the first usually start with a space
            match = re.match(r'background-image:\s*url\(\'(.*)\'\)\s*', prop.strip())
            if match:
                return match.group(1)
        raise ValueError('keyboard image not found in %r' % style)

    @staticmethod
    def _encode_password(passwd, passwdmap):
        """Encode each digit of ``passwd`` as its two digits keypad position.

        Raises ``ValueError`` if a digit is missing from ``passwdmap``.
        """
        try:
            return ''.join('%02d' % passwdmap[int(d)] for d in passwd)
        except KeyError as err:
            raise ValueError('digit %s not found on the keypad' % err.args[0]) from None

    def login(self, user, passwd):
        """Authenticate the session.

        The password is not sent as such: each of its digits is
        replaced by the position of that digit on the keypad image
        served along with the login form.

        Raises ``ValueError`` if the login page does not have the
        expected structure or if the site reports an error.
        """
        # login page
        url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion')
        r = self.req.get(url, params={'type': 'homeconnex'})
        r.raise_for_status()
        # login form
        soup = bs4.BeautifulSoup(r.text)
        form = soup.find('form', attrs={'name': 'logincanalnet'})
        if form is None:
            raise ValueError('login form not found')
        # extract relevant data
        action = form['action']
        data = {field['name']: field['value'] for field in form('input')}

        # keyboard image url
        tag = soup.find(attrs={'id': 'secret-nbr-keyboard'})
        if tag is None:
            raise ValueError('keyboard not found')
        src = self._keyboard_src(tag['style'])
        # download keyboard image
        r = self.req.get(urljoin(self.url, src))
        r.raise_for_status()
        image = np.array(Image.open(BytesIO(r.content)).convert('RGB'))
        # decode digits position
        passwdmap = imdecode(image)

        # username and encoded password
        data['ch1'] = user
        data['ch5'] = self._encode_password(passwd, passwdmap)

        # post
        r = self.req.post(urljoin(self.url, action), data=data)
        r.raise_for_status()
        # redirection
        m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text)
        if m is None:
            raise ValueError('login redirection not found')
        r = self.req.get(m.group(1))
        r.raise_for_status()

        # check for errors
        soup = bs4.BeautifulSoup(r.text)
        err = soup.find(attrs={'class': 'TitreErreur'})
        if err is not None:
            raise ValueError(err.text)

    def recent(self):
        """Yield a ``Transaction`` for each row of the operations table.

        The table is requested for the ``CONTRACT`` account. The
        category of the yielded transactions is the category
        identifier found in the page, not its name.
        """
        data = {
            'BeginDate': '',
            'Categs': '',
            'Contracts': '',
            'EndDate': '',
            'OpTypes': '',
            'cboFlowName': 'flow/iastatement',
            'contractId': CONTRACT,
            'contractIds': '',
            'entryDashboard': '',
            'execution': 'e6s1',
            'externalIAId': 'IAStatements',
            'g1Style': 'expand',
            'g1Type': '',
            'g2Style': 'collapse',
            'g2Type': '',
            'g3Style': 'collapse',
            'g3Type': '',
            'g4Style': 'collapse',
            'g4Type': '',
            'groupId': '-2',
            'groupSelected': '-2',
            'gt': 'homepage:basic-theme',
            'pageId': 'releveoperations',
            'pastOrPendingOperations': '1',
            'sendEUD': 'true',
            'step': 'STAMENTS',
        }

        url = urljoin(self.url, '/banque/portail/particulier/FicheA')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        text = r.text

        # the html is so broken beautifulsoup does not understand it
        text = text.replace(
            '<th class="thTitre" style="width:7%">Pointage </td>',
            '<th class="thTitre" style="width:7%">Pointage </th>')
        s = bs4.BeautifulSoup(text)

        # extract transactions
        table = s.find('table', id='tableCompte')
        for row in table.find_all('tr'):
            fields = row.find_all('td')
            if not fields:
                # skip headers row
                continue
            tid = int(fields[0].input['id'].lstrip('_'))
            date = datetime.strptime(fields[1].text, '%d/%m/%Y')
            descr = fields[2].text.strip()
            debit = amountparse(fields[3].text)
            credit = amountparse(fields[4].text)
            # the third css class is the category id with a 4 characters prefix
            categoryid = fields[6].span['class'][2][4:]
            yield Transaction(tid, date, descr, debit, credit, categoryid)

    def messages(self):
        """Yield a ``Message`` for each entry of the received messages list.

        Raises ``ValueError`` if the link of an entry does not have the
        expected form.
        """
        data = {
            'identifiant': 'BmmFicheListerMessagesRecus_20100607022434',
            'type': 'fiche',
        }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)

        # messages list
        table = s.find('table', id='listeMessages')
        for row in table.find_all('tr', recursive=False):
            # skip headers, separators, and footer
            classes = row['class']
            if 'entete' in classes or 'sep' in classes or 'actions_bas' in classes:
                continue
            fields = row.find_all('td')
            icon = fields[1].img['src']
            sender = fields[2].text.strip()
            link = fields[4].a
            subject = link.text.strip()
            date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', link['onclick'])
            if m is None:
                raise ValueError(repr(link['onclick']))
            mid = m.group(1)
            read = m.group(2) == 'false'
            yield Message(mid, read, icon, sender, subject, date, validity)

    def message(self, mid):
        """Retrieve the message with identifier ``mid``.

        Returns a ``(sender, body)`` tuple where ``body`` is the message
        content converted to plain text wrapped at 72 columns.
        """
        data = {
            'etape': 'boiteReception',
            'idMessage': mid,
            'identifiant': 'BmmFicheLireMessage_20100607022346',
            'maxPagination': 2,
            'minPagination': 1,
            'nbElementParPage': 20,
            'nbEltPagination': 5,
            'nbPages': 2,
            'newMsg': 'false',
            'pagination': 1,
            'type': 'fiche',
            'typeAction': '',
        }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        # fix badly broken html
        text = r.text.replace('<br>', '<br/>').replace('</br>', '')
        s = bs4.BeautifulSoup(text)

        envelope = s.find('div', attrs={'class': 'enveloppe'})
        fields = envelope.find_all('tr')[1].find_all('td')
        # the messages list presents a truncated sender, the subject
        # and date cells that follow are not needed
        sender = fields[0].text.strip()

        content = s.find('div', attrs={'class': 'txtMessage'})
        # clean up text
        for t in content.find_all('style'):
            t.extract()
        for t in content.find_all('script'):
            t.extract()
        for t in content.find_all(id='info_pro'):
            t.extract()
        for t in content.find_all('br'):
            t.replace_with('\n\n')
        for t in content.find_all('b'):
            if t.string:
                t.replace_with('*%s*' % t.string.strip())
        for t in content.find_all('li'):
            t.replace_with('- %s\n\n' % t.text.strip())
        # format nicely
        text = re.sub(' +', ' ', content.text)
        text = re.sub(r'\s+([\.:])', r'\1', text)
        pars = []
        for p in re.split('\n\n+', text):
            p = p.strip()
            if p:
                pars.append('\n'.join(textwrap.wrap(p, 72)))
        body = '\n\n'.join(pars)
        return sender, body

    def transactions(self, account='FR7630004001640000242975804'):
        """Download the transactions file for ``account``.

        Posts the download form and returns the text of the document
        the response links to.

        Raises ``ValueError`` if the response contains no link.
        """
        # NOTE(review): the default account is the value that was
        # hard-coded here, it probably belongs in the configuration
        data = {
            'ch_memo': 'NON',
            'ch_rop_cpt_0': account,
            'ch_rop_dat': 'tous',
            'ch_rop_dat_deb': '',
            'ch_rop_dat_fin': '',
            'ch_rop_fmt_dat': 'JJMMAAAA',
            'ch_rop_fmt_fic': 'RTEXC',
            'ch_rop_fmt_sep': 'PT',
            'ch_rop_mon': 'EUR',
            'x': '55',
            'y': '7',
        }
        r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)
        link = s.find('a')
        if link is None:
            raise ValueError('download link not found')
        r = self.req.get(urljoin(self.url, link['href']))
        r.raise_for_status()
        return r.text
354
355
class Mailer:
    """Send mail through the SMTP server defined in the configuration."""

    def __init__(self):
        self.server = SMTPSERVER
        self.port = SMTPPORT
        self.starttls = SMTPSTARTTLS
        self.username = SMTPUSER
        self.password = SMTPPASSWD

    @contextmanager
    def connect(self):
        """Context manager yielding a connection to the SMTP server.

        TLS is negotiated and the user authenticated when configured.
        """
        # the SMTP object is a context manager itself: it issues QUIT
        # on exit, also when the setup below or the caller fails
        with smtplib.SMTP(self.server, self.port) as smtp:
            if self.starttls:
                smtp.starttls()
            if self.username:
                smtp.login(self.username, self.password)
            yield smtp

    def send(self, message, fromaddr=None, toaddr=None):
        """Send ``message``.

        The envelope addresses default to the ``From`` and ``To``
        headers of the message.
        """
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))
381
382
def encrypt(message, sender, recipient):
    """Sign and encrypt ``message`` with gpg.

    ``sender`` and ``recipient`` may be full mail addresses with a
    display name: only the address part is handed to gpg, to select
    the signing key and the encryption key respectively.

    Returns the ascii armored output of gpg.

    Raises ``RuntimeError`` if gpg exits with a non zero status.
    """
    sender = parseaddr(sender)[1]
    recipient = parseaddr(recipient)[1]
    cmd = ["gpg", "--homedir", GNUPGHOME, "--batch", "--yes", "--no-options", "--armor",
           "--local-user", sender, "--recipient", recipient, "--sign", "--encrypt"]
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate(input=message.encode('utf-8'))
    # on failure gpg writes nothing to stdout: do not let callers
    # mistake the resulting empty string for an encrypted message
    if p.returncode != 0:
        raise RuntimeError('gpg failed with status %d: %s' % (
            p.returncode, err.decode('utf-8', 'replace').strip()))
    return out.decode('ascii')
391
392
def _compose(subject, body, date):
    """Return the mail carrying the encrypted ``body``."""
    message = MIMEText(encrypt(body, MAILFROM, MAILTO), _charset='utf8 7bit')
    message['Subject'] = subject
    message['From'] = MAILFROM
    message['To'] = MAILTO
    message['Date'] = format_datetime(date)
    return message


def main():
    """Mail unread messages and transactions not notified before.

    The identifiers of what has been mailed are recorded in the
    ``DB`` sqlite database and are skipped on later runs.
    """
    bnp = Site()
    bnp.login(USERNAME, PASSWORD)

    db = sqlite3.connect(DB)
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
        db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

        mailer = Mailer()
        curs = db.cursor()

        ## unread messages
        unread = (m for m in bnp.messages() if not m.read)
        for m in sorted(unread, key=lambda x: x.date):
            # test for the row itself, not for the truth value of the
            # stored id
            curs.execute('''SELECT 1 FROM messages WHERE id = ?''', (m.id, ))
            if curs.fetchone() is not None:
                # already handled
                continue

            # retrieve complete sender and message body
            sender, body = bnp.message(m.id)

            # compose and send message
            body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body)
            mailer.send(_compose('BNP Paribas message', body, localtime(m.date)))

            # record the message only once it has been sent
            curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, ))
            db.commit()

        ## transactions
        lines = []
        for t in bnp.recent():
            curs.execute('''SELECT 1 FROM transactions WHERE id = ?''', (t.id, ))
            if curs.fetchone() is not None:
                # already handled
                continue
            lines.append(str(t))
            curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, ))

        if lines:
            lines.insert(0, HEADER)
            lines.insert(1, '-' * len(HEADER))
            body = '\n'.join(lines)
            mailer.send(_compose('BNP Paribas update', body, localtime()))

        # committed after sending: if the mail fails the transactions
        # are not recorded and are reported again by the next run
        db.commit()
    finally:
        db.close()
452
453
# run the notifier only when executed as a script, not when imported
if __name__ == '__main__':
    main()