# signal_node.py -- MDP 2.6 (Modular toolkit for Data Processing), mdp package

import cPickle as _cPickle
import inspect as _inspect

import mdp
from mdp import numx


class NodeException(mdp.MDPException):
    """Base class for exceptions in Node subclasses."""
    pass

class TrainingException(NodeException):
    """Base class for exceptions in the training phase."""
    pass

class TrainingFinishedException(TrainingException):
    """Raised when the 'train' method is called although the
    training phase is closed."""
    pass

class IsNotTrainableException(TrainingException):
    """Raised when the 'train' method is called although the
    node is not trainable."""
    pass

class IsNotInvertibleException(NodeException):
    """Raised when the 'inverse' method is called although the
    node is not invertible."""
    pass
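

# Illustrative sketch (not part of the original MDP source): how the exception
# hierarchy above behaves in practice.  'FixedNode' below is a hypothetical
# non-trainable node written only for this demonstration; calling its 'train'
# method raises IsNotTrainableException, which can also be caught as the more
# general TrainingException or NodeException.
def _example_exception_hierarchy():
    class FixedNode(Node):
        def is_trainable(self):
            return False
    node = FixedNode()
    try:
        node.train(numx.zeros((5, 2)))
    except NodeException:
        # IsNotTrainableException is a TrainingException, which in turn is a
        # NodeException, so this handler catches it.
        pass
    return node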


class NodeMetaclass(type):
    """This Metaclass is meant to overwrite doc strings of methods like
    execute, stop_training, inverse with the ones defined in the corresponding
    private methods _execute, _stop_training, _inverse, etc...

    This makes it possible for subclasses of Node to document the usage
    of public methods, without the need to overwrite the ancestor's methods.
    """

    # methods that can overwrite docs:
    DOC_METHODS = ['_train', '_stop_training', '_execute', '_inverse',
                   '_label', '_prob']
    
    def __new__(cls, classname, bases, members):
        # select private methods that can overwrite the docstring
        wrapper_names = []
        priv_infos = []
        for privname in cls.DOC_METHODS:
            if privname in members:
                priv_info = cls._get_infodict(members[privname])
                # if the docstring is empty, don't overwrite it
                if not priv_info['doc']:
                    continue
                # get the name of the corresponding public method
                pubname = privname[1:]
                # If the public method has been overwritten in this
                # subclass, then keep it.
                # This is also important because we use super in the wrapper
                # (so the public method in this class would be missed).
                if pubname not in members:
                    wrapper_names.append(pubname)
                    priv_infos.append(priv_info)
        new_cls = super(NodeMetaclass, cls).__new__(cls, classname,
                                                    bases, members)
        # now add the wrappers
        for wrapper_name, priv_info in zip(wrapper_names, priv_infos):
            # Note: super works because we never wrap in the defining class
            wrapped_method = getattr(super(new_cls, new_cls), wrapper_name)
            wrapped_info = cls._get_infodict(wrapped_method)
            priv_info['name'] = wrapper_name
            # Preserve the signature only if it does not end with kwargs
            # (this is important for binodes).
            # Note that this relies on the exact name 'kwargs', if this causes
            # problems we could switch to looking for ** in the signature.
            if not wrapped_info['argnames'][-1] == "kwargs":
                priv_info['signature'] = wrapped_info['signature']
                priv_info['argnames'] = wrapped_info['argnames']
                priv_info['defaults'] = wrapped_info['defaults']
            setattr(new_cls, wrapper_name,
                    cls._wrap_method(priv_info, new_cls))
        return new_cls
         
    # The next two functions (originally called get_info, wrapper)
    # are adapted versions of functions in the
    # decorator module by Michele Simionato
    # Version: 2.3.1 (25 July 2008)
    # Download page: http://pypi.python.org/pypi/decorator
    # Note: Moving these functions to utils would cause circular import.
    
    @staticmethod
    def _get_infodict(func):
        """
        Returns an info dictionary containing:
        - name (the name of the function : str)
        - argnames (the names of the arguments : list)
        - defaults (the values of the default arguments : tuple)
        - signature (the signature : str)
        - doc (the docstring : str)
        - module (the module name : str)
        - dict (the function __dict__ : dict)
        
        >>> def f(self, x=1, y=2, *args, **kw): pass
    
        >>> info = NodeMetaclass._get_infodict(f)
    
        >>> info["name"]
        'f'
        >>> info["argnames"]
        ['self', 'x', 'y', 'args', 'kw']
        
        >>> info["defaults"]
        (1, 2)
    
        >>> info["signature"]
        'self, x, y, *args, **kw'
        """
        regargs, varargs, varkwargs, defaults = _inspect.getargspec(func)
        argnames = list(regargs)
        if varargs:
            argnames.append(varargs)
        if varkwargs:
            argnames.append(varkwargs)
        signature = _inspect.formatargspec(regargs,
                                           varargs,
                                           varkwargs,
                                           defaults,
                                           formatvalue=lambda value: "")[1:-1]
        return dict(name=func.__name__, argnames=argnames, signature=signature,
                    defaults = func.func_defaults, doc=func.__doc__,
                    module=func.__module__, dict=func.__dict__,
                    globals=func.func_globals, closure=func.func_closure)
    
    @staticmethod
    def _wrap_function(original_func, wrapper_infodict):
        """Return a wrapped version of func.
        
        original_func -- The function to be wrapped.
        wrapper_infodict -- The infodict to be used for constructing the
            wrapper.
        """
        src = ("lambda %(signature)s: _original_func_(%(signature)s)" %
               wrapper_infodict)
        wrapped_func = eval(src, dict(_original_func_=original_func))
        wrapped_func.__name__ = wrapper_infodict['name']
        wrapped_func.__doc__ = wrapper_infodict['doc']
        wrapped_func.__module__ = wrapper_infodict['module']
        wrapped_func.__dict__.update(wrapper_infodict['dict'])
        wrapped_func.func_defaults = wrapper_infodict['defaults']
        wrapped_func.undecorated = wrapper_infodict
        return wrapped_func
    
    @staticmethod
    def _wrap_method(wrapper_infodict, cls):
        """Return a wrapped version of func.
        
        wrapper_infodict -- The infodict to be used for constructing the
            wrapper.
        cls -- Class to which the wrapper method will be added, this is used
            for the super call.
        """
        src = ("lambda %(signature)s: " % wrapper_infodict+
               "super(_wrapper_class_, _wrapper_class_)." +
               "%(name)s(%(signature)s)" % wrapper_infodict)
        wrapped_func = eval(src, {"_wrapper_class_": cls})
        wrapped_func.__name__ = wrapper_infodict['name']
        wrapped_func.__doc__ = wrapper_infodict['doc']
        wrapped_func.__module__ = wrapper_infodict['module']
        wrapped_func.__dict__.update(wrapper_infodict['dict'])
        wrapped_func.func_defaults = wrapper_infodict['defaults']
        wrapped_func.undecorated = wrapper_infodict
        return wrapped_func
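

# Illustrative sketch (not part of the original MDP source): the docstring
# forwarding performed by NodeMetaclass.  'SquareNode' is a hypothetical
# subclass written only for this demonstration; since it defines '_execute'
# with a docstring but no public 'execute', the metaclass generates an
# 'execute' wrapper that carries the private method's docstring.
def _example_docstring_forwarding():
    # Node is defined below in this module; it is looked up when this
    # function is actually called.
    class SquareNode(Node):
        def is_trainable(self):
            return False
        def _execute(self, x):
            """Return the element-wise square of the input."""
            return x * x
    # The generated public wrapper inherits the private method's docstring.
    assert SquareNode.execute.__doc__ == SquareNode._execute.__doc__
    # The wrapper behaves like a normal 'execute' call.
    return SquareNode()(numx.array([[1., 2., 3.]]))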



class Node(object):
    """A 'Node' is the basic building block of an MDP application.

    It represents a data processing element, like for example a learning
    algorithm, a data filter, or a visualization step.
    Each node can have one or more training phases, during which the
    internal structures are learned from training data (e.g. the weights
    of a neural network are adapted or the covariance matrix is estimated)
    and an execution phase, where new data can be processed forward (by
    mapping the data through the node) or backward (by applying the inverse
    of the transformation computed by the node, if defined).

    Nodes have been designed to be applied to arbitrarily long sets of data:
    if the underlying algorithm supports it, the internal structures can
    be updated incrementally by sending multiple batches of data (this is
    equivalent to online learning if the chunks consist of single
    observations, or to batch learning if the whole data set is sent in a
    single chunk). It is thus possible to perform computations on amounts
    of data that would not fit into memory or to generate data on-the-fly.

    A 'Node' also defines some utility methods, for example 'copy' and
    'save', which return an exact copy of the node and save it to a file,
    respectively. Additional methods may be present, depending
    on the algorithm.

    Node subclasses should take care of overwriting (if necessary)
    the functions is_trainable, _train, _stop_training, _execute,
    is_invertible, _inverse, _get_train_seq, and _get_supported_dtypes.
    If you need to overwrite the getters and setters of the
    node's properties, refer to the docstrings of get/set_input_dim,
    get/set_output_dim, and get/set_dtype.

    A usage sketch of the typical train/execute cycle follows this class
    definition.
    """

    __metaclass__ = NodeMetaclass

    def __init__(self, input_dim = None, output_dim = None, dtype = None):
        """If the input dimension and the output dimension are
        unspecified, they will be set when the 'train' or 'execute'
        method is called for the first time.
        If dtype is unspecified, it will be inherited from the data
        it receives at the first call of 'train' or 'execute'.

        Every subclass must take care of up- or down-casting its internal
        structures to match this argument (use the _refcast private
        method when possible).
        """
        # initialize basic attributes
        self._input_dim = None
        self._output_dim = None
        self._dtype = None
        # call set functions for properties
        self.set_input_dim(input_dim)
        self.set_output_dim(output_dim)
        self.set_dtype(dtype)
 
        # skip the training phase if the node is not trainable
        if not self.is_trainable():
            self._training = False
            self._train_phase = -1
            self._train_phase_started = False
        else:
            # this var stores at which point in the training sequence we are
            self._train_phase = 0          
            # this var is False if the training of the current phase hasn't
            #  started yet, True otherwise
            self._train_phase_started = False
            # this var is False if the complete training is finished
            self._training = True
            
    ### properties

    def get_input_dim(self):
        """Return input dimensions."""
        return self._input_dim

    def set_input_dim(self, n):
        """Set input dimensions.
        
        Perform sanity checks and then call self._set_input_dim(n), which
        is responsible for setting the internal attribute self._input_dim.
        Note that subclasses should overwrite self._set_input_dim
        when needed."""
        if n is None:
            pass
        elif (self._input_dim is not None) and (self._input_dim !=  n):
            msg = ("Input dim are set already (%d) "
                   "(%d given)!" % (self.input_dim, n))
            raise NodeException(msg)
        else:
            self._set_input_dim(n)

    def _set_input_dim(self, n):
        self._input_dim = n

    input_dim = property(get_input_dim,
                         set_input_dim,
                         doc = "Input dimensions")

    def get_output_dim(self):
        """Return output dimensions."""
        return self._output_dim

    def set_output_dim(self, n):
        """Set output dimensions.
        Perform sanity checks and then call self._set_output_dim(n), which
        is responsible for setting the internal attribute self._output_dim.
        Note that subclasses should overwrite self._set_output_dim
        when needed."""
        if n is None:
            pass
        elif (self._output_dim is not None) and (self._output_dim != n):
            msg = ("Output dim are set already (%d) "
                   "(%d given)!" % (self.output_dim, n))
            raise NodeException(msg)
        else:
            self._set_output_dim(n)

    def _set_output_dim(self, n):
        self._output_dim = n

    output_dim = property(get_output_dim,
                          set_output_dim,
                          doc = "Output dimensions")

    def get_dtype(self):
        """Return dtype."""
        return self._dtype
    
    def set_dtype(self, t):
        """Set internal structures' dtype.
        Perform sanity checks and then call self._set_dtype(t), which
        is responsible for setting the internal attribute self._dtype.
        Note that subclasses should overwrite self._set_dtype
        when needed."""
        if t is None:
            return
        t = numx.dtype(t)
        if (self._dtype is not None) and (self._dtype != t):
            errstr = ("dtype is already set to '%s' "
                      "('%s' given)!" % (t, self.dtype.name)) 
            raise NodeException(errstr)
        elif t not in self.get_supported_dtypes():
            errstr = ("\ndtype '%s' is not supported.\n"
                      "Supported dtypes: %s" % ( t.name,
                                                 [numx.dtype(t).name for t in
                                                  self.get_supported_dtypes()]))
            raise NodeException(errstr)
        else:
            self._set_dtype(t)

    def _set_dtype(self, t):
        self._dtype = t
        
    dtype = property(get_dtype,
                     set_dtype,
                     doc = "dtype")

    def _get_supported_dtypes(self):
        """Return the list of dtypes supported by this node.
        The types can be specified in any format allowed by numpy.dtype."""
        return mdp.utils.get_dtypes('All')

    def get_supported_dtypes(self):
        """Return dtypes supported by the node as a list of numpy.dtype
        objects.
        Note that subclasses should overwrite self._get_supported_dtypes
        when needed."""
        return [numx.dtype(t) for t in self._get_supported_dtypes()]

    supported_dtypes = property(get_supported_dtypes,
                                doc = "Supported dtypes")

    _train_seq = property(lambda self: self._get_train_seq(),
                          doc = "List of tuples: [(training-phase1, "
                          "stop-training-phase1), (training-phase2, "
                          "stop_training-phase2), ... ].\n"
                          " By default _train_seq = [(self._train,"
                          " self._stop_training]")

    def _get_train_seq(self):
        return [(self._train, self._stop_training)]

    def has_multiple_training_phases(self):
        """Return True if the node has multiple training phases."""
        return len(self._train_seq) > 1
    
    ### Node states
    def is_training(self):
        """Return True if the node is in the training phase,
        False otherwise."""
        return self._training

    def get_current_train_phase(self):
        """Return the index of the current training phase. The training phases
        are defined in the list self._train_seq."""
        return self._train_phase

    def get_remaining_train_phase(self):
        """Return the number of training phases still to accomplish.
        
        If the node is not trainable then the return value is 0.
        """
        if self.is_trainable():
            return len(self._train_seq) - self._train_phase
        else:
            return 0

    ### Node capabilities
    def is_trainable(self):
        """Return True if the node can be trained, False otherwise."""
        return True

    def is_invertible(self):
        """Return True if the node can be inverted, False otherwise."""
        return True

    ### check functions
    def _check_input(self, x):
        # check input rank
        if not x.ndim == 2:
            error_str = "x has rank %d, should be 2" % (x.ndim)
            raise NodeException(error_str)

        # set the input dimension if necessary
        if self.input_dim is None:
            self.input_dim = x.shape[1]

        # set the dtype if necessary
        if self.dtype is None:
            self.dtype = x.dtype

        # check the input dimension
        if not x.shape[1] == self.input_dim:
            error_str = "x has dimension %d, should be %d" % (x.shape[1],
                                                              self.input_dim)
            raise NodeException(error_str)

        if x.shape[0] == 0:
            error_str = "x must have at least one observation (zero given)"
            raise NodeException(error_str)
        
    def _check_output(self, y):
        # check output rank
        if not y.ndim == 2:
            error_str = "y has rank %d, should be 2" % (y.ndim)
            raise NodeException(error_str)

        # check the output dimension
        if not y.shape[1] == self.output_dim:
            error_str = "y has dimension %d, should be %d" % (y.shape[1],
                                                              self.output_dim)
            raise NodeException(error_str)

    def _if_training_stop_training(self):
        if self.is_training():
            self.stop_training()
            # if there are some training phases left we shouldn't be here!
            if self.get_remaining_train_phase() > 0:
                error_str = "The training phases are not completed yet."
                raise TrainingException(error_str)

    def _pre_execution_checks(self, x):
        """This method contains all pre-execution checks.
        It can be used when a subclass defines multiple execution methods."""

        # if training has not started yet, assume we want to train the node
        if (self.get_current_train_phase() == 0 and
            not self._train_phase_started):
            while True:
                self.train(x)
                if self.get_remaining_train_phase() > 1:
                    self.stop_training()
                else:
                    break
        
        self._if_training_stop_training()
        
        # check the dimension of x
        self._check_input(x)

        # set the output dimension if necessary
        if self.output_dim is None:
            self.output_dim = self.input_dim

    def _pre_inversion_checks(self, y):
        """This method contains all pre-inversion checks.
        It can be used when a subclass defines multiple inversion methods."""
        
        if not self.is_invertible():
            raise IsNotInvertibleException("This node is not invertible.")

        self._if_training_stop_training()

        # set the output dimension if necessary
        if self.output_dim is None:
            # if the input_dim is not defined, raise an exception
            if self.input_dim is None:
                errstr = ("Number of input dimensions undefined. Inversion"
                          "not possible.")
                raise NodeException(errstr)
            self.output_dim = self.input_dim
        
        # check the dimension of y
        self._check_output(y)

    ### casting helper functions

    def _refcast(self, x):
        """Helper function to cast arrays to the internal dtype."""
        return mdp.utils.refcast(x, self.dtype)
    
    ### Methods to be implemented by the user

    # these are the methods the user has to overwrite
    # they receive the data already cast to the correct type
    
    def _train(self, x):
        if self.is_trainable():
            raise NotImplementedError

    def _stop_training(self, *args, **kwargs):
        pass

    def _execute(self, x):
        return x
        
    def _inverse(self, x):
        if self.is_invertible():
            return x

    def _check_train_args(self, x, *args, **kwargs):
        # implemented by subclasses if needed
        pass

    ### User interface to the overwritten methods
    
    def train(self, x, *args, **kwargs):
        """Update the internal structures according to the input data 'x'.
        
        'x' is a matrix having different variables on different columns
        and observations on the rows.

        By default, subclasses should overwrite _train to implement their
        training phase. The docstring of the '_train' method overwrites this
        docstring.

        Note: a subclass supporting multiple training phases should implement
        the *same* signature for all the training phases and document the
        meaning of the arguments in the '_train' method docstring. Having
        consistent signatures is a requirement to use the node in a flow.
        """

        if not self.is_trainable():
            raise IsNotTrainableException("This node is not trainable.")

        if not self.is_training():
            err_str = "The training phase has already finished."
            raise TrainingFinishedException(err_str)

        self._check_input(x)
        self._check_train_args(x, *args, **kwargs)        
        
        self._train_phase_started = True
        self._train_seq[self._train_phase][0](self._refcast(x), *args, **kwargs)

    def stop_training(self, *args, **kwargs):
        """Stop the training phase.

        By default, subclasses should overwrite _stop_training to implement
        their stop-training. The docstring of the '_stop_training' method
        overwrites this docstring.
        """
        if self.is_training() and not self._train_phase_started:
            raise TrainingException("The node has not been trained.")
        
        if not self.is_training():
            err_str = "The training phase has already finished."
            raise TrainingFinishedException(err_str)

        # close the current phase.
        self._train_seq[self._train_phase][1](*args, **kwargs)
        self._train_phase += 1
        self._train_phase_started = False
        # check if we have some training phase left
        if self.get_remaining_train_phase() == 0:
            self._training = False

    def execute(self, x, *args, **kwargs):
        """Process the data contained in 'x'.
        
        If the object is still in the training phase, the function
        'stop_training' will be called.
        'x' is a matrix having different variables on different columns
        and observations on the rows.
        
        By default, subclasses should overwrite _execute to implement
        their execution phase. The docstring of the '_execute' method
        overwrites this docstring.
        """
        self._pre_execution_checks(x)
        return self._execute(self._refcast(x), *args, **kwargs)

    def inverse(self, y, *args, **kwargs):
        """Invert 'y'.
        
        If the node is invertible, compute the input x such that
        y = execute(x).
        
        By default, subclasses should overwrite _inverse to implement
        their inverse function. The docstring of the '_inverse' method
        overwrites this docstring.
        """
        self._pre_inversion_checks(y)
        return self._inverse(self._refcast(y), *args, **kwargs)

    def __call__(self, x, *args, **kwargs):
        """Calling an instance of Node is equivalent to call
        its 'execute' method."""
        return self.execute(x, *args, **kwargs)

    ###### adding nodes returns flows

    def __add__(self, other):
        # check other is a node
        if isinstance(other, Node):
            return mdp.Flow([self, other])
        elif isinstance(other, mdp.Flow):
            flow_copy = other.copy()
            flow_copy.insert(0, self)
            return flow_copy.copy()
        else:
            err_str = ('can only concatenate node'
                       ' (not \'%s\') to node' % (type(other).__name__) )
            raise TypeError(err_str)
        
    ###### string representation
    
    def __str__(self):
        return str(type(self).__name__)
    
    def __repr__(self):
        # print input_dim, output_dim, dtype 
        name = type(self).__name__
        inp = "input_dim=%s" % str(self.input_dim)
        out = "output_dim=%s" % str(self.output_dim)
        if self.dtype is None:
            typ = 'dtype=None'
        else:
            typ = "dtype='%s'" % self.dtype.name
        args = ', '.join((inp, out, typ))
        return name+'('+args+')'

    def copy(self, protocol = -1):
        """Return a deep copy of the node.
        Protocol is the pickle protocol."""
        as_str = _cPickle.dumps(self, protocol)
        return _cPickle.loads(as_str)

    def save(self, filename, protocol = -1):
        """Save a pickled serialization of the node to 'filename'.
        If 'filename' is None, return a string.

        Note: the pickled Node is not guaranteed to be upward or
        backward compatible."""
        if filename is None:
            return _cPickle.dumps(self, protocol)
        else:
            # if protocol != 0 open the file in binary mode
            if protocol != 0:
                mode = 'wb'
            else:
                mode = 'w'
            flh = open(filename, mode)
            _cPickle.dump(self, flh, protocol)
            flh.close()
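

# Illustrative usage sketch (not part of the original MDP source): the
# train / stop_training / execute cycle described in the Node docstring,
# shown here with mdp.nodes.PCANode as one concrete trainable node.  The
# data, dimensions, and chunk sizes are arbitrary example values.
def _example_node_usage():
    node = mdp.nodes.PCANode(output_dim=3)
    # Incremental training: every call to 'train' updates the internal
    # covariance estimate with one chunk of observations (one per row).
    for _ in range(5):
        node.train(mdp.numx_rand.random((100, 10)))
    node.stop_training()                  # close the single training phase
    assert not node.is_training()
    # input_dim and dtype were inferred from the first chunk of data.
    assert node.input_dim == 10 and node.output_dim == 3
    # Execution projects new data onto the learned components; 'inverse'
    # maps projected data back into the input space.
    y = node.execute(mdp.numx_rand.random((20, 10)))
    x_back = node.inverse(y)
    assert y.shape == (20, 3) and x_back.shape == (20, 10)
    # Concatenating nodes with '+' returns an mdp.Flow (see __add__ above),
    # and 'copy' returns an independent deep copy obtained via pickling.
    flow = node + mdp.nodes.QuadraticExpansionNode()
    return flow, node.copy()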


class Cumulator(Node):
    """A Cumulator is a Node whose training phase simply collects
    all input data. In this way it is possible to easily implement
    batch-mode learning.

    The data is accessible in the attribute 'self.data' after
    the beginning of the '_stop_training' phase. 'self.tlen' contains
    the number of data points collected.
    """

    def __init__(self, input_dim = None, output_dim = None, dtype = None):
        super(Cumulator, self).__init__(input_dim, output_dim, dtype)
        self.data = []
        self.tlen = 0

    def _train(self, x):
        """Cumulate all input data in a one dimensional list."""
        self.tlen += x.shape[0]
        self.data.extend(x.ravel().tolist())

    def _stop_training(self, *args, **kwargs):
        """Transform the data list to an array object and reshape it."""
        self.data = numx.array(self.data, dtype = self.dtype)
        self.data.shape = (self.tlen, self.input_dim)
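

# Illustrative sketch (not part of the original MDP source): a minimal
# Cumulator subclass that computes the per-column mean of all collected data
# when training stops and removes it during execution.  'MeanRemovalNode' is
# a hypothetical example class written only for this demonstration.
def _example_cumulator_subclass():
    class MeanRemovalNode(Cumulator):
        def _stop_training(self):
            # Let Cumulator reshape self.data to (tlen, input_dim) first.
            super(MeanRemovalNode, self)._stop_training()
            self.avg = self.data.mean(axis=0)
        def _execute(self, x):
            # Subtract the stored mean from every observation.
            return x - self.avg
    node = MeanRemovalNode()
    node.train(mdp.numx_rand.random((50, 4)))
    node.train(mdp.numx_rand.random((30, 4)))
    node.stop_training()
    assert node.tlen == 80 and node.avg.shape == (4,)
    return node.execute(mdp.numx_rand.random((10, 4)))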
        