Mercurial > hg > ltpdarepo
view src/ltpdarepo/admin.py @ 202:10801d55c5d5
Reorganize command line interface code.
author | Daniele Nicolodi <daniele@grinta.net> |
---|---|
date | Wed, 16 Nov 2011 19:08:45 +0100 |
parents | ac0a27a72b9e |
children | f8f898b39058 |
line wrap: on
line source
# Copyright 2011 Daniele Nicolodi <nicolodi@science.unitn.it> # # This software may be used and distributed according to the terms of # the GNU Affero General Public License version 3 or any later version. import argparse import logging import sys import warnings from contextlib import contextmanager from string import upper import MySQLdb as mysql import ltpdarepo from .database import Database from .install import install from .tests.populate import populate from .upgrade import upgrade from .user import User try: import sqlalchemy HAS_SQL_ALCHEMY = True except ImportError: HAS_SQL_ALCHEMY = False class verbosity(argparse.Action): def __call__(self, *args, **kwargs): # increse logging level logger = logging.getLogger('ltpdarepo') logger.setLevel(logger.level - 10) def _bool(value): # parse string into boolean value if value not in ('yes', 'no', 'true', 'false'): raise argparse.ArgumentTypeError('invalid value for bool parameter: %s' % value) return value in ('yes', 'true') class Commands(object): def __init__(self): self.parser = argparse.ArgumentParser(add_help=False) self.commands = self.parser.add_subparsers(metavar='command') options = self.parser.add_argument_group('options') options.add_argument('-v', '--verbose', action=verbosity, nargs=0, dest=argparse.SUPPRESS, help='increase verbosity') def add(self, func, name=None): name = name or func.__name__ desc = func.__doc__ or ' ' parser = self.commands.add_parser(name, help=desc.splitlines()[0], description=desc, add_help=False) parser.set_defaults(command=func) return parser def parse(self, *args): return self.parser.parse_args(*args) def dispatch(self): args = vars(self.parser.parse_args()) command = args.pop('command') try: command(**args) except Exception: import traceback sys.stderr.write(traceback.format_exc()) sys.exit(1) def __iter__(self): return iter(sorted(self.commands.choices.keys())) def __getitem__(self, name): return self.commands.choices[name] class Application(ltpdarepo.Application): commands = Commands() def __init__(self, *args, **kwargs): super(Application, self).__init__(*args, **kwargs) # database connection parameters that may be # overridden by command line arguments self._hostname = self.config['HOSTNAME'] self._database = self.config['DATABASE'] self._username = self.config['USERNAME'] self._password = self.config['PASSWORD'] def __getattr__(self, name): if name in self.config: return self.config[name] return self[name] def connect(self, **kwargs): # open connection to the database return mysql.connect(host=self._hostname, user=self._username, passwd=self._password, charset='utf8', **kwargs) @contextmanager def interact(self): # fake request ctx = self.test_request_context('') ctx.push() # execute before request handlers self.preprocess_request() yield # execute after request handlers self.process_response(self.response_class()) ctx.pop() def dispatch(self): args = vars(self.commands.parser.parse_args()) # common parameters username = args.pop('_username', None) if username: self._username = username password = args.pop('_password', None) if username or password: self._password = password # run command command = args.pop('command') command(self, **args) def help(self, name=None, out=sys.stderr): """provide commands help""" if name is not None: print >>out, self.commands[name].format_help().strip() return print >>out, "LTPDA Repository administration tool" print >>out, self.commands.parser.format_usage().strip() print >>out, "" print >>out, "commands:" for cmd in self.commands: doc = self.commands[cmd].description or ' ' print >>out, " %-15s %s" % (cmd, doc.splitlines()[0]) print >>out, "" print >>out, "options:" print >>out, " %-15s %s" % ("-v, --verbose", "increase verbosity. may be specified multiple times") print >>out, "" cmd = commands.add(help) cmd.add_argument('name', nargs='?', help='command', metavar='NAME') def user(self, username): """show user""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) for name in user.__slots__: print '%12s: %s' % (name, getattr(user, name)) cmd = commands.add(user) cmd.add_argument('username') def useradd(self, username, **kwargs): """create user account""" with self.interact(): user = User.load(username) if user is not None: raise Exception("user '%s' exists" % username) user = User(username, **kwargs) user.create() cmd = commands.add(useradd) cmd.add_argument('username', metavar='USERNAME') cmd.add_argument('-a', '--admin', default=argparse.SUPPRESS, type=_bool) cmd.add_argument('-n', '--name', default=argparse.SUPPRESS) cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS) cmd.add_argument('-e', '--email', default=argparse.SUPPRESS) def userdel(self, username): """delete user account""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) user.delete() cmd = commands.add(userdel) cmd.add_argument('username', metavar='USERNAME') def usermod(self, username, **kwargs): """modify user account""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) # update user for name, value in kwargs.iteritems(): setattr(user, name, value) user.save() cmd = commands.add(usermod) cmd.add_argument('username', metavar='username') cmd.add_argument('-a', '--admin', default=argparse.SUPPRESS, type=_bool) cmd.add_argument('-n', '--name', default=argparse.SUPPRESS) cmd.add_argument('-s', '--surname', default=argparse.SUPPRESS) cmd.add_argument('-e', '--email', default=argparse.SUPPRESS) def passwd(self, username, password): """change password for a given user""" with self.interact(): user = User.load(username) if user is None: raise Exception("user '%s' not found" % username) user.passwd(password) cmd = commands.add(passwd) cmd.add_argument('username', metavar='USERNAME') cmd.add_argument('password', metavar='PASSWORD') def grant(self, username, database, **privs): conn = self.connect() curs = conn.cursor() for priv in privs: curs.execute('''GRANT %s ON %s.* TO %%s@%%s''' % (priv, database), (username, '%')) conn.commit() conn.close() def _grant(self, username, database, privs): """grant permissions to users""" privs = dict((priv, True) for priv in privs) return self.grant(username, database, **privs) cmd = commands.add(_grant, name='grant') cmd.add_argument('username') cmd.add_argument('database') cmd.add_argument('privs', metavar='PRIV', nargs='+', type=upper, choices=frozenset(['SELECT', 'INSERT', 'UPDATE', 'DELETE'])) def privileges(self, username, database=None): """show privileges for given user""" conn = self.connect() curs = conn.cursor() from collections import defaultdict privs = defaultdict(lambda: {'select': False, 'insert': False, 'update': False, 'delete': False}) curs.execute('''SELECT DISTINCT Db, Select_priv, Insert_priv, Update_priv, Delete_priv FROM mysql.db WHERE User=%s ORDER BY Db''', username) for row in curs.fetchall(): privs[row[0]] = {'select': row[1] == 'Y', 'insert': row[2] == 'Y', 'update': row[3] == 'Y', 'delete': row[4] == 'Y'} conn.close() if database is not None: return privs[database] return privs def _privileges(self, username): """show user privileges""" privs = self.privileges(username) if privs: headers = tuple('database select insert update delete'.split()) print '%-20s %7s %7s %7s %7s' % headers for db, priv in privs.iteritems(): values = [priv[x] and 'Y' or 'N' for x in 'select insert update delete'.split()] print '%-20s' % db, '%7s %7s %7s %7s' % tuple(values) cmd = commands.add(_privileges, name='privs') cmd.add_argument('username', metavar='USERNAME') def createdb(self, database, name='', description=''): """create database""" with self.interact(): db = Database.load(database) if db is not None: raise Exception("database '%s' exists" % database) db = Database(database, name=name, description=description) db.create() cmd = commands.add(createdb) cmd.add_argument('database', help='database name') cmd.add_argument('-n', '--name', default='') cmd.add_argument('-d', '--description', default='') def dropdb(self, database): """delete database""" with self.interact(): db = Database.load(database) if db is None: raise Exception('database "%s" not found' % database) db.drop() cmd = commands.add(dropdb) cmd.add_argument('database', help='database name') # install install = install cmd = commands.add(install) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') # upgrade upgrade = upgrade cmd = commands.add(upgrade) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('--from', type=float, dest='_from', metavar='REV') cmd.add_argument('--to', type=float, dest='_to', metavar='REV') # populate populate = populate cmd = commands.add(populate) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('database', help='database') cmd.add_argument('nobjs', nargs='?', default=30, type=int, help='number of obejcts') # dump if HAS_SQL_ALCHEMY: def dump(self, database, tables=None, out=sys.stdout): """dump database structure""" from .tests import schema schema.dump(self.USERNAME, self.PASSWORD, self.HOSTNAME, database, tables=tables, out=out) cmd = commands.add(dump) cmd.add_argument('database') cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('--tables', nargs='+') def wipe(self, yes=True): """delete all database content""" # connect to the database conn = self.connect() curs = conn.cursor() # databases list curs.execute("""SHOW DATABASES""") databases = [row[0] for row in curs.fetchall()] # delete databases for db in databases: if db not in ('mysql', 'information_schema'): curs.execute("""DROP DATABASE `%s`""" % db) # delete users curs.execute("""DELETE FROM mysql.user WHERE user <> 'root' and user <> ''""") # delete privileges curs.execute("""DELETE FROM mysql.db""") curs.execute("""DELETE FROM mysql.tables_priv""") curs.execute("""DELETE FROM mysql.columns_priv""") # flush privileges curs.execute("""FLUSH PRIVILEGES""") conn.commit() conn.close() cmd = commands.add(wipe) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') cmd.add_argument('--yes', action='store_true', required=True) def setup(self): """setup test environment""" self.install() self.useradd('u1', admin=True) self.passwd('u1', 'u1') self.createdb('db1', description=u'Test database One') self.createdb('db2', description=u'Test database Tw\u00F6') self.populate('db1', 30) self.grant('u1', 'db1', select=True, insert=True, update=True, delete=True) cmd = commands.add(setup) cmd.add_argument('--user', '-u', dest='_username', metavar='USER') cmd.add_argument('--passwd', '-p', dest='_password', metavar='PASSWORD', default='') def main(conf=None): # setup logging handler = logging.StreamHandler() handler.setLevel(logging.DEBUG) logger = logging.getLogger('ltpdarepo') logger.addHandler(handler) logger.setLevel(logging.WARNING) # silence annoying mysql warnings warnings.filterwarnings('ignore', category=mysql.Warning) # dispatch command line app = Application(conf) app.dispatch()