view src/ltpdarepo/upgrade.py @ 188:fbab144c296c

Add license information.
author Daniele Nicolodi <daniele@grinta.net>
date Tue, 08 Nov 2011 15:23:32 +0100
parents c690b879ddee
children 10801d55c5d5
line wrap: on
line source

# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it>
#
# This software may be used and distributed according to the terms of
# the GNU Affero General Public License version 3 or any later version.

import logging
from functools import partial

import MySQLdb as mysql

from ltpdarepo import SCHEMA
from .config import HOSTNAME, DATABASE, USERNAME, PASSWORD

# upgrade steps registry: maps the schema version a step upgrades from
# to a ``(from_version, to_version, function)`` tuple
steps = {}


def register(r0, r1, func=None):
    """Register ``func`` as the upgrade step from schema ``r0`` to ``r1``.

    Usable directly, ``register(r0, r1, func)``, or as a decorator
    factory, ``@register(r0, r1)``.  Registering another step with the
    same ``r0`` replaces the previous one.

    :param r0: schema version the step upgrades from (registry key)
    :param r1: schema version the step upgrades to
    :param func: upgrade function, called with a database connection
    :returns: ``func`` itself, or a decorator when ``func`` is omitted
    """
    # identity test: an object with a permissive __eq__ must not be
    # mistaken for a missing argument
    if func is None:
        # called as ``@register(r0, r1)``: return the actual decorator
        return partial(register, r0, r1)
    steps[r0] = (r0, r1, func)
    return func


def upgrade(_from=None, _to=None):
    """Run database schema upgrade steps.

    Read the schema version stored in the 'options' table and apply the
    registered upgrade steps one after the other until the target
    version is reached.  The stored version is updated and committed
    after every step, so an interrupted upgrade resumes from the last
    completed step.  The connection is always closed.

    :param _from: force the upgrade to start from this schema version
        instead of the stored one
    :param _to: force the upgrade to stop at this schema version instead
        of SCHEMA
    :raises ValueError: if the stored version is missing or no upgrade
        step is registered for an intermediate version
    """
    logger = logging.getLogger(__name__)

    conn = mysql.connect(host=HOSTNAME, db=DATABASE, user=USERNAME, passwd=PASSWORD, charset='utf8')
    try:
        curs = conn.cursor()

        # current schema version: 'value+0' makes MySQL return the stored
        # value as a number, comparable with the steps registry keys
        curs.execute("""SELECT value+0 FROM options WHERE name='version'""")
        row = curs.fetchone()
        if row is None:
            raise ValueError("schema version not found in options table")
        curr = row[0]
        logger.info("current database shema v%s", curr)

        # allow to force upgrade from specific version
        if _from is not None:
            curr = _from
            logger.info("force upgrade from schema v%s", curr)

        # allow to force upgrade to specific version
        dest = SCHEMA
        if _to is not None:
            dest = _to
            logger.info("force upgrade to schema v%s", dest)

        while curr < dest:
            # next upgrade step
            step = steps.get(curr)
            if step is None:
                raise ValueError("no upgrade path from database shema v%s" % curr)
            fromr, tor, func = step

            # run upgrade step
            logger.info("upgrading database schema from v%s to v%s", fromr, tor)
            func(conn)

            # record and commit the new schema version; DB-API parameters
            # must be a sequence, a bare string is split into characters
            # by recent MySQLdb (mysqlclient) versions
            curr = tor
            curs.execute("""UPDATE options SET value=%s
                            WHERE name='version'""", (str(curr),))
            conn.commit()
    finally:
        conn.close()


@register(2.3, 2.4)
def upgrade_23_24(conn):
    """Upgrade the schema from v2.3 to v2.4 by dropping obsolete tables.

    'bobjs' has no known use here, and 'queries' is unused and has an
    outdated layout.  Both drops are no-ops when the table is absent.
    """
    cursor = conn.cursor()
    for statement in ("DROP TABLE IF EXISTS bobjs",
                      "DROP TABLE IF EXISTS queries"):
        cursor.execute(statement)
    conn.commit()


@register(2.4, 2.5)
def upgrade_24_to_25(conn):
    """Upgrade the schema from v2.4 to v2.5.

    Enable strict SQL mode, consolidate the MySQL grants and accounts of
    registered users, drop unused tables and the 'password' column of
    the administrative 'users' table, and replace the 'users' table of
    every registered database with a view on the administrative one.

    :param conn: MySQLdb connection to the administrative database
    """
    curs = conn.cursor()

    # make mysql behave: use sql strict mode
    curs.execute("""SET GLOBAL SQL_MODE='STRICT_TRANS_TABLES'""")

    # consolidate privileges: there is no need to specify grants
    # both for 'localhost' and for '%' hosts. drop privileges granted
    # for hosts different than '%'
    curs.execute("""DELETE mysql.db FROM mysql.db, users
                    WHERE User=username AND Host <> '%'""")

    # drop privileges granted explicitly on 'transactions' tables
    curs.execute("""DELETE mysql.tables_priv FROM mysql.tables_priv, users
                    WHERE User=username AND Table_name='transactions'""")

    # drop privileges granted on 'test' database
    curs.execute("""DELETE mysql.db FROM mysql.db, users
                    WHERE User=username AND Db='test'""")

    # consolidate users accounts: there is no need to specify user
    # accounts both for 'localhost' and for '127.0.0.1' hosts. drop
    # user accounts for '127.0.0.1'
    curs.execute("""DELETE mysql.user FROM mysql.user, users
                    WHERE User=username AND Host='127.0.0.1'""")

    # reload grant tables
    curs.execute("FLUSH PRIVILEGES")

    # drop unused tables
    curs.execute("DROP TABLE IF EXISTS user_access")
    curs.execute("DROP TABLE IF EXISTS user_hosts")

    # drop 'password' column from users table in administrative
    # database: authentication is done using mysql database
    curs.execute("DESCRIBE users")
    fields = [row[0] for row in curs.fetchall()]
    if 'password' in fields:
        curs.execute("ALTER TABLE users DROP COLUMN password")

    # for each registered database
    curs.execute("SELECT db_name FROM available_dbs")
    databases = [row[0] for row in curs.fetchall()]
    for db in databases:
        # replace 'users' table with a view on the administrative one;
        # 'OR REPLACE' keeps this from failing when the view already
        # exists (e.g. when the step is re-run via upgrade(_from=...)),
        # since DROP TABLE does not remove views
        curs.execute("""DROP TABLE IF EXISTS `%s`.users""" % db)
        curs.execute("""CREATE OR REPLACE VIEW `%s`.users AS
                        SELECT id, username FROM `%s`.users""" % (db, DATABASE))

    conn.commit()


@register(2.5, 2.6)
def upgrade_25_to_26(conn):
    """Upgrade the schema from v2.5 to v2.6.

    Create the 'queries' table and add the 'version' column (database
    layout version, default 1) to 'available_dbs'.  Both changes are
    skipped when already present.
    """
    cursor = conn.cursor()

    # "queries" table, left alone if it already exists
    cursor.execute("""CREATE TABLE IF NOT EXISTS queries (
                    id INT NOT NULL AUTO_INCREMENT,
                    title TEXT NOT NULL,
                    db TEXT NOT NULL,
                    query TEXT NOT NULL,
                    PRIMARY KEY (id)) CHARSET=utf8""")

    # 'available_dbs.version' column, added only when missing
    cursor.execute("DESCRIBE available_dbs")
    has_version = any(row[0] == 'version' for row in cursor.fetchall())
    if not has_version:
        cursor.execute("""ALTER TABLE available_dbs
                        ADD COLUMN version INT DEFAULT 1
                        COMMENT 'database layout version'""")

    conn.commit()


@register(2.6, 2.7)
def upgrade_26_to_27(conn):
    """Upgrade the schema from v2.6 to v2.7.

    Upgrade in place the layout of every database listed in
    'available_dbs' with a layout version below 2.  A database is handled
    with ``hybrid`` when its 'objmeta' table has no 'id' column, and with
    ``transmogrify`` otherwise.  A failure in one database is logged with
    its traceback and does not stop the others.  Only databases upgraded
    without errors get their layout version set to 2.
    """
    logger = logging.getLogger(__name__)
    curs = conn.cursor()

    # store current database name, to switch back to it afterwards
    curs.execute("SELECT DATABASE()")
    current = curs.fetchone()[0]

    # for each registered database with the old version
    curs.execute("SELECT db_name FROM available_dbs WHERE version < 2")
    databases = [row[0] for row in curs.fetchall()]
    for db in databases:
        try:
            curs.execute("USE `%s`" % db)
            # detect databases with hybrid schema version: they lack
            # the 'objmeta.id' column
            curs.execute("DESCRIBE objmeta")
            fields = [row[0] for row in curs.fetchall()]
            if 'id' not in fields:
                # perform schema upgrade from hybrid schema version
                logger.info("  upgrading hybrid database %s", db)
                hybrid(conn)
            else:
                # perform schema upgrade
                logger.info("  upgrading database %s", db)
                transmogrify(conn)
        except Exception:
            # best effort: report the failure with its traceback through
            # the logging system and go on with the next database
            logger.exception("  FAILED!")
        else:
            # update version information in the administrative database;
            # DB-API parameters are passed as a sequence
            curs.execute("USE `%s`" % current)
            curs.execute("""UPDATE available_dbs SET version=2
                            WHERE db_name=%s""", (db,))
            conn.commit()

    # switch back to default database
    curs.execute("USE `%s`" % current)
    conn.commit()


def transmogrify(conn):
    """The big bad database schema upgrade.

    Convert the database currently selected on ``conn`` (the caller
    switches to it with ``USE``) from the old layout, where object tables
    have their own 'id' column, to the layout keyed on 'obj_id':

    - drop the 'id' column and make 'obj_id' INT(11) UNSIGNED NOT NULL in
      'objmeta', 'bobjs', 'ao', 'mfir' and 'miir';
    - rebuild the 'cdata', 'fsdata', 'tsdata' and 'xydata' tables with an
      'obj_id' column filled from 'ao' (matching 'ao.data_id' against the
      old 'id'), then drop their 'id' column;
    - drop 'cdata.xunits', 'ao.mfilename', 'ao.mdlfilename', 'ao.data_id';
    - convert every TEXT column to the utf8 character set;
    - switch every table except 'users' to InnoDB;
    - add an 'obj_id' primary key and a foreign key to 'objs(id)' with
      ON DELETE CASCADE on every table except 'collections', 'objs',
      'transactions' and 'users'.

    Unlike ``hybrid``, the statements are not guarded by existence checks,
    so running this on a database not fully in the old layout fails.
    """

    logger = logging.getLogger(__name__)
    curs = conn.cursor()

    # upgrade "objmeta" table
    curs.execute("ALTER TABLE objmeta DROP COLUMN id")
    curs.execute("ALTER TABLE objmeta MODIFY obj_id INT(11) UNSIGNED NOT NULL")

    # list of object types obtained with
    # grep classdef classes/\@* -r | grep ltpda_ | grep -v Hidden | awk '{print $2}' | sort
    curs.execute("""ALTER TABLE objmeta MODIFY obj_type ENUM('ao', 'collection',
                    'filterbank', 'matrix', 'mfir', 'miir', 'parfrac', 'pest',
                    'plist', 'pzmodel', 'rational', 'smodel', 'ssm', 'timespan') NOT NULL""")

    # upgrade "bobjs" table
    curs.execute("ALTER TABLE bobjs DROP COLUMN id")
    curs.execute("ALTER TABLE bobjs MODIFY obj_id INT(11) UNSIGNED NOT NULL")
    curs.execute("ALTER TABLE bobjs DROP INDEX object_index")

    # upgrade "xxdata" tables
    for table in ('cdata', 'fsdata', 'tsdata', 'xydata'):
        # construct new table name: '<table>2' holds the upgraded copy
        newtable = '%s%d' % (table, 2)

        # upgrade: copy the table, prepend an 'obj_id' column and fill it
        # with the 'ao.obj_id' of the 'ao' row whose 'data_id' matches
        # the old 'id'
        curs.execute("DROP TABLE IF EXISTS %s" % newtable)
        curs.execute("CREATE TABLE %s SELECT * FROM %s" % (newtable, table))
        curs.execute("ALTER TABLE %s ADD COLUMN obj_id INT(11) UNSIGNED NOT NULL FIRST" % newtable)
        curs.execute("UPDATE %s AS new, ao SET new.obj_id=ao.obj_id WHERE new.id=ao.data_id" % newtable)

        # remove columns
        curs.execute("ALTER TABLE %s DROP COLUMN id" % newtable)

        # replace the old table with the upgraded copy.
        # NOTE(review): CREATE TABLE ... SELECT presumably does not carry
        # indexes over; primary keys are added at the end of this function
        curs.execute("DROP TABLE %s" % table)
        curs.execute("CREATE TABLE %s SELECT * FROM %s" % (table, newtable))
        curs.execute("DROP TABLE %s" % newtable)

    # upgrade "cdata" table
    curs.execute("ALTER TABLE cdata DROP COLUMN xunits")

    # upgrade "ao" table: 'data_id' is no longer needed once the data
    # tables are keyed on 'obj_id'
    curs.execute("ALTER TABLE ao DROP COLUMN id")
    curs.execute("ALTER TABLE ao MODIFY obj_id INT(11) UNSIGNED NOT NULL")
    curs.execute("ALTER TABLE ao DROP COLUMN mfilename")
    curs.execute("ALTER TABLE ao DROP COLUMN mdlfilename")
    curs.execute("ALTER TABLE ao DROP COLUMN data_id")
    curs.execute("""ALTER TABLE ao MODIFY data_type
                    ENUM('cdata', 'tsdata', 'fsdata', 'xydata') NOT NULL""")

    # upgrade "xxir" tables
    for table in ('mfir', 'miir'):
        curs.execute("ALTER TABLE %s MODIFY obj_id INT(11) UNSIGNED NOT NULL" % table)
        curs.execute("ALTER TABLE %s DROP COLUMN id" % table)

    # use "utf8" encoding for every column of type 'text' in every table
    curs.execute("SHOW TABLES")
    for table in [d[0] for d in curs.fetchall()]:
        # NOTE(review): a new DictCursor is created for every table
        dcurs = conn.cursor(mysql.cursors.DictCursor)
        dcurs.execute("DESCRIBE %s" % table)
        columns = [d['Field'] for d in dcurs.fetchall() if d['Type'] == 'text']
        for col in columns:
            curs.execute("ALTER TABLE %s MODIFY %s TEXT CHARACTER SET utf8" % (table, col))

    # upgrade to innodb storage
    curs.execute("SHOW TABLES")
    tables = [t[0] for t in curs.fetchall()]
    # from here on changes are committed explicitly
    conn.autocommit(False)
    for table in tables:
        # "users" is a view and not a table
        if table in ('users', ):
            continue
        # NOTE: rebinds ``curs``; the loop below keeps using the last one
        curs = conn.cursor()
        curs.execute("ALTER TABLE %s ENGINE = InnoDB" % table)
        # commit after each table conversion
        conn.commit()

    # for all tables (done after the InnoDB conversion, presumably
    # because MySQL enforces foreign keys only on InnoDB tables)
    curs.execute("SHOW TABLES")
    tables = [t[0] for t in curs.fetchall()]
    for table in tables:
        # except "objs", "collections", "transactions" and the "users" view
        if table in ('collections', 'objs', 'transactions', 'users'):
            continue
        # add primary key
        logger.info("    add primary key to table %s", table)
        curs.execute("ALTER TABLE %s ADD PRIMARY KEY (obj_id)" % table)
        # add foreign keys
        logger.info("    add foreign key to table %s", table)
        curs.execute("""ALTER TABLE %s ADD FOREIGN KEY (obj_id)
                        REFERENCES objs(id) ON DELETE CASCADE""" % table)
        conn.commit()


def hybrid(conn):
    """Big bad database schema upgrade for hybrid tables.

    Complete the upgrade of the database currently selected on ``conn``
    when it is only partially in the new layout.  The caller detects this
    case from the missing 'objmeta.id' column.  Every change is applied
    only when a check shows it is still needed:

    - drop the 'id' column of 'mfir' and 'miir' if present;
    - drop 'cdata.xunits' if present;
    - on every table except 'collections', 'objs', 'transactions' and
      'users', add an 'obj_id' primary key if 'obj_id' is not already
      one, and a foreign key to 'objs(id)' if 'obj_id' has none.
    """

    logger = logging.getLogger(__name__)
    curs = conn.cursor()

    # upgrade "xxir" tables
    for table in ('mfir', 'miir'):
        curs.execute("DESCRIBE %s" % table)
        fields = [row[0] for row in curs.fetchall()]
        if 'id' not in fields:
            continue
        # drop 'id' column
        logger.info("    fix table %s", table)
        curs.execute("ALTER TABLE %s MODIFY obj_id INT(11) UNSIGNED NOT NULL" % table)
        curs.execute("ALTER TABLE %s DROP COLUMN id" % table)

    # upgrade "cdata" table
    curs.execute("DESCRIBE cdata")
    fields = [row[0] for row in curs.fetchall()]
    if 'xunits' in fields:
        logger.info("    fix table cdata")
        curs.execute("ALTER TABLE cdata DROP COLUMN xunits")

    # for all tables
    curs.execute("SHOW TABLES")
    tables = [t[0] for t in curs.fetchall()]
    for table in tables:
        # except "objs", "collections", "transactions" and the "users" view
        if table in ('collections', 'objs', 'transactions', 'users'):
            continue
        # add primary key; row[3] is presumably the 'Key' column of the
        # DESCRIBE output, 'PRI' for primary key columns.
        # NOTE(review): if the table has no 'obj_id' column, fetchone()
        # returns None and row[3] raises TypeError
        curs.execute("DESCRIBE %s obj_id" % table)
        row = curs.fetchone()
        if row[3] != 'PRI':
            logger.info("    add primary key to table %s", table)
            curs.execute("ALTER TABLE %s ADD PRIMARY KEY (obj_id)" % table)
        # add foreign keys, only when 'obj_id' has none yet
        fk = _get_foreign_keys(conn, table)
        for name in fk:
            logger.info("    foreign key %s present for table %s", name, table)
        if not fk:
            logger.info("    add foreign key to table %s", table)
            curs.execute("""ALTER TABLE %s ADD FOREIGN KEY (obj_id)
                            REFERENCES objs(id) ON DELETE CASCADE""" % table)
        conn.commit()


def _get_foreign_keys(conn, table):
    curs = conn.cursor()
    curs.execute("SELECT DATABASE()")
    database = curs.fetchone()[0]
    curs.execute("""SELECT CONCAT(table_name, '.', column_name) AS 'foreign key',  
                           CONCAT(referenced_table_name, '.', referenced_column_name) AS 'references',
                           constraint_name
                      FROM information_schema.key_column_usage
                     WHERE referenced_table_name IS NOT NULL
                       AND constraint_schema=%s AND table_name=%s AND column_name='obj_id'
                  ORDER BY constraint_name""", (database, table))
    return [row[2] for row in curs.fetchall()]


@register(2.7, 2.8)
def upgrade_27_to_28(conn):
    """Upgrade the schema from v2.7 to v2.8.

    Add a 'toffset' BIGINT NOT NULL column (default 0) to the 'tsdata'
    table of every registered database that does not have it yet.
    """
    cursor = conn.cursor()

    cursor.execute("SELECT db_name FROM available_dbs")
    registered = [row[0] for row in cursor.fetchall()]

    for name in registered:
        cursor.execute("DESCRIBE `%s`.tsdata" % name)
        columns = [row[0] for row in cursor.fetchall()]
        if 'toffset' in columns:
            # already upgraded
            continue
        cursor.execute("""ALTER TABLE `%s`.tsdata
                            ADD COLUMN toffset BIGINT NOT NULL DEFAULT 0""" % name)

    conn.commit()


# target version is 2.9 (not 29): versions are compared numerically with
# the float keys of the other steps, and 29 would jump past them
@register(2.8, 2.9)
def upgrade_28_29(conn):
    """Upgrade the schema from v2.8 to v2.9.

    (Re)create the 'makeintervals' stored procedure that supports the
    activity view, and add an index on 'objmeta.submitted' in every
    registered database.

    NOTE(review): databases already upgraded while this step was
    registered as ``(2.8, 29)`` store '29' as their schema version; they
    presumably need it reset to 2.9 by hand before later steps apply.
    """
    curs = conn.cursor()

    # support procedure for activity view
    curs.execute("DROP PROCEDURE IF EXISTS makeintervals")
    curs.execute("""
    CREATE PROCEDURE makeintervals(startdate timestamp, enddate timestamp, intval integer, unitval varchar(10))
    BEGIN
    -- create temporary timeintervals table with dt field specifed
    -- from the startdate and enddate arguments at intervals of intval
    -- unitval size

    declare thisdate timestamp;
    declare nextdate timestamp;
    set thisdate = startdate;
    
    -- create temp table
    drop temporary table if exists timeintervals;
    create temporary table if not exists timeintervals (begin timestamp, end timestamp);

    -- loop through the startdate adding each intval interval until enddate
    repeat
        select
           case unitval
               when 'SECOND'      then timestampadd(SECOND, intval, thisdate)
               when 'MINUTE'      then timestampadd(MINUTE, intval, thisdate)
               when 'HOUR'        then timestampadd(HOUR, intval, thisdate)
               when 'DAY'         then timestampadd(DAY, intval, thisdate)
               when 'WEEK'        then timestampadd(WEEK, intval, thisdate)
               when 'MONTH'       then timestampadd(MONTH, intval, thisdate)
               when 'QUARTER'     then timestampadd(QUARTER, intval, thisdate)
               when 'YEAR'        then timestampadd(YEAR, intval, thisdate)
           end into nextdate;

      insert into timeintervals select thisdate, timestampadd(second, -1, nextdate);
      set thisdate = nextdate;
    until thisdate > enddate
    end repeat;
    END;""")

    # for each registered database
    curs.execute("SELECT db_name FROM available_dbs")
    databases = [row[0] for row in curs.fetchall()]
    for db in databases:
        # add index on "submitted" column to speed up activity view
        curs.execute("""ALTER TABLE `%s`.objmeta ADD INDEX (submitted)""" % db)