view src/ltpdarepo/database.py @ 90:c55432c9600b

Adapt tests to user handling changes.
author Daniele Nicolodi <daniele@grinta.net>
date Sun, 21 Aug 2011 18:17:27 +0200
parents 18820d874f33
children fbab144c296c
line wrap: on
line source

import re
import os.path

from flask import g
from wtforms import validators
from wtforms.fields import TextField
from wtforms.validators import ValidationError

from MySQLdb.cursors import DictCursor

from ltpdarepo.form import Form


class IDatabase(Form):
    """Form with the fields describing a database: ID, name and description."""
    id = TextField("Id", validators=[validators.Required(), ])
    name = TextField("Name")
    description = TextField("Description")

    def validate_id(form, field):
        """Check that the submitted ID is well formed and not already taken.

        The method is named after the wtforms ``validate_<fieldname>``
        convention for per-field validators.

        Raises ValidationError when the ID contains characters other than
        ASCII letters, digits, ``-``, ``.`` and ``_``, or when ``SHOW
        DATABASES`` already lists a database with the same name.
        """
        # anchor with \Z rather than $: `$` also matches right before a
        # trailing newline, which would let an ID such as "name\n" through
        expr = r'^[0-9a-zA-Z\-\._]+\Z'
        if not re.match(expr, field.data):
            raise ValidationError(u"Invalid ID.")
        # the ID becomes the MySQL database name, so it must not clash
        # with any database already present on the server
        curs = g.db.cursor()
        curs.execute("SHOW DATABASES")
        dbs = [r[0] for r in curs.fetchall()]
        if field.data in dbs:
            raise ValidationError(u"MySQL already contains a database with this ID.")


class Database(object):
    """A database registered in the ``available_dbs`` table.

    ``id`` is the name of the MySQL database (stored in the ``db_name``
    column); ``name`` and ``description`` are the matching columns of
    ``available_dbs``.
    """
    __slots__ = ('id', 'name', 'description')

    def __init__(self, id='', name='', description=''):
        self.id = id
        self.name = name
        self.description = description

    def _quoted_id(self):
        """Return ``self.id`` quoted for use as a MySQL identifier.

        Identifiers cannot be sent as query parameters, so the name is
        wrapped in backticks with any embedded backtick doubled. This keeps
        the name from terminating the quoting early.
        """
        return '`%s`' % self.id.replace('`', '``')

    @staticmethod
    def load(id):
        """Return the Database registered under `id`, or None if there is none."""
        curs = g.db.cursor(DictCursor)
        # parameters are passed as a sequence, as the DB-API expects: a bare
        # string may be treated as a sequence of single characters
        curs.execute("""SELECT db_name AS id, name, description
                        FROM available_dbs WHERE db_name=%s""", (id, ))
        db = curs.fetchone()
        if db is None:
            return None
        # the column aliases match the constructor argument names
        return Database(**db)

    def create(self):
        """Create the MySQL database, register it and populate its schema."""
        curs = g.db.cursor()

        curs.execute("CREATE DATABASE %s" % self._quoted_id())
        curs.execute("""INSERT INTO available_dbs (db_name, name, description, version)
                        VALUES (%s, %s, %s, 2)""", (self.id, self.name, self.description))

        initdb(g.db, self.id)

        g.db.commit()

    def save(self):
        """Store the current name and description in ``available_dbs``."""
        curs = g.db.cursor()
        curs.execute("""UPDATE available_dbs
                        SET name=%s, description=%s
                        WHERE db_name=%s""", (self.name, self.description, self.id))
        g.db.commit()

    def drop(self):
        """Unregister and drop the database and the privileges granted on it."""
        curs = g.db.cursor()

        # remove database from ltpda databases list
        curs.execute('DELETE FROM available_dbs WHERE db_name=%s', (self.id, ))
        # drop database
        curs.execute('DROP DATABASE %s' % self._quoted_id())
        # revoke privileges assigned for the database
        curs.execute('DELETE FROM mysql.db WHERE Db=%s', (self.id, ))
        # flush privileges
        curs.execute('FLUSH PRIVILEGES')

        g.db.commit()


def initdb(conn, database):
    """Create the tables and the ``users`` view inside `database`.

    The statements come from ``sql/database.sql`` next to this module. The
    file is split on ``;``, so it must not contain semicolons inside a
    statement. The ``users`` view selects from the ``users`` table of the
    database that is current on `conn` when the function is called, so the
    connection is expected to have a default database selected. That
    database is made current again before returning, also on failure.

    conn: open database connection.
    database: name of the already existing database to populate.
    """
    def quote(name):
        # identifiers cannot be passed as query parameters: wrap the name
        # in backticks and double any embedded backtick
        return '`%s`' % ('%s' % name).replace('`', '``')

    # read the schema up front so that the file is closed right away and a
    # missing file is reported before the connection state is touched
    pwd = os.path.dirname(__file__)
    with open(os.path.join(pwd, 'sql', 'database.sql')) as sql:
        statements = [x.strip() for x in sql.read().split(';')]

    curs = conn.cursor()

    # store current database
    curs.execute("SELECT DATABASE()")
    current = curs.fetchone()[0]
    # switch to the database we want to populate
    curs.execute("USE %s" % quote(database))
    try:
        # create tables
        for stmt in statements:
            # splitting leaves an empty chunk after the last semicolon
            if stmt:
                curs.execute(stmt)
        # create view
        curs.execute("CREATE VIEW `users` AS "
                     "SELECT id, username FROM %s.users" % quote(current))
    finally:
        # switch back to default database
        curs.execute("USE %s" % quote(current))