Helper.py :  » Network » Torrent-Swapper » swapper » Swapper » toofastbt » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Network » Torrent Swapper 
Torrent Swapper » swapper » Swapper » toofastbt » Helper.py
# Written by Pawel Garbacki
# see LICENSE.txt for license information

import sys
from traceback import print_exc,print_stack
from time import time

from Logger import get_logger
from Swapper.Overlay.SecureOverlay import SecureOverlay
from Swapper.CacheDB.CacheDBHandler import PeerDBHandler
from BitTornado.bencode import bencode
from BitTornado.BT1.MessageID import RESERVE_PIECES

MAX_ROUNDS = 200
DEBUG = False

class SingleDownloadHelperInterface:
    """ This interface should contain all methods that the PiecePiecker/Helper
        calls on the SingleDownload class.
    """
    def __init__(self):
        self.frozen_by_helper = False

    def helper_set_freezing(self,val):
        self.frozen_by_helper = val

    def is_frozen_by_helper(self):
        return self.frozen_by_helper

    def is_choked(self):
        pass

    def helper_forces_unchoke(self):
        pass

    def _request_more(self, new_unchoke = False):
        pass


class Helper:
    def __init__(self, torrent_hash, num_pieces, coordinator_permid, coordinator = None):
        self.secure_overlay = SecureOverlay.getInstance()
        self.torrent_hash = torrent_hash
        if coordinator_permid is not None and coordinator_permid == '':
            self.coordinator_permid = None
        else:
            self.coordinator_permid = coordinator_permid
        self.coordinator_ip = None  # see is_coordinator()
        self.coordinator_port = -1

        if self.coordinator_permid is not None:
            peerdb = PeerDBHandler()
            peer = peerdb.getPeer(coordinator_permid)
            if peer is not None:
                self.coordinator_ip = peer['ip']
                self.coordinator_port = peer['port']

        self.reserved_pieces = [False] * num_pieces
        self.ignored_pieces = [False] * num_pieces
        self.coordinator = coordinator
        self.counter = 0
        self.completed = False
        self.distr_reserved_pieces = [False] * num_pieces
        self.marker = [True] * num_pieces
        self.round = 0
        self.encoder = None
        self.continuations = []
        self.outstanding = None
        self.last_req_time = 0

    def set_encoder(self,encoder):
        self.encoder = encoder
        self.encoder.set_coordinator_ip(self.coordinator_ip)
        # To support a helping user stopping and restarting a torrent
        if self.coordinator_permid is not None:
            self.start_data_connection()   

    def test(self):
        result = self.reserve_piece(10,None)
        print >> sys.stderr,"reserve piece returned: " + str(result)
        print >> sys.stderr,"Test passed"

    def _reserve_piece(self, piece):
        self.reserved_pieces[piece] = True
        self.distr_reserved_pieces[piece] = True
        self.ignored_pieces[piece] = False

    def _ignore_piece(self, piece):
        if not self.is_reserved(piece):
            self.ignored_pieces[piece] = True
            self.distr_reserved_pieces[piece] = True

    def is_coordinator(self,permid):
        # If we could get coordinator_ip, don't help
        if self.coordinator_ip is None:
            return False

        if self.coordinator_permid == permid:
            return True
        else:
            return False

### PiecePicker and Downloader interface
    def is_reserved(self, piece):
        if self.reserved_pieces[piece] or (self.coordinator is not None and self.is_complete()):
            return True
        return self.reserved_pieces[piece]

    def is_ignored(self, piece):
        if not self.ignored_pieces[piece] or (self.coordinator is not None and self.is_complete()):
            return False
        return self.ignored_pieces[piece]

    def is_complete(self):
        if self.completed:
            return True
        self.round = (self.round + 1) % MAX_ROUNDS
        if self.round != 0:
            return False
        if self.coordinator is not None:
            self.completed = (self.coordinator.reserved_pieces == self.marker)
        else:
            self.completed = (self.distr_reserved_pieces == self.marker)
        return self.completed

    def reserve_pieces(self, pieces, sdownload, all_or_nothing = False):
        pieces_to_send = []
        ex = "None"
        result = []
        for piece in pieces:
            if self.is_reserved(piece):
                result.append(piece)
            elif not self.is_ignored(piece):
                pieces_to_send.append(piece)

        #if DEBUG:
        #    print >> sys.stderr,"helper: reserve_pieces: result is",result,"to_send is",pieces_to_send

        if pieces_to_send == []:
            return result
        if self.coordinator is not None:
            new_reserved_pieces = self.coordinator.reserve_pieces(pieces_to_send, all_or_nothing)
            for piece in new_reserved_pieces:
                self._reserve_piece(piece)
        else:
            self.send_or_queue_reservation(sdownload,pieces_to_send,result)
            return []

        result = []
        for piece in pieces:
            if self.is_reserved(piece):
                result.append(piece)
            else:
                self._ignore_piece(piece)
        return result

    def reserve_piece(self, piece, sdownload):
        if self.coordinator is not None and self.is_complete():
            return True
        new_reserved_pieces = self.reserve_pieces([piece],sdownload)
        if new_reserved_pieces == []:
            return False
        else:
            return True


## Synchronization interface

    def send_or_queue_reservation(self,sdownload,pieces_to_send,result):
        """ Records the fact that a SingleDownload wants to reserve a
            piece with the coordinator. If it's the first, send the
            actual reservation request.
        """
        if sdownload not in self.continuations:
            if DEBUG:
                print >> sys.stderr,"helper: Queuing reservation for",pieces_to_send
            self.continuations.append(sdownload)
            sdownload.helper_set_freezing(True)
        if len(self.continuations) > 0:
            self.send_reservation(pieces_to_send)

    def send_reservation(self,pieces_to_send):
        # Arno: I sometimes see no reply to a RESERVE_PIECE and the client
        # stops acquiring new pieces. The last_req_time is supposed
        # to fix this.
        waited = int(time())-self.last_req_time
        if self.outstanding is None or waited > 60:
            self.counter += 1
            self.last_req_time = int(time())
            if DEBUG:
                if self.outstanding is None:
                    print >> sys.stderr,"helper: Sending reservation for",pieces_to_send,"because none"
                else:
                    print >> sys.stderr,"helper: Sending reservation for",pieces_to_send,"because timeout"
            sdownload = self.continuations.pop(0)
            if self.outstanding is not None: # allow bypassed conn to restart
                self.outstanding.helper_set_freezing(False)
            self.outstanding = sdownload            
            ex = "self.send_reserve_pieces(pieces_to_send)"
            self.send_reserve_pieces(pieces_to_send)


    def notify(self):
        """ Called by HelperMessageHandler to "wake up" the download that's
            waiting for its coordinator to reserve it a piece 
        """
        if self.outstanding is None:
            if DEBUG:
                print >> sys.stderr,"helper: notify: No continuation waiting???"
        else:
            if DEBUG:
                print >> sys.stderr,"helper: notify: Waking downloader"
            sdownload = self.outstanding
            self.outstanding = None # must be not before calling self.restart!
            self.restart(sdownload)
            
            #self.send_reservation()
            list = self.continuations[:] # copy just to be sure
            self.continuations = []
            for sdownload in list:
                self.restart(sdownload)

    def restart(self,sdownload):
        # Chokes can get in while we're waiting for reply from coordinator. 
        # But as we were called from _request_more() we were not choked 
        # just before, so pretend we didn't see the message yet.
        if sdownload.is_choked():
            sdownload.helper_forces_unchoke()
        sdownload.helper_set_freezing(False)
        sdownload._request_more()

## Coordinator comm.       
    def send_reserve_pieces(self, pieces, all_or_nothing = False):
        if all_or_nothing:
            all_or_nothing = chr(1)
        else:
            all_or_nothing = chr(0)
        payload = self.torrent_hash + all_or_nothing + bencode(pieces)
        self.secure_overlay.addTask(self.coordinator_permid, RESERVE_PIECES + payload )

### HelperMessageHandler interface
    def got_pieces_reserved(self, permid, pieces):
        self.handle_pieces_reserved(pieces)
        self.start_data_connection()

    def handle_pieces_reserved(self,pieces):
        if DEBUG:
            print >> sys.stderr,"helper: Coordinator replied",pieces
        try:
            for piece in pieces:
                if piece > 0:
                    self._reserve_piece(piece)
                else:
                    self._ignore_piece(-piece)
            self.counter -= 1

        except Exception,e:
            print_exc()
            print >> sys.stderr,"helper: Exception in handle_pieces_reserved",e

    def start_data_connection(self):
        # Do this always, will return quickly when connection already exists
        dns = (self.coordinator_ip, self.coordinator_port)
        if DEBUG:
            print >> sys.stderr,"helper: Starting data connection to coordinator",dns
        self.encoder.start_connection(dns,id = None,coord_con = True)
      
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.