Mercurial > hg > ltpdarepo
view src/ltpdarepo/admin.py @ 236:39b50b763f11
New collections database representation.
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Mon, 12 Dec 2011 15:10:28 +0100 |
parents | f1c4825fef1c |
children | 3fee7db86a99 |
line wrap: on
line source
# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it> # # This software may be used and distributed according to the terms of # the GNU Affero General Public License version 3 or any later version. import argparse import logging import sys import warnings from contextlib import contextmanager from string import upper import MySQLdb as mysql import ltpdarepo from .database import Database from .install import install from .tests.populate import populate from .upgrade import upgrade from .user import User try: import sqlalchemy HAS_SQL_ALCHEMY = True except ImportError: HAS_SQL_ALCHEMY = False def _bool(value): # parse string into boolean value if value not in ('yes', 'no', 'true', 'false'): raise argparse.ArgumentTypeError('invalid value for bool parameter: %s' % value) return value in ('yes', 'true') class ArgumentParser(argparse.ArgumentParser): def _check_value(self, action, value): if action.choices is not None and value not in action.choices: self.error("unknown command %s" % value) def print_help(self, name=None, out=sys.stdout): commands = self._subparsers._group_actions[0].choices options = self._action_groups[1]._group_actions if name is not None: command = commands.get(name, None) if command is None: commands['help'].error("unknown command %s" % name) print >>out, commands[name].format_help().strip() return print >>out, self.description print >>out, self.format_usage().strip() print >>out, "" print >>out, "commands:" for name, cmd in sorted(commands.iteritems()): doc = '' if cmd.description: doc = cmd.description.splitlines()[0] print >>out, " %-13s %s" % (name, doc) print >>out, "" print >>out, "options:" for opt in options: print >>out, " %-13s %s" % (", ".join(opt.option_strings), opt.help) print >>out, "" class Commands(object): def __init__(self): self.parser = ArgumentParser(add_help=False, description='LTPDA Repository administration tool') self.commands = self.parser.add_subparsers(metavar='command', parser_class=argparse.ArgumentParser) self.parser.add_argument('-v', '--verbose', action='count', dest='verbosity', help='increase verbosity. may be specified multiple times') self.parser.add_argument('-h', '--help', action='help', help='show this help message') def add(self, func, name=None, help=None): name = name or func.__name__ desc = help or func.__doc__ parser = self.commands.add_parser(name, help=help, description=desc, add_help=False) parser.set_defaults(_command=func) return parser def parse(self, *args): return self.parser.parse_args(*args) class Application(ltpdarepo.Application): commands = Commands() def __init__(self, *args, **kwargs): super(Application, self).__init__(*args, **kwargs) # database connection parameters that may be # overridden by command line arguments self.hostname = self.config['HOSTNAME'] self.database = self.config['DATABASE'] self.username = self.config['USERNAME'] self.password = self.config['PASSWORD'] def __getattr__(self, name): if name in self.config: return self.config[name] return self.__dict__[name] def connect(self, **kwargs): # open connection to the database return mysql.connect(host=self.hostname, user=self.username, passwd=self.password, charset='utf8', **kwargs) @contextmanager def interact(self): # fake request ctx = self.test_request_context('') ctx.push() # execute before request handlers self.preprocess_request() yield # execute after request handlers self.process_response(self.response_class()) ctx.pop() def dispatch(self): args = vars(self.commands.parser.parse_args()) # verbosity verbosity = args.pop('verbosity') if verbosity: logger = logging.getLogger('ltpdarepo') logger.setLevel(logger.level - 10 * verbosity) # common parameters username = args.pop('_username', None) if username: self._username = username password = args.pop('_password', None) if username or password: self._password = password # run command command = args.pop('_command') command(self, **args) def help(self, command=None): """show this help message or given command help """ self.commands.parser.print_help(command) self.commands.parser.exit() cmd = commands.add(help) cmd.add_argument('command', nargs='?', metavar='COMMAND') def user(self, username): """show user""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) for name in user.__slots__: print '%12s: %s' % (name, getattr(user, name)) cmd = commands.add(user) cmd.add_argument('username', metavar='USERNAME') def useradd(self, username, **kwargs): """create user account""" with self.interact(): user = User.load(username) if user is not None: raise Exception("user '%s' exists" % username) user = User(username, **kwargs) user.create() cmd = commands.add(useradd) cmd.add_argument('username', metavar='USERNAME') cmd.add_argument('-a', '--admin', default=argparse.SUPPRESS, type=_bool) cmd.add_argument('-n', '--name', default=argparse.SUPPRESS) cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS) cmd.add_argument('-e', '--email', default=argparse.SUPPRESS) def userdel(self, username): """delete user account""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) user.delete() cmd = commands.add(userdel) cmd.add_argument('username', metavar='USERNAME') def usermod(self, username, **kwargs): """modify user account""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) # update user for name, value in kwargs.iteritems(): setattr(user, name, value) user.save() cmd = commands.add(usermod) cmd.add_argument('username', metavar='USERNAME') cmd.add_argument('-a', '--admin', default=argparse.SUPPRESS, type=_bool) cmd.add_argument('-n', '--name', default=argparse.SUPPRESS) cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS) cmd.add_argument('-e', '--email', default=argparse.SUPPRESS) def passwd(self, username, password): """change password for a given user""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) user.passwd(password) cmd = commands.add(passwd) cmd.add_argument('username', metavar='USERNAME') cmd.add_argument('password', metavar='PASSWORD') def grant(self, username, database, **privs): conn = self.connect() curs = conn.cursor() for priv in privs: curs.execute("GRANT %s ON %s.* TO %%s@%%s" % (priv, database), (username, '%')) # explicitly grant privileges on transactions table curs.execute("GRANT INSERT ON %s.transactions TO %%s@%%s" % (database, ), (username, '%')) conn.commit() conn.close() def _grant(self, username, database, privs): """grant permissions to users""" privs = dict((priv, True) for priv in privs) return self.grant(username, database, **privs) cmd = commands.add(_grant, name='grant') cmd.add_argument('username', metavar='USERNAME') cmd.add_argument('database', metavar='PASSWORD') cmd.add_argument('privs', metavar='PRIV', nargs='+', type=upper, choices=frozenset(['SELECT', 'INSERT', 'UPDATE', 'DELETE'])) def privileges(self, username, database=None): """show privileges for given user""" conn = self.connect() curs = conn.cursor() from collections import defaultdict privs = defaultdict(lambda: {'select': False, 'insert': False, 'update': False, 'delete': False}) curs.execute('''SELECT DISTINCT Db, Select_priv, Insert_priv, Update_priv, Delete_priv FROM mysql.db WHERE User=%s ORDER BY Db''', username) for row in curs.fetchall(): privs[row[0]] = {'select': row[1] == 'Y', 'insert': row[2] == 'Y', 'update': row[3] == 'Y', 'delete': row[4] == 'Y'} conn.close() if database is not None: return privs[database] return privs def _privileges(self, username): """show user privileges""" privs = self.privileges(username) if privs: headers = tuple('database select insert update delete'.split()) print '%-20s %7s %7s %7s %7s' % headers for db, priv in privs.iteritems(): values = [priv[x] and 'Y' or 'N' for x in 'select insert update delete'.split()] print '%-20s' % db, '%7s %7s %7s %7s' % tuple(values) cmd = commands.add(_privileges, name='privs') cmd.add_argument('username', metavar='USERNAME') def createdb(self, database, name='', description=''): """create database""" with self.interact(): db = Database.load(database) if db is not None: raise Exception("database '%s' exists" % database) db = Database(database, name=name, description=description) db.create() cmd = commands.add(createdb) cmd.add_argument('database', metavar='DATABASE', help='database name') cmd.add_argument('-n', '--name', default='') cmd.add_argument('-d', '--description', default='') def dropdb(self, database): """delete database""" with self.interact(): db = Database.load(database) if db is None: raise Exception('database "%s" not found' % database) db.drop() cmd = commands.add(dropdb) cmd.add_argument('database', metavar='DATABASE', help='database name') # install install = install cmd = commands.add(install) cmd.add_argument('--create-user', '-c', action='store_true', dest='createuser') cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') # upgrade upgrade = upgrade cmd = commands.add(upgrade) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('--from', type=float, dest='_from', metavar='REV') cmd.add_argument('--to', type=float, dest='_to', metavar='REV') # populate populate = populate cmd = commands.add(populate) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('database', help='database') cmd.add_argument('nobjs', nargs='?', default=30, type=int, help='number of obejcts') # dump if HAS_SQL_ALCHEMY: def dump(self, database, tables=None, out=sys.stdout): """dump database structure""" from .tests import schema schema.dump(self.USERNAME, self.PASSWORD, self.HOSTNAME, database, tables=tables, out=out) cmd = commands.add(dump) cmd.add_argument('database') cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('--tables', nargs='+') def wipe(self, yes=True): """delete all database content""" # connect to the database conn = self.connect() curs = conn.cursor() # databases list curs.execute("""SHOW DATABASES""") databases = [row[0] for row in curs.fetchall()] # delete databases for db in databases: if db not in ('mysql', 'information_schema'): curs.execute("""DROP DATABASE `%s`""" % db) # delete users curs.execute("""DELETE FROM mysql.user WHERE user <> 'root' and user <> ''""") # delete privileges curs.execute("""DELETE FROM mysql.db""") curs.execute("""DELETE FROM mysql.tables_priv""") curs.execute("""DELETE FROM mysql.columns_priv""") # flush privileges curs.execute("""FLUSH PRIVILEGES""") conn.commit() conn.close() cmd = commands.add(wipe) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('--yes', action='store_true', required=True) def setup(self): """setup test environment""" self.install(createuser=True) self.useradd('u1', admin=True) self.passwd('u1', 'u1') self.createdb('db1', description=u'Test database One') self.createdb('db2', description=u'Test database Tw\u00F6') self.populate('db1', 30) self.grant('u1', 'db1', select=True, insert=True, update=True, delete=True) cmd = commands.add(setup) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') def main(conf=None): # setup logging handler = logging.StreamHandler() handler.setLevel(logging.DEBUG) logger = logging.getLogger('ltpdarepo') logger.addHandler(handler) logger.setLevel(logging.WARNING) # silence annoying mysql warnings warnings.filterwarnings('ignore', category=mysql.Warning) # dispatch command line app = Application(conf) app.dispatch()