view src/ltpdarepo/admin.py @ 236:39b50b763f11

New collections database representation.
author Daniele Nicolodi <daniele@grinta.net>
date Mon, 12 Dec 2011 15:10:28 +0100
parents f1c4825fef1c
children 3fee7db86a99
line wrap: on
line source

# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it>
#
# This software may be used and distributed according to the terms of
# the GNU Affero General Public License version 3 or any later version.

import argparse
import logging
import sys
import warnings

from contextlib import contextmanager
from string import upper

import MySQLdb as mysql

import ltpdarepo

from .database import Database
from .install import install
from .tests.populate import populate
from .upgrade import upgrade
from .user import User

try:
    import sqlalchemy
    HAS_SQL_ALCHEMY = True
except ImportError:
    HAS_SQL_ALCHEMY = False


def _bool(value):
    # parse string into boolean value
    if value not in ('yes', 'no', 'true', 'false'):
        raise argparse.ArgumentTypeError('invalid value for bool parameter: %s' % value)
    return value in ('yes', 'true')


class ArgumentParser(argparse.ArgumentParser):
    """Top-level parser with a command oriented help screen.

    The parser is expected to have sub-commands registered through
    ``add_subparsers()``; ``print_help()`` relies on argparse private
    attributes to enumerate them.
    """

    def _check_value(self, action, value):
        # report a short "unknown command" error instead of the argparse
        # default message, which lists every valid choice
        if action.choices is not None and value not in action.choices:
            self.error("unknown command %s" % value)

    def print_help(self, name=None, out=None):
        """Write the global help screen, or the help of a single command.

        ``name`` is the command to describe; when ``None`` the tool
        description, the usage line, a one-line summary of every command
        (sorted by name) and the global options are written.  ``out`` is
        the file-like object written to; it defaults to the value of
        ``sys.stdout`` at call time.

        An unknown ``name`` is reported through ``error()``, which
        terminates the process in the stock argparse implementation.
        """
        # resolve the default at call time so that a redirected
        # sys.stdout is honored
        if out is None:
            out = sys.stdout

        # the sub-parsers action is the first action of the group it was
        # added to, the second action group holds the optional arguments
        commands = self._subparsers._group_actions[0].choices
        options = self._action_groups[1]._group_actions

        if name is not None:
            command = commands.get(name)
            if command is None:
                # prefer the 'help' sub-command for the error message,
                # fall back on this parser when it is not registered
                commands.get('help', self).error("unknown command %s" % name)
                return
            out.write(command.format_help().strip() + "\n")
            return

        lines = [self.description or "", self.format_usage().strip(), "",
                 "commands:"]
        for cmdname in sorted(commands):
            description = commands[cmdname].description
            # only the first line of the description fits the summary
            summary = description.splitlines()[0] if description else ""
            lines.append("  %-13s  %s" % (cmdname, summary))
        lines.extend(["", "options:"])
        for opt in options:
            lines.append("  %-13s  %s" % (", ".join(opt.option_strings),
                                          opt.help or ""))
        lines.append("")
        out.write("\n".join(lines) + "\n")


class Commands(object):
    """Registry of the sub-commands of the administration tool.

    ``parser`` is the top-level parser, carrying the global ``-v`` and
    ``-h`` options; ``commands`` is the argparse sub-parsers object the
    individual commands are attached to.
    """

    def __init__(self):
        parser = ArgumentParser(
            description='LTPDA Repository administration tool',
            add_help=False)
        # sub-commands use the stock argparse parser class
        subparsers = parser.add_subparsers(
            parser_class=argparse.ArgumentParser, metavar='command')

        parser.add_argument(
            '-v', '--verbose', dest='verbosity', action='count',
            help='increase verbosity. may be specified multiple times')
        parser.add_argument(
            '-h', '--help', action='help', help='show this help message')

        self.parser = parser
        self.commands = subparsers

    def add(self, func, name=None, help=None):
        """Register ``func`` as a sub-command and return its parser.

        The command name defaults to the function name and its
        description to the function docstring.  The function is stored
        as the ``_command`` default of the returned parser, on which the
        caller declares the command arguments.
        """
        if not name:
            name = func.__name__
        description = help if help else func.__doc__
        subparser = self.commands.add_parser(
            name, help=help, description=description, add_help=False)
        subparser.set_defaults(_command=func)
        return subparser

    def parse(self, *args):
        """Parse arguments with the top-level parser.

        Positional arguments are forwarded unchanged to ``parse_args()``.
        """
        return self.parser.parse_args(*args)


class Application(ltpdarepo.Application):

    commands = Commands()

    def __init__(self, *args, **kwargs):
        """Initialize the base application and the connection parameters.

        All arguments are forwarded to the base class constructor.
        """
        super(Application, self).__init__(*args, **kwargs)

        # database connection parameters that may be overridden by
        # command line arguments: each one starts from the upper case
        # configuration entry of the same name
        settings = self.config
        for attr in ('hostname', 'database', 'username', 'password'):
            setattr(self, attr, settings[attr.upper()])

    def __getattr__(self, name):
        """Expose configuration entries as attributes.

        Python calls this hook only after regular attribute lookup has
        failed.  The value of the ``name`` configuration entry is
        returned when there is one, otherwise ``AttributeError`` is
        raised, as the attribute protocol requires, so that ``hasattr()``
        and ``getattr()`` with a default keep working.
        """
        # look 'config' up bypassing this hook: before the configuration
        # is set, going through self.config would recurse without bound
        try:
            config = object.__getattribute__(self, 'config')
        except AttributeError:
            config = ()
        if name in config:
            return config[name]
        raise AttributeError("'%s' object has no attribute '%s'"
                             % (type(self).__name__, name))

    def connect(self, **kwargs):
        """Open and return a new connection to the MySQL server.

        The connection uses the ``hostname``, ``username`` and
        ``password`` attributes and the ``utf8`` character set.  No
        default database is selected.  Extra keyword arguments are
        forwarded to ``MySQLdb.connect()``.
        """
        host, user, passwd = self.hostname, self.username, self.password
        return mysql.connect(charset='utf8', host=host, user=user,
                             passwd=passwd, **kwargs)

    @contextmanager
    def interact(self):
        """Run the body of the ``with`` statement inside a fake request.

        A test request context is pushed and the before request handlers
        are executed before the body runs.  When the body completes the
        after request handlers are executed on an empty response.  The
        request context is popped in every case, including when the body
        or one of the handlers raises, so that it does not leak.
        """
        # fake request
        ctx = self.test_request_context('')
        ctx.push()
        try:
            # execute before request handlers
            self.preprocess_request()

            yield

            # execute after request handlers, only reached on success
            self.process_response(self.response_class())
        finally:
            ctx.pop()

    def dispatch(self):
        """Parse the command line and run the selected command.

        The global and bookkeeping entries (``verbosity``, ``_username``,
        ``_password``, ``_command``) are removed from the parsed
        arguments; what is left is passed as keyword arguments to the
        command function, called with this application as first argument.
        """
        args = vars(self.commands.parser.parse_args())

        # verbosity: each -v lowers the threshold of the 'ltpdarepo'
        # logger by one standard level.  The result is clamped at DEBUG
        # because level 0 means NOTSET, which would defer to the parent
        # logger and make the output less verbose instead of more
        verbosity = args.pop('verbosity')
        if verbosity:
            logger = logging.getLogger('ltpdarepo')
            level = logger.getEffectiveLevel() - 10 * verbosity
            logger.setLevel(max(logging.DEBUG, level))

        # common parameters, declared by commands with dest='_username'
        # and dest='_password'
        # NOTE(review): these are stored as _username / _password while
        # connect() reads username / password - confirm which attributes
        # the install / upgrade code expects
        username = args.pop('_username', None)
        if username:
            self._username = username
        password = args.pop('_password', None)
        # the password is stored whenever a user name is given, even when
        # it is the empty string default
        if username or password:
            self._password = password

        # run command
        command = args.pop('_command')
        command(self, **args)


    def help(self, command=None):
        """show this help message or given command help """
        # the docstring above is also the description shown for this
        # command in the help screen
        parser = self.commands.parser
        parser.print_help(command)
        # argparse exit() terminates the process
        parser.exit()

    # optional name of the command to describe
    cmd = commands.add(help)
    cmd.add_argument('command', metavar='COMMAND', nargs='?')


    def user(self, username):
        """show user"""

        with self.interact():
            account = User.load(username)
            if account is None:
                raise Exception("user '%s' not found" % username)
            # one right aligned "field: value" line for each slot
            # declared by the user object
            for field in account.__slots__:
                print('%12s: %s' % (field, getattr(account, field)))

    cmd = commands.add(user)
    cmd.add_argument('username', metavar='USERNAME')


    def useradd(self, username, **kwargs):
        """create user account"""

        with self.interact():
            # refuse to overwrite an existing account
            if User.load(username) is not None:
                raise Exception("user '%s' exists" % username)
            # keyword arguments are handed to the User constructor as is
            User(username, **kwargs).create()

    # options left out on the command line are suppressed, so that they
    # are not passed to useradd() at all
    cmd = commands.add(useradd)
    cmd.add_argument('username', metavar='USERNAME')
    cmd.add_argument('-a', '--admin', type=_bool, default=argparse.SUPPRESS)
    cmd.add_argument('-n', '--name', default=argparse.SUPPRESS)
    cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS)
    cmd.add_argument('-e', '--email', default=argparse.SUPPRESS)


    def userdel(self, username):
        """delete user account"""

        with self.interact():
            account = User.load(username)
            # only existing accounts can be deleted
            if account is None:
                raise Exception("user '%s' not found" % username)
            account.delete()

    cmd = commands.add(userdel)
    cmd.add_argument('username', metavar='USERNAME')


    def usermod(self, username, **kwargs):
        """modify user account"""

        with self.interact():
            account = User.load(username)
            if account is None:
                raise Exception("user '%s' not found" % username)
            # every keyword argument names an attribute to overwrite
            for field, value in kwargs.items():
                setattr(account, field, value)
            account.save()

    # options left out on the command line are suppressed, so that only
    # the attributes explicitly given are modified
    cmd = commands.add(usermod)
    cmd.add_argument('username', metavar='USERNAME')
    cmd.add_argument('-a', '--admin', type=_bool, default=argparse.SUPPRESS)
    cmd.add_argument('-n', '--name', default=argparse.SUPPRESS)
    cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS)
    cmd.add_argument('-e', '--email', default=argparse.SUPPRESS)


    def passwd(self, username, password):
        """change password for a given user"""

        with self.interact():
            account = User.load(username)
            if account is None:
                raise Exception("user '%s' not found" % username)
            # the new password is handed to the user object as given
            account.passwd(password)

    cmd = commands.add(passwd)
    cmd.add_argument('username', metavar='USERNAME')
    cmd.add_argument('password', metavar='PASSWORD')


    def grant(self, username, database, **privs):
        """Grant privileges on ``database`` to ``username`` at any host.

        Each keyword argument names a privilege (for example
        ``select=True``); only privileges with a true value are granted,
        so the dictionaries returned by ``privileges()`` can be passed
        back unchanged.  When at least one privilege is granted, INSERT
        on the ``transactions`` table of the database is granted too.

        Privilege and database names are interpolated into the SQL
        statements, because identifiers cannot be sent as query
        parameters: they must come from trusted or validated input.
        """
        granted = sorted(priv for priv, enabled in privs.items() if enabled)

        conn = self.connect()
        try:
            curs = conn.cursor()
            for priv in granted:
                curs.execute("GRANT %s ON %s.* TO %%s@%%s" %
                             (priv, database), (username, '%'))
            if granted:
                # explicitly grant privileges on transactions table
                curs.execute("GRANT INSERT ON %s.transactions TO %%s@%%s" %
                             (database, ), (username, '%'))
            conn.commit()
        finally:
            # release the connection also when a statement fails
            conn.close()

    def _grant(self, username, database, privs):
        """grant permissions to users"""

        # command line front end of grant(): turn the list of privilege
        # names into the keyword arguments grant() expects
        return self.grant(username, database, **dict.fromkeys(privs, True))

    # privilege names are upper cased before being checked against the
    # accepted choices, so they can be typed in any case
    cmd = commands.add(_grant, name='grant')
    cmd.add_argument('username', metavar='USERNAME')
    cmd.add_argument('database', metavar='DATABASE')
    cmd.add_argument('privs', metavar='PRIV', nargs='+', type=upper,
                     choices=frozenset(['SELECT', 'INSERT', 'UPDATE', 'DELETE']))


    def privileges(self, username, database=None):
        """Return the database level privileges of ``username``.

        The privileges are read from the ``mysql.db`` table.  Without
        ``database`` the result maps each database name to a dictionary
        with boolean ``select``, ``insert``, ``update`` and ``delete``
        entries; databases without a row in ``mysql.db`` map to all
        ``False``.  With ``database`` only the dictionary for that
        database is returned.
        """
        from collections import defaultdict

        names = ('select', 'insert', 'update', 'delete')
        privs = defaultdict(lambda: dict.fromkeys(names, False))

        conn = self.connect()
        try:
            curs = conn.cursor()
            # query parameters are passed as a sequence, as the DB-API
            # requires, not as a bare string
            curs.execute('''SELECT DISTINCT Db, Select_priv, Insert_priv,
                            Update_priv, Delete_priv FROM mysql.db WHERE User=%s
                            ORDER BY Db''', (username, ))
            rows = curs.fetchall()
        finally:
            # release the connection also when the query fails
            conn.close()

        for row in rows:
            # the privilege columns hold 'Y' when the privilege is granted
            privs[row[0]] = dict((name, flag == 'Y')
                                 for name, flag in zip(names, row[1:5]))

        if database is not None:
            return privs[database]
        return privs

    def _privileges(self, username):
        """show user privileges"""

        # command line front end of privileges(): prints one row per
        # database, nothing at all when the user has no privileges
        privs = self.privileges(username)
        if privs:
            columns = 'select insert update delete'.split()
            layout = '%-20s %7s %7s %7s %7s'
            print(layout % tuple(['database'] + columns))
            # sort by database name: the mapping does not keep the order
            # of the rows returned by the query
            for db in sorted(privs):
                flags = ['Y' if privs[db][col] else 'N' for col in columns]
                print(layout % tuple([db] + flags))

    cmd = commands.add(_privileges, name='privs')
    cmd.add_argument('username', metavar='USERNAME')


    def createdb(self, database, name='', description=''):
        """create database"""

        with self.interact():
            # refuse to overwrite an existing database
            if Database.load(database) is not None:
                raise Exception("database '%s' exists" % database)
            Database(database, name=name, description=description).create()

    cmd = commands.add(createdb)
    cmd.add_argument('database', help='database name', metavar='DATABASE')
    cmd.add_argument('-n', '--name', default='')
    cmd.add_argument('-d', '--description', default='')


    def dropdb(self, database):
        """delete database"""

        with self.interact():
            target = Database.load(database)
            # only existing databases can be dropped
            if target is None:
                raise Exception('database "%s" not found' % database)
            target.drop()

    cmd = commands.add(dropdb)
    cmd.add_argument('database', help='database name', metavar='DATABASE')


    # install: the function imported from the install module becomes a
    # method of the application and is registered as a command.  The user
    # and password options are stored as _username and _password and are
    # consumed by dispatch() rather than passed to the command
    install = install
    cmd = commands.add(install)
    cmd.add_argument('--create-user', '-c', dest='createuser', action='store_true')
    cmd.add_argument('--user', '-u', metavar='USER', dest='_username')
    cmd.add_argument('--passwd', '-p', metavar='PASSWORD', dest='_password', default='')


    # upgrade: the function imported from the upgrade module becomes a
    # method of the application and is registered as a command.  The
    # revisions given with --from and --to are parsed as floats and
    # passed to the command as _from and _to
    upgrade = upgrade
    cmd = commands.add(upgrade)
    cmd.add_argument('--user', '-u', metavar='USER', dest='_username')
    cmd.add_argument('--passwd', '-p', metavar='PASSWORD', dest='_password', default='')
    cmd.add_argument('--from', metavar='REV', dest='_from', type=float)
    cmd.add_argument('--to', metavar='REV', dest='_to', type=float)


    # populate: the function imported from the tests.populate module
    # becomes a method of the application and is registered as a command.
    # The number of objects is optional and defaults to 30
    populate = populate
    cmd = commands.add(populate)
    cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
    cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')
    cmd.add_argument('database', help='database')
    cmd.add_argument('nobjs', nargs='?', default=30, type=int, help='number of objects')


    # dump: only available when SQLAlchemy can be imported
    if HAS_SQL_ALCHEMY:
        def dump(self, database, tables=None, out=None):
            """dump database structure"""

            # 'out' defaults to the value of sys.stdout at call time, so
            # that a redirected standard output is honored
            if out is None:
                out = sys.stdout

            # NOTE(review): the credentials come from the configuration,
            # the --user / --passwd options registered below are stored
            # by dispatch() as _username / _password and are not used
            # here - confirm this is intended
            from .tests import schema
            schema.dump(self.USERNAME, self.PASSWORD, self.HOSTNAME,
                        database, tables=tables, out=out)

        cmd = commands.add(dump)
        cmd.add_argument('database')
        cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
        cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')
        cmd.add_argument('--tables', nargs='+')


    def wipe(self, yes=True):
        """delete all database content"""

        # NOTE(review): 'yes' only exists to receive the mandatory --yes
        # command line flag; its value is not checked

        # schemas owned by the server itself, which must survive the wipe
        system = ('mysql', 'information_schema', 'performance_schema', 'sys')

        # connect to the database
        conn = self.connect()
        try:
            curs = conn.cursor()

            # databases list
            curs.execute("""SHOW DATABASES""")
            databases = [row[0] for row in curs.fetchall()]

            # delete databases. identifiers cannot be sent as query
            # parameters: quote the name, doubling embedded backticks
            for db in databases:
                if db not in system:
                    curs.execute("""DROP DATABASE `%s`""" % db.replace('`', '``'))

            # delete users, keeping root and the anonymous accounts
            curs.execute("""DELETE FROM mysql.user
                            WHERE user <> 'root' and user <> ''""")

            # delete privileges
            curs.execute("""DELETE FROM mysql.db""")
            curs.execute("""DELETE FROM mysql.tables_priv""")
            curs.execute("""DELETE FROM mysql.columns_priv""")

            # flush privileges, so that the server reloads the grant
            # tables modified directly above
            curs.execute("""FLUSH PRIVILEGES""")

            conn.commit()
        finally:
            # release the connection also when a statement fails
            conn.close()

    cmd = commands.add(wipe)
    cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
    cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')
    cmd.add_argument('--yes', action='store_true', required=True)


    def setup(self):
        """setup test environment"""

        self.install(createuser=True)

        # administrator account, with the user name as password
        self.useradd('u1', admin=True)
        self.passwd('u1', 'u1')

        # two databases, the second with a non ASCII description
        for database, description in (('db1', u'Test database One'),
                                      ('db2', u'Test database Tw\u00F6')):
            self.createdb(database, description=description)

        # fill the first database with 30 objects and give the test user
        # full access to it
        self.populate('db1', 30)
        self.grant('u1', 'db1', **dict.fromkeys(
            ('select', 'insert', 'update', 'delete'), True))

    cmd = commands.add(setup)
    cmd.add_argument('--user', '-u', metavar='USER', dest='_username')
    cmd.add_argument('--passwd', '-p', metavar='PASSWORD', dest='_password', default='')


def main(conf=None):
    """Entry point of the administration tool.

    Configures logging, creates the application with ``conf`` as only
    constructor argument and dispatches the command line.
    """
    # setup logging: the logger threshold starts at WARNING, while the
    # handler lets everything through, so that the logger level alone
    # controls what is shown
    log = logging.getLogger('ltpdarepo')
    log.setLevel(logging.WARNING)
    stream = logging.StreamHandler()
    stream.setLevel(logging.DEBUG)
    log.addHandler(stream)

    # silence annoying mysql warnings
    warnings.filterwarnings('ignore', category=mysql.Warning)

    # dispatch command line
    Application(conf).dispatch()