StorageTestBase.py :  » Web-Frameworks » Zope » Zope-2.6.0 » lib » python » ZODB » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Zope 
Zope » Zope 2.6.0 » lib » python » ZODB » tests » StorageTestBase.py
"""Provide a mixin base class for storage tests.

The StorageTestBase class provides basic setUp() and tearDown()
semantics (which you can override), and it also provides a helper
method _dostore() which performs a complete store transaction for a
single object revision.
"""

import errno
import os
import pickle
import string
import sys
import types
import unittest
from cPickle import Pickler,Unpickler
from cStringIO import StringIO

from ZODB.Transaction import Transaction

from ZODB.tests.MinPO import MinPO

# Eight NUL bytes.  StorageTestBase._dostore() uses this as the revid
# when the caller does not supply one.
ZERO = '\0'*8

def zodb_pickle(obj):
    """Return obj serialized as the two consecutive pickles ZODB expects.

    The first pickle is a (class-info, args) pair.  class-info is a
    (module name, class name) tuple, or the class object itself when the
    class has no __module__ attribute; args is always None.  The second
    pickle is whatever obj.__getstate__() returns.

    Objects that define __getinitargs__ are rejected by an assertion,
    because constructor arguments are not supported here.
    """
    cls = obj.__class__
    assert not hasattr(obj, '__getinitargs__'), "not ready for constructors"

    modname = getattr(cls, '__module__', None)
    if modname is None:
        class_info = cls
    else:
        class_info = (modname, cls.__name__)

    # Fetch the state before anything is written to the buffer.
    state = obj.__getstate__()

    buf = StringIO()
    pickler = Pickler(buf, 1)   # 1 selects the binary pickle format
    pickler.dump((class_info, None))
    pickler.dump(state)
    return buf.getvalue(1)

def zodb_unpickle(data):
    """Unpickle an object stored using the format expected by ZODB.

    data must hold two consecutive pickles: class information followed
    by the object's state (the layout produced by zodb_pickle()).  The
    class information is either a (module name, class name) tuple or a
    ((module name, class name), args) tuple; args are ignored because
    the class is always instantiated without arguments.  The state is
    passed to the new instance's __setstate__().

    A module name of "__main__" is resolved against this module's own
    globals; any other module is imported via import_helper().

    Raises ValueError if the first pickle is not a tuple, and KeyError
    if the class name cannot be found in the resolved namespace.
    """
    f = StringIO(data)
    u = Unpickler(f)
    klass_info = u.load()
    if not isinstance(klass_info, types.TupleType):
        raise ValueError("expected class info: %s" % repr(klass_info))

    if isinstance(klass_info[0], types.TupleType):
        modname, klassname = klass_info[0]
    else:
        modname, klassname = klass_info

    if modname == "__main__":
        ns = globals()
    else:
        ns = import_helper(modname).__dict__

    try:
        klass = ns[klassname]
    except KeyError:
        sys.stderr.write("can't find %s in %s" % (klassname,
                                                  repr(ns)))
        # Re-raise: falling through would hit an unbound `klass` and
        # hide the real lookup failure behind an UnboundLocalError.
        raise

    inst = klass()
    inst.__setstate__(u.load())
    return inst

def handle_all_serials(oid, *args):
    """Return dict of oid to serialno from store() and tpc_vote().

    Raises an exception if one of the calls raised an exception.

    The storage interface got complicated when ZEO was introduced.
    Any individual store() call can return None or a sequence of
    2-tuples where the 2-tuple is either oid, serialno or an
    exception to be raised by the client.

    The original interface just returned the serialno for the
    object.

    Each element of args is handled according to its type:

     - a string is taken as the serialno for the oid argument
     - None is ignored
     - anything else is iterated as (oid, serialno) pairs; a pair
       whose second item is not a string is raised as an exception
    """
    # type('') is the plain string type, the same object as
    # types.StringType.
    string_type = type('')
    serials = {}
    for arg in args:
        if arg is None:
            continue
        if isinstance(arg, string_type):
            serials[oid] = arg
            continue
        # Use a separate name for the oids found in the sequence so the
        # oid parameter is not rebound; a plain-string serial later in
        # args must still be recorded under the caller's oid.
        for item_oid, serial in arg:
            if not isinstance(serial, string_type):
                raise serial # error from ZEO server
            serials[item_oid] = serial
    return serials

def handle_serials(oid, *args):
    """Return the serialno for oid based on multiple return values.

    A convenience wrapper around handle_all_serials(): args are passed
    through unchanged and only the entry for oid is returned.

    Raises KeyError if none of the args supplied a serialno for oid,
    and propagates any exception raised by handle_all_serials().
    """
    return handle_all_serials(oid, *args)[oid]

def import_helper(name):
    """Import the module called name and return that module object.

    The result is looked up in sys.modules rather than taken from
    __import__(), because for a dotted name __import__() returns the
    top-level package instead of the submodule that was asked for.
    """
    __import__(name)
    return sys.modules[name]

def removefs(base):
    """Remove all files created by FileStorage with path base."""
    for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
        path = base + ext
        try:
            os.remove(path)
        except os.error, err:
            if err[0] != errno.ENOENT:
                raise


class StorageTestBase(unittest.TestCase):
    """Base class for storage tests.

    Holds the storage under test in self._storage and offers
    _dostore() / _dostoreNP() helpers that run one complete store
    transaction against it.
    """

    # XXX It would be simpler if concrete tests didn't need to extend
    # setUp() and tearDown().

    def setUp(self):
        """Initialize self._storage to None."""
        # You need to override this with a setUp that creates self._storage
        self._storage = None

    def _close(self):
        """Close self._storage if one has been set."""
        # You should override this if closing your storage requires additional
        # shutdown operations.
        if self._storage is not None:
            self._storage.close()

    def tearDown(self):
        """Shut the storage down by delegating to _close()."""
        self._close()

    def _dostore(self, oid=None, revid=None, data=None, version=None,
                 already_pickled=0, user=None, description=None):
        """Do a complete storage transaction.  The defaults are:

         - oid=None, ask the storage for a new oid
         - revid=None, use a revid of ZERO
         - data=None, pickle up some arbitrary data (the integer 7)
         - version=None, use the empty string version
         - already_pickled=0, run data through zodb_pickle() before
           storing; pass a true value to store data as given
         - user=None, leave the transaction's user untouched
         - description=None, leave the transaction's description
           untouched

        An integer passed as data is wrapped in a MinPO first.

        If any step of the transaction raises, the transaction is
        aborted with tpc_abort() and the exception is re-raised.

        Returns the object's new revision id.
        """
        if oid is None:
            oid = self._storage.new_oid()
        if revid is None:
            revid = ZERO
        if data is None:
            data = MinPO(7)
        if type(data) == types.IntType:
            data = MinPO(data)
        if not already_pickled:
            data = zodb_pickle(data)
        if version is None:
            version = ''
        # Begin the transaction
        t = Transaction()
        if user is not None:
            t.user = user
        if description is not None:
            t.description = description
        try:
            self._storage.tpc_begin(t)
            # Store an object
            r1 = self._storage.store(oid, revid, data, version, t)
            # Finish the transaction
            r2 = self._storage.tpc_vote(t)
            # The serial may come back from store() or from tpc_vote().
            revid = handle_serials(oid, r1, r2)
            self._storage.tpc_finish(t)
        except:
            # Catch everything on purpose: abort, then re-raise unchanged.
            self._storage.tpc_abort(t)
            raise
        return revid

    def _dostoreNP(self, oid=None, revid=None, data=None, version=None,
                   user=None, description=None):
        """Like _dostore(), but store data without pickling it.

        Calls _dostore() with already_pickled=1.  user and description
        are forwarded so they reach the transaction.

        Returns the object's new revision id.
        """
        return self._dostore(oid, revid, data, version, already_pickled=1,
                             user=user, description=description)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.