multi_pipe.py :  » Development » SnapLogic » snaplogic » common » snapstream » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » common » snapstream » multi_pipe.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: multi_pipe.py 1749 2008-03-20 22:34:26Z kurt $

from __future__ import with_statement

"""
Utilities for handling the multi-POST pseudo-stream protocol.

"""

from threading import Thread,Condition

from snaplogic.common.snap_exceptions import *

class MultiPipeAggregatorThread(Thread):
    """
    Aggregator thread for serializing multiple POST requests into a single stream.
    
    For the multi-POST protocol, a series of POSTs can occur to the same view URL allowing a pseudo-streaming
    capability. This convoluted protocol is currently necessary due to paste WSGI not supporting the chunked
    HTTP transfer encoding.
    
    This class created a thread whose purpose is to read data in from the various pipes and serialize their output
    into a destination pipe. The serialization is done by reading all the data from each pipe in order and transfering 
    it to the output pipe. When one pipe is exhausted and has been closed, the class moves on to the next pipe
    in the queue. If there currently aren't any pipes in the queue, there are two possibilties. If the last pipe
    has not been added to the queue yet, the class waits until a new pipe is added. If the last pipe has been found,
    the destination pipe is closed and the thread exits.
    
    """
    def __init__(self, dest_pipe, thread_name=None):
        """
        Initialization.
        
        @param dest_pipe: A pipe to serialize data out to.
        @type dest_pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
        
        @param thread_name: A name to assign the thread.
        @type thread_name: string
        
        """
        super(MultiPipeAggregatorThread, self).__init__(name=thread_name)
        self._dest_pipe = dest_pipe
        self._pipe_queue = []
        self._cond = Condition()
        self._last_pipe_flag = False
        
    def add_pipe(self, pipe, last_flag=False):
        """
        Add a pipe to the queue.
        
        Adds the pipe to the queue for the transfer thread. If last_flag is given and True, this pipe is expected
        to be the last pipe added. This will queue the thread that when the pipe queue is exhausted, it should exit.
        
        @param pipe: A pipe to read as input.
        @type pipe: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
        
        @param last_flag: A flag indicating this is the last pipe that will be added.
        @type last_flag: bool
        
        """
        with self._cond:
            if self._last_pipe_flag:
                raise SnapValueError("Error adding new POST pipe to aggregator: last POST already received.")
            
            self._pipe_queue.append(pipe)
            self._last_pipe_flag = last_flag
            self._cond.notify()
        
    def _get_next_pipe(self):
        """
        Get the next pipe from the queue.
        
        If there are no more pipes in the queue and the last pipe has not been seen, this call will block
        until a pipe is available.
        
        @return: The next pipe object in the queue.
        @rtype: L{snaplogic.common.snapstream.selectable_pipe.SelectablePipe}
        
        """
        with self._cond:
            while not self._pipe_queue:
                if self._last_pipe_flag:
                    return None
                else:
                    self._cond.wait()
                
            return self._pipe_queue.pop(0)
        
    def _pipe_iterator(self):
        """
        Get an iterator over the pipe queue.
        
        @return: An iterator over the pipe queue.
        @rtype: iterator
        
        """
        while True:
            current_pipe = self._get_next_pipe()
            if current_pipe is not None:
                yield current_pipe
            else:
                return
        
    def run(self):
        for pipe in self._pipe_iterator():
            try:
                while True:
                    item = pipe.get()
                    if item is not None:
                        self._dest_pipe.put(item)
                    else:
                        break
            except Exception:
                self._dest_pipe.abort()
                pipe.abort()
                for pipe in self._pipe_iterator():
                    pipe.abort()
                raise
            
        self._dest_pipe.close()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.