view src/ltpdarepo/admin.py @ 202:10801d55c5d5

Reorganize command line interface code.
author Daniele Nicolodi <daniele@grinta.net>
date Wed, 16 Nov 2011 19:08:45 +0100
parents ac0a27a72b9e
children f8f898b39058
line wrap: on
line source

# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it>
#
# This software may be used and distributed according to the terms of
# the GNU Affero General Public License version 3 or any later version.

import argparse
import logging
import sys
import warnings

from contextlib import contextmanager
from string import upper

import MySQLdb as mysql

import ltpdarepo

from .database import Database
from .install import install
from .tests.populate import populate
from .upgrade import upgrade
from .user import User

try:
    import sqlalchemy
    HAS_SQL_ALCHEMY = True
except ImportError:
    HAS_SQL_ALCHEMY = False


class verbosity(argparse.Action):
    """Argparse action that makes the ``ltpdarepo`` logger more verbose.

    Every occurrence of the option lowers the logger threshold by one
    standard step (10), e.g. WARNING -> INFO -> DEBUG.  The threshold is
    never lowered below DEBUG.
    """

    def __call__(self, *args, **kwargs):
        # The parser, namespace, values and option string handed over by
        # argparse are not needed: the action only has a side effect on
        # the logging configuration.
        logger = logging.getLogger('ltpdarepo')
        # Start from the effective level, so that a logger without an
        # explicit level is lowered relative to the level it inherits.
        level = logger.getEffectiveLevel() - 10
        # Clamp at DEBUG: a level of 0 means NOTSET, which would hand the
        # decision back to the parent logger and *reduce* verbosity when
        # the option is repeated too many times.
        logger.setLevel(max(logging.DEBUG, level))


def _bool(value):
    # parse string into boolean value
    if value not in ('yes', 'no', 'true', 'false'):
        raise argparse.ArgumentTypeError('invalid value for bool parameter: %s' % value)
    return value in ('yes', 'true')


class Commands(object):
    """Registry of sub-commands built on top of argparse sub-parsers.

    Iterating over the registry yields the command names in sorted
    order; indexing it by name returns the parser of that command.
    """

    def __init__(self):
        # top level parser, created without the automatic -h/--help option
        self.parser = argparse.ArgumentParser(add_help=False)
        # container to which each registered command adds its own parser
        self.commands = self.parser.add_subparsers(metavar='command')
        # the only global option: each occurrence triggers `verbosity`
        group = self.parser.add_argument_group('options')
        group.add_argument(
            '-v', '--verbose',
            action=verbosity,
            nargs=0,
            dest=argparse.SUPPRESS,
            help='increase verbosity')

    def add(self, func, name=None):
        """Register ``func`` as a command and return its argument parser.

        The command is called ``name`` or, when no name is given, like
        the function itself.  The function docstring becomes the command
        description and its first line the short help.
        """
        if not name:
            name = func.__name__
        doc = func.__doc__ or ' '
        summary = doc.splitlines()[0]
        subparser = self.commands.add_parser(
            name, help=summary, description=doc, add_help=False)
        # remember which callable implements the command
        subparser.set_defaults(command=func)
        return subparser

    def parse(self, *args):
        """Parse command line arguments, forwarding ``args`` to argparse."""
        return self.parser.parse_args(*args)

    def dispatch(self):
        """Parse the command line and run the selected command.

        All parsed values except ``command`` are passed to the command
        as keyword arguments.  If the command raises, the traceback is
        written to standard error and the process exits with status 1.
        """
        import traceback

        namespace = self.parser.parse_args()
        params = vars(namespace)
        handler = params.pop('command')
        try:
            handler(**params)
        except Exception:
            sys.stderr.write(traceback.format_exc())
            sys.exit(1)

    def __iter__(self):
        names = sorted(self.commands.choices.keys())
        return iter(names)

    def __getitem__(self, name):
        parsers = self.commands.choices
        return parsers[name]


class Application(ltpdarepo.Application):

    commands = Commands()

    def __init__(self, *args, **kwargs):
        super(Application, self).__init__(*args, **kwargs)

        # database connection parameters that may be
        # overridden by command line arguments
        self._hostname = self.config['HOSTNAME']
        self._database = self.config['DATABASE']
        self._username = self.config['USERNAME']
        self._password = self.config['PASSWORD']

    def __getattr__(self, name):
        if name in self.config:
            return self.config[name]
        return self[name]

    def connect(self, **kwargs):
        # open connection to the database
        return mysql.connect(host=self._hostname, user=self._username,
                             passwd=self._password, charset='utf8', **kwargs)

    @contextmanager
    def interact(self):
        # fake request
        ctx = self.test_request_context('')
        ctx.push()
        # execute before request handlers
        self.preprocess_request()

        yield

        # execute after request handlers
        self.process_response(self.response_class())
        ctx.pop()

    def dispatch(self):
        args = vars(self.commands.parser.parse_args())

        # common parameters
        username = args.pop('_username', None)
        if username:
            self._username = username
        password = args.pop('_password', None)
        if username or password:
            self._password = password

        # run command
        command = args.pop('command')
        command(self, **args)


    def help(self, name=None, out=sys.stderr):
        """provide commands help"""

        if name is not None:
            print >>out, self.commands[name].format_help().strip()
            return

        print >>out, "LTPDA Repository administration tool"
        print >>out, self.commands.parser.format_usage().strip()
        print >>out, ""
        print >>out, "commands:"
        for cmd in self.commands:
            doc = self.commands[cmd].description or ' '
            print >>out, "  %-15s  %s" % (cmd, doc.splitlines()[0])
        print >>out, ""
        print >>out, "options:"
        print >>out, "  %-15s  %s" % ("-v, --verbose", "increase verbosity. may be specified multiple times")
        print >>out, ""

    cmd = commands.add(help)
    cmd.add_argument('name', nargs='?', help='command', metavar='NAME')


    def user(self, username):
        """show user"""

        with self.interact():
            user = User.load(username)
            if user is None:
                raise Exception("user '%s' not found" % username)
            for name in user.__slots__:
                print '%12s: %s' % (name, getattr(user, name))

    cmd = commands.add(user)
    cmd.add_argument('username')


    def useradd(self, username, **kwargs):
        """create user account"""

        with self.interact():
            user = User.load(username)
            if user is not None:
                raise Exception("user '%s' exists" % username)
            user = User(username, **kwargs)
            user.create()

    cmd = commands.add(useradd)
    cmd.add_argument('username', metavar='USERNAME')
    cmd.add_argument('-a', '--admin', default=argparse.SUPPRESS, type=_bool)
    cmd.add_argument('-n', '--name', default=argparse.SUPPRESS)
    cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS)
    cmd.add_argument('-e', '--email', default=argparse.SUPPRESS)


    def userdel(self, username):
        """delete user account"""

        with self.interact():
            user = User.load(username)
            if user is None:
                raise Exception("user '%s' not found" % username)
            user.delete()

    cmd = commands.add(userdel)
    cmd.add_argument('username', metavar='USERNAME')


    def usermod(self, username, **kwargs):
        """modify user account"""

        with self.interact():
            user = User.load(username)
            if user is None:
                raise Exception("user '%s' not found" % username)
            # update user
            for name, value in kwargs.iteritems():
                setattr(user, name, value)
            user.save()

    cmd = commands.add(usermod)
    cmd.add_argument('username', metavar='username')
    cmd.add_argument('-a', '--admin', default=argparse.SUPPRESS, type=_bool)
    cmd.add_argument('-n', '--name', default=argparse.SUPPRESS)
    cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS)
    cmd.add_argument('-e', '--email', default=argparse.SUPPRESS)


    def passwd(self, username, password):
        """change password for a given user"""

        with self.interact():
            user = User.load(username)
            if user is None:
                raise Exception("user '%s' not found" % username)
            user.passwd(password)

    cmd = commands.add(passwd)
    cmd.add_argument('username', metavar='USERNAME')
    cmd.add_argument('password', metavar='PASSWORD')


    def grant(self, username, database, **privs):

        conn = self.connect()
        curs = conn.cursor()

        for priv in privs:
            curs.execute('''GRANT %s ON %s.* TO %%s@%%s''' % (priv, database), (username, '%'))

        conn.commit()
        conn.close()

    def _grant(self, username, database, privs):
        """grant permissions to users"""

        privs = dict((priv, True) for priv in privs)
        return self.grant(username, database, **privs)

    cmd = commands.add(_grant, name='grant')
    cmd.add_argument('username')
    cmd.add_argument('database')
    cmd.add_argument('privs', metavar='PRIV', nargs='+', type=upper,
                     choices=frozenset(['SELECT', 'INSERT', 'UPDATE', 'DELETE']))


    def privileges(self, username, database=None):
        """show privileges for given user"""

        conn = self.connect()
        curs = conn.cursor()

        from collections import defaultdict
        privs = defaultdict(lambda: {'select': False, 'insert': False, 'update': False, 'delete': False})

        curs.execute('''SELECT DISTINCT Db, Select_priv, Insert_priv,
                        Update_priv, Delete_priv FROM mysql.db WHERE User=%s
                        ORDER BY Db''', username)
        for row in curs.fetchall():
            privs[row[0]] = {'select': row[1] == 'Y',
                             'insert': row[2] == 'Y',
                             'update': row[3] == 'Y',
                             'delete': row[4] == 'Y'}
        conn.close()

        if database is not None:
            return privs[database]
        return privs

    def _privileges(self, username):
        """show user privileges"""

        privs = self.privileges(username)
        if privs:
            headers = tuple('database select insert update delete'.split())
            print '%-20s %7s %7s %7s %7s' % headers
            for db, priv in privs.iteritems():
                values = [priv[x] and 'Y' or 'N' for x in 'select insert update delete'.split()]
                print '%-20s' % db, '%7s %7s %7s %7s' % tuple(values)

    cmd = commands.add(_privileges, name='privs')
    cmd.add_argument('username', metavar='USERNAME')


    def createdb(self, database, name='', description=''):
        """create database"""

        with self.interact():
            db = Database.load(database)
            if db is not None:
                raise Exception("database '%s' exists" % database)
            db = Database(database, name=name, description=description)
            db.create()

    cmd = commands.add(createdb)
    cmd.add_argument('database', help='database name')
    cmd.add_argument('-n', '--name', default='')
    cmd.add_argument('-d', '--description', default='')


    def dropdb(self, database):
        """delete database"""

        with self.interact():
            db = Database.load(database)
            if db is None:
                raise Exception('database "%s" not found' % database)
            db.drop()

    cmd = commands.add(dropdb)
    cmd.add_argument('database', help='database name')


    # install
    install = install
    cmd = commands.add(install)
    cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
    cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')


    # upgrade
    upgrade = upgrade
    cmd = commands.add(upgrade)
    cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
    cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')
    cmd.add_argument('--from', type=float, dest='_from', metavar='REV')
    cmd.add_argument('--to', type=float, dest='_to', metavar='REV')


    # populate
    populate = populate
    cmd = commands.add(populate)
    cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
    cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')
    cmd.add_argument('database', help='database')
    cmd.add_argument('nobjs', nargs='?', default=30, type=int, help='number of obejcts')


    # dump
    if HAS_SQL_ALCHEMY:
        def dump(self, database, tables=None, out=sys.stdout):
            """dump database structure"""

            from .tests import schema
            schema.dump(self.USERNAME, self.PASSWORD, self.HOSTNAME,
                        database, tables=tables, out=out)

        cmd = commands.add(dump)
        cmd.add_argument('database')
        cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
        cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')
        cmd.add_argument('--tables', nargs='+')


    def wipe(self, yes=True):
        """delete all database content"""

        # connect to the database
        conn = self.connect()
        curs = conn.cursor()

        # databases list
        curs.execute("""SHOW DATABASES""")
        databases = [row[0] for row in curs.fetchall()]

        # delete databases
        for db in databases:
            if db not in ('mysql', 'information_schema'):
                curs.execute("""DROP DATABASE `%s`""" % db)

        # delete users
        curs.execute("""DELETE FROM mysql.user
                        WHERE user <> 'root' and user <> ''""")

        # delete privileges
        curs.execute("""DELETE FROM mysql.db""")
        curs.execute("""DELETE FROM mysql.tables_priv""")
        curs.execute("""DELETE FROM mysql.columns_priv""")

        # flush privileges
        curs.execute("""FLUSH PRIVILEGES""")

        conn.commit()
        conn.close()

    cmd = commands.add(wipe)
    cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
    cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')
    cmd.add_argument('--yes', action='store_true', required=True)


    def setup(self):
        """setup test environment"""

        self.install()
        self.useradd('u1', admin=True)
        self.passwd('u1', 'u1')
        self.createdb('db1', description=u'Test database One')
        self.createdb('db2', description=u'Test database Tw\u00F6')
        self.populate('db1', 30)
        self.grant('u1', 'db1', select=True, insert=True, update=True, delete=True)

    cmd = commands.add(setup)
    cmd.add_argument('--user', '-u', dest='_username', metavar='USER')
    cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='')


def main(conf=None):
    """Entry point of the administration tool.

    Configures the ``ltpdarepo`` logger, silences the MySQL warnings and
    dispatches the command line to an ``Application`` built from ``conf``.
    """
    # The handler lets every record through: filtering is done by the
    # logger level only, which starts at WARNING.
    log = logging.getLogger('ltpdarepo')
    log.setLevel(logging.WARNING)
    stream = logging.StreamHandler()
    stream.setLevel(logging.DEBUG)
    log.addHandler(stream)

    # silence annoying mysql warnings
    warnings.filterwarnings('ignore', category=mysql.Warning)

    # dispatch command line
    Application(conf).dispatch()