Connection.py :  » Web-Frameworks » Zope » Zope-2.6.0 » lib » python » ZODB » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Zope 
Zope » Zope 2.6.0 » lib » python » ZODB » Connection.py
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Database connection support

$Id: Connection.py,v 1.72 2002/08/14 22:07:09 mj Exp $"""

from cPickleCache import PickleCache,MUCH_RING_CHECKING
from POSException import ConflictError,ReadConflictError
from ExtensionClass import Base
import ExportImport, TmpStore
from zLOG import LOG,ERROR,BLATHER,WARNING
from coptimizations import new_persistent_id
from ConflictResolution import ResolvedSerial

from cPickle import Unpickler,Pickler
from cStringIO import StringIO
import sys
from time import time
from types import StringType,ClassType

# Time of the most recent code change, as recorded by updateCodeTimestamp().
# Each Connection remembers the value it was built against and gets a new
# cache when the two no longer match.
global_code_timestamp = 0

if MUCH_RING_CHECKING:
    # To get rid of this warning, change the define inside
    # cPickleCache.c and recompile.
    LOG('ZODB',WARNING,
        'Using cPickleCache with low performance (but extra debugging checks)')
# The flag was only needed for the one-time warning above.
del MUCH_RING_CHECKING

def updateCodeTimestamp():
    """Record that persistence-based classes have changed.

    Stamps the module-level ``global_code_timestamp`` with the current
    time.  A connection whose own ``_code_timestamp`` differs from it
    gets a brand new cache the next time it is opened.
    """
    global global_code_timestamp
    now = time()
    global_code_timestamp = now

# The type of ExtensionClass.Base.  Records whose class *is* this type
# describe extension classes themselves and are loaded/stored specially.
ExtensionKlass=Base.__class__

class Connection(ExportImport.ExportImport):
    """Object managers for individual object space.

    An object space is a version of a collection of objects.  In a
    multi-threaded application, each thread gets its own object
    space.

    The Connection manages movement of objects in and out of object storage.
    """
    # Holds the regular storage while a subtransaction has swapped a
    # temporary store into _storage (see tpc_begin); None otherwise.
    _tmp=None
    # Values accumulated by setDebugInfo().
    _debug_info=()
    # Time at which _setDB() last ran; reset to None by close().
    _opened=None
    # Value of global_code_timestamp this connection's cache was built for.
    _code_timestamp = 0

    # Experimental. Other connections can register to be closed
    # when we close by putting something here.

    def __init__(self, version='', cache_size=400,
                 cache_deactivate_after=60):
        """Create a new Connection.

        version -- name of the version this connection works in ('' for
                   none).
        cache_size -- size handed to the PickleCache constructor.
        cache_deactivate_after -- accepted, but not used by this method.
        """
        self._version = version
        pickle_cache = PickleCache(self, cache_size)
        self._cache = pickle_cache
        if version:
            # Caches for versions end up empty if the version
            # is not used for a while. Non-version caches
            # keep their content indefinitely.

            # XXX Why do we want version caches to behave this way?

            pickle_cache.cache_drain_resistance = 100

        # Incremental garbage collection is exposed under two names.
        incremental_gc = pickle_cache.incrgc
        self._incrgc = incremental_gc
        self.cacheGC = incremental_gc

        # oids (or None, meaning "everything") invalidated by other
        # connections; _invalid is the fast membership test on that mapping.
        invalidated = {}
        self._invalidated = invalidated
        self._invalid = invalidated.has_key

        self._committed = []
        self._code_timestamp = global_code_timestamp
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored

    def _cache_items(self):
        """Return all cached (oid, object) pairs as one list.

        The result is ordered [ghosts....not recently used.....recently
        used]: first the entries that are not on the cache's LRU list,
        then the LRU list itself.
        """
        # Entries currently on the lru list.
        on_lru = self._cache.lru_items()
        # Everything in the cache, some of it on the lru list, some not.
        # NOTE(review): entries are deleted from this mapping below, so
        # cache_data is assumed to hand back a copy -- confirm.
        off_lru = self._cache.cache_data
        for oid, unused in on_lru:
            del off_lru[oid]
        return off_lru.items() + on_lru

    def __repr__(self):
        if self._version:
            ver = ' (in version %s)' % `self._version`
        else:
            ver = ''
        return '<Connection at %08x%s>' % (id(self), ver)

    def _breakcr(self):
        try: del self._cache
        except: pass
        try: del self._incrgc
        except: pass
        try: del self.cacheGC
        except: pass

    def __getitem__(self, oid, tt=type(())):
        """Return the object with the given oid.

        A cached object is returned as is.  Otherwise the record is read
        from the storage, only its class record is unpickled, and a new
        instance with _p_changed set to None is created, cached and
        returned.  The object's state is not loaded here.

        tt is the tuple type, bound as a default for speed; callers should
        not pass it.

        Any exception raised while unpickling the class record is logged
        and then re-raised unchanged.
        """
        obj = self._cache.get(oid, None)
        if obj is not None:
            return obj

        # __traceback_info__ is kept up to date so that error reports can
        # show which oid / pickle was being processed.
        __traceback_info__ = (oid)
        p, serial = self._storage.load(oid, self._version)
        __traceback_info__ = (oid, p)
        file=StringIO(p)
        unpickler=Unpickler(file)
        unpickler.persistent_load=self._persistent_load

        try:
            object = unpickler.load()
        except:
            # Log the failing oid, then let the original exception (and
            # its traceback) propagate instead of replacing it.
            LOG('ZODB', ERROR,
                "Could not load oid %s, pickled data in traceback info "
                "may contain clues" % repr(oid),
                error=sys.exc_info())
            raise

        klass, args = object

        if type(klass) is tt:
            # The class was stored as a (module, name) pair.
            module, name = klass
            klass=self._db._classFactory(self, module, name)

        if (args is None or
            not args and not hasattr(klass,'__getinitargs__')):
            # No constructor arguments: create the instance without
            # calling __init__.
            object=klass.__basicnew__()
        else:
            object=apply(klass,args)
            if klass is not ExtensionKlass:
                # Discard whatever the constructor put in the instance.
                object.__dict__.clear()

        object._p_oid=oid
        object._p_jar=self
        object._p_changed=None
        object._p_serial=serial

        self._cache[oid] = object
        if oid=='\0\0\0\0\0\0\0\0':
            self._root_=object # keep a ref
        return object

    def _persistent_load(self,oid,
                        tt=type(())):
        """Resolve a persistent reference found while unpickling.

        oid is either a plain oid or an (oid, klass) pair, where klass is
        a class or a (module, name) pair.  Returns the cached object when
        there is one; otherwise creates and caches a new instance with
        _p_changed set to None, or falls back to self[oid].
        """
        __traceback_info__=oid

        if type(oid) is not tt:
            # Plain oid: use the cache, else load the record.
            cached = self._cache.get(oid, None)
            if cached is not None:
                return cached
            return self[oid]

        # Quick instance reference.  We know all we need to know
        # to create the instance wo hitting the db, so go for it!
        oid, klass = oid
        cached = self._cache.get(oid, None)
        if cached is not None:
            return cached

        if type(klass) is tt:
            module, name = klass
            try:
                klass = self._db._classFactory(self, module, name)
            except:
                # Eek, we couldn't get the class. Hm.
                # Maybe there's more current data in the
                # object's actual record!
                return self[oid]

        ghost = klass.__basicnew__()
        ghost._p_oid = oid
        ghost._p_jar = self
        ghost._p_changed = None

        self._cache[oid] = ghost

        return ghost

    def _setDB(self, odb):
        """Attach this connection to the database odb and return self.

        Takes the storage and its new_oid method from odb.  If code has
        changed since the cache was built, the cache is replaced;
        otherwise the objects recorded as invalidated are invalidated in
        the existing cache.  The time of the call is kept in _opened.
        """
        self._db = odb
        storage = odb._storage
        self._storage = storage
        self.new_oid = storage.new_oid

        if self._code_timestamp == global_code_timestamp:
            self._cache.invalidate(self._invalidated)
        else:
            # New code is in place.  Start a new cache.
            self._resetCache()

        self._opened = time()
        return self

    def _resetCache(self):
        '''
        Creates a new cache, discarding the old.

        The new cache has the same size as the old one and is set up the
        same way __init__ sets up the first one: version connections get
        their drain resistance, and the garbage-collection shortcuts
        (_incrgc and cacheGC) are pointed at the new cache so that they
        no longer act on, or keep alive, the discarded one.
        '''
        self._code_timestamp = global_code_timestamp
        self._invalidated.clear()
        orig_cache = self._cache
        self._cache = cache = PickleCache(self, orig_cache.cache_size)
        if self._version:
            cache.cache_drain_resistance = 100
        self._incrgc = self.cacheGC = cache.incrgc

    def abort(self, object, transaction):
        """Abort the object in the transaction.

        This just deactivates the thing: the object is invalidated in the
        cache.  When the connection itself is passed, everything recorded
        in _invalidated is invalidated instead.  transaction is unused.
        """
        if object is not self:
            assert object._p_oid is not None
            self._cache.invalidate(object._p_oid)
            return
        self._cache.invalidate(self._invalidated)

    def cacheFullSweep(self, dt=0):
        """Run the cache's full_sweep, passing dt through unchanged."""
        self._cache.full_sweep(dt)

    def cacheMinimize(self, dt=0):
        """Run the cache's minimize; dt is accepted but not passed on."""
        # dt is ignored
        self._cache.minimize()

    # Callables to invoke from close(); stays None until
    # onCloseCallback() registers the first one.
    __onCloseCallbacks = None

    def onCloseCallback(self, f):
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)

    def close(self):
        """Close the connection and hand it back to its database.

        Runs an incremental cache GC, calls every registered close
        callback (a failing callback is logged and does not stop the
        others), clears the per-open state and finally passes the
        connection to the database's _closeConnection().
        """
        self._incrgc() # This is a good time to do some GC
        db = self._db

        # Call the close callbacks.
        callbacks = self.__onCloseCallbacks
        if callbacks is not None:
            for callback in callbacks:
                try:
                    callback()
                except:
                    # Report the owning instance for bound methods.
                    owner = getattr(callback, 'im_self', callback)
                    LOG('ZODB',ERROR, 'Close callback failed for %s' % owner,
                        error=sys.exc_info())
            self.__onCloseCallbacks = None

        self._db = None
        self._storage = None
        self._tmp = None
        self.new_oid = None
        self._opened = None
        self._debug_info = ()
        # Return the connection to the pool.
        db._closeConnection(self)

    # Pending (method_name, args, kw) tuples queued by onCommitAction()
    # and consumed by commit(); None while nothing is queued.
    __onCommitActions = None

    def onCommitAction(self, method_name, *args, **kw):
        """Queue a call to one of this connection's methods for commit time.

        The method named method_name is later called by commit() with the
        transaction followed by args and kw.  The connection registers
        itself with the current transaction so that commit() is invoked.
        """
        pending = self.__onCommitActions
        if pending is None:
            pending = []
            self.__onCommitActions = pending
        pending.append((method_name, args, kw))
        get_transaction().register(self)

    def commit(self, object, transaction):
        """Store object, and new objects reachable from it, in the storage.

        When object is the connection itself (it registered through
        onCommitAction), the oldest queued commit action is run instead
        and nothing is stored.  Objects that are neither new nor changed
        are ignored.

        Raises ConflictError when a changed object has been invalidated
        and has no _p_resolveConflict, or when None (meaning everything)
        has been invalidated.
        """
        if object is self:
            # We registered ourself.  Execute a commit action, if any.
            if self.__onCommitActions is not None:
                method_name, args, kw = self.__onCommitActions.pop(0)
                apply(getattr(self, method_name), (transaction,) + args, kw)
            return
        oid=object._p_oid
        invalid=self._invalid
        if oid is None or object._p_jar is not self:
            # new object
            oid = self.new_oid()
            object._p_jar=self
            object._p_oid=oid
            self._creating.append(oid)

        elif object._p_changed:
            if (
                (invalid(oid) and not hasattr(object, '_p_resolveConflict'))
                or
                invalid(None)
                ):
                raise ConflictError(object=object)
            self._invalidating.append(oid)

        else:
            # Nothing to do
            return

        # Objects still to be stored.  The persistent_id hook installed
        # below is given stack.append, so it can add further objects
        # while the current one is being pickled.
        stack=[object]

        # Create a special persistent_id that passes T and the subobject
        # stack along:
        #
        # def persistent_id(object,
        #                   self=self,
        #                   stackup=stackup, new_oid=self.new_oid):
        #     if (not hasattr(object, '_p_oid') or
        #         type(object) is ClassType): return None
        #
        #     oid=object._p_oid
        #
        #     if oid is None or object._p_jar is not self:
        #         oid = self.new_oid()
        #         object._p_jar=self
        #         object._p_oid=oid
        #         stackup(object)
        #
        #     klass=object.__class__
        #
        #     if klass is ExtensionKlass: return oid
        #
        #     if hasattr(klass, '__getinitargs__'): return oid
        #
        #     module=getattr(klass,'__module__','')
        #     if module: klass=module, klass.__name__
        #
        #     return oid, klass

        # One buffer and one pickler are reused for every object; the
        # bound methods are cached in locals for the loop below.
        file=StringIO()
        seek=file.seek
        pickler=Pickler(file,1)
        pickler.persistent_id=new_persistent_id(self, stack.append)
        dbstore=self._storage.store
        # NOTE: from here on 'file' names the buffer's getvalue method,
        # not the buffer itself.
        file=file.getvalue
        cache=self._cache
        get=cache.get
        dump=pickler.dump
        clear_memo=pickler.clear_memo


        version=self._version

        while stack:
            object=stack[-1]
            del stack[-1]
            oid=object._p_oid
            serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0')
            # NOTE(review): the object passed to commit() was already
            # recorded in _creating/_invalidating above, so its oid
            # appears to be appended a second time here -- confirm.
            if serial == '\0\0\0\0\0\0\0\0':
                # new object
                self._creating.append(oid)
            else:
                #XXX We should never get here
                if (
                    (invalid(oid) and
                     not hasattr(object, '_p_resolveConflict'))
                    or
                    invalid(None)
                    ):
                    raise ConflictError(object=object)
                self._invalidating.append(oid)

            klass = object.__class__

            if klass is ExtensionKlass:
                # Yee Ha!
                # The object is itself an extension class: it is stored
                # as constructor arguments (name, bases, dict) with no
                # separate state.
                dict={}
                dict.update(object.__dict__)
                del dict['_p_jar']
                args=object.__name__, object.__bases__, dict
                state=None
            else:
                if hasattr(klass, '__getinitargs__'):
                    args = object.__getinitargs__()
                    len(args) # XXX Assert it's a sequence
                else:
                    args = None # New no-constructor protocol!

                # Store the class as a (module, name) pair when it has
                # a module.
                module=getattr(klass,'__module__','')
                if module: klass=module, klass.__name__
                __traceback_info__=klass, oid, self._version
                state=object.__getstate__()

            # A record is two consecutive pickles: the class record,
            # then the state.
            seek(0)
            clear_memo()
            dump((klass,args))
            dump(state)
            # getvalue(1): presumably returns only the data up to the
            # current position, so leftovers from a longer earlier
            # record are excluded -- TODO confirm against cStringIO.
            p=file(1)
            s=dbstore(oid,serial,p,version,transaction)
            self._store_count = self._store_count + 1
            # Put the object in the cache before handling the
            # response, just in case the response contains the
            # serial number for a newly created object
            try: cache[oid]=object
            except:
                # Dang, I bet its wrapped:
                if hasattr(object, 'aq_base'):
                    cache[oid]=object.aq_base
                else:
                    raise

            self._handle_serial(s, oid)

    def commit_sub(self, t):
        """Commit all work done in subtransactions.

        Does nothing unless a temporary store is active.  Otherwise the
        regular storage (kept in _tmp) is restored as _storage and every
        record held by the temporary store is stored into it under
        transaction t.
        """
        regular_storage = self._tmp
        if regular_storage is None:
            return
        tmp_store = self._storage

        LOG('ZODB', BLATHER,
            'Commiting subtransaction of size %s' % tmp_store.getSize())

        self._storage = regular_storage
        self._tmp = None

        regular_storage.tpc_begin(t)

        version = self._version
        oids = tmp_store._index.keys()

        # Copy invalidating and creating info from temporary storage:
        self._invalidating.extend(oids)
        self._creating.extend(tmp_store._creating)

        for oid in oids:
            # NOTE(review): the temporary store itself is passed as the
            # second (version) argument of load -- confirm it is ignored.
            data, serial = tmp_store.load(oid, tmp_store)
            result = regular_storage.store(oid, serial, data, version, t)
            # change=0: _p_changed must not be reset again here.
            self._handle_serial(result, oid, change=0)

    def abort_sub(self, t):
        """Abort work done in subtransactions.

        Does nothing unless a temporary store is active.  Otherwise the
        regular storage is restored, every object the temporary store
        holds a record for is invalidated in the cache, and the objects
        it created are disowned.  t is unused.
        """
        regular_storage = self._tmp
        if regular_storage is None:
            return
        tmp_store = self._storage
        self._tmp = None
        self._storage = regular_storage

        self._cache.invalidate(tmp_store._index.keys())
        self._invalidate_creating(tmp_store._creating)

    def _invalidate_creating(self, creating=None):
        """Disown any objects newly saved in an uncommitted transaction.

        creating is a sequence of oids; when omitted, the connection's
        own _creating list is used and replaced by an empty list.  Each
        oid still in the cache is removed from it and the object loses
        its _p_jar and _p_oid.
        """
        if creating is None:
            creating = self._creating
            self._creating = []

        cache = self._cache
        for oid in creating:
            obj = cache.get(oid, None)
            if obj is None:
                continue
            del cache[oid]
            del obj._p_jar
            del obj._p_oid

    #XXX

    def db(self): return self._db

    def getVersion(self): return self._version

    def invalidate(self, oid):
        """Invalidate a particular oid

        This marks the oid as invalid, but doesn't actually invalidate
        it.  The object data will be actually invalidated at certain
        transaction boundaries.

        oid must not be None (enforced with an assert).
        """
        assert oid is not None
        # Only record the oid; the cache is told about it later.
        self._invalidated[oid] = 1

    def modifiedInVersion(self, oid):
        """Return the database's modifiedInVersion(oid).

        When the database raises KeyError for the oid, this connection's
        own version is returned instead.
        """
        try:
            return self._db.modifiedInVersion(oid)
        except KeyError:
            return self._version

    def register(self, object):
        """Register an object with the appropriate transaction manager.

        A subclass could override this method to customize the default
        policy of one transaction manager for each thread.

        The object must already belong to this connection (asserted).
        """
        assert object._p_jar is self
        # XXX Figure out why this assert causes test failures
        # assert object._p_oid is not None
        get_transaction().register(object)

    def root(self):
        """Return the object stored under the all-zero, eight-byte oid."""
        return self['\0\0\0\0\0\0\0\0']

    def setstate(self, object):
        """Load the stored state of object from the storage into it.

        Raises RuntimeError when the connection is closed, and
        ReadConflictError / ConflictError when the object (or
        everything) has been invalidated and the object does not declare
        itself independent.  Any other failure is logged and re-raised.
        """
        oid = object._p_oid

        if self._storage is None:
            msg = ("Shouldn't load state for %s "
                   "when the connection is closed" % `oid`)
            LOG('ZODB', ERROR, msg)
            raise RuntimeError(msg)

        try:
            p, serial = self._storage.load(oid, self._version)
            self._load_count = self._load_count + 1

            # XXX this is quite conservative!
            # We need, however, to avoid reading data from a transaction
            # that committed after the current "session" started, as
            # that might lead to mixing of cached data from earlier
            # transactions and new inconsistent data.
            #
            # Note that we (carefully) wait until after we call the
            # storage to make sure that we don't miss an invalidation
            # notifications between the time we check and the time we
            # read.
            invalid = self._invalid
            if invalid(oid) or invalid(None):
                if not hasattr(object.__class__, '_p_independent'):
                    # Register the connection itself with the current
                    # transaction before reporting the conflict.
                    get_transaction().register(self)
                    raise ReadConflictError(object=object)
                # From here on 'invalid' is a flag: the object was
                # invalidated but its class defines _p_independent.
                invalid = 1
            else:
                invalid = 0

            file = StringIO(p)
            unpickler = Unpickler(file)
            unpickler.persistent_load = self._persistent_load
            # A record holds two pickles; the first (the class record)
            # is read and discarded, the second is the state.
            unpickler.load()
            state = unpickler.load()

            if hasattr(object, '__setstate__'):
                object.__setstate__(state)
            else:
                # No __setstate__: copy the state into the instance dict.
                d = object.__dict__
                for k, v in state.items():
                    d[k] = v

            object._p_serial = serial

            if invalid:
                # Now that the state is loaded, let the object decide
                # whether it really is independent of other objects.
                if object._p_independent():
                    try:
                        del self._invalidated[oid]
                    except KeyError:
                        pass
                else:
                    get_transaction().register(self)
                    raise ConflictError(object=object)

        except ConflictError:
            # Conflicts are propagated without being logged.
            raise
        except:
            LOG('ZODB',ERROR, "Couldn't load state for %s" % `oid`,
                error=sys.exc_info())
            raise

    def oldstate(self, object, serial):
        """Return the state pickled for object in the revision serial.

        The record is fetched with the storage's loadSerial(); the
        object itself is not modified.
        """
        record = self._storage.loadSerial(object._p_oid, serial)
        unpickler = Unpickler(StringIO(record))
        unpickler.persistent_load = self._persistent_load
        # Skip the first pickle (the class record); the state follows it.
        unpickler.load()
        return unpickler.load()

    def setklassstate(self, object):
        """Reload the definition of a persistent extension class.

        The stored class record is used to build a fresh copy, whose
        __dict__ then replaces the contents of object's __dict__.  If the
        record does not describe an extension class, an error is logged
        and object is left alone.  Any exception is logged and re-raised.
        """
        try:
            oid = object._p_oid
            __traceback_info__ = oid
            record, serial = self._storage.load(oid, self._version)
            unpickler = Unpickler(StringIO(record))
            unpickler.persistent_load = self._persistent_load

            klass, args = unpickler.load()

            if klass is not ExtensionKlass:
                LOG('ZODB',ERROR,
                    "Unexpected klass when setting class state on %s"
                    % getattr(object,'__name__','(?)'))
                return

            fresh = apply(klass, args)
            object.__dict__.clear()
            object.__dict__.update(fresh.__dict__)

            # clear() above also removed the persistence attributes.
            object._p_oid = oid
            object._p_jar = self
            object._p_changed = 0
            object._p_serial = serial
        except:
            LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
            raise

    def tpc_abort(self, transaction):
        """Abort the two-phase commit for transaction.

        Drops any queued commit actions, aborts the storage's commit,
        invalidates in the cache both the externally invalidated objects
        and those this transaction modified, and disowns the objects it
        created.
        """
        if self.__onCommitActions is not None:
            del self.__onCommitActions
        self._storage.tpc_abort(transaction)
        self._cache.invalidate(self._invalidated)
        self._cache.invalidate(self._invalidating)
        self._invalidate_creating()

    def tpc_begin(self, transaction, sub=None):
        """Begin the two-phase commit for transaction.

        sub -- true when a subtransaction is being committed; the first
               such commit swaps a temporary store in as _storage and
               keeps the regular storage in _tmp.

        Raises ConflictError when everything has been invalidated.
        """
        if self._invalid(None): # Some nitwit invalidated everything!
            raise ConflictError("transaction already invalidated")
        self._invalidating = []
        self._creating = []

        if sub and self._tmp is None:
            # Sub-transaction!
            tmp_store = TmpStore.TmpStore(self._version)
            self._tmp = self._storage
            self._storage = tmp_store
            tmp_store.registerDB(self._db, 0)

        self._storage.tpc_begin(transaction)

    def tpc_vote(self, transaction):
        """Ask the storage to vote, if it supports voting.

        Queued commit actions are dropped first.  Whatever the storage's
        tpc_vote returns is passed to _handle_serial().
        """
        if self.__onCommitActions is not None:
            del self.__onCommitActions
        try:
            storage_vote = self._storage.tpc_vote
        except AttributeError:
            # The storage has no tpc_vote; nothing to do.
            return
        self._handle_serial(storage_vote(transaction))

    def _handle_serial(self, store_return, oid=None, change=1):
        """Handle the returns from store() and tpc_vote() calls."""

        # These calls can return different types depending on whether
        # ZEO is used.  ZEO uses asynchronous returns that may be
        # returned in batches by the ClientStorage.  ZEO1 can also
        # return an exception object and expect that the Connection
        # will raise the exception.

        # When commit_sub() exceutes a store, there is no need to
        # update the _p_changed flag, because the subtransaction
        # tpc_vote() calls already did this.  The change=1 argument
        # exists to allow commit_sub() to avoid setting the flag
        # again.
        if not store_return:
            return
        if isinstance(store_return, StringType):
            assert oid is not None
            serial = store_return
            obj = self._cache.get(oid, None)
            if obj is None:
                return
            if serial == ResolvedSerial:
                obj._p_changed = None
            else:
                if change:
                    obj._p_changed = 0
                obj._p_serial = serial
        else:
            for oid, serial in store_return:
                if not isinstance(serial, StringType):
                    raise serial
                obj = self._cache.get(oid, None)
                if obj is None:
                    continue
                if serial == ResolvedSerial:
                    obj._p_changed = None
                else:
                    if change:
                        obj._p_changed = 0
                    obj._p_serial = serial

    def tpc_finish(self, transaction):
        """Finish the two-phase commit for transaction.

        For a subtransaction the storage is finished and the oids
        created are moved onto the storage's _creating list.  For a
        regular commit the database is told that invalidation begins and
        the storage is finished with _invalidate_invalidating as its
        callback.  Either way the cache is then brought up to date.
        """
        # It's important that the storage call the function we pass
        # (self._invalidate_invalidating) while it still has its
        # lock.  We don't want another thread to be able to read any
        # updated data until we've had a chance to send an
        # invalidation message to all of the other connections!

        if self._tmp is None:
            self._db.begin_invalidation()
            self._storage.tpc_finish(transaction,
                                     self._invalidate_invalidating)
        else:
            # Committing a subtransaction!
            # There is no need to invalidate anything.
            self._storage.tpc_finish(transaction)
            self._storage._creating[:0] = self._creating
            del self._creating[:]

        self._cache.invalidate(self._invalidated)
        self._incrgc() # This is a good time to do some GC

    def _invalidate_invalidating(self):
        """Report every oid modified by this transaction to the database.

        Passed to the storage's tpc_finish() as its callback; ends by
        telling the database that invalidation is finished.
        """
        db = self._db
        for oid in self._invalidating:
            assert oid is not None
            db.invalidate(oid, self)
        db.finish_invalidation()

    def sync(self):
        """Abort the current transaction and bring the cache up to date.

        The storage's sync() is called when it has one; the objects
        recorded as invalidated are then invalidated in the cache.
        """
        get_transaction().abort()
        storage_sync = getattr(self._storage, 'sync', 0)
        if storage_sync != 0:
            storage_sync()
        self._cache.invalidate(self._invalidated)
        self._incrgc() # This is a good time to do some GC

    def getDebugInfo(self):
        """Return the tuple of values collected by setDebugInfo()."""
        return self._debug_info

    def setDebugInfo(self, *args):
        self._debug_info = self._debug_info + args

    def getTransferCounts(self, clear=0):
        """Returns the number of objects loaded and stored.

        Set the clear argument to reset the counters.
        """
        res = self._load_count, self._store_count
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res


    ######################################################################
    # Just plain weird. Don't try this at home kids.
    def exchange(self, old, new):
        """Make new take over the oid of old within this connection.

        new receives old's oid, is attached to this connection, marked as
        changed, registered with the current transaction and stored in
        the cache under that oid.  old itself is not touched.
        """
        oid=old._p_oid
        new._p_oid=oid
        new._p_jar=self
        new._p_changed=1
        get_transaction().register(new)
        self._cache[oid]=new

class tConnection(Connection):
    # Connection variant whose close() only drops the cache references.

    def close(self):
        # Unlike Connection.close(), nothing else is reset and the
        # connection is not handed back to a database.
        self._breakcr()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.