Mercurial > hg > ltpdarepo
view src/ltpdarepo/security.py @ 197:891229adcf66
Use correct path for redirect after login.
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Fri, 11 Nov 2011 12:28:36 +0100 |
parents | fbab144c296c |
children |
line wrap: on
line source
# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it> # # This software may be used and distributed according to the terms of # the GNU Affero General Public License version 3 or any later version. from functools import partial, wraps from flask import g, session, request, abort, redirect, url_for from .memoize import memoize def authenticate(username, password): curs = g.db.cursor() rows = curs.execute("""SELECT User FROM mysql.user WHERE User=%s AND Password=PASSWORD(%s)""", (username, password)) if rows: return True return False def _set_identity(): if 'username' in session: g.identity = Identity(session['username']) def secure(app): app.before_request(_set_identity) return app class Secure(object): def __init__(self, app): # funny assignement required because of __setattr__ self.__dict__['app'] = app app.before_request(_set_identity) def require(self, role): return SecurityWrapper(role) def route(self, *args, **kwargs): role = kwargs.get('require', None) if role is not None: return RoutingWrapper(self.app, *args, **kwargs) return self.app.route(*args, **kwargs) def __getattr__(self, name): return getattr(self.app, name) def __setattr__(self, name, value): return setattr(self.app, name, value) def require(role): return SecurityWrapper(role) class SecurityWrapper(object): def __init__(self, role): self.role = role def __call__(self, func): @wraps(func) def decorated(*args, **kwargs): if 'username' not in session: target = None if request.path != '/': target = request.script_root + request.path return redirect(url_for('login', next=target)) if self.role not in g.identity.roles: abort(403) return func(*args, **kwargs) return decorated class RoutingWrapper(SecurityWrapper): def __init__(self, app, *args, **kwargs): role = kwargs.pop('require') super(RoutingWrapper, self).__init__(role=role) self.route = app.route(*args, **kwargs) def __call__(self, func): func = super(RoutingWrapper, self).__call__(func) self.route(func) class Permissions(object): def __init__(self, username): self.username = username @memoize def __contains__(self, perm): if perm.objtype == 'database': curs = g.db.cursor() curs.execute("""SELECT COUNT(*) FROM mysql.db WHERE User=%s AND Db=%s AND Select_priv='Y'""", (self.username, perm.objid, )) if curs.fetchone()[0] > 0: return True if perm.objtype == 'user': if perm.objid == g.identity.username: return True return False class Identity(object): def __init__(self, username): self.username = username def can(self, what): return what in self.permissions @property @memoize def roles(self): curs = g.db.cursor() rows = curs.execute("""SELECT id FROM users WHERE is_admin=1 AND username=%s""", self.username) if rows > 0: return set(('user', 'admin', )) return set(('user', )) @property @memoize def permissions(self): return Permissions(self.username) class permission(object): def __init__(self, perm, objtype, objid): self.perm = perm self.objtype = objtype self.objid = objid def __enter__(self): if g.identity.can(self): return self abort(403) def __exit__(self, exctype, excvalue, trace): pass view = partial(permission, 'view') edit = partial(permission, 'edit') delete = partial(permission, 'delete')