test_merkle.py :  » Network » Torrent-Swapper » swapper » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Network » Torrent Swapper 
Torrent Swapper » swapper » test » test_merkle.py
# Written by Arno Bakker
# see LICENSE.txt for license information

import unittest

from tempfile import mkstemp
from threading import Event
import os
from types import StringType,IntType,ListType,DictType

from Swapper.Merkle.merkle import *
from TorrentMaker.btmakemetafile import make_meta_file
from BitTornado.bencode import bdecode

from traceback import print_exc

DEBUG=False

class TestMerkleHashes(unittest.TestCase):
    """ 
    Testing Simple Merkle Hashes extension version 0
    """
    
    def setUp(self):
        pass
        
    def tearDown(self):
        pass

    def test_get_hashes_for_piece(self):
        """ 
            test MerkleTree.get_hashes_for_piece() method 
        """
        self._test_123pieces_tree_get_hashes()
        self._test_8piece_tree_uncle_calc()

    def _test_123pieces_tree_get_hashes(self):
        for n in range(1,64):
            piece_size = 2 ** n
            self._test_1piece_tree_get_hashes(piece_size,piece_size)
            for add in [1,piece_size-1]:
                self._test_1piece_tree_get_hashes(piece_size,add)
                self._test_2piece_tree_get_hashes(piece_size,add)
                self._test_3piece_tree_get_hashes(piece_size,add)

    def _test_1piece_tree_get_hashes(self,piece_size,length_add):
        """ testing get_hashes_for_piece on tree with 1 piece """
        msg = "1piece_get_hashes("+str(piece_size)+","+str(length_add)+") failed"
        npieces = 1
        total_length = length_add

        piece_hashes = ['\x01\x02\x03\x04\x05\x06\x07\x08\x07\x06\x05\x04\x03\x02\x01\x00\x01\x02\x03\x04' ] * npieces
        tree = MerkleTree(piece_size,total_length,None,piece_hashes)
        for p in range(npieces):
            ohlist = tree.get_hashes_for_piece(p)
            self.assert_(len(ohlist)==1,msg)
            self.assert_(ohlist[0][0] == 0,msg)
            self.assertEquals(ohlist[0][1],piece_hashes[0],msg)

    def _test_2piece_tree_get_hashes(self,piece_size,length_add):
        """testing get_hashes_for_piece on tree with 2 pieces """
        msg = "2piece_get_hashes("+str(piece_size)+","+str(length_add)+") failed"
        npieces = 2
        total_length = piece_size+length_add

        piece_hashes = ['\x01\x02\x03\x04\x05\x06\x07\x08\x07\x06\x05\x04\x03\x02\x01\x00\x01\x02\x03\x04' ] * npieces
        tree = MerkleTree(piece_size,total_length,None,piece_hashes)
        for p in range(npieces):
            ohlist = tree.get_hashes_for_piece(p)
            self.assert_(len(ohlist)==3)
            ohlist.sort()
            self.assert_(ohlist[0][0] == 0,msg)
            self.assert_(ohlist[1][0] == 1,msg)
            self.assert_(ohlist[2][0] == 2,msg)
            self.assertDigestEquals(ohlist[1][1]+ohlist[2][1], ohlist[0][1],msg)

    def _test_3piece_tree_get_hashes(self,piece_size,length_add):
        """ testing get_hashes_for_piece on tree with 3 pieces """
        msg = "3piece_get_hashes("+str(piece_size)+","+str(length_add)+") failed"
        npieces = 3
        total_length = 2*piece_size+length_add

        piece_hashes = ['\x01\x02\x03\x04\x05\x06\x07\x08\x07\x06\x05\x04\x03\x02\x01\x00\x01\x02\x03\x04' ] * npieces
        tree = MerkleTree(piece_size,total_length,None,piece_hashes)
        for p in range(npieces):
            ohlist = tree.get_hashes_for_piece(p)
            self.assert_(len(ohlist)==4,msg)
            ohlist.sort()
            if p == 0 or p == 1:
                self.assert_(ohlist[0][0] == 0,msg)
                self.assert_(ohlist[1][0] == 2,msg)
                self.assert_(ohlist[2][0] == 3,msg)
                self.assert_(ohlist[3][0] == 4,msg)
                digest34 = self.calc_digest(ohlist[2][1]+ohlist[3][1])
                self.assertDigestEquals(digest34+ohlist[1][1],ohlist[0][1],msg)
            else:
                self.assert_(ohlist[0][0] == 0,msg)
                self.assert_(ohlist[1][0] == 1,msg)
                self.assert_(ohlist[2][0] == 5,msg)
                self.assert_(ohlist[3][0] == 6,msg)
                digest56 = self.calc_digest(ohlist[2][1]+ohlist[3][1])
                self.assertDigestEquals(ohlist[1][1]+digest56,ohlist[0][1],msg)

    def assertDigestEquals(self,data,digest,msg=None):
        self.assertEquals(self.calc_digest(data),digest,msg)

    def calc_digest(self,data):
        digester = sha()
        digester.update(data)
        return digester.digest()

    def _test_8piece_tree_uncle_calc(self):
        npieces = 8
        hashlist = self.get_indices_for_piece(0,npieces)
        assert hashlist == [7, 8, 4, 2, 0]

        hashlist = self.get_indices_for_piece(1,npieces)
        assert hashlist == [8, 7, 4, 2, 0]

        hashlist = self.get_indices_for_piece(2,npieces)
        assert hashlist == [9, 10, 3, 2, 0]

        hashlist = self.get_indices_for_piece(3,npieces)
        assert hashlist == [10, 9, 3, 2, 0]

        hashlist = self.get_indices_for_piece(4,npieces)
        assert hashlist == [11, 12, 6, 1, 0]

        hashlist = self.get_indices_for_piece(5,npieces)
        assert hashlist == [12, 11, 6, 1, 0]

        hashlist = self.get_indices_for_piece(6,npieces)
        assert hashlist == [13, 14, 5, 1, 0]

        hashlist = self.get_indices_for_piece(7,npieces)
        assert hashlist == [14, 13, 5, 1, 0]

    def get_indices_for_piece(self,index,npieces):
        height = get_tree_height(npieces)
        tree = create_tree(height)
        ohlist = get_hashes_for_piece(tree,height,index)
        list = []
        for oh in ohlist:
            list.append(oh[0])
        return list

    def test_check_hashes_update_hash_admin(self):
        """ 
            test MerkleTree.check_hashes() and update_hash_admin() methods
        """
        for n in range(1,64):
            piece_size = 2 ** n
            for add in [1,piece_size-1]:
                self._test_3piece_tree_check_hashes_update_hash_admin(piece_size, add)

    def _test_3piece_tree_check_hashes_update_hash_admin(self,piece_size,length_add):
        """ testing check_hashes and update_hash_admin tree with 3 pieces """
        msg = "3piece_check_hashes("+str(piece_size)+","+str(length_add)+") failed"
        npieces = 3
        total_length = 2*piece_size+length_add

        piece_hashes = ['\x01\x02\x03\x04\x05\x06\x07\x08\x07\x06\x05\x04\x03\x02\x01\x00\x01\x02\x03\x04' ] * npieces
        fulltree = MerkleTree(piece_size,total_length,None,piece_hashes)
        root_hash = fulltree.get_root_hash()
        emptytree = MerkleTree(piece_size,total_length,root_hash,None)
        empty_piece_hashes = [0] * npieces

        for p in range(npieces):
            ohlist = fulltree.get_hashes_for_piece(p)
            self.assert_(emptytree.check_hashes(ohlist),msg)

        for p in range(npieces):
            ohlist = fulltree.get_hashes_for_piece(p)
            self.assert_(emptytree.check_hashes(ohlist),msg)
            emptytree.update_hash_admin(ohlist,empty_piece_hashes)

        for p in range(npieces):
            self.assert_(piece_hashes[p] == empty_piece_hashes[p],msg)

    def test_merkle_torrent(self):
        """
            test the creation of Merkle torrent files via TorrentMaker/btmakemetafile.py
        """
        piece_size = 2 ** 18
        for file_size in [1,piece_size-1,piece_size,piece_size+1,2*piece_size,(2*piece_size)+1]:
            self.create_merkle_torrent(file_size,piece_size)

    def create_merkle_torrent(self,file_size,piece_size):
        try:
            # 1. create file
            [handle,datafilename]= mkstemp()
            os.close(handle)
            block = "".zfill(file_size)
            fp = open(datafilename,"wb")
            fp.write(block)
            fp.close()
            torrentfilename = datafilename+'.merkle.torrent'

            # 2. create args for make_meta_file
            url = "http://localhost:6969/announce"
            params = {}
            params['merkle_torrent'] = 1
            params['piece_size_pow2'] = int(log(piece_size,2))
            flag = Event()
            def dummy_progress(val):
                pass
            def dummy_filecallback(f1,f2):
                pass

            # 3. create Merkle torrent
            make_meta_file(datafilename,url,params,flag,dummy_progress,1,dummy_filecallback)

            # 4. read Merkle torrent
            fp = open(torrentfilename,"rb")
            data = fp.read(10000)
            fp.close()

            # 5. test Merkle torrent
            # basic tests
            dict = bdecode(data)
            self.assert_(type(dict) == DictType)
            self.assert_(dict.has_key('info'))
            info = dict['info']
            self.assert_(type(info) == DictType)
            self.assert_(not info.has_key('pieces'))
            self.assert_(info.has_key('root hash'))
            roothash = info['root hash']
            self.assert_(type(roothash) == StringType)
            self.assert_(len(roothash)== 20)

            # create hash tree
            hashes = self.read_and_calc_hashes(datafilename,piece_size)
            npieces = len(hashes)
            if DEBUG:
                print "npieces is",npieces
            height = log(npieces,2)+1
            if height > int(height):
                height += 1
            height = int(height)
            if DEBUG:
                print "height is",height

            starto = (2 ** (height-1))-1

            if DEBUG:
                print "starto is",starto
            tree = [0] * ((2 ** (height))-1)
            if DEBUG:
                print "len tree is",len(tree)
            # put hashes in tree
            for i in range(len(hashes)):
                o = starto + i
                tree[o] = hashes[i]

            # fill unused
            nplaces = (2 ** height)-(2 ** (height-1))        
            xso = starto+npieces
            xeo = starto+nplaces
            for o in range(xso,xeo):
                tree[o] = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

            # calc higher level ones
            if height > 1:
                for o in range(len(tree)-starto-2,-1,-1):
                    co = self.get_child_offset(o,height)
                    if DEBUG:
                        print "offset is",o,"co is",co
                    data = tree[co]+tree[co+1]
                    digest = self.calc_digest(data)
                    tree[o] = digest
            self.assert_(tree[0],roothash)
        except Exception,e:
            print_exc()
        #finally:
        #    os.remove(datafilename)
        #    os.remove(torrentfilename)

    def read_and_calc_hashes(self,filename,piece_size):
        hashes = []
        fp = open(filename,"rb")
        while True:
            block = fp.read(piece_size)
            if len(block) == 0:
                break
            digest = self.calc_digest(block)
            hashes.append(digest)
            if len(block) != piece_size:
                break
        fp.close()
        return hashes

    def get_child_offset(self,offset,height):
        if DEBUG:
            print "get_child(",offset,",",height,")"
        if offset == 0:
            level = 1
        else:
            level = log(offset,2)
            if level == int(level):
                level += 1
            else:
                level = ceil(level)
            level = int(level)
        starto = (2 ** (level-1))-1
        diffo = offset-starto
        diffo *= 2
        cstarto = (2 ** level)-1
        return cstarto+diffo

def test_suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(TestMerkleHashes))
    
    return suite

if __name__ == "__main__":
    unittest.main()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.