rpc_server.py :  » Web-Frameworks » Zope » Zope-2.6.0 » ZServer » medusa » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Zope 
Zope » Zope 2.6.0 » ZServer » medusa » rpc_server.py
# -*- Mode: Python; tab-width: 4 -*-

# Copyright 1999, 2000 by eGroups, Inc.
# 
#                         All Rights Reserved
# 
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# eGroups not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
# 
# EGROUPS DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL EGROUPS BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

# There are two RPC implementations here.

# The first ('rpc') attempts to be as transparent as possible, and
# passes along 'internal' methods like __getattr__, __getitem__, and
# __del__.  It is rather 'chatty', and may not be suitable for a
# high-performance system.

# The second ('fastrpc') is less flexible, but has much less overhead,
# and is easier to use from an asynchronous client.

import marshal
import socket
import string
import sys
import types

import asyncore
import asynchat

from producers import scanning_producer
from counter import counter

MY_NAME = string.split (socket.gethostname(), '.')[0]

# ===========================================================================
#                RPC server
# ===========================================================================

# marshal is good for low-level data structures.
# but when passing an 'object' (any non-marshallable object)
# we really want to pass a 'reference', which will act on
# the other side as a proxy.  How transparent can we make this?

class rpc_channel (asynchat.async_chat):

    'Simple RPC server.'
    
    # a 'packet': NNNNNNNNmmmmmmmmmmmmmmmm
    # (hex length in 8 bytes, followed by marshal'd packet data)
    # same protocol used in both directions.
    
    STATE_LENGTH = 'length state'
    STATE_PACKET = 'packet state'
    
    ac_out_buffer_size = 65536
    
    request_counter = counter()
    exception_counter = counter()
    client_counter = counter()
    
    def __init__ (self, root, conn, addr):
        self.root = root
        self.addr = addr
        asynchat.async_chat.__init__ (self, conn)
        self.pstate = self.STATE_LENGTH
        self.set_terminator (8)
        self.buffer = []
        self.proxies = {}
        rid = id(root)
        self.new_reference (root)
        p = marshal.dumps ((rid,))
        # send root oid to the other side
        self.push ('%08x%s' % (len(p), p))
        self.client_counter.increment()
        
    def new_reference (self, object):
        oid = id(object)
        ignore, refcnt = self.proxies.get (oid, (None, 0))
        self.proxies[oid] = (object, refcnt + 1)
        
    def forget_reference (self, oid):
        object, refcnt = self.proxies.get (oid, (None, 0))
        if refcnt > 1:
            self.proxies[oid] = (object, refcnt - 1)
        else:
            del self.proxies[oid]
            
    def log (self, *ignore):
        pass
        
    def collect_incoming_data (self, data):
        self.buffer.append (data)
        
    def found_terminator (self):
        self.buffer, data = [], string.join (self.buffer, '')
        
        if self.pstate is self.STATE_LENGTH:
            packet_length = string.atoi (data, 16)
            self.set_terminator (packet_length)
            self.pstate = self.STATE_PACKET
        else:
        
            self.set_terminator (8)
            self.pstate = self.STATE_LENGTH
            
            oid, kind, arg = marshal.loads (data)
            
            obj, refcnt = self.proxies[oid]
            e = None
            reply_kind = 2
            
            try:
                if kind == 0:
                        # __call__
                    result = apply (obj, arg)
                elif kind == 1:
                        # __getattr__
                    result = getattr (obj, arg)
                elif kind == 2:
                        # __setattr__
                    key, value = arg
                    result = setattr (obj, key, value)
                elif kind == 3:
                        # __repr__
                    result = repr(obj)
                elif kind == 4:
                        # __del__
                    self.forget_reference (oid)
                    result = None
                elif kind == 5:
                        # __getitem__
                    result = obj[arg]
                elif kind == 6:
                        # __setitem__
                    (key, value) = arg
                    obj[key] = value
                    result = None
                elif kind == 7:
                        # __len__
                    result = len(obj)
                    
            except:
                reply_kind = 1
                (file,fun,line), t, v, tbinfo = asyncore.compact_traceback()
                result = '%s:%s:%s:%s (%s:%s)' % (MY_NAME, file, fun, line, t, str(v))
                self.log_info (result, 'error')
                self.exception_counter.increment()
                
            self.request_counter.increment()
            
            # optimize a common case
            if type(result) is types.InstanceType:
                can_marshal = 0
            else:
                can_marshal = 1
                
            try:
                rb = marshal.dumps ((reply_kind, result))
            except ValueError:
                can_marshal = 0
                
            if not can_marshal:
                    # unmarshallable object, return a reference
                rid = id(result)
                self.new_reference (result)
                rb = marshal.dumps ((0, rid))
                
            self.push_with_producer (
                    scanning_producer (
                            ('%08x' % len(rb)) + rb,
                            buffer_size = 65536
                            )
                    )
            
class rpc_server_root:
    pass
    
class rpc_server (asyncore.dispatcher):

    def __init__ (self, root, address = ('', 8746)):
        self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind (address)
        self.listen (128)
        self.root = root
        
    def handle_accept (self):
        conn, addr = self.accept()
        rpc_channel (self.root, conn, addr)
        
        
        # ===========================================================================
        #               Fast RPC server
        # ===========================================================================
        
        # no proxies, request consists
        # of a 'chain' of getattrs terminated by a __call__.
        
        # Protocol:
        # <path>.<to>.<object> ( <param1>, <param2>, ... )
        # => ( <value1>, <value2>, ... )
        #
        #
        # (<path>, <params>)
        # path: tuple of strings
        # params: tuple of objects
        
class fastrpc_channel (asynchat.async_chat):

    'Simple RPC server'
    
    # a 'packet': NNNNNNNNmmmmmmmmmmmmmmmm
    # (hex length in 8 bytes, followed by marshal'd packet data)
    # same protocol used in both directions.
    
    # A request consists of (<path-tuple>, <args-tuple>)
    # where <path-tuple> is a list of strings (eqv to string.split ('a.b.c', '.'))
    
    STATE_LENGTH = 'length state'
    STATE_PACKET = 'packet state'
    
    def __init__ (self, root, conn, addr):
        self.root = root
        self.addr = addr
        asynchat.async_chat.__init__ (self, conn)
        self.pstate = self.STATE_LENGTH
        self.set_terminator (8)
        self.buffer = []
        
    def log (*ignore):
        pass
        
    def collect_incoming_data (self, data):
        self.buffer.append (data)
        
    def found_terminator (self):
        self.buffer, data = [], string.join (self.buffer, '')
        
        if self.pstate is self.STATE_LENGTH:
            packet_length = string.atoi (data, 16)
            self.set_terminator (packet_length)
            self.pstate = self.STATE_PACKET
        else:
            self.set_terminator (8)
            self.pstate = self.STATE_LENGTH
            (path, params) = marshal.loads (data)
            o = self.root
            
            e = None
            
            try:
                for p in path:
                    o = getattr (o, p)
                result = apply (o, params)
            except:
                e = repr (asyncore.compact_traceback())
                result = None
                
            rb = marshal.dumps ((e,result))
            self.push (('%08x' % len(rb)) + rb)
            
class fastrpc_server (asyncore.dispatcher):

    def __init__ (self, root, address = ('', 8748)):
        self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind (address)
        self.listen (128)
        self.root = root
        
    def handle_accept (self):
        conn, addr = self.accept()
        fastrpc_channel (self.root, conn, addr)
        
        # ===========================================================================
        
if __name__ == '__main__':

    class thing:
        def __del__ (self):
            print 'a thing has gone away %08x' % id(self)
            
    class sample_calc:
    
        def product (self, *values):
            return reduce (lambda a,b: a*b, values, 1)
            
        def sum (self, *values):
            return reduce (lambda a,b: a+b, values, 0)
            
        def eval (self, string):
            return eval (string)
            
        def make_a_thing (self):
            return thing()
            
    import sys
    
    if '-f' in sys.argv:
        server_class = fastrpc_server
        address = ('', 8748)
    else:
        server_class = rpc_server
        address = ('', 8746)
        
    root = rpc_server_root()
    root.calc = sample_calc()
    root.sys = sys
    rs = server_class (root, address)
    asyncore.loop()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.