component_runtime.py :  » Development » SnapLogic » snaplogic » cc » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » cc » component_runtime.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $
# $Id:component_runtime.py 1006 2008-01-25 04:06:55Z dhiraj $

import re
import time
import copy
import threading
import decimal
import datetime
from pprint import PrettyPrinter

from snaplogic import rp
from snaplogic import cc
from snaplogic.common.snap_exceptions import *
from snaplogic.common import snap_log
from snaplogic.common.runtime_status import RuntimeStatus
from snaplogic.common import snap_http_lib
from snaplogic.common import runtime_table,snap_params
from snaplogic.common import resource_runtime
from snaplogic.common import snap_control
from snaplogic.common import snapstream
from snaplogic.common.snapstream.stream_driver import StreamDriver
from snaplogic.common.snapstream.url_reader import URLReader
from snaplogic.common.snapstream import rh_frontend
from snaplogic.common import uri_prefix
from snaplogic import snapi_base
from snaplogic.snapi_base import resdef
from snaplogic.snapi_base import keys
from snaplogic.cc.input_view import InputView
from snaplogic.cc.output_view import OutputView
from snaplogic.cc import registration
import snaplogic.cc.prop as prop

# Whether output views should validate records; init() may override this
# from the "type_check_records" config entry.
validate_recs = True

# Process-wide data tracing switches. init() sets them from the config
# file, and they are OR-ed with the per-pipeline sn.trace_data parameter
# values when a component is prepared.
glob_inp_trace_flag = glob_out_trace_flag = False

# Shared pretty printer, handy for debug dumps of request messages.
pp = PrettyPrinter(indent=4)

def init(conf):
    """
    Initialize the module level settings from the config.

    Reads the optional "type_check_records" entry (must be "yes" or "no") and the
    optional data tracing entry (L{snap_params.TRACE_DATA_CONFIG}), and stores the
    results in the module globals.

    @param conf: Config section, accessed like a dictionary.
    @type conf:  dict

    @raise SnapException: If "type_check_records" is present but is neither "yes" nor "no".

    """
    global validate_recs, glob_inp_trace_flag, glob_out_trace_flag

    if "type_check_records" in conf:
        type_check = conf["type_check_records"]
        if type_check == "yes":
            validate_recs = True
        elif type_check == "no":
            validate_recs = False
        else:
            raise SnapException("Config file entry 'type_check_records' must have a yes/no value, found '%s'" %
                                conf["type_check_records"])

    if snap_params.TRACE_DATA_CONFIG not in conf:
        return

    trace_setting = conf[snap_params.TRACE_DATA_CONFIG]
    if isinstance(trace_setting, list):
        # A list valued entry is folded back into the comma separated form
        # that get_trace_flags() parses.
        trace_setting = ','.join(trace_setting)

    parsed = get_trace_flags(trace_setting)
    glob_inp_trace_flag = parsed[0]
    glob_out_trace_flag = parsed[1]
    err_msg = parsed[2]
    if err_msg:
        # An unparsable setting is not fatal: warn and leave tracing switched off.
        cc.log(snap_log.LEVEL_WARN,
               "Error parsing config option %s: %s, tracing will not be set" % (snap_params.TRACE_DATA_CONFIG, err_msg))
        glob_inp_trace_flag = False
        glob_out_trace_flag = False
            
class ComponentThread(threading.Thread):
    """
    Encapsulates the component class and provides thread interface.
    
    The primary purpose of this class is to keep the component's backend code out of
    the ComponentAPI codehus providing the component author with simple code to refer
    to and also in the process, minimize chances of accidentally disrupting the backend
    code in the derived component class.
    
    """
    def __init__(self, comp_object, resource_name, context_name, rid, inp_trace_flag, out_trace_flag):
        """
        Initialize the ComponentThread object.
        
        @param comp_object: The object of a class derived from ComponentAPI.
        @type comp_object:  derived from L{ComponentAPI}
        
        @param resource_name: The name given to the resource inside the pipeline that
            is executing it. Or, a name given by a client that is directly  eexecuting
            the resource. The resource here is based on the component.
        @type resource_name:  str
        
        @param trace_flag: If set to true, that record tracing should be enabled.
        @type trace_flag:  bool
        
        """
        threading.Thread.__init__(self)
        
        # The instantiated component object.
        self.component = comp_object
        """The component object being executed by this thread."""
        
        self.got_stopped = False
        """This flag is set to True when a STOP request is received."""
        
        # Staging area for collecting output stream connections, as they trickle in.
        # These will be organized under their respective output view objects, before
        # the component is started. 
        self._output_streams = []
        
        # This dictionary keeps track of input streams that are using HTTP POST method
        # to send data.
        self._posted_input_streams = {}
        
        self.input_views = {}
        """Input views of the component that have got streams associated with them. Dict of L{InputView}"""
        
        self.output_views = {}
        """Output views of the component that have got streams associated with them. Dict of L{OutputView}"""
                
        self.inp_trace_flag = inp_trace_flag
        """Flag to indicate if input tracing is enabled or disabled."""
        
        self.out_trace_flag = out_trace_flag
        """Flag to indicate if output tracing is enabled or disabled."""
        
        self.top_pipeline_rid = rid.split(".")[0]
        """ The rid of the topmost pipeline, inside which this resource runs."""
        
        self.fully_qualified_resource_name = \
                    context_name + "." + resource_name if context_name is not None else resource_name
        
        self.setName(resource_name)
        
        self.rid = rid

        cc.stats_group.get_stat("number_of_spawned_component_threads").inc()

        
    def set_finished_state(self, runtime_status_uri, state):
        """
        Change state of runtime resource.
        
        This method locks and changes the state of resource runtime. It should NOT be called
        if the caller has already got a lock the resource runtime object.
        
        @param runtime_status_uri: The status URI of the resource runtime object that must be
            modified.
        @type runtime_status_uri:  str
        
        @param state: The new state to be set to.
        @type state:  str
        
        """
        
        res_rt = runtime_table.get_status_uri_entry(runtime_status_uri)
        res_rt.lock.acquire()
        try:
            res_rt.status.state = state
            res_rt.exit_time = time.time()
            # Remove any POST input URIs.
            del_uri = [v.uri for v in res_rt.input_views.values() if v.http_method == "POST"]
            if len(del_uri) > 0:
                runtime_table.remove_runtime_view_uris(del_uri)
        finally:
            res_rt.lock.release()   
        
    def process_posted_input_stream(self, http_req, res_rt, view_name):
        """
        Sending POST data by stream method (specifically in HTTP 1.0)
        is a challenge, as one has to specify content length of data being sent upfront.
        To deal with this issue, it was decided that CC will accept a sequence of POSTs,
        to the same input URI, to allow data to be sent in chunks.
        The dictionary below will indicate to followup POST requests, that a initial
        POST request to the view has already been sent and has resulted in a stream and
        view object being created and therefore there is no need to create a view object.
        The followup request only needs to interact with the snap stream module to get
        associated with the existing snap stream.
        
        @param http_req: The HTTP POST request.
        @type http_req:  L{HttpRequest}
        
        @param res_rt: Resource runtime which has that input view.
        @type res_rt:  L{resource_runtime.ServerResourceRuntime}
        
        @param view_name: Name of the input view.
        @type view_name:  str
        
        """
        
        res_rt.lock.acquire()
        try:
            if view_name not in self._posted_input_streams:
                # This is the first POST to this input view, create an input view object for it.
                if res_rt.status.state != RuntimeStatus.Prepared:
                    self.component.log(snap_log.LEVEL_ERR,
                                       "Received initial POST on input view %s (resource %s) when not in Prepared state"
                                        % (view_name, res_rt.resource_uri))
                    return (http_req.FORBIDDEN, "The resource is already running.")
                
                ret = self._add_posted_input_view_object(http_req, res_rt, view_name)
                if ret is None:
                    return (http_req.INTERNAL_SERVER_ERROR, None)
                else:
                    (pipe, stream) = ret
                
                # We use this dictionary, even though similar view information is stored in the ComponentAPI
                # class, because there is always a chance that the data in ComponentAPI class may be accidentally
                # messed up by component authors.
                self._posted_input_streams[view_name] = stream
            else:
                # This is a followup POST.
                if res_rt.resdef.input_is_record_mode(view_name):
                    content_types = None
                else:
                    content_types = res_rt.resdef.get_input_view_content_types(view_name)
                (pipe, stream) = request_handler.process_post(http_req, content_types, view_name)
                # Just do a sanity check to make sure snap stream layer has returned the same stream
                # object as the last time.
                if stream != self._posted_input_streams[view_name]:
                    self.component.log(snap_log.LEVEL_ERR,
                                       "The stream object for followup POST to input view %s (%s) does not match %s" %
                                       (view_name, res_rt.resource_uri, http_req.snap_stream_continue))
                    return (http_req.INTERNAL_SERVER_ERROR, None)
        finally:
            res_rt.lock.release()
        
        # Finally, we let the thread go into a loop, reading from the POST.
        request_handler.stream_input(http_req, pipe)
                
    def _add_posted_input_view_object(self, http_req, res_rt, view_name):
        """
        Create a input view object for the first POST to an input view URI.
        
        @param http_req: The HTTP POST request.
        @type http_req:  L{HttpRequest}
        
        @param res_rt: Resource runtime which has that input view.
        @type res_rt:  L{resource_runtime.ServerResourceRuntime}
        
        @param view_name: Name of the input view.
        @type view_name:  str
        
        @return: A sequence (pipe, snap stream) if successful, else None
        @rtype:  2-tuple or None.
        
        """
        if res_rt.resdef.input_is_record_mode(view_name):
            is_pass_thru = res_rt.input_views[view_name].is_pass_through
            (field_mapper, field_numbers, null_dest_field_indices) = _process_field_links(res_rt, res_rt.resdef,
                                                                                          view_name)
            fields = res_rt.resdef.list_input_view_fields(view_name)
            # User defined content types is only a feature of binary mode views.
            content_types = None
        else:
            content_types = res_rt.resdef.get_input_view_content_types(view_name)
            # Fields and pass-through is only a feature of record mode views.
            is_pass_thru = False
            fields = None
            field_mapper = None
            field_numbers = None
            null_dest_field_indices = None
        (pipe, stream) = request_handler.process_post(http_req, content_types, view_name)
        
        if stream is None:
            res_rt.log(snap_log.LEVEL_ERR, "Failed to create stream for POST request to %s" % http_req.path)
            return None
    
        inp_view = InputView(view_name, stream, fields, field_mapper, null_dest_field_indices,
                             is_pass_thru, self.inp_trace_flag, self.fully_qualified_resource_name,
                             self.top_pipeline_rid)
        
        if view_name in self.input_views:
            # This should never happen.
            res_rt.log(snap_log.LEVEL_ERR, "CC has strange mismatch for resource %s, POST-ed view %s URI %s" %
                       (res_rt.resource_uri, view_name, http_req.path))
            return None
            
        self.input_views[view_name] = inp_view
        
        return (pipe, stream)
        
    def setup_input_views(self, res_def, res_rt):
        """
        Establishes snap stream with all the input sources by calling GET on them. For Input views, that
        have been requested to provide a URI for POST-ing, a URI is setup. In this case, The actual stream
        for that view be setup when the POST request arrives.
        
        @returns: Returns the list of input view objects created.
        @rtype:   list of L{InputView}
        
        """
        
        input_views = []
    
        for (inp_name, view) in res_rt.input_views.iteritems():
            
            if view.http_method == "POST":
                # We don't create a stream for this. We register a URI for it.
                u = runtime_table.add_runtime_view_entry(res_rt.resource_uri, view.is_record,
                                                         (res_rt.runtime_status_uri, False, inp_name))
                res_rt.input_views[inp_name].uri = snap_http_lib.concat_paths(cc.my_process_uri, u)
                continue
            
            if res_def.input_is_record_mode(inp_name):     
                (field_mapper, field_numbers, null_dest_field_indices) = _process_field_links(res_rt, res_def, inp_name)
                fields = res_def.list_input_view_fields(view.name)
                if view.is_pass_through:
                    uri = view.uri
                else:
                    # Does not accept all source view fields, as it isn't a pass-through input view.
                    # For efficiency reasons, tell the source view what fields we are interested in,
                    # while making the GET request
                    uri = _append_params_to_view_uri(view.uri, field_numbers)
    
                # Record mode views don't invlove user defined content types.
                content_types = None
                res_rt.log(snap_log.LEVEL_DEBUG, "Create input stream: view <%s> field_links <%s> uri <%s>"
                                             % (view.name, view.field_links, uri))
            else:
                uri = view.uri
                content_types = res_def.get_input_view_content_types(inp_name)
                # Binary mode views don't involve fields and pass-through
                fields = None
                field_mapper = None
                field_numbers = None
                null_dest_field_indices = None
              
            try:

                stream = snapstream.open_url(uri, content_types, view.name)
                stream.field_numbers = field_numbers
                inp_view = InputView(view.name, stream, fields, field_mapper, null_dest_field_indices,
                                     view.is_pass_through, self.inp_trace_flag, self.fully_qualified_resource_name,
                                     self.top_pipeline_rid)
                input_views.append(inp_view)

                # We only need to start the stream driver if we're reading from the URL
                if isinstance(stream, URLReader):
                    driver = StreamDriver(stream, res_rt)
                    driver.start()

            except SnapException, e:
                enew = SnapIOError("Failed to setup input streams for resource %s (%s). Target URI %s" %
                                   (res_rt.resource_uri, res_rt.resource_name, uri))
                raise SnapException.chain(enew, e)
            
        # Setup the input streams as we must have got input URLs from the prepare request
        for inp_view in input_views:
            self.input_views[inp_view.name] = inp_view
            
    def run(self):
        
        res_rt = runtime_table.get_status_uri_entry(self.component.runtime_status_uri)
        res_rt.lock.acquire()
        try:
            output_uris = []
            for v in res_rt.output_views.values():
                local_outputs = snapstream.open_local_writers(v.uri)
                self._output_streams += local_outputs
                output_uris.append(v.uri)
            snapstream.unregister_output_views(output_uris)
        finally:
            res_rt.lock.release()
        view_dict = {}
        for out in self._output_streams:
            if out.view_name not in view_dict:
                view_dict[out.view_name] = [out]
            else:
                view_dict[out.view_name].append(out)
        resdef = self.component._resdef
        for view_name in view_dict:
            if resdef.output_is_record_mode(view_name):
                fields = resdef.list_output_view_fields(view_name)
                if view_name in resdef.list_pass_through_output_views():
                    pass_through_inputs = resdef.get_output_view_pass_through(view_name)[keys.INPUT_VIEWS]
                else:
                    pass_through_inputs = None
            else:
                fields = None
                pass_through_inputs = None
            view = resdef.get_output_view(view_name)
            self.output_views[view_name] = OutputView(view_name, view_dict[view_name], fields,
                                                      pass_through_inputs, validate_recs, self.out_trace_flag,
                                                      self.fully_qualified_resource_name, self.top_pipeline_rid)
        for inp in self.input_views.values():
            self.component._stream_to_input_view[inp.snap_stream] = inp
        
        exec_complete = False
        try:
            self.component.execute(dict(self.input_views), dict(self.output_views))
            exec_complete = True
        except SnapStreamError, e:
            if self.got_stopped:
                self.component.log(snap_log.LEVEL_INFO, "Component %s (resource name %s) was stopped" % 
                                        (self.component.__class__.__name__, self.getName()))
            else:
                # Log specifically as SNAPSTREAM error. This will allow us to filter out snap stream
                # messages using the facility ID.
                (ignore_log, ss_elog, ignore_rlog) = cc.logger.make_specific_loggers(snap_log.LOG_SNAPSTREAM,
                                                                    self.rid, self.getName(),
                                                                    self.component._invoker_username)
                ss_elog(e, "Component %s execution failed with snapstream exception" % self.component.__class__.__name__)
        except Exception, e:
            self.component.elog(e, "Component execution failed")
        finally:
            if self.got_stopped:
                self.set_finished_state(self.component.runtime_status_uri, RuntimeStatus.Stopped)
            elif exec_complete:
                # We had a graceful return from the process() method and no one tried stopped the component
                self.set_finished_state(self.component.runtime_status_uri, RuntimeStatus.Completed)
            else:
                # The component exited with a failure.
                self.set_finished_state(self.component.runtime_status_uri, RuntimeStatus.Failed)
            
            for inp in self.input_views.values():
                try:
                    inp._close()
                except:
                    pass
            for out in self.output_views.values():
                try:
                    out._close()
                except:
                    pass
            if self.component.notification_uri is not None:
                try:
                    res_rt = runtime_table.get_status_uri_entry(self.component.runtime_status_uri)
                    res_rt.lock.acquire()
                    try:
                        res_rt.status.statistics = _gather_statistics(self)
                        notify_req = snap_control.create_notification_request(res_rt.resource_name, res_rt.status)
                    finally:
                        res_rt.lock.release()
                    snapi_base.send_req("PUT", self.component.notification_uri, notify_req)
                except Exception, e:
                    # Just log and move on.
                    self.component.elog(e, "Component resource notification send failed")
    def stop(self):
        """Stop the running component."""
        
        self.got_stopped = True
        self.component.got_stopped = True
        # Close the streams of the component. That should get the attention of the component.
        for inp in self.input_views.values():
            try:
                inp._close()
            except:
                pass
        for out in self.output_views.values():
            try:
                out._close()
            except:
                pass
        
def get_trace_flags(trace_option):
    """
    Translate a data tracing option into input/output tracing flags.
    
    The option is a comma-separated list whose entries are matched, ignoring case and
    surrounding whitespace, against L{snap_params.VALUE_INPUT} and
    L{snap_params.VALUE_OUTPUT}. Empty entries are skipped. None and blank strings
    mean "no tracing".
    
    @param trace_option: option setting trace - L{snap_params.VALUE_INPUT}, L{snap_params.VALUE_OUTPUT},
                         or both, comma-separated 
    @type trace_option: str
    
    @return: a tuple with 3 elements:
                1. input_trace flag (bool)
                2. output_trace flag (bool)
                3. error message (if this is not None, the above flags should be ignored - 
                                  we could not parse the trace_option)
    @rtype: tuple
    
    """
    if trace_option is None:
        return (False, False, None)
    if not (isinstance(trace_option, str) or isinstance(trace_option, unicode)):
        return (False, False, "String or unicode expected, received %s" % type(trace_option))

    cleaned = trace_option.strip()
    if not cleaned:
        return (False, False, None)

    trace_input = False
    trace_output = False
    problem = None
    for entry in cleaned.split(","):
        entry = entry.strip().lower()
        if not entry:
            continue
        if entry == snap_params.VALUE_INPUT.lower():
            trace_input = True
        elif entry == snap_params.VALUE_OUTPUT.lower():
            trace_output = True
        else:
            # Parsing stops at the first unknown entry; flags set so far are
            # returned as they are and must be ignored by the caller.
            problem = 'Expected one of: "%s", "%s", "%s,%s"; received %s' % (snap_params.VALUE_INPUT, snap_params.VALUE_OUTPUT, snap_params.VALUE_INPUT, snap_params.VALUE_OUTPUT, cleaned) 
            break
    return (trace_input, trace_output, problem)
                     
def process_prepare_request(http_req):
    """
    Prepare a component for execution.
    
    This PREPARE request results in the creation of a resource runtime object (res_rt)
    containing information from the PREPARE request like:
     - pipeline's runtime id (if any)
     - resource name of the component in a pipeline or the naem given by a client
     - URIs of output views that the component resource is linked to.
     - HTTP methods used to access input views (POST or GET)
     - Params for the resource.
     
    The component now needs to provide the following missing information in the res_rt.
    - Runtime status URI for the resource runtime that was just created. This is made
      available by the runtime table, when the res_rt is added to it.
    - Runtime output view URIs for the output views of the component. These URIs are
      created by adding entries to the view runtime table
    - Runtime input view URIs, if the agent wants to use http method POST to send data
      to the input view. Again, these URIs are created by adding entries to the view
      runtime table
    Once this information is generated by this function, it is returned in the PREPARE
    response document. 
      
    @param http_req: The HTTP request that initiated the PREPARE request.
    @type http_req:  L{HttpRequest}
    
    @return: (HTTP status code, response object). The response object will be an error
        string the HTTP code is not OK. If it is OK, then the response object will be
        the PREPARE response document.
    @rtype:  2-tuple
    
    """
    resource_uri = http_req.path
    http_req.make_input_rp()
    try:
        mesg = http_req.input.next()
    except StopIteration:
        cc.log(snap_log.LEVEL_ERR, "POST to resource URI %s was missing prepare message" % resource_uri)
        return (http_req.BAD_REQUEST, "Resource definition missing")
    
    #print pp.pformat(mesg)
    try:
        resdef_dict = http_req.input.next()
    except StopIteration:
        return (http_req.BAD_REQUEST, "Resource definition missing")
    
    try:
        # Routine cleanup of runtime table.
        runtime_table.remove_completed_runtime_entries()
        res_rt = resource_runtime.ServerResourceRuntime(resource_uri)
        snap_control.parse_prepare_request(mesg, res_rt)
        res_def = resdef_module.ResDef(resdef_dict)
        (res_rt.log, res_rt.elog, ignore_rlog) = cc.logger.make_specific_loggers(res_def.get_component_name(),
                                                                    res_rt.rid, res_rt.resource_name, http_req.invoker)
    except Exception, e:
        cc.elog(e, "Prepare request to %s failed" % resource_uri)
        return (http_req.INTERNAL_SERVER_ERROR, None)
        
    try:
        retval = resource_runtime.synch_with_resdef(res_rt, res_def)
        if retval is not None:
            return retval
        comp_name = res_def.get_component_name()
        if comp_name is None:
            cc.log(snap_log.LEVEL_ERR, "POST to resource URI %s had a resdef without a component name." % resource_uri)
            return (http_req.BAD_REQUEST, "Resource definition has not component name in it")
        res_rt.log(snap_log.LEVEL_DEBUG, "Doing prepare for resource name \"%s\", component \"%s\"" %
                                         (res_rt.resource_name, comp_name))
        comp_class = registration.get_component_class(comp_name)
        if comp_class is None:
            res_rt.log(snap_log.LEVEL_ERR, "Component %s specified in resource definition %s was not found." 
                                           %(comp_name, resource_uri))
            
            return (http_req.INTERNAL_SERVER_ERROR, "Resource definition for %s was invalid" % resource_uri)
        
        # Create and populate the component object.
        component = comp_class()
        config_dict = registration.get_component_config(comp_name)
        r = res_def._check_param_values(res_rt.parameters)
        if r is not None:
            return (http_req.BAD_REQUEST, r)
        params = res_def._update_param_values_with_defaults(res_rt.parameters)
        # Remove the snaplagic specific param, as it should not be passed onto the component user space.
        trace_param = params.pop(snap_params.TRACE_DATA, None)
        (inp_trace_flag, out_trace_flag, err_msg) = get_trace_flags(trace_param)
        if err_msg:
            return (http_req.BAD_REQUEST, "Invalid value for %s: %s" % (snap_params.TRACE_DATA, err_msg)) 
        inp_trace_flag = inp_trace_flag or glob_inp_trace_flag
        out_trace_flag = out_trace_flag or glob_out_trace_flag 
 
            
        # This substitutes param values into resdef properties.
        prop.substitute_params_in_resdef(res_def, params)
        component._init_comp(config_dict, res_def, res_rt.resource_name, res_rt.rid, False, http_req.invoker,
                             res_rt.resource_ref)
        component._set_parameters(params)
        res_rt.status.state = RuntimeStatus.Prepared
        res_rt.resdef = res_def
        runtime_table.add_runtime_entry(res_rt, cc.my_process_uri)
        res_rt.lock.acquire()
        try:
            component.runtime_status_uri = res_rt.runtime_status_uri
            res_rt.comp_thread = ComponentThread(component, res_rt.resource_name, res_rt.context_name,
                                                 res_rt.rid, inp_trace_flag, out_trace_flag)
            res_rt.comp_thread.setup_input_views(res_def, res_rt)
            # Populate the output view URIs
            _setup_output_view_uris(res_rt)
            # Create the prepare response document and send it out.
            resp = snap_control.create_prepare_response(res_rt)
        finally:
            res_rt.lock.release()
        return (http_req.OK, resp)
        
    except Exception, e:
        res_rt.elog(e, "Prepare request to %s failed" % resource_uri)
        return (http_req.INTERNAL_SERVER_ERROR, None)

def _setup_output_view_uris(res_rt):
    """Creates and sets output URIs for output views in the runtime table"""
    
    for out_name in res_rt.output_views:
        if res_rt.resdef.output_is_record_mode(out_name):
            content_types = None
        else:
            content_types = res_rt.resdef.get_output_view_content_types(out_name)
        u = runtime_table.add_runtime_view_entry(res_rt.resource_uri, res_rt.output_views[out_name].is_record, 
                                                 (res_rt.runtime_status_uri, True, out_name))
        res_rt.output_views[out_name].uri = snap_http_lib.concat_paths(cc.my_process_uri, u)
        snapstream.register_output_view(res_rt.output_views[out_name].uri, content_types, out_name)
 



def _process_field_links(res_rt, res_def, inp_view_name):
    """
    Process the field links received in the prepare request.

    The field links are received in the prepare request in the following format:
    [
       ["src_field1", ["dest_field1", "dest_field2"]],
       ["src_field2", []],
       ["src_field3", ["dest_field3"]],
       [None, ["dest_field4", "dest_field5"]]  # Destination fields that have been hard coded to the value None.
    ]

    In this format, every source field name is specified (whether it is linked or not) in the order
    defined by the output view. The destination field names linked to the source field are specified as a list,
    since there can be more than one. If the source field is not linked, then an empty list should be specified.

    @param res_rt: Resource runtime object holding the input view (res_rt.input_views[inp_view_name]).

    @param res_def: Resource definition, used to list the field names of the input view.

    @param inp_view_name: Name of the input view whose field links are being processed.

    @return: 3-tuple of (field_mapper, src_field_position_nums, null_dest_field_indices).
        field_mapper has one entry per retained source field: None for a source field with no
        destination (only possible for pass-through views), otherwise the list of input view field
        indices the source field is copied to.
        src_field_position_nums is the list of source field positions that are actually linked, or
        None if the view is pass-through or every source field is linked.
        null_dest_field_indices is the list of input view field indices hard coded to None.
    @rtype:  tuple

    @raise SnapValueError: If a link names a field the input view does not have, links the same input
        field more than once, or leaves an input field unlinked.

    """

    view = res_rt.input_views[inp_view_name]
    input_field_names = res_def.list_input_field_names(inp_view_name)
    null_dest_field_indices = []
    if view.field_links == resource_runtime.EXACT_MATCH:
        # EXACT_MATCH means that the requestor has not bothered to provide field link information,
        # but has assured the input view that data sent will exactly match the fields described by
        # the input view. For this reason, we cook up a field link here, which exactly matches
        # the fields of the input view. This situation typically happens when the agent sending
        # data to the input view is not another component in a pipeline, but some client using
        # snapi interface to POST data to the input view. Such a client does not have ability to
        # create field links.
        field_links = [(name, [name]) for name in input_field_names]
    else:
        # Remove the nullified dest fields information from field_links and place it in a separate list.
        field_links = []
        unmapped_fields = list(input_field_names)
        for (src_field_name, dest_field_names) in view.field_links:
            for dest_name in dest_field_names:
                if dest_name not in input_field_names:
                    raise SnapValueError(
                            "Resource %s (%s) input view \"%s\" does not have field \"%s\" specified in field linking" %
                            (res_rt.resource_uri, res_rt.resource_name, inp_view_name, dest_name))
                if dest_name not in unmapped_fields:
                    # If dest field_name was in input_field_names list, but not in unmapped_fields, then it must be
                    # because the field has been linked to more than one source field.
                    raise SnapValueError("Resource %s (%s) input view \"%s\"  has the input field \"%s\" linked twice" %
                                         (res_rt.resource_uri, res_rt.resource_name, inp_view_name, dest_name))
                unmapped_fields.remove(dest_name)
            if src_field_name is None:
                # Accumulate rather than overwrite, so that nothing is lost if the requestor sends
                # more than one entry with a None source.
                null_dest_field_indices.extend([input_field_names.index(dest_name)
                                                for dest_name in dest_field_names])
            else:
                field_links.append((src_field_name, dest_field_names))
        if unmapped_fields:
            raise SnapValueError("Resource %s (%s) input view \"%s\" has unlinked fields %s" %
                                 (res_rt.resource_uri, res_rt.resource_name, inp_view_name, unmapped_fields))

    # Number of fields the source sends when no filtering is requested.
    src_field_view_size = len(field_links)
    src_field_position_nums = None
    if not view.is_pass_through:
        # If this view is not pass through, then filter out the source fields that are not manually linked,
        # remembering the position each retained field had in the source view.
        src_field_position_nums = []
        linked_only = []
        for (src_position, link) in enumerate(field_links):
            if len(link[1]) > 0:
                src_field_position_nums.append(src_position)
                linked_only.append(link)
        field_links = linked_only

    field_mapper = []
    for (src_field_name, dest_field_names) in field_links:
        if len(dest_field_names) == 0:
            # Pass through field
            field_mapper.append(None)
        else:
            # Set the indices of these destination fields in the field mapper
            field_mapper.append([input_field_names.index(dest_name) for dest_name in dest_field_names])

    if (src_field_position_nums is not None) and (len(src_field_position_nums) == src_field_view_size):
        # If we are using all the source field names, then there is no need to return the source field
        # positions needed, we want all of them from the source.
        src_field_position_nums = None

    return (field_mapper, src_field_position_nums, null_dest_field_indices)

        
def _compress_field_numbers(field_nums):
    """
    Render field numbers as a comma separated string in which consecutive numbers are collapsed.

    The numbers are processed in ascending order. A run of consecutive numbers such as 1,2,3 is
    written as "1-3"; an isolated number is written on its own. For example [5, 1, 2, 3] gives
    "1-3,5". An empty sequence gives the empty string.

    @param field_nums: Field numbers to render. The sequence itself is not modified.
    @type field_nums:  list

    @return: The compressed string form of the field numbers.
    @rtype:  str

    """
    # Each run is a two element list [first, last] of consecutive numbers.
    runs = []
    for num in sorted(field_nums):
        if runs and runs[-1][1] + 1 == num:
            # Contiguous with the current run, so just extend it.
            runs[-1][1] = num
        else:
            runs.append([num, num])

    parts = []
    for (first, last) in runs:
        if first == last:
            parts.append(str(first))
        else:
            parts.append("%s-%s" % (first, last))
    return ",".join(parts)


def _append_params_to_view_uri(uri, src_field_nums):
    """
    Appends field numbers specified as parameter in the URL args

    If src_field_nums is None, no field number parameter is added. Otherwise the numbers are
    written in ascending order, with consecutive numbers collapsed into ranges (1,2,3 is written
    as 1-3). The caller's list is left untouched.

    @param uri: The view URI to which the parameter should be added.
    @type uri:  str

    @param src_field_nums: Source field numbers to request, or None to add no parameter.
    @type src_field_nums:  list

    @return: The value returned by snap_http_lib.add_params_to_uri() for the URI and parameters.

    """
    params = {}
    if src_field_nums is not None:
        params[snap_params.FIELD_NUMBERS] = _compress_field_numbers(src_field_nums)

    return snap_http_lib.add_params_to_uri(uri, params)

    
def process_runtime_put_request(http_req):
    """
    Handle a PUT request that the CC received on a runtime status URI.

    The request body is parsed into a request name and request data. Two request names are handled:
    1) "start": Only accepted while the runtime is in Prepared state. The output view URIs are removed
       from the runtime table, the request data is stored as the component's notification URI, the
       component thread is started and the state becomes Started.
    2) "stop": In Prepared state the view URIs are removed, the output views are unregistered and the
       runtime is marked Stopped. In Started state the component thread is asked to stop and the state
       becomes Stopping. In Completed, Failed, Stopped or Stopping state the request is ignored. In any
       other state the request is refused.

    @param http_req: The HTTP request object for the PUT request.

    @return: 2-tuple of (HTTP status code, response). The response is a start/stop response document on
        success, None when a stop request is ignored or the request is unknown, and an error message
        when the request had no data or was refused.
    @rtype:  tuple

    """
    res_rt = runtime_table.get_status_uri_entry(http_req.path, http_req.method)
    http_req.make_input_rp()
    try:
        put_doc = http_req.input.next()
    except StopIteration:
        no_data_mesg = "PUT request did not have data"
        res_rt.log(snap_log.LEVEL_ERR, no_data_mesg)
        return (http_req.BAD_REQUEST, no_data_mesg)

    reply = None
    (action, action_data) = snap_control.parse_put_request(put_doc)

    # State is inspected and changed only while holding the runtime lock.
    res_rt.lock.acquire()
    try:
        current_state = res_rt.status.state

        if action == "start":
            if current_state != RuntimeStatus.Prepared:
                res_rt.log(snap_log.LEVEL_ERR, "Cannot send start request to resource %s (%s) that is in state: %s" %
                                             (res_rt.resource_uri, res_rt.resource_name, current_state))
                return (http_req.FORBIDDEN, "The resource is not in prepared state")

            out_uris = [out_view.uri for out_view in res_rt.output_views.values()]
            runtime_table.remove_runtime_view_uris(out_uris)
            res_rt.comp_thread.component.notification_uri = action_data
            res_rt.comp_thread.start()
            res_rt.status.state = RuntimeStatus.Started
            reply = snap_control.create_start_stop_response(res_rt.status)

        elif action == "stop":
            if current_state == RuntimeStatus.Prepared:
                # No component thread is running yet. Drop the view URIs, mark the runtime as
                # stopped and record the time at which this runtime resource exited.
                out_uris = [out_view.uri for out_view in res_rt.output_views.values()]
                posted_inp_uris = [inp_view.uri for inp_view in res_rt.input_views.values()
                                   if inp_view.http_method == "POST"]
                runtime_table.remove_runtime_view_uris(out_uris + posted_inp_uris)
                snapstream.unregister_output_views(out_uris)
                res_rt.status.state = RuntimeStatus.Stopped
                res_rt.exit_time = time.time()
                reply = snap_control.create_start_stop_response(res_rt.status)
            elif current_state == RuntimeStatus.Started:
                # The component thread is running. Ask it to stop and report Stopping.
                res_rt.comp_thread.stop()
                res_rt.status.state = RuntimeStatus.Stopping
                reply = snap_control.create_start_stop_response(res_rt.status)
            elif current_state not in (RuntimeStatus.Completed, RuntimeStatus.Failed,
                                       RuntimeStatus.Stopped, RuntimeStatus.Stopping):
                # A runtime that has finished or is already stopping silently ignores the stop
                # request. Any other state should never have received it, so report an error.
                res_rt.log(snap_log.LEVEL_ERR, "Cannot send stop request to resource %s (%s) that is in state: %s" %
                                               (res_rt.resource_uri, res_rt.resource_name, current_state))
                return (http_req.FORBIDDEN, "Resource in state %s cannot be stopped" % current_state)

        else:
            res_rt.log(snap_log.LEVEL_ERR, "Unknown PUT request to resource %s (%s): %s" %
                                            (res_rt.resource_uri, res_rt.resource_name, action))
            return (http_req.BAD_REQUEST, None)
    finally:
        res_rt.lock.release()

    return (http_req.OK, reply)
    
def process_runtime_get_request(http_req):
    """
    Dispatch a GET request made to a runtime URI.

    The request path decides what is done:
    1) A path under the runtime status prefix returns the status of the component runtime.
    2) A path under the runtime view prefix starts a snapstream from an output view.
    Any other path is answered with NOT_FOUND.

    @param http_req: The HTTP request object for the GET request.

    @return: (OK, status document) for a status request, whatever the output view handler
        returns for a view request, or (NOT_FOUND, error message) for an unknown path.

    """
    request_path = http_req.path

    if request_path.startswith(uri_prefix.RUNTIME_STATUS_ENTRY):
        status_doc = _status_get_request(http_req)
        return (http_req.OK, status_doc)

    if request_path.startswith(uri_prefix.RUNTIME_VIEW):
        # The view handler streams the data itself; its result is handed back unchanged.
        return _output_view_get_request(http_req)

    not_found_mesg = "No such runtime uri found (%s)" % request_path
    cc.log(snap_log.LEVEL_ERR, not_found_mesg)
    return (http_req.NOT_FOUND, not_found_mesg)
    
def _status_get_request(http_req):
    """
    Build the status document for the component runtime addressed by the request.

    The statistics of the component thread are refreshed and the status document is created
    while the runtime lock is held.

    @param http_req: The HTTP request object for the status GET request.

    @return: The status document created by snap_control.create_status_info().

    """
    runtime_entry = runtime_table.get_status_uri_entry(http_req.path, http_req.method)
    runtime_entry.lock.acquire()
    try:
        fresh_statistics = _gather_statistics(runtime_entry.comp_thread)
        runtime_entry.status.statistics = fresh_statistics
        return snap_control.create_status_info(runtime_entry.status)
    finally:
        runtime_entry.lock.release()

def _gather_statistics(comp_thread):
    """
    Collect the current per-view counters of a component thread.

    @param comp_thread: Component thread whose input_views and output_views are inspected.

    @return: Dictionary with one sub-dictionary for input views and one for output views, each
        mapping a view name to its counter: a byte count for binary views, a record count for
        all other views.
    @rtype:  dict

    """
    def view_counter(view):
        # Binary views count bytes, all other views count records.
        if view.is_binary:
            return {keys.STATISTICS_BYTE_COUNT: view.byte_count}
        return {keys.STATISTICS_RECORD_COUNT: view.record_count}

    input_stats = {}
    output_stats = {}
    gathered = {keys.INPUT_VIEWS: input_stats, keys.OUTPUT_VIEWS: output_stats}

    for view_name in comp_thread.input_views:
        input_stats[view_name] = view_counter(comp_thread.input_views[view_name])

    for view_name in comp_thread.output_views:
        output_stats[view_name] = view_counter(comp_thread.output_views[view_name])

    return gathered
    
                    
def _output_view_get_request(http_req):
    """
    Add an output stream to the component which was created as a result of a GET request
    
    """
    try:
        field_numbers = _parse_view_uri_params(http_req.params)
        (runtime_status_uri, is_output, view_name) = runtime_table.get_view_uri_entry(http_req.path)
        res_rt = runtime_table.get_status_uri_entry(runtime_status_uri)
        
    except Exception, e:
        cc.elog(e, "Failed to process GET request to output view at %s" % http_req.path)
        return (http_req.INTERNAL_SERVER_ERROR, None)
     
    if not is_output:
        res_rt.log(snap_log.LEVEL_ERR, "Input view \"%s\" (uri %s) received a GET  request. Expecting a POST" %
                    (view_name, http_req.path))
        return (http_req.METHOD_NOT_ALLOWED, None)
    
    res_rt.lock.acquire()
    try:
        if res_rt.resdef.output_is_record_mode(view_name):
            content_types = None
        else:
            content_types = res_rt.resdef.get_output_view_content_types(view_name)
    finally:
        res_rt.lock.release()
        
    (pipe, stream) = request_handler.process_get(http_req, content_types, view_name)
    if stream is None:
        res_rt.log(snap_log.LEVEL_ERR, "Failed to create stream for GET request to %s" % http_req.path)
        return (http_req.INTERNAL_SERVER_ERROR, None)
    # Since each output view can have multiple streams, we need to keep field_numbers information on a
    # per-stream basis.
    stream.field_numbers = field_numbers
    res_rt.lock.acquire()
    try:
        res_rt.comp_thread._output_streams.append(stream)
    finally:
        res_rt.lock.release()
        
    # This handler method will go into a loop and the thread will return only when the stream is
    # closed.
    request_handler.stream_output(http_req, pipe)
    return None

    
def _parse_view_uri_params(params):
    """
    Get field numbers specified as parameter in the request URL.

    This is specified, when the downstream input view is requesting only a subset of the
    output view's fields. This is an optimization to prevent an output view from sending
    fields that are not even linked to the downstream input view.

    The parameter value is a comma separated list whose items are either a single number ("4")
    or an inclusive ascending range ("1-3"). An empty value is accepted and means that no fields
    were requested, which is what _append_params_to_view_uri() writes for an empty field list.

    @param params: A param dictionary that might have the fields list param.
    @type params:  dict

    @return: Sorted list of field numbers that have been requested by down stream input view,
        or None if the parameter is not present.
    @rtype:  list

    @raise SnapFormatError: If an item is not numeric, or a range is malformed or not ascending.

    """
    strval = params.get(snap_params.FIELD_NUMBERS)
    if strval is None:
        return None

    if not strval.strip():
        # The sending side encodes an empty field list as an empty string; decode it as such
        # instead of failing on int('').
        return []

    num_list = []
    for v in strval.split(','):
        if '-' in v:
            vrange = v.split('-')
            if len(vrange) != 2:
                raise SnapFormatError("Field number arg <%s> has invalid range <%s>" %
                                     (strval, v))
            try:
                lowval = int(vrange[0])
                highval = int(vrange[1])
            except ValueError:
                raise SnapFormatError("Field number arg <%s> has non numeric range <%s>" %
                                      (strval, v))
            if highval <= lowval:
                raise SnapFormatError("Field number arg <%s> has invalid range <%s>" %
                                      (strval, v))
            # NOTE(review): the range comes straight from the request URL and is expanded without
            # an upper bound; consider capping it against the view's field count.
            num_list.extend(range(lowval, highval + 1))
        else:
            try:
                val = int(v)
            except ValueError:
                raise SnapFormatError("Field number arg <%s> has non numeric value <%s>" %
                                      (strval, v))
            num_list.append(val)

    num_list.sort()
    return num_list

def process_runtime_post_request(http_req):
    """
    Handle a POST request made to a runtime view URI.

    Such a request comes from an external client that is POSTing data to an input view. The
    posted stream is handed to the component thread of the resource runtime owning the view.

    @param http_req: The HTTP request object for the POST request.

    @return: None once the posted stream has been handed over, or (METHOD_NOT_ALLOWED, None) if
        the URI belongs to an output view.

    """
    posted_path = http_req.path
    if posted_path.startswith(uri_prefix.RUNTIME_RECORD_VIEW):
        # Only a record mode view gets an RP for its input. A binary mode view must not have one.
        http_req.make_input_rp()

    view_entry = runtime_table.get_view_uri_entry(posted_path)
    (runtime_status_uri, is_output, view_name) = view_entry
    res_rt = runtime_table.get_status_uri_entry(runtime_status_uri)

    if not is_output:
        res_rt.comp_thread.process_posted_input_stream(http_req, res_rt, view_name)
        return None

    res_rt.log(snap_log.LEVEL_ERR,
               "Output view \"%s\" (resource %s uri %s) received a POST request. Expecting a GET"
               % (view_name, res_rt.resource_name, res_rt.resource_uri))
    return (http_req.METHOD_NOT_ALLOWED, None)
    
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.