exec_interface.py :  » Development » SnapLogic » snaplogic » snapi_base » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » snapi_base » exec_interface.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id:exec_interface.py 1764 2008-03-21 04:27:56Z dhiraj $

"""
This module provides a set of functions that can be used by a Python based client to
start/stop and monitor a resource.

"""

import StringIO
import time

from snaplogic.common.snap_exceptions import *
from snaplogic.common import headers
from snaplogic.common import snap_http_lib,snap_params
from snaplogic.common import snap_control
import snaplogic.common.resource_runtime as resource_runtime
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic import rp
from snaplogic.snapi_base import keys
from snaplogic.snapi_base.exceptions import SnapiException,SnapiHttpException
from snaplogic import snapi_base

class WriteStream(object):
    """
    This class provides the interface for writing to the input view of a resource.
    
    If the view is in record mode, then write_record must be called, if it is in
    binary mode, the write_binary() must be called. The attribute is_record will
    be True if the view is in record mode and False if it is binary mode.
    
    """
    def __init__(self, name, is_record, uri, content_type):
        """
        Initialize the stream
        
        @param name: Name of the input view.
        @type name:  str
        
        @param is_record: True if view is record mode, False otherwise.
        @param is_record: bool
        
        @param uri: The URI of the input view.
        @type uri:  str 
        
        @param content_type: Content type being POSTed.
        @type content_type:  str
         
        """
        
        self.uri = uri
        """The URI for the POST request."""
        
        self.view_name = name
        
        self.content_type = content_type
        """ The content type of the POST."""
        
        self.is_record = is_record
        """ True if stream is in record mode. False, if it is in binary mode."""
        
        self.buffer_size = 1024
        """This is the amount of bytes that will be buffered before data is written out in POST request."""
        
     
        self._stream = StringIO.StringIO()
        
        self.continued_count = 0
        """This number is incremented, every time data is POST-ed for this stream session."""
        
        self.field_names = None
        """If this is a record mode view, then this attribute will hold the field names of the view."""
        
        self.fields = None
        """
        If this is a record mode view, then this attribute will hold the field definitions in this format -
        ((field_name, type, description), (field_name, type, description), ...
        
        """
        
        self.dictionary_format_record = False
        """
        If set to True, the records are represented as dictionaries with field name as key and field
        value as dict value.

        """
            
        if self.is_record:
            # For record mode view, locate the readily available RPs.
            if content_type is None:
                # Default is ASN1 for record mode
                content_type = rp.SNAP_ASN1_CONTENT_TYPE
            requested_rp = rp.get_rp(content_type)
            if requested_rp is None:
                raise SnapiException("Content type %s cannot be handled by this client" % content_type)
            self._writer = requested_rp.Writer(self._stream)
            self._writer.initialize()

        else:
            if content_type is None:
                # For binary mode, default is to take whatever server will provide.
                raise SnapiException("For binary input view %s no content type has been specified" % self.view_name)
                
        self._hdr = {}
        self._hdr["Content-Type"] = content_type
        self._hdr[headers.SNAPSTREAM_CONTINUED] = str(self.continued_count)
        
        # Now make an initial connection.
        self.flush()
    
    def flush(self, end_stream = False):
        """
        POST whatever data we have buffered since the last POST.
        
        @param end_stream: This is set to True if no further data is going to be sent.
        @type end_stream:  bool
        
        """
        
        if end_stream:
            del self._hdr[headers.SNAPSTREAM_CONTINUED]
        else:
            self._hdr[headers.SNAPSTREAM_CONTINUED] = str(self.continued_count + 1)
           
        response = snap_http_lib.sendreq("POST", self.uri, self._stream.getvalue(), self._hdr)
        response.getResponse()
        if response.getStatus() != 200:
            response.close()
            raise SnapiHttpException(response.getStatus(), response.getReason(), None,
                                     "Got HTTP Error code %s for sending data to view %s" % (response.getStatus(),
                                                                                         self.view_name))
        response.close()
        self._stream.truncate(0)
        
    def write_record(self, rec):
        """
        Write a raw record to the stream.
        @param data: A raw record to be written to stream. Raw record is bascially
            a sequence of field values.
        @type raw_rec:  list or tuple.
        
        """
        if self.is_record:
            if self.dictionary_format_record:
                rec = [rec.get(fname) for fname in self.field_names]
            
            self._writer.write(rec)
        else:
            raise SnapiException("This is a binary mode view")
        
        if len(self._stream.getvalue()) > self.buffer_size:
            self.flush()
            
    def write_binary(self, data):
        """
        Write a raw record to the stream.
        @param data: A raw record to be written to stream. Raw record is bascially
            a sequence of field values.
        @type raw_rec:  list or tuple.
        
        """
        if self.is_record:
            raise SnapiException("This is a record mode view")
        else:
            self._stream.write(data)     
        
        if len(self._stream.getvalue()) > self.buffer_size:
            self.flush()
              
    def close(self):
        """Close the stream."""
        
        if self.is_record:
            self._writer.end()
        self.flush(True)
        self._stream.close()

        
class ReadStream(object):
    
    """
    This class provides a streaming interface to the
    output view of a resource.
    
    """
    
    def __init__(self, view_name, is_record, uri, accept_type):
        """
        Initialize the stream
        
        @param name: Name of the output view.
        @type name:  str
        
        @param is_record: True if view is record mode, False otherwise.
        @param is_record: bool
        
        @param uri: The URI of the output view.
        @type uri:  str 
        
        @param accept_type: Content type being requested.
        @type accept_type:  str
         
        """
        
        self.uri = uri
        """URI for the output view."""
        
        self.view_name = view_name
        """Name of the output view being read."""
        
        self.content_type = None
        """The content type negotiated on."""
        
        self.accept_type = accept_type
        """Content type requested by the GET request."""
         
        self.is_record = is_record
        """ True if stream is in record mode. False, if it is in binary mode."""
        
        self.field_names = None
        """If this is a record mode view, then this attribute will hold the field names of the view."""
        
        self.fields = None
        """
        If this is a record mode view, then this attribute will hold the field definitions in this format -
        ((field_name, type, description), (field_name, type, description), ...
        
        """
        
        self.dictionary_format_record = False
        """
        If set to True, the records are represented as dictionaries with field name as key and field
        value as dict value.

        """
        
        hdr = {}
        if accept_type is None:
            if is_record:
                # Default is ASN1 for record mode
                hdr["Accept"] = rp.SNAP_ASN1_CONTENT_TYPE
            else:
                # This is binary mode. Default is to take whatever server will provide.
                hdr["Accept"] = "*/*"
        else:
            hdr["Accept"] = accept_type
            
        self._resp = snap_http_lib.sendreq("GET", self.uri, None, hdr)
        self._resp.getResponse()
        if self._resp.getStatus() != 200:
            self._resp.close()
            raise SnapiHttpException(self._resp.getStatus(), self._resp.getReason(), None,
                                     "Got HTTP error %s while connecting to output view %s URI %s" %
                                     (self._resp.getStatus(), self.view_name, self.uri))
        
        self.content_type = self._resp.getHeaders()['content-type']
        if self.is_record:
            requested_rp = rp.get_rp(self.content_type)
            if requested_rp is None:
                self._resp.close()
                raise SnapValueError("Content type %s cannot be handled by this client" % self.content_type)
            self._gen = requested_rp.Reader(self._resp)
            
        
        
    def read_record(self):
        """
        Read Snap records from record mode output view.
        
        @return: A sequence of field values of the record that was just read. None, if no more records are
            going to be sent in this output view.
        @rtype:  list
        
        """
        if self.is_record:
            try:
                rec = self._gen.next()
            except StopIteration:
                return None
            if self.dictionary_format_record:
                # We need to convert the tuple into a dictionary format.
                return dict(zip(self.field_names, rec))
            else:
                return rec
            
        else:
            raise SnapiException("This is a binary mode view")
        
    def read_binary(self, size):
        """
        Read binary data from binary mode output view.
        
        @param size: Size to read in bytes.
        @type size:  int
        
        @return: The binary data read. None, if no more data is available from the stream.
        @rtype:  str
        
        """
        
        if self.is_record:
            raise SnapiException("This is a record mode view")
        
        ret = self._resp.read(size)
        if ret == "":
            return None
        return ret    
        
    def close(self):
        """Closes the stream."""
        self._resp.close()

class Handle(object):
    """
    This class represents the handle returned when a resource is executed.
    
    This handle can be used for monitoring and controlling the resource,  as
    it executes.
    
    """
    def __init__(self, status_uri, rid, resource_name, resource_uri, creds=None, inp_dict=None, out_dict=None,
                 resdef=None, control_uri=None):
        """Initializes the object with URI of the executing resource."""
        self.status_uri = status_uri
        self.control_uri = control_uri
        self.resource_uri = resource_uri
        self.resource_name = resource_name
        self.credentials = creds
        self.rid = rid
        
        if inp_dict is None:
            inp_dict = {}
        
        if out_dict is None:
            out_dict = {}
            
        self.inputs = inp_dict
        self.outputs = out_dict
        self.resdef = resdef
        for inp_name in self.inputs:
            if self.resdef.input_is_record_mode(inp_name):
                self.inputs[inp_name].field_names = self.resdef.list_input_field_names(inp_name)
                self.inputs[inp_name].fields = self.resdef.list_input_view_fields(inp_name)
                
        for out_name in self.outputs:
            if self.resdef.output_is_record_mode(out_name):
                self.outputs[out_name].field_names = self.resdef.list_output_field_names(out_name)
                self.outputs[out_name].fields = self.resdef.list_output_view_fields(out_name)
    
    def get_current_status(self, detailed = False):
        """
        Returns the status of the resource execution.
        
        @param detailed: If set to True, then it returns the status of
            the resources inside the pipeline
        @type detailed: boolean
        
        @return: Status of the session
        @rtype: L{RuntimeStatus}
        
        """
        return get_status(self.status_uri, detailed)
    
    def stop(self):
        """Send STOP request to an executing resource."""
        send_stop(self.control_uri)
        
    def wait(self, polling_interval = 10, timeout = 0):
        """
        Waits for the session represented by this handle to finish.
        
        @param polling_interval: The  number of seconds between each check for status
        @type polling_interval:  int
        
        @param timeout:  The amount of time to wait for the pipeline to complete. If 0, then block until
            it completes.
        @type timeout:  int
        
        @return: Status of the finished session or None, if timed out.
        @rtype:  L{RuntimeStatus}
        
        """
        
        if timeout:
            start_time = time.time()
        
        while True:
            ret = get_status(self.status_uri)
            if ret.is_finished():
                return ret
            if timeout:
                time_used = time.time() - start_time
                if time_used > timeout:
                    # Exceeded timeout.
                    return None
                elif polling_interval + time_used > timeout:
                    # Don't even bother sleeping, we will exceed timeout. Just return
                    return None
                
            time.sleep(polling_interval)
            
    def get_resource_logs(self, include_facilities=None, exclude_facilities=None):
        """
        Fetch all the log messages related to this resource execution.
        
        @return: List of strings.
        @rtype:  list
        
        """
        return snapi_base.get_resource_logs(self.rid, self.resource_uri, include_facilities,
                                            exclude_facilities, self.credentials)
                
    def dictionary_format_record():
        doc = "If set to True, the records are expected and provided in dictionary format (instead of tuple format)."
        def fget(self):
            return self._dict_format_record
        
        def fset(self, value):
            self._dict_format_record = value
            for inp_name in self.inputs:
                self.inputs[inp_name].dictionary_format_record = value
            for out_name in self.outputs:
                self.outputs[out_name].dictionary_format_record = value
        return locals()
    dictionary_format_record = property(**dictionary_format_record())
         
    
def exec_resource(resource_name, resource_uri, resdef, inputs = None, outputs = None, params = None, creds = None,
                  custom_header = None):
    """
    Internally used function that executes a resource.
    
    @param resource_name: User defined name for the resource. Could be any string the caller of this
        method would like to use to identify the pipeline being invoked. This name does not have to
        be unique.
    @type resource_name:  str
    
    @param resource_uri: The URI of the resource to be executed.
    @type resource_uri:  str
    
    @param resdef: The resource definition of the resource being executed (can be a shallow copy).
    @type resdef:  L{ResDef}
    
    @param inputs: Dictionary with names of the input views as keys. The value can be requested content type or
        can be None, if default content_type is fine.
    @type inputs:  dict
    
    @param outputs: Dictionary with names of the output views as keys. The value can be requested content type or
        can be None, if default content_type is fine.
    @type outputs:  dict
    
    @param params: Dictionary with param name as key and param value as value.
    @type params:  dict 
    
    @param creds: A 2-tuple containing (username, password)
    @type creds:  tuple
    
    @param custom_header: If not None, contains header entries that must be sent with the prepare request.
    @type custom_header:  dict 
    
    @return: Handle for the executing resource.
    @rtype:  L{Handle}
    
    """
    if not inputs:
        inputs = {}
    if not outputs:
        outputs = {}
    if not params:
        params = {}
    
    out_stream_dict = {}
    inp_stream_dict = {}
    if resource_uri is None:
        raise SnapiException("The resource definition needs a URI in order to be executed.")
    
    res_rt = _send_prepare(resource_name, resource_uri, inputs.keys(), outputs.keys(), params, creds, custom_header)
    for out in res_rt.output_views.values():
        try:
            out_stream_dict[out.name] = ReadStream(out.name, out.is_record, out.uri, outputs[out.name])
        except Exception, e:
            for s in out_stream_dict.values():
                if s is not None:
                    s.close()
            raise
    

    for inp in res_rt.input_views.values():        
        try:
            inp_stream_dict[inp.name] = WriteStream(inp.name, inp.is_record, inp.uri, inputs[inp.name])
        except Exception, e:
            for s in out_stream_dict.values():
                s.close()
                
            for s in inp_stream_dict.values():
                if s is not None:
                    s.close()
            raise
        
    try:
        _send_start(res_rt)
    except:
        for s in inp_stream_dict.values():
            s.close()
                
        for s in out_stream_dict.values():
            s.close()
        raise
        
    return Handle(res_rt.runtime_status_uri, res_rt.rid, resource_name, resource_uri,
                  creds, inp_stream_dict, out_stream_dict, resdef, res_rt.runtime_control_uri)
    
def _send_prepare(resource_name, resource_uri, input_names = None, output_names = None, params = None, cred = None,
                  custom_header = None):
    """
    Send PREPARE request to resource.
    
    @param resource_name: Caller defined named for the resource. Does not have to be unique.
    @type resource_name:  str
    
    @param input_names: Names of the input views, that the caller wishes to POST input to.
    @type input_names:  list
    
    @param output_names: Names of the output views, that the caller wishes to GET output from.
    @type output_names:  list
    
    @param params: Dictionary with param name as key and param value as value.
    @type params:  dict 
    
    @param cred: A 2-tuple containing (username, password)
    @type cred:  tuple
    
    @param custom_header: If not None, contains header entries that must be sent with the prepare request.
    @type custom_header:  dict 
    
    @return: Runtime status URI of the prepared resource.
    @rtype:  L{resource_runtime.ResourceRuntime}
    
    """
    
    res_rt = resource_runtime.ResourceRuntime(resource_uri)
    res_rt.resource_name = resource_name

    if input_names is not None:
        for name in input_names:
            rt_inp = resource_runtime.RuntimeInputView(name)
            rt_inp.http_method = "POST"
            rt_inp.field_links = resource_runtime.EXACT_MATCH
            res_rt.input_views[name] = rt_inp
            
            
    if output_names is not None:
        for name in output_names:
            rt_out = resource_runtime.RuntimeOutputView(name)
            res_rt.output_views[name] = rt_out
            
    res_rt.parameters = params
    
    # Send prepare request
    try:
        prep_req = snap_control.create_prepare_request(res_rt)
    except Exception, e:
        raise SnapiException("Unable to send PREPARE request to resource %s. %s" %
                              (res_rt.resource_uri, str(e)))
    
    resp_dict = snapi_base.send_req("POST", res_rt.resource_uri, prep_req, custom_header, cred)

    snap_control.parse_prepare_response(resp_dict, res_rt)
    
    return res_rt

def _send_start(res_rt):
    """
    Send START request to a prepared resource.
    
    @param res_rt: The resource runtime object for the prepared resource
    @type res_rt:  L{ResourceRuntime}
    
    @raise SnapiException: If the request failed.
    
    """
    
    try:
        start_dict = snap_control.create_start_request()
        resp_dict = snapi_base.send_req("PUT", res_rt.runtime_control_uri, start_dict)
        snap_control.parse_start_stop_response(resp_dict, res_rt.status)
    except SnapiException, e:
        raise
    except Exception, e:
        raise SnapiException("Unable to send START document. %s " % str(e))
    
    
            
def send_stop(runtime_control_uri):
    """
    Send STOP request to an executing resource.
    
    @param runtime_control_uri: The control URI of the executing resource
    @type runtime_control_uri:  str
    
    @raise SnapiException: If the request failed.
    
    """
    status_obj = RuntimeStatus()
    stop_dict = snap_control.create_stop_request()
    try: 
        resp_dict = snapi_base.send_req("PUT", runtime_control_uri, stop_dict)
        snap_control.parse_start_stop_response(resp_dict, status_obj)
    except SnapiException, e:
        raise
    except Exception, e:
        raise SnapiException("Unable to send STOP document. %s " % str(e))

            
def get_status(runtime_status_uri, is_detailed=False):
    """
    Fetch status from a specified resource runtime.
    
    @param runtime_status_uri: The status URI of the executing resource
    @type runtime_status_uri:  str
    
    @param is_detailed: Request detailed status information from the executing resource. If the
        resource is a pipeline, then setting this param to True will provide status information
        of the resources inside the pipeline too.
    @type is_detailed:  bool
    
    @return: The status object parsed from the GET response.
    @rtype:  L{RuntimeStatus}
    
    @raise SnapiException: If the request failed.
    
    """

    if is_detailed:
        uri = snap_http_lib.add_params_to_uri(runtime_status_uri, {snap_params.LEVEL : snap_params.VALUE_DETAIL})
    else:
        uri = snap_http_lib.add_params_to_uri(runtime_status_uri, {snap_params.LEVEL : snap_params.VALUE_DESCRIBE})
   
    status_dict = snapi_base.send_req("GET", uri)
    
    return snap_control.parse_status_response(status_dict, is_detailed)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.