output_view.py :  » Development » SnapLogic » snaplogic » cc » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » cc » output_view.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $
# $Id: output_view.py 7261 2009-04-16 01:05:17Z dhiraj $

from snaplogic.common.data_tracing import DataTracer
from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import Record

class OutputViewBase(object):
    
    """
    This base class defines some of the interfaces needed by a component to write to an output view.
    
    This base class is derived from to implement the full interface needed by components. We have this
    class hierarchy, so that one derived class can be implemented for use by the CC and another derived
    class can be used by the "CC simulator" that can be used to unit test components.
    
    """
    
    def __init__(self, view_name, fields = None, pass_through_inputs = None, validate_recs = False):
        """
        Initialize output view object.
        
        @param view_name: Name of the output view
        @type view_name:  str
        
        @param fields: sequence of 3 tuples, where each tuple consists of (name, type, field documentation)
        @type fields:  sequence of tuples
        
        @param pass_through_inputs: List of input view names, specified for this output view (if any).
        @type pass_through_inputs:  List of str
        
        @param validate_recs: Set to True, if outgoing record must be validated.
        @type validate_recs:  bool
        
        """
        self.name                 = view_name
        self.is_binary            = False
        self.record_count         = 0
        self.byte_count           = 0
        self.is_binary            = fields is None
        
        if self.is_binary:
            # Mark write_record method as not usable in binary mode view.
            self.write_record = self.error_binary
            self.end_record_stream = self.error_binary
        else:
            self.fields = fields
            self.field_names = []
            self.field_types = []
            for f in self.fields:
                self.field_names.append(f[0])
                self.field_types.append(f[1])
            self._python_types_map = Record.create_python_types_map(self.field_names, self.field_types)
            self._python_types_list = [self._python_types_map[fname] for fname in self.field_names ]
            self.field_names = tuple(self.field_names)
            self.field_types = tuple(self.field_types)
            self.pass_through_inputs = pass_through_inputs
            self._idx_list = range(len(self.field_names))
            self._validate_field_type = validate_recs
            # Mark write_binary and list_binary_content_types methods as not usable in record mode.
            self.write_binary = self.error_record
            self.list_binary_content_types = self.error_record
        self._is_closed = False
        
    
    def error_record(self, *args, **kargs):
        """Generates an error message for calling invalid method for record mode. The params are ignored."""
        raise SnapObjTypeError("This method is not supported in record mode view")
    
    def error_binary(self, *args, **kargs):
        """Generates an error message for calling invalid method for binary mode. The params are ignored."""
        raise SnapObjTypeError("This method is not supported in binary mode view")
        
        
    def list_binary_content_types(self):
        """
        Return the list of content types that been negotiated by the streams of this output view.
        
        This method is only valid for binary mode views.
        
        @return: List of content types that have been negotiated by the streams.
        @rtype:  list
        """
        return self._content_type_map.keys()
    
    def create_record(self):
        """
        Create a record that matches the  fields defined by this output view.
        
        @return: Empty record with appropriate structure for this output view.
        @rtype:  L{Record}
        
        """
        
        return Record(self.name, self.field_names, self._python_types_map)    
    
        
class OutputView(OutputViewBase):
    
    """
    This class provides the interface needed by a component to write to an output view.
    
    An output view can be in record mode or binary mode. By default, views are in record mode
    and send data in a record structure, via the method write_record(). In binary mode,
    the data is treated as a string containing binary data and are written via the method
    write_binary(). 
    
    NOTE: A view at its definition time is defined as record or binary and this does not
        change at runtime.
    
    """
    
    def __init__(self, view_name, snap_stream_list, fields = None, pass_through_inputs = None, validate_recs = False,
                 trace_flag = False, fq_res_name = None, top_pipe_id = None):
        """
        Initialize output view object.
        
        @param view_name: Name of the output view
        @type view_name:  str

        @param snap_stream_list: List of snap_stream objects associated with the output view.
        @type snap_stream_list:  list
        
        @param fields: sequence of 3 tuples, where each tuple consists of (name, type, field documentation)
        @type fields:  sequence of tuples
        
        @param pass_through_inputs: List of input view names, specified for this output view (if any).
        @type pass_through_inputs:  List of str
        
        @param validate_recs: Set to True, if outgoing record must be validated.
        @type validate_recs:  bool
        
        @param trace_flag: If set to True, then records written out will be dumped into a trace file. Default is False.
        @type trace_flag:  bool
        
        @param fq_res_name: Fully qualified resource name (the names of enclosing pipelines prefixed to it in dotted
            notation format.
        @type fq_res_name:  str
        
        @param top_pipe_id: Runtime id of the topmost enclosing pipeline which triggered the execution of this resource
        @type top_pipe_id:  str
        
        """
        super(OutputView, self).__init__(view_name, fields, pass_through_inputs, validate_recs)
        self.snap_stream_list     = snap_stream_list
        self.tracer               = DataTracer() if trace_flag else None
        
        mode = None
        for sstream in snap_stream_list:
            if mode is not None:
                # Make sure all streams in this view are in the same mode.
                if mode != sstream.stream_mode:
                    raise SnapValueError("Output view \"%s\" has some streams as binary and others as record" %
                                          self.name)
            else:
                mode = sstream.stream_mode
        
        if self.is_binary:
            if mode != sstream.BINARY_STREAM:
                raise SnapValueError("Output view \"%s\" is in binary mode, but the streams are not." % self.name)
            if self.tracer is not None:
                self.tracer.begin_binary_tracing(top_pipe_id, fq_res_name, "out", self.name)
            # Create a dictionary mapping the content types to the streams.
            self._content_type_map = {}
            for ss in self.snap_stream_list:
                ctype = ss.content_type
                if ctype not in self._content_type_map:
                    self._content_type_map[ctype] = [ss]
                else:
                    self._content_type_map[ctype].append(ss)
        else:
            if mode != sstream.RECORD_STREAM:
                raise SnapValueError("Output view \"%s\" is in record mode, but the streams are not." % self.name)
            if self.tracer is not None:
                self.tracer.begin_record_tracing(top_pipe_id, fq_res_name, "out", self.name)
        
    def write_record(self, record):
        """
        Writes the record to all the streams associated with the output view.
        
        This method is only valid for record mode views. 
        
        @param record: The record that needs to be written.
        @type record:  L{Record}
        
        """
        # We want to get None for fields that are not set.
        raw_rec = [record.get(f) for f in record.field_names]
            
        if self._validate_field_type:
            i = 0
            for f in raw_rec:
                if f is not None and type(f) != self._python_types_list[i]:
                    raise SnapObjTypeError("Field '%s' for output view '%s' had wrong data type '%s'. Expected '%s'. "
                                        "Please refer to https://www.snaplogic.org/trac/ticket/1355 for more details."
                                        % (self.field_names[i], self.name, type(f).__name__,
                                            self._python_types_list[i].__name__))
                i += 1
           
        if self.pass_through_inputs is not None:
            for inp_name in self.pass_through_inputs:
                raw_rec += record._internal_pass_through[inp_name]
            for inp_name in self.pass_through_inputs:
                raw_rec += record._external_pass_through[inp_name]
        
        if self.tracer is not None:
            self.tracer.trace_record(raw_rec)
                            
        for sstream in self.snap_stream_list:
            if sstream.field_numbers != None:
                # Specific field numbers have been requested by the downstream component
                out_rec = [ field for (i, field) in enumerate(raw_rec) if i in sstream.field_numbers]
                sstream.write_record(out_rec)
            else:
                sstream.write_record(raw_rec)
        
        self.record_count += 1
        
    def write_binary(self, binary_data, content_type):
        """
        Write binary data to all the streams associated with the view that have negotiated to that content type. 
        
        @param binary_data: The data to be written out.
        @type binary_data:  str
        
        @param content_type: The content type being written. Please make sure that only the content_type in the
            list returned by get_binary_content_type() is used in this param.
        @type content_type:  str
        
        @raise KeyError: If the specified content_type does not exist in any of the streams. Please make sure that
            only the content_type in the list returned by get_binary_content_type() is used to avoid this
            exception.
            
        """
        
        if self.tracer is not None:
            self.tracer.trace_bytes(binary_data)
            
        for sstream in self._content_type_map[content_type]:
            sstream.write_bytes(binary_data)
            
        self.byte_count += len(binary_data)
        
    def completed(self):
        """
        Output to the view has been successfully completed.
        
        While calling this in general after sending all the data is a good practice, it is actually
        a "must" to call this for a record mode output view, otherwise the downstream components will
        assume an error has occured.
        
        """
        if not self.is_binary:
            for sstream in self.snap_stream_list:
                sstream.end_of_records()
        
        self._close()
        
    def _close(self):
        """
        Close and do cleanup of the view object.
        
        This method is meant for internal use by the CC.
        
        """
        if self._is_closed:
            return
        if self.tracer is not None:
            self.tracer.end_tracing()
            
        for sstream in self.snap_stream_list:
            sstream.close()
        self._is_closed = True
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.