comparison bnpparibas.py @ 6:13a8bc43bc09

Simplify package structure
author Daniele Nicolodi <daniele@grinta.net>
date Mon, 11 Jan 2016 18:57:25 +0100
parents src/bnpparibas.py@a47012c9db15
children 90f4e0bd0c2d
comparison
equal deleted inserted replaced
5:a47012c9db15 6:13a8bc43bc09
1 import email
2 import imp
3 import os.path
4 import re
5 import smtplib
6 import sqlite3
7 import subprocess
8 import textwrap
9
10 from collections import namedtuple, defaultdict
11 from contextlib import contextmanager
12 from datetime import datetime
13 from decimal import Decimal
14 from email.mime.text import MIMEText
15 from email.utils import format_datetime, localtime, parseaddr
16 from io import BytesIO
17 from itertools import product, islice
18 from urllib.parse import urljoin
19
20 import bs4
21 import click
22 import requests
23
24 from PIL import Image
25
26
# template of the plain text message that gets encrypted and mailed
MESSAGE = (
    "From: {sender:}\n"
    "Subject: {subject:}\n"
    "Date: {date:}\n"
    "Message-Id: {id:}\n"
    "\n"
    "{body:}\n"
)

# templates for the transactions report: one header line and one line
# per transaction, laid out on the same columns
HEADER = '%-14s %-10s %-59s %8s' % ('Id', 'Date', 'Description', 'Amount')
TRANSACTION = '{id:} {date:%d/%m/%Y} {descr:59s} {amount:>8s}'

# category identifiers and labels as defined in the bnpparibas web app
CATEGORIES = dict([
    ('1', 'Alimentation'),
    ('7', 'Logement'),
    ('8', 'Loisirs'),
    ('9', 'Transport'),
    ('12', 'Opérations bancaires'),
    ('13', 'Non défini'),
    ('14', 'Multimédia'),
    ('20', 'Energies'),
    ('22', 'Retrait'),
    ('23', 'Sorties'),
    ('R58', 'Non défini'),
])

# euro symbol
EURO = '\u20ac'
58
59
# load configuration
def loadconfig(filename):
    """Load the settings defined in a Python configuration file.

    The file content is executed as Python code in the namespace of a
    fresh module object and every name in that namespace that is all
    upper case is collected as a setting.

    NOTE: the file is run through ``exec`` and must therefore be
    trusted.

    Args:
        filename: path of the configuration file.

    Returns:
        A dict mapping the upper case names to their values.

    Raises:
        OSError: the file cannot be read. The ``strerror`` attribute
            of the exception is rewritten to mention the
            configuration file.
    """
    # imp.new_module() is deprecated and the whole imp module has been
    # removed in Python 3.12: types.ModuleType is the replacement.
    # imported here so that this function does not depend on changes
    # to the imports at the top of the file
    import types

    module = types.ModuleType('config')
    module.__file__ = filename
    try:
        with open(filename) as fd:
            source = fd.read()
    except IOError as e:
        e.strerror = 'Unable to load configuration file (%s)' % e.strerror
        raise
    exec(compile(source, filename, 'exec'), module.__dict__)
    return {key: getattr(module, key) for key in dir(module) if key.isupper()}
75
76
# GPG encrypted text is ASCII and as such does not require encoding,
# but its decrypted form is UTF-8 and therefore the charset header
# must be set accordingly. Define an appropriate charset object: it
# is registered under the name 'utf8 7bit', has no body encoding
# (body_enc=None) and is output as 'utf-8'. This runs at import time.
email.charset.add_charset('utf8 7bit', header_enc=email.charset.SHORTEST,
                          body_enc=None, output_charset='utf-8')
82
83
# one entry of the received messages list, as produced by Site.messages()
Message = namedtuple(
    'Message',
    ['id', 'read', 'icon', 'sender', 'subject', 'date', 'validity'],
)
85
86
class Transaction:
    """One operation of the account statement.

    Instances are plain records: the constructor arguments are stored
    unchanged in attributes (``tid`` is stored as ``id``).
    """

    def __init__(self, tid, date, descr, debit, credit, category):
        # transaction identifier
        self.id = tid
        # date of the operation: formatted with '%d/%m/%Y' by __str__
        # and hence expected to be a date or datetime object
        self.date = date
        # description text
        self.descr = descr
        # amounts: presumably Decimal or None as returned by
        # amountparse() -- verify against the callers
        self.debit = debit
        self.credit = credit
        # category identifier
        self.category = category

    def __str__(self):
        """Return the transaction formatted as one line of the report.

        The amount is the debit prefixed with '-' or the credit
        prefixed with '+'. If both are set the credit wins and if
        neither is set the amount column is left blank.
        """
        # there does not seem to be an easy way to format Decimal
        # objects with a leading sign in both the positive and
        # negative value cases so do it manually
        #
        # work on a copy: vars() returns the live instance dictionary
        # and adding the 'amount' key to it would silently create an
        # attribute on the instance every time it is printed
        fields = dict(vars(self))
        amount = ''
        if self.debit:
            amount = '-' + str(self.debit)
        if self.credit:
            amount = '+' + str(self.credit)
        fields['amount'] = amount
        return TRANSACTION.format(**fields)
106
107
def imslice(image):
    """Cut the keypad image into the tiles of a 5 x 5 grid.

    The grid has a pitch of 27 pixels and the first row and column of
    pixels of each cell are left out, so tiles are 26 x 26 pixels.
    Tiles are yielded row by row, from left to right.

    Args:
        image: object with a PIL-like ``crop()`` method taking a
            ``(left, upper, right, lower)`` box.

    Yields:
        The result of ``image.crop()`` for each of the 25 cells.
    """
    for row in range(5):
        for col in range(5):
            left = 27 * col + 1
            upper = 27 * row + 1
            right = 27 * (col + 1)
            lower = 27 * (row + 1)
            yield image.crop((left, upper, right, lower))
111
112
def imdecode(image):
    """Locate the ten digits on the scrambled keypad image.

    Each tile of the keypad that is not background only is compared
    to the reference digit tiles cut from the ``keypad.png`` file
    stored next to this module.

    Args:
        image: the keypad image, as accepted by imslice().

    Returns:
        A dict mapping each digit (int from 0 to 9) to the one-based
        index of the tile showing it, in the order tiles are yielded
        by imslice().

    Raises:
        ValueError: not all ten digits have been found.
    """
    # load reference keypad: a strip of ten 26 x 26 tiles, one per digit
    path = os.path.join(os.path.dirname(__file__), 'keypad.png')
    strip = Image.open(path).convert('L')
    references = [strip.crop((26 * i, 0, 26 * (i + 1), 26)) for i in range(10)]

    positions = {}
    for position, tile in enumerate(imslice(image), start=1):
        # skip tiles with background only: those have no black pixel
        if tile.getextrema()[0] > 0:
            continue
        # compare to reference tiles
        for digit, reference in enumerate(references):
            if tile == reference:
                positions[digit] = position
                break

    if sorted(positions) != list(range(10)):
        raise ValueError('keypad decode failed')
    return positions
130
131
132 def amountparse(value):
133 # empty
134 if value == '\xa0':
135 return None
136 m = re.match(r'\s+((?:\d+\.)?\d+,\d+)\s+([^\s]+)\s+$', value, re.U|re.S)
137 if m is None:
138 raise ValueError(repr(value))
139 # euro
140 currency = m.group(2)
141 if currency != EURO:
142 raise ValueError(repr(currency))
143 return Decimal(m.group(1).replace('.', '').replace(',', '.'))
144
145
class Site:
    """Scraper for the BNP Paribas home banking web site.

    All the requests are issued through one ``requests.Session``
    object, presumably so that the session established by login() is
    carried over to the following requests.
    """

    def __init__(self):
        self.url = 'https://www.secure.bnpparibas.net'
        self.req = requests.Session()

    def login(self, user, passwd):
        """Log into the web site.

        The password is not sent as is. The login page carries the
        image of a keypad with the digits in scrambled positions and
        each digit of the password is sent as the two digit position
        of the corresponding key, as decoded by imdecode().

        Args:
            user: user identifier, sent as the ``ch1`` form field.
            passwd: password, a string of decimal digits.

        Raises:
            ValueError: the keypad image cannot be located or decoded,
                the redirection following the form submission is not
                found, or the site reports an error.
            requests.HTTPError: a request returned an error status.
        """
        # login page
        url = urljoin(self.url, '/banque/portail/particulier/HomeConnexion')
        r = self.req.get(url, params={'type': 'homeconnex'})
        r.raise_for_status()
        # login form
        soup = bs4.BeautifulSoup(r.text)
        form = soup.find('form', attrs={'name': 'logincanalnet'})
        # extract relevant data
        action = form['action']
        data = {field['name']: field['value'] for field in form('input')}

        # keyboard image url: it is found in the inline style
        tag = soup.find(attrs={'id': 'secret-nbr-keyboard'})
        src = None
        for prop in tag['style'].split(';'):
            match = re.match(r'background-image:\s+url\(\'(.*)\'\)\s*', prop)
            if match:
                src = match.group(1)
                break
        if src is None:
            # fail with a meaningful error rather than with a NameError
            raise ValueError('keypad image not found')
        # download keyboard image
        r = self.req.get(urljoin(self.url, src))
        r.raise_for_status()
        image = Image.open(BytesIO(r.content)).convert('L')
        # decode digits position
        passwdmap = imdecode(image)

        # encode password
        passwdenc = ''.join('%02d' % passwdmap[d] for d in map(int, passwd))

        # username and password
        data['ch1'] = user
        data['ch5'] = passwdenc

        # post
        r = self.req.post(urljoin(self.url, action), data=data)
        r.raise_for_status()
        # redirection: it is done in javascript
        m = re.search(r'document\.location\.replace\(\"(.+)\"\)', r.text)
        if m is None:
            raise ValueError('login redirection not found')
        dest = m.group(1)
        r = self.req.get(dest)
        r.raise_for_status()

        # check for errors
        soup = bs4.BeautifulSoup(r.text)
        err = soup.find(attrs={'class': 'TitreErreur'})
        if err:
            raise ValueError(err.text)

    def recent(self, contract):
        """Retrieve the operations listed in the account statement page.

        Args:
            contract: contract identifier, sent as the ``contractId``
                form field.

        Yields:
            A Transaction for each row of the ``tableCompte`` table.
            The ``category`` attribute is the category identifier,
            presumably one of the keys of CATEGORIES -- to be verified.

        Raises:
            ValueError: an amount or a date cannot be parsed.
            requests.HTTPError: the request returned an error status.
        """
        data = {
            'BeginDate': '',
            'Categs': '',
            'Contracts': '',
            'EndDate': '',
            'OpTypes': '',
            'cboFlowName': 'flow/iastatement',
            'contractId': contract,
            'contractIds': '',
            'entryDashboard': '',
            'execution': 'e6s1',
            'externalIAId': 'IAStatements',
            'g1Style': 'expand',
            'g1Type': '',
            'g2Style': 'collapse',
            'g2Type': '',
            'g3Style': 'collapse',
            'g3Type': '',
            'g4Style': 'collapse',
            'g4Type': '',
            'groupId': '-2',
            'groupSelected': '-2',
            'gt': 'homepage:basic-theme',
            'pageId': 'releveoperations',
            'pastOrPendingOperations': '1',
            'sendEUD': 'true',
            'step': 'STAMENTS', }

        url = urljoin(self.url, '/banque/portail/particulier/FicheA')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        text = r.text

        # the html is so broken beautifulsoup does not understand it
        text = text.replace(
            '<th class="thTitre" style="width:7%">Pointage </td>',
            '<th class="thTitre" style="width:7%">Pointage </th>')
        s = bs4.BeautifulSoup(text)

        # extract transactions
        table = s.find('table', id='tableCompte')
        for row in table.find_all('tr'):
            fields = row.find_all('td')
            if not fields:
                # skip headers row
                continue
            # the identifier is the id of the input element in the
            # first cell with the leading underscores removed
            tid = int(fields[0].input['id'].lstrip('_'))
            date = datetime.strptime(fields[1].text, '%d/%m/%Y')
            descr = fields[2].text.strip()
            debit = amountparse(fields[3].text)
            credit = amountparse(fields[4].text)
            # the sixth cell holds the category label, which is not
            # used: the identifier is what is left of the third class
            # name of the span in the seventh cell once its first four
            # characters are dropped
            categoryid = fields[6].span['class'][2][4:]
            yield Transaction(tid, date, descr, debit, credit, categoryid)

    def messages(self):
        """Retrieve the list of the received messages.

        Yields:
            A Message for each message row of the ``listeMessages``
            table. The ``sender`` is as presented in the list, where
            it may be truncated. ``read`` is true when the last
            argument of the link ``onclick`` handler is ``'false'``,
            presumably the "new message" flag -- to be verified.

        Raises:
            requests.HTTPError: the request returned an error status.
        """
        data = {
            'identifiant': 'BmmFicheListerMessagesRecus_20100607022434',
            'type': 'fiche', }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)

        # messages list
        table = s.find('table', id='listeMessages')
        for row in table.find_all('tr', recursive=False):
            # skip headers, separators, and footer
            if any(name in row['class'] for name in ('entete', 'sep', 'actions_bas')):
                continue
            fields = row.find_all('td')
            icon = fields[1].img['src']
            sender = fields[2].text.strip()
            subject = fields[4].a.text.strip()
            date = datetime.strptime(fields[5]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            validity = datetime.strptime(fields[6]['data'], '%Y/%m/%d:%Hh%Mmin%Ssec')
            # the message identifier is an argument of the javascript
            # function called when the subject link is clicked
            m = re.match(r'''validerFormulaire\('BmmFicheLireMessage_20100607022346','(.+)','(true|false)'\);$''', fields[4].a['onclick'])
            mid = m.group(1)
            read = m.group(2) == 'false'
            yield Message(mid, read, icon, sender, subject, date, validity)

    def message(self, mid):
        """Retrieve one message and render it as plain text.

        Bold text is wrapped in asterisks, list items are turned into
        dashed paragraphs, and paragraphs are wrapped at 72 columns and
        separated by blank lines.

        Args:
            mid: message identifier, as found in Message.id.

        Returns:
            A ``(sender, body)`` tuple of strings.

        Raises:
            requests.HTTPError: the request returned an error status.
        """
        data = {
            'etape': 'boiteReception',
            'idMessage': mid,
            'identifiant': 'BmmFicheLireMessage_20100607022346',
            'maxPagination': 2,
            'minPagination': 1,
            'nbElementParPage': 20,
            'nbEltPagination': 5,
            'nbPages': 2,
            'newMsg': 'false',
            'pagination': 1,
            'type': 'fiche',
            'typeAction': '', }

        url = urljoin(self.url, '/banque/portail/particulier/Fiche')
        r = self.req.post(url, data=data)
        r.raise_for_status()
        # fix badly broken html
        text = r.text.replace('<br>', '<br/>').replace('</br>', '')
        s = bs4.BeautifulSoup(text)

        envelope = s.find('div', attrs={'class': 'enveloppe'})
        rows = envelope.find_all('tr')
        fields = rows[1].find_all('td')
        # the messages list present a truncated sender. the next two
        # cells hold the subject and the date, which are not used
        sender = fields[0].text.strip()

        content = s.find('div', attrs={'class': 'txtMessage'})
        # clean up text
        for t in content.find_all('style'):
            t.extract()
        for t in content.find_all('script'):
            t.extract()
        for t in content.find_all(id='info_pro'):
            t.extract()
        for t in content.find_all('br'):
            t.replace_with('\n\n')
        for t in content.find_all('b'):
            if t.string:
                t.replace_with('*%s*' % t.string.strip())
        for t in content.find_all('li'):
            t.replace_with('- %s\n\n' % t.text.strip())
        # format nicely
        text = re.sub(' +', ' ', content.text)
        text = re.sub(r'\s+([\.:])', r'\1', text)
        pars = []
        for p in re.split('\n\n+', text):
            p = p.strip()
            if p:
                pars.append('\n'.join(textwrap.wrap(p, 72)))
        body = '\n\n'.join(pars)
        return sender, body

    def transactions(self, account='FR7630004001640000242975804'):
        """Download the operations export file.

        Args:
            account: account identifier, sent as the ``ch_rop_cpt_0``
                form field. Defaults to the value that used to be
                hard-coded here.

        Returns:
            The text content of the document the first link in the
            response to the export request points to.

        Raises:
            requests.HTTPError: a request returned an error status.
        """
        data = {'ch_memo': 'NON',
                'ch_rop_cpt_0': account,
                'ch_rop_dat': 'tous',
                'ch_rop_dat_deb': '',
                'ch_rop_dat_fin': '',
                'ch_rop_fmt_dat': 'JJMMAAAA',
                'ch_rop_fmt_fic': 'RTEXC',
                'ch_rop_fmt_sep': 'PT',
                'ch_rop_mon': 'EUR',
                'x': '55',
                'y': '7'}
        r = self.req.post(urljoin(self.url, '/SAF_TLC_CNF'), data=data)
        r.raise_for_status()
        s = bs4.BeautifulSoup(r.text)
        path = s.find('a')['href']
        r = self.req.get(urljoin(self.url, path))
        r.raise_for_status()
        return r.text
368
369
class Mailer:
    """Send email messages through an SMTP server.

    The connection parameters are read from the configuration
    dictionary: ``SMTPSERVER`` (default ``'localhost'``), ``SMTPPORT``
    (default 25), ``SMTPSTARTTLS`` (default False), ``SMTPUSER`` and
    ``SMTPPASSWD`` (both default to the empty string).
    """

    def __init__(self, config):
        self.server = config.get('SMTPSERVER', 'localhost')
        self.port = config.get('SMTPPORT', 25)
        self.starttls = config.get('SMTPSTARTTLS', False)
        self.username = config.get('SMTPUSER', '')
        self.password = config.get('SMTPPASSWD', '')

    @contextmanager
    def connect(self):
        """Context manager yielding a connection to the SMTP server.

        The connection is upgraded to TLS and authenticated as
        configured. It is terminated with a QUIT command when the
        block completes and it is closed in any case, also when the
        set up or the block raise an exception.
        """
        smtp = smtplib.SMTP(self.server, self.port)
        try:
            if self.starttls:
                smtp.starttls()
            # authentication is skipped when no username is configured
            if self.username:
                smtp.login(self.username, self.password)
            yield smtp
            smtp.quit()
        finally:
            # do not leak the socket on errors. this does nothing if
            # the connection has already been closed by quit()
            smtp.close()

    def send(self, message, fromaddr=None, toaddr=None):
        """Send a message.

        Args:
            message: the message to send. It is sent as ``str(message)``
                and its ``From`` and ``To`` items are looked up when
                the corresponding address is not given.
            fromaddr: envelope sender address.
            toaddr: envelope recipient address.
        """
        if not fromaddr:
            fromaddr = message['From']
        if not toaddr:
            toaddr = message['To']
        with self.connect() as conn:
            conn.sendmail(fromaddr, toaddr, str(message))
395
396
class GPG:
    """Minimal interface to the ``gpg`` command line program."""

    def __init__(self, homedir):
        # directory passed to gpg with the --homedir option
        self.homedir = homedir

    def encrypt(self, message, sender, recipient):
        """Sign and encrypt a text message.

        Args:
            message: the text to encrypt. It is fed to gpg encoded
                in UTF-8.
            sender: address of the signer. Only the address part is
                used, any display name is dropped.
            recipient: address of the recipient, handled like the
                sender.

        Returns:
            The ASCII armored output of gpg as a string.

        Raises:
            RuntimeError: gpg exited with a non zero status. The
                arguments of the exception are the exit status and
                the content of the standard error stream.
        """
        signer = parseaddr(sender)[1]
        target = parseaddr(recipient)[1]

        argv = ["gpg", "--homedir", self.homedir]
        argv += ["--batch", "--yes", "--no-options", "--armor"]
        argv += ["--local-user", signer, "--recipient", target]
        argv += ["--sign", "--encrypt"]

        pipe = subprocess.PIPE
        proc = subprocess.Popen(argv, stdin=pipe, stdout=pipe, stderr=pipe)
        output, errors = proc.communicate(input=message.encode('utf-8'))
        if proc.returncode != 0:
            raise RuntimeError(proc.returncode, errors)
        return output.decode('ascii')
411
412
@click.command()
@click.argument('filename')
def main(filename):
    """Forward new bank messages and transactions by encrypted email.

    FILENAME is the Python configuration file. The settings used are
    USERNAME, PASSWORD, DATABASE, GNUPGHOME, MAILFROM, MAILTO, CONTRACT,
    and the SMTP settings read by the mailer.

    Unread messages are mailed one by one, new transactions are mailed
    together in one report. What has been already sent is recorded in
    the sqlite database so that it is not sent again.
    """
    # load configuration
    config = loadconfig(filename)

    bnp = Site()
    bnp.login(config['USERNAME'], config['PASSWORD'])

    db = sqlite3.connect(config['DATABASE'])
    # make sure the database is closed also on errors. changes not
    # committed at that point are discarded
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS messages (id TEXT PRIMARY KEY)''')
        db.execute('''CREATE TABLE IF NOT EXISTS transactions (id INTEGER PRIMARY KEY)''')

        mailer = Mailer(config)
        encrypt = GPG(config['GNUPGHOME']).encrypt

        def sendmail(subject, body, date):
            # encrypt the body and mail it with the given subject and date
            message = MIMEText(encrypt(body, config['MAILFROM'], config['MAILTO']), _charset='utf8 7bit')
            message['Subject'] = subject
            message['From'] = config['MAILFROM']
            message['To'] = config['MAILTO']
            message['Date'] = format_datetime(date)
            mailer.send(message)

        ## unread messages
        messages = filter(lambda x: not x.read, bnp.messages())
        for m in sorted(messages, key=lambda x: x.date):
            curs = db.cursor()
            # test for the presence of the row rather than for the
            # truth value of the stored id, which fails for ids that
            # evaluate to false such as 0 or the empty string
            curs.execute('''SELECT 1 FROM messages WHERE id = ?''', (m.id, ))
            if curs.fetchone() is not None:
                # already handled
                continue

            # retrieve complete sender and message body
            sender, body = bnp.message(m.id)

            # compose and send message
            body = MESSAGE.format(id=m.id, sender=sender, date=m.date, subject=m.subject, body=body)
            sendmail('BNP Paribas message', body, localtime(m.date))

            # record the message only once it has been sent
            curs.execute('''INSERT INTO messages (id) VALUES (?)''', (m.id, ))
            db.commit()

        ## transactions
        transactions = bnp.recent(config['CONTRACT'])
        curs = db.cursor()
        lines = []
        for t in transactions:
            curs.execute('''SELECT 1 FROM transactions WHERE id = ?''', (t.id, ))
            if curs.fetchone() is not None:
                # already handled
                continue
            lines.append(str(t))
            curs.execute('''INSERT INTO transactions (id) VALUES (?)''', (t.id, ))

        if lines:
            lines.insert(0, HEADER)
            lines.insert(1, '-' * len(HEADER))
            body = '\n'.join(lines)
            sendmail('BNP Paribas update', body, localtime())

        # the new transactions are recorded only if the report has
        # been sent without errors
        db.commit()
    finally:
        db.close()
478
479
# run the command line interface when the file is executed as a script
if __name__ == '__main__':
    main()