thread_utils.py :  » Development » SnapLogic » snaplogic » common » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » common » thread_utils.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: thread_utils.py 4747 2008-10-22 19:58:55Z kurt $

from __future__ import with_statement
import threading
import sys

from snaplogic.common.snap_exceptions import SnapValueError

class RecursiveLockError(Exception):
    """Raised when a thread tries to acquire an RWLock that it is already holding."""

class ThreadNotLockedError(Exception):
    """Raised when a thread tries to release an RWLock that it is not holding."""

class SubLockHandle(object):
    """
    A context manager class used as a helper to RWLock.

    Instances of this class are returned by RWLock.read_lock and RWLock.write_lock. They allow a read
    or write lock to be held for the duration of a 'with' statement. For example, the two following
    snippets are equivalent:
    {{{
    lock = RWLock()
    lock.acquire_read()
    try:
        <code statements>
    finally:
        lock.release()
    }}}
    {{{
    lock = RWLock()
    with lock.read_lock():
        <code statements>
    }}}

    """

    def __init__(self, rwlock, lock_type):
        """
        Initialization.

        @param rwlock: Lock object that acquire and release calls are forwarded to.
        @type rwlock: RWLock

        @param lock_type: Kind of lock this handle acquires (RWLock.READ_LOCK or RWLock.WRITE_LOCK).
        @type lock_type: RWLock constant

        """
        super(SubLockHandle, self).__init__()
        self._rwlock = rwlock
        self.lock_type = lock_type

    def __enter__(self):
        """Acquire the lock in blocking mode on entry to a 'with' block and return this handle."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock on exit from a 'with' block. Returns False so exceptions propagate."""
        self.release()
        return False

    def acquire(self, blocking=True):
        """
        Acquire the lock kind this handle was created for.

        See description of RWLock.acquire_read and RWLock.acquire_write for details.

        @param blocking: Flag passed on to RWLock.acquire_read or RWLock.acquire_write.
        @type blocking: bool

        @return: Whatever RWLock.acquire_read or RWLock.acquire_write return.
        @rtype: bool

        """
        # Compare by value, not identity: an equal string that is not the very same object as the
        # constant must still select the read lock instead of silently falling through to write.
        if self.lock_type == RWLock.READ_LOCK:
            return self._rwlock.acquire_read(blocking)
        else:
            return self._rwlock.acquire_write(blocking)

    def release(self):
        """See description of RWLock.release for details."""
        self._rwlock.release()

class RWLock(object):
    """
    A read/write lock.

    Any number of threads may hold the read lock at the same time, or exactly one thread may hold the
    write lock. A thread may hold at most one lock (read or write) on a given RWLock at a time; trying
    to acquire again from a thread that already holds it raises RecursiveLockError.

    Lock modes are identified by the class constants READ_LOCK and WRITE_LOCK.

    """
    READ_LOCK = 'read'
    WRITE_LOCK = 'write'

    def __init__(self):
        """
        Initialization. The lock starts out unheld.

        """
        # Guards _lockers and _write_active; waiting threads are notified on every release.
        self._cond = threading.Condition()
        # Maps each thread currently holding the lock to the mode it holds (READ_LOCK or WRITE_LOCK).
        self._lockers = {}
        # True while some thread holds the write lock.
        self._write_active = False

    def read_lock(self):
        """
        Return a handle usable in a 'with' statement that acquires and releases the read lock.

        @return: Handle bound to this lock in read mode.
        @rtype: SubLockHandle

        """
        return SubLockHandle(self, self.READ_LOCK)

    def write_lock(self):
        """
        Return a handle usable in a 'with' statement that acquires and releases the write lock.

        @return: Handle bound to this lock in write mode.
        @rtype: SubLockHandle

        """
        return SubLockHandle(self, self.WRITE_LOCK)

    def get_lock_handle(self, lock_mode):
        """
        Return a lock handle for the given lock mode.

        Returns the appropriate handle as returned by read_lock or write_lock depending on lock_mode. The
        lock_mode parameter should be either RWLock.READ_LOCK or RWLock.WRITE_LOCK.

        @param lock_mode: Lock mode of handle to return.
        @type lock_mode: RWLock constant

        @return: Handle object returned by read_lock or write_lock.
        @rtype: object

        @raises SnapValueError: The lock_mode parameter is invalid.

        """
        # Modes are plain strings, so compare by value; identity would reject an equal string that
        # happens to be a different object (e.g. one built or parsed at runtime).
        if lock_mode == self.READ_LOCK:
            return self.read_lock()
        elif lock_mode == self.WRITE_LOCK:
            return self.write_lock()
        else:
            raise SnapValueError("Invalid lock mode '%s'" % str(lock_mode))

    def acquire_read(self, blocking=True):
        """
        Acquire a (shared) read lock.

        Multiple threads may hold a read lock at the same time. Write threads are blocked until all reader
        thread locks are released. If a write lock is in progress, all read threads are blocked.

        If a write lock is currently held by some thread and acquire_read() is called with blocking=False,
        the call returns immediately with False. If a write lock is not currently held, the read lock is
        acquired and True is returned, whether or not other threads already hold read locks.

        @param blocking: Flag indicating if acquiring the read lock should block.
        @type blocking: bool

        @return: True if the read lock was acquired or shared with other read threads. False if the lock
                 could not be immediately acquired and blocking=False.
        @rtype: bool

        @raises RecursiveLockError: The calling thread already holds a read or write lock.

        """
        # The condition's internal lock is only held for short bookkeeping sections (wait() releases
        # it), so it is always taken in blocking mode. The 'blocking' flag only decides whether we
        # wait for an active writer; taking the internal lock non-blocking could fail spuriously.
        with self._cond:
            thread = threading.currentThread()
            if thread in self._lockers:
                raise RecursiveLockError("Lock (%s) already acquired by thread '%s'" % (self._lockers[thread],
                                                                                        thread.getName()))

            # Only a writer excludes readers; other readers must not make a non-blocking read fail.
            if self._write_active and not blocking:
                return False

            # Wait until no writing thread active.
            while self._write_active:
                self._cond.wait()

            self._lockers[thread] = self.READ_LOCK
            return True

    def acquire_write(self, blocking=True):
        """
        Acquire an exclusive write lock.

        Only one thread may hold a write lock at a time, and no threads may hold a read lock while a
        write lock is held.

        If a read or write lock is currently held and acquire_write() is called with blocking=False, the call
        returns immediately with False. If neither a write nor read lock are currently held, the write lock is
        acquired and True is returned.

        @param blocking: Flag indicating if acquiring the write lock should block.
        @type blocking: bool

        @return: True if the write lock was acquired. False if the lock could not be immediately acquired and
                 blocking=False.
        @rtype: bool

        @raises RecursiveLockError: The calling thread already holds a read or write lock.

        """
        # See acquire_read: the internal lock is always taken blocking; 'blocking' only governs
        # whether we wait for the current holders to release.
        with self._cond:
            thread = threading.currentThread()
            if thread in self._lockers:
                raise RecursiveLockError("Lock (%s) already acquired by thread '%s'" % (self._lockers[thread],
                                                                                        thread.getName()))

            # Any holder, reader or writer, excludes a new writer.
            if self._lockers and not blocking:
                return False

            # Wait until there are no locking threads.
            while self._lockers:
                self._cond.wait()

            self._lockers[thread] = self.WRITE_LOCK
            self._write_active = True
            return True

    def acquire(self, lock_mode, blocking=True):
        """
        Acquire the read or write lock depending on lock_mode.

        The lock_mode parameter should be either RWLock.READ_LOCK or RWLock.WRITE_LOCK and the appropriate
        acquire_* method will be called.

        @param lock_mode: Lock mode to acquire.
        @type lock_mode: RWLock constant

        @param blocking: Blocking flag passed on to acquire_read or acquire_write.
        @type blocking: bool

        @return: Returns whatever acquire_read or acquire_write return.
        @rtype: bool

        @raises SnapValueError: The lock_mode parameter is invalid.

        """
        # Compare by value so that any string equal to a mode constant is accepted.
        if lock_mode == self.READ_LOCK:
            return self.acquire_read(blocking)
        elif lock_mode == self.WRITE_LOCK:
            return self.acquire_write(blocking)
        else:
            raise SnapValueError("Invalid lock mode '%s'" % str(lock_mode))

    def release(self):
        """
        Release the read or write lock currently held by the calling thread.

        All waiting threads are woken so that each can re-check whether it may now proceed.

        @raises ThreadNotLockedError: The calling thread does not hold a lock.

        """
        with self._cond:
            thread = threading.currentThread()
            if thread not in self._lockers:
                raise ThreadNotLockedError("Thread '%s' attempted to release unheld lock" % thread.getName())

            if self._lockers[thread] == self.WRITE_LOCK:
                self._write_active = False

            del self._lockers[thread]
            self._cond.notifyAll()

        
            
        
            
        
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.