view allan.py @ 0:3dcd4cfd8f82

Import
author Daniele Nicolodi <daniele.nicolodi@obspm.fr>
date Fri, 18 Apr 2014 18:44:54 +0200
parents
children ebc4e1d7c32f
line wrap: on
line source

import numpy as np
from scipy import polyfit, polyval
import matplotlib.pyplot as plt
import numexpr

__all__ = ['outliers', 'detrend', 'fastdetrend', 
           'adev', 'xadev', 'xmdev', 'xtdev', 'xavar', 'xmvar', ]


def outliers(y, alpha=6, remove=False):
    """Detect and optionally remove outliers"""
    
    m = np.mean(y)
    v = np.var(y)

    outliers = np.abs(y - m) > (alpha * np.sqrt(v))

    if remove:
        x = lambda z: z.nonzero()[0]
        y[outliers] = np.interp(x(outliers), x(~outliers), y[~outliers])

    return np.nonzero(outliers)[0]


def detrend(y, order=1):
    """Detrend"""

    x = np.arange(y.size)
    p = polyfit(x, y, order)
    y = y - polyval(p, x)
    return y


def fastdetrend(y, order=1, downsample=1000):
    # time base
    x = np.arange(y.size)
    # compute detrend polynomial from downsampled data
    p = polyfit(x[::downsample], y[::downsample], order)
    
    # construct numexpr expression
    expr = []
    args = {'y': y, 'x': x}
    for i in range(order + 1):
        expr.append("p%d" % i + "*x" * i)
        args["p%d" % i] = p[order - i]
    expr = "y - (%s)" % " + ".join(expr)

    # evaluate expression
    y = numexpr.evaluate(expr, local_dict=args)
    return y


def adev(x, tau, sampl=1.0):
    """Allan deviation"""

    x = np.asarray(x)
    tau = np.asarray(tau)

    # allocate output vectors
    adev = np.zeros(tau.size)
    dadev = np.zeros(tau.size)

    # samples
    n = x.size
    # partitioning
    p = np.floor(tau * sampl).astype(int)

    for i, m in enumerate(p):
        d = x[0:n - n % m].reshape(-1, m)
        y = np.mean(d, axis=1)
        adev[i] = np.sqrt(0.5 * np.mean(np.diff(y)**2))
        dadev[i] = adev[i] / np.sqrt(y.size)

    return adev, dadev
    

from _allan import _xmvar, _xmvar2, _xavar

# Functions to compute Allan deviation and modified Allan deviation
# from time error data.  For an oscillator described by the equation
# v(t) = v0 * cos(2*pi * f0 * t + phi(t)) where f0 is the nominal
# oscillator frequency the time deviation is computed as: x(t) =
# phi(t) / (2*pi * f0)
#
# The Symmetricom 5115A phase analyzer outputs data in units of cycles
# of the input signal.  To obtain time deviation from the data stream
# d(t) it is sufficient to compute: x(t) = d(t) / f0 where f0 is the
# nominal frequency of the oscillator under test

def xmvar0(x, tau, tau0=1.0):
    
    x = np.asarray(x)
    tau = np.asarray(tau)

    # allocate output vectors
    mvar = np.zeros(tau.size)

    # partitioning
    p = (tau // tau0).astype(int)

    for k, m in enumerate(p):
        mvar[k] = _xmvar(x, m)
        # c = 0
        # for j in xrange(x.size - 3 * m + 1):
        #     v = 0
        #     for i in xrange(j, j + m):
        #         v += (x[i + 2 * m] - 2 * x[i + m] + x[i]) / m
        #     c += v*v
        # mavar[k] = c / (2.0 * m*m * (n - 3 * m + 1))

    return np.sqrt(mvar) / tau0


def xmvar(x, tau, tau0=1.0):
    
    x = np.asarray(x)
    tau = np.asarray(tau)

    # partitioning
    p = (tau // tau0).astype(int)

    return _xmvar2(x, p) / (tau0 * tau0)


def xmdev(x, tau, tau0=1.0):
    
    return np.sqrt(xmvar(x, tau, tau0))


def xtdev(x, tau, tau0=1.0):
    
    return np.sqrt(xmvar(x, tau, tau0)) * tau / np.sqrt(3.0)


def xavar(x, tau, tau0=1.0):
    
    x = np.asarray(x)
    tau = np.asarray(tau)

    # partitioning
    p = (tau // tau0).astype(int)

    return _xavar(x, p) / (tau0 * tau0)


def xadev(x, tau, tau0=1.0):
    
    return np.sqrt(xavar(x, tau, tau0))