__init__.py :  » Development » SnapLogic » snaplogic » common » snapstream » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » common » snapstream » __init__.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: __init__.py 3388 2008-06-19 21:11:37Z kurt $

from __future__ import with_statement

"""
Streaming package used within pipeline data flow to pass record or binary streamed data between components and
external client endpoints.

"""

from snaplogic.common.snap_exceptions import *
from snaplogic.common.snapstream import selectable_pipe
from snaplogic.common.snapstream import view_registry
from snaplogic.common.snapstream.pipe_writer import PipeWriter
from snaplogic.common.snapstream.pipe_reader import PipeReader
from snaplogic.common.snapstream.url_reader import URLReader
from snaplogic.common.snapstream import memory
from snaplogic.common.snap_exceptions import *
from snaplogic.common import snap_log
from snaplogic.common.config import snap_config
from snaplogic import cc

# Module-level switch consulted by open_url(): when true and the requested URL resolves to a view in
# view_registry, an in-process PipeReader is returned in place of a URLReader. init() overwrites this
# value from the "cc" configuration section.
OPTIMIZATION_ENABLED = True

# Keys that init() looks up in the "cc" section of the configuration.
MAX_STREAM_BUFFER_SIZE_KEY = "max_stream_buffer_size"
OPTIMIZE_STREAMS_KEY = "optimize_streams"

def init(config):
    global OPTIMIZATION_ENABLED

    cc_config = config.get_section("cc")
    try:
        value = int(cc_config[MAX_STREAM_BUFFER_SIZE_KEY])
    except ValueError, e:
        value = -1
        
    if value <= 0:
        raise SnapException.chain(e, 
                                  SnapValueError("Config parameter '%s' must be an integer greater than 0." % 
                                                 MAX_STREAM_BUFFER_SIZE_KEY))
    else:
        memory.MAX_BUFFER_SIZE = value

    OPTIMIZATION_ENABLED = snap_config.parse_bool(cc_config[OPTIMIZE_STREAMS_KEY])
    if not OPTIMIZATION_ENABLED:
        cc.log(snap_log.LEVEL_WARN, "Stream optimization disabled.")
            
def select_streams(stream_list, timeout=None):
    """
    Provide a select()-like call for SnapStream Reader objects.

    The underlying pipe of every stream in stream_list is handed to selectable_pipe.pipe_select()
    together with timeout. The streams whose pipes are reported as available are returned, in the
    same relative order as they appear in stream_list. If no pipe is reported (for example because
    the timeout expired), the empty list is returned.

    If a stream encounters an error, it is expected to be reported as available so that the error
    surfaces when the caller actually reads from it.

    @param stream_list: A list of SnapStream Reader objects.
    @type stream_list: list of objects.

    @param timeout: A floating point value giving a number of seconds to wait before returning without
                    available streams. If None, wait indefinitely until a stream becomes available.
    @type timeout: float

    @return: A list of streams that are available for reading. It is possible that a read will result
             in an error state if the stream experienced an error during this function call.
    @rtype: list

    @raises SnapValueError: Timeout is not a floating point value (presumably raised by
                            pipe_select(); not visible in this module).

    """
    pipes = [stream._pipe for stream in stream_list]
    avail_pipes = selectable_pipe.pipe_select(pipes, timeout)
    if not avail_pipes:
        # Nothing became available: return the documented empty list without indexing into
        # the empty result.
        return []

    # Match on object identity instead of position, so the result does not depend on
    # pipe_select() preserving the input order and its returned list is left unmodified.
    avail_ids = set(id(pipe) for pipe in avail_pipes)
    return [stream for (stream, pipe) in zip(stream_list, pipes) if id(pipe) in avail_ids]

# Bind the view_registry registration functions to package-level names so they can be called as
# snapstream.register_output_view / snapstream.unregister_output_view.
register_output_view = view_registry.register_output_view
"""Convenience alias for L{view_registry.register_output_view}."""

unregister_output_view = view_registry.unregister_output_view
"""Convenience alias for L{view_registry.unregister_output_view}."""

def unregister_output_views(url_list):
    """
    Unregister every output view URL contained in url_list.

    Each URL is passed, in order, to L{view_registry.unregister_output_view}.

    @param url_list: The output view URLs to unregister.
    @type url_list: list

    """
    for view_url in url_list:
        view_registry.unregister_output_view(view_url)
    
def open_local_writers(url):
    """
    Build the writer streams used for in-process (inter-thread) delivery from an output view.

    The view registered under url is looked up in the view registry. For every pipe that a reader
    has linked to that view, a L{PipeWriter} is created. A link without a content type yields a
    record stream writer; a link with a content type yields a binary stream writer. If no view is
    registered under url, an empty list is returned.

    NOTE(review): this function does not itself unregister the output view; callers that need the
    URL unregistered should call L{unregister_output_view} -- confirm against callers.

    @param url: The URL of a previously registered output view.
    @type url: string

    @return: One Writer object per downstream view linked to the output view at url.
    @rtype: list

    @raises SnapStreamNegotiationError: Negotiation failed between endpoints (presumably raised
                                        while constructing a writer; not visible in this module).

    """
    view = view_registry.get_view(url)
    if view is None:
        # Nothing registered locally under this URL, so there are no linked readers.
        return []

    writers = []
    for link in view.linked_pipes:
        if link.content_type is None:
            mode = PipeWriter.RECORD_STREAM
        else:
            mode = PipeWriter.BINARY_STREAM
        writers.append(PipeWriter(link.pipe, link.content_type, mode, link.url, view.name))
    return writers

def open_url(url, accept_types=None, view_name='Unspecified'):
    """
    Open a SnapStream reader on the output view identified by url.

    When url is found in the view registry and stream optimization is enabled, the reader is
    connected to the view through an in-process pipe. In every other case a L{URLReader} is
    returned for the URL.

    For the in-process path, the content type returned by the view's connect() call decides the
    stream mode: None selects a record stream, anything else a binary stream.

    @param url: URL of output view to connect to.
    @type url: string

    @param accept_types: A list of acceptable content type specifiers, or None for record-based streams.
    @type accept_types: list

    @param view_name: The name of the input view this reader is associated with.
    @type view_name: string

    @return: A new SnapStream reader object.
    @rtype: L{snaplogic.common.snapstream.reader.Reader}

    @raise SnapStreamNegotiationError: Negotiation failed between endpoints.
    @raise SnapStreamConnectionError: Unable to open a connection to the given URL.

    """
    local_view = view_registry.get_view(url)
    if local_view is None or not OPTIMIZATION_ENABLED:
        # Not a locally registered view, or the in-process shortcut is switched off.
        return URLReader(url, accept_types, view_name)

    content_type, pipe = local_view.connect(url, accept_types)
    if content_type is None:
        mode = PipeReader.RECORD_STREAM
    else:
        mode = PipeReader.BINARY_STREAM
    return PipeReader(pipe, content_type, mode, url, view_name)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.