SecureOverlay.py :  » Network » Torrent-Swapper » swapper » Swapper » Overlay » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Network » Torrent Swapper 
Torrent Swapper » swapper » Swapper » Overlay » SecureOverlay.py
# Written by Jie Yang, Arno Bakker
# see LICENSE.txt for license information

""" 
- The middle layer between OverlaySwarm and BuddyCast/DownloadHelp 
- A high level module, like buddycast or dlhelp, calls SecureOverlay.addTask,
and then SecureOverlay will handle the task.
- There is only one task for secure overlay: Send message (the message can be None)
- But each time before sending a message, secure overlay must connect the target's
overlay.
- If the message is None, secure overlay only creates an overlay connection.
- Each time after an normal connection is created and if the other peer supports
overlay swarm, it will always create a task without message
- After overlay connection is created, secure overlay will update the (permid, (ip, port))
in local cache.
- The target can be either permid or (ip, port)
- If the target is permid, the task much check if target's permid matches
the task's permid
- If the target is (ip, port), connect directly and record the peer's permid later on.
"""

from time import time,ctime
from socket import inet_aton,gethostbyname
from traceback import print_exc,print_stack
from threading import RLock,currentThread
import sys

from BitTornado.BT1.MessageID import CANCEL,getMessageName
from Swapper.CacheDB.CacheDBHandler import PeerDBHandler,MyDBHandler
from Swapper.utilities import *
from Swapper.__init__ import GLOBAL
from Swapper.Statistics.Logger import OverlayLogger



try:
    True
except:
    True = 1
    False = 0

DEBUG = False

Length_of_permid = 0    # 0: no restriction

def isValidDNS(dns):
    if isinstance(dns, tuple) and len(dns)==2 and \
       validIP(dns[0]) and isValidPort(dns[1]):
           return True
    return False
    

class DNSOverlayTask:
    """ 
    Basic task to connect peer's overlay swarm by dns and send message by Secure Overlay.
    It is an observer class in Observer Pattern.
    """
    
    def __init__(self, secure_overlay, subject_manager, dns, message=None, timeout=15):
        self.subject_manager = subject_manager
        self.dns = dns
        self.message_to_send = message
        self.callback = None    # used by update
        self.secure_overlay = secure_overlay
        self.subject = None
        self.expire = int(time() + timeout)
        self.registered = False
        
    def isExpired(self, now=0):
        if now == 0:
            now = time()
        return now > self.expire
        
    def register(self, dns):    # register a subject
        if self.registered or not dns:
            return
        self.subject = self.subject_manager.getSubject(dns)    # register a subject or get an old subject
        self.subject.attachObserver(self)
        self.registered = True

    def unregister(self, reason='done'):    # TODO: count and record the fail reason
#        if DEBUG:
#            print >> sys.stderr, "secover: task on %s %s" % (self.dns, reason)
        if self.registered:
            if self.subject:
                self.subject.detachObserver(self, reason)
        self.registered = False
            
    def setCallback(self, callback):    # it must be set before start
        self.callback = callback
        
    def start(self):    # phase 1: find or make overlay connection
        if self.isExpired():
            self.unregister('expired')
        elif self.findConnection():    # if connection exists, send message now
            self.sendMessage()    
        else:
            self.register(self.dns)    # otherwise make overlay connection
            self.makeConnection()
            return 1    # make a new connecting attempt
        return 0
                
    def update(self):    # phase 2: overlay connection has been made; send msg now
#        if DEBUG:
#            print >> sys.stderr,"secover: overlay task update", self.dns, id(self)
        if not self.registered:
            return
        # to improve performance, don't remove expired tasks at this point
        if self.callback:    # used by other task
            reason = self.callback()
            if reason != 'done':
                self.unregister(reason)
        else:
            self.sendMessage()
        
    def sendMessage(self):
        if self.message_to_send != None:
            if not self.permid:
                self.permid = self.secure_overlay.findPermidByDNS(self.dns)
            self.secure_overlay.sendMessage(self.permid, self.message_to_send)
            if DEBUG:
                print >> sys.stderr,"secover: dns task send message", getMessageName(self.message_to_send[0]), self.dns
            self.message_to_send = None
        self.unregister('done')
        
    def makeConnection(self):
        self.secure_overlay.connectPeer(self.dns)
        
    def findConnection(self):
        # if connection is created, secure overlay must have the permid
        self.permid = self.secure_overlay.findPermidByDNS(self.dns)
        return self.permid


class PermidOverlayTask:
    """ 
    A task to connect peer's overlay swarm by permid and send message.
    It delegates DNSOverlayTask to do the real stuffs.
    """
    
    def __init__(self, secure_overlay, subject_manager, permid, message=None, timeout=15):
        self.secure_overlay = secure_overlay
        self.permid = permid
        self.dns = self.secure_overlay.findDNSByPermid(self.permid)    # first lookup overlay
        self.peer_db = secure_overlay.peer_db
        if not self.dns:    # then lookup local cache
            #if DEBUG:
            #    print >> sys.stderr,"secover: PermidOverlayTask: don't know dns for permid",show_permid(permid)
            self.dns = self.findDNSFromCache()
        else:
            #if DEBUG:
            #    print >> sys.stderr,"secover: PermidOverlayTask: dns for permid is known",self.dns
            pass
        if isValidDNS(self.dns):
            if GLOBAL.overlay_log:
                write_overlay_log('CONN_TRY', permid, dns=self.dns)
            self.task = DNSOverlayTask(secure_overlay, subject_manager, self.dns, message, timeout)
        else:
            self.task = None
        
    def findDNSFromCache(self):
        #if DEBUG:
        #    return ('1.2.3.4', 80)
        peer = self.peer_db.getPeer(self.permid)
        if peer:
            return (peer['ip'], int(peer['port']))
        
    def start(self):    # phase 1: start basic overlay task
        if self.task:
            self.task.setCallback(self.update)
            ret = self.task.start()
            if ret == 1:
                self.secure_overlay.addTryTimes(self.permid)
            
    def update(self):    # phase 2: check permids, and send msg if they match
#        if DEBUG:
#            print >> sys.stderr,"secover: permid task update", self.dns
        
        if self.dns:
            permid = self.secure_overlay.findPermidByDNS(self.dns)

            #if DEBUG:
            #    print >> sys.stderr,"secover: Think connecting to",show_permid(self.permid)," and connected to",show_permid(permid)

            if self.permid == permid and self.task:
                self.task.sendMessage()
                return 'done'
            elif DEBUG and self.permid != permid:
                print >> sys.stderr,"secover: Connection established but permid does not match!"
                return 'wrong_permid'
    
                        
class Subject:
    """ A subject class in Observer Pattern """
    
    def __init__(self, dns, subject_manager):
        self.dns = dns    # dns = (ip, port)
        self.observers = []    # tasks
        self.subject_manager = subject_manager
        
    def isEmpty(self):
        return len(self.observers) == 0
        
    def notify(self):
        for observer in self.observers:
            #if DEBUG:
            #    print >> sys.stderr,"secover: subject", self.dns, "notifies observer", observer
            observer.update()
            
    def attachObserver(self, observer):
        if observer not in self.observers:
            self.observers.append(observer)
            #if DEBUG:
            #    print >> sys.stderr,"secover: subject", self.dns, "attaches observer", self.observers
        
    def detachObserver(self, observer, reason):
#        if DEBUG:
#            print >> sys.stderr,"secover: subject", self.dns, "detaches observer", observer
        
        self.observers.remove(observer)
        
        if self.isEmpty():
            self.subject_manager.unregisterSubject(self.dns, reason)
        
        
class SubjectManager:
    """ Command Pattern. Used for sending overlay message. """
    
    def __init__(self):
        self.subjects = {}    # (ip,port): Subject
    
    def registerSubject(self, dns):
        #if DEBUG:
        #    print >> sys.stderr,"secover: register subject", dns
        if not self.subjects.has_key(dns):
            self.subjects[dns] = Subject(dns, self)
                
    def unregisterSubject(self, dns, reason):
        if self.subjects.has_key(dns) and self.subjects[dns].isEmpty():
            if DEBUG:
                print >> sys.stderr,"secover: unregister subject", dns, reason
            sbj = self.subjects.pop(dns)
            del sbj
            
    
    def getSubject(self, dns):
        self.registerSubject(dns)    # ensure the subject exists
        return self.subjects[dns]        
            
    def notifySubject(self, dns):    # notify the connection is made
        #if DEBUG:
        #    print >> sys.stderr,"secover: notify subject", dns
        if dns and self.subjects.has_key(dns):
            subject = self.subjects[dns]
            subject.notify()

    def scanTasks(self):    # remove outdate subjects
        now = time()
        for dns in self.subjects.keys():
            expired = True
            sbj_obs = self.subjects[dns].observers
            for obs in sbj_obs[:]:
                if not obs.isExpired(now):
                    expired = False    # don't remove a subject if one of its observer is not expired
                else:
                    l = len(self.subjects[dns].observers)
                    obs.unregister('expired')
                    del obs

class IncomingMessageHandler:
    """ a variant of Chain of Responsibility Pattern """
    
    def __init__(self):
        self.handlers = {}
        
    def registerHandler(self, ids, handler):
        for id in ids:
#            if DEBUG:
#                print >> sys.stderr,"secover: Handler registered for",getMessageName(id)
            self.handlers[id] = handler
        
    def handleMessage(self, permid, message):    # connection is type of Conneter.Connection 
        id = message[0]
        handled = False
        if not self.handlers.has_key(id):
            if DEBUG:
                print >> sys.stderr,"secover: No handler found for",getMessageName(id),currentThread().getName()
            return False
        else:
            #if DEBUG:
            #    print >> sys.stderr,"secover: Giving message to handler for",getMessageName(id)
            self.handlers[id].handleMessage(permid, message)
            return True

class SecureOverlay:
    __single = None

    def __init__(self):
        if SecureOverlay.__single:
            raise RuntimeError, "SecureOverlay is Singleton"
        SecureOverlay.__single = self 
        self.subject_manager = SubjectManager()    #???? for outgoing message
        self.incoming_handler = IncomingMessageHandler()    # for incoming message
        self.peer_db = PeerDBHandler()
        self.connection_list = {}    # overlay connections. permid:{'c_conn': Connecter.Connection, 'expire':expire, 'dns':(ip, port)}
        self.timeout = 300    # TODO: adjust it by firewall status. the value is smaller if no firewall
        self.check_interval = 60
        self.my_db = MyDBHandler()
        self.permid = self.my_db.getMyPermid()
        self.ip = self.my_db.getMyIP()
        self.lock = RLock()
        
    def getInstance(*args, **kw):
        if SecureOverlay.__single is None:
            SecureOverlay(*args, **kw)
        return SecureOverlay.__single
    getInstance = staticmethod(getInstance)
    
    def register(self,overlayswarm):
        self.overlayswarm = overlayswarm
        self.overlayswarm.rawserver.add_task(self._auto_close, self.check_interval)
        self.overlayswarm.rawserver.add_task(self._scan_tasks, self.check_interval)

    def registerHandler(self, ids, handler):    
        """ 
          ids is the [ID1, ID2, ..] where IDn is a sort of message ID in overlay swarm. 
          Each ID can only be handled by one handler, but a handler can handle multiple IDs
        """
        # I assume that all handler registration is done before handling, so no
        # concurrency on incoming_handler
        self.incoming_handler.registerHandler(ids, handler)

    def _auto_close(self):
        self.acquire()
        self.overlayswarm.rawserver.add_task(self._auto_close, self.check_interval)
        self._checkConnections()
        self.release()    

    def _scan_tasks(self):
        self.acquire()
        self.overlayswarm.rawserver.add_task(self._scan_tasks, self.check_interval)
        self.subject_manager.scanTasks()
        self.release()    

    def _checkConnections(self):
        for permid in self.connection_list.keys():
            self._checkConnection(permid)

    def _checkConnection(self, permid):
        conn = self.connection_list[permid]['c_conn']
        expire = self.connection_list[permid]['expire']
        expired = time() > expire
        if not conn or conn.closed or expired:
            if expired:
                if DEBUG:                         
                    print >> sys.stderr,"secover: closing expired conn",show_permid2(permid), int(time())
                self._closeConnection(conn, 'TIME_OUT')
            else:
                if DEBUG:                         
                    print >> sys.stderr,"secover: removing closed conn",show_permid2(permid)
                self._closePermidConnection(permid, 'CON_CLOS')
            ret = None
        else:
            ret = conn
        return ret
        
    # the central place to close connection
    def _closeConnection(self, connection, reason):
        self.acquire()
        if connection is not None and not connection.closed:
            permid = connection.permid
            connection.close()
            ## connectionLost callback is called by connection.close() which
            ## will remove the conn from the list, but just to be safe:
            if permid and self.connection_list.has_key(permid):
                if GLOBAL.overlay_log:
                    write_overlay_log('CONN_DEL', permid, reason=reason)
                self.connection_list.pop(permid)
        self.release()        
    
    def _closePermidConnection(self, permid, reason):
        self.acquire()
        if self.connection_list.has_key(permid):
            connection = self.connection_list[permid]['c_conn']
            if connection is not None and not connection.closed:
                connection.close()
            if GLOBAL.overlay_log:
                write_overlay_log('CONN_DEL', permid, reason=reason)
            self.connection_list.pop(permid)
        self.release()        

    def _findConnByPermid(self, permid):
        if self.connection_list.has_key(permid):
            return self._checkConnection(permid)
        else:
            return None
        
    def findPermidByDNS(self, dns):    #find permid from connection_list
        self.acquire()
        ret = None
        for permid, value in self.connection_list.items():
            if value['dns'] == dns and self._checkConnection(permid):
                ret = permid
                break
        self.release()
        return ret

    def findDNSByPermid(self, permid):
        self.acquire()
        ret = None
        if self._findConnByPermid(permid):
            ret = self.connection_list[permid]['dns']
        self.release()
        return ret
        
    # Main function to send messages
    def addTask(self, target, message=None, timeout=15):    # target = [permid|(ip,port)]
        """ Command Pattern """

        if GLOBAL.do_overlay == 0:
            return
        
        self.acquire()
        #TODO: priority task queue
        try:
            try:
                if message is None:
                    msg_id = 'None'
                else:
                    msg_id = getMessageName(message[0])
                if msg_id.startswith('Unknown'):
                    return
                if isValidPermid(target) and target != self.permid:
                    if DEBUG:
                        msg = msg_id + ' '+currentThread().getName()
                        if DEBUG:
                            print >> sys.stderr,"secover: add PermidOverlayTask", show_permid_short(target), msg
                    task = PermidOverlayTask(self, self.subject_manager, target, message, timeout)
                elif isValidDNS(target): # and target[0] != self.ip:    # for testing
                    if DEBUG:
                        if message is None:
                            msg = 'None'
                        else:
                            msg = getMessageName(message[0])
                        msg = msg_id + ' '+currentThread().getName()
                        if DEBUG:
                            print >> sys.stderr,"secover: add DNSOverlayTask", target, msg
                    task = DNSOverlayTask(self, self.subject_manager, target, message, timeout)
                else:
                    return

                if task and self.overlayswarm.registered:
                    ## Arno: I don't see the need for letting the rawserver do it.
                    ## Except that it potentially avoids a concurrency problem of
                    ## multiple threads writing to the same socket.
                    if DEBUG:
                        if message:
                            msg_id = getMessageName(message[0])
                        else:
                            msg_id = ''
                        print >> sys.stderr,"secover: add task to rawserver", msg_id, currentThread().getName()
                    self.overlayswarm.rawserver.add_task(task.start, 0)
                    ##task.start()
            except Exception,e:
                print_exc()
        finally:
            self.release()        

    def connectionMade(self, connection):    # OverlayConnecter.Connection
        self.acquire()
        if DEBUG:
            print >> sys.stderr,"secover: *** secure overlay to %s connection made." % show_permid2(connection.permid), connection.get_ip(), int(time())
        #TODO: schedule it on rawserver task queue?
        dns = self._addConnection(connection)
        if dns:
            if GLOBAL.overlay_log:
                write_overlay_log('CONN_ADD', connection.permid)
            self.subject_manager.notifySubject(dns)
        self.release()    
        
    def addTryTimes(self, permid):
        self.peer_db.updateTimes(permid, 'tried_times', 1)
            
    def _addConnection(self, connection):
        dns = connection.dns
        permid = connection.permid
        self.peer_db.updateTimes(permid, 'connected_times', 1)
        auth_listen_port = connection.get_auth_listen_port()
        if DEBUG:
            print >> sys.stderr,"secover: add connection in secure overlay", dns, "auth listen port", auth_listen_port
        #
        # Arno: if DNS is none, this is an incoming connection from another
        # peer. We cannot enter this connection into the table because we don't
        # know the listen port of the peer (and if we would initiate a connection
        # that is the port we look for). However, I encoded the listen port of a peer
        # into its peerID. So now we know the initiating peers listen port and
        # the problem is solved.
        #
        if dns is None:
            dns = ( connection.get_ip(), auth_listen_port )
        else:
            if dns[1] != auth_listen_port:
                if DEBUG:
                    print >> sys.stderr,"secover: WARNING given listen port not equal to authenticated one"

        if isValidPermid(permid) and isValidDNS(dns):
            if self.connection_list.has_key(permid):
                # Conccurency: When a peer starts an overlay connection at
                # the same time, and we start it before the C/R protocol
                # has finished, we'll end up with two connections. In that
                # case we drop the last one established.
                if DEBUG:
                    print >> sys.stderr,"secover: dropping superfluous double connection to",show_permid2(permid)
                connection.close()
                # Don't stop
                return dns

            self._updateDNS(permid, dns)
            expire = int(time() + self.timeout)
            self.connection_list[permid] = {'c_conn':connection, 'dns':dns, 'expire':expire}
            if DEBUG:
                print >> sys.stderr,"secover: permid received is", show_permid2(permid)
            #x = self.peer_db.getPeer(permid)
            #print >> sys.stderr,"secover: old peer is",x
            #self.peer_db.updatePeerIPPort(permid, dns[0], dns[1])
            #y = self.peer_db.getPeer(permid)
            #print >> sys.stderr,"secover: new peer is",y
            return dns
        return None
        
    def _updateDNS(self, permid, dns):
        self.peer_db.updatePeerIPPort(permid, dns[0], dns[1])
        
    def _extendExpire(self, permid):
        self.connection_list[permid]['expire'] = int(time() + self.timeout)
        
    def connectionLost(self, connection):    # OverlayConnecter.Connection
        if DEBUG:
            print >> sys.stderr,"secover: ***** secure overlay connection lost", show_permid2(connection.permid), connection.get_ip(), int(time())
        self.acquire()
        self._closeConnection(connection, 'CON_LOST')
        self.release()

    def connectPeer(self, dns):    # called by task
        self.acquire()
        self.overlayswarm.connectPeer(dns)
        self.release()    

    def sendMessage(self, permid, message):
        if not permid:
            return
        self.acquire()
        connection = self._findConnByPermid(permid)
        if connection:
            if GLOBAL.overlay_log:
                write_overlay_log('SEND_MSG', permid, message)
            self._extendExpire(permid)
            self.overlayswarm.sendMessage(connection, message)
        self.release()

    def gotMessage(self, permid, message):    # connection is type of Connecter.Connection 
        self.acquire()
        try:
            if GLOBAL.overlay_log:
                write_overlay_log('RECV_MSG', permid, message)
            t = message[0]
            if t == CANCEL:    # the only message handled by secure overlay
                self._closePermidConnection(permid, 'CANCELED')
            elif self.incoming_handler.handleMessage(permid, message) == False:
                self._closePermidConnection(permid, 'FAKE_MSG')
            else:
                self._extendExpire(permid)
        except:
            print_exc()
        self.release()

    def acquire(self):
#        if DEBUG:
#            print >> sys.stderr,"secover: LOCK",currentThread().getName()
        self.lock.acquire()
        
    def release(self):
#        if DEBUG:
#            print >> sys.stderr,"secover: UNLOCK",currentThread().getName()
        self.lock.release()


def write_overlay_log(action, permid, msg=None, dns=None, reason=None):
    """
      SecureOverlay log format:
          TIME - CONN_TRY - IP - PORT - PERMID
          TIME - CONN_ADD - IP - PORT - PERMID 
          TIME - CONN_DEL - IP - PORT - REASON(TIME_OUT, CON_CLOS, CON_LOST, CANCELED, FAKE_MSG) - PERMID
          TIME - SEND_MSG - IP - PORT - MSG_ID - PERMID - MSG 
          TIME - RECV_MSG - IP - PORT - MSG_ID - PERMID - MSG
    """
    
    if dns is not None and permid is not None:
        ip, port = dns
    elif isValidPermid(permid):    # permid, msg
        secure_overlay = SecureOverlay.getInstance()
        dns = secure_overlay.connection_list[permid]['dns']
        ip = dns[0]
        port = dns[1]
    else:    # connection
        permid = 'Permid_None'
        ip = 'None_ip'
        port = 0        
        
    if permid != 'Permid_None':
        permid = show_permid(permid)
    port = str(port)
    sp_log = OverlayLogger.getInstance(GLOBAL.overlay_log)
    if msg:
        msg_name = getMessageName(msg[0])
        sp_log.log(action, ip, port, msg_name, permid, `msg`)    # SEND_MSG, RECV_MSG
    else:
        if reason is not None:
            sp_log.log(action, ip, port, reason, permid)    # CONN_DEL
        else:
            sp_log.log(action, ip, port, permid)    # CONN_TRY, CONN_ADD
    

def test():            
    so = SecureOverlay.getInstance()
    so.overlayswarm.secure_overlay = so
    dns = ('4.3.2.1', 1111)
    permid = 'permid1'
    so.addTask(permid)
    so.addTask(dns, message="hello overlay")

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.