view src/ltpdarepo/admin.py @ 201:ac0a27a72b9e

Reorganize Flask application setup code.
author Daniele Nicolodi <daniele@grinta.net>
date Wed, 16 Nov 2011 16:09:39 +0100
parents fbab144c296c
children 10801d55c5d5
line wrap: on
line source

# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it>
#
# This software may be used and distributed according to the terms of
# the GNU Affero General Public License version 3 or any later version.

import sys
import argparse
import logging
import warnings

from contextlib import contextmanager
from string import lower

import MySQLdb as mysql

try:
    import sqlalchemy
    HAS_SQL_ALCHEMY = True
except ImportError:
    HAS_SQL_ALCHEMY = False


from .user import User
from .database import Database
from .config import HOSTNAME, DATABASE, USERNAME, PASSWORD


from . import Application
# Module-level application instance: the commands below hand it to
# interact() to run model code inside a fake request context.
app = Application()


@contextmanager
def interact(app):
    # fake request
    ctx = app.test_request_context('')
    ctx.push()
    # execute before request handlers
    app.preprocess_request()

    yield

    # execute after request handlers
    app.process_response(app.response_class())
    ctx.pop()


class verbosity(argparse.Action):
    """argparse action increasing the 'ltpdarepo' logger verbosity.

    Each occurrence of the option lowers the logger threshold by one
    step (10), e.g. from WARNING to INFO and then to DEBUG.
    """

    def __call__(self, *args, **kwargs):
        # increase logging verbosity by lowering the logger threshold.
        # Never go below DEBUG: a level of 0 means NOTSET, which makes
        # the logger defer to its parent, so that repeating the option
        # too many times would make the output less verbose again
        logger = logging.getLogger('ltpdarepo')
        logger.setLevel(max(logging.DEBUG, logger.level - 10))


class Commands(object):
    """Registry of administration commands built on top of argparse.

    Wraps a top level parser and its collection of sub-parsers, one
    for each command.  Iterating over an instance yields the names of
    the registered commands in sorted order, indexing it by command
    name returns the parser of that command.
    """

    def __init__(self):
        # argparse built-in help option is disabled
        parser = argparse.ArgumentParser(add_help=False)
        self.parser = parser
        self.commands = parser.add_subparsers(metavar='command')
        group = parser.add_argument_group('options')
        group.add_argument('-v', '--verbose', help='increase verbosity',
                           action=verbosity, dest=argparse.SUPPRESS, nargs=0)

    def add_argument(self, *args, **kwargs):
        """Add an argument to the top level parser."""
        return self.parser.add_argument(*args, **kwargs)

    def add(self, func, name=None):
        """Register `func` as a command and return the command parser.

        The command name defaults to the function name.  The function
        docstring is used as command description and its first line as
        short help message.  Command arguments are declared on the
        returned parser.
        """
        if not name:
            name = func.__name__
        description = func.__doc__ or ' '
        summary = description.splitlines()[0]
        subparser = self.commands.add_parser(
            name, help=summary, description=description, add_help=False)
        # remember which function implements the command
        subparser.set_defaults(command=func)
        return subparser

    def parse(self, *args):
        """Parse the command line and return the resulting namespace."""
        return self.parser.parse_args(*args)

    def dispatch(self):
        """Parse the command line and run the selected command.

        The parsed arguments are passed to the command function as
        keyword arguments.  If the command raises an exception the
        traceback is written to standard error and the process exits
        with status 1.
        """
        options = vars(self.parser.parse_args())
        func = options.pop('command')
        try:
            func(**options)
        except Exception:
            import traceback
            report = traceback.format_exc()
            sys.stderr.write(report)
            sys.exit(1)

    def __iter__(self):
        """Iterate over the command names in sorted order."""
        return iter(sorted(self.commands.choices))

    def __getitem__(self, name):
        """Return the parser of the command called `name`."""
        return self.commands.choices[name]


# registry of the commands exposed on the command line: every command
# defined below is added to it along with its own arguments
commands = Commands()


# the install and upgrade commands are implemented in their own modules
from .install import install
cmd = commands.add(install)

from .upgrade import upgrade
cmd = commands.add(upgrade)
# the options are stored as `_from` and `_to`, presumably because `from`
# is a reserved word and can not be a function parameter name
cmd.add_argument('--from', type=float, dest='_from', metavar='REV')
cmd.add_argument('--to', type=float, dest='_to', metavar='REV')


# the dump command is available only when SQLAlchemy is installed
if HAS_SQL_ALCHEMY:
    from .tests import schema

    def dump(database, tables=None, out=None):
        """dump database structure text representation"""

        # The docstring above doubles as the command help text.
        #
        # The output stream defaults to the standard output as found
        # when the function is called, not when the module is imported,
        # so that a later redirection of sys.stdout is honoured.
        if out is None:
            out = sys.stdout
        schema.dump(USERNAME, PASSWORD, HOSTNAME, database, tables=tables, out=out)

    cmd = commands.add(dump)
    cmd.add_argument('database')
    cmd.add_argument('--tables', nargs='+')


def useradd(username, password=None, **kwargs):
    """create user account"""

    # The docstring above doubles as the command help text.
    #
    # Extra keyword arguments (name, surname and email when invoked from
    # the command line) are handed over to the User constructor as is.
    with interact(app):
        if User.load(username) is not None:
            raise Exception('User already present')
        account = User(username, **kwargs)
        account.create()
        # the password is set only when a non empty one is given
        if password:
            account.passwd(password)

cmd = commands.add(useradd)
cmd.add_argument('username', metavar='USERNAME')
cmd.add_argument('-p', '--password', default='')
cmd.add_argument('-n', '--name', default='')
cmd.add_argument('-s', '--surname', default='')
cmd.add_argument('-e', '--email', default='')


def userdel(username):
    """delete user account"""

    # The docstring above doubles as the command help text.
    with interact(app):
        account = User.load(username)
        # refuse to go on when there is no such user
        if account is None:
            raise Exception('user not found')
        account.delete()

cmd = commands.add(userdel)
cmd.add_argument('username', metavar='USERNAME')


def passwd(username, password):
    """change password for a given user"""

    # The docstring above doubles as the command help text.
    with interact(app):
        account = User.load(username)
        # refuse to go on when there is no such user
        if account is None:
            raise Exception('user not found')
        account.passwd(password)

cmd = commands.add(passwd)
cmd.add_argument('username', metavar='USERNAME')
cmd.add_argument('password', metavar='PASSWORD')


def grant(username, database, privs):
    """grant permissions to given user for a specific database"""

    # The docstring above doubles as the command help text.
    #
    # `privs` is an iterable of privilege names.  The special 'admin'
    # privilege is not a MySQL one: it sets the `is_admin` flag of the
    # user in the `users` table.  Every other privilege is granted on all
    # the tables of `database` to the user connecting from any host.

    privs = list(privs)

    conn = mysql.connect(host=HOSTNAME, db=DATABASE, user=USERNAME, passwd=PASSWORD, charset='utf8')
    try:
        curs = conn.cursor()

        if 'admin' in privs:
            # query parameters are passed as a sequence, as the DB-API
            # requires, rather than as a bare string
            curs.execute('''UPDATE users SET is_admin=1 WHERE username=%s''', (username, ))
            # drop every occurrence, so that 'admin' never ends up in a
            # GRANT statement
            privs = [priv for priv in privs if priv != 'admin']

        for priv in privs:
            # privilege and database names can not be query parameters
            # and are interpolated in the statement text.
            # NOTE(review): they are neither validated nor quoted here,
            # callers are expected to pass trusted values only
            curs.execute('''GRANT %s ON %s.* TO %%s@%%s''' % (priv, database), (username, '%'))

        conn.commit()
    finally:
        # release the connection also when a statement fails
        conn.close()

cmd = commands.add(grant)
cmd.add_argument('username', metavar='USERNAME')
cmd.add_argument('database', metavar='DATABASE')
cmd.add_argument('privs', metavar='PRIV', nargs='+', type=lower,
                 choices=frozenset(['select', 'insert', 'update', 'delete', 'admin']))


def privileges(username, database=None):
    """show privileges for given user

    Return a dictionary mapping database names, as found in the `Db`
    column of the `mysql.db` table, to dictionaries with boolean
    'select', 'insert', 'update' and 'delete' entries.  Looking up a
    database with no recorded privileges yields all entries False.

    If `database` is given, only the dictionary for that database is
    returned.
    """

    from collections import defaultdict
    privs = defaultdict(lambda: {'select': False, 'insert': False, 'update': False, 'delete': False})

    conn = mysql.connect(host=HOSTNAME, db=DATABASE, user=USERNAME, passwd=PASSWORD, charset='utf8')
    try:
        curs = conn.cursor()
        # query parameters are passed as a sequence, as the DB-API
        # requires, rather than as a bare string
        curs.execute('''SELECT DISTINCT Db, Select_priv, Insert_priv,
                        Update_priv, Delete_priv FROM mysql.db WHERE User=%s''', (username, ))
        for row in curs.fetchall():
            # privilege columns hold 'Y' for granted privileges
            privs[row[0]] = {'select': row[1] == 'Y',
                             'insert': row[2] == 'Y',
                             'update': row[3] == 'Y',
                             'delete': row[4] == 'Y'}
    finally:
        # release the connection also when the query fails
        conn.close()

    if database is not None:
        return privs[database]
    return privs


def _privileges(username):
    """show privileges for given user"""

    # The docstring above is the help text of the 'privileges' command.
    #
    # Prints a table with one row for each database the user has
    # privileges recorded for.  Nothing is printed when there are none.
    privs = privileges(username)
    columns = 'select insert update delete'.split()
    layout = '%-20s %7s %7s %7s %7s'
    if privs:
        print(layout % tuple(['database'] + columns))
    # rows are keyed by database name: show it, rather than repeating
    # the user name on every line, so that rows can be told apart
    for database, priv in sorted(privs.items()):
        flags = ['Y' if priv[x] else 'N' for x in columns]
        print(layout % tuple([database] + flags))

cmd = commands.add(_privileges, name='privileges')
cmd.add_argument('username', metavar='USERNAME')


def createdb(database, name='', description=''):
    """create database"""

    # The docstring above doubles as the command help text.
    with interact(app):
        # refuse to create a database twice
        if Database.load(database) is not None:
            raise Exception('database "%s" exists' % database)
        Database(database, name=name, description=description).create()

cmd = commands.add(createdb, name='create-database')
cmd.add_argument('database', help='database name')
cmd.add_argument('-n', '--name', default='')
cmd.add_argument('-d', '--description', default='')


def dropdb(database):
    """delete database"""

    # The docstring above doubles as the command help text.
    with interact(app):
        target = Database.load(database)
        # refuse to go on when there is no such database
        if target is None:
            raise Exception('database "%s" not found' % database)
        target.drop()

cmd = commands.add(dropdb, name='drop-database')
cmd.add_argument('database', help='database name')


def wipe(yes=True):
    """delete all database content"""

    # The docstring above doubles as the command help text.
    #
    # Drops every database except the ones owned by the MySQL server,
    # removes all the user accounts except the root and the anonymous
    # ones, and removes database, table and column level privileges.
    # `yes` is not checked here: it is the destination of the mandatory
    # --yes command line option.

    # schemas owned by the server itself, that must survive the wipe
    system = ('mysql', 'information_schema', 'performance_schema', 'sys')

    conn = mysql.connect(host=HOSTNAME, db='', user=USERNAME, passwd=PASSWORD, charset='utf8')
    try:
        curs = conn.cursor()

        # databases list
        curs.execute("""SHOW DATABASES""")
        databases = [row[0] for row in curs.fetchall()]

        # delete databases
        for db in databases:
            if db not in system:
                # identifiers can not be query parameters: quote the name
                # by hand, doubling the backticks it may contain
                curs.execute("""DROP DATABASE `%s`""" % db.replace('`', '``'))

        # delete users
        curs.execute("""DELETE FROM mysql.user
                        WHERE user <> 'root' and user <> ''""")

        # delete privileges
        curs.execute("""DELETE FROM mysql.db""")
        curs.execute("""DELETE FROM mysql.tables_priv""")
        curs.execute("""DELETE FROM mysql.columns_priv""")

        # flush privileges
        curs.execute("""FLUSH PRIVILEGES""")

        conn.commit()
    finally:
        # release the connection also when a statement fails
        conn.close()

cmd = commands.add(wipe)
cmd.add_argument('--yes', action='store_true', required=True)


def populate(database, nobjs):
    """populate a database with fake objects"""

    # The docstring above doubles as the command help text.
    #
    # Inserts `nobjs` fake time series analysis objects into `database`,
    # with metadata made up from the `lorem` text.  Each object covers
    # `nsecs` seconds, one after the other starting from the epoch.

    from datetime import datetime, timedelta
    import uuid
    import random
    import re

    # splitting leaves an empty string after the final full stop of the
    # text: discard it, so that object names are never empty
    words = [w for w in re.split(r'\W+', lorem) if w]
    sentences = [s + '.' for s in [s.strip() for s in lorem.split('.')] if s]

    nsecs = 100
    t0 = datetime(1970, 1, 1, 0, 0, 0)

    conn = mysql.connect(host=HOSTNAME, db=database, user=USERNAME, passwd=PASSWORD, charset='utf8')
    try:
        curs = conn.cursor()

        for _ in range(nobjs):

            name = random.choice(words)
            title = ' '.join(words[0:2])
            description = random.choice(sentences)
            analysis = random.choice(sentences)
            # spread submission dates over the past days.  randint()
            # wants integer bounds: the floor division keeps it working
            # when nobjs is not a multiple of ten
            submitted = datetime.utcnow() - timedelta(days=random.randint(0, nobjs // 10))

            curs.execute("""INSERT INTO objs (xml, uuid) VALUES (%s, %s)""",
                         ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>', str(uuid.uuid4())))
            # the other tables refer to the object through its id
            objid = curs.lastrowid
            curs.execute("""INSERT INTO objmeta (
                              obj_id, obj_type, name, created, version, ip, hostname, os,
                              submitted, experiment_title, experiment_desc, analysis_desc,
                              quantity, additional_authors, additional_comments, keywords,
                              reference_ids, validated, vdate, author)
                            VALUES (
                              %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                              %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                         (objid, 'ao', name, datetime.utcnow(), '0.1', '127.0.0.1', 'localhost', 'any',
                          submitted, title, description, analysis, 'FOO', '', '', 'testing', '',
                          None, None, None))
            curs.execute("""INSERT INTO ao (obj_id, data_type, description)
                            VALUES (%s, %s, %s)""", (objid, 'tsdata', 'Foo'))
            curs.execute("""INSERT INTO tsdata (obj_id, fs, t0, nsecs)
                            VALUES (%s, %s, %s, %s)""", (objid, 10, t0, nsecs))
            # next object starts where this one ends
            t0 += timedelta(seconds=nsecs)

        conn.commit()
    finally:
        # release the connection, also when a statement fails
        conn.close()

cmd = commands.add(populate)
cmd.add_argument('database', help='database')
cmd.add_argument('nobjs', nargs='?', default=30, type=int, help='number of objects')


def setup():
    """setup test environment"""

    # The docstring above doubles as the command help text.
    #
    # Installs the repository, creates the administrator user `u1` and
    # two test databases, fills the first one with 30 fake objects and
    # gives the user full access to it.
    login = 'u1'
    databases = (('db1', u'Test database One'),
                 ('db2', u'Test database Tw\u00F6'))

    install()
    useradd(login, password='u1')
    grant(login, '%', ['admin'])
    for dbname, description in databases:
        createdb(dbname, description=description)
    populate('db1', 30)
    grant(login, 'db1', ['select', 'insert', 'update', 'delete'])

cmd = commands.add(setup)


def help(name=None):
    """show help for a specific command or commands overview

    With no arguments, print a list of commands with short help
    messages.  Given a command name, print help for that command.
    """

    # The docstring above doubles as the command help text.
    row = "  %-15s  %s"

    if name is None:
        # commands overview
        print("LTPDA Repository administration tool")
        print(commands.parser.format_usage().strip())
        print("")
        print("commands:")
        for cmd in commands:
            # only the first line of each command description is shown
            summary = (commands[cmd].description or ' ').splitlines()[0]
            print(row % (cmd, summary))
        print("")
        print("options:")
        print(row % ("-v, --verbose", "increase verbosity. may be specified multiple times"))
        print("")
    else:
        # help for a single command, as formatted by its own parser
        print(commands[name].format_help().strip())

cmd = commands.add(help)
cmd.add_argument('name', nargs='?', help='command', metavar='NAME')


def main():
    """Command line entry point: setup logging and run the command."""

    # the 'ltpdarepo' logger threshold starts at WARNING and is lowered
    # by each -v option.  The handler lets through anything from DEBUG
    # up, so that the logger threshold alone decides what is shown
    logger = logging.getLogger('ltpdarepo')
    logger.setLevel(logging.WARNING)
    stream = logging.StreamHandler()
    stream.setLevel(logging.DEBUG)
    logger.addHandler(stream)

    # warnings issued by the MySQL driver are not interesting here
    warnings.filterwarnings('ignore', category=mysql.Warning)

    # parse the command line and execute the requested command
    commands.dispatch()


# Filler text used by populate() to make up object metadata.  It must be
# defined before main() is invoked below: otherwise populate() fails with
# a NameError when the module is executed as a script.
lorem = """Lorem ipsum dolor sit amet, consectetur adipiscing
elit. Nam at velit lacus, quis interdum ligula. Fusce imperdiet
aliquam augue, ut volutpat mi adipiscing nec. Quisque vitae augue
felis, ut ultrices sapien. Etiam accumsan convallis dignissim. Nulla
ullamcorper consequat urna, a tempor diam volutpat vitae. Nunc
sollicitudin dapibus auctor. Cras a risus neque. Duis cursus tempus
metus vel pharetra. Morbi euismod neque eget justo dapibus sit amet
fermentum arcu fringilla. Sed gravida, sem tincidunt gravida
scelerisque, turpis ligula adipiscing lorem, vitae cursus ipsum eros
ut elit. Duis non mattis diam. Curabitur eget mauris vitae lacus
sodales elementum. Maecenas auctor dapibus molestie. Sed id diam sed
eros tincidunt semper.

Integer in massa lorem. Nam eleifend euismod ipsum a convallis. Cras
odio orci, ornare non mattis eget, pellentesque non turpis. Ut feugiat
nunc eget mi venenatis hendrerit. Ut blandit, quam id accumsan
commodo, turpis orci sollicitudin lacus, eu iaculis arcu ligula quis
dolor. Donec posuere, mauris in pellentesque aliquam, est nibh
pharetra lacus, et hendrerit diam erat sit amet purus. Curabitur et
vehicula urna. Etiam quis enim est. Nunc sagittis urna sit amet augue
euismod interdum tincidunt sem feugiat. Phasellus nisl est, ultrices
at cursus vitae, auctor at diam.

Vivamus non diam urna. Phasellus aliquet, eros molestie vestibulum
imperdiet, eros est semper purus, et aliquet lectus eros non
eros. Vivamus laoreet diam sit amet nisi euismod sed sollicitudin
ipsum vehicula. Mauris ut tristique magna. Donec augue felis,
dignissim at mollis et, ultricies dictum magna. Mauris ac leo
mauris. Vivamus pulvinar, tellus in ullamcorper euismod, ipsum magna
rutrum erat, eu convallis nibh dui id ante. Proin eu tincidunt
velit. Fusce ipsum massa, luctus quis malesuada at, porta sed
nibh. Nam molestie fringilla mi, eget sagittis purus accumsan
ut. Donec luctus faucibus tempus. Aliquam erat volutpat. Vivamus
egestas accumsan libero, eu rutrum nunc placerat sit amet. Quisque
iaculis dictum lorem, eu adipiscing mi aliquet placerat. Maecenas
eleifend, felis vel placerat viverra, lectus nisl aliquam purus, nec
dictum orci nisi mollis sapien. Duis rhoncus diam in felis tempor
rhoncus. Fusce lectus arcu, viverra in fringilla sit amet, ullamcorper
at mauris.

Nunc ac ornare felis. In hac habitasse platea dictumst. Nam ac turpis
vitae lectus porta placerat imperdiet ac urna. Integer sed varius
diam. Praesent at neque sed arcu dictum dapibus. Quisque aliquet
adipiscing ullamcorper. Nullam adipiscing vulputate libero in
lobortis. Morbi turpis neque, vehicula et placerat et, gravida
venenatis mi. Etiam tincidunt nunc et nulla ullamcorper vestibulum ac
vitae sem. Ut malesuada adipiscing nisl et scelerisque. Nulla non
risus purus. Fusce vulputate, urna a egestas laoreet, tellus dui
convallis magna, sit amet facilisis ipsum metus id nisi.

Suspendisse condimentum ultricies sapien, eget viverra lorem luctus
vel. Pellentesque augue sapien, rutrum sit amet volutpat ut,
ullamcorper nec odio. Donec vehicula lacus non risus mollis at
vulputate libero tristique. Ut commodo pretium nisl ut malesuada. Sed
molestie, est et varius vestibulum, sem nunc venenatis quam, a euismod
elit lorem non ligula. Proin interdum molestie placerat. Duis pulvinar
lorem sed elit ullamcorper hendrerit pellentesque enim interdum. Morbi
sit amet dolor in felis ullamcorper dictum in vel arcu. Sed fringilla
sapien nisi, vel iaculis mauris. Duis molestie luctus eleifend. Nunc
lacus tortor, mattis eget consectetur ac, volutpat et nisi. Donec in
nisi magna.

Morbi sed risus est, et fringilla tortor. Proin ipsum ipsum, tincidunt
id scelerisque sed, ultricies vitae elit. Curabitur lobortis ipsum nec
enim egestas aliquam semper justo mattis. Duis pulvinar, augue nec
suscipit porta, nulla ante egestas turpis, ut tincidunt erat libero
vitae enim. Duis ullamcorper feugiat dui, ut dapibus urna eleifend
tristique. In vehicula, risus ut rutrum euismod, ipsum libero
dignissim leo, in mollis ante lectus nec purus. Quisque non dui vel
nunc ullamcorper iaculis. Vestibulum ante ipsum primis in faucibus
orci luctus et ultrices posuere cubilia Curae; Pellentesque habitant
morbi tristique senectus et netus et malesuada fames ac turpis
egestas. Sed condimentum elementum orci, id semper arcu egestas
nec. Nam commodo feugiat tortor, nec porttitor tortor congue sit
amet. Suspendisse aliquet, ligula ac mattis feugiat, leo odio gravida
nunc, sed accumsan nunc mauris porttitor leo. Fusce imperdiet
vestibulum sem, nec varius metus porttitor nec. Nulla facilisi. Morbi
venenatis ante at felis molestie ut placerat leo imperdiet.

Suspendisse condimentum neque nec massa volutpat luctus. Integer a
ante non mauris rutrum lacinia et nec ligula. Proin tincidunt laoreet
sapien eu iaculis. Fusce tempus commodo metus, quis adipiscing ligula
volutpat eu. Vestibulum ultrices, magna non congue pharetra, mi lacus
imperdiet quam, accumsan malesuada magna sapien ac arcu. Vestibulum
sagittis rhoncus egestas. Class aptent taciti sociosqu ad litora
torquent per conubia nostra, per inceptos himenaeos. Nam gravida
congue sagittis. Aliquam adipiscing, tellus pulvinar interdum tempus,
eros massa elementum mi, at pulvinar mauris nisl ac diam. Lorem ipsum
dolor sit amet, consectetur adipiscing elit. Curabitur scelerisque
risus id odio viverra ac tincidunt elit lobortis. Sed rhoncus dapibus
magna, sed ultricies enim ultrices a. Vestibulum nec nibh
eros. Quisque volutpat eleifend ultricies. Mauris et orci dolor. Ut
orci nisl, sagittis vel cursus at, rutrum sit amet odio. Mauris
malesuada massa sit amet ligula lacinia lacinia. Sed a nisi libero, at
pulvinar lorem.

Curabitur egestas commodo purus, non imperdiet diam auctor sit
amet. Duis eget lectus leo. Pellentesque lobortis risus et libero
ultricies ornare. In lobortis pharetra sapien, id pretium turpis
tempus eget. Suspendisse mollis, mauris non bibendum placerat, nisi
velit ullamcorper magna, vel imperdiet tellus ligula mattis
tortor. Sed et urna eu ipsum interdum commodo. Suspendisse vehicula
sagittis nibh, vitae congue arcu placerat at. Nulla nulla risus,
mattis quis imperdiet non, pharetra at lorem. Curabitur viverra
eleifend sem, ac dictum nulla elementum in. Pellentesque orci metus,
vestibulum sit amet ultrices et, tincidunt sed velit. Cras sapien
lectus, semper sed tincidunt a, dignissim sit amet nisl. In ultricies
odio ac est consequat nec posuere nisi scelerisque. Vivamus in sapien
eu lacus consequat tempor. Proin in purus purus. Curabitur et velit
vitae risus viverra fringilla vitae at nunc."""


if __name__ == '__main__':
    main()