tableactions.py :  » Database » PyTable » pytable-0.8.20a » pytable » pysqlite » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » PyTable 
PyTable » pytable 0.8.20a » pytable » pysqlite » tableactions.py
from pytable import sqlquery,dbschema
from basicproperty import common

try:
  enumerate
except NameError:
  # Interpreter has no builtin enumerate: supply an equivalent generator.
  def enumerate( sequence ):
    """Yield (index, item) pairs for sequence, counting from zero"""
    index = 0
    for item in sequence:
      yield (index, item)
      # advance the counter (the item itself must not be modified)
      index += 1

class TableStructure (sqlquery.SQLQuery ):
  """Rebuild a table's schema description by introspecting the database

  Runs PRAGMA table_info for the named table and converts each result
  row into a dbschema.FieldSchema, then attaches index descriptions
  gathered via ListIndices.
  """
  sql = """PRAGMA table_info(%(tableName)s);"""
  def processResults(
    self, cursor, tableName,
    **namedarguments ):
    """Turn the table_info rows into a dbschema.TableSchema

    cursor -- cursor holding the PRAGMA results; also re-used for the
      follow-up index queries
    tableName -- name of the table being described

    returns the populated TableSchema (fields always set, indices only
    set when at least one index was found)
    """
    table = dbschema.TableSchema( name=tableName )
    fieldList = []
    nameMap = {}

    for position, row in enumerate( cursor.fetchall() ):
      # each row must unpack to exactly these six values
      cid, name, dbDataType, notNull, defaultValue, primary = row
      keywords = { 'dbDataType': dbDataType }
      try:
        keywords['dataType'] = cursor.connection.driver.sqlToDataType(
          dbDataType
        )
      except KeyError:
        # lookup raised KeyError for this type: leave dataType unset
        pass
      fieldConstraints = []
      if notNull:
        fieldConstraints.append(
          dbschema.NotNullConstraint( fields=(name,))
        )
      if primary:
        fieldConstraints.append(
          dbschema.PrimaryConstraint( fields=(name,))
        )
      if defaultValue is not None:
        keywords['defaultValue'] = defaultValue
      if fieldConstraints:
        keywords['constraints'] = fieldConstraints
      fieldSchema = dbschema.FieldSchema(
        name = name,
        nullOk = not notNull,
        index = position,
        table = table,
        internalSize = 0,
        displaySize = 0,
        **keywords
      )
      fieldList.append( fieldSchema )
      nameMap[fieldSchema.name] = fieldSchema
    table.fields = fieldList

    # index information comes from separate queries on the same cursor
    indexSchemas = self.buildIndices( cursor, tableName, nameMap )
    if indexSchemas:
      table.indices = indexSchemas
    return table

  def buildIndices( self, cursor, tableName, nameMap ):
    """Create a dbschema.IndexSchema for every index on the table

    nameMap -- mapping of field name to field schema, handed on to
      isUniqueIndex

    returns a (possibly empty) list of IndexSchema objects
    """
    indexSchemas = []
    indexRecords = ListIndices()( cursor, tableName=tableName )
    for (name, masterRecord, fields) in indexRecords:
      unique = self.isUniqueIndex( nameMap, name, masterRecord, fields )
      # keep only the third element (the field name) of each record
      fieldNames = [
        columnName for (seqno, columnId, columnName) in fields
      ]
      indexSchemas.append(
        dbschema.IndexSchema(
          name = name,
          unique = unique,
          # a uniqueness code above 1 means "primary"
          primary = unique > 1,
          fields = fieldNames,
        )
      )
    return indexSchemas

  def isUniqueIndex( self, nameMap, name, masterRecord, fields ):
    """Classify an index as plain, unique or primary

    returns 0 -> not unique, 1 -> unique or 2 -> primary

    When the master record carries no definition text, the single
    indexed field's constraints decide the answer, and a unique
    constraint is appended to that field if it has neither a primary
    nor a unique constraint yet.

    raises RuntimeError if a definition-less index does not cover
    exactly one field, or if its field is missing from nameMap
    """
    definition = masterRecord[4]
    if definition:
      # scan the leading words of the definition text for a keyword
      keywordCodes = { 'UNIQUE': 1, 'PRIMARY': 2 }
      for word in definition.upper().split( None, 5 ):
        if word in keywordCodes:
          return keywordCodes[word]
      return 0

    # no definition text: the index was generated automatically, so it
    # stems from a primary or unique constraint on the field
    if len(fields) != 1:
      raise RuntimeError(
        """An index record %r with no definition has more than one field, cannot reverse engineer schema, please report this! %r %r"""%(
          name, masterRecord, fields,
        ),
      )
    fieldName = fields[0][2]
    field = nameMap.get( fieldName )
    if field is None:
      raise RuntimeError( """Field %r, which is declared as part of index %r doesn't appear to exist"""%(
        fieldName, name,
      ))
    # the last matching constraint determines the result
    matched = None
    for constraint in field.constraints:
      if dbschema.isPrimary( constraint ):
        matched = 2
      elif dbschema.isUnique( constraint ):
        matched = 1
    if matched is None:
      # record the implied uniqueness on the field object as well
      field.constraints.append( dbschema.UniqueConstraint(fields=(fieldName,)))
      return 1
    return matched



class ListDatabases( sqlquery.SQLQuery ):
  """Queries the SQLite connection for its list of database-names

  returns a simple list holding the second column of each result row
  """
  sql = """PRAGMA database_list;"""
  def processResults( self, cursor, **namedarguments ):
    """Collect the database name from every row on the cursor"""
    rows = cursor.fetchall()
    return [ entry[1] for entry in rows ]

class _ListTables( sqlquery.SQLQuery ):
  """Queries sqlite_master and sqlite_temp_master for schema records

  Records can be filtered by type and/or name.  No result processing
  is defined here; subclasses add their own processResults.
  """
  sql = """SELECT * FROM sqlite_master %(whereFragments)s
  UNION
SELECT * FROM sqlite_temp_master %(whereFragments)s;"""
  def __call__( self, cursor, type='table', name=None, **named ):
    """Return given tables (or indices)

    cursor -- cursor on which to run the query
    type -- value to match against the "type" column, None to skip
      this filter
    name -- value to match against the "name" column, None to skip
      this filter
    named -- further named arguments handed to the base class call

    returns whatever the base class __call__ returns
    """
    conditions = []
    if type is not None:
      conditions.append( 'type = %(type)s' )
      named['type'] = type
    if name is not None:
      conditions.append( 'name = %(name)s' )
      named['name'] = name
    if conditions:
      named['whereFragments'] = " WHERE %s" % (
        " AND ".join(conditions),
      )
    else:
      # The template always references whereFragments, so an unfiltered
      # query still needs a (blank) value for it; a value supplied by
      # the caller is left alone.
      named.setdefault( 'whereFragments', '' )
    return super( _ListTables, self ).__call__( cursor, **named )
class ListTables( _ListTables ):
  """Schema query whose result is reduced to a list of names"""
  def processResults( self, cursor, **namedarguments ):
    """Return the second column of every fetched record as a list"""
    names = []
    for record in cursor.fetchall():
      names.append( record[1] )
    return names

class ListIndices( sqlquery.SQLQuery ):
  """Get index descriptions for a given table

  Returns a list of (indexName, masterRecord, indexInfoRecords)
  tuples, one per row of the PRAGMA index_list result
  """
  sql = """PRAGMA index_list(%(tableName)s);"""
  def processResults(self, cursor, **named ):
    """Expand each index_list row into a descriptive 3-tuple

    The rows are fetched in full before the same cursor is re-used
    for the per-index follow-up queries.
    """
    masterQuery = _ListTables()
    infoQuery = IndexInfo()
    described = []
    for row in cursor.fetchall():
      indexName = row[1]
      # first matching schema record for this index (may be None)
      masterRecord = masterQuery(
        cursor, type='index', name=indexName
      ).fetchone()
      infoRecords = list( infoQuery( cursor, indexName=indexName ) )
      described.append( (indexName, masterRecord, infoRecords) )
    return described

class IndexInfo( sqlquery.SQLQuery ):
  """Get index-data-records for a given index

  Runs PRAGMA index_info with the indexName template argument and
  returns the rows in the sqlite-specific format, untouched
  """
  sql = """PRAGMA index_info(%(indexName)s);"""
  def processResults(self, cursor, **named ):
    """Hand back every row remaining on the cursor, unprocessed"""
    records = cursor.fetchall()
    return records

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.