test_EditingContext_ParentChild.py :  » Database » Modeling-Framework » Modeling-0.9 » Modeling » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » Modeling Framework 
Modeling Framework » Modeling 0.9 » Modeling » tests » test_EditingContext_ParentChild.py
#! /usr/bin/env python
# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sébastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------

"""
Global tests for EditingContext

  This tests the editing context and the whole underlying framework


  CVS information

    $Id: test_EditingContext_ParentChild.py 932 2004-07-20 06:21:57Z sbigaret $
"""
# Revision number extracted from the version-control keyword: the slice drops
# the leading '$Revision: ' and the trailing ' $', leaving '932'.
__version__='$Revision: 932 $'[11:-2]

import unittest, sys
import utils
if __name__ == "__main__":
  # Only when run as a script: this must happen before the Modeling and
  # test-package imports below, since it adjusts the import path.
  utils.disable_model_cache()
  if not ('-c' in sys.argv or '-C' in sys.argv
          or '-m' in sys.argv or '-M' in sys.argv):
    # None of -c/-C/-m/-M was given: use the existing test packages
    utils.fixpath(include_testPackages=1)
  else:
    # One of -c/-C/-m/-M was given: leave the test packages out of the path
    # and let utils build them from the command line
    utils.fixpath(include_testPackages=0)
    utils.dynamically_build_test_packages(sys.argv)

from Modeling import ModelSet,Model
from Modeling.EditingContext import EditingContext
from Modeling.FetchSpecification import FetchSpecification
from Modeling.Qualifier import qualifierWithQualifierFormat
from Modeling.Validation import ValidationException,\
     CUSTOM_OBJECT_VALIDATION, OBJECT_WIDE_KEY, DELETE_DENY_KEY
from Modeling import Adaptor
from Modeling import Database

from AuthorBooks.Writer import Writer
from AuthorBooks.Book import Book

from StoreEmployees import Employee


# utilities
def concreteAdaptorChannelForEntity(anEntityName, ec):
  """
  Returns the adaptor channel used for entity 'anEntityName'.

  The root object store of 'ec' is asked for the object store handling a
  FetchSpecification built on 'anEntityName'; the adaptor channel of that
  store's available channel is returned.
  """
  entityFetchSpec=FetchSpecification(anEntityName)
  rootStore=ec.rootObjectStore()
  objectStore=rootStore.objectStoreForFetchSpecification(entityFetchSpec)
  databaseChannel=objectStore.availableChannel()
  return databaseChannel.adaptorChannel()


class TestEditingContext_Global_ParentChild(unittest.TestCase):
  "Global tests for parent/child configuration of EditingContexts"


  def test_00_insertObject_loads_databaseContexts(self):
    "[EditingContext] ..."
    # drop any existing default ObjectStoreCoordinator
    from Modeling import ObjectStoreCoordinator
    ObjectStoreCoordinator.setDefaultCoordinator(None)
    ec=EditingContext()
    w=Writer()
    ec.insertObject(w)
    objStoreCoord=ec.rootObjectStore()
    self.failUnless(objStoreCoord)
    self.failUnless(len(objStoreCoord.cooperatingObjectStores())==1)
    self.failUnless(objStoreCoord.cooperatingObjectStores()[0].ownsObject(w))
    
  def test_01_creation(self):
    "[EC:parent/child] creation"
    ec=EditingContext()
    try:
      ec1=EditingContext(ec)
    except:
      self.fail('Could not create a ec with parent object store==ec')
      
  def test_02_objectsWithFetchSpecification(self):
    "[EC:parent/child] objectsWithFetchSpecification simple verifications"
    ## We check here that the basic functionality of fetching objects
    ## from a nested EC does behave as expected. Further testing is done
    ## in tests 10 and 11.

    parent_ec=EditingContext()
    ec=EditingContext(parent_ec)
    fetchSpec=FetchSpecification(entityName='Writer')
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==3)
    for object in objects:
      if object.getLastName()=='Dard':
        dard=object
      if object.getLastName()=='Cleese':
        cleese=object
      if object.getLastName()=='Rabelais':
        rabelais=object
    self.failUnless(cleese.getLastName()=='Cleese')
    object=cleese
    self.failUnless('_lastName' in object.__dict__.keys()) 
    self.failUnless('_firstName' in object.__dict__.keys()) 
    self.failUnless('_age' in object.__dict__.keys()) 
    self.failIf('_id' in object.__dict__.keys()) 
    self.failIf('id' in object.__dict__.keys()) 
    self.failIf('fk_writer_id' in object.__dict__.keys()) 
    self.failIf('_fk_writer_id' in object.__dict__.keys()) 
    #print objects
    #print objects[0].getE1()
    #print objects[0].__dict__
    #print objects[1].__dict__
    #print objects[2].__dict__
    #print '------------------Triggerring fault ------------------------'
    dard_pygmalion=dard.getPygmalion()
    self.failIf(dard_pygmalion.isFault()) # Shouldnt be a fault

    adaptorChannel=concreteAdaptorChannelForEntity('Writer', ec)
    fetchCount=adaptorChannel._count_for_execute
    dard_pygmalion.getLastName() # hence, this should not fetch from DB
    self.failIf(adaptorChannel._count_for_execute!=fetchCount)
    self.failIf(dard_pygmalion.isFault())

    # Last, check that the snapshots exists in the Database
    dard_gID=ec.globalIDForObject(dard)
    pyg_dard_gID=ec.globalIDForObject(dard_pygmalion)
    db=ec.rootObjectStore().objectStoreForGlobalID(dard_gID).database()
    self.failUnless(db.snapshotForGlobalID(dard_gID))
    


  # Tests:
  # 1. with fetched / non-modified objects in parent
  # 2. with modified objects in parent
  #   # saveChanges

  def test_03_fetchedAndNonModifiedObjectsInParent(self):
    "[EC:parent/child] fetchedAndNonModifiedObjectsInParent"
    parent_ec=EditingContext()
    child_ec=EditingContext(parent_ec)

    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=parent_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1, 'Could not get Writer "Dard"')
    parent_dard=objects[0]
    
    objects=child_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1, 'Could not get Writer "Dard"')
    child_dard=objects[0]

    # objects are different
    self.failIf(parent_dard==child_dard)
    
    # but have the same GlobalID
    child_gID=child_ec.globalIDForObject(child_dard)
    parent_gID=parent_ec.globalIDForObject(parent_dard)
    self.failIf(parent_gID!=child_gID)

    # both are unmodified
    parent_ec.processRecentChanges()
    child_ec.processRecentChanges()
    self.failIf(child_gID in child_ec.updatedGlobalIDs())
    self.failIf(parent_gID in parent_ec.updatedGlobalIDs())
    
  def test_03_fetchedAndModifiedObjectsInParent(self):
    "[EC:parent/child] fetchedAndModifiedObjectsInParent"
    parent_ec=EditingContext()
    parent_ec.setPropagatesInsertionForRelatedObjects(1)
    child_ec=EditingContext(parent_ec)

    qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=parent_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1, 'Could not get Writer "Cleese"')
    parent_cleese=objects[0]
    
    # Modify it
    parent_cleese.setLastName('cleese')
    b=Book() ; b.setTitle('test')
    #parent_ec.insertObject(b)
    parent_cleese.addToBooks(b) ; b.setAuthor(parent_cleese)
    w=Writer()
    w.setLastName("Cleese's pygmalion")
    #parent_ec.insertObject(w)
    parent_cleese.setPygmalion(w)

    objects=child_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1, 'Could not get Writer "Cleese"')
    child_cleese=objects[0]

    # objects are different
    self.failIf(parent_cleese==child_cleese)
    
    # but have the same GlobalID
    child_gID=child_ec.globalIDForObject(child_cleese)
    parent_gID=parent_ec.globalIDForObject(parent_cleese)
    self.failIf(parent_gID!=child_gID)

    # Check that the object we got in the child EC is the modified one
    ## check attributes
    self.failIf(parent_cleese.getLastName()!=child_cleese.getLastName())

    ## check toOne Relationship
    parent_c_pyg_gid=parent_ec.globalIDForObject(parent_cleese.getPygmalion())
    child_c_pyg_gid=child_ec.globalIDForObject(child_cleese.getPygmalion())

    self.failIf(parent_cleese.getPygmalion()==child_cleese.getPygmalion())
    self.failIf(parent_c_pyg_gid==None)
    self.failIf(parent_c_pyg_gid!=child_c_pyg_gid)
    
    ## check toMany Relationship
    self.failIf(len(parent_cleese.getBooks())!=1)
    self.failIf(len(child_cleese.getBooks())!=1)

    parent_book=parent_cleese.getBooks()[0]
    child_book=child_cleese.getBooks()[0]
    self.failIf(parent_book==child_book)
    self.failIf(parent_ec.globalIDForObject(parent_book)!=child_ec.globalIDForObject(child_book))

    
    # Only the one in the parent EC is modified
    parent_ec.processRecentChanges()
    child_ec.processRecentChanges()
    self.failIf(child_gID in child_ec.updatedGlobalIDs())
    self.failIf(parent_gID not in parent_ec.updatedGlobalIDs())
    
  def test_04_saveChanges(self):
    "[EC:parent/child] saveChanges / attributes"
    parent_ec=EditingContext()
    parent_ec.setPropagatesInsertionForRelatedObjects(1)
    child_ec=EditingContext(parent_ec)

    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=child_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1)

    dard=objects[0]
    dard.setLastName('dard')
    try:
      child_ec.saveChanges()
    except:
      self.fail('child_ec.saveChanges() should not fail')

    parent_dard=parent_ec.objectForGlobalID(child_ec.globalIDForObject(dard))
    parent_ec.processRecentChanges()
    self.failIf(parent_dard is None)
    self.failIf(parent_dard not in parent_ec.updatedObjects())
    self.failUnless(parent_dard.getLastName()=='dard')
      
  def test_05_saveChanges(self):
    "[EC:parent/child] saveChanges / relationships: add"
    parent_ec=EditingContext()
    parent_ec.setPropagatesInsertionForRelatedObjects(1)
    child_ec=EditingContext(parent_ec)
    child_ec.setPropagatesInsertionForRelatedObjects(1)

    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=child_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1)

    dard=objects[0]

    b=Book() ; b.setTitle('test')
    dard.addToBooks(b) ; b.setAuthor(dard)
    w=Writer()
    w.setLastName("dard's pygmalion")
    
    dard.addObjectToBothSidesOfRelationshipWithKey(w, 'pygmalion')
    try:
      child_ec.saveChanges()
    except:
      raise
      self.fail('child_ec.saveChanges() should not fail')

    parent_dard=parent_ec.objectForGlobalID(child_ec.globalIDForObject(dard))
    parent_ec.processRecentChanges()
    self.failIf(parent_dard is None)
    self.failIf(parent_dard not in parent_ec.updatedObjects())

    # check toMany relationship 'books'
    self.failUnless(len(parent_dard.getBooks())==4)
    test_book=[b for b in parent_dard.getBooks() if b.getTitle()=='test']
    #print test_book, parent_dard.getBooks()
    self.failUnless(len(test_book)==1)
    test_book=test_book[0]
    self.failUnless(test_book in parent_ec.insertedObjects())

    # check toOne relationship 'pygmalion'
    parent_pyg=parent_dard.getPygmalion()
    self.failIf(parent_pyg is None)
    self.failIf(parent_pyg not in parent_ec.insertedObjects())
    
  def test_06_saveChanges(self):
    "[EC:parent/child] saveChanges / relationships: remove"
    parent_ec=EditingContext()
    parent_ec.setPropagatesInsertionForRelatedObjects(1)
    child_ec=EditingContext(parent_ec)
    child_ec.setPropagatesInsertionForRelatedObjects(1)

    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=child_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1)

    dard=objects[0]
    child_dard_pygmalion_gid=child_ec.globalIDForObject(dard.getPygmalion())
    dard.removeObjectFromBothSidesOfRelationshipWithKey(dard.getPygmalion(),
                                                        'pygmalion')
    removed_book=dard.getBooks()[0]
    dard.removeObjectFromBothSidesOfRelationshipWithKey(removed_book, 'books')
    child_ec.deleteObject(removed_book)
    removed_book_gid=child_ec.globalIDForObject(removed_book)
    try:
      child_ec.saveChanges()
    except:
      raise
      self.fail('child_ec.saveChanges() should not fail')

    parent_dard=parent_ec.objectForGlobalID(child_ec.globalIDForObject(dard))
    parent_ec.processRecentChanges()
    self.failIf(parent_dard is None)
    self.failIf(parent_dard not in parent_ec.updatedObjects())

    # check toMany relationship 'books'
    self.failUnless(len(parent_dard.getBooks())==2)
    parent_removed_book=[b for b in parent_dard.getBooks()
                         if b.getTitle()==removed_book.getTitle()]
    self.failUnless(len(parent_removed_book)==0)
    parent_removed_book=parent_ec.objectForGlobalID(removed_book_gid)
    
    self.failUnless(parent_removed_book in parent_ec.deletedObjects())
    parent_removed_book.willRead()
    
    # check that the snapshot ref-count was transferred to the parent ec
    del child_ec
    db=parent_ec.rootObjectStore().objectStoreForObject(parent_removed_book).database()
    self.failUnless(db.snapshotCountForGlobalID(removed_book_gid)==1)

    # check toOne relationship 'pygmalion'
    parent_pyg=parent_dard.getPygmalion()
    self.failUnless(parent_pyg is None)
    #self.failUnless(parent_pyg not in parent_ec.insertedObjects())

  def test_07_snapshotRefCount(self):
    "[EC:parent/child] Checks that snapshotCounting is OK"
    parent_ec=EditingContext()
  
    qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=parent_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1, 'Could not get "Cleese"')
    parent_cleese=objects[0]
    
    cleese_gID=parent_cleese.editingContext().globalIDForObject(parent_cleese)

    db=parent_ec.rootObjectStore().\
        objectStoreForObject(parent_cleese).database()

    # This is the reference (normally equal to 1)
    snapRefCount=db.snapshotCountForGlobalID(cleese_gID)
  
    # Now pass the object to the child EC (using its GlobalID)
    # and check that it has incremented the snapshot count
    child_ec=EditingContext(parent_ec)
    child_cleese=child_ec.faultForGlobalID(cleese_gID, child_ec)
    child_cleese.willRead()
    self.failIf(db.snapshotCountForGlobalID(cleese_gID)!=snapRefCount+1)

    # After deletion the snapshotRefCount should be back to its initial value
    del child_ec
    self.failIf(db.snapshotCountForGlobalID(cleese_gID)!=snapRefCount)
    
  def test_08_insertionAndGlobalIDChangedNotification(self):
    "[EC:parent/child] insertion and GlobalIDChangedNotification"
    parent_ec=EditingContext()
    child_ec=EditingContext(parent_ec)
    w=Writer(); w.setLastName('test_08')
    child_ec.insertObject(w)
    tmp_child_Gid=child_ec.globalIDForObject(w)
    child_ec.saveChanges()

    # The TempGlobalID should be present in the parent EC
    parent_w=parent_ec.objectForGlobalID(tmp_child_Gid)
    self.failUnless(parent_w)

    # Save changes and check that the new KeyGlobalID was broadcasted as
    # expected, in the parent AND in the child
    parent_ec.saveChanges()
    parent_keyGid=parent_ec.globalIDForObject(parent_w)
    self.failUnless(parent_keyGid and not parent_keyGid.isTemporary())
    self.failUnless(child_ec.objectForGlobalID(parent_keyGid)==w)

  def test_09_fetch_and_fault(self):
    "[EC:parent/child] fetch and fault"
    pec=EditingContext()
    ec=EditingContext(pec)
    fs=FetchSpecification(entityName='Executive')
    fs.setIsDeep(1)
    objs=ec.objectsWithFetchSpecification(fs)
    cleese=objs[0]
    cleese.willChange()
    ec.saveChanges()

    ec=EditingContext(pec)
    fs=FetchSpecification(entityName='Executive')
    fs.setIsDeep(1)
    objs=ec.objectsWithFetchSpecification(fs)
    cleese=objs[0]


    fs=FetchSpecification(entityName='Mark')
    objs=ec.objectsWithFetchSpecification(fs)
    mark=objs[0]

    self.failIf(cleese.isFault())
    self.failUnless(mark.getExecutive()==cleese)
     
  def test_10_child_gets_newly_inserted_objects(self):
    "[EC:parent/child] Child-ec fetches newly inserted objects in parent"
    # This starts like test_03_fetchedAndModifiedObjectsInParent
    # We here want to check that when fetching objects in a child EC
    # we get insertedObjects in the parent EC, as expected
    parent_ec=EditingContext()
    parent_ec.setPropagatesInsertionForRelatedObjects(1)
    child_ec=EditingContext(parent_ec)

    qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=parent_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1, 'Could not get Writer "Cleese"')
    parent_cleese=objects[0]
    
    # Add a new Book
    parent_test_book=Book() ; parent_test_book.setTitle('test')
    parent_ec.insertObject(parent_test_book)
    #parent_cleese.addToBooks(parent_test_book)
    #parent_test_book.setAuthor(parent_cleese)

    # Now, ask the child for books: the new one, 'test', should be included
    fetchSpec=FetchSpecification(entityName='Book')
    objects=child_ec.objectsWithFetchSpecification(fetchSpec)

    #print len(objects), [o.getTitle() for o in objects]
    self.failUnless(len(objects)==5,
                    'Failed to get the object inserted in the parent_ec()')
    child_test_book=[b for b in objects if b.getTitle()=='test'][0]


    # Check that the object was correctly registered within the child ec
    self.failUnless(child_ec.globalIDForObject(child_test_book),
                    "child_test_book is not known to the child ec")

    # Check that the objects are different but have the same GlobalID
    self.assertNotEqual(child_test_book, parent_test_book)
    self.assertEqual(child_ec.globalIDForObject(child_test_book),
                     parent_ec.globalIDForObject(parent_test_book))
   
  def test_11_child_doesnt_get_deleted_objects(self):
    "[EC:parent/child] Child-ec does not fetch objects deleted in parent"
    # We want to check that when fetching objects in a child EC
    # we do not get objects marked as deleted in the parent EC, as expected
    parent_ec=EditingContext()
    parent_ec.setPropagatesInsertionForRelatedObjects(1)
    child_ec=EditingContext(parent_ec)

    qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=parent_ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1, 'Could not get Writer "Cleese"')
    parent_cleese=objects[0]
    
    parent_ec.deleteObject(parent_cleese)

    # Now, ask the child for writers: we shouldn't get John Cleese
    fetchSpec=FetchSpecification(entityName='Writer')
    objects=child_ec.objectsWithFetchSpecification(fetchSpec)

    self.failUnless(len(objects)==2,
                    'Got %s writers, expected: 2'%len(objects))
    verif=[w for w in objects if w.getLastName()=='Cleese']
    self.failIf(verif,
                'obj.W/FetchSpec() should not have returned John Cleese')

  def test_12a_fetchesRawRows(self):
    "[EC:parent/child] fetchesRawRows / basics"
    ## Basically the same test than
    ## test_EC_Global_Inheritance.test_08a_fetchesRawRows()
    ## except for the part on modified objects (modified in parent EC here)
    parent_ec=EditingContext()
    child_ec=EditingContext(parent_ec)

    # No inheritance
    addresses=child_ec.fetch('Address', rawRows=1)
    self.failIf(len(addresses)!=3)
    for a in addresses:
      self.failIf(type(a) is not type({}))

    # Inheritance
    employees=child_ec.fetch('Employee', isDeep=1, rawRows=1)
    self.failIf(len(employees)!=3)
    for e in employees:
      self.failIf(type(e) is not type({}))

    # modified objects in parent
    pjohn=parent_ec.fetch('Executive', 'lastName == "Cleese"')[0]
    pjohn_gid=pjohn.globalID()
    john_name=pjohn.getLastName()
    alternate_john_name=john_name+' -- testing'
    pjohn.setLastName(alternate_john_name)
    cjohn=child_ec.fetch('Executive', isDeep=1, rawRows=1)[0]
    self.assertEqual(cjohn['lastName'], alternate_john_name)

  def test_12b_fetchesRawRows(self):
    "[EC:parent/child] fetchesRawRows / inserted & deleted objects"
    from StoreEmployees.SalesClerk import SalesClerk
    from StoreEmployees.Executive import Executive

    parent_ec=EditingContext()
    child_ec=EditingContext(parent_ec)

    # Inserted objects (either in parent or in child) should appear,
    # as expected
    nsc=SalesClerk()
    nsc.setLastName('New SC')
    ne=Executive()
    ne.setLastName('New Executive')
    parent_ec.insert(nsc)
    child_ec.insert(ne)
    employees=child_ec.fetch('Employee', isDeep=1, rawRows=1)
    self.assertEqual(len(employees), 5)
    rnsc=rne=None # returned nsc/ne
    for e in employees:
      self.assertEqual(type(e), type({}))
    
    parent_ec.delete(nsc) # clean-up

    # deletedObjects (either in parent or in child) should not appear
    # Note: we left: 'ne' in the parent
    jeanne=child_ec.fetch('SalesClerk', 'firstName=="Jeanne"')[0]
    child_ec.deleteObject(jeanne)
    john_jr=parent_ec.fetch('SalesClerk', 'firstName=="John Jr."')[0]
    john=parent_ec.fetch('Executive', 'firstName=="John"')[0]
    parent_ec.deleteObject(john_jr)

    employees=child_ec.fetch('Employee', isDeep=1, rawRows=1)
    self.assertEqual(len(employees), 2)
    rnsc=rne=rjeanne=rjohn_jr=rjohn=None # returned nsc/ne
    for e in employees:
      self.assertEqual(type(e), type({}))
      if e['lastName']==nsc.getLastName(): rnsc=e
      if e['lastName']==ne.getLastName(): rne=e
      if e['firstName']==jeanne.getFirstName(): rjeanne=e
      if e['firstName']==john_jr.getFirstName(): rjohn_jr=e
      if e['firstName']==john.getFirstName(): rjohn=e
    self.assertEqual(rnsc, None)
    self.assertEqual(rjeanne, None)
    self.assertEqual(rjohn_jr, None)
    self.assertEqual(rne, ne.snapshot_raw())
    self.assertEqual(rjohn, john.snapshot_raw())

    
  def test_13_faultForRawRow(self):
    "[EC:parent/child] faultForRawRow"
    parent_ec=EditingContext()
    child_ec=EditingContext(parent_ec)

    employees=child_ec.fetch('Employee', isDeep=1, rawRows=1)
    r_executives=[]
    r_sales_clerks=[]
    for e in employees:
      if e.has_key('officeLocation'):
        r_executives.append(e)
      else:
        r_sales_clerks.append(e)
    self.failIf(len(r_executives)!=1)
    self.failIf(len(r_sales_clerks)!=2)

    r_john_c=r_executives[0]
    john_c1=child_ec.faultForRawRow(r_john_c, 'Employee')
    john_c2=child_ec.faultForRawRow(r_john_c, 'Executive')
    self.failUnless(john_c1.isFault())
    self.failUnless(john_c1==john_c2)
    john_c3=child_ec.fetch('Employee',
                           'firstName=="John" AND lastName=="Cleese"',
                           isDeep=1)[0]
    self.failUnless(john_c2==john_c3)
    john_c3.getFirstName()
    self.failIf(john_c1.isFault())
    
    # Anything in the correct inheritance tree return the right object
    john_c4=child_ec.faultForRawRow(r_john_c, 'SalesClerk')
    self.failUnless(john_c1==john_c4)

    # Last, check that it correctly raise when 'pk' is not present
    self.assertRaises(ValueError,
                      child_ec.faultForRawRow, {'incorrect':0}, 'Employee')

  def tearDown(self):
    """
    Cleans up the Database after each tests
    """
    ec=EditingContext()
    w=Writer()
    dbChannel=ec.rootObjectStore().objectStoreForObject(w).availableChannel()
    channel=dbChannel.adaptorChannel()
    sqlExpr=channel.adaptorContext().adaptor().expressionClass()()
    sqlExpr.setStatement("delete from BOOK where id>4")
    channel.evaluateExpression(sqlExpr)
    sqlExpr.setStatement("delete from WRITER where id>3")
    channel.evaluateExpression(sqlExpr)

def usage(prgName, exitStatus=None):
  """
  Builds the command-line help for this test script.

  Parameters:

    prgName -- name of the program, inserted at the beginning of the text

    exitStatus -- when None (the default) the usage text is returned; any
      other value makes the function write the text to standard output and
      terminate the process, through sys.exit(), with that status.
  """
  _usage="""%s [-vV] [-p] [-d <dbAdaptorName>] [-r]
Runs the tests for the Modeling package

Options
--------
  -v  Minimally verbose
  -V  Really verbose
  -h  Prints this message
  -p  enables profiling
  -d  sets the database adaptor to use: Postgresql (default),MySQL, Oracle
                                        or SQLite
  -r  reinitialize the (postgresql) database
         the database defined in the model 'model_testBothSidesRels'
         is altered: tables are dropped, then they are re-created along with
         constraints for PKs and FKs and PK support (sequences)
""" % prgName
  if exitStatus is None:
    return _usage
  # Same output as the former print statement (text plus a newline), written
  # so that it does not depend on the print syntax of the interpreter
  sys.stdout.write(_usage+'\n')
  sys.exit(exitStatus)


# Verbosity handed to utils.run_suite(); main() sets it to 1 for -v and to
# "Y" for -V
verbose=0
# Configuration file handed to Model.updateModelWithCFG(); main() rebinds it
# to '<adaptorName>.cfg' when option -d is given
database_cfg='Postgresql.cfg'

def main(args):
  """
  Parses the command line, configures the test models, then runs the tests.

  Parameters:

    args -- the full command line, program name included (sys.argv-like)

  Returns the value returned by utils.run_suite(), or None when the database
  was reinitialized (-r) or when the tests were run under the profiler (-p).

  An unknown option, an unknown adaptor name for -d, option -h or any
  remaining positional argument makes the function print the usage and exit
  with status 1 (see usage()).

  Side-effects: rebinds the module-level variables 'verbose' and
  'database_cfg' according to the options.
  """
  global verbose, database_cfg
  import getopt

  me=args[0]
  try:
    # 'h' is part of the accepted options, otherwise getopt rejects -h
    # before the loop below has any chance to handle it
    options, args = getopt.getopt(args[1:], 'hvVprd:')
  except getopt.GetoptError:
    usage(me, 1)

  profile_flag=0; reinitDB_flag=0
  for k, v in options:
    if k=='-h':
      usage(me, 1)
    elif k=='-v':
      verbose=1
    elif k=='-V':
      verbose="Y"
    elif k=='-p':
      profile_flag=1
    elif k=='-d':
      if v not in ('Postgresql', 'MySQL', 'Oracle', 'SQLite'): usage(me, 1)
      database_cfg='%s.cfg'%v
    elif k=='-r':
      reinitDB_flag=1

  # No positional argument is expected
  if args: usage(me, 1)

  author_books_model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
  store_employee_model=ModelSet.defaultModelSet().modelNamed('StoreEmployees')
  # Initialization of model's caracs.
  Model.updateModelWithCFG(author_books_model, database_cfg)
  Model.updateModelWithCFG(store_employee_model, database_cfg)

  # MySQL specifics: change TIMESTAMP to DATETIME
  if database_cfg=='MySQL.cfg':
    author_books_model.entityNamed('Writer').attributeNamed('birthday').setExternalType('DATETIME')
    store_employee_model.entityNamed('Holidays').attributeNamed('startDate').setExternalType('DATETIME')
    store_employee_model.entityNamed('Holidays').attributeNamed('endDate').setExternalType('DATETIME')

  # Oracle specifics: change TIMESTAMP to DATE
  if database_cfg=='Oracle.cfg':
    author_books_model.entityNamed('Writer').attributeNamed('birthday').setExternalType('DATE')

  utils.enable_model_cache_and_compute()

  if reinitDB_flag:
    # -r only rebuilds the database: no test is run
    from test_EditingContext_Global import reinitDB
    reinitDB(database_cfg)
    return

  if profile_flag:
    # The statement is given as a string: profile.run() evaluates it by
    # itself and stores the statistics in file 'profile.out'
    import profile
    profile.run('utils.run_suite(test_suite(), verbosity=verbose)',
                'profile.out')
    return

  return utils.run_suite(test_suite(), verbosity=verbose)

def test_suite():
    """
    Returns a unittest.TestSuite holding every method of class
    TestEditingContext_Global_ParentChild whose name begins with "test_".
    """
    # A TestLoader is used in place of unittest.makeSuite(), which recent
    # versions of unittest deprecate; the method prefix is the one formerly
    # passed to makeSuite()
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "test_"
    suite = unittest.TestSuite()
    suite.addTest(
        loader.loadTestsFromTestCase(TestEditingContext_Global_ParentChild))
    return suite

if __name__ == "__main__":
  # Exit status: 1 when main() returns a true value, 0 otherwise
  errs = main(sys.argv)
  if errs:
    sys.exit(1)
  else:
    sys.exit(0)

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.