Mercurial > hg > ltpdarepo
changeset 109:598f84f0db4e
Extend test to exercise the permissions management interface.
Doing so improve the 'privileges' function of the command line
administration tool, cleanup the implementation and most importantly
add CSRF protection that was missing for this form.
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Tue, 23 Aug 2011 21:03:50 +0200 |
parents | f075650d3e1d |
children | e2338b374af4 |
files | src/ltpdarepo/admin.py src/ltpdarepo/templates/databases/permissions.html src/ltpdarepo/tests/manage-databases.txt src/ltpdarepo/views/databases.py |
diffstat | 4 files changed, 117 insertions(+), 47 deletions(-) [+] |
line wrap: on
line diff
--- a/src/ltpdarepo/admin.py Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/admin.py Tue Aug 23 21:03:50 2011 +0200 @@ -158,13 +158,15 @@ choices=frozenset(['select', 'insert', 'update', 'delete', 'admin'])) -def privileges(username): +def privileges(username, database=None): """show privileges for given user""" conn = mysql.connect(host=HOSTNAME, db=DATABASE, user=USERNAME, passwd=PASSWORD, charset='utf8') curs = conn.cursor() - privs = {} + from collections import defaultdict + privs = defaultdict(lambda: {'select': False, 'insert': False, 'update': False, 'delete': False}) + curs.execute('''SELECT DISTINCT Db, Select_priv, Insert_priv, Update_priv, Delete_priv FROM mysql.db WHERE User=%s''', username) for row in curs.fetchall(): @@ -173,9 +175,20 @@ 'update': row[3] == 'Y', 'delete': row[4] == 'Y'} conn.close() - pprint(privs) + if database is not None: + return privs[database] + return privs -cmd = commands.add(privileges) +def _privileges(username): + privs = privileges(username) + if privs: + headers = tuple('username select insert update delete'.split()) + print '%-20s %7s %7s %7s %7s' % headers + for user, priv in privs.iteritems(): + values = [priv[x] and 'Y' or 'N' for x in 'select insert update delete'.split()] + print '%-20s' % username, '%7s %7s %7s %7s' % tuple(values) + +cmd = commands.add(_privileges, name='privileges') cmd.add_argument('username', metavar='USERNAME')
--- a/src/ltpdarepo/templates/databases/permissions.html Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/templates/databases/permissions.html Tue Aug 23 21:03:50 2011 +0200 @@ -1,29 +1,33 @@ +{% import 'forms.html' as forms %} {% extends "layout.html" %} {% block title %}{{ database.id }}{% endblock %} {% block body %} -<h2>Permissions for database «{{ database }}»</h2> +<h2>Permissions for database «{{ database.id }}»</h2> <form action="" method="post"> - <table class="permissions"> - <tr> - <th></th> - <th>select</th> - <th>insert</th> - <th>update</th> - <th>delete</th> - </tr> - {% for user, priv in permissions.iteritems() %} - <tr> - <td> - <a href="{{ url_for('manage.users.view', username=user) }}">{{ user }}</a> - <input type="hidden" name="{{ user }}" value="" /> - </td> - <td><input type="checkbox" name="{{user}}:select" value="Y" {%- if priv['select'] %} checked="checked" {% endif -%} /></td> - <td><input type="checkbox" name="{{user}}:insert" value="Y" {%- if priv['insert'] %} checked="checked" {% endif -%} /></td> - <td><input type="checkbox" name="{{user}}:update" value="Y" {%- if priv['update'] %} checked="checked" {% endif -%} /></td> - <td><input type="checkbox" name="{{user}}:delete" value="Y" {%- if priv['delete'] %} checked="checked" {% endif -%} /></td> - </tr> - {% endfor %} - </table> - <input id="submit" type="submit" value="save" /> + <fieldset> + {% for field in form %}{{ forms.render_form_field(field) }}{% endfor %} + <table class="permissions"> + <tr> + <th></th> + <th>select</th> + <th>insert</th> + <th>update</th> + <th>delete</th> + </tr> + {% for user, priv in permissions.iteritems() %} + <tr> + <td> + <a href="{{ url_for('manage.users.view', username=user) }}">{{ user }}</a> + <input type="hidden" name="{{ user }}" value="" /> + </td> + <td><input type="checkbox" name="{{user}}:select" value="Y" {%- if priv['select'] %} checked="checked" {% endif -%} /></td> + <td><input type="checkbox" name="{{user}}:insert" value="Y" {%- if priv['insert'] %} checked="checked" {% endif -%} /></td> + <td><input type="checkbox" name="{{user}}:update" value="Y" {%- if priv['update'] %} checked="checked" {% endif -%} /></td> + <td><input type="checkbox" name="{{user}}:delete" value="Y" {%- if priv['delete'] %} checked="checked" {% endif -%} /></td> + </tr> + {% endfor %} + </table> + <input id="submit" type="submit" name="submit" value="save" /> + </fieldset> </form> {% endblock %}
--- a/src/ltpdarepo/tests/manage-databases.txt Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/tests/manage-databases.txt Tue Aug 23 21:03:50 2011 +0200 @@ -56,6 +56,54 @@ >>> browser.contents '...<p class="field"><span class="label">Description:</span> Test Database One</p>...' + +Test permissions. The user should have no permissions on the new database:: + + >>> from ltpdarepo.admin import privileges + >>> privileges('u1', 'database1') + {'insert': False, 'update': False, 'select': False, 'delete': False} + +Edit permissions:: + + >>> browser.open('http://localhost/manage/databases/database1') + >>> browser.follow('Permissions') + >>> browser.url + 'http://localhost/manage/databases/database1/permissions' + + >>> browser.getControl(name='u1:select').value = True + >>> browser.getControl(name='u1:insert').value = True + >>> browser.getControl(name='submit').click() + >>> browser.url + 'http://localhost/manage/databases/database1' + +Check that the permissions have been updated:: + + >>> privileges('u1', 'database1') + {'insert': True, 'update': False, 'select': True, 'delete': False} + +that the form is updated accordingly:: + + >>> browser.follow('Permissions') + >>> browser.getControl(name='u1:select').value + ['Y'] + >>> browser.getControl(name='u1:insert').value + ['Y'] + >>> browser.getControl(name='u1:update').value + [] + >>> browser.getControl(name='u1:delete').value + [] + +and that we can revoke permissions:: + + >>> browser.getControl(name='u1:insert').value = False + >>> browser.getControl(name='submit').click() + >>> browser.url + 'http://localhost/manage/databases/database1' + + >>> privileges('u1', 'database1') + {'insert': False, 'update': False, 'select': True, 'delete': False} + + Cannot create database with bad id:: >>> browser.open('/manage/databases/')
--- a/src/ltpdarepo/views/databases.py Tue Aug 23 19:11:25 2011 +0200 +++ b/src/ltpdarepo/views/databases.py Tue Aug 23 21:03:50 2011 +0200 @@ -27,7 +27,7 @@ @app.route('/<database>') @require('admin') def view(database): - db = Database().load(id=database) + db = Database.load(id=database) if db is None: # not found abort(404) @@ -50,7 +50,7 @@ @app.route('/<database>/edit', methods=['GET', 'POST']) @require('admin') def edit(database): - db = Database().load(id=database) + db = Database.load(id=database) if db is None: # not found abort(404) @@ -66,7 +66,7 @@ @app.route('/<database>/drop', methods=['GET', 'POST']) @require('admin') def drop(database): - db = Database().load(id=database) + db = Database.load(id=database) if db is None: # not found abort(404) @@ -101,40 +101,45 @@ return privs -def _permissions(permissions, formdata): +def _permissions_updates(permissions, formdata): users = permissions.keys() updates = [] for user in users: permissions[user]['modified'] = False if user in formdata: - for act in ('select', 'insert', 'update', 'delete'): - p = bool(formdata.get('%s:%s' % (user, act), False)) - if permissions[user][act] != p: - updates.append({'user': user, 'priv': act, 'grant': p}) + for priv in ('select', 'insert', 'update', 'delete'): + value = bool(formdata.get('%s:%s' % (user, priv), False)) + if permissions[user][priv] != value: + updates.append((user, priv, value)) return updates -def _set_permissions(database, updates): +def _update_permissions(database, updates): curs = g.db.cursor() - for update in updates: - if update['grant']: - curs.execute("""GRANT %s ON `%s`.* TO %%s@%%s""" - % (update['priv'], database), (update['user'], '%')) + for user, priv, value in updates: + if value: + cmd = "GRANT %s ON `%s`.* TO %%s@%%s""" % (priv, database) else: - curs.execute("""REVOKE %s ON `%s`.* FROM %%s@%%s""" - % (update['priv'], database), (update['user'], '%')) + cmd = "REVOKE %s ON `%s`.* FROM %%s@%%s""" % (priv, database) + curs.execute(cmd, (user, '%')) @app.route('/<database>/permissions', methods=['GET', 'POST']) @require('admin') def permissions(database): + db = Database.load(id=database) + if db is None: + # not found + abort(404) permissions = _get_permissions(database) - if request.method == 'POST': - updates = _permissions(permissions, request.form) - _set_permissions(database, updates) + # use an empty form to have CSRF protection + form = Form() + if request.method == 'POST' and form.validate(): + updates = _permissions_updates(permissions, request.form) + _update_permissions(database, updates) flash('Permissions updated.') - return redirect(url_for('manage.databases.permissions', database=database)) - return render_template('databases/permissions.html', database=database, permissions=permissions) + return redirect(url_for('manage.databases.view', database=database)) + return render_template('databases/permissions.html', database=db, permissions=permissions, form=form) module = app