view src/ltpdarepo/user.py @ 91:5c1c7f2d6469

Implement user password reset. Use the same mechanism used for account activation for password reset.
author Daniele Nicolodi <daniele@grinta.net>
date Sun, 21 Aug 2011 18:17:27 +0200
parents 7d03f602cade
children fbab144c296c
line wrap: on
line source

from email.utils import formataddr

from flask import g
from wtforms import validators
from wtforms.fields import TextField, PasswordField, BooleanField
from wtforms.validators import ValidationError

from MySQLdb.cursors import DictCursor

from ltpdarepo.form import Form

INVALIDPASSWORD = '!xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'


class IUser(Form):
    username = TextField("Username",
        validators=[validators.Required(),
                    validators.Length(max=16,
                                      message=u'Usernames cannot be '
                                      'longer than 16 characters'),
                    validators.Regexp(r'^[a-zA-Z][0-9a-zA-Z\-\._]+$',
                                      message=u'Invalid identifier.')])
    name = TextField("Given name")
    surname = TextField("Family name")
    email = TextField("Email",
                      validators=[validators.Required(),
                                  validators.Email()])
    telephone = TextField("Telephone")
    institution = TextField("Institution")
    admin = BooleanField("Admin")

    def validate_username(form, field):
        curs = g.db.cursor()
        curs.execute("SELECT DISTINCT user FROM mysql.user WHERE user <> ''")
        users = [r[0] for r in curs.fetchall()]
        if field.data in users:
            raise ValidationError(u"MySQL already contains an user with this username.")


class IPassword(Form):
    password = PasswordField()
    confirm = PasswordField()

    def validate_password(form, field):
        if not form.password.data == form.confirm.data:
            raise validators.ValidationError(u"Passwords do not match.")


class User(object):
    __slots__ = ('username', 'name', 'surname', 'email',
                 'telephone', 'institution', 'admin')

    def __init__(self, username='', name='', surname='', email='',
                 telephone='', institution='', admin=False):
        self.username = username
        self.name = name
        self.surname = surname
        self.email = email
        self.telephone = telephone
        self.institution = institution
        self.admin = bool(admin)

    def __getitem__(self, name):
        return getattr(self, name)

    @property
    def emailaddr(self):
        # user email address in the form 'Name Surname <email>'
        return formataddr(('%s %s' % (self.name, self.surname), self.email))

    @staticmethod
    def load(username):
        curs = g.db.cursor(DictCursor)
        curs.execute("""SELECT username,
                               given_name AS name,
                               family_name AS surname,
                               email, institution, telephone,
                               is_admin AS admin
                        FROM users WHERE username=%s""", username)
        user = curs.fetchone()
        if user is None:
            return None
        return User(**user)

    def create(self):
        # use an invalid password to prevent user login
        password = INVALIDPASSWORD

        curs = g.db.cursor()

        for host in ('localhost', '%'):
            curs.execute("""CREATE USER %s@%s IDENTIFIED BY PASSWORD %s""",
                         (self.username, host, password))

        curs.execute("""INSERT INTO users (username, given_name, family_name,
                                           email, telephone, institution, is_admin)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                     (self.username, self.name, self.surname,
                      self.email, self.telephone, self.institution, self.admin))

        g.db.commit()

    def delete(self):
        curs = g.db.cursor()

        curs.execute("""DELETE FROM users WHERE username=%s""", self.username)
        curs.execute("""SELECT Host FROM mysql.user WHERE User=%s""", self.username)
        hosts = [row[0] for row in curs.fetchall()]
        for host in hosts:
            curs.execute("""DROP USER %s@%s""", (self.username, host))

        g.db.commit()

    def reset(self):
        curs = g.db.cursor()

        curs.execute("""SELECT Host FROM mysql.user WHERE User=%s""", self.username)
        hosts = [row[0] for row in curs.fetchall()]
        for host in hosts:
            curs.execute("""SET PASSWORD FOR %s@%s = %s""",
                         (self.username, host, INVALIDPASSWORD))

        g.db.commit()

    def save(self):
        curs = g.db.cursor()

        curs.execute("""UPDATE users SET given_name=%s, family_name=%s, email=%s,
                                         institution=%s, telephone=%s, is_admin=%s
                        WHERE username=%s""",
                     (self.name, self.surname, self.email,
                      self.telephone, self.institution, self.admin, self.username))

        g.db.commit()

    def passwd(self, password):
        curs = g.db.cursor()

        curs.execute("""SELECT Host FROM mysql.user WHERE User=%s""", self.username)
        hosts = [row[0] for row in curs.fetchall()]
        for host in hosts:
            curs.execute("""SET PASSWORD FOR %s@%s = PASSWORD(%s)""",
                         (self.username, host, password))

        g.db.commit()