view src/ltpdarepo/views/browse.py @ 186:8da56d2a70d6

Enormously speed up activity view.
author Daniele Nicolodi <daniele@grinta.net>
date Tue, 08 Nov 2011 12:46:54 +0100
parents c52ae3f0196f
children fbab144c296c
line wrap: on
line source

import re
from datetime import datetime
from dateutil.relativedelta import relativedelta

import dateutil.tz
import dateutil.parser

from flask import Blueprint, Markup, abort, g, request, render_template, json, make_response, url_for, redirect
from MySQLdb.cursors import DictCursor
from wtforms import Form
from wtforms.fields import Field
from wtforms.widgets import TextInput
from wtforms.validators import ValidationError, Optional

from ltpdarepo.security import require, view
from ltpdarepo.database import Database
from ltpdarepo.query import Query
from ltpdarepo.pagination import Pagination

try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict


FIELDS = ('id', 'name', 'type', 'quantity', 'keywords', 'submitted', 'title', 'description',)
EXTRA = ('analysis', 'author', 'additional authors', 'comments', 'reference ids', 'created', )
MORE = ('version', 'ip', 'hostname', 'os', 'validated', 'vdate')
TIMESERIESFIELDS = ('id', 'name', 't0', 'nsecs', 'quantity', 'keywords', 'title', 'description')

PAGESIZE = 64


class Objs(object):
    columns = OrderedDict([
        ('id',                 'objmeta.obj_id'),
        ('name',               'objmeta.name'),
        ('type',               'objmeta.obj_type'),
        ('quantity',           'objmeta.quantity'),
        ('keywords',           'objmeta.keywords'),
        ('submitted',          'objmeta.submitted'),
        ('title',              'objmeta.experiment_title'),
        ('description',        'objmeta.experiment_desc'),
        ('analysis',           'objmeta.analysis_desc'),
        ('author',             'objmeta.author'),
        ('additional authors', 'objmeta.additional_authors'),
        ('comments',           'objmeta.additional_comments'),
        ('reference ids',      'objmeta.reference_ids'),
        ('created',            'objmeta.created')])

    def __init__(self, database):
        self._database = database
        self._orderby = None
        self._reverse = None
        self._limit = None
        self._where = None
        self._values = None

    @property
    def _query(self):
        columns = ", ".join("%s AS `%s`" % (col, name) for name, col in self.columns.iteritems())
        query = "SELECT " + columns + " FROM `%s`.objmeta" % self._database
        if self._where:
            query += " WHERE %s" % self._where
        if self._orderby:
            query += " ORDER BY %s" % self._orderby
        if self._reverse:
            query += " DESC"
        if self._limit:
            query += " LIMIT %d,%d" % self._limit
        return query

    @property
    def _count(self):
        query = "SELECT COUNT(*) FROM `%s`.objmeta" % self._database
        if self._where:
            query += " WHERE %s" % self._where
        return query

    def filter(self, where, values):
        self._where = where
        self._values = values
        return self

    def orderby(self, order, desc=False):
        self._orderby = order
        self._reverse = desc
        return self

    def limit(self, start, count=None):
        if count is None:
            return self.limit(0, start)
        self._limit = (start, count)
        return self

    def all(self):
        curs = g.db.cursor(DictCursor)
        curs.execute(self._query, self._values)
        return curs.fetchall()

    def count(self):
        curs = g.db.cursor()
        curs.execute(self._count, self._values)
        return curs.fetchone()[0]

    def __len__(self):
        return self.count()

    def __getitem__(self, item):
        obj = None
        limit = self._limit
        start = limit and limit[0] or 0
        self._limit = start, 1
        objs = self.all()
        if objs is not None:
            obj = objs[0]
        self._limit = limit
        return obj


class Timeseries(Objs):
    columns = Objs.columns.copy()
    columns.update([
        ('t0',                 'tsdata.t0'),
        ('nsecs',              'tsdata.nsecs')])

    def __init__(self, *args, **kwargs):
        super(Timeseries, self).__init__(*args, **kwargs)
        self._start = None
        self._end = None

    def timespan(self, start=None, end=None):
        self._start = start
        self._end = end
        return self

    @property
    def _query(self):
        columns = ", ".join("%s AS `%s`" % (col, name) for name, col in self.columns.iteritems())
        query = "SELECT " + columns + " FROM `%s`.objmeta, `%s`.tsdata" % (self._database, self._database)
        query += " WHERE objmeta.obj_id=tsdata.obj_id"
        if self._end:
            query += " AND tsdata.t0 <= '%s'" % self._end
        if self._start:
            query += " AND tsdata.t0 + INTERVAL tsdata.nsecs SECOND >= '%s'" % self._start
        if self._where:
            query += " AND %s" % self._where
        if self._orderby:
            query += " ORDER BY %s" % self._orderby
        if self._reverse:
            query += " DESC"
        if self._limit:
            query += " LIMIT %d,%d" % self._limit
        return query

    @property
    def _count(self):
        query = "SELECT COUNT(*) FROM `%s`.objmeta, `%s`.tsdata" % (self._database, self._database)
        query += " WHERE objmeta.obj_id=tsdata.obj_id"
        if self._end:
            query += " AND tsdata.t0 <= '%s'" % self._end
        if self._start:
            query += " AND tsdata.t0 + INTERVAL tsdata.nsecs SECOND >= '%s'" % self._start
        if self._where:
            query += " AND %s" % self._where
        if self._limit:
            query += " LIMIT %d,%d" % self._limit
        return query


class Index(dict):
    def __init__(self, **kwargs):
        self.__dict__ = self
        self.update(**kwargs)


class Indexes(OrderedDict):

    operators = {'int':      ('=', '>', '<'),
                 'tinyint':  ('=', '>', '<'),
                 'double':   ('=', '>', '<'),
                 'text':     ('LIKE', '='),
                 'enum':     ('=',),
                 'datetime': ('=', '>', '<')}

    def __init__(self, database, columns):
        super(Indexes, self).__init__()
        indexes = []

        # map column name to index name
        mapping = dict(((v.split('.')[1], k) for k, v in columns.iteritems()))
        # map index name to ordering position
        order = dict((v, i) for i, v in enumerate(columns.keys()))

        curs = g.db.cursor()
        curs.execute("""DESCRIBE `%s`.`%s`""" % (database, 'objmeta'))

        for column, kind, values in [self.parse(x) for x in curs.fetchall()]:
            name = mapping.get(column)
            if name:
                indexes.append(Index(name=name, type=kind,
                                     operators=Indexes.operators[kind],
                                     values=values))

        for index in sorted(indexes, key=lambda v: order.get(v.name, 100)):
            self[index.name] = index

    def parse(self, desc):
        column = desc[0]
        # extract column type
        kind = re.match(r'(\w+)', desc[1]).group(1)
        values = None
        if kind == 'enum':
            # extract enum values
            values = re.match(r'(\w+)\((.*)\)', desc[1]).group(2).split(',')
            values = [v.strip("'") for v in values]
        return column, kind, values


class datetimefield(datetime):
    """Timezone aware subclass of `datetime.datetime` with a new
    constructor that parses strings and a default string
    representation including the timezone name."""

    def __new__(cls, string):
        # parsing default is midnight today in UTC timezone
        default = datetime.utcnow().replace(
            tzinfo=dateutil.tz.tzutc(), hour=0, minute=0, second=0, microsecond=0)

        value = dateutil.parser.parse(string, dayfirst=True, yearfirst=True, default=default)
        return datetime.__new__(cls, value.year, value.month, value.day, value.hour,
                                value.minute, value.second, value.microsecond, value.tzinfo)

    def __str__(self):
        return self.strftime('%Y-%m-%d %H:%M:%S %Z')


class DateTimeJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%dT%H:%M:%S%Z')
        return super(DateTimeJSONEncoder, self).default(obj)


class Request(object):
    """Retrieves and validates query parameters from the request"""

    validators = {'int': int,
                  'tinyint': int,
                  'double': float,
                  'text': unicode,
                  'enum': unicode,
                  'datetime': datetimefield}

    def __init__(self, formdata, columns, indexes={}):
        # incoming data
        self.formdata = formdata
        # column names mapping. used to validate orderby parameter
        self.columns = columns
        # indexes description. used to validate query parameters
        self.indexes = indexes

        # process incoming data
        self.process(self.formdata)

    @staticmethod
    def fromqs(string, columns, indexes={}):
        from urlparse import parse_qs
        from werkzeug.datastructures import MultiDict
        # parse query string
        formdata = parse_qs(string, keep_blank_values=True, strict_parsing=True)
        formdata = MultiDict(formdata)
        return Request(formdata, columns, indexes)

    def process(self, formdata):
        fields = formdata.getlist('field')
        ops = formdata.getlist('operator')
        values = formdata.getlist('value')
        self.parsequery(zip(fields, ops, values))

    def parsequery(self, items):
        query, vals, parsed = [], [], []
        for field, op, value in items:
            # index description
            index = self.indexes[field]
            validate = self.validators[index['type']]
            # validate operator
            if op not in index['operators']: op = '='
            # validate value
            error = False
            try:
                value = validate(value)
                # add to query parameters
                vals.append(value)
                query.append('%s %s %%s' % (self.columns[field], op))
            except ValueError:
                error = True
            finally:
                # append to parse results
                parsed.append((field, op, value, error))

        self.where = ' AND '.join(query)
        self.vals = vals
        self.criteria = parsed

    @property
    def order(self):
        """Current ordering in an ordered view"""
        # validation by dict lookup
        order = self.formdata.get('o', 'id')
        order = self.columns[order]
        # validation by conversion to int
        reverse = self.formdata.get('r', 0, type=int)
        return order, reverse

    @property
    def pagesize(self):
        """Current page size in a paginated view"""
        # validation by conversion to int
        return self.formdata.get('n', PAGESIZE, type=int)

    @property
    def page(self):
        """Current page in a paginated view"""
        # validation by conversion to int
        return self.formdata.get('p', 1, type=int)

    @property
    def query(self):
        return self.where, self.vals

    def tostring(self):
        query = ["%s %s '%s'" % (field, op, value) for field, op, value, err in self.criteria if not err]
        string = ' AND&nbsp;'.join(s.replace(' ', '&nbsp;') for s in query)
        return Markup(string)


class DateTimeField(Field):
    """Text input and that uses `dateutil.parser.parse` to parse the
    entered value into a `datetime.datetime` object. This differs from
    `wtforms.ext.dateutil.field.DateTimeField` in parameters used for
    the datetime parsing and in the display format of the field value."""

    widget = TextInput()

    def __init__(self, label=None, validators=None, **kwargs):
        super(DateTimeField, self).__init__(label, validators, **kwargs)
        self.parseargs = dict(dayfirst=True, yearfirst=True)

    def _value(self):
        if self.data:
            return self.data.strftime('%Y-%m-%d %H:%M:%S %Z')
        return self.raw_data and u' '.join(self.raw_data) or u''

    def process_formdata(self, valuelist):
        if valuelist:
            datestr = u' '.join(valuelist)
            if not datestr:
                self.data = None
                raise ValidationError(self.gettext(u'Input a datetime value'))

            # parsing default is midnight today in UTC timezone
            default = datetime.utcnow().replace(
                tzinfo=dateutil.tz.tzutc(), hour=0, minute=0, second=0, microsecond=0)
            try:
                self.data = dateutil.parser.parse(datestr, default=default, **self.parseargs)
            except ValueError:
                self.data = None
                raise ValidationError(self.gettext(u'Invalid datetime input'))


class Timerange(Form):
    """Form used to specify time ranges in timeseries search query"""

    t1 = DateTimeField(validators=[Optional()])
    t2 = DateTimeField(validators=[Optional()])

    def __init__(self, *args, **kwargs):
        super(Timerange, self).__init__(*args, **kwargs)
        self.validate()

    @property
    def range(self):
        return self.t1.data, self.t2.data


app = Blueprint('browse', __name__)


@app.route('/<database>/')
@require('user')
def database(database):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)
        curs = g.db.cursor(DictCursor)
        curs.execute("""SELECT id, title, db FROM queries WHERE %s RLIKE db""", database)
        queries = curs.fetchall()
        return render_template('database.html', database=db, queries=queries)


@app.route('/<database>/objs')
@require('user')
def browse(database):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)

        # parse request
        r = Request(request.args, Objs.columns)

        count = Objs(database).count()
        batch = Pagination(r.page, size=r.pagesize, count=count)
        objs  = Objs(database).orderby(*r.order).limit(*batch.limits).all()

        return render_template('browse.html', objs=objs, batch=batch,
                               fields=FIELDS, database=db)


@app.route('/<database>/<int:objid>')
@require('user')
def obj(database, objid):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)
        obj = Objs(database).filter('obj_id=%s', objid)[0]
        if obj is None:
            # not found
            abort(404)

        fields = FIELDS + EXTRA

        # check for mat representation
        curs = g.db.cursor()
        curs.execute("""SELECT obj_id
                        FROM `%s`.bobjs WHERE obj_id=%%s""" % database, objid)
        row = curs.fetchone()
        if row is not None:
            obj['mat'] = True

        # check for xml representation
        curs = g.db.cursor()
        curs.execute("""SELECT uuid, xml LIKE %%s
                        FROM `%s`.objs WHERE id=%%s""" % database, ('<?xml %', objid))
        uuid, xml = curs.fetchone()
        if xml:
            obj['xml'] = True

        # add uuid
        obj['uuid'] = uuid
        fields += ('uuid', )

        # collect additional informations
        if obj['type'] == 'ao':
            curs = g.db.cursor(DictCursor)
            curs.execute("""SELECT data_type AS `data type`,
                                   description AS `description`
                            FROM `%s`.ao WHERE obj_id=%%s""" % database, objid)
            fields += ('data type', 'data description', )
            details = curs.fetchone()
            obj.update(details)

            if   obj['data type'] == 'tsdata':
                curs = g.db.cursor(DictCursor)
                curs.execute("""SELECT t0, nsecs, fs, xunits, yunits,
                                       t0 - INTERVAL toffset/1000 SECOND AS `reference time`
                                FROM `%s`.tsdata WHERE obj_id=%%s""" % database, objid)
                details = curs.fetchone()
                if details is not None:
                    obj.update(details)
                fields += ('reference time', 't0', 'nsecs', 'fs', 'xunits', 'yunits', )

            elif obj['data type'] == 'xydata':
                curs = g.db.cursor(DictCursor)
                curs.execute("""SELECT xunits, yunits
                                FROM `%s`.xydata WHERE obj_id=%%s""" % database, objid)
                details = curs.fetchone()
                if details is not None:
                    obj.update(details)
                fields += ('xunits', 'yunits', )

            elif obj['data type'] == 'fsdata':
                curs = g.db.cursor(DictCursor)
                curs.execute("""SELECT fs, xunits, yunits
                                FROM `%s`.fsdata WHERE obj_id=%%s""" % database, objid)
                details = curs.fetchone()
                if details is not None:
                    obj.update(details)
                fields += ('fs', 'xunits', 'yunits', )

            elif obj['data type'] == 'cdata':
                curs = g.db.cursor(DictCursor)
                curs.execute("""SELECT yunits
                                FROM `%s`.cdata WHERE obj_id=%%s""" % database, objid)
                details = curs.fetchone()
                if details is not None:
                    obj.update(details)
                fields += ('yunits', )

        return render_template('obj.html', obj=obj, database=db, fields=fields)


@app.route('/<database>/<int:objid>/<frmt>')
@require('user')
def download(database, objid, frmt):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)

        if frmt not in ('xml', 'mat'):
            # not found
            abort(404)

        if frmt == 'xml':
            curs = g.db.cursor()
            curs.execute("SELECT xml FROM `%s`.objs WHERE id=%%s" % database, objid)
            mimetype = 'text/xml'
        if frmt == 'mat':
            curs = g.db.cursor()
            curs.execute("SELECT mat FROM `%s`.bobjs WHERE obj_id=%%s" % database, objid)
            mimetype = 'application/matlab'

        data = curs.fetchone()
        if data is None:
            # not found
            abort(404)
        data = data[0] or ''

        # construct response
        response = make_response(data)
        response.mimetype = mimetype
        filename = '%s-%s.%s' % (database, objid, frmt)
        response.headers.add('Content-Disposition', 'attachment', filename=filename)
        return response


@app.route('/<database>/search')
@require('user')
def search(database):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)

        # parse request
        r = Request(request.args, Objs.columns)
        q = '%' + request.args.get('q', '') + '%'

        # collect objects
        count = Objs(database).filter('name LIKE %s', q).count()
        batch = Pagination(r.page, size=r.pagesize, count=count)
        objs  = Objs(database).filter('name LIKE %s', q).orderby(*r.order).limit(*batch.limits).all()

        return render_template('browse.html', objs=objs, batch=batch,
                               fields=FIELDS, database=db)


@app.route('/<database>/query')
@require('user')
def query(database):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)

        # applicable search criteria
        indexes = Indexes(database, Objs.columns)

        # parse request
        r = Request(request.args, Objs.columns, indexes)

        # collect objects
        count = Objs(database).filter(*r.query).count()
        batch = Pagination(r.page, size=r.pagesize, count=count)
        objs  = Objs(database).filter(*r.query).orderby(*r.order).limit(*batch.limits).all()

        return render_template('query.html', objs=objs, batch=batch,
                               fields=FIELDS, database=db,
                               indexes=indexes, query=r.criteria)


@app.route('/<database>/query/<int:id>')
@require('user')
def namedquery(database, id):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)
        query = Query.load(id)
        if query is None:
            # not found
            abort(404)

        # redirect to query view
        return redirect(url_for('browse.%s' % query.kind, database=database, **query.params))


@app.route('/<database>/timeseries')
@require('user')
def timeseries(database):
    with view('database', database):
        db = Database.load(database)
        if db is None:
            # not found
            abort(404)

        # applicable search criteria
        indexes = Indexes(database, Timeseries.columns)

        # parse reuest
        r = Request(request.args, Timeseries.columns, indexes)
        t = Timerange(request.args)

        # collect objects
        count = Timeseries(database).filter(*r.query).timespan(*t.range).count()
        batch = Pagination(r.page, size=r.pagesize, count=count)
        objs  = Timeseries(database).filter(*r.query).timespan(*t.range).orderby(*r.order).limit(*batch.limits).all()

        return render_template('timerange.html', objs=objs, batch=batch,
                               fields=TIMESERIESFIELDS, database=db,
                               indexes=indexes, query=r.criteria, timerange=t)


@app.route('/<database>/activity')
@app.route('/<database>/activity/<when>')
@require('user')
def activity(database, when=None):
    with view('database', database):
        db = Database().load(database)
        if db is None:
            # not found
            abort(404)

        # defaults
        today = datetime.today().date()
        span = 'MONTH'

        if when is not None:
            if when.count('-') == 2:
                span = 'DAY'
                today = datetime.strptime(when, '%Y-%m-%d').date()
            if when.count('-') == 1:
                span = 'MONTH'
                today = datetime.strptime(when, '%Y-%m').date()


        if span == 'MONTH':
            begin = today + relativedelta(day=1)
            end = begin + relativedelta(months=1, days=-1)
            prev = (begin + relativedelta(months=-1)).strftime('%Y-%m')
            next = (begin + relativedelta(months=+1)).strftime('%Y-%m')
            increment = 'DAY'
            dt = relativedelta(days=1)

        if span == 'DAY':
            begin = today + relativedelta(hour=0)
            end = begin + relativedelta(hour=0, days=1)
            prev = (begin + relativedelta(days=-1)).strftime('%Y-%m-%d')
            next = (begin + relativedelta(days=+1)).strftime('%Y-%m-%d')
            increment = 'HOUR'
            dt = relativedelta(hours=1)

        curs = g.db.cursor()

        # prepare timeintervals table with proper intervals
        curs.execute("""CALL makeintervals(%s, %s, 1, %s)""", (begin, end, increment))

        # collect number of objects for each interval
        curs.execute("""SELECT ts.begin AS date, COUNT(objmeta.obj_id) AS count
                        FROM timeintervals AS ts
                        LEFT OUTER JOIN (SELECT obj_id, submitted FROM `%s`.objmeta
                                          WHERE submitted > %%s AND submitted < %%s) AS objmeta
                                     ON (objmeta.submitted BETWEEN ts.begin AND ts.end)
                        GROUP BY ts.begin, ts.end""" % database, (begin, end + dt))

        activity = curs.fetchall()

        from math import ceil, floor, log10
        nmax = max(num for day, num in activity) or 1
        base = 10**floor(log10(nmax))
        if base < 10: base = 10.0
        nmax = ceil(nmax / base)*base

        return render_template('activity.html', database=db, activity=activity, nmax=nmax,
                               curr=today, prev=prev, next=next, span=span, dt=dt)


module = app