DB2.py :  » Development » SnapLogic » snaplogic » components » DBUtils » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » components » DBUtils » DB2.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008-2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: DB2.py 8009 2009-06-26 04:30:52Z grisha $

# Doing this instead of "import DB2" because
# our naming convention for DB wrapper modules and
# classes happens to coincide with the name of the
# DB-API driver module in this case.

import types
db2_driver = __import__('DB2')

from snaplogic.common.data_types import SnapDateTime,SnapNumber,SnapString

# Mapping of DB-API type codes to native DB2 type names. This is the
# driver's own dictionary (same object, not a copy), so entries added to
# db2_driver.SQL_type_dict later are visible through this name as well.
TYPE_CODE_TO_NATIVE_TYPE = db2_driver.SQL_type_dict

# Mapping of native DB2 type names (lower case) to Snap data types.
# Each key appears exactly once; a repeated key in a dict literal silently
# overrides the earlier entry.
# NOTE(review): 'numeric' and 'float' are not mapped here -- confirm
# whether the driver can ever report those type names.
NATIVE_TYPE_TO_SNAP_TYPE = {
    # Character and binary data
    'character': SnapString,
    'varchar': SnapString,
    'longvarchar': SnapString,
    'long varchar': SnapString,
    'char': SnapString,
    'binary': SnapString,
    'varbinary': SnapString,
    'longvarbinary': SnapString,
    'long varbinary': SnapString,

    # Numeric data
    'bigint': SnapNumber,
    'integer': SnapNumber,
    'smallint': SnapNumber,
    'decimal': SnapNumber,
    'decfloat': SnapNumber,
    'double': SnapNumber,
    'real': SnapNumber,

    # Date and time data
    'time': SnapDateTime,
    'date': SnapDateTime,
    'timestamp': SnapDateTime,
}

def _convert_result_rows(self, rows):
    """
    This is a workaround for PyDB2 bug #2618159 
    (https://sourceforge.net/tracker2/?func=detail&aid=2618159&group_id=67548&atid=518206)
    
    """
    TupleType = types.TupleType
    # Begin workaround
    if rows is None:
        return rows
    # End workaround
    for r in rows:
        for i in range(len(r)):
            if type(r[i]) == TupleType:
                r[i] = self._convert_result_col(r[i])
    return rows

# Monkey-patch the driver: every db2_driver.Cursor now uses the
# None-tolerant _convert_result_rows defined above.
db2_driver.Cursor._convert_result_rows = _convert_result_rows

# See PyDB2 bug #2618188
# (https://sourceforge.net/tracker2/?func=detail&aid=2618188&group_id=67548&atid=518206)
# Register type code -360 under the name 'DECFLOAT' in the driver's type
# table (presumably the driver does not define it itself -- see the bug).
# TYPE_CODE_TO_NATIVE_TYPE refers to this same dictionary, so the new entry
# is visible through it too.
db2_driver.SQL_type_dict[-360] = 'DECFLOAT'

from datetime import datetime,time
from decimal import Decimal
import os
import sys
from time import strptime

from snaplogic.components.DBUtils import SnapDBAdapter,_acquireLock,_releaseLock
from snaplogic.common.snap_exceptions import SnapException,SnapComponentError

class DB2CursorWrapper(object):
    """
    A wrapper around DB API 2.0 cursor, to handle encoding and type conversion.

    Unicode statements and parameters are encoded as UTF-8 before they are
    passed to the native cursor. Fetched rows are converted to Snap types:
    numbers become Decimal, strings are decoded from UTF-8, and DATE, TIME
    and TIMESTAMP strings become datetime objects.

    """
    def __init__(self, cursor, snap_con):
        """
        Initialize.

        @param cursor: DB API 2.0 cursor object, to which most requests will
        be delegated.
        @type: cursor

        @param snap_con: Snap DB connection that produced this cursor
        @type snap_con: SnapDBAdapter
        """
        self._snap_con = snap_con
        self._delegate = cursor
        # Column description of the current result set; None until the
        # first row of that result set is converted.
        self._metadata = None
        # Each of these maps column index -> native type name, or is None
        # when the current result set has no column of that kind.
        self._date_fields = None
        self._num_fields = None
        self._str_fields = None

    def execute(self, operation, params = None):
        """
        Execute a statement through the native cursor.

        A unicode statement and unicode parameters are encoded as UTF-8;
        everything else is passed through unchanged. The cached column
        metadata is discarded, so it is rebuilt for the new result set.

        @param operation: SQL statement
        @type operation: str or unicode

        @param params: values to bind, or None
        @type params: sequence

        @return: whatever the native cursor's execute() returns
        """
        self._metadata = None
        # Encode only unicode statements: calling encode() on a byte string
        # makes Python 2 decode it as ASCII first, which fails as soon as
        # the statement contains a non-ASCII byte.
        if not isinstance(operation, str):
            operation = operation.encode('utf-8')
        new_params = None
        if params:
            new_params = []
            for p in params:
                if type(p) == unicode:
                    new_params.append(p.encode('utf-8'))
                else:
                    new_params.append(p)
        return self._delegate.execute(operation, new_params)

    def _classify_columns(self):
        """
        Read the native cursor's description and record which column
        indices hold numbers, strings and dates.

        """
        self._metadata = self._delegate.description
        self._date_fields = None
        self._num_fields = None
        self._str_fields = None
        for i, col_metadata in enumerate(self._metadata):
            type_code = col_metadata[1]
            native_type = self._snap_con.type_code_to_native_type(type_code)
            snap_type = self._snap_con.native_type_to_snap_type(native_type)
            if snap_type == SnapNumber:
                if self._num_fields is None:
                    self._num_fields = {}
                self._num_fields[i] = native_type
            elif snap_type == SnapString:
                if self._str_fields is None:
                    self._str_fields = {}
                self._str_fields[i] = native_type
            elif snap_type == SnapDateTime:
                if self._date_fields is None:
                    self._date_fields = {}
                self._date_fields[i] = native_type

    @staticmethod
    def _parse_timestamp(str_val):
        """
        Parse a timestamp string of the form 'YYYY-MM-DD-HH.MM.SS[.fraction]'.

        The fraction is read as a decimal fraction of a second: it is
        padded or truncated to six digits, so '.123' means 123000
        microseconds and digits beyond microsecond precision are dropped.
        A string without a fraction yields zero microseconds.

        @param str_val: timestamp string
        @type str_val: str

        @return: the parsed timestamp
        @rtype: datetime
        """
        micros = 0
        # The date-time part itself contains two dots (HH.MM.SS); only a
        # third dot introduces fractional seconds.
        if str_val.count('.') > 2:
            str_val, fraction = str_val.rsplit('.', 1)
            micros = int(fraction[:6].ljust(6, '0'))
        tt = strptime(str_val, '%Y-%m-%d-%H.%M.%S')
        fields = list(tt[0:6])
        fields.append(micros)
        return datetime(*fields)

    def convert_row(self, row):
        """
        Convert a row of data in native data types into a row of Snap types.

        @param row: row returned by database
        @type row: tuple

        @return: row converted to Snap data types. The row is returned
                 unchanged when it is empty or when the result set has no
                 columns that need conversion.
        @rtype: list
        """
        if self._metadata is not None and \
            self._date_fields is None and \
            self._num_fields is None and \
            self._str_fields is None:
            # Metadata already examined: nothing in this result set
            # needs conversion.
            return row

        if not row:
            return row

        if self._metadata is None:
            self._classify_columns()

        new_row = list(row)
        if self._num_fields is not None:
            for idx in self._num_fields:
                val = row[idx]
                if val is None:
                    continue
                t = type(val)
                if t == int or t == long or t == bool:
                    new_row[idx] = Decimal(val)
                else:
                    # Go through str() for every other type (e.g. float)
                    new_row[idx] = Decimal(str(val))
        if self._str_fields is not None:
            for idx in self._str_fields:
                val = row[idx]
                if val is None:
                    continue
                new_row[idx] = val.decode('utf-8')
        if self._date_fields is not None:
            for idx in self._date_fields:
                str_val = row[idx]
                if str_val is None:
                    continue
                type_name = self._date_fields[idx]
                if type_name == 'TIMESTAMP':
                    new_row[idx] = self._parse_timestamp(str_val)
                elif type_name == 'DATE':
                    tt = strptime(str_val, '%Y-%m-%d')
                    new_row[idx] = datetime(*tt[0:3])
                elif type_name == 'TIME':
                    tt = strptime(str_val, '%H:%M:%S')
                    # A TIME value carries no date; today's date is used.
                    new_row[idx] = datetime.combine(datetime.today(),
                                                    time(*tt[3:6]))

        return new_row

    def convert_results(self, rs):
        """
        Convert the result set from native data types to Snap data types.
        This is similar to L{convert_row}, except it acts on the entire result
        set

        @param rs: Result set to convert
        @type rs: list or tuple

        @return: converted result set. The result set is returned unchanged
                 when it is empty or has no columns that need conversion.
        @type: list

        """
        if self._metadata is not None and \
            self._str_fields is None and \
            self._num_fields is None and \
            self._date_fields is None:
            return rs
        if not rs:
            return rs
        return [self.convert_row(row) for row in rs]

    def fetchone(self):
        """
        Same as cursor.fetchone() specified in DB API 2.0, except returning
        Snap data types.

        """
        row = self._delegate.fetchone()
        if row is not None:
            row = self.convert_row(row)
        return row

    def fetchmany(self, size=None):
        """
        Same as cursor.fetchmany() specified in DB API 2.0, except returning
        Snap data types.

        """
        rs = self._delegate.fetchmany(size)
        return self.convert_results(rs)

    def fetchall(self):
        """
        Same as cursor.fetchall() specified in DB API 2.0, except returning
        Snap data types.

        """
        rs = self._delegate.fetchall()
        return self.convert_results(rs)

    def __getattr__(self, name):
        """
        Used to delegate to the native cursor object those methods that are not
        wrapped by this class.

        """
        # __getattr__ is only called when normal lookup fails. If _delegate
        # itself is missing (instance created without __init__), looking it
        # up below would call __getattr__ again without end.
        if name == '_delegate':
            raise AttributeError(name)
        return getattr(self._delegate, name)


class DB2(SnapDBAdapter):
    """
    Implementation of L{SnapDBAdapter} for DB2.

    """

    # Used for a workaround for Python issue #5377
    # (http://bugs.python.org/issue5377).
    # Defined on the class so that fix_bound_values() can be called before
    # any cursor has been created.
    _long_workaround = (sys.version_info[1] == 6 or
                        (sys.version_info[1] == 5 and sys.version_info[2] >= 2))

    def __init__(self, *args, **kwargs):
        """
        Connect to DB2.

        Required keyword arguments: 'db', 'host', 'port', 'user' and
        'password'. Positional arguments are ignored.

        """
        dsn = 'driver={IBM DB2 ODBC DRIVER};database=%(db)s;hostname=%(host)s;port=%(port)s;protocol=tcpip;' % kwargs
        self._user = kwargs['user']
        passwd = kwargs['password']
        conn = db2_driver.connect(dsn=dsn,uid=self._user,pwd=passwd)
        super(DB2, self).__init__(conn, db2_driver)
        self._table_meta = {}

    def upsert(self, table, row, keys, table_metadata, cur=None):
        """
        DB2-specific implementation of L{SnapDBAdapter.upsert()}
        by using MERGE.

        @param table: name of the table to merge into
        @type table: str

        @param row: the record, as a mapping of field name to value
        @type row: dict

        @param keys: names of the fields that identify an existing row
        @type keys: sequence

        @param table_metadata: table description whose 'native_types' entry
                               maps upper-case column names to native types
        @type table_metadata: dict

        @param cur: accepted for interface compatibility but not used; the
                    statement always runs on a cursor of its own, which is
                    closed afterwards.

        """
        native_types = table_metadata['native_types']
        field_names = list(row.keys())
        values = [row[f] for f in field_names]
        inner_select_clause = ['CAST(? AS %s) AS %s' % (native_types[f.upper()], f) for f in field_names]
        on_clause = ["t1.%s = t2.%s" % (key, key) for key in keys]
        sql = "MERGE INTO " + \
                table + \
                " t1 USING (SELECT " + \
                ', '.join(inner_select_clause) + \
                " FROM sysibm.sysdummy1) t2 ON (" + \
                ' AND '.join(on_clause) + \
                ")"
        # Keep the fields in row order so the generated statement is
        # deterministic.
        key_set = set(keys)
        fields_to_set = [f for f in field_names if f not in key_set]
        if fields_to_set:
            # Omitted when every field is a key: "UPDATE SET" followed by
            # nothing would not be valid SQL.
            update_clause = ["t1.%s = t2.%s" % (f, f) for f in fields_to_set]
            sql += " WHEN MATCHED THEN UPDATE SET "
            sql += ",".join(update_clause)
        sql += " WHEN NOT MATCHED THEN INSERT ("
        sql += ",".join(field_names)
        sql += ") VALUES ("
        sql += ",".join(['?'] * len(field_names))
        sql += ")"
        # The values are bound twice: once for the CAST(?) markers in the
        # USING clause and once for the INSERT ... VALUES markers.
        bind_container = self.fix_bound_values(values + values)
        cur = self.cursor()
        try:
            cur.execute(sql, bind_container)
        finally:
            cur.close()

    def cursor(self):
        """
        See L{SnapDBAdapter.cursor} and L{DB2CursorWrapper}

        """
        native_cursor = SnapDBAdapter.cursor(self)
        return DB2CursorWrapper(native_cursor, self)

    def fix_bound_values(self, record):
        """
        Given a record (really, a sequence) whose elements are
        Python objects, returns a sequence whose elements
        are of the type that the DB expects.

        Unicode strings are encoded as UTF-8, datetime values become driver
        Timestamp objects, and Decimal values become float when they carry
        fractional digits and int otherwise. All other values are passed
        through unchanged.

        @param record: sequence of values
        @type record: sequence

        @return: a record with elements converted to types the DB expects.
        @rtype: sequence

        """
        new_result = []
        for value in record:
            if isinstance(value, unicode):
                value = value.encode('utf-8')
            elif isinstance(value, datetime):
                value = db2_driver.Timestamp(value.year,
                                             value.month,
                                             value.day,
                                             value.hour,
                                             value.minute,
                                             value.second,
                                             value.microsecond)
            elif isinstance(value, Decimal):
                # Decide by the exponent, not by looking for '.' in the
                # string form: small values print in scientific notation
                # (e.g. '1E-7') and would otherwise be truncated to 0.
                # For NaN and Infinity the exponent is a string; those fall
                # through to int(), which raises as before.
                exponent = value.as_tuple()[2]
                if not isinstance(exponent, str) and exponent < 0:
                    value = float(value)
                else:
                    value = int(value)
                    if self._long_workaround:
                        # Second int() call is the issue #5377 workaround.
                        value = int(value)

            new_result.append(value)
        return new_result

    def get_default_schema(self):
        """
        See L{SnapDBAdapter.get_default_schema}. Default here is assumed
        to be the user connected to DB2.

        """
        return self._user

    def list_tables(self, schema = None):
        """
        See L{SnapDBAdapter.list_tables}.

        @param schema: schema to list; the default schema is used when this
                       is empty or None
        @type schema: str

        @return: names of the tables in the schema
        @rtype: list
        """
        if not schema:
            schema = self.get_default_schema()

        sql = "SELECT tabname FROM syscat.tables WHERE LOWER(tabschema) = LOWER(CAST(? AS VARCHAR(128)))"
        cur = self.cursor()
        try:
            cur.execute(sql, (schema,))
            return [row[0] for row in cur.fetchall()]
        finally:
            cur.close()

    def limit_rows_clause(self, limit=1):
        """
        See L{SnapDBAdapter.limit_rows_clause()}
        """
        return "FETCH FIRST %s ROWS ONLY OPTIMIZE FOR %s ROW FOR FETCH ONLY" % (limit, limit)

    def get_snap_view_metadata(self, table_name):
        """
        Describe a table by reading syscat.columns and syscat.indexes.

        @param table_name: table name, parsed by _parse_table_name() into
                           schema and table
        @type table_name: str

        @return: dictionary with the keys 'schema', 'native_types' (column
                 name -> native type declaration), 'fields' (tuple of
                 (name, snap type, description) tuples) and 'primary_key'
                 (list of column names, empty if there is none)
        @rtype: dict

        @raise SnapComponentError: if no columns are found for the table
        """
        view_def = {}
        field_defs = []
        (schema, table_name) = self._parse_table_name(table_name)
        view_def['schema'] = schema
        view_def['native_types'] = {}
        sql =   """
                SELECT * FROM syscat.columns 
                WHERE 
                LOWER(tabname) = LOWER(CAST(? AS VARCHAR(128))) 
                AND 
                LOWER(tabschema) = LOWER(CAST(? AS VARCHAR(128)))
                ORDER BY
                colno
                """
        pkey_sql = """
                    SELECT colnames FROM syscat.indexes WHERE
                    LOWER(tabschema) = LOWER(CAST(? AS VARCHAR(128)))  AND 
                    LOWER(tabname) = LOWER(CAST(? AS VARCHAR(128)))  AND 
                    uniquerule='P'
                    """

        cur = self.cursor()
        try:
            cur.execute(sql, (table_name, schema))
            # Fetch through the native cursor: these catalog rows are used
            # as the driver returns them, without Snap type conversion.
            result = cur._delegate.fetchall()
            if not result:
                raise SnapComponentError("Table '%s' not found in schema '%s'" % (table_name, schema))

            # Column name -> position in the row; the first occurrence of a
            # name wins.
            indices = {}
            for i, meta in enumerate(cur.description):
                indices.setdefault(meta[0], i)

            for row in result:
                # These we need for actual metadata
                name = row[indices['COLNAME']]
                data_type = row[indices['TYPENAME']]
                snap_type = self.native_type_to_snap_type(data_type)
                desc = []

                nullable = row[indices['NULLS']]
                desc.append("Nullable: %s" % nullable)

                length = row[indices['LENGTH']]
                if length:
                    desc.append("Length: %s" % length)

                scale = row[indices['SCALE']]
                if scale:
                    desc.append("Scale: %s" % scale)

                # Native type as a declaration, with length/scale added for
                # the types handled below.
                native_type = data_type.upper()
                if native_type == 'VARCHAR' or native_type == 'CHARACTER':
                    native_type += "(%s)" % length
                elif native_type == 'DECIMAL':
                    native_type += "(%s,%s)" % (length, scale)

                view_def['native_types'][name] = native_type

                default = row[indices['DEFAULT']]
                if default:
                    desc.append("Default: %s" % default)
                codepage = row[indices['CODEPAGE']]
                if codepage:
                    desc.append("Codepage: %s" % codepage)
                collation = row[indices['COLLATIONNAME']]
                if collation:
                    desc.append("Collation: %s" % collation)

                remarks = row[indices['REMARKS']]
                if remarks:
                    desc.append("Remarks: %s" % remarks)

                desc = '; '.join(desc)
                field_defs.append((name, snap_type, desc,))

            view_def['fields'] = tuple(field_defs)

            cur.execute(pkey_sql, (schema, table_name,))
            pkey_rs = cur.fetchone()
            primary_key = []
            if pkey_rs:
                pkey_cols = pkey_rs[0]
                if pkey_cols:
                    # Split on '+' and drop the first piece, which is empty
                    # when the value starts with '+' (e.g. '+COL1+COL2').
                    primary_key = pkey_cols.split('+')[1:]
            view_def['primary_key'] = primary_key
        finally:
            # Close the cursor on the error path as well.
            cur.close()
        return view_def
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.