Mercurial > hg > ltpdarepo
view src/ltpdarepo/user.py @ 213:3d524d31d1c2
Move setup.py to better location.
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Fri, 18 Nov 2011 01:49:52 +0100 |
parents | 234d57f3fd1d |
children |
line wrap: on
line source
# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it>
#
# This software may be used and distributed according to the terms of
# the GNU Affero General Public License version 3 or any later version.

from email.utils import formataddr

from flask import g
from wtforms import validators
from wtforms.fields import TextField, PasswordField, BooleanField
from wtforms.validators import ValidationError
from MySQLdb.cursors import DictCursor

from ltpdarepo.form import Form


# Value stored in place of a password hash when an account is created
# or reset; it is an invalid password and so prevents user login.
INVALIDPASSWORD = '!xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'


class IUser(Form):
    """Form describing the editable attributes of a user."""

    username = TextField("Username", validators=[
        validators.Required(),
        validators.Length(max=16, message=u'Usernames cannot be '
                          'longer than 16 characters'),
        validators.Regexp(r'^[0-9a-zA-Z\-\._]+$',
                          message=u'Invalid identifier.')])
    name = TextField("Given name")
    surname = TextField("Family name")
    email = TextField("Email", validators=[validators.Required(),
                                           validators.Email()])
    telephone = TextField("Telephone")
    institution = TextField("Institution")
    admin = BooleanField("Admin")

    def validate_username(form, field):
        """Reject a username that already names a MySQL account.

        Raises ValidationError when the submitted value matches any
        non-empty user name listed in the ``mysql.user`` table.
        """
        curs = g.db.cursor()
        curs.execute("SELECT DISTINCT user FROM mysql.user WHERE user <> ''")
        users = [r[0] for r in curs.fetchall()]
        if field.data in users:
            raise ValidationError(
                u"MySQL already contains an user with this username.")


class IPassword(Form):
    """Form asking for a new password and its confirmation."""

    password = PasswordField()
    confirm = PasswordField()

    def validate_password(form, field):
        """Raise ValidationError unless both password fields are equal."""
        if form.password.data != form.confirm.data:
            raise validators.ValidationError(u"Passwords do not match.")


class User(object):
    """A repository user.

    Each user is a row in the ``users`` table paired with the MySQL
    accounts of the same name.  All database access goes through the
    connection stored in ``flask.g.db``.
    """

    __slots__ = ('username', 'name', 'surname', 'email',
                 'telephone', 'institution', 'admin')

    def __init__(self, username='', name='', surname='', email='',
                 telephone='', institution='', admin=False):
        self.username = username
        self.name = name
        self.surname = surname
        self.email = email
        self.telephone = telephone
        self.institution = institution
        self.admin = bool(admin)

    def __getitem__(self, name):
        """Allow mapping style access to the attributes: ``user['email']``."""
        return getattr(self, name)

    @property
    def emailaddr(self):
        """User email address in the form 'Name Surname <email>'."""
        return formataddr(('%s %s' % (self.name, self.surname), self.email))

    @staticmethod
    def load(username):
        """Return the User stored under `username`, or None if not found."""
        curs = g.db.cursor(DictCursor)
        # parameters are passed as a sequence, as the DB-API requires
        curs.execute("""SELECT username, given_name AS name,
                        family_name AS surname, email, institution,
                        telephone, is_admin AS admin
                        FROM users WHERE username=%s""", (username,))
        user = curs.fetchone()
        if user is None:
            return None
        return User(**user)

    def _hosts(self, curs):
        """Return the hosts of the MySQL accounts named after this user."""
        curs.execute("""SELECT Host FROM mysql.user WHERE User=%s""",
                     (self.username,))
        return [row[0] for row in curs.fetchall()]

    def create(self):
        """Create the MySQL accounts and the ``users`` row, then commit."""
        # use an invalid password to prevent user login
        password = INVALIDPASSWORD
        curs = g.db.cursor()
        for host in ('localhost', '%'):
            curs.execute("""CREATE USER %s@%s IDENTIFIED BY PASSWORD %s""",
                         (self.username, host, password))
        curs.execute("""INSERT INTO users (username, given_name, family_name,
                        email, telephone, institution, is_admin)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                     (self.username, self.name, self.surname, self.email,
                      self.telephone, self.institution, self.admin))
        g.db.commit()

    def delete(self):
        """Delete the ``users`` row and drop the MySQL accounts, then commit."""
        curs = g.db.cursor()
        curs.execute("""DELETE FROM users WHERE username=%s""",
                     (self.username,))
        for host in self._hosts(curs):
            curs.execute("""DROP USER %s@%s""", (self.username, host))
        g.db.commit()

    def reset(self):
        """Set the invalid password on every MySQL account, then commit."""
        curs = g.db.cursor()
        for host in self._hosts(curs):
            curs.execute("""SET PASSWORD FOR %s@%s = %s""",
                         (self.username, host, INVALIDPASSWORD))
        g.db.commit()

    def save(self):
        """Write the attributes back to the ``users`` row, then commit."""
        curs = g.db.cursor()
        # the order of the parameters must match the order of the
        # placeholders: institution comes before telephone here
        curs.execute("""UPDATE users SET given_name=%s, family_name=%s,
                        email=%s, institution=%s, telephone=%s, is_admin=%s
                        WHERE username=%s""",
                     (self.name, self.surname, self.email, self.institution,
                      self.telephone, self.admin, self.username))
        g.db.commit()

    def passwd(self, password):
        """Set `password` on every MySQL account of the user, then commit."""
        curs = g.db.cursor()
        for host in self._hosts(curs):
            curs.execute("""SET PASSWORD FOR %s@%s = PASSWORD(%s)""",
                         (self.username, host, password))
        g.db.commit()