scheduling.py :  » Math » Modular-toolkit-for-Data-Processing » MDP-2.6 » mdp » parallel » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Math » Modular toolkit for Data Processing 
Modular toolkit for Data Processing » MDP 2.6 » mdp » parallel » scheduling.py
"""
This module contains the basic classes for task processing via a scheduler.
"""

import threading
import time
import os
try:
    import multiprocessing
except ImportError:
    # Python version < 2.6, have to use fallbacks
    pass


class ResultContainer(object):
    """Abstract base class for result containers.

    A result container collects the results of finished tasks and hands them
    back on request, resetting itself in the process.
    """

    def add_result(self, result_data, task_index):
        """Store a result in the container.

        result_data -- The result of a finished task.
        task_index -- Index of the task that produced the result.

        The base implementation ignores its arguments.
        """
        return None

    def get_results(self):
        """Return results and reset container.

        The base implementation stores nothing and returns None.
        """
        return None
    

class ListResultContainer(ResultContainer):
    """Basic result container backed by a plain list."""

    def __init__(self):
        super(ListResultContainer, self).__init__()
        # results in the order in which they were added
        self._results = []

    def add_result(self, result, task_index):
        """Store a result in the container (task_index is not used)."""
        self._results.append(result)

    def get_results(self):
        """Return the list of results and reset this container.

        The results are returned in the order in which they arrived, which
        can be different from the original task order.
        """
        # hand out the current list and start a fresh one
        collected, self._results = self._results, []
        return collected
    
    
class OrderedResultContainer(ListResultContainer):
    """Default result container with automatic restoring of the result order.

    In general the order of the incoming results in the scheduler can be
    different from the order of the tasks, since some tasks may finish quicker
    than other tasks. This result container restores the original order.
    """

    def __init__(self):
        super(OrderedResultContainer, self).__init__()

    def add_result(self, result, task_index):
        """Store a result in the container.

        The task index is also stored and later used to reconstruct the
        original task order.
        """
        self._results.append((result, task_index))

    def get_results(self):
        """Sort the results into the original order and return them in a list.

        The container is reset afterwards. If no results were stored since
        the last call, an empty list is returned.
        """
        results = self._results
        self._results = []
        # Sort by task index. A key function works on every Python version
        # (cmp functions were removed in Python 3), and the stable sort keeps
        # results with equal indices in arrival order, as before.
        results.sort(key=lambda item: item[1])
        # Unlike zip(*results)[0], this also works for an empty container.
        return [result for result, _ in results]
    
    
class TaskCallable(object):
    """Abstract base class for the callables executed by a scheduler."""

    def setup_environment(self):
        """Hook that is called when the callable is first loaded.

        Override it to make any modifications of the Python environment that
        this callable requires. The default implementation does nothing.
        """
        return None

    def __call__(self, data):
        """Perform the computation on data and return the result.

        Override this method with a concrete implementation; the default
        implementation returns data unchanged.
        """
        return data

    def fork(self):
        """Return a fork of this callable, e.g. a copy of it.

        This method is always used before a callable is actually called, so
        the fork is called instead of the original callable. This ensures
        that the original callable is preserved when caching is used. If the
        callable is not modified by being called, it can simply return itself,
        which is what this default implementation does.
        """
        return self
    

class SqrTestCallable(TaskCallable):
    """Test callable that squares its input."""

    def __call__(self, data):
        """Return data raised to the power of two."""
        return pow(data, 2)
    
    
class SleepSqrTestCallable(TaskCallable):
    """Test callable that squares a value after a delay."""

    def __call__(self, data):
        """Sleep for data[1] seconds, then return data[0] squared.

        data -- Indexable pair of (value, delay in seconds).
        """
        delay = data[1]
        time.sleep(delay)
        value = data[0]
        return pow(value, 2)
    

class TaskCallableWrapper(TaskCallable):
    """Adapter giving simple callables (e.g. functions) a fork method.

    Scheduler applies this wrapper internally; the inherited fork returns
    the wrapper itself.
    """

    def __init__(self, callable):
        """Remember the callable that is to be wrapped."""
        self._callable = callable

    def __call__(self, data):
        """Forward data to the wrapped callable and return its result."""
        wrapped = self._callable
        return wrapped(data)
    
    
# helper function
def cpu_count():
    """Return the number of CPU cores (always at least 1).

    multiprocessing.cpu_count() is used if the module-level import of
    multiprocessing succeeded. If that import failed (the name is then
    undefined, giving a NameError) or multiprocessing cannot determine the
    count (NotImplementedError), platform-specific probes are tried instead.
    """
    try:
        return multiprocessing.cpu_count()
    except (NameError, NotImplementedError):
        return _fallback_cpu_count()


def _fallback_cpu_count():
    """Determine the number of CPU cores without multiprocessing.

    This code part is taken from parallel python. Every probe that fails or
    yields a non-positive count falls through to the next one; 1 is returned
    if none succeeds.
    """
    # Linux, Unix and MacOS
    if hasattr(os, "sysconf"):
        if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
            # Linux & Unix
            try:
                n_cpus = os.sysconf("SC_NPROCESSORS_ONLN")
            except (ValueError, OSError):
                n_cpus = 0
        else:
            # OSX: ask sysctl (replaces the deprecated os.popen2, whose pipes
            # were never closed)
            import subprocess
            try:
                proc = subprocess.Popen(["sysctl", "-n", "hw.ncpu"],
                                        stdout=subprocess.PIPE)
                n_cpus = int(proc.communicate()[0].strip())
            except (OSError, ValueError):
                # sysctl missing or output not a number
                n_cpus = 0
        if isinstance(n_cpus, int) and n_cpus > 0:
            return n_cpus
    # Windows
    try:
        n_cpus = int(os.environ.get("NUMBER_OF_PROCESSORS", 0))
    except ValueError:
        n_cpus = 0
    if n_cpus > 0:
        return n_cpus
    # Default
    return 1
    

class Scheduler(object):
    """Base class and trivial implementation for schedulers.

    New tasks are added with add_task(data, callable).
    get_results then returns the results (and blocks if tasks are
    pending).

    In this simple scheduler implementation the tasks are simply executed in
    the add_task method.
    """

    def __init__(self, result_container=None, verbose=False):
        """Initialize the scheduler.

        result_container -- Instance of ResultContainer that is used to store
            the results (default is None, in which case an
            OrderedResultContainer is used).
        verbose -- If True then status messages will be printed to sys.stdout.
        """
        if result_container is None:
            result_container = OrderedResultContainer()
        self.result_container = result_container
        self.verbose = verbose
        self._n_open_tasks = 0  # number of tasks that are currently running
        # count the number of submitted tasks, also used for the task index
        self._task_counter = 0
        # non-reentrant lock guarding the counters, the result container and
        # the cached callable
        self._lock = threading.Lock()
        self._last_callable = None  # last callable is stored
        # task index of the _last_callable, can be *.5 if updated between tasks
        self._last_callable_index = -1.0

    ## public read only properties ##

    @property
    def task_counter(self):
        """This property counts the number of submitted tasks."""
        return self._task_counter

    @property
    def n_open_tasks(self):
        """This property counts the submitted but unfinished tasks."""
        return self._n_open_tasks

    ## main methods ##

    def add_task(self, data, task_callable=None):
        """Add a task to be executed.

        data -- Data for the task.
        task_callable -- A callable, which is called with the data. If it is
            None (default value) then the last provided callable is used.
            If task_callable has no fork method (i.e. it is not a
            TaskCallable, e.g. a plain function) then it is wrapped in a
            TaskCallableWrapper.

        The callable together with the data constitutes the task. This method
        blocks if there are no free resources to store or process the task
        (e.g. if no free worker processes are available).

        Raises Exception if task_callable is None and no callable has been
        provided before.
        """
        self._lock.acquire()
        if task_callable is None and self._last_callable is None:
            # Release the lock before raising; otherwise every later call
            # that needs the lock would block forever.
            self._lock.release()
            raise Exception("No task_callable specified and " +
                            "no previous callable available.")
        self._n_open_tasks += 1
        self._task_counter += 1
        task_index = self.task_counter
        if task_callable is None:
            # use the _last_callable_index in _process_task to
            # decide if a cached callable can be used
            task_callable = self._last_callable
        else:
            task_callable = self._wrap_callable(task_callable)
            self._last_callable = task_callable
            self._last_callable_index = self.task_counter
        # _process_task is entered with the lock held and must release it
        self._process_task(data, task_callable, task_index)

    def set_task_callable(self, task_callable):
        """Set the callable that will be used if no task_callable is given.

        Normally the callables are provided via add_task, in which case there
        is no need for this method.

        task_callable -- Callable that will be used unless a new task_callable
            is given. As in add_task, it is wrapped in a TaskCallableWrapper
            if it has no fork method.
        """
        # wrapping does not touch scheduler state, so do it before locking
        task_callable = self._wrap_callable(task_callable)
        self._lock.acquire()
        try:
            self._last_callable = task_callable
            # set _last_callable_index to half value since the callable is
            # newer than the last task, but not newer than the next incoming
            # task
            self._last_callable_index = self.task_counter + 0.5
        finally:
            self._lock.release()

    @staticmethod
    def _wrap_callable(task_callable):
        """Return task_callable, wrapped in a TaskCallableWrapper if it has
        no fork method (e.g. a plain function)."""
        if not hasattr(task_callable, "fork"):
            return TaskCallableWrapper(task_callable)
        return task_callable

    def _store_result(self, result, task_index):
        """Store a result in the internal result container.

        result -- Result data of the task.
        task_index -- Index of the task that produced the result.

        This method acquires the scheduler lock (blocking until it is
        available) while storing, so it must not be called while the lock
        is already held.
        """
        self._lock.acquire()
        try:
            self.result_container.add_result(result, task_index)
            if self.verbose:
                print("    finished task no. %d" % task_index)
            self._n_open_tasks -= 1
        finally:
            self._lock.release()

    def get_results(self):
        """Get the accumulated results from the result container.

        This method blocks (polling once per second) while there are open
        tasks.
        """
        while True:
            self._lock.acquire()
            try:
                if self._n_open_tasks == 0:
                    return self.result_container.get_results()
            finally:
                # also released if the result container raises
                self._lock.release()
            time.sleep(1)

    def shutdown(self):
        """Controlled shutdown of the scheduler.

        This method should always be called when the scheduler is no longer
        needed and before the program shuts down! Otherwise one might get
        error messages.
        """
        self._shutdown()

    ## override these methods in custom schedulers ##

    def _process_task(self, data, task_callable, task_index):
        """Process the task and store the result.

        Warning: When this method is entered it holds the lock; the lock must
        be released here. Also note that fork has not been called yet, so the
        provided task_callable is the original and must not be modified
        in any way.

        If the callable raises, the lock is released, the task is no longer
        counted as open and the exception propagates to the caller.

        You can override this method for custom schedulers.
        """
        succeeded = False
        try:
            result = task_callable.fork()(data)
            succeeded = True
        finally:
            if not succeeded:
                # the task will never reach _store_result, so undo the
                # open-task bookkeeping done in add_task
                self._n_open_tasks -= 1
            # release lock before store_result
            self._lock.release()
        self._store_result(result, task_index)

    def _shutdown(self):
        """Hook method for shutdown to be used in custom schedulers."""
        pass
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.