test_EditingContext_Global_Inheritance.py :  » Database » Modeling-Framework » Modeling-0.9 » Modeling » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » Modeling Framework 
Modeling Framework » Modeling 0.9 » Modeling » tests » test_EditingContext_Global_Inheritance.py
#! /usr/bin/env python
# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sébastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------

"""
Global tests for EditingContext, relative to inheritance

  This tests the editing context and the whole underlying framework


  CVS information

    $Id: test_EditingContext_Global_Inheritance.py 968 2006-02-25 15:46:25Z sbigaret $
"""
# Bare revision number taken from the expanded '$Revision: ... $' keyword:
# [11:-2] drops the 11-character '$Revision: ' prefix and the trailing ' $'.
__version__='$Revision: 968 $'[11:-2]

import unittest, sys
import utils
if __name__ == "__main__":
  # Script mode only. This runs before the Modeling/StoreEmployees imports
  # below, so the path is fixed by the time they are executed.
  utils.disable_model_cache()
  if '-c' not in sys.argv and '-C' not in sys.argv \
     and '-m' not in sys.argv and '-M' not in sys.argv:
    # None of -c/-C/-m/-M was given: the test packages are included in
    # the path fix
    utils.fixpath(include_testPackages=1)
  else:
    # At least one of -c/-C/-m/-M was given: the test packages are left out
    # of the path fix and are built dynamically from the command line
    utils.fixpath(include_testPackages=0)
    utils.dynamically_build_test_packages(sys.argv)

from Modeling import ModelSet,Model
from Modeling.EditingContext import EditingContext
from Modeling.FetchSpecification import FetchSpecification
from Modeling.Qualifier import qualifierWithQualifierFormat
from Modeling.Validation import ValidationException,\
     CUSTOM_OBJECT_VALIDATION, OBJECT_WIDE_KEY, DELETE_DENY_KEY
from Modeling import Adaptor
from Modeling import Database

from StoreEmployees.Store import Store
from StoreEmployees import Employee
from StoreEmployees.SalesClerk import SalesClerk
from StoreEmployees.Executive import Executive
from StoreEmployees.Address import Address
from StoreEmployees.Holidays import Holidays

from mx import DateTime
DateFrom=DateTime.DateFrom

class TestEditingContext_Global_Inheritance(unittest.TestCase):
  """
  Global tests for EditingContext in relation to inheritance in a model

  The tests use the 'StoreEmployees' model and expect the database to hold
  the rows inserted by reinitDB(): one store, two sales clerks ('John Jr.'
  and 'Jeanne' Cleese), one executive ('John' Cleese), one address for each
  of them and one mark for the executive.

  Methods named '_test_...' are scenarios/helpers which are not collected by
  unittest; they are driven by the 'test_...' method of the same name, which
  runs them once with Employee.Employee and once with SalesClerk.
  """

  def test_01_objectsWithFetchSpecification(self):
    "[EC/Inheritance] objectsWithFetchSpecification"
    ec=EditingContext()

    # deepFlag=0: no object is expected for 'Employee' alone
    fetchSpec=FetchSpecification(entityName='Employee', deepFlag=0)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 0)

    # deepFlag=1: the two sales clerks and the executive are expected
    fetchSpec=FetchSpecification(entityName='Employee', deepFlag=1)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    #print len(objects), objects
    self.assertEqual(len(objects), 3)

  def test_02_toOneFault(self):
    "[EC/Inheritance] toOneFault"
    ec=EditingContext()
    # '2, rue Chose' is the address of the sales clerk 'John Jr.'
    qual=qualifierWithQualifierFormat('street like "2,*"')
    fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 1)

    addr_JohnJr=objects[0]
    johnJr=addr_JohnJr.getToEmployee()
    # Before it is triggered, the fault has the class of the relationship's
    # destination (Employee)...
    self.failUnless(johnJr.isFault())
    self.failUnless(johnJr.__class__==Employee.Employee)

    # ...and once triggered it gets its concrete class
    johnJr.willRead()
    self.failIf(johnJr.isFault())
    self.failUnless(johnJr.__class__==SalesClerk)

    ec.dispose()

  def test_03_toOneFault_notifications(self):
    "[EC/Inheritance] toOneFault/GlobalIDChangedNotification"
    ec1=EditingContext()
    ec2=EditingContext()
    qual=qualifierWithQualifierFormat('street like "2,*"')
    fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)

    objects_1=ec1.objectsWithFetchSpecification(fetchSpec)
    objects_2=ec2.objectsWithFetchSpecification(fetchSpec)

    addr_JohnJr_1=objects_1[0]
    johnJr_1=addr_JohnJr_1.getToEmployee()
    addr_JohnJr_2=objects_2[0]
    johnJr_2=addr_JohnJr_2.getToEmployee()

    # gid_2 is taken before any fault is triggered
    gid_2=ec2.globalIDForObject(johnJr_2)
    # trigger the first one (1)
    johnJr_1.willRead()
    gid_1=ec1.globalIDForObject(johnJr_1)
    # check that the notification was propagated to the 2nd EC
    # meaning that the ``root'' globalID was turned into the concrete one

    # the second fault shouldn't have been triggered
    self.failUnless(johnJr_2.isFault())

    # it is still available under the previous gid (gid_2)
    self.failUnless(ec2.objectForGlobalID(gid_2)==johnJr_2)

    # but not registered with that GlobalID!
    self.failUnless(ec2.globalIDForObject(johnJr_2)==gid_1)

    # Rather, with the new, concrete one
    self.failUnless(ec2.objectForGlobalID(gid_1)==johnJr_2)

    #
    # Make some changes in the database, in the meantime
    #
    model=ModelSet.defaultModelSet().modelNamed('StoreEmployees')
    pgAdaptor=Adaptor.adaptorWithModel(model)
    context=pgAdaptor.createAdaptorContext()
    channel=context.createAdaptorChannel()

    sqlExpr=pgAdaptor.expressionClass()()
    original_store_area=johnJr_1.getStoreArea()
    # Any value different from the current one will do
    if original_store_area=='AB':
      new_store_area='AC'
    else:
      new_store_area='AB'
    # NOTE(review): this raw update is not reverted here and tearDown() only
    # issues deletes, so the new STORE_AREA stays in the database -- the
    # if/else above makes the test work whichever value it finds.
    sqlExpr.setStatement("update SALES_CLERK set STORE_AREA='%s' where id=%i"\
                         %(new_store_area, gid_1.keyValues().values()[0]))
    channel.evaluateExpression(sqlExpr)

    #
    # Now, check that triggering the fault for johnJr_2 uses the DB's snapshot
    # i.e. that it still sees the value held before the raw update
    #
    johnJr_2.willRead()
    # Note: This also checks that johnJr_2 is of the correct class
    # (getStoreArea() is called on it)
    self.failIf(johnJr_2.getStoreArea()!=original_store_area)
    self.failIf(johnJr_1.getStoreArea()!=original_store_area)

  def test_04_validation(self):
    "[EC/Inheritance] test Validation"
    ec=EditingContext()
    qual=qualifierWithQualifierFormat('street like "2,*"')
    fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 1)

    addr_JohnJr=objects[0]
    johnJr=addr_JohnJr.getToEmployee()
    # No assertion here: the test passes when validation does not raise,
    # first with the untriggered fault, then with the triggered object
    addr_JohnJr.validateValueForKey(johnJr, 'toEmployee')
    johnJr.willRead()
    addr_JohnJr.validateValueForKey(johnJr, 'toEmployee')

  def test_05_toOneFault_uniquing(self):
    "[EC/Inheritance] Tests that toFaults are also unique within a ec"
    ec=EditingContext()
    # get address for John Cleese
    qual=qualifierWithQualifierFormat('street like "4,*"')
    fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 1)
    john_addr=objects[0]
    # get John's mark
    fetchSpec=FetchSpecification(entityName='Mark')
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 1)
    john_mark=objects[0]

    # Verify that both faults point to the same object
    self.failUnless(john_addr.getToEmployee().isFault())
    self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())
    # trigger one of the faults, then check again
    john_addr.getToEmployee().willRead()
    self.failIf(john_mark.getExecutive().isFault())
    self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())

  def test_05b_toOneFault_uniquing(self):
    "[EC/Inheritance] Tests unicity for toOne-related objects when they are already fetched"
    ## In this test we make sure that, after an object has been fetched,
    ## any other object pointing to it does not get a to-one fault, but the
    ## object that was previously fetched.

    ec=EditingContext()
    ## get John

    qual=qualifierWithQualifierFormat('firstName=="John" AND lastName=="Cleese"')
    fetchSpec=FetchSpecification(entityName='Executive', qualifier=qual)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 1)
    john=objects[0]

    ## get address for John Cleese

    qual=qualifierWithQualifierFormat('street like "4,*"')
    fetchSpec=FetchSpecification(entityName='Address', qualifier=qual)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 1)
    john_addr=objects[0]

    ## get John's mark

    fetchSpec=FetchSpecification(entityName='Mark')
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.assertEqual(len(objects), 1)
    john_mark=objects[0]

    self.failIf(john_addr.getToEmployee().isFault())
    self.failIf(john_mark.getExecutive().isFault())

    ## Check that inheritance does not defeat the uniquing strategy...
    ## Note: this is what used to fail when john is fetched before the others:
    ##       in this case john_mark.getExecutive() was NOT a fault,
    ##       while john_addr.getToEmployee() was a fault

    gid1=ec.globalIDForObject(john)
    gid2=ec.globalIDForObject(john_mark.getExecutive())
    gid3=ec.globalIDForObject(john_addr.getToEmployee())

    ## ... for GlobalIDs...

    self.failIf(gid1!=gid2)
    self.failIf(gid2!=gid3)

    ## ...and for the corresponding objects

    self.failIf(john_addr.getToEmployee()!=john)
    self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())

    ## call willRead() on the related object, then check again

    john_addr.getToEmployee().willRead()
    self.failIf(john_mark.getExecutive().isFault())
    self.failIf(john_addr.getToEmployee()!=john_mark.getExecutive())
    #self.failIf(john_addr.getToEmployee()!=john)

  def test_06_objectsCountWithFetchSpecification(self):
    "[EC/Inheritance] objectsCountWithFetchSpecification"
    ec=EditingContext()
    # Deep count on Employee: the two sales clerks and the executive
    fs=FetchSpecification(entityName='Employee')
    fs.setIsDeep(1)
    nb=ec.objectsCountWithFetchSpecification(fs)
    self.assertEqual(nb, 3)
    #
    fs=FetchSpecification(entityName='SalesClerk')
    nb=ec.objectsCountWithFetchSpecification(fs)
    self.assertEqual(nb, 2)

  def test_07_fetch_shouldnt_change_FetchSpecification(self):
    "[EC/Inheritance] fetching shouldn't change FetchSpecification"
    # Bug #753147
    # The very same deep fetch specification is used for two fetches in a
    # row: the first fetch must leave it untouched (its entity name is still
    # 'Employee') and the second fetch must return the same number of objects
    ec=EditingContext()
    qual=qualifierWithQualifierFormat('toAddresses.zipCode like "4*"')
    fs_deep=FetchSpecification('Employee', qualifier=qual,
                               deepFlag=1) # Note the deepFlag
    objs1=ec.objectsWithFetchSpecification(fs_deep)
    self.assertEqual(len(objs1), 3)
    self.assertEqual(fs_deep.entityName(), 'Employee')
    objs2=ec.objectsWithFetchSpecification(fs_deep)
    self.assertEqual(len(objs2), 3)

  def test_08a_fetchesRawRows(self):
    "[EC/Inheritance] fetchesRawRows / basics"
    ec=EditingContext()

    # No inheritance
    addresses=ec.fetch('Address', rawRows=1)
    self.assertEqual(len(addresses), 3)
    for a in addresses:
      self.failIf(type(a) is not type({}))

    # Inheritance
    employees=ec.fetch('Employee', isDeep=1, rawRows=1)
    self.assertEqual(len(employees), 3)
    for e in employees:
      self.failIf(type(e) is not type({}))

    # modified objects: the raw row must reflect the unsaved change
    john=ec.fetch('Executive', 'lastName == "Cleese"')[0]
    john_gid=john.globalID() # NOTE(review): value is not used afterwards
    john_name=john.getLastName()
    alternate_john_name=john_name+' -- testing'
    john.setLastName(alternate_john_name)
    raw_john=ec.fetch('Executive', isDeep=1, rawRows=1)[0]
    self.assertEqual(raw_john['lastName'], alternate_john_name)

  def test_08b_fetchesRawRows(self):
    "[EC/Inheritance] fetchesRawRows / inserted & deleted objects"
    ec=EditingContext()

    # Inserted objects should appear, as expected: 3 stored rows + 2 inserted
    nsc=SalesClerk()
    nsc.setLastName('New SC')
    ne=Executive()
    ne.setLastName('New Executive')
    ec.insert(nsc)
    ec.insert(ne)
    employees=ec.fetch('Employee', isDeep=1, rawRows=1)
    self.assertEqual(len(employees), 5)
    for e in employees:
      self.assertEqual(type(e), type({}))

    # deletedObjects should not appear: one inserted object and one stored
    # object are deleted, leaving 3 rows
    ec.delete(nsc)
    jeanne=ec.fetch('SalesClerk', 'firstName=="Jeanne"')[0]
    ec.deleteObject(jeanne)
    employees=ec.fetch('Employee', isDeep=1, rawRows=1)
    self.assertEqual(len(employees), 3)
    rnsc=rne=rjeanne=None # returned nsc/ne/jeanne
    for e in employees:
      self.assertEqual(type(e), type({}))
      if e['lastName']==nsc.getLastName(): rnsc=e
      if e['lastName']==ne.getLastName(): rne=e
      if e['firstName']==jeanne.getFirstName(): rjeanne=e
    self.assertEqual(rnsc, None)
    self.assertEqual(rjeanne, None)
    # the inserted executive is still there, and its row is its raw snapshot
    self.assertEqual(rne, ne.snapshot_raw())

  def test_09_faultForRawRow(self):
    "[EC/Inheritance] faultForRawRow"
    ec=EditingContext()
    employees=ec.fetch('Employee', isDeep=1, rawRows=1)
    # Rows are told apart by the presence of the key 'officeLocation'
    r_executives=[]
    r_sales_clerks=[]
    for e in employees:
      if e.has_key('officeLocation'):
        r_executives.append(e)
      else:
        r_sales_clerks.append(e)
    self.assertEqual(len(r_executives), 1)
    self.assertEqual(len(r_sales_clerks), 2)

    r_john_c=r_executives[0]
    # The same row gives the same object, whether the root entity or the
    # concrete one is named
    john_c1=ec.faultForRawRow(r_john_c, 'Employee')
    john_c2=ec.faultForRawRow(r_john_c, 'Executive')
    self.failUnless(john_c1.isFault())
    self.failUnless(john_c1==john_c2)
    john_c3=ec.fetch('Employee', 'firstName=="John" AND lastName=="Cleese"',
                     isDeep=1)[0]
    self.failUnless(john_c2==john_c3)
    john_c3.getFirstName()
    self.failIf(john_c1.isFault())

    # Anything in the correct inheritance tree returns the right object
    john_c4=ec.faultForRawRow(r_john_c, 'SalesClerk')
    self.failUnless(john_c1==john_c4)

    # Check that it correctly raises when 'pk' is not present
    self.assertRaises(ValueError,
                      ec.faultForRawRow, {'incorrect':0}, 'Employee')

    ## TBD: CHECK FOR TEMPORARY/INSERTED

  def test_10_toMany_no_inverse__db_snapshot_is_correct(self):
    "[EC/Inheritance] toMany w/o inverse: check db snapshot"
    # We check here that a database snapshot for an object being pointed to
    # by a toMany relationship with no inverse correctly contains its FK
    ec=EditingContext()
    holidays=Holidays()
    holidays.setStartDate(DateFrom('2003-07-01 08:00'))
    holidays.setEndDate(DateFrom('2003-08-01 08:00'))
    ec.insert(holidays)
    ec.saveChanges()

    database=ec.rootObjectStore().objectStoreForObject(holidays).database()
    snap=database.snapshotForGlobalID(holidays.globalID())
    self.failIf(not snap.has_key('fkEmployeeId'))

  ## ToMany with no inverse:
  # test10 / insertion:
  # [a] insert (h.+emp.) + emp.addToHolidays + saveChges
  # [b] insert h. + saveChges + emp.addToHolidays + saveChges
  # [c] insert h.+ emp.addToHolidays + saveChges
  # [d] insert h.+ saveChges + emp.addToHolidays + h.setStartDate + saveChges
  # [e] insert h. + saveChges + insert emp. + emp.addToHolidays + saveChanges
  #
  # test11 / remove:
  # [a] remove simple
  # [b] add h1 + remove h2
  # [c] remove (+ implicit delete w/ cascade rule) emp.

  def _test_10_create_emp_n_holidays(self):
    """
    helper function for tests test_10...: return (employee, holidays), brand
    new instances of Employee/Holidays.

    The employee's class is the one stored in self._test_10_class_employee,
    which the calling test sets beforehand. Neither object is inserted in an
    EditingContext here.
    """
    emp=self._test_10_class_employee()
    emp.setLastName('Test10'); emp.setFirstName('T.')
    holidays=Holidays()
    holidays.setStartDate(DateFrom('2003-07-01 08:00'))
    holidays.setEndDate(DateFrom('2003-08-01 08:00'))
    return emp, holidays

  def _test_10_check_insert_correctness_part1(self, ec, emp, holidays):
    """
    helper function for tests test_10...: checks, right after saveChanges(),
    that the database's snapshot for 'holidays' holds a 'fkEmployeeId' equal
    to the employee's 'id', and that a snapshot exists for the employee's
    'holidays' relationship.

    Returns (emp_gid, db): the employee's GlobalID and the database object,
    to be handed to _test_10_check_insert_correctness_part2().
    """
    emp_gid=emp.globalID()
    holidays_gid=holidays.globalID()
    db=ec.rootObjectStore().objectStoreForObject(emp).database()

    self.failUnless(holidays_gid)
    self.failIf(not db.snapshotForGlobalID(holidays_gid))
    self.failIf(not db.snapshotForGlobalID(holidays_gid).has_key('fkEmployeeId'))
    self.assertEqual(db.snapshotForGlobalID(holidays_gid)['fkEmployeeId'],
                     emp_gid.keyValues()['id'])
    self.failUnless(db.snapshotForSourceGlobalID(emp.globalID(), 'holidays'))
    return emp_gid, db

  def _test_10_check_insert_correctness_part2(self, emp_gid, database):
    """
    helper function for tests test_10...: to be called after the
    EditingContext used for the insertion was deleted; re-fetches the test
    employee in a new EditingContext and checks that it has holidays.
    """
    # Now check that it was correctly saved. If 'holidays' was not correctly
    # saved, it did not get the right value for its FK and we won't retrieve
    # any holidays for the test employee

    # make sure we won't get information from the db cache
    self.failIf(database.snapshotForSourceGlobalID(emp_gid, 'holidays'))

    ec2=EditingContext()
    # NOTE(review): the test_10x methods run their scenario twice in a row
    # and tearDown() only runs afterwards, so on the second pass more than
    # one 'Test10' employee may match this fetch -- confirm that [0] is
    # guaranteed to be the object that was just inserted.
    emp2=ec2.fetch('Employee','lastName=="Test10"', isDeep=1)[0]
    self.failIf(len(emp2.getHolidays())==0)

  def _test_10a_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/a]"
    # insert (h.+emp) + emp.addToHolidays + saveChanges
    ec=EditingContext()
    emp, holidays=self._test_10_create_emp_n_holidays()
    ec.insert(emp)
    ec.insert(holidays)
    emp.addToHolidays(holidays)
    ec.saveChanges()

    emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
    del ec # deleting the ec invalidates the database's cache
    self._test_10_check_insert_correctness_part2(emp_gid, db)

  def test_10a_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/a]"
    # Same scenario with the root class, then with a subclass
    self._test_10_class_employee=Employee.Employee
    self._test_10a_toMany_with_no_inverse()
    self._test_10_class_employee=SalesClerk
    self._test_10a_toMany_with_no_inverse()

  def _test_10b_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/b]"
    # insert emp. + saveChanges + insert h. + saveChanges
    # + emp.addToHolidays + saveChanges
    ec=EditingContext()
    emp, holidays=self._test_10_create_emp_n_holidays()
    # NOTE(review): taken before 'emp' is inserted and overwritten below
    emp_gid=emp.globalID()
    ec.insert(emp)
    ec.saveChanges()

    ec.insert(holidays)
    ec.saveChanges()

    emp.addToHolidays(holidays)
    #emp.setLastName('fdsk')
    ec.saveChanges()

    emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
    del ec # deleting the ec invalidates the database's cache
    self._test_10_check_insert_correctness_part2(emp_gid, db)

  def test_10b_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/b]"
    # Same scenario with the root class, then with a subclass
    self._test_10_class_employee=Employee.Employee
    self._test_10b_toMany_with_no_inverse()
    self._test_10_class_employee=SalesClerk
    self._test_10b_toMany_with_no_inverse()

  def _test_10c_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/c]"
    # insert emp. + saveChanges + insert h. + emp.addToHolidays + saveChanges
    ec=EditingContext()
    emp, holidays=self._test_10_create_emp_n_holidays()
    ec.insert(emp)
    ec.saveChanges()

    ec.insert(holidays)
    emp.addToHolidays(holidays)
    ec.saveChanges()

    emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
    del ec # deleting the ec invalidates the database's cache
    self._test_10_check_insert_correctness_part2(emp_gid, db)

  def test_10c_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/c]"
    # Same scenario with the root class, then with a subclass
    self._test_10_class_employee=Employee.Employee
    self._test_10c_toMany_with_no_inverse()
    self._test_10_class_employee=SalesClerk
    self._test_10c_toMany_with_no_inverse()

  def _test_10d_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/d]"
    # insert emp. + saveChanges + insert h. + saveChanges
    # + emp.addToHolidays + h.setStartDate + saveChanges
    ec=EditingContext()
    emp, holidays=self._test_10_create_emp_n_holidays()
    ec.insert(emp)
    ec.saveChanges()

    ec.insert(holidays)
    ec.saveChanges()

    emp.addToHolidays(holidays)
    holidays.setStartDate(DateFrom('2003-07-02 08:00'))
    ec.saveChanges()

    emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
    del ec # deleting the ec invalidates the database's cache
    self._test_10_check_insert_correctness_part2(emp_gid, db)

  def test_10d_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/d]"
    # Same scenario with the root class, then with a subclass
    self._test_10_class_employee=Employee.Employee
    self._test_10d_toMany_with_no_inverse()
    self._test_10_class_employee=SalesClerk
    self._test_10d_toMany_with_no_inverse()

  def _test_10e_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/e]"
    # insert h. + save + insert emp. + emp.addToHolidays + saveChanges
    ec=EditingContext()
    emp, holidays=self._test_10_create_emp_n_holidays()
    ec.insert(holidays)
    ec.saveChanges()

    ec.insert(emp)
    emp.addToHolidays(holidays)
    ec.saveChanges()

    emp_gid, db=self._test_10_check_insert_correctness_part1(ec, emp, holidays)
    del ec # deleting the ec invalidates the database's cache
    self._test_10_check_insert_correctness_part2(emp_gid, db)

  def test_10e_toMany_with_no_inverse(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [insert/e]"
    # Same scenario with the root class, then with a subclass
    self._test_10_class_employee=Employee.Employee
    self._test_10e_toMany_with_no_inverse()
    self._test_10_class_employee=SalesClerk
    self._test_10e_toMany_with_no_inverse()

  def _test_11a_init_emp_and_holidays(self, ec):
    """
    helper function for tests test_11... and test_12: inserts in 'ec' a new
    employee (of class self._test_11_class_employee) and new holidays, adds
    the holidays to the employee, saves the changes and returns
    (employee, holidays).
    """
    emp=self._test_11_class_employee()
    emp.setLastName('Test11'); emp.setFirstName('T.')
    holidays=Holidays()
    holidays.setStartDate(DateFrom('2003-07-01 08:00'))
    holidays.setEndDate(DateFrom('2003-08-01 08:00'))
    ec.insert(emp)
    ec.insert(holidays)
    emp.addToHolidays(holidays)
    ec.saveChanges()
    return emp, holidays

  def _test_11a_toMany_with_no_inverse__remove(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/a]"
    # remove simple
    ec=EditingContext()
    emp, holidays=self._test_11a_init_emp_and_holidays(ec)

    emp.removeFromHolidays(holidays)
    ec.saveChanges()

    emp_gid=emp.globalID()
    h_gid=holidays.globalID()
    db=ec.rootObjectStore().objectStoreForObject(emp).database()
    # the removed holidays' FK must have been reset
    self.failUnless(db.snapshotForGlobalID(h_gid)['fkEmployeeId'] is None)
    del ec

    # make sure we won't get information from the db cache
    self.failIf(db.snapshotForSourceGlobalID(emp_gid, 'holidays'))
    ec2=EditingContext()
    emp2=ec2.fetch('Employee', 'lastName=="Test11"', isDeep=1)[0]
    self.assertEqual(len(emp2.getHolidays()), 0)

  def test_11a_toMany_with_no_inverse__remove(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/a]"
    # Same scenario with the root class, then with a subclass
    self._test_11_class_employee=Employee.Employee
    self._test_11a_toMany_with_no_inverse__remove()
    self._test_11_class_employee=SalesClerk
    self._test_11a_toMany_with_no_inverse__remove()

  def _test_11b_toMany_with_no_inverse__remove(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/b]"
    # remove + add
    ec=EditingContext()
    emp, holidays=self._test_11a_init_emp_and_holidays(ec)

    emp.removeFromHolidays(holidays)
    h2=Holidays()
    sd2=DateFrom('2003-01-01 01:00')
    ed2=DateFrom('2003-02-02 02:00')
    h2.setStartDate(sd2)
    h2.setEndDate(ed2)
    ec.insert(h2)
    emp.addToHolidays(h2)
    ec.saveChanges()

    emp_gid=emp.globalID()
    h_gid=holidays.globalID()
    db=ec.rootObjectStore().objectStoreForObject(emp).database()
    # the removed holidays' FK must have been reset
    self.failUnless(db.snapshotForGlobalID(h_gid)['fkEmployeeId'] is None)
    del ec

    # make sure we won't get information from the db cache
    self.failIf(db.snapshotForSourceGlobalID(emp_gid, 'holidays'))
    ec2=EditingContext()
    emp2=ec2.fetch('Employee', 'lastName=="Test11"', isDeep=1)[0]
    # only the holidays which were added are expected
    self.assertEqual(len(emp2.getHolidays()), 1)
    h2b=emp2.getHolidays()[0]

    # the fetched value may be of a different type (python's datetime),
    # if returned as such by the python db-adapter (mysqldb e.g.)
    if getattr(h2b.getStartDate(),'isoformat',None):
      self.assertEqual(sd2.Format('%Y-%m-%dT%H:%M:%S'),
                       h2b.getStartDate().isoformat())
      self.assertEqual(ed2.Format('%Y-%m-%dT%H:%M:%S'),
                       h2b.getEndDate().isoformat())
    else:
      self.assertEqual(sd2, h2b.getStartDate())
      self.assertEqual(ed2, h2b.getEndDate())
    #print db.snapshotForGlobalID(h_gid)

  def test_11b_toMany_with_no_inverse__remove(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/b]"
    # Same scenario with the root class, then with a subclass
    self._test_11_class_employee=Employee.Employee
    self._test_11b_toMany_with_no_inverse__remove()
    self._test_11_class_employee=SalesClerk
    self._test_11b_toMany_with_no_inverse__remove()

  def _test_11c_toMany_with_no_inverse__remove(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/c]"
    # remove + delete emp.
    ec=EditingContext()
    emp, holidays=self._test_11a_init_emp_and_holidays(ec)

    emp.removeFromHolidays(holidays)
    ec.delete(emp)
    ec.saveChanges()

    db=ec.rootObjectStore().objectStoreForObject(holidays).database()
    h_gid=holidays.globalID()
    del ec

    ec2=EditingContext()
    # The fetched object itself is not examined: the snapshot registered in
    # the database for 'holidays' is what gets checked below
    h2=ec2.fetch('Holidays')[0]
    #print db.snapshotForGlobalID(h_gid)
    self.failIf(db.snapshotForGlobalID(h_gid)['fkEmployeeId'] is not None)

  def test_11c_toMany_with_no_inverse__remove(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly updated [remove/c]"
    # Same scenario with the root class, then with a subclass
    self._test_11_class_employee=Employee.Employee
    self._test_11c_toMany_with_no_inverse__remove()
    self._test_11_class_employee=SalesClerk
    self._test_11c_toMany_with_no_inverse__remove()

  def _test_12_toMany_with_no_inverse__correct_fetch(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly fetched"
    ec=EditingContext()
    emp, holidays=self._test_11a_init_emp_and_holidays(ec)
    db=ec.rootObjectStore().objectStoreForObject(holidays).database()
    h_gid=holidays.globalID()
    del ec
    ec2=EditingContext()
    # The fetched object itself is not examined: the snapshot registered in
    # the database for 'holidays' must hold a value for the FK
    h2=ec2.fetch('Holidays')[0]
    self.failIf(db.snapshotForGlobalID(h_gid)['fkEmployeeId'] is None)

  def test_12_toMany_with_no_inverse__correct_fetch(self):
    "[EC/Inheritance] toMany w/o inverse: obj. correctly fetched"
    # Same scenario with the root class, then with a subclass
    self._test_11_class_employee=Employee.Employee
    self._test_12_toMany_with_no_inverse__correct_fetch()
    self._test_11_class_employee=SalesClerk
    self._test_12_toMany_with_no_inverse__correct_fetch()

  def _test_13_toMany_with_no_inverse__snapshot_row(self):
    "[EC/Inheritance] toMany w/o inverse: obj.snapshot_raw()"
    ec=EditingContext()
    emp, holidays=self._test_10_create_emp_n_holidays()
    ec.insert(emp)
    ec.insert(holidays)
    emp.addToHolidays(holidays)

    # Not computed before saveChanges(): the key is there, with no value
    self.failUnless(holidays.snapshot_raw().has_key('fkEmployeeId'))
    self.assertEqual(holidays.snapshot_raw()['fkEmployeeId'], None)
    ec.saveChanges()
    #print holidays.snapshot_raw()
    # We check here that snapshot_raw() is able to find the value
    # for the FK by examining the database's cache.
    self.assertEqual(holidays.snapshot_raw()['fkEmployeeId'],
                     emp.globalID().keyValues()['id'])

  def test_13_toMany_with_no_inverse__snapshot_row(self):
    "[EC/Inheritance] toMany w/o inverse: obj.snapshot_raw()"
    # Same scenario with the root class, then with a subclass
    self._test_10_class_employee=Employee.Employee
    self._test_13_toMany_with_no_inverse__snapshot_row()

    self._test_10_class_employee=SalesClerk
    self._test_13_toMany_with_no_inverse__snapshot_row()


  def tearDown(self):
    """
    Cleans up the Database after each test

    Only rows added by the tests are removed, with raw SQL 'delete'
    statements; rows which were changed in place are not restored.
    """
    ec=EditingContext()
    # A Store instance is only needed to find the relevant object store
    w=Store()
    dbChannel=ec.rootObjectStore().objectStoreForObject(w).availableChannel()
    channel=dbChannel.adaptorChannel()
    sqlExpr=channel.adaptorContext().adaptor().expressionClass()()
    cleanup_statements=(
      "delete from STORE where id>1",
      "delete from SALES_CLERK where id>2",
      # id>3 coz' SALES_CLERK and EXECUTIVE share the same SQL sequence
      "delete from EXECUTIVE where id>3",
      "delete from ADDRESS where id>3",
      "delete from EMPLOYEE",
      "delete from HOLIDAYS",
      )
    # The statements are evaluated in the order above, reusing the same
    # expression object
    for statement in cleanup_statements:
      sqlExpr.setStatement(statement)
      channel.evaluateExpression(sqlExpr)


def reinitDB(database_cfg):
  """
  Reinitializes the database: recreates the db's schema needed for these tests
  to run. Also inserts some data in the created db.

  Parameter 'database_cfg' is the name of the configuration file in use
  (e.g. 'Postgresql.cfg'). It is only examined to detect 'Oracle.cfg', in
  which case the session's date format is set before the rows are inserted.

  Inserted rows: one Store, two SalesClerks ('John Jr.' and 'Jeanne' Cleese),
  one Executive ('John' Cleese), one Address for each of the three employees
  and one Mark for the executive.

  Errors raised while evaluating the schema statements are printed and
  otherwise ignored; errors raised by any other statement propagate.
  """
  from Modeling.SchemaGeneration import SchemaGeneration,\
       CreateDatabaseKey, DropDatabaseKey, CreateTablesKey, DropTablesKey, \
       CreatePrimaryKeySupportKey, DropPrimaryKeySupportKey,\
       ForeignKeyConstraintsKey, PrimaryKeyConstraintsKey
  options={
    DropDatabaseKey            : 1,
    CreateDatabaseKey          : 1,
    DropTablesKey              : 1,
    CreateTablesKey            : 1,
    CreatePrimaryKeySupportKey : 1,
    DropPrimaryKeySupportKey   : 1,
    ForeignKeyConstraintsKey   : 0,
    PrimaryKeyConstraintsKey   : 1,
    }
  model=ModelSet.defaultModelSet().modelNamed('StoreEmployees')
  adaptor=Adaptor.adaptorWithModel(model)
  schemaGeneration = adaptor.schemaGenerationFactory()
  sqlExprs=schemaGeneration.schemaCreationStatementsForEntities(model.entities(), options)
  context=adaptor.createAdaptorContext()
  channel=context.createAdaptorChannel()

  def run_sql(statement):
    "Evaluates a raw SQL statement on the channel, with a fresh expression"
    expr=adaptor.expressionClass()()
    expr.setStatement(statement)
    channel.evaluateExpression(expr)

  def new_pks(entityName, count):
    "Returns a list of 'count' new primary key values for the named entity"
    pks=channel.primaryKeysForNewRowsWithEntity(count,
                                                model.entityNamed(entityName))
    return [pk.values()[0] for pk in pks]

  # Schema (re)creation is best-effort: a failing statement is reported and
  # the remaining ones are still evaluated
  for sqlExpr in sqlExprs:
    try:
      channel.evaluateExpression(sqlExpr)
    except Exception:
      sys.stdout.write('error: %s\n'%str(sys.exc_info()[1]))

  if database_cfg=='Oracle.cfg':
    # A dedicated expression is used here: the loop variable above is
    # unbound when there is no schema statement, and reusing it would
    # overwrite the last schema statement
    run_sql("alter session set nls_date_format = 'YYYY-MM-DD HH24:mi:ss'")

  # Store
  st_pk=new_pks('Store', 1)[0]
  run_sql("insert into STORE (id, corporate_name) "
          "values (%i,'Flying Circus')"%st_pk)

  # SalesClerk
  sc_pks=new_pks('SalesClerk', 2)
  run_sql("insert into SALES_CLERK "
          "(id, FIRST_NAME, LAST_NAME, STORE_AREA, FK_STORE_ID) "
          "values (%i,'John Jr.','Cleese', 'AB', %i)"%(sc_pks[0], st_pk))
  run_sql("insert into SALES_CLERK "
          "(ID, FIRST_NAME, LAST_NAME, STORE_AREA, FK_STORE_ID) "
          "values (%i,'Jeanne','Cleese', 'DE', %i)"%(sc_pks[1], st_pk))

  # Executive
  ex_pk=new_pks('Executive', 1)[0]
  run_sql("insert into EXECUTIVE "
          "(id,FIRST_NAME,LAST_NAME,OFFICE_LOCATION,FK_STORE_ID)"
          " values (%i,'John','Cleese', '4XD7', %i)"%(ex_pk, st_pk))

  # Address
  ad_pks=new_pks('Address', 3)
  run_sql("insert into ADDRESS "
          "(ID, STREET, ZIP_CODE, TOWN, FK_EMPLOYEE_ID) "
          "values (%i,'2, rue Chose','45','ChoseVille',%i)"
          %(ad_pks[0], sc_pks[0])) # John Jr.
  run_sql("insert into ADDRESS "
          "(ID, STREET, ZIP_CODE, TOWN, FK_EMPLOYEE_ID) "
          "values (%i,'2b, rue Chose','45','ChoseVille',%i)"
          %(ad_pks[1], sc_pks[1])) # Jeanne.
  run_sql("insert into ADDRESS "
          "(ID, STREET, ZIP_CODE, TOWN, FK_EMPLOYEE_ID) "
          "values (%i,'4, rue MachinChose','45','ChoseVille',%i)"
          %(ad_pks[2], ex_pk)) # John C.

  # Mark
  mark_pk=new_pks('Mark', 1)[0]
  # John Cleese got a very good mark in January: 20/20
  run_sql("insert into MARK (id, month, mark, FK_EXECUTIVE_ID) "
          "values (%i,'01','20',%i)"%(mark_pk, ex_pk)) # John C.


def usage(prgName, exitStatus=None):
  """
  Builds the command-line help message of this test script.

  Parameters:

    prgName -- name of the program, inserted at the start of the message

    exitStatus -- when not None, the message is printed on stdout and the
      interpreter exits through sys.exit(exitStatus). When None (default)
      nothing is printed and the message is returned to the caller.

  Returns the help message as a string (only when exitStatus is None).
  """
  _usage="""%s [-v] [-V] [-h] [-p] [-r] [-d adaptor]
Global tests for the framework's core, testing inheritance-related features

Options
--------
  -v  Minimally verbose
  -V  Really verbose
  -h  Prints this message
  -p  enables profiling
  -r  reinitialize the (postgresql) database
         the database defined in the model 'model_StoreEmployees.xml'
         is altered: tables are dropped, then they are re-created along with
         constraints for PKs and FKs and PK support (sequences)
  -d  sets the database adaptor to use: Postgresql (default), MySQL, Oracle
                                        or SQLite
""" % prgName
  if exitStatus is None:
    return _usage
  # Parenthesized single-argument form: same output as the print statement,
  # and still valid syntax where print is a function
  print(_usage)
  sys.exit(exitStatus)


# Verbosity handed to utils.run_suite() by main(); main() sets it to 1 when
# option -v is given and to "Y" when option -V is given
verbose=0
# Name of the adaptor configuration file that main() applies to the model;
# option -d replaces it with '<adaptor>.cfg'
database_cfg='Postgresql.cfg'

def main(args):
  """
  Command-line entry point: parses the options, configures the model
  'StoreEmployees' for the selected database adaptor, then either
  reinitializes the database (-r), profiles the test suite (-p) or simply
  runs it.

  Parameters:

    args -- the full argument vector, program name first (typically sys.argv)

  Returns the value returned by utils.run_suite() when the suite is run
  directly; returns None after a database reinitialization or a profiled run.

  Option -h, an unknown option, an unsupported adaptor name for -d or any
  unexpected positional argument all end in usage(me, 1).

  Side effects: updates the module-level 'verbose' and 'database_cfg';
  with the Oracle adaptor rebinds the module-level 'DateFrom'; with the
  SQLite adaptor sets the environment variable MDL_TRANSIENT_DB_CONNECTION.
  """
  import getopt
  global verbose, database_cfg, DateFrom
  me=args[0]
  try:
    # 'h' must be part of the spec, otherwise the documented -h is rejected
    options, remaining = getopt.getopt(args[1:], 'hvVprd:')
  except getopt.GetoptError:
    usage(me, 1)

  # The flag is not named 'profile': that name is taken by the module
  # imported further down
  do_profile=0; reinitDB_flag=0
  for k, v in options:
    if k=='-h': usage(me, 1)
    elif k=='-v': verbose=1
    elif k=='-V': verbose="Y"
    elif k=='-p': do_profile=1
    elif k=='-r': reinitDB_flag=1
    elif k=='-d':
      if v not in ('Postgresql', 'MySQL', 'Oracle', 'SQLite'): usage(me, 1)
      database_cfg='%s.cfg'%v
  if remaining: usage(me, 1) #raise 'Unexpected arguments', remaining

  # Initialization of the model's characteristics
  model=ModelSet.defaultModelSet().modelNamed('StoreEmployees')
  Model.updateModelWithCFG(model, database_cfg)

  # MySQL and Oracle specifics: TIMESTAMP is changed to DATETIME, resp. DATE
  externalDateType={'MySQL.cfg': 'DATETIME',
                    'Oracle.cfg': 'DATE'}.get(database_cfg)
  if externalDateType is not None:
    holidays=model.entityNamed('Holidays')
    for attributeName in ('startDate', 'endDate'):
      holidays.attributeNamed(attributeName).setExternalType(externalDateType)

  # Oracle specifics: dates are built as DCOracle2 timestamps
  if database_cfg=='Oracle.cfg':
    def OracleDateFrom(dateString):
      "Converts a 'YYYY-MM-DD HH:MM' string into a DCOracle2.Timestamp"
      date, time=dateString.split()
      yyyy,mm,dd=map(int, date.split('-'))
      h,m=map(int, time.split(':'))
      import DCOracle2
      return DCOracle2.Timestamp(yyyy,mm,dd,h,m,0) # seconds are always 0
    DateFrom=OracleDateFrom

  # SQLite specifics: test_03 requires that the framework closes the db
  # connection, so that the new context/channel can connect to the db (sqlite
  # does not allow concurrent access to the db).
  if database_cfg=='SQLite.cfg':
    import os
    os.environ['MDL_TRANSIENT_DB_CONNECTION']='yes'

  utils.enable_model_cache_and_compute()

  if reinitDB_flag:
    reinitDB(database_cfg)
    return

  if do_profile:
    import profile
    # The statement is given as a string for profile.run() to execute; the
    # statistics are saved in the file 'profile.out'
    profile.run('utils.run_suite(test_suite(), verbosity=verbose)',
                'profile.out')
    return

  return utils.run_suite(test_suite(), verbosity=verbose)

def test_suite():
    """
    Returns a unittest.TestSuite holding the tests loaded from
    TestEditingContext_Global_Inheritance, restricted to the methods whose
    name starts with 'test_'.
    """
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "test_"
    inheritance_tests = loader.loadTestsFromTestCase(
        TestEditingContext_Global_Inheritance)

    all_tests = unittest.TestSuite()
    all_tests.addTest(inheritance_tests)
    return all_tests

if __name__ == "__main__":
  # Run as a script: a true value returned by main() becomes exit status 1,
  # anything else becomes exit status 0
  if main(sys.argv):
    sys.exit(1)
  sys.exit(0)
# www.java2java.com | Contact Us
# Copyright 2009 - 12 Demo Source and Support. All rights reserved.
# All other trademarks are property of their respective owners.