scheduler.py :  » Web-Frameworks » Spyce » spyce-2.1 » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Spyce 
Spyce » spyce 2.1 » scheduler.py
"""
A module for scheduling arbitrary callables to run at given times
or intervals, modeled on the naviserver API.  Scheduler runs in
its own thread; callables run in this same thread, so if you have
an unusually long callable to run you may wish to give it its own
thread, for instance,

schedule(3600, lambda: threading.Thread(target=longcallable).start())

Scheduler does not provide subsecond resolution.

Public functions are threadsafe.
"""

import atexit, sys, threading, time, traceback


logger = lambda s: sys.stderr.write('%s\n' % s)
def debuglogger(s):
  import spyce
  spyce.DEBUG(s)

def schedule(interval, callable, once=False):
    """
    Schedules callable to be run every interval seconds.
    Returns the scheduled Task object.
    """
    if interval < 1:
        raise Exception("Interval must be postitive")
    return _insertsorted(Task(time.time() + interval, interval, callable, once))

def _nearest_epoch(hours, minutes):
    L = list(time.localtime(time.time()))
    L[3] = hours
    L[4] = minutes
    L[5] = 0
    epoch = time.mktime(L)
    if epoch < time.time():
        epoch += 24 * 3600
    return epoch

def schedule_daily(hours, minutes, callable, once=False):
    """
    Schedules callable to be run at hours:minutes every day.
    (Hours is a 24-hour format.)
    Returns the scheduled Task object.
    """
    if hours > 23 or hours < 0:
        raise ValueError("Invalid hours %s" % hours)
    if minutes > 59 or minutes < 0:
        raise ValueError("Invalid minutes %s" % minutes)
    epoch = _nearest_epoch(hours, minutes)
    return _insertsorted(Task(epoch, 24 * 3600, callable, once))

def schedule_weekly(day, hours, minutes, callable, once=False):
    """
    Schedules callable to be run at hours:minutes on the given
    zero-based day of the week.  (Monday is 0.)
    Returns the scheduled Task object.
    """
    if day > 6 or day < 0:
        raise ValueError("Invalid day %s" % day)
    if hours > 23 or hours < 0:
        raise ValueError("Invalid hours %s" % hours)
    if minutes > 59 or minutes < 0:
        raise ValueError("Invalid minutes %s" % minutes)
    epoch = _nearest_epoch(hours, minutes)
    while time.localtime(epoch)[6] != day:
        epoch += 24 * 3600
    return _insertsorted(Task(epoch, 7 * 24 * 3600, callable, once))

def unschedule(task):
    """
    Removes the given task from the scheduling queue.
    """
    _qlock.acquire()
    try:
        _queue.remove(task)
    finally:
        _qlock.release()

class Task:
    """
    Instantiated by the schedule methods.
    
    Instance variables:
      nextrun: epoch seconds at which to run next
      interval: seconds before repeating
      callable: function to invoke
      last: if True, will be unscheduled after nextrun

    (Note that by manually setting last on a Task instance, you
    can cause it to run an arbitrary number of times.)
    """
    def __init__(self, firstrun, interval, callable, once):
        self.nextrun = firstrun
        self.interval = interval
        self.callable = callable
        self.last = once
    def __repr__(self):
        return 'Task(nextrun=%r, interval=%d, callable=%s, last=%s)' \
               % (time.asctime(time.localtime(self.nextrun)), self.interval, self.callable, self.last)

_qlock = threading.RLock()
# (we don't use a Queue object here since we need to do our own locking anyway)
_queue = []

def _insertsorted(task):
    _qlock.acquire()
    i = len(_queue)
    while i > 0:
        if task.nextrun < _queue[i - 1].nextrun:
            break
        i -= 1
    _queue.insert(i, task)
    _qlock.release()
    return task

_keepgoing = True
_paused = False

def _process():
    """True if a task was run"""
    try:
        _qlock.acquire()
        if not len(_queue):
            return False
        if _queue[-1].nextrun > time.time():
            return False
        task = _queue.pop()
    finally:
        _qlock.release()
    debuglogger('running scheduled task %s' % task.callable)
    try:
        task.callable()
    except Exception:
        logger(traceback.format_exc())
    if not task.last:
        task.nextrun = max(task.nextrun + task.interval, time.time())
        _insertsorted(task)
    return True

def _loop():
    sleep = False
    while _keepgoing:
        if sleep:
            time.sleep(1)
        if _paused:
            sleep = True
            continue
        sleep = not _process()
    debuglogger("Scheduler thread has stopped")

_thread = threading.Thread(target=_loop, name='spyce-scheduler')
_thread.start()

def pause():
    """Temporarily suspend running scheduled tasks"""
    _paused = True
    
def unpause():
    """
    Resume running scheduled tasks.  If a task came due while
    it was paused, it will run immediately after unpausing.
    """
    _paused = False

def _cleanup():
    debuglogger('Waiting for scheduler thread...')
    global _keepgoing
    _keepgoing = False
    _thread.join()
    
atexit.register(_cleanup)

# yes, I need more and better tests...  you're welcome to add some :)
if __name__ == "__main__":
    d = {}
    def foo():
        d['test'] = 1
    print schedule(1, lambda: threading.Thread(target=foo).start(), True)
    while not d:
        time.sleep(1)
    print d
    assert not _queue

    d = {}
    hours, minutes, _, day = time.localtime(time.time() + 61)[3:7]
    print schedule_weekly(day, hours, minutes, foo)
    print '(waiting...  could be up to 60 seconds)'
    while not d:
        time.sleep(1)
    print d
    assert len(_queue) == 1
    
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.