# Written by Jie Yang
# see LICENSE.txt for license information

"""
Database design
Values in brackets are the default values
Don't use None as a default value

MyDB - (PeerDB)
  mydata.bsd:    # future keys: pictures, 
    version: int (curr_version)    # required
    permid: str                    # required
    ip: str ('')
    port: int (0)
    name: str ('Swapper')
    torrent_path: str ('')    # default path to store torrents
    prefxchg_queue: list ([]) # permid
    bootstrapping: int (1)
    max_num_torrents: int (100000)
    max_num_my_preferences: int (1000)
    superpeers: Set([permid])
    friends: Set([permid])
            
PeerDB - (MyFriendDB, PreferenceDB, OwnerDB)   
  peers.bsd:    # future keys: sys_trust, reliability, speed, personal_info, ..
    permid:{       
        ip: str ('')
        port: int (0)    # listening port, even behind firewall
        name: str ('unknown')
        last_seen: int (0)
        similarity: int (0)    # [0, 1000]
        connected_times: int(0)    # number of times the peer was connected successfully
        tried_times: int(0)        # number of connection attempts to the peer
        buddycast_times: int(0)    # number of buddycast messages received
        #reliability (uptime, fixed/changing IP)
        #trust: int (0)    # [0, 100]
        #icon: str ('')    # name + '_' + permid[-4:]
    }

TorrentDB - (PreferenceDB, MyPreference, OwnerDB)
  torrents.bsd:    # future keys: names, tags, trackers, ..
    infohash:{
        relevance: int (0)    # [0, 1000]
        torrent_name: str ('')    # torrent name
        torrent_dir: str ('')    # path of the torrent (without the file name). '\x01' for default path
        info: dict ({})   # {name, length, announce, creation date, comment, announce-list, num_files}
        leecher: int (0)
        seeder: int (0)
    }

PreferenceDB - (PeerDB, TorrentDB)    # other peers' preferences
  preferences.bsd:
    permid:{
        torrent_id:{'relevance': int (0), 'rank': int (0)}    # relevance: [0, 1000], rank: [-1, 5]
    }

MyPreferenceDB - (TorrentDB)
  mypreferences.bsd:    # future keys: speed
    infohash:{
        created_time: int (0)   # time when downloading/uploading of the torrent started
        content_name: str ('')  # real file name on disk; may differ from info['name']
        content_dir: str ('')   # content_dir + content_name = full path
        rank: int (0)  # [-1, 5]; -1 means it is a fake torrent
        last_seen: int (0)
    }
        
OwnerDB - (PeerDB, TorrentDB)
  owner.bsd:
    infohash: Set([permid])    # future keys: tags, name

"""

import os, sys
from time import time,ctime
from random import random
from sha import sha
from copy import deepcopy
from sets import Set
from traceback import print_exc,print_stack
from threading import currentThread

#from Swapper.utilities import isValidPermid, isValidInfohash

try:
    # For Python 2.3
    from bsddb import db,dbshelve,dbutils
except ImportError:
    # For earlier Pythons with the standalone pybsddb (bsddb3) package
    from bsddb3 import db,dbshelve,dbutils

from shelve import BsdDbShelf

#permid_len = 0  #112
#infohash_len = 20
#

home_dir = 'bsddb'
curr_version = 1
permid_length = 112
infohash_length = 20
torrent_id_length = 20
MAX_RETRIES = 12
STRICT_CHECK = False
DEBUG = False
    
def isValidPermid(permid):    # validate permid in outer layer
    return True
    
def isValidInfohash(infohash):
    return True

def init(config_dir, myinfo):
    """ create all databases """
    
    global home_dir
    home_dir = make_filename(config_dir, 'bsddb')
    MyDB.getInstance(myinfo, home_dir)
    PeerDB.getInstance(home_dir)
    TorrentDB.getInstance(home_dir)
    PreferenceDB.getInstance(home_dir)
    MyPreferenceDB.getInstance(home_dir)
    OwnerDB.getInstance(home_dir)
    
def done(config_dir):
    MyDB.getInstance().close()
    MyPreferenceDB.getInstance().close()
    OwnerDB.getInstance().close()
    PeerDB.getInstance().close()
    PreferenceDB.getInstance().close()
    TorrentDB.getInstance().close()


def make_filename(config_dir,filename):
    if config_dir is None:
        return filename
    else:
        return os.path.join(config_dir,filename)    
    
def setDBPath(db_dir = ''):
    if not db_dir:
        db_dir = '.'
    if not os.access(db_dir, os.F_OK):
        try: 
            os.mkdir(db_dir)
        except os.error, msg:
            print >> sys.stderr, "cachedb: cannot set db path:", msg
            db_dir = '.'
    return db_dir

def open_db2(filename, db_dir='', filetype=db.DB_BTREE):    # backup
    global home_dir
    if not db_dir:
        db_dir = home_dir
    dir = setDBPath(db_dir)
    path = os.path.join(dir, filename)
    try:
        d = dbshelve.open(path, filetype=filetype)
    except Exception, msg:
        print >> sys.stderr, "cachedb: cannot open dbshelve on", path, msg
        d = dbshelve.open(filename, filetype=filetype)
    return d

def open_db(filename, db_dir='', filetype=db.DB_BTREE, writeback=False):
    global home_dir
    if not db_dir:
        db_dir = home_dir
    dir = setDBPath(db_dir)
    path = os.path.join(dir, filename)
    env = db.DBEnv()
    # Concurrent Data Store
    env.open(dir, db.DB_THREAD|db.DB_INIT_CDB|db.DB_INIT_MPOOL|db.DB_CREATE|db.DB_PRIVATE)
    #d = db.DB(env)
    #d.open(path, filetype, db.DB_THREAD|db.DB_CREATE)
    #_db = BsdDbShelf(d, writeback=writeback) 
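    # Note: the shelve below is opened by bare filename inside the environment;
    # Berkeley DB resolves relative file names against the env home directory
    # (dir), so the absolute 'path' computed above is not passed in directly.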
    _db = dbshelve.open(filename, flags=db.DB_THREAD|db.DB_CREATE, 
            filetype=filetype, dbenv=env)
    return _db

def validDict(data, keylen=0):    # basic requirement for a data item in DB
    if not isinstance(data, dict):
        return False
    for key in data:
        if not isinstance(key, str):
            return False
        if STRICT_CHECK and keylen and len(key) != keylen:
            return False
    return True        
    
def validList(data, keylen=0):
    if not isinstance(data, list):
        return False
    for key in data:
        if not isinstance(key, str):
            return False
        if STRICT_CHECK and keylen and len(key) != keylen:
            return False
    return True        

# Abstract base class
class BasicDB:    # Should we use delegation instead of inheritance?
        
    def __init__(self, db_dir=''):
        self.default_item = {'d':1, 'e':'abc', 'f':{'k':'v'}, 'g':[1,'2']} # for test
        if self.__class__ == BasicDB:
            self.db_name = 'basic.bsd'    # for testing
            self.opened = True
            self._data = open_db(self.db_name, db_dir, filetype=db.DB_HASH)
            #raise NotImplementedError, "Cannot create object of class BasicDB"
    
#------------ Basic interfaces, used by member func and handlers -------------#
    def __del__(self):
        self.close()
        
    threadnames = {}
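    # The low-level accessors below route every bsddb call through
    # dbutils.DeadlockWrap, which retries the wrapped operation (up to
    # MAX_RETRIES times) when Berkeley DB raises DBLockDeadlockError,
    # instead of propagating the deadlock immediately.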
    
    def _put(self, key, value):    # write
        try:
            if DEBUG:
                name = currentThread().getName()
                if name not in self.threadnames:
                    self.threadnames[name] = 0
                self.threadnames[name] += 1
                print >> sys.stderr, "cachedb: put", len(self.threadnames), name, \
                    self.threadnames[name], time(), self.__class__.__name__
                    
            dbutils.DeadlockWrap(self._data.put, key, value, max_retries=MAX_RETRIES)
            #self._data.put(key, value)
        except:
            pass
        
    def _has_key(self, key):    # find a key
        try:
            return dbutils.DeadlockWrap(self._data.has_key, key, max_retries=MAX_RETRIES)
            #return self._data.has_key(key)
        except:
            return False
    
    def _get(self, key, value=None):    # read
        try:
            return dbutils.DeadlockWrap(self._data.get, key, value, max_retries=MAX_RETRIES)
            #return self._data.get(key, value)
        except:
            print >> sys.stderr, "cachedb: _get EXCEPTION BY",currentThread().getName()
            print_exc()
            return None
        
    def _updateItem(self, key, data):
        try:
            x = self._get(key)
            if isinstance(x, dict):
                x.update(data)
            else:
                x = data
            self._put(key, x)
        except:
            print_exc()
    
    def _delete(self, key):
        try:
            if DEBUG:
                name = currentThread().getName()
                if name not in self.threadnames:
                    self.threadnames[name] = 0
                self.threadnames[name] += 1
                print >> sys.stderr, "cachedb: del", len(self.threadnames), name, \
                    self.threadnames[name], time(), self.__class__.__name__
                
            dbutils.DeadlockWrap(self._data.delete, key, max_retries=MAX_RETRIES)
            #self._data.delete(key)
        except:
            pass

    def _sync(self):            # write data from mem to disk
        dbutils.DeadlockWrap(self._data.sync, max_retries=MAX_RETRIES)
        #self._data.sync()
            
    def _clear(self):
        dbutils.DeadlockWrap(self._data.clear, max_retries=MAX_RETRIES)
        #self._data.clear()
    
    def _keys(self):
        return dbutils.DeadlockWrap(self._data.keys, max_retries=MAX_RETRIES)
        #return self._data.keys()
    
    def _values(self):
        return dbutils.DeadlockWrap(self._data.values, max_retries=MAX_RETRIES)
        #return self._data.values()
    
    def _items(self):
        return dbutils.DeadlockWrap(self._data.items, max_retries=MAX_RETRIES)
        #return self._data.items()
    
    def _size(self):
        try:
            return dbutils.DeadlockWrap(len, self._data, max_retries=MAX_RETRIES)
            #return len(self._data)
        except:
            print_exc()
            print >> sys.stderr, "cachedb: cachedb.BasicDB._size error", self.__class__.__name__
            return 0
    
    def close(self):
        if DEBUG:
            print >> sys.stderr, "cachedb: Closing database",self.db_name,currentThread().getName()
        if self.opened:
            try:
                self._sync()
                dbutils.DeadlockWrap(self._data.close, max_retries=MAX_RETRIES)
                if DEBUG:
                    print >> sys.stderr, "cachedb: Done waiting for database close",self.db_name,currentThread().getName()
                #self._data.close()
            except:
                print_exc()
        self.opened = False
        
    def updateDB(self, old_version):
        raise NotImplementedError

    def setDefaultItem(self, item):
        df = deepcopy(self.default_item)
        df.update(item)
        return df
    
    
class MyDB(BasicDB):
    
    __single = None

    def __init__(self, myinfo=None, db_dir=''):
        if MyDB.__single:
            raise RuntimeError, "MyDB is singleton"
        self.db_name = 'mydata.bsd'
        self.opened = True
        self._data = open_db(self.db_name, db_dir, filetype=db.DB_HASH)    # dbshelve object
        MyDB.__single = self 
        self.default_data = {
            'version':curr_version, 
            'permid':'', 
            'ip':'', 
            'port':0, 
            'name':'Swapper', 
            'torrent_path':'',
            'prefxchg_queue':[],
            'bootstrapping':1, 
            'max_num_torrents':100000,
            'max_num_my_preferences':1000,
            'superpeers':Set(),
            'friends':Set(),
        }
        self.preload_keys = ['ip', 'torrent_path', 'permid']    # these keys can be changed at each bootstrap
        self.initData(myinfo)
            
    def getInstance(*args, **kw):
        if MyDB.__single is None:
            MyDB(*args, **kw)
        if MyDB.__single._size() < len(MyDB.__single.default_data):
            MyDB.__single.initData()
        return MyDB.__single
    getInstance = staticmethod(getInstance)

    def setDefault(self, data):    # it is only used by validData()
        dd = deepcopy(self.default_data)
        dd.update(data)
        return dd

    def initData(self, myinfo=None):
        MyDB.checkVersion(self)
        if not myinfo:
            myinfo = {}
        myinfo = self.setDefault(myinfo)
        self.load(myinfo)
        
    def load(self, myinfo):
        for key in myinfo:
            if not self._has_key(key) or key in self.preload_keys:    # right?
                self._put(key, myinfo[key])
        
    def checkVersion(db):
        if not MyDB.__single:
            MyDB()        # this branch should never be reached
        old_version = MyDB.__single._get('version')
        if not old_version:
            MyDB.__single._put('version', curr_version)
        elif old_version < curr_version:
            db.updateDB(old_version)
        elif old_version > curr_version:
            raise RuntimeError, "The version of database is too high. Please update the software."
    checkVersion = staticmethod(checkVersion)
    
    # superpeers
    def addSuperPeer(self, permid):
        if isValidPermid(permid):
            sp = self._get('superpeers')
            sp.add(permid)
            self._put('superpeers', sp)
            
    def deleteSuperPeer(self, permid):
        if isValidPermid(permid):
            try:
                sp = self._get('superpeers')
                sp.remove(permid)
                self._put('superpeers', sp)
            except:
                pass
            
    def isSuperPeer(self, permid):
        return permid in self._get('superpeers')
    
    def getSuperPeers(self):
        superpeers = self._get('superpeers')
        if superpeers is not None:
            return list(superpeers)
        else:
            return []
    
    # friends
    def addFriend(self, permid):
        if isValidPermid(permid):
            if not 'friends' in self._keys():
                print >> sys.stderr, "cachedb: addFriend key error", self._keys()
            fr = self._get('friends')
            fr.add(permid)
            self._put('friends', fr)
            
    def deleteFriend(self, permid):
        try:
            fr = self._get('friends')
            fr.remove(permid)
            self._put('friends', fr)
        except:
            pass
            
    def isFriend(self, permid):
        return permid in self._get('friends')
    
    def getFriends(self):
        friends = self._get('friends')
        if friends is not None:
            return list(friends)
        else:
            return []
    

class PeerDB(BasicDB):
    """ List of Peers, e.g. Host Cache """
    
    __single = None
    
    def __init__(self, db_dir=''):
        if PeerDB.__single:
            raise RuntimeError, "PeerDB is singleton"
        self.db_name = 'peers.bsd'
        self.opened = True
        self._data = open_db(self.db_name, db_dir)    # dbshelve object
        MyDB.checkVersion(self)
        PeerDB.__single = self
        self.default_item = {
            'ip':'',
            'port':0,
            'name':'',
            'last_seen':0,
            'similarity':0,
            'connected_times':0,
            'tried_times':0,
            'buddycast_times':0,
            #'trust':50,
            #'reliability':
            #'icon':'',
        }
        self.new_encountered_peer = True
        
    def getInstance(*args, **kw):
        if PeerDB.__single is None:
            PeerDB(*args, **kw)
        return PeerDB.__single
    getInstance = staticmethod(getInstance)

    def updateItem(self, permid, item={}, update_dns=True, update_time=True):    # insert a peer; update it if it already exists
#        if item.has_key('name'):
#            assert item['name'] != 'qfqf'
        if isValidPermid(permid) and validDict(item):
            if self._has_key(permid):
                _item = self.getItem(permid)
                if _item is None:  # database error: the key exists but the data is missing
                    return
                if not update_dns:
                    if item.has_key('ip'):
                        item.pop('ip')
                    if item.has_key('port'):
                        item.pop('port')
                _item.update(item)
                if update_time:
                    _item.update({'last_seen':int(time())})
                self._updateItem(permid, _item)
            else:
                item = self.setDefaultItem(item)
                if update_time:
                    item.update({'last_seen':int(time())})
                self._put(permid, item)
                
    def deleteItem(self, permid):
        self._delete(permid)
        
    def getItem(self, permid, default=False):
        ret = self._get(permid, None)
        if ret is None and default:
            ret = deepcopy(self.default_item)
        return ret
    
    def hasItem(self, permid):
        return self._has_key(permid)
        
    def hasNewEncounteredPeer(self, v):    # a setter, despite the 'has' prefix
        self.new_encountered_peer = v


class TorrentDB(BasicDB):
    """ Database of all torrent files, including the torrents I don't have yet """
    
    __single = None
        
    def __init__(self, db_dir=''):
        if TorrentDB.__single:
            raise RuntimeError, "TorrentDB is singleton"
        self.db_name = 'torrents.bsd'
        self.opened = True
        self._data = open_db(self.db_name, db_dir)    # dbshelve object
        MyDB.checkVersion(self)
        TorrentDB.__single = self
        self.default_item = {
            'relevance':0,
            'torrent_name':'',   # name of the torrent
            'torrent_dir':'',   # dir+name=full path. Default path if the value is '\x01'
            'info':{},   # {name, length, announce, creation date, comment}
        }
        self.new_metadata = True
        
    def getInstance(*args, **kw):
        if TorrentDB.__single is None:
            TorrentDB(*args, **kw)
        return TorrentDB.__single
    getInstance = staticmethod(getInstance)
    
    def updateItem(self, infohash, item={}):    # insert a torrent; update it if it already exists
        if isValidInfohash(infohash) and validDict(item):
            if self._has_key(infohash):
                _item = self.getItem(infohash)
                if not _item:
                    print >> sys.stderr, "cachedb: Error in cachedb.TorrentDB.updateItem: database inconsistant!", self._has_key(infohash), self.getItem(infohash)
                    return
                _item.update(item)
                self._updateItem(infohash, _item)
            else:
                item = self.setDefaultItem(item)
                self._put(infohash, item)

    def deleteItem(self, infohash):
        self._delete(infohash)
        
    def getItem(self, infohash, default=False):
        ret = self._get(infohash, None)
        if ret is None and default:
            ret = deepcopy(self.default_item)
        return ret
    
    def hasNewMetadata(self, v):    # a setter, despite the 'has' prefix
        self.new_metadata = v
        

class PreferenceDB(BasicDB):
    """ Peer * Torrent """
    
    __single = None
    
    def __init__(self, db_dir=''):
        if PreferenceDB.__single:
            raise RuntimeError, "PreferenceDB is singleton"
        self.db_name = 'preferences.bsd'
        self.opened = True
        self._data = open_db(self.db_name, db_dir)    # dbshelve object
        MyDB.checkVersion(self)
        PreferenceDB.__single = self 
        self.default_item = {    # subitem actually
            'relevance':0,     # relevance from the owner of this torrent
            'rank':0
        }

    def getInstance(*args, **kw):
        if PreferenceDB.__single is None:
            PreferenceDB(*args, **kw)
        return PreferenceDB.__single
    getInstance = staticmethod(getInstance)

    def addPreference(self, permid, infohash, data={}):    # add or update pref
        if not isValidPermid(permid) or not isValidInfohash(infohash):
            return
        
        if not self._has_key(permid):
            data = self.setDefaultItem(data)
            item = {infohash:data}
        else:
            if self.hasPreference(permid, infohash):
                _data = self.getPreference(permid, infohash)
                _data.update(data)
            else:
                _data = self.setDefaultItem(data)
            _item = {infohash:_data}
            item = self.getItem(permid)
            item.update(_item)
        self._put(permid, item)
                        
    def deletePreference(self, permid, infohash):
        if self._has_key(permid):
            preferences = self._get(permid)
            preferences.pop(infohash)
            self._put(permid, preferences)
            
    def getPreference(self, permid, infohash):
        if self._has_key(permid):
            preferences = self._get(permid)
            if preferences.has_key(infohash):
                return preferences[infohash]
        return None
            
    def hasPreference(self, permid, infohash):
        if self._has_key(permid):
            return infohash in self._get(permid)
        else:
            return False

    def deleteItem(self, permid):
        self._delete(permid)

    def getItem(self, permid):
        return self._get(permid, {})


class MyPreferenceDB(BasicDB):     #  = FileDB
    
    __single = None
        
    def __init__(self, db_dir=''):
        if MyPreferenceDB.__single:
            raise RuntimeError, "TorrentDB is singleton"
        self.db_name = 'mypreferences.bsd'
        self.opened = True
        self._data = open_db(self.db_name, db_dir)    # dbshelve object
        MyDB.checkVersion(self)
        MyPreferenceDB.__single = self 
        self.default_item = {
            'created_time':0,
            'rank':0,  # [-1, 5], recommendation degree shown to others
            'content_name':'',  # real file name on disk; may differ from info['name']
            'content_dir':'',   # dir + name = full path
            'last_seen':0,
        }
                
    def getInstance(*args, **kw):
        if MyPreferenceDB.__single is None:
            MyPreferenceDB(*args, **kw)
        return MyPreferenceDB.__single
    getInstance = staticmethod(getInstance)

    def updateItem(self, infohash, item={}):    # insert a torrent; update it if it already exists
        if isValidInfohash(infohash) and validDict(item):
            if self._has_key(infohash):
                _item = self.getItem(infohash)
                _item.update(item)
                _item.update({'last_seen':int(time())})
                self._updateItem(infohash, _item)
            else:
                self.default_item['created_time'] = self.default_item['last_seen'] = int(time())
                item = self.setDefaultItem(item)
                self._put(infohash, item)
        self._sync()
                
    def deleteItem(self, infohash):
        self._delete(infohash)
        self._sync()
        
    def getItem(self, infohash, default=False):
        ret = self._get(infohash, None)
        if ret is None and default:
            ret = deepcopy(self.default_item)
        return ret
    
    def getRank(self, infohash):
        v = self._get(infohash)
        if not v:
            return 0
        return v.get('rank', 0)
        
    
class OwnerDB(BasicDB):
    """ Torrent * Peer """
    
    __single = None
    
    def __init__(self, db_dir=''):
        if OwnerDB.__single:
            raise RuntimeError, "OwnerDB is singleton"
        self.db_name = 'owners.bsd'
        self.opened = True
        self._data = open_db(self.db_name, db_dir)    # dbshelve object
        OwnerDB.__single = self 
                
    def getInstance(*args, **kw):
        if OwnerDB.__single is None:
            OwnerDB(*args, **kw)
        return OwnerDB.__single
    getInstance = staticmethod(getInstance)
    
    def getNumOwners(self, infohash):
        owners = self._get(infohash)
        if owners is not None:
            n = len(owners)
        else:
            n = 0
        #print n, `infohash`, owners
        return n
        

    def addOwner(self, infohash, permid):
        if isValidPermid(permid) and isValidInfohash(infohash):
            if self._has_key(infohash):
                owners = self._get(infohash)
                owners.add(permid)
                self._put(infohash, owners)
            else:
                self._put(infohash, Set([permid]))
        
    def deleteOwner(self, infohash, permid):
        try:
            owners = self._get(infohash)
            owners.remove(permid)
            if not owners:    # remove the item if it is empty
                self._delete(infohash)
            else:
                self._put(infohash, owners)
        except:
            pass
        
    def isOwner(self, permid, infohash):
        if self._has_key(infohash):
            owners = self._get(infohash)
            return permid in owners
        else:
            return False
        
    def deleteItem(self, infohash):
        self._delete(infohash)

    def getItem(self, infohash):
        owners = self._get(infohash)
        if owners is not None:
            return list(owners)
        else:
            return []
                    