snap_sql_store.py :  » Development » SnapLogic » snaplogic » server » repository » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » server » repository » snap_sql_store.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: snap_sql_store.py 6804 2009-03-16 18:06:03Z kurt $

from __future__ import with_statement

"""
Abstract snap store module for SQL-based databases using the DB-API2 interfaces.

"""
__docformat__ = "epytext en"

import uuid

from snaplogic.server.repository.snap_store import SnapStore
from snaplogic.common import sqlite_iter,version_info,snap_log
from snaplogic.common.snap_exceptions import *
from snaplogic.common.db_utils import SQLQueryBuilder
from snaplogic.common.thread_utils import RWLock
from snaplogic.snapi_base import keys

class ManagedHandle(object):
    """
    Wraps a real database object with additional functionality.

    There are two main purposes to this class. The first is to provide a context manager wrapper around the
    database object that rolls back uncommitted changes when the with block is left through an exception.
    The second is to manage a read/write lock around the database handle. The locking is only performed if
    an instance of this object is created with a lock.

    Any attribute not defined on this class (fetchone, rowcount, ...) is forwarded to the cursor that
    __enter__ opens.

    """

    def __init__(self, db, rwlock=None, lock_mode=None):
        """
        Initialization.

        The db parameter provides an opened database object. The lock_mode parameter should be set to one of
        L{RWLock.READ_LOCK} for read locking, L{RWLock.WRITE_LOCK} for write locking, or None to disable locking.

        @param db: An opened database object.
        @type db: object

        @param rwlock: Lock acquired in __enter__ and released in __exit__. None to disable locking.
        @type rwlock: RWLock

        @param lock_mode: Mode to lock RWLock inside of with statement. None to disable locking.
        @type lock_mode: RWLock constant

        """
        super(ManagedHandle, self).__init__()
        self._db = db
        # Defined up front so that attribute forwarding in __getattr__ can never recurse on '_cursor'.
        # Replaced with a real cursor by __enter__.
        self._cursor = None
        self._dirty = False
        self._lock = rwlock
        self._lock_mode = lock_mode

    def _can_write(self):
        """
        Tell whether this handle is allowed to modify the database.

        A handle without a lock is always writable; a handle with a lock is writable only in write mode.
        This is the same rule commit() applies, so every handle that may commit is also tracked as dirty.

        @return: True if changes may be made through this handle.
        @rtype: bool

        """
        return self._lock is None or self._lock_mode is RWLock.WRITE_LOCK

    def __enter__(self):
        """
        Open a cursor on the database and acquire the lock, if one was given.

        @return: This handle.
        @rtype: L{ManagedHandle}

        """
        self._cursor = self._db.cursor()
        if self._lock is not None:
            self._lock.acquire(self._lock_mode)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Roll back uncommitted changes if the block failed, then release the lock.

        The cursor is closed only on the rollback path. On a clean exit it is left open, because code in
        this module still reads from the handle after leaving the with block.

        @return: Always False, so an exception raised inside the block keeps propagating.
        @rtype: bool

        """
        try:
            if exc_type is not None and self._dirty:
                self._db.rollback()
                self._dirty = False
                self._cursor.close()
        finally:
            # The lock must be released even if the rollback itself fails.
            if self._lock is not None:
                self._lock.release()

        return False

    def execute(self, *args):
        """
        Run a statement on the cursor; arguments are passed through unchanged.

        """
        if self._can_write():
            # Assume when execute is called on a writable handle, a change was made.
            self._dirty = True

        self._cursor.execute(*args)

    def executemany(self, *args):
        """
        Run a statement for a sequence of parameter sets; arguments are passed through unchanged.

        """
        if self._can_write():
            # Assume when executemany is called on a writable handle, a change was made.
            self._dirty = True

        self._cursor.executemany(*args)

    def rollback(self):
        """
        Roll back the current transaction and clear the dirty flag.

        """
        self._dirty = False
        self._db.rollback()

    def commit(self):
        """
        Commit the current transaction.

        @raise SnapRepInternalError: The handle holds a lock that is not in write mode.

        """
        if self._can_write():
            self._db.commit()
            self._dirty = False
        else:
            # Mark the handle dirty so that __exit__ rolls back once this exception leaves the with block.
            self._dirty = True
            raise SnapRepInternalError("Attempt to commit changes without a write lock")

    def __getattr__(self, name):
        """
        Pretend to be the cursor for any attribute not on this object.

        @raise AttributeError: No cursor is open yet, or the cursor has no such attribute.

        """
        # Look in the instance dictionary directly: going through self._cursor would re-enter
        # __getattr__ and recurse without end when the attribute has not been set.
        cursor = self.__dict__.get('_cursor')
        if cursor is None:
            raise AttributeError("'%s' object has no attribute '%s' (no cursor is open)"
                                 % (type(self).__name__, name))

        return getattr(cursor, name)

class SnapSQLStore(SnapStore):
    """
    An abstract base class for SQL-based SnapStore classes.

    Using the standard DB-API2, this base class takes care of almost all details in providing a SnapStore API
    using standard SQL instructions. Descendents of this class must still implement connecting to the database
    and may have to override certain other methods if they use incompatible SQL instructions.

    Descendents must also set INSERT_OR_UPDATE_REGISTRY_ENTRY, which is left as None here because its SQL is
    implementation-specific.
    The most likely method that descendents will have to override is _createTables(), because the data types
    between SQL implementations vary so.

    Statements that take no parameters are plain strings. Statements that take parameters are SQLQueryBuilder
    templates with ${name} placeholders; _prepare_query_builders() rebuilds each of them per instance using
    the paramstyle of the backend module.

    """

    # --- Schema definitions ---
    CREATE_TABLE_VERSION_INFO = """
                                CREATE TABLE version_info (version int NOT NULL,
                                                           designated_version varchar(16) NOT NULL)
                                """

    CREATE_TABLE_RESOURCE = """
                            CREATE TABLE resource (uri text(767) PRIMARY KEY NOT NULL,
                                                   guid char(32),
                                                   gen_id int,
                                                   object text NOT NULL,
                                                   validated int NOT NULL)
                            """

    # Note that, unlike resource, this table declares no primary key on uri.
    CREATE_TABLE_SCHEDULER_EVENT = """
                                   CREATE TABLE scheduler_event (uri text(767) NOT NULL,
                                                                 event text NOT NULL)
                                   """

    CREATE_TABLE_REGISTRY = """CREATE TABLE registry (name varchar(255) PRIMARY KEY NOT NULL, value text)"""

    # --- Version queries ---
    SELECT_VERSION = """SELECT version FROM version_info"""

    SELECT_DESIGNATED_VERSION = """SELECT designated_version FROM version_info"""

    UPDATE_DESIGNATED_VERSION = SQLQueryBuilder("""
                                                UPDATE version_info SET designated_version = ${designated_version}
                                                """)

    # --- Read queries ---
    SELECT_RESOURCES = """
                       SELECT uri, guid, gen_id, object FROM resource
                       """

    SELECT_RESOURCE_BY_URI = SQLQueryBuilder("""
                                             SELECT uri, guid, gen_id, object FROM resource WHERE uri = ${uri}
                                             """)

    # The "summary" queries return the identifying columns without the encoded object.
    SELECT_RESOURCE_SUMMARY_BY_URI = SQLQueryBuilder("""
                                                     SELECT uri, guid, gen_id FROM resource WHERE uri = ${uri}
                                                     """)
    SELECT_RESOURCE_SUMMARIES = """
                                SELECT uri, guid, gen_id FROM resource
                                """

    SELECT_RESOURCE_VALIDATED_FLAG = SQLQueryBuilder("""
                                                     SELECT validated FROM resource
                                                     WHERE uri = ${uri} AND guid = ${guid} AND gen_id = ${gen_id}
                                                     """)

    SELECT_SCHEDULER_EVENT_URIS = """SELECT uri FROM scheduler_event"""

    SELECT_SCHEDULER_EVENT_BY_URI = SQLQueryBuilder("""SELECT uri, event FROM scheduler_event WHERE uri = ${uri}""")

    # The LIKE pattern (including any wildcard) is supplied by the caller as the ${uri} value.
    SELECT_SCHEDULER_EVENT_URIS_LIKE_URI = SQLQueryBuilder("""
                                                           SELECT uri FROM scheduler_event
                                                           WHERE uri LIKE ${uri}
                                                           """)

    SELECT_REGISTRY_ENTRY_BY_NAME = SQLQueryBuilder("""SELECT value FROM registry WHERE name = ${name}""")

    # --- Insert statements ---
    INSERT_VERSION = SQLQueryBuilder("""
                                     INSERT INTO version_info VALUES (${version}, ${designated_version})
                                     """)

    # New resources always start with the validated column set to 0.
    INSERT_RESOURCE = SQLQueryBuilder("""
                                      INSERT INTO resource VALUES (${uri}, ${guid}, ${gen_id}, ${object}, 0)
                                      """)

    INSERT_SCHEDULER_EVENT = SQLQueryBuilder("""
                                             INSERT INTO scheduler_event VALUES (${uri}, ${event})
                                             """)

    INSERT_OR_UPDATE_REGISTRY_ENTRY = None      # Children must set this implementation-specific SQL.

    # Used by create_resource() when the force flag is set.
    REPLACE_RESOURCE = SQLQueryBuilder("""REPLACE INTO resource VALUES (${uri}, ${guid}, ${gen_id}, ${object}, 0)""")

    # --- Update statements ---
    # Every resource update below also resets the validated column to 0.
    # Strict update: uri, guid and gen_id must all match; the generation ID is incremented.
    UPDATE_RESOURCE = SQLQueryBuilder("""
                                      UPDATE resource
                                      SET gen_id = gen_id + 1, object = ${object}, validated = 0
                                      WHERE uri = ${uri} AND guid = ${guid} AND gen_id = ${gen_id}
                                      """)

    # Forced update matching on uri only: assigns a new GUID and restarts the generation ID at 0.
    UPDATE_RESOURCE_FORCE = SQLQueryBuilder("""
                                            UPDATE resource
                                            SET guid = ${new_guid}, gen_id = 0, object = ${object}, validated = 0
                                            WHERE uri = ${uri}
                                            """)

    # Forced update matching on uri and guid: the generation ID is incremented but not compared.
    UPDATE_RESOURCE_FORCE_WITH_GUID = SQLQueryBuilder("""
                                                      UPDATE resource
                                                      SET gen_id = gen_id + 1, object = ${object}, validated = 0
                                                      WHERE uri = ${uri} AND guid = ${guid}
                                                      """)

    UPDATE_RESOURCE_VALIDATED_FLAG = SQLQueryBuilder("""
                                                     UPDATE resource
                                                     SET validated = ${validated}
                                                     WHERE uri = ${uri} AND guid = ${guid} AND gen_id = ${gen_id}
                                                     """)

    UPDATE_SCHEDULER_EVENT = SQLQueryBuilder("""
                                             UPDATE scheduler_event SET event = ${event} WHERE uri = ${uri}
                                             """)

    # --- Delete statements ---
    # The three resource deletes mirror the three levels of matching used for updates.
    DELETE_RESOURCE = SQLQueryBuilder("""
                                      DELETE FROM resource WHERE uri = ${uri} AND guid = ${guid} AND gen_id = ${gen_id}
                                      """)

    DELETE_RESOURCE_FORCE = SQLQueryBuilder("""DELETE FROM resource WHERE uri = ${uri}""")

    DELETE_RESOURCE_FORCE_WITH_GUID = SQLQueryBuilder("""DELETE FROM resource WHERE uri = ${uri} AND guid = ${guid}""")

    DELETE_SCHEDULER_EVENT = SQLQueryBuilder("""DELETE FROM scheduler_event WHERE uri = ${uri}""")

    DELETE_REGISTRY_ENTRY = SQLQueryBuilder("""DELETE FROM registry WHERE name = ${name}""")

    # Plain %-format template; presumably filled in with a table name (its use is not in this part of the file).
    DROP_TABLE_PS = """DROP TABLE %s"""

    def __init__(self, module, rwlock=None):
        """
        Prepare the store for a particular DB-API2 backend.

        No connection is made here: the database object starts out as None. The class-level query
        templates are rebuilt for the parameter style of the given module.

        @param module: The DB-API2 module for the relevant database backend.
        @type module: module

        @param rwlock: A RWLock instance, if locks are desired. None if not.
        @type rwlock: RWLock

        """
        super(SnapSQLStore, self).__init__()
        self._module = module
        self._lock = rwlock
        self._db = None
        self._prepare_query_builders(module.paramstyle)

    def _prepare_query_builders(self, paramstyle):
        """
        Rebuild every SQLQueryBuilder attribute of this object for the given parameter style.

        Each rebuilt builder is stored on the instance under the same attribute name.

        @param paramstyle: Parameter style to hand to the new SQLQueryBuilder objects.
        @type paramstyle: string

        """
        for name in dir(self):
            template = getattr(self, name)
            if not isinstance(template, SQLQueryBuilder):
                continue

            setattr(self, name, SQLQueryBuilder(template.query_template, paramstyle))

    def _read_handle(self):
        """
        Create a handle for reading from the repository database.

        @return: Handle over the open database using the store's lock in L{RWLock.READ_LOCK} mode.
        @rtype: L{ManagedHandle}

        @raise SnapRepConnectionError: The store has no open database object.

        """
        if self._db is None:
            raise SnapRepConnectionError("Unable to read from repository database")

        return ManagedHandle(self._db, self._lock, RWLock.READ_LOCK)

    def _write_handle(self):
        """
        Create a handle for writing to the repository database.

        @return: Handle over the open database using the store's lock in L{RWLock.WRITE_LOCK} mode.
        @rtype: L{ManagedHandle}

        @raise SnapRepConnectionError: The store has no open database object.

        """
        if self._db is None:
            raise SnapRepConnectionError("Unable to write to repository database")

        return ManagedHandle(self._db, self._lock, RWLock.WRITE_LOCK)

    def _assert_current_ids(self, handle, uri, guid, gen_id, partial_match=False):
        """
        Check that the given IDs are the ones stored for the resource at uri.

        The GUID and generation ID stored for uri are read and compared with guid and gen_id. The GUID is
        compared first; the generation ID is only looked at when the GUID matched.

        With partial_match set, a guid of None skips both comparisons, so the call only verifies that a
        resource exists at uri. A gen_id of None skips the generation ID comparison alone.

        The caller is expected to already hold the read or write lock through handle.

        @param handle: DB handle object.
        @type handle: object

        @param uri: URI of resource to check.
        @type uri: string

        @param guid: GUID expected for stored resource.
        @type guid: string

        @param gen_id: Generation ID expected for stored resource.
        @type gen_id: integer

        @param partial_match: Flag to allow partial matches if guid and/or gen_id are None.
        @type partial_match: bool

        @raise SnapObjNotFoundError: No resource is located at uri within the store.
        @raise SnapResDefGUIDError: Resource in store at uri has a different GUID than provided in guid.
        @raise SnapResDefGenIDError: Resource in store at uri has different generation ID than provided in gen_id.

        """
        (sql, params) = self.SELECT_RESOURCE_SUMMARY_BY_URI.build({'uri': uri})
        handle.execute(sql, params)
        summary = handle.fetchone()

        if summary is None:
            raise SnapObjNotFoundError("Resource '%s' was not found in the Repository." % uri)

        if partial_match and guid is None:
            # Existence was all that had to be verified.
            return

        (stored_guid, stored_gen_id) = summary[1:]
        if stored_guid != guid:
            raise SnapResDefGUIDError("Resource '%s' GUID did not match." % uri,
                                      ('Expected', guid),
                                      ('Found', stored_guid))

        if partial_match and gen_id is None:
            # The caller did not ask for the generation ID to be compared.
            return

        if stored_gen_id != gen_id:
            raise SnapResDefGenIDError("Resource '%s' generation ID did not match." % uri,
                                       ('Expected', gen_id),
                                       ('Found', stored_gen_id))
        
    def _raise_mismatch_error(self, handle, uri, guid, gen_id, partial_match=False):
        """
        Raise the exception that explains why a resource operation matched nothing.

        Called on the error path after a statement failed to match a resource in the store. The check is
        delegated to _assert_current_ids(); should that check unexpectedly pass, a general purpose internal
        error is raised instead, so this method never returns normally.

        @param handle: DB handle object.
        @type handle: object

        @param uri: URI of resource to check.
        @type uri: string

        @param guid: GUID expected for stored resource or None to match any.
        @type guid: string

        @param gen_id: Generation ID expected for stored resource or None to match any.
        @type gen_id: integer

        @param partial_match: Flag to allow partial matches if guid and/or gen_id are None.
        @type partial_match: bool

        @raise SnapObjNotFoundError: No resource is located at uri within the store.
        @raise SnapResDefGUIDError: Resource in store at uri has a different GUID than provided in guid.
        @raise SnapResDefGenIDError: Resource in store at uri has different generation ID than provided in gen_id.
        @raise SnapRepInternalError: The stored IDs match, so the cause of the failure is unknown.

        """
        self._assert_current_ids(handle, uri, guid, gen_id, partial_match)

        # The IDs check out, so the earlier failure cannot be explained.
        details = (("uri", uri), ("guid", guid), ("gen_id", gen_id))
        raise SnapRepInternalError("Internal Repository error: resource match failed for unknown reason:",
                                   *details)
    
    def read_version(self):
        """
        Fetch the repository version number recorded in the store.

        @return: The repository version number stored.
        @rtype: integer

        @raise SnapRepInternalError: The version_info query returned no row.

        """
        with self._read_handle() as handle:
            handle.execute(self.SELECT_VERSION)
            version_row = handle.fetchone()

        # The row is examined after the handle has been released.
        if version_row is None:
            raise SnapRepInternalError("Repository version data missing.")

        return version_row[0]

    def close(self):
        """
        Close the database object and forget it.

        The close happens inside a write handle, so it waits for the write lock when the store uses one.

        @raise SnapRepConnectionError: The store has no open database object.

        """
        with self._write_handle():
            connection = self._db
            connection.close()
            self._db = None
        
    def read_designated_version(self):
        """
        Read the designated server version from the store.
        
        @return: Designated server version.
        @rtype: string
        
        """
        with self._read_handle() as handle:
            try:
                handle.execute(self.SELECT_DESIGNATED_VERSION)
                row = handle.fetchone()
            except Exception, e:
                raise SnapException.chain(e,
                                          SnapRepInternalError("Error reading designated server version: " + str(e)))

        if row is not None:
            return row[0]
        else:
            raise SnapRepInternalError("Repository version data missing")

    def set_designated_version(self, version):
        """
        Store a new designated server version and commit it.

        @param version: New version string to set.
        @type version: str

        """
        with self._write_handle() as handle:
            new_values = {'designated_version': version}
            (sql, params) = self.UPDATE_DESIGNATED_VERSION.build(new_values)
            handle.execute(sql, params)
            handle.commit()
        
    def create_resource(self, uri, resdef, force_flag=False):
        """
        Create a resource in the database.

        Implementation of the SnapStore method.

        The resource gets a freshly generated GUID and generation ID 0. When uri is None, a URI is
        generated from the GUID and the resdef. With force_flag set the row is written with the REPLACE
        statement instead of a plain INSERT.

        @param uri: URI to bind the resource to, or None to have one generated.
        @type uri: string

        @param resdef: Resource definition to store.

        @param force_flag: Use REPLACE rather than INSERT.
        @type force_flag: bool

        @return: Dictionary holding the URI, GUID and generation ID of the new resource.
        @rtype: dict

        @raise SnapObjExistsError: The backend reported an integrity error for the statement.

        """
        new_guid = uuid.uuid4().hex
        first_gen_id = 0
        encoded = self._encode_value(resdef)
        if uri is None:
            uri = self._generate_uri(new_guid, resdef)

        # Both statements take exactly the same parameters.
        builder = self.REPLACE_RESOURCE if force_flag else self.INSERT_RESOURCE
        (sql, params) = builder.build({'uri': uri,
                                       'guid': new_guid,
                                       'gen_id': first_gen_id,
                                       'object': encoded})

        with self._write_handle() as handle:
            try:
                handle.execute(sql, params)
                handle.commit()
            except self._module.IntegrityError:
                raise SnapObjExistsError("A resource already exists at the URI '%s'." % uri)

            return {keys.URI: uri, keys.GUID: new_guid, keys.GEN_ID: first_gen_id}

    def read_all_resources(self):
        """
        Read all resources from the store.

        Implementation of the SnapStore method.

        @return: Dictionary mapping each URI to a dictionary with the GUID, generation ID and decoded
                 resource definition.
        @rtype: dict

        """
        with self._read_handle() as handle:
            handle.execute(self.SELECT_RESOURCES)
            resources = {}
            for row in sqlite_iter(handle):
                (uri, guid, gen_id, encoded) = row
                resources[uri] = {keys.GUID: guid,
                                  keys.GEN_ID: gen_id,
                                  keys.RESDEF: self._decode_value(encoded)}

            return resources
        
    def read_resources(self, uri_list):
        """
        Read resources from the store.

        Implementation of the SnapStore method.

        @param uri_list: URIs of the resources to read.
        @type uri_list: list

        @return: Dictionary with two entries. The success entry maps the URI of each row found to a
                 dictionary with its GUID, generation ID and decoded resource definition. The error entry
                 maps each requested URI that had no row to None.
        @rtype: dict

        """
        with self._read_handle() as handle:
            found = {}
            missing = {}
            for requested_uri in uri_list:
                (sql, params) = self.SELECT_RESOURCE_BY_URI.build({'uri': requested_uri})
                handle.execute(sql, params)
                row = handle.fetchone()
                if row is None:
                    missing[requested_uri] = None
                    continue

                # Successful reads are keyed by the URI as returned by the database.
                (stored_uri, guid, gen_id, encoded) = row
                found[stored_uri] = {keys.GUID: guid,
                                     keys.GEN_ID: gen_id,
                                     keys.RESDEF: self._decode_value(encoded)}

            return {keys.SUCCESS: found, keys.ERROR: missing}

    def list_resources(self, uri_list=None):
        """
        Return dictionary of resources contained in store.

        Implementation of the SnapStore method. This is a direct alias of summarize_resources().

        @param uri_list: URIs to report on, or None for every resource.
        @type uri_list: list

        @return: Whatever summarize_resources() returns for uri_list.
        @rtype: dict

        """
        summaries = self.summarize_resources(uri_list)
        return summaries
    
    def summarize_resources(self, uri_list=None):
        """
        Return dictionary of resources contained in store.

        Implementation of the SnapStore method.

        @param uri_list: URIs to summarize, or None to summarize every resource in the store.
        @type uri_list: list

        @return: Dictionary mapping URI to the summary built from its GUID and generation ID. When
                 uri_list is given, every requested URI is present and maps to None if it has no row.
        @rtype: dict

        """
        with self._read_handle() as handle:
            if uri_list is not None:
                # Start with every requested URI mapped to None, then fill in the ones that exist.
                summaries = dict.fromkeys(uri_list, None)
                for requested_uri in uri_list:
                    (sql, params) = self.SELECT_RESOURCE_SUMMARY_BY_URI.build({'uri': requested_uri})
                    handle.execute(sql, params)
                    row = handle.fetchone()
                    if row is None:
                        continue

                    (stored_uri, guid, gen_id) = row
                    summaries[stored_uri] = self._make_summary_dict(guid, gen_id)
            else:
                handle.execute(self.SELECT_RESOURCE_SUMMARIES)
                summaries = {}
                for (stored_uri, guid, gen_id) in sqlite_iter(handle):
                    summaries[stored_uri] = self._make_summary_dict(guid, gen_id)

            return summaries

    def update_resource(self, uri, guid, gen_id, resdef, force_flag=False):
        """
        Update the resource associated with uri.

        Implementation of the SnapStore method.

        Without force_flag, uri, guid and gen_id must all match the stored resource. With force_flag and
        a guid, only uri and guid must match. With force_flag and no guid, the call is handed over to
        create_resource().

        @param uri: URI of the resource to update.
        @type uri: string

        @param guid: GUID expected for the stored resource.
        @type guid: string

        @param gen_id: Generation ID expected for the stored resource.
        @type gen_id: integer

        @param resdef: New resource definition.

        @param force_flag: Relax the matching as described above.
        @type force_flag: bool

        @return: Dictionary holding the URI, GUID and generation ID read back after the update.
        @rtype: dict

        @raise SnapGeneralError: The resource could not be read back after a successful update.

        """
        if force_flag and guid is None:
            return self.create_resource(uri, resdef, force_flag)

        # Both statements are given the same parameters; they differ in what has to match.
        if force_flag:
            builder = self.UPDATE_RESOURCE_FORCE_WITH_GUID
        else:
            builder = self.UPDATE_RESOURCE
        (sql, params) = builder.build({'uri': uri,
                                       'guid': guid,
                                       'gen_id': gen_id,
                                       'object': self._encode_value(resdef)})

        with self._write_handle() as handle:
            handle.execute(sql, params)
            if handle.rowcount != 1:
                # Raises the exception describing why nothing matched.
                self._raise_mismatch_error(handle, uri, guid, gen_id, force_flag)

            # Read the summary back to report the updated generation ID.
            (sql, params) = self.SELECT_RESOURCE_SUMMARY_BY_URI.build({'uri': uri})
            handle.execute(sql, params)
            summary = handle.fetchone()
            if summary is None:
                raise SnapGeneralError("Internal Repository error: resource match failed for unknown reason:",
                                       ("uri", uri))

            handle.commit()
            (stored_uri, stored_guid, stored_gen_id) = (summary[0], summary[1], summary[2])
            return {keys.URI: stored_uri,
                    keys.GUID: stored_guid,
                    keys.GEN_ID: stored_gen_id}

    def delete_resource(self, uri, guid, gen_id, force_flag=False):
        """
        Delete a resource from the store.

        Implementation of the SnapStore method.

        Without force_flag, uri, guid and gen_id must all match. With force_flag the generation ID is
        ignored, and the GUID is ignored as well when guid is None.

        @param uri: URI of the resource to delete.
        @type uri: string

        @param guid: GUID expected for the stored resource.
        @type guid: string

        @param gen_id: Generation ID expected for the stored resource.
        @type gen_id: integer

        @param force_flag: Relax the matching as described above.
        @type force_flag: bool

        """
        if not force_flag:
            builder = self.DELETE_RESOURCE
            criteria = {'uri': uri, 'guid': guid, 'gen_id': gen_id}
        elif guid is None:
            builder = self.DELETE_RESOURCE_FORCE
            criteria = {'uri': uri}
        else:
            builder = self.DELETE_RESOURCE_FORCE_WITH_GUID
            criteria = {'uri': uri, 'guid': guid}
        (sql, params) = builder.build(criteria)

        with self._write_handle() as handle:
            handle.execute(sql, params)
            if handle.rowcount != 1:
                self._raise_mismatch_error(handle, uri, guid, gen_id, force_flag)
            else:
                handle.commit()
        
    def read_resource_validated_flag(self, uri, guid, gen_id):
        """
        Retrieve the validated flag on a resource.

        See L{snaplogic.server.repository.request_proxy.RequestProxy.is_resource_validated}.

        @return: True if a resource matches uri, guid and gen_id and its flag is non-zero, else False.
        @rtype: bool

        """
        criteria = {'uri': uri, 'guid': guid, 'gen_id': gen_id}
        (sql, params) = self.SELECT_RESOURCE_VALIDATED_FLAG.build(criteria)
        with self._read_handle() as handle:
            handle.execute(sql, params)
            flag_row = handle.fetchone()
            if flag_row is None:
                # No matching resource counts as not validated.
                return False

            return flag_row[0] != 0
            
    def set_resource_validated_flag(self, uri, guid, gen_id):
        """
        Set the validated flag on a resource.

        See L{snaplogic.server.repository.request_proxy.RequestProxy.set_resource_validated_flag}.

        The flag is set to 1 on the resource matching uri, guid and gen_id. If the statement does not
        affect exactly one row, the reason is raised through _raise_mismatch_error().

        """
        settings = {'uri': uri, 'guid': guid, 'gen_id': gen_id, 'validated': 1}
        (sql, params) = self.UPDATE_RESOURCE_VALIDATED_FLAG.build(settings)
        with self._write_handle() as handle:
            handle.execute(sql, params)
            if handle.rowcount != 1:
                self._raise_mismatch_error(handle, uri, guid, gen_id)
            else:
                handle.commit()
        
    def create_scheduler_event(self, uri, event):
        """
        Insert a scheduler event for uri and commit it.

        @param uri: URI the event is stored under.
        @type uri: string

        @param event: Event to store; it is encoded before being written.

        """
        payload = {'uri': uri, 'event': self._encode_value(event)}
        (sql, params) = self.INSERT_SCHEDULER_EVENT.build(payload)
        with self._write_handle() as handle:
            handle.execute(sql, params)
            handle.commit()
        
    def read_scheduler_event(self, uri):
        """
        Read the scheduler event stored for uri.

        @param uri: URI of the event.
        @type uri: string

        @return: The decoded event.

        @raise SnapObjNotFoundError: No event is stored for uri.

        """
        (sql, params) = self.SELECT_SCHEDULER_EVENT_BY_URI.build({'uri': uri})
        with self._read_handle() as handle:
            handle.execute(sql, params)
            event_row = handle.fetchone()

        if event_row is None:
            raise SnapObjNotFoundError("Unable to read scheduler event: event '%s' not found." % uri)

        # Column 0 is the URI, column 1 the encoded event.
        return self._decode_value(event_row[1])
        
    def update_scheduler_event(self, uri, event):
        """
        Replace the scheduler event stored for uri and commit the change.

        @param uri: URI of the event.
        @type uri: string

        @param event: New event; it is encoded before being written.

        @raise SnapObjNotFoundError: The update did not affect exactly one row.

        """
        with self._write_handle() as handle:
            payload = {'uri': uri, 'event': self._encode_value(event)}
            (sql, params) = self.UPDATE_SCHEDULER_EVENT.build(payload)
            handle.execute(sql, params)
            if handle.rowcount != 1:
                # Raised inside the with block so that the handle can undo the statement.
                raise SnapObjNotFoundError("Unable to update scheduler event: event '%s' not found." % uri)

            handle.commit()
            
    def delete_scheduler_event(self, uri):
        """
        Delete the scheduler event stored for uri and commit the change.

        @param uri: URI of the event.
        @type uri: string

        @raise SnapObjNotFoundError: The delete did not affect exactly one row.

        """
        with self._write_handle() as handle:
            (sql, params) = self.DELETE_SCHEDULER_EVENT.build({'uri': uri})
            handle.execute(sql, params)
            if handle.rowcount != 1:
                # Raised inside the with block so that the handle can undo the statement.
                raise SnapObjNotFoundError("Unable to delete scheduler event: event '%s' not found." % uri)

            handle.commit()
            
    def list_scheduler_events(self, uri_prefix=None):
        """
        List the URIs of the scheduler events in the store.

        @param uri_prefix: Only list URIs starting with this prefix. None lists every event.
        @type uri_prefix: string

        @return: URIs of the matching scheduler events.
        @rtype: list

        """
        with self._read_handle() as handle:
            if uri_prefix is None:
                handle.execute(self.SELECT_SCHEDULER_EVENT_URIS)
            else:
                # NOTE(review): the prefix goes into a LIKE pattern unescaped, so a '%' or '_' inside
                # uri_prefix acts as a wildcard. Confirm whether callers can pass such prefixes.
                (query, values) = self.SELECT_SCHEDULER_EVENT_URIS_LIKE_URI.build({'uri': uri_prefix + '%'})
                handle.execute(query, values)

            # Fetch the rows while the read lock is still held, like the other read methods do.
            # Reading from the cursor after leaving the with block would race with writers.
            return [row[0] for row in sqlite_iter(handle)]

    def set_registry_key(self, key, value):
        """
        Store value under key in the registry and commit it.

        Uses INSERT_OR_UPDATE_REGISTRY_ENTRY, which this base class leaves as None for descendents to set.

        @param key: Name of the registry entry.
        @type key: string

        @param value: Value to store; it is encoded before being written.

        """
        builder = self.INSERT_OR_UPDATE_REGISTRY_ENTRY
        (sql, params) = builder.build({'name': key,
                                       'value': self._encode_value(value)})
        with self._write_handle() as handle:
            handle.execute(sql, params)
            handle.commit()

    def get_registry_key(self, key):
        """
        Read the value stored under key in the registry.

        @param key: Name of the registry entry.
        @type key: string

        @return: The decoded value.

        @raise SnapObjNotFoundError: No entry is stored under key.

        """
        (sql, params) = self.SELECT_REGISTRY_ENTRY_BY_NAME.build({'name': key})
        with self._read_handle() as handle:
            handle.execute(sql, params)
            entry_row = handle.fetchone()

        if entry_row is None:
            raise SnapObjNotFoundError("Registry key '%s' not set." % key)

        return self._decode_value(entry_row[0])

    def delete_registry_key(self, key):
        """
        Remove the registry entry stored under key and commit the change.

        @param key: Name of the registry entry.
        @type key: string

        @raise SnapObjNotFoundError: The delete did not affect exactly one row.

        """
        (sql, params) = self.DELETE_REGISTRY_ENTRY.build({'name': key})
        with self._write_handle() as handle:
            handle.execute(sql, params)
            if handle.rowcount != 1:
                # Raised inside the with block so that the handle can undo the statement.
                raise SnapObjNotFoundError("Registry key '%s' not set." % key)

            handle.commit()

    def upgrade_store(self):
        """
        Upgrade the store's database to the latest schema.

        The stored version is compared with version_info.repository_version. An older store is upgraded
        one version at a time through the _upgrade_store_<n>_to_<n+1> methods, after which the new version
        information is written and committed. A store that is already current is left untouched.

        @raise SnapRepUpgradeError: An error occurs while upgrading the database schema.

        """
        with self._write_handle() as handle:
            handle.execute(self.SELECT_VERSION)
            version_row = handle.fetchone()
            if version_row is None:
                raise SnapRepUpgradeError("Repository version data missing")

            stored_version = version_row[0]
            if not stored_version < version_info.repository_version:
                if stored_version != version_info.repository_version:
                    raise SnapRepUpgradeError("Unable to upgrade a repository with a greater version than server")

                # Already at the current version: nothing to do.
                return

            # Make sure every single-version step exists before running any of them.
            step_names = []
            for from_version in range(stored_version, version_info.repository_version):
                step_name = "_upgrade_store_%d_to_%d" % (from_version, from_version + 1)
                if hasattr(self, step_name):
                    step_names.append(step_name)
                else:
                    raise SnapRepUpgradeError("Internal error: upgrade method %s not defined" % step_name)

            # Perform upgrade one version change at a time.
            for step_name in step_names:
                getattr(self, step_name)(handle)

            # All steps passed, so record the new version information.
            version_update = SQLQueryBuilder("""
                                      UPDATE version_info
                                      SET version = ${r_ver}, designated_version = ${des_ver}
                                      """,
                                             self._module.paramstyle)
            (sql, params) = version_update.build({'r_ver': version_info.repository_version,
                                                  'des_ver': version_info.server_version})
            handle.execute(sql, params)
            if handle.rowcount != 1:
                raise SnapRepUpgradeError("Unable to set the new version information")

            handle.commit()

    def _upgrade_store_1_to_2(self, handle):
        """
        Upgrade the schema from version 1 to version 2.

        The version_info table is dropped, re-created from CREATE_TABLE_VERSION_INFO and seeded
        with one row holding version 2 and the current server version. The change is committed
        on success.

        @param handle: Database handle used to execute the statements and to commit.

        @raises SnapRepUpgradeError: Inserting the new version row did not affect exactly one row.

        """
        handle.execute('DROP TABLE version_info')
        handle.execute(self.CREATE_TABLE_VERSION_INFO)

        (statement, params) = SQLQueryBuilder(
            'INSERT INTO version_info VALUES (2, ${designated_version})',
            self._module.paramstyle).build({'designated_version': version_info.server_version})
        handle.execute(statement, params)
        if handle.rowcount == 1:
            handle.commit()
        else:
            raise SnapRepUpgradeError("Unable to initialize new version_info table")

    def _upgrade_store_2_to_3(self, handle):
        # The change from version 2 to 3 is a MySQL bug fix. All other SQL-based stores do not
        # need any real upgrade steps.
        return

    def _upgrade_store_3_to_4(self, handle):
        """
        Upgrade the schema from version 3 to version 4.

        Creates the tables for the registry and scheduler features and commits. Some internal
        SnapLogic repositories may already have these tables because the features predate the
        upgrade code, so an OperationalError raised by either CREATE TABLE statement is ignored.

        @param handle: Database handle used to execute the statements and to commit.

        """
        for create_statement in (self.CREATE_TABLE_SCHEDULER_EVENT, self.CREATE_TABLE_REGISTRY):
            try:
                handle.execute(create_statement)
            except self._module.OperationalError:
                # Treated as "table already exists"; continue with the next table.
                pass

        handle.commit()
            
    @classmethod
    def _destroy(cls, db, module):
        """
        Drop every repository table from the database.

        Each table is dropped and committed on its own so that a partially created or broken
        repository can still be cleaned up. A table whose DROP fails with an OperationalError is
        skipped; any other error propagates to the caller after the cursor has been closed.

        @param db: A DB-API2 database connection handle.
        @type db: object

        @param module: The DB-API2 module of the relevant database backend.
        @type module: module

        """
        cursor = db.cursor()
        try:
            # Might have partial/broken setup. Attempt these steps one at a time.
            for table_name in ['resource', 'version_info', 'scheduler_event', 'registry']:
                try:
                    cursor.execute(cls.DROP_TABLE_PS % table_name)
                    db.commit()
                except module.OperationalError:
                    # Ignore missing table.
                    pass
        finally:
            # Release the cursor even when a drop fails with an unexpected error.
            cursor.close()

    @classmethod
    def _createTables(cls, db, module):
        """
        Create repository database tables.

        This function executes SQL statements to create database tables necessary for a repository
        and inserts the initial row into the version table. The work is committed only after every
        statement has succeeded; the cursor is closed whether or not an error occurs.

        @param db: A DB-API2 database connection handle.
        @type db: object

        @param module: The DB-API2 module of the relevant database backend.
        @type module: module

        """
        cursor = db.cursor()
        try:
            # Create the tables
            cursor.execute(cls.CREATE_TABLE_VERSION_INFO)
            cursor.execute(cls.CREATE_TABLE_RESOURCE)
            cursor.execute(cls.CREATE_TABLE_SCHEDULER_EVENT)
            cursor.execute(cls.CREATE_TABLE_REGISTRY)

            # Initialize the tables
            (query, values) = cls.INSERT_VERSION.build({'version': version_info.repository_version,
                                                        'designated_version': version_info.server_version},
                                                       module.paramstyle)
            cursor.execute(query, values)
        finally:
            # Do not leak the cursor when one of the statements fails.
            cursor.close()
        db.commit()

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.