# HG changeset patch # User Daniele Nicolodi # Date 1314126230 -7200 # Node ID 598f84f0db4e02c3b8f9b54070398a00a0dfe663 # Parent f075650d3e1d609d0ef94389690189457beeb1f7 Extend test to exercise the permissions management interface. Doing so improve the 'privileges' function of the command line administration tool, cleanup the implementation and most importantly add CSRF protection that was missing for this form. diff -r f075650d3e1d -r 598f84f0db4e src/ltpdarepo/admin.py --- a/src/ltpdarepo/admin.py Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/admin.py Tue Aug 23 21:03:50 2011 +0200 @@ -158,13 +158,15 @@ choices=frozenset(['select', 'insert', 'update', 'delete', 'admin'])) -def privileges(username): +def privileges(username, database=None): """show privileges for given user""" conn = mysql.connect(host=HOSTNAME, db=DATABASE, user=USERNAME, passwd=PASSWORD, charset='utf8') curs = conn.cursor() - privs = {} + from collections import defaultdict + privs = defaultdict(lambda: {'select': False, 'insert': False, 'update': False, 'delete': False}) + curs.execute('''SELECT DISTINCT Db, Select_priv, Insert_priv, Update_priv, Delete_priv FROM mysql.db WHERE User=%s''', username) for row in curs.fetchall(): @@ -173,9 +175,20 @@ 'update': row[3] == 'Y', 'delete': row[4] == 'Y'} conn.close() - pprint(privs) + if database is not None: + return privs[database] + return privs -cmd = commands.add(privileges) +def _privileges(username): + privs = privileges(username) + if privs: + headers = tuple('username select insert update delete'.split()) + print '%-20s %7s %7s %7s %7s' % headers + for user, priv in privs.iteritems(): + values = [priv[x] and 'Y' or 'N' for x in 'select insert update delete'.split()] + print '%-20s' % username, '%7s %7s %7s %7s' % tuple(values) + +cmd = commands.add(_privileges, name='privileges') cmd.add_argument('username', metavar='USERNAME') diff -r f075650d3e1d -r 598f84f0db4e src/ltpdarepo/templates/databases/permissions.html --- a/src/ltpdarepo/templates/databases/permissions.html Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/templates/databases/permissions.html Tue Aug 23 21:03:50 2011 +0200 @@ -1,29 +1,33 @@ +{% import 'forms.html' as forms %} {% extends "layout.html" %} {% block title %}{{ database.id }}{% endblock %} {% block body %} -

Permissions for database «{{ database }}»

+

Permissions for database «{{ database.id }}»

- - - - - - - - - {% for user, priv in permissions.iteritems() %} - - - - - - - - {% endfor %} -
selectinsertupdatedelete
- {{ user }} - -
- +
+ {% for field in form %}{{ forms.render_form_field(field) }}{% endfor %} + + + + + + + + + {% for user, priv in permissions.iteritems() %} + + + + + + + + {% endfor %} +
selectinsertupdatedelete
+ {{ user }} + +
+ +
{% endblock %} diff -r f075650d3e1d -r 598f84f0db4e src/ltpdarepo/tests/manage-databases.txt --- a/src/ltpdarepo/tests/manage-databases.txt Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/tests/manage-databases.txt Tue Aug 23 21:03:50 2011 +0200 @@ -56,6 +56,54 @@ >>> browser.contents '...

Description: Test Database One

...' + +Test permissions. The user should have no permissions on the new database:: + + >>> from ltpdarepo.admin import privileges + >>> privileges('u1', 'database1') + {'insert': False, 'update': False, 'select': False, 'delete': False} + +Edit permissions:: + + >>> browser.open('http://localhost/manage/databases/database1') + >>> browser.follow('Permissions') + >>> browser.url + 'http://localhost/manage/databases/database1/permissions' + + >>> browser.getControl(name='u1:select').value = True + >>> browser.getControl(name='u1:insert').value = True + >>> browser.getControl(name='submit').click() + >>> browser.url + 'http://localhost/manage/databases/database1' + +Check that the permissions have been updated:: + + >>> privileges('u1', 'database1') + {'insert': True, 'update': False, 'select': True, 'delete': False} + +that the form is updated accordingly:: + + >>> browser.follow('Permissions') + >>> browser.getControl(name='u1:select').value + ['Y'] + >>> browser.getControl(name='u1:insert').value + ['Y'] + >>> browser.getControl(name='u1:update').value + [] + >>> browser.getControl(name='u1:delete').value + [] + +and that we can revoke permissions:: + + >>> browser.getControl(name='u1:insert').value = False + >>> browser.getControl(name='submit').click() + >>> browser.url + 'http://localhost/manage/databases/database1' + + >>> privileges('u1', 'database1') + {'insert': False, 'update': False, 'select': True, 'delete': False} + + Cannot create database with bad id:: >>> browser.open('/manage/databases/') diff -r f075650d3e1d -r 598f84f0db4e src/ltpdarepo/views/databases.py --- a/src/ltpdarepo/views/databases.py Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/views/databases.py Tue Aug 23 21:03:50 2011 +0200 @@ -27,7 +27,7 @@ @app.route('/') @require('admin') def view(database): - db = Database().load(id=database) + db = Database.load(id=database) if db is None: # not found abort(404) @@ -50,7 +50,7 @@ @app.route('//edit', methods=['GET', 'POST']) @require('admin') def edit(database): - db = Database().load(id=database) + db = Database.load(id=database) if db is None: # not found abort(404) @@ -66,7 +66,7 @@ @app.route('//drop', methods=['GET', 'POST']) @require('admin') def drop(database): - db = Database().load(id=database) + db = Database.load(id=database) if db is None: # not found abort(404) @@ -101,40 +101,45 @@ return privs -def _permissions(permissions, formdata): +def _permissions_updates(permissions, formdata): users = permissions.keys() updates = [] for user in users: permissions[user]['modified'] = False if user in formdata: - for act in ('select', 'insert', 'update', 'delete'): - p = bool(formdata.get('%s:%s' % (user, act), False)) - if permissions[user][act] != p: - updates.append({'user': user, 'priv': act, 'grant': p}) + for priv in ('select', 'insert', 'update', 'delete'): + value = bool(formdata.get('%s:%s' % (user, priv), False)) + if permissions[user][priv] != value: + updates.append((user, priv, value)) return updates -def _set_permissions(database, updates): +def _update_permissions(database, updates): curs = g.db.cursor() - for update in updates: - if update['grant']: - curs.execute("""GRANT %s ON `%s`.* TO %%s@%%s""" - % (update['priv'], database), (update['user'], '%')) + for user, priv, value in updates: + if value: + cmd = "GRANT %s ON `%s`.* TO %%s@%%s""" % (priv, database) else: - curs.execute("""REVOKE %s ON `%s`.* FROM %%s@%%s""" - % (update['priv'], database), (update['user'], '%')) + cmd = "REVOKE %s ON `%s`.* FROM %%s@%%s""" % (priv, database) + curs.execute(cmd, (user, '%')) @app.route('//permissions', methods=['GET', 'POST']) @require('admin') def permissions(database): + db = Database.load(id=database) + if db is None: + # not found + abort(404) permissions = _get_permissions(database) - if request.method == 'POST': - updates = _permissions(permissions, request.form) - _set_permissions(database, updates) + # use an empty form to have CSRF protection + form = Form() + if request.method == 'POST' and form.validate(): + updates = _permissions_updates(permissions, request.form) + _update_permissions(database, updates) flash('Permissions updated.') - return redirect(url_for('manage.databases.permissions', database=database)) - return render_template('databases/permissions.html', database=database, permissions=permissions) + return redirect(url_for('manage.databases.view', database=database)) + return render_template('databases/permissions.html', database=db, permissions=permissions, form=form) module = app