__init__.py :  » Web-Server » Porcupine-Web-Application-Server » porcupine-0.6-src » porcupine » db » bsddb » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Server » Porcupine Web Application Server 
Porcupine Web Application Server » porcupine 0.6 src » porcupine » db » bsddb » __init__.py
#===============================================================================
#    Copyright 2005-2009, Tassos Koutsovassilis
#
#    This file is part of Porcupine.
#    Porcupine is free software; you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as published by
#    the Free Software Foundation; either version 2.1 of the License, or
#    (at your option) any later version.
#    Porcupine is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Lesser General Public License for more details.
#    You should have received a copy of the GNU Lesser General Public License
#    along with Porcupine; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#===============================================================================
"""
Porcupine server Berkeley DB interface
"""
import os.path
import os
import time
import glob
try:
    from bsddb3 import db
except ImportError:
    from bsddb import db
from threading import Thread

from porcupine import context
from porcupine import exceptions
from porcupine.core import persist
from porcupine.config.settings import settings
from porcupine.db.bsddb.transaction import Transaction
from porcupine.db.bsddb.index import DbIndex
from porcupine.db.bsddb.cursor import Cursor,Join
from porcupine.utils.db import _err_unsupported_index_type
from porcupine.utils.db.backup import BackupFile

class DB(object):
    "Berkeley DB database interface"
    # data dir
    dir = os.path.abspath(settings['store']['bdb_data_dir'])
    if dir[-1] != '/':
        dir += '/'
    # log_dir
    log_dir = os.path.abspath(settings['store'].get('bdb_log_dir', dir))
    if log_dir[-1] != '/':
        log_dir += '/'
    # temporary directory
    tmp_dir = os.path.abspath(settings['global']['temp_folder'])
    if tmp_dir[-1] != '/':
        tmp_dir += '/'
    # cache size
    cache_size = settings['store'].get('cache_size', None)
    # transaction timeout
    txn_timeout = settings['store'].get('tx_timeout', 0)
    # shared memory key
    shm_key = settings['store'].get('shm_key', None)
    # maintenance (checkpoint) thread
    _maintenance_thread = None

    def __init__(self, **kwargs):
        # create db environment
        additional_flags = kwargs.get('flags', 0)
        recovery_mode = kwargs.get('recover', 0)
        if recovery_mode == 2:
            additional_flags |= db.DB_RECOVER_FATAL
        elif recovery_mode == 1:
            additional_flags |= db.DB_RECOVER
            if hasattr(db, 'DB_REGISTER'):
                additional_flags |= db.DB_REGISTER

        self._env = db.DBEnv()
        self._env.set_data_dir(self.dir)
        self._env.set_lg_dir(self.log_dir)
        if self.txn_timeout > 0:
            self._env.set_timeout(self.txn_timeout, db.DB_SET_TXN_TIMEOUT)
        else:
            self._env.set_flags(db.DB_TXN_NOWAIT, 1)

        if self.cache_size != None:
            self._env.set_cachesize(*self.cache_size)

        if os.name != 'nt' and self.shm_key:
            self._env.set_shm_key(self.shm_key)
            additional_flags |= db.DB_SYSTEM_MEM

        self._env.open(self.tmp_dir,
                       db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
                       db.DB_INIT_LOG | db.DB_INIT_TXN | db.DB_CREATE |
                       additional_flags)

        dbMode = 0660
        dbFlags = db.DB_THREAD | db.DB_CREATE | db.DB_AUTO_COMMIT

        # open items db
        self._itemdb = db.DB(self._env)
        self._itemdb.open(
            'porcupine.db',
            'items',
            dbtype = db.DB_HASH,
            mode = dbMode,
            flags = dbFlags
        )

        # open documents db
        self._docdb = db.DB(self._env)
        self._docdb.open(
            'porcupine.db',
            'docs',
            dbtype = db.DB_HASH,
            mode = dbMode,
            flags = dbFlags
        )

        # open indices
        self._indices = {}
        for name, unique, immutable in settings['store']['indices']:
            self._indices[name] = DbIndex(self._env, self._itemdb,
                                          name, unique, immutable)

        self._running = True

        maintain = kwargs.get('maintain', True)
        if maintain and self._maintenance_thread == None:
            self._maintenance_thread = Thread(target=self.__maintain,
                                              name='DB maintenance thread')
            self._maintenance_thread.start()
    
    def is_open(self):
        return self._running

    # item operations
    def get_item(self, oid):
        try:
            return self._itemdb.get(oid,
                                    txn=context._trans and context._trans.txn)
        except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
            context._trans.abort()
            raise exceptions.DBRetryTransaction

    def put_item(self, item):
        try:
            self._itemdb.put(item._id, persist.dumps(item), context._trans.txn)
        except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
            context._trans.abort()
            raise exceptions.DBRetryTransaction
        except db.DBError, e:
            if e[0] == _err_unsupported_index_type:
                raise db.DBError, "Unsupported indexed data type"
            else:
                raise

    def delete_item(self, oid):
        try:
            self._itemdb.delete(oid, context._trans.txn)
        except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
            context._trans.abort()
            raise exceptions.DBRetryTransaction

    # external attributes
    def get_external(self, id):
        try:
            return self._docdb.get(id,
                                   txn=context._trans and context._trans.txn)
        except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
            context._trans.abort()
            raise exceptions.DBRetryTransaction

    def put_external(self, id, stream):
        try:
            self._docdb.put(id, stream, context._trans.txn)
        except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
            context._trans.abort()
            raise exceptions.DBRetryTransaction

    def delete_external(self, id):
        try:
            self._docdb.delete(id, context._trans.txn)
        except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
            context._trans.abort()
            raise exceptions.DBRetryTransaction

    # indices
    def get_cursor_list(self, conditions):
        cur_list = []
        for index, value in conditions:
            cursor = Cursor(self._indices[index])
            if type(value) == tuple:
                cursor.set_range(value[0], value[1])
            else:
                cursor.set(value)
            cur_list.append(cursor)
        return cur_list

    def query_index(self, index, value):
        cursor = self.get_cursor_list(((index, value),))[0]
        return cursor

    def join(self, conditions):
        cur_list = self.get_cursor_list(conditions)
        c_join = Join(self._itemdb, cur_list)
        return c_join

    def switch_cursor_scope(self, cursor, scope):
        if isinstance(cursor, Join):
            # assume that the scope cursor is always last
            cursor._cur_list[-1].set(scope)
        else:
            cursor.set(scope)

    def test_join(self, conditions):
        cur_list = self.get_cursor_list(conditions)
        c_join = Join(self._itemdb, cur_list)
        iterator = iter(c_join)
        try:
            result = bool(iterator.next())
        except StopIteration:
            result = False
        c_join.close()
        return result

    # transactions
    def get_transaction(self, nosync):
        return Transaction(self._env, nosync)

    def __remove_env(self):
        files = glob.glob(self.tmp_dir + '__db.???')
        for file in files:
            os.remove(file)

    # administrative
    def __remove_files(self):
        # environment files
        self.__remove_env()
        # log files
        files = glob.glob(self.log_dir + 'log.*')
        for file in files:
            os.remove(file)
        # database file
        os.remove(self.dir + 'porcupine.db')
        # index file
        os.remove(self.dir + 'porcupine.idx')
        
    def truncate(self):
        # older versions of bsddb do not support truncate
        if hasattr(self._itemdb, 'truncate'):
            self._itemdb.truncate()
            self._docdb.truncate()
        else:
            # close database
            self.close()
            # remove old database files
            self.__remove_files()
            # open db
            self.__init__()
    
    def backup(self, output_file):
        # force checkpoint
        self._env.txn_checkpoint(0, 0, db.DB_FORCE)
        logs = self._env.log_archive(db.DB_ARCH_LOG)
        backfiles = [self.dir + 'porcupine.db', self.dir + 'porcupine.idx'] + \
                    [self.log_dir + log for log in logs]
        # compact backup....
        backup = BackupFile(output_file)
        backup.add_files(backfiles)
        
    def restore(self, bset):
        self.__remove_files()
        backup = BackupFile(bset)
        backup.extract(self.dir, self.log_dir)

    def shrink(self):
        logs = self._env.log_archive()
        for log in logs:
            os.remove(self.log_dir + log)
        return len(logs)

    def __maintain(self):
        "deadlock detection thread"
        from porcupine.core.runtime import logger
        while self._running:
            time.sleep(0.05)
            # deadlock detection
            try:
                aborted = self._env.lock_detect(db.DB_LOCK_RANDOM,
                                                db.DB_LOCK_CONFLICT)
                if aborted:
                    logger.critical(
                        "Deadlock: Aborted %d deadlocked transaction(s)"
                        % aborted)
            except db.DBError:
                 pass
            #stats = self._env.txn_stat()
            #print 'txns: %d' % stats['nactive']
            #print 'max txns: %d' % stats['maxnactive']
            #print
            #stats = self._env.lock_stat()
            #print 'Lockers: %d' % stats['nlockers']
            #print 'Max Lockers: %d' % stats['maxnlockers']
            #print 'Lockers wait: %d' % stats['lockers_wait']
            #print
            #print 'Locks: %d' % stats['nlocks']
            #print 'Max Locks: %d' % stats['maxnlocks']
            #print 'Locks wait: %d' % stats['lock_wait']
            #print 'Locks no-wait: %d' % stats['lock_nowait']
            #print
            #print 'Lock objects: %d' % stats['nobjects']
            #print 'Max objects: %d' % stats['maxnobjects']
            #print 'Objects wait: %d' % stats['objs_wait']
            #print
            #print 'Requested: %d' % stats['nrequests']
            #print 'Released: %d' % stats['nreleases']
            #print '-' * 80

    def close(self):
        if self._running:
            self._running = False
            if self._maintenance_thread != None:
                self._maintenance_thread.join()
            self._itemdb.close()
            self._docdb.close()
            # close indexes
            [index.close() for index in self._indices.values()]
            self._env.close()
            # clean-up environment files
            #if self._maintenance_thread != None:
            #    self.__remove_env()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.