sfa_nodes.py :  » Math » Modular-toolkit-for-Data-Processing » MDP-2.6 » mdp » nodes » Python Open Source

Python Open Source Python
3.Aspect Oriented
6.Business Application
7.Chart Report
8.Content Management Systems
15.Game 2D 3D
21.Issue Tracker
22.Language Interface
25.Media Sound Audio
30.Project Management
34.Template Engines
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
Python Open Source » Math » Modular toolkit for Data Processing 
Modular toolkit for Data Processing » MDP 2.6 » mdp » nodes » sfa_nodes.py
import mdp
from mdp import numx,Node,NodeException
from mdp.utils import (mult,pinv,symeig,CovarianceMatrix,QuadraticForm

class SFANode(Node):
    """Extract the slowly varying components from the input data.
    More information about Slow Feature Analysis can be found in
    Wiskott, L. and Sejnowski, T.J., Slow Feature Analysis: Unsupervised
    Learning of Invariances, Neural Computation, 14(4):715-770 (2002).

    Internal variables of interest:
    self.avg -- Mean of the input data (available after training)
    self.sf -- Matrix of the SFA filters (available after training)
    self.d -- Delta values corresponding to the SFA components
              (generalized eigenvalues).
              (See the docs of the 'get_eta_values' method for
              more information)
    def __init__(self, input_dim=None, output_dim=None, dtype=None):
        super(SFANode, self).__init__(input_dim, output_dim, dtype)

        # init two covariance matrices
        # one for the input data
        self._cov_mtx = CovarianceMatrix(dtype)
        # one for the derivatives
        self._dcov_mtx = CovarianceMatrix(dtype)

        # set routine for eigenproblem
        self._symeig = symeig
        # SFA eigenvalues and eigenvectors, will be set after training
        self.d = None
        self.sf = None
    def _get_supported_dtypes(self):
        return ['float32', 'float64']

    def time_derivative(self, x):
        """Compute the linear approximation of the time derivative."""
        # this is faster than a linear_filter or a weave-inline solution
        return x[1:, :]-x[:-1, :]

    def _set_range(self):
        if self.output_dim is not None and self.output_dim <= self.input_dim:
            # (eigenvalues sorted in ascending order)
            rng = (1, self.output_dim)
            # otherwise, keep all output components
            rng = None
            self.output_dim = self.input_dim
        return rng

    def _train(self, x):
        ## update the covariance matrices
        # Cut the final point to avoid a trivial solution in special cases. (?)
        # This also makes sense if the last data point is duplicated as the
        # first data point of the next chunk, in which case the chunking
        # of the data has no influence on the result.
        self._cov_mtx.update(x[:-1, :])

    def _stop_training(self, debug=False):
        ##### request the covariance matrices and clean up
        self.cov_mtx, self.avg, self.tlen = self._cov_mtx.fix()
        del self._cov_mtx
        self.dcov_mtx, davg, dtlen = self._dcov_mtx.fix()
        del self._dcov_mtx

        rng = self._set_range()
        #### solve the generalized eigenvalue problem
        # the eigenvalues are already ordered in ascending order
            self.d, self.sf = self._symeig(self.dcov_mtx, self.cov_mtx,
                                     range=rng, overwrite=(not debug))
            d = self.d
            # check that we get only *positive* eigenvalues
            if d.min() < 0:
                err_msg = ("Got negative eigenvalues: %s."
                           " You may either set output_dim to be smaller,"
                           " or prepend the SFANode with a PCANode(reduce=True)"
                           " or PCANode(svd=True)"% str(d))
                raise NodeException(err_msg)
        except SymeigException, exception:
            errstr = str(exception)+"\n Covariance matrices may be singular."
            raise NodeException(errstr)

        # delete covariance matrix if no exception occurred
        del self.cov_mtx
        del self.dcov_mtx

        # store bias
        self._bias = mult(self.avg, self.sf)

    def _execute(self, x, range=None):
        """Compute the output of the slowest functions.
        if 'range' is a number, then use the first 'range' functions.
        if 'range' is the interval=(i,j), then use all functions
                   between i and j."""
        if range:
            if isinstance(range, (list, tuple)):
                sf = self.sf[:, range[0]:range[1]]
                bias = self._bias[:, range[0]:range[1]]
                sf = self.sf[:, 0:range]
                bias = self._bias[:, 0:range]
            sf = self.sf
            bias = self._bias
        return mult(x, sf) - bias

    def _inverse(self, y):
        return mult(y, pinv(self.sf)) + self.avg

    def get_eta_values(self, t=1):
        """Return the eta values of the slow components learned during
        the training phase. If the training phase has not been completed
        yet, call stop_training.
        The delta value of a signal is a measure of its temporal
        variation, and is defined as the mean of the derivative squared,
        i.e. delta(x) = mean(dx/dt(t)^2).  delta(x) is zero if
        x is a constant signal, and increases if the temporal variation
        of the signal is bigger.
        The eta value is a more intuitive measure of temporal variation,
        defined as
            eta(x) = t/(2*pi) * sqrt(delta(x))
        If x is a signal of length 't' which consists of a sine function
        that accomplishes exactly N oscillations, then eta(x)=N.
        Input arguments:
        t -- Sampling frequency in Hz
             The original definition in (Wiskott and Sejnowski, 2002)
             is obtained for t = number of training data points, while
             for t=1 (default), this corresponds to the beta-value defined in
             (Berkes and Wiskott, 2005).
        if self.is_training():
        return self._refcast(t / (2 * numx.pi) * numx.sqrt(self.d))

class SFA2Node(SFANode):
    """Get an input signal, expand it in the space of
    inhomogeneous polynomials of degree 2 and extract its slowly varying
    components. The 'get_quadratic_form' method returns the input-output
    function of one of the learned unit as a QuadraticForm object.
    See the documentation of mdp.utils.QuadraticForm for additional

    More information about Slow Feature Analysis can be found in
    Wiskott, L. and Sejnowski, T.J., Slow Feature Analysis: Unsupervised
    Learning of Invariances, Neural Computation, 14(4):715-770 (2002)."""

    def __init__(self, input_dim=None, output_dim=None, dtype=None):
        self._expnode = mdp.nodes.QuadraticExpansionNode(input_dim=input_dim,
        super(SFA2Node, self).__init__(input_dim, output_dim, dtype)

    def is_invertible(self):
        """Return True if the node can be inverted, False otherwise."""
        return False

    def _set_input_dim(self, n):
        self._expnode.input_dim = n
        self._input_dim = n
    def _train(self, x):
        # expand in the space of polynomials of degree 2
        super(SFA2Node, self)._train(self._expnode(x))

    def _set_range(self):
        if (self.output_dim is not None) and (
            self.output_dim <= self._expnode.output_dim):
            # (eigenvalues sorted in ascending order)
            rng = (1, self.output_dim)
            # otherwise, keep all output components
            rng = None        
        return rng
    def _stop_training(self, debug=False):
        super(SFA2Node, self)._stop_training(debug)

        # set the output dimension if necessary
        if self.output_dim is None:
            self.output_dim = self._expnode.output_dim

    def _execute(self, x, range=None):
        """Compute the output of the slowest functions.
        if 'range' is a number, then use the first 'range' functions.
        if 'range' is the interval=(i,j), then use all functions
                   between i and j."""
        return super(SFA2Node, self)._execute(self._expnode(x))

    def get_quadratic_form(self, nr):
        Return the matrix H, the vector f and the constant c of the
        quadratic form 1/2 x'Hx + f'x + c that defines the output
        of the component 'nr' of the SFA node.
        if self.sf is None:
        sf = self.sf[:, nr]
        c = -mult(self.avg, sf)
        n = self.input_dim
        f = sf[:n]
        h = numx.zeros((n, n), dtype=self.dtype)
        k = n
        for i in range(n):
            for j in range(n):
                if j > i:
                    h[i, j] = sf[k]
                    k = k+1
                elif j == i:
                    h[i, j] = 2*sf[k]
                    k = k+1
                    h[i, j] = h[j, i]

        return QuadraticForm(h, f, c, dtype=self.dtype)


### old weave inline code to perform the time derivative

# weave C code executed in the function SfaNode.time_derivative
##   for( int i=0; i<columns; i++ ) {
##     for( int j=0; j<rows-1; j++ ) {
##       deriv(j,i) = x(j+1,i)-x(j,i);
##     }
##   }
## """

# it was called like that:
## def time_derivative(self, x):
##     rows = x.shape[0]
##     columns = x.shape[1]
##     deriv = numx.zeros((rows-1, columns), dtype=self.dtype)

##     weave.inline(_TDERIVATIVE_1ORDER_CCODE,['rows','columns','deriv','x'],
##                  type_factories = weave.blitz_tools.blitz_type_factories,
##                  compiler='gcc',extra_compile_args=['-O3']);
##     return deriv
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.