cx_oracle.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » lib » sqlalchemy » dialects » oracle » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » lib » sqlalchemy » dialects » oracle » cx_oracle.py
"""Support for the Oracle database via the cx_oracle driver.

Driver
------

The Oracle dialect uses the cx_oracle driver, available at 
http://cx-oracle.sourceforge.net/ .   The dialect has several behaviors 
which are specifically tailored towards compatibility with this module.
Version 5.0 or greater is **strongly** recommended, as SQLAlchemy makes
extensive use of the cx_oracle output converters for numeric and 
string conversions.

Connecting
----------

Connecting with create_engine() uses the standard URL approach of 
``oracle://user:pass@host:port/dbname[?key=value&key=value...]``.  If dbname is present, the 
host, port, and dbname tokens are converted to a TNS name using the cx_oracle 
:func:`makedsn()` function.  Otherwise, the host token is taken directly as a TNS name.

Additional arguments which may be specified either as query string arguments on the
URL, or as keyword arguments to :func:`~sqlalchemy.create_engine()` are:

* *allow_twophase* - enable two-phase transactions.  Defaults to ``True``.

* *arraysize* - set the cx_oracle.arraysize value on cursors, in SQLAlchemy
  it defaults to 50.  See the section on "LOB Objects" below.
  
* *auto_convert_lobs* - defaults to True, see the section on LOB objects.

* *auto_setinputsizes* - the cx_oracle.setinputsizes() call is issued for all bind parameters.
  This is required for LOB datatypes but can be disabled to reduce overhead.  Defaults
  to ``True``.

* *mode* - This is given the string value of SYSDBA or SYSOPER, or alternatively an
  integer value.  This value is only available as a URL query string argument.

* *threaded* - enable multithreaded access to cx_oracle connections.  Defaults
  to ``True``.  Note that this is the opposite default of cx_oracle itself.

Unicode
-------

cx_oracle 5 fully supports Python unicode objects.   SQLAlchemy will pass
all unicode strings directly to cx_oracle, and additionally uses an output
handler so that all string based result values are returned as unicode as well.

LOB Objects
-----------

cx_oracle returns oracle LOBs using the cx_oracle.LOB object.  SQLAlchemy converts
these to strings so that the interface of the Binary type is consistent with that of
other backends, and so that the linkage to a live cursor is not needed in scenarios
like result.fetchmany() and result.fetchall().   This means that by default, LOB
objects are fully fetched unconditionally by SQLAlchemy, and the linkage to a live
cursor is broken.  

To disable this processing, pass ``auto_convert_lobs=False`` to :func:`create_engine()`.

Two Phase Transaction Support
-----------------------------

Two Phase transactions are implemented using XA transactions.  Success has been reported 
with this feature but it should be regarded as experimental.

"""

from sqlalchemy.dialects.oracle.base import OracleCompiler,OracleDialect,\
                                        RESERVED_WORDS, OracleExecutionContext
from sqlalchemy.dialects.oracle import base
from sqlalchemy.engine import base
from sqlalchemy import types
from datetime import datetime
import random
from decimal import Decimal

class _OracleNumeric(sqltypes.Numeric):
    def bind_processor(self, dialect):
        # cx_oracle accepts Decimal objects and floats
        return None

    def result_processor(self, dialect, coltype):
        # we apply a connection output handler that 
        # returns Decimal for positive precision + scale NUMBER 
        # types
        if dialect.supports_native_decimal:
            if self.asdecimal and self.scale is None:
                processors.to_decimal_processor_factory(Decimal)
            elif not self.asdecimal and self.scale > 0:
                return processors.to_float
            else:
                return None
        else:
            # cx_oracle 4 behavior, will assume 
            # floats
            return super(_OracleNumeric, self).\
                            result_processor(dialect, coltype)
            
class _OracleDate(sqltypes.Date):
    def bind_processor(self, dialect):
        return None

    def result_processor(self, dialect, coltype):
        def process(value):
            if value is not None:
                return value.date()
            else:
                return value
        return process

class _LOBMixin(object):
    def result_processor(self, dialect, coltype):
        if not dialect.auto_convert_lobs:
            # return the cx_oracle.LOB directly.
            return None
            
        def process(value):
            if value is not None:
                return value.read()
            else:
                return value
        return process

class _NativeUnicodeMixin(object):
    # Py2K
    def bind_processor(self, dialect):
        if dialect._cx_oracle_with_unicode:
            def process(value):
                if value is None:
                    return value
                else:
                    return unicode(value)
            return process
        else:
            return super(_NativeUnicodeMixin, self).bind_processor(dialect)
    # end Py2K
    
    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag 
    # will be set for the default String.result_processor.
    
class _OracleChar(_NativeUnicodeMixin, sqltypes.CHAR):
    def get_dbapi_type(self, dbapi):
        return dbapi.FIXED_CHAR

class _OracleNVarChar(_NativeUnicodeMixin, sqltypes.NVARCHAR):
    def get_dbapi_type(self, dbapi):
        return getattr(dbapi, 'UNICODE', dbapi.STRING)
        
class _OracleText(_LOBMixin, sqltypes.Text):
    def get_dbapi_type(self, dbapi):
        return dbapi.CLOB

class _OracleString(_NativeUnicodeMixin, sqltypes.String):
    pass

class _OracleUnicodeText(_LOBMixin, _NativeUnicodeMixin, sqltypes.UnicodeText):
    def get_dbapi_type(self, dbapi):
        return dbapi.NCLOB

    def result_processor(self, dialect, coltype):
        lob_processor = _LOBMixin.result_processor(self, dialect, coltype)
        if lob_processor is None:
            return None

        string_processor = sqltypes.UnicodeText.result_processor(self, dialect, coltype)

        if string_processor is None:
            return lob_processor
        else:
            def process(value):
                return string_processor(lob_processor(value))
            return process

class _OracleInteger(sqltypes.Integer):
    def result_processor(self, dialect, coltype):
        def to_int(val):
            if val is not None:
                val = int(val)
            return val
        return to_int
        
class _OracleBinary(_LOBMixin, sqltypes.LargeBinary):
    def get_dbapi_type(self, dbapi):
        return dbapi.BLOB

    def bind_processor(self, dialect):
        return None

class _OracleInterval(oracle.INTERVAL):
    def get_dbapi_type(self, dbapi):
        return dbapi.INTERVAL
    
class _OracleRaw(oracle.RAW):
    pass

class OracleCompiler_cx_oracle(OracleCompiler):
    def bindparam_string(self, name):
        if self.preparer._bindparam_requires_quotes(name):
            quoted_name = '"%s"' % name
            self._quoted_bind_names[name] = quoted_name
            return OracleCompiler.bindparam_string(self, quoted_name)
        else:
            return OracleCompiler.bindparam_string(self, name)

    
class OracleExecutionContext_cx_oracle(OracleExecutionContext):
    
    def pre_exec(self):
        quoted_bind_names = \
            getattr(self.compiled, '_quoted_bind_names', None)
        if quoted_bind_names:
            if not self.dialect.supports_unicode_binds:
                quoted_bind_names = \
                                dict(
                                    (fromname, toname.encode(self.dialect.encoding)) 
                                    for fromname, toname in 
                                    quoted_bind_names.items()
                                )
            for param in self.parameters:
                for fromname, toname in quoted_bind_names.items():
                    param[toname] = param[fromname]
                    del param[fromname]

        if self.dialect.auto_setinputsizes:
            # cx_oracle really has issues when you setinputsizes 
            # on String, including that outparams/RETURNING
            # breaks for varchars
            self.set_input_sizes(quoted_bind_names, 
                                 exclude_types=self.dialect._cx_oracle_string_types
                                )

        # if a single execute, check for outparams
        if len(self.compiled_parameters) == 1:
            for bindparam in self.compiled.binds.values():
                if bindparam.isoutparam:
                    dbtype = bindparam.type.dialect_impl(self.dialect).\
                                    get_dbapi_type(self.dialect.dbapi)
                    if not hasattr(self, 'out_parameters'):
                        self.out_parameters = {}
                    if dbtype is None:
                        raise exc.InvalidRequestError("Cannot create out parameter for parameter "
                                                        "%r - it's type %r is not supported by"
                                                        " cx_oracle" %
                                                        (name, bindparam.type)
                                                        )
                    name = self.compiled.bind_names[bindparam]
                    self.out_parameters[name] = self.cursor.var(dbtype)
                    self.parameters[0][quoted_bind_names.get(name, name)] = \
                                                        self.out_parameters[name]
        
    def create_cursor(self):
        c = self._connection.connection.cursor()
        if self.dialect.arraysize:
            c.arraysize = self.dialect.arraysize

        return c

    def get_result_proxy(self):
        if hasattr(self, 'out_parameters') and self.compiled.returning:
            returning_params = dict(
                                    (k, v.getvalue()) 
                                    for k, v in self.out_parameters.items()
                                )
            return ReturningResultProxy(self, returning_params)

        result = None
        if self.cursor.description is not None:
            for column in self.cursor.description:
                type_code = column[1]
                if type_code in self.dialect._cx_oracle_binary_types:
                    result = base.BufferedColumnResultProxy(self)
        
        if result is None:
            result = base.ResultProxy(self)
            
        if hasattr(self, 'out_parameters'):
            if self.compiled_parameters is not None and \
                    len(self.compiled_parameters) == 1:
                result.out_parameters = out_parameters = {}
                
                for bind, name in self.compiled.bind_names.items():
                    if name in self.out_parameters:
                        type = bind.type
                        impl_type = type.dialect_impl(self.dialect)
                        dbapi_type = impl_type.get_dbapi_type(self.dialect.dbapi)
                        result_processor = impl_type.\
                                                    result_processor(self.dialect, 
                                                    dbapi_type)
                        if result_processor is not None:
                            out_parameters[name] = \
                                    result_processor(self.out_parameters[name].getvalue())
                        else:
                            out_parameters[name] = self.out_parameters[name].getvalue()
            else:
                result.out_parameters = dict(
                                            (k, v.getvalue()) 
                                            for k, v in self.out_parameters.items()
                                        )

        return result

class OracleExecutionContext_cx_oracle_with_unicode(OracleExecutionContext_cx_oracle):
    """Support WITH_UNICODE in Python 2.xx.
    
    WITH_UNICODE allows cx_Oracle's Python 3 unicode handling 
    behavior under Python 2.x. This mode in some cases disallows 
    and in other cases silently passes corrupted data when 
    non-Python-unicode strings (a.k.a. plain old Python strings) 
    are passed as arguments to connect(), the statement sent to execute(), 
    or any of the bind parameter keys or values sent to execute().  
    This optional context therefore ensures that all statements are 
    passed as Python unicode objects.
    
    """
    def __init__(self, *arg, **kw):
        OracleExecutionContext_cx_oracle.__init__(self, *arg, **kw)
        self.statement = unicode(self.statement)

    def _execute_scalar(self, stmt):
        return super(OracleExecutionContext_cx_oracle_with_unicode, self).\
                            _execute_scalar(unicode(stmt))
                            
class ReturningResultProxy(base.FullyBufferedResultProxy):
    """Result proxy which stuffs the _returning clause + outparams into the fetch."""
    
    def __init__(self, context, returning_params):
        self._returning_params = returning_params
        super(ReturningResultProxy, self).__init__(context)
        
    def _cursor_description(self):
        returning = self.context.compiled.returning
        
        ret = []
        for c in returning:
            if hasattr(c, 'name'):
                ret.append((c.name, c.type))
            else:
                ret.append((c.anon_label, c.type))
        return ret
    
    def _buffer_rows(self):
        return [tuple(self._returning_params["ret_%d" % i] 
                    for i, c in enumerate(self._returning_params))]

class OracleDialect_cx_oracle(OracleDialect):
    execution_ctx_cls = OracleExecutionContext_cx_oracle
    statement_compiler = OracleCompiler_cx_oracle

    driver = "cx_oracle"
    
    colspecs = colspecs = {
        sqltypes.Numeric: _OracleNumeric,
        sqltypes.Date : _OracleDate, # generic type, assume datetime.date is desired
        oracle.DATE: oracle.DATE,  # non generic type - passthru
        sqltypes.LargeBinary : _OracleBinary,
        sqltypes.Boolean : oracle._OracleBoolean,
        sqltypes.Interval : _OracleInterval,
        oracle.INTERVAL : _OracleInterval,
        sqltypes.Text : _OracleText,
        sqltypes.String : _OracleString,
        sqltypes.UnicodeText : _OracleUnicodeText,
        sqltypes.CHAR : _OracleChar,
        sqltypes.Integer : _OracleInteger,  # this is only needed for OUT parameters.
                                            # it would be nice if we could not use it otherwise.
        oracle.RAW: _OracleRaw,
        sqltypes.Unicode: _OracleNVarChar,
        sqltypes.NVARCHAR : _OracleNVarChar,
    }

    
    execute_sequence_format = list
    
    def __init__(self, 
                auto_setinputsizes=True, 
                auto_convert_lobs=True, 
                threaded=True, 
                allow_twophase=True, 
                arraysize=50, **kwargs):
        OracleDialect.__init__(self, **kwargs)
        self.threaded = threaded
        self.arraysize = arraysize
        self.allow_twophase = allow_twophase
        self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )
        self.auto_setinputsizes = auto_setinputsizes
        self.auto_convert_lobs = auto_convert_lobs
        
        if hasattr(self.dbapi, 'version'):
            cx_oracle_ver = tuple([int(x) for x in self.dbapi.version.split('.')])
        else:  
           cx_oracle_ver = (0, 0, 0)
        
        def types(*names):
            return set([
                        getattr(self.dbapi, name, None) for name in names
                    ]).difference([None])

        self._cx_oracle_string_types = types("STRING", "UNICODE", "NCLOB", "CLOB")
        self._cx_oracle_unicode_types = types("UNICODE", "NCLOB")
        self._cx_oracle_binary_types = types("BFILE", "CLOB", "NCLOB", "BLOB") 
        self.supports_unicode_binds = cx_oracle_ver >= (5, 0)
        self.supports_native_decimal = cx_oracle_ver >= (5, 0)
        self._cx_oracle_native_nvarchar = cx_oracle_ver >= (5, 0)

        if cx_oracle_ver is None:
            # this occurs in tests with mock DBAPIs
            self._cx_oracle_string_types = set()
            self._cx_oracle_with_unicode = False
        elif cx_oracle_ver >= (5,) and not hasattr(self.dbapi, 'UNICODE'):
            # cx_Oracle WITH_UNICODE mode.  *only* python
            # unicode objects accepted for anything
            self.supports_unicode_statements = True
            self.supports_unicode_binds = True
            self._cx_oracle_with_unicode = True
            # Py2K
            # There's really no reason to run with WITH_UNICODE under Python 2.x.
            # Give the user a hint.
            util.warn("cx_Oracle is compiled under Python 2.xx using the "
                        "WITH_UNICODE flag.  Consider recompiling cx_Oracle without "
                        "this flag, which is in no way necessary for full support of Unicode. "
                        "Otherwise, all string-holding bind parameters must "
                        "be explicitly typed using SQLAlchemy's String type or one of its subtypes,"
                        "or otherwise be passed as Python unicode.  Plain Python strings "
                        "passed as bind parameters will be silently corrupted by cx_Oracle."
                        )
            self.execution_ctx_cls = OracleExecutionContext_cx_oracle_with_unicode
            # end Py2K
        else:
            self._cx_oracle_with_unicode = False

        if cx_oracle_ver is None or \
                    not self.auto_convert_lobs or \
                    not hasattr(self.dbapi, 'CLOB'):
            self.dbapi_type_map = {}
        else:
            # only use this for LOB objects.  using it for strings, dates
            # etc. leads to a little too much magic, reflection doesn't know if it should
            # expect encoded strings or unicodes, etc.
            self.dbapi_type_map = {
                self.dbapi.CLOB: oracle.CLOB(),
                self.dbapi.NCLOB:oracle.NCLOB(),
                self.dbapi.BLOB: oracle.BLOB(),
                self.dbapi.BINARY: oracle.RAW(),
            }
    
    @classmethod
    def dbapi(cls):
        import cx_Oracle
        return cx_Oracle

    def on_connect(self):
        cx_Oracle = self.dbapi
        def output_type_handler(cursor, name, defaultType, size, precision, scale):
            # convert all NUMBER with precision + positive scale to Decimal.
            # this effectively allows "native decimal" mode.
            if defaultType == cx_Oracle.NUMBER and precision and scale > 0:
                return cursor.var(
                            cx_Oracle.STRING, 
                            255, 
                            outconverter=Decimal, 
                            arraysize=cursor.arraysize)
            # allow all strings to come back natively as Unicode
            elif defaultType in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR):
                return cursor.var(unicode, size, cursor.arraysize)
            
        def on_connect(conn):
            conn.outputtypehandler = output_type_handler
            
        return on_connect
    
    def create_connect_args(self, url):
        dialect_opts = dict(url.query)
        for opt in ('use_ansi', 'auto_setinputsizes', 'auto_convert_lobs',
                    'threaded', 'allow_twophase'):
            if opt in dialect_opts:
                util.coerce_kw_type(dialect_opts, opt, bool)
                setattr(self, opt, dialect_opts[opt])

        if url.database:
            # if we have a database, then we have a remote host
            port = url.port
            if port:
                port = int(port)
            else:
                port = 1521
            dsn = self.dbapi.makedsn(url.host, port, url.database)
        else:
            # we have a local tnsname
            dsn = url.host

        opts = dict(
            user=url.username,
            password=url.password,
            dsn=dsn,
            threaded=self.threaded,
            twophase=self.allow_twophase,
            )

        # Py2K
        if self._cx_oracle_with_unicode:
            for k, v in opts.items():
                if isinstance(v, str):
                    opts[k] = unicode(v)
        # end Py2K

        if 'mode' in url.query:
            opts['mode'] = url.query['mode']
            if isinstance(opts['mode'], basestring):
                mode = opts['mode'].upper()
                if mode == 'SYSDBA':
                    opts['mode'] = self.dbapi.SYSDBA
                elif mode == 'SYSOPER':
                    opts['mode'] = self.dbapi.SYSOPER
                else:
                    util.coerce_kw_type(opts, 'mode', int)
        return ([], opts)

    def _get_server_version_info(self, connection):
        return tuple(int(x) for x in connection.connection.version.split('.'))

    def is_disconnect(self, e):
        if isinstance(e, self.dbapi.InterfaceError):
            return "not connected" in str(e)
        else:
            return "ORA-03114" in str(e) or "ORA-03113" in str(e)

    def create_xid(self):
        """create a two-phase transaction ID.

        this id will be passed to do_begin_twophase(), do_rollback_twophase(),
        do_commit_twophase().  its format is unspecified."""

        id = random.randint(0, 2 ** 128)
        return (0x1234, "%032x" % id, "%032x" % 9)

    def do_begin_twophase(self, connection, xid):
        connection.connection.begin(*xid)

    def do_prepare_twophase(self, connection, xid):
        connection.connection.prepare()

    def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):
        self.do_rollback(connection.connection)

    def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):
        self.do_commit(connection.connection)

    def do_recover_twophase(self, connection):
        pass

dialect = OracleDialect_cx_oracle
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.