Oracle.py :  » Development » SnapLogic » snaplogic » components » DBUtils » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » components » DBUtils » Oracle.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008-2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $
# $Id: Oracle.py 9960 2009-12-02 01:02:49Z grisha $

import datetime
import os
import threading
from decimal import Decimal, InvalidOperation
from distutils.version import LooseVersion

import cx_Oracle

from snaplogic.components.DBUtils import SnapDBAdapter
from snaplogic.common.snap_exceptions import SnapException,SnapComponentError
from snaplogic.common.data_types import SnapDateTime,SnapNumber,SnapString

"""
Oracle (and cx_Oracle driver)-specific implementation of L{SnapDBAdapter}

"""
# Version of the installed cx_Oracle driver, and a flag telling whether it is
# at least 5.0. The flag gates the use of the UNICODE / FIXED_UNICODE type
# codes below.
CURRENT_CX_ORACLE_VERSION = LooseVersion(cx_Oracle.version)
CX_ORACLE_AFTER_5 = CURRENT_CX_ORACLE_VERSION >= LooseVersion('5.0')

"""See http://python.net/crew/atuining/cx_Oracle/html/node4.html"""
# Maps the type code reported by the driver for a column to the name of the
# Oracle native type.
TYPE_CODE_TO_NATIVE_TYPE = {
    cx_Oracle.DATETIME: 'date',
    cx_Oracle.TIMESTAMP: 'timestamp',
    cx_Oracle.NUMBER: 'number',
    cx_Oracle.STRING: 'varchar2',
    cx_Oracle.FIXED_CHAR: 'char',
    cx_Oracle.LONG_STRING: 'long',
    cx_Oracle.BINARY: 'raw',
    cx_Oracle.CLOB: 'clob',
    cx_Oracle.BLOB: 'blob',
    cx_Oracle.NCLOB: 'nclob',
}

if CX_ORACLE_AFTER_5:
    # Extra type codes that are only referenced when the driver is 5.0+.
    TYPE_CODE_TO_NATIVE_TYPE_5 = {
        cx_Oracle.UNICODE: 'nvarchar2',
        cx_Oracle.FIXED_UNICODE: 'nchar',
    }
    TYPE_CODE_TO_NATIVE_TYPE.update(TYPE_CODE_TO_NATIVE_TYPE_5)

# Maps the name of an Oracle native type to the Snap data type used for it.
NATIVE_TYPE_TO_SNAP_TYPE = {
    # character data
    'nvarchar2': SnapString,
    'varchar2': SnapString,
    'char': SnapString,
    'nchar': SnapString,
    'long': SnapString,

    # numeric data
    'float': SnapNumber,
    'number': SnapNumber,

    # date and time data
    'date': SnapDateTime,
    'timestamp': SnapDateTime,
    'interval': SnapDateTime,
}

class OracleCursorWrapper(object):
    """
    A wrapper around DB API 2.0 cursor, to handle encoding and type conversion.
    Will be returned by L{Oracle.cursor}

    Fetched rows have their string columns decoded with the connection
    encoding and their numeric columns converted to C{Decimal}. Any attribute
    not defined here is looked up on the wrapped cursor.

    """
    def __init__(self, cursor, snap_conn):
        """
        Initialize.

        @param cursor: DB API 2.0 cursor object, to which most requests will
        be delegated.
        @type: cursor

        @param snap_conn: The L{SnapDBAdapter} object that generated this cursor
        @type snap_conn: L{SnapDBAdapter}

        """
        self._snap_conn = snap_conn
        self._delegate = cursor
        # Numbers are requested from the driver as strings; convert_row()
        # turns them into Decimal.
        self._delegate.numbersAsStrings = 1
        # Column description of the current result set; None means "not
        # analysed yet" (see execute() and _analyze_columns()).
        self._metadata = None
        # Indices of the columns needing string decoding / number conversion,
        # or None when there are none.
        self._string_fields = None
        self._lob_fields = None
        self._num_fields = None

    def _nothing_to_convert(self):
        """
        Tell whether the current result set was already analysed and turned
        out to have no column that needs conversion.

        @return: True if rows can be handed back untouched
        @rtype: bool

        """
        return (self._metadata is not None
                and self._string_fields is None
                and self._num_fields is None)

    def _analyze_columns(self):
        """
        Inspect the description of the wrapped cursor and record which column
        indices hold strings to decode and which hold numbers to convert.

        """
        description = self._delegate.description
        string_fields = []
        num_fields = []
        for i, col_metadata in enumerate(description):
            type_code = col_metadata[1]
            native_type = self._snap_conn.type_code_to_native_type(type_code)
            snap_type = self._snap_conn.native_type_to_snap_type(native_type)
            if snap_type == SnapString:
                # With cx_Oracle 5+, UNICODE / FIXED_UNICODE columns are left
                # out of the decoding step.
                if not CX_ORACLE_AFTER_5 or type_code not in (cx_Oracle.UNICODE, cx_Oracle.FIXED_UNICODE):
                    string_fields.append(i)
            elif snap_type == SnapNumber:
                num_fields.append(i)
        self._string_fields = string_fields or None
        self._num_fields = num_fields or None
        # Published last, so that a failure above leaves the result set
        # marked as "not analysed" instead of "nothing to convert".
        self._metadata = description

    def _to_decimal(self, val):
        """
        Convert a numeric value, as returned by the driver, into a Decimal.

        @param val: non-None value of a numeric column
        @type val: str or number

        @return: the value as a Decimal
        @rtype: Decimal

        """
        text = str(val)
        try:
            return Decimal(text)
        except InvalidOperation:
            # This will happen if the locale is such that group and
            # decimal delimiters are, for instance, European -
            # period for a group, comma for decimal.
            # The group delimiter has to be dropped BEFORE the decimal
            # delimiter is turned into a period; the other way round the
            # freshly written period would be dropped as well whenever the
            # group delimiter is a period ('3,14' would become '314').
            text = text.replace(self._snap_conn._group_delimiter, '')
            text = text.replace(self._snap_conn._decimal_delimiter, '.')
            return Decimal(text)

    def convert_row(self, row):
        """
        Convert a row of data in native data types into a row of Snap types.

        @param row: row returned by database
        @type row: tuple

        @return: row converted to Snap data types. The row is returned
        unchanged (not copied into a list) when it is empty or when the
        result set is already known to need no conversion.
        @rtype: list

        """
        if self._nothing_to_convert():
            return row
        if not row:
            return row
        if self._metadata is None:
            self._analyze_columns()

        new_row = list(row)
        if self._string_fields is not None:
            encoding = self._snap_conn._conn.encoding
            for i in self._string_fields:
                if new_row[i] is not None:
                    new_row[i] = new_row[i].decode(encoding)
        if self._num_fields is not None:
            for i in self._num_fields:
                if new_row[i] is not None:
                    new_row[i] = self._to_decimal(new_row[i])
        return new_row

    def convert_results(self, rs):
        """
        Convert the result set from native data types to Snap data types.
        This is similar to L{convert_row}, except it acts on the entire result
        set

        @param rs: Result set to convert
        @type rs: list or tuple

        @return: converted result set
        @type: list

        """
        if self._nothing_to_convert():
            return rs
        if not rs:
            return rs
        return [self.convert_row(row) for row in rs]

    def fetchone(self):
        """
        Same as cursor.fetchone() specified in DB API 2.0, except returning
        Snap data types.

        """
        row = self._delegate.fetchone()
        if row is not None:
            row = self.convert_row(row)
        return row

    def fetchmany(self, size=None):
        """
        Same as cursor.fetchmany() specified in DB API 2.0, except returning
        Snap data types.

        @param size: number of rows to fetch; defaults to the arraysize of
        the wrapped cursor
        @type size: int

        """
        if size is None:
            size = self._delegate.arraysize
        return self.convert_results(self._delegate.fetchmany(size))

    def fetchall(self):
        """
        Same as cursor.fetchall() specified in DB API 2.0, except returning
        Snap data types.

        """
        return self.convert_results(self._delegate.fetchall())

    def execute(self, operation, parameters=None):
        """
        Same as cursor.execute() specified in DB API 2.0. Used to properly
        encode SQL statement and parameters.

        @param operation: SQL statement
        @type operation: str or unicode

        @param parameters: bind values keyed by bind variable name
        @type parameters: dict

        @return: whatever the wrapped cursor's execute() returns

        @raise SnapException: if the statement cannot be encoded into the
        Oracle client encoding

        """
        # https://www.snaplogic.org/trac/ticket/1351
        # Forget the column layout of the previous statement, so that it is
        # analysed again on the next fetch.
        self._metadata = None
        if isinstance(operation, unicode):
            try:
                operation = operation.encode(self._snap_conn._conn.encoding)
            except UnicodeEncodeError as e:
                snap_exc = SnapException("Cannot encode SQL command (%s) into the Oracle client encoding (%s); consider setting NLS_LANG variable appropriately" % (operation, self._snap_conn._conn.encoding))
                snap_exc.append(e)
                raise snap_exc
        if not parameters:
            return self._delegate.execute(operation)
        enc_params = {}
        for p_name in parameters:
            p_val = parameters[p_name]
            if isinstance(p_val, unicode):
                p_val = p_val.encode(self._snap_conn._conn.encoding)
            enc_params[str(p_name)] = p_val
        return self._delegate.execute(operation, enc_params)

    def __getattr__(self, name):
        """
        Used to delegate to the native cursor object those methods that are not
        wrapped by this class.

        """
        if name == '_delegate':
            # Only reached when the instance has no '_delegate' yet (e.g. it
            # was created without going through __init__). Looking it up on
            # self again would recurse without end.
            raise AttributeError(name)
        return getattr(self._delegate, name)
        
class Oracle(SnapDBAdapter):
    """
    Implementation of L{SnapDBAdapter} for Oracle.

    """

    def __init__(self, *args, **kwargs):
        """
        Connect to Oracle and prepare the session.

        NLS_LANG is set to 'American_America.UTF8' in the process environment
        unless it is already defined. The session is switched to '.,' as
        numeric characters and to the '00:00' time zone; the numeric
        characters that were in effect before the switch are remembered for
        the conversion of fetched numbers.

        @param args: accepted, but not used
        @param kwargs: keyword arguments for cx_Oracle.connect(); unicode
        values are encoded as UTF-8 first. 'user' is required.

        """
        if 'NLS_LANG' not in os.environ:
            os.environ['NLS_LANG'] = 'American_America.UTF8'
        kw2 = {}
        for k, v in kwargs.items():
            if isinstance(v, unicode):
                v = v.encode('utf-8')
            kw2[k] = v
        # Connect with the encoded copy; it used to be built and then ignored.
        conn = cx_Oracle.connect(**kw2)
        self._user = kwargs['user']

        cursor = conn.cursor()
        try:
            # We need to keep this result because setting this value later will work for binding
            # but not for retrieving data.
            # See also
            # http://sourceforge.net/mailarchive/forum.php?thread_name=48C1B0FC.4060809%40snaplogic.org&forum_name=cx-oracle-users
            cursor.execute("SELECT value FROM nls_session_parameters WHERE parameter='NLS_NUMERIC_CHARACTERS'")
            num_char_result = cursor.fetchall()[0][0]
            self._decimal_delimiter = num_char_result[0]
            self._group_delimiter = num_char_result[1]
            # This request will work for binding...
            cursor.execute("ALTER session SET NLS_NUMERIC_CHARACTERS = '.,'")
            cursor.execute("ALTER session SET TIME_ZONE = '00:00'")
            conn.commit()
        finally:
            cursor.close()
        super(Oracle, self).__init__(conn, cx_Oracle)

    def cursor(self):
        """
        See L{SnapDBAdapter.cursor} and L{OracleCursorWrapper}

        @return: the native cursor wrapped into an L{OracleCursorWrapper}
        @rtype: L{OracleCursorWrapper}

        """
        native_cursor = SnapDBAdapter.cursor(self)
        return OracleCursorWrapper(native_cursor, self)

    def upsert(self, table, row, keys, table_metadata=None):
        """
        Oracle-specific implementation of L{SnapDBAdapter.upsert()}
        by using MERGE.

        @param table: name of the table to merge into
        @type table: str

        @param row: values to write, keyed by field name
        @type row: dict

        @param keys: names of the fields used to match an existing record
        @type keys: sequence

        @param table_metadata: not used by this implementation

        """
        keys = list(keys)
        field_names = list(row.keys())
        inner_select_clause = [':%s AS %s' % (f, f) for f in field_names]
        match_clause = ["t1.%s = t2.%s" % (key, key) for key in keys]
        # Every non-key field, in the order of the row.
        fields_to_set = [f for f in field_names if f not in keys]
        update_clause = ["t1.%s = t2.%s" % (f, f) for f in fields_to_set]
        bound_field_names = self.bindVariableList(field_names)

        sql = "MERGE INTO " + table + " t1 USING (SELECT "
        sql += ', '.join(inner_select_clause)
        sql += " FROM DUAL) t2 ON ("
        sql += ' AND '.join(match_clause)
        sql += ")"
        if update_clause:
            # When all the fields are keys there is nothing to update; an
            # empty "UPDATE SET" would not be valid SQL, so the clause is
            # left out altogether.
            sql += " WHEN MATCHED THEN UPDATE SET "
            sql += ",".join(update_clause)
        sql += " WHEN NOT MATCHED THEN INSERT ("
        sql += ",".join(field_names)
        sql += ") VALUES ("
        sql += ",".join(bound_field_names)
        sql += ")"

        cur = self.cursor()
        try:
            cur.execute(sql, row)
        finally:
            cur.close()

    def fix_bound_values(self, record):
        """
        Given a record (a mapping of parameter name to Python object),
        returns a new mapping whose names and values are of the type that
        the DB expects: unicode names and values are encoded with the
        connection encoding and Decimal values are turned into strings.

        @param record: values keyed by parameter name
        @type record: dict

        @return: a record with elements converted to types the DB expects.
        @rtype: dict

        """
        new_result = {}
        for param, value in record.items():
            if isinstance(param, unicode):
                param = param.encode(self._conn.encoding)
            if isinstance(value, unicode):
                value = value.encode(self._conn.encoding)
            elif isinstance(value, Decimal):
                value = str(value)
            new_result[param] = value
        return new_result

    def get_default_schema(self):
        """
        See L{SnapDBAdapter.get_default_schema}. Default here is assumed
        to be the user connected to Oracle.

        """
        return self._user

    def list_tables(self, schema = None):
        """
        See L{SnapDBAdapter.list_tables}.

        @param schema: schema to list; the default schema is used when empty
        @type schema: str

        @return: names of the tables and views owned by the schema
        @rtype: list

        """
        if not schema:
            schema = self.get_default_schema()
        bind = {':schema' : schema }
        # The owner is matched regardless of case, the same way
        # get_snap_view_metadata() does.
        sql = """
                SELECT table_name FROM all_all_tables 
                WHERE LOWER(owner) = LOWER(:schema)
                UNION ALL 
                SELECT view_name AS table_name FROM all_views
                WHERE LOWER(owner) = LOWER(:schema)
                """
        cur = self.cursor()
        try:
            cur.execute(sql, bind)
            return [row[0] for row in cur.fetchall()]
        finally:
            cur.close()

    def limit_rows_clause(self, limit=1):
        """
        See L{SnapDBAdapter.limit_rows_clause()}

        @param limit: maximum number of rows
        @type limit: int

        @return: SQL fragment, starting with AND, restricting rownum
        @rtype: str

        """
        return "AND rownum<=%s" % limit

    def get_snap_view_metadata(self, table_name):
        """
        Describe a table or view using the Oracle data dictionary.

        @param table_name: name of the table, as understood by
        _parse_table_name()
        @type table_name: str

        @return: dictionary with the entries 'schema', 'fields' (a tuple of
        (name, snap type, description) tuples, ordered by column id) and
        'primary_key' (list of column names)
        @rtype: dict

        @raise SnapComponentError: if no column is found for the table

        """
        (schema, table_name) = self._parse_table_name(table_name)
        view_def = {'schema' : schema}
        field_defs = []
        bind = {':schema' : schema, ':table_name' : table_name}
        sql = """
                SELECT * FROM all_tab_columns 
                WHERE LOWER(owner) = LOWER(:schema) AND 
                LOWER(table_name) = LOWER(:table_name) 
                ORDER BY column_id ASC
              """
        # Ordered by position in the index, so that the columns of a
        # composite primary key come back in their declared order.
        pkey_sql = """
                    SELECT 
                    aic.column_name
                    FROM all_ind_columns aic 
                    LEFT JOIN all_constraints alc 
                    ON aic.index_name = alc.constraint_name AND 
                    aic.table_name = alc.table_name AND 
                    aic.table_owner = alc.owner 
                    WHERE 
                    LOWER(aic.table_name) = LOWER(:table_name)
                    AND LOWER(aic.table_owner) = LOWER(:schema)
                    AND alc.constraint_type = 'P'
                    ORDER BY aic.column_position ASC
                    """
        cur = self.cursor()
        try:
            cur.execute(sql, bind)
            # Fetched from the underlying cursor directly, i.e. without the
            # conversions L{OracleCursorWrapper} applies.
            result = cur._delegate.fetchall()
            if not result:
                raise SnapComponentError("Table '%s' not found in schema '%s'" % (table_name, schema))
            # Position of each dictionary column in the fetched rows, by name.
            indices = {}
            for i, meta in enumerate(cur.description):
                indices[meta[0]] = i
            for row in result:
                # These we need for actual metadata
                name = row[indices['COLUMN_NAME']]
                data_type = row[indices['DATA_TYPE']].lower()
                # Drop the precision / qualifier part of the type name.
                if data_type.startswith('timestamp('):
                    data_type = 'timestamp'
                elif data_type.startswith('interval '):
                    data_type = 'interval'
                snap_type = self.native_type_to_snap_type(data_type)

                # The rest only goes into the human-readable description.
                desc = ["Nullable: %s" % row[indices['NULLABLE']]]
                length = row[indices['DATA_LENGTH']]
                if length:
                    desc.append("Length: %s" % length)
                precision = row[indices['DATA_PRECISION']]
                if precision:
                    desc.append("Precision: %s" % precision)
                scale = row[indices['DATA_SCALE']]
                if scale:
                    desc.append("Scale: %s" % scale)
                default = row[indices['DATA_DEFAULT']]
                if default:
                    desc.append("Default: %s" % default)
                charset = row[indices['CHARACTER_SET_NAME']]
                if charset:
                    desc.append("Character set: %s" % charset)
                field_defs.append((name, snap_type, '; '.join(desc),))

            cur.execute(pkey_sql, bind)
            primary_key = [row[0] for row in cur.fetchall()]
        finally:
            cur.close()
        view_def['fields'] = tuple(field_defs)
        view_def['primary_key'] = primary_key
        return view_def
  
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.