test_EditingContext_Global.py :  » Database » Modeling-Framework » Modeling-0.9 » Modeling » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » Modeling Framework 
Modeling Framework » Modeling 0.9 » Modeling » tests » test_EditingContext_Global.py
#! /usr/bin/env python
# -*- coding: iso-8859-1 -*-
#-----------------------------------------------------------------------------
# Modeling Framework: an Object-Relational Bridge for python
#
# Copyright (c) 2001-2004 Sbastien Bigaret <sbigaret@users.sourceforge.net>
# All rights reserved.
#
# This file is part of the Modeling Framework.
#
# This code is distributed under a "3-clause BSD"-style license;
# see the LICENSE file for details.
#-----------------------------------------------------------------------------
from __future__ import nested_scopes

"""
Global tests for EditingContext

  This tests the editing context and the whole underlying framework


  CVS information

    $Id: test_EditingContext_Global.py 942 2004-09-21 18:49:10Z sbigaret $
"""
__version__='$Revision: 942 $'[11:-2]

import unittest, sys
import utils

if __name__ == "__main__":
  utils.disable_model_cache()
  if '-c' in sys.argv or '-C' in sys.argv \
     or '-m' in sys.argv or '-M' in sys.argv:
    utils.fixpath(include_testPackages=0)
    utils.dynamically_build_test_packages(sys.argv)
  else:
    utils.fixpath(include_testPackages=1)

from Modeling import ModelSet,Model

from AuthorBooks.Writer import Writer
from AuthorBooks.Book import Book
  
from Modeling.EditingContext import EditingContext
from Modeling.FetchSpecification import FetchSpecification
from Modeling.Qualifier import qualifierWithQualifierFormat
from Modeling.Validation import ValidationException,\
     CUSTOM_OBJECT_VALIDATION, OBJECT_WIDE_KEY, DELETE_DENY_KEY
from Modeling import Adaptor
from Modeling import Database


class Writer_test07(Writer):
  refusesValidateForDelete=0
  def validateLastName(self, value):
    "Refuses empty strings"
    if not value:
      raise ValidationException
  def validateForDelete(self):
    _error=ValidationException()
    try:
      Writer.validateForDelete(self)
    except ValidationException, error:
      _error.aggregateException(error)
    if self.refusesValidateForDelete:
      _error.aggregateError(CUSTOM_OBJECT_VALIDATION, OBJECT_WIDE_KEY)
    _error.finalize()
    
# utilities
def concreteAdaptorChannelForEntity(anEntityName, ec):
  fs=FetchSpecification(anEntityName)
  dbContext=ec.rootObjectStore().objectStoreForFetchSpecification(fs)
  return dbContext.availableChannel().adaptorChannel()


class TestEditingContext_Global(unittest.TestCase):
  "Global tests for EditingContext"

  def test_00_insertObject_loads_databaseContexts(self):
    "[EditingContext] ..."
    # drop any existing default ObjectStoreCoordinator
    from Modeling import ObjectStoreCoordinator
    ObjectStoreCoordinator.setDefaultCoordinator(None)
    ec=EditingContext()
    w=Writer()
    ec.insertObject(w)
    objStoreCoord=ec.parentObjectStore()
    self.failUnless(objStoreCoord)
    self.failUnless(len(objStoreCoord.cooperatingObjectStores())==1)
    self.failUnless(objStoreCoord.cooperatingObjectStores()[0].ownsObject(w))
    
  def test_01_objectsWithFetchSpecification_alone(self):
    "[EditingContext] objectsWithFetchSpecification alone"
    ec=EditingContext()
    fetchSpec=FetchSpecification(entityName='Writer')
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==3)
    for object in objects:
      if object.getLastName()=='Dard':
        dard=object
      if object.getLastName()=='Cleese':
        cleese=object
      if object.getLastName()=='Rabelais':
        rabelais=object
    self.failUnless(cleese.getLastName()=='Cleese')
    object=cleese
    self.failUnless('_lastName' in object.__dict__.keys()) 
    self.failUnless('_firstName' in object.__dict__.keys()) 
    self.failUnless('_age' in object.__dict__.keys()) 
    self.failIf('_id' in object.__dict__.keys()) 
    self.failIf('id' in object.__dict__.keys()) 
    self.failIf('fk_writer_id' in object.__dict__.keys()) 
    self.failIf('_fk_writer_id' in object.__dict__.keys()) 

    # check types
    self.failIf(cleese.getAge()!=24)
    from mx.DateTime import DateTime
    self.failUnless(type(cleese.getBirthday())!=DateTime)
    #print objects
    #print objects[0].getE1()
    #print objects[0].__dict__
    #print objects[1].__dict__
    #print objects[2].__dict__
    #print '------------------Triggerring fault ------------------------'
    dard_pygmalion=dard.getPygmalion()
    self.failIf(dard_pygmalion.isFault()) # Shouldnt be a fault

    adaptorChannel=concreteAdaptorChannelForEntity('Writer', ec)
    fetchCount=adaptorChannel._count_for_execute
    dard_pygmalion.getLastName() # hence, this should not fetch from DB
    self.failIf(adaptorChannel._count_for_execute!=fetchCount)
    self.failIf(dard_pygmalion.isFault())

    # Last, check that the snapshots exists in the Database
    dard_gID=ec.globalIDForObject(dard)
    pyg_dard_gID=ec.globalIDForObject(dard_pygmalion)
    db=ec.parentObjectStore().objectStoreForGlobalID(dard_gID).database()
    self.failUnless(db.snapshotForGlobalID(dard_gID))

  def test_01b_objectsWithFetchSpecification_and_insertObject(self):
    "[EditingContext] objectsWithFetchSpecification & insertObject"
    # We want to check that objectsWithFetchSpecification() returns
    # objects within the database AND the newly inserted objects
    ec=EditingContext()
    w=Writer()
    ec.insertObject(w)
    fetchSpec=FetchSpecification(entityName='Writer')
    objects=ec.objectsWithFetchSpecification(fetchSpec)

    self.failUnless(w in objects)

  def test_01c_objectsWithFetchSpecification_closes_adaptorChannel(self):
    "[EditingContext] objectsWithFetchSpecification closes adaptorChannel"
    # We want to check that objectsWithFetchSpecification() closes its
    # adaptor channel afetr finishing
    ec=EditingContext()
    fetchSpec=FetchSpecification(entityName='Writer')
    objects=ec.objectsWithFetchSpecification(fetchSpec)

    objStore=ec.rootObjectStore().objectStoreForFetchSpecification(fetchSpec)
    adaptorChannel=objStore.availableChannel().adaptorChannel()
    self.failIf(adaptorChannel.isOpen())

  def test_01d_fetch(self):
    "[EditingContext] fetch == objectsWithFetchSpecification"
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName like "?a*"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects_1=ec.objectsWithFetchSpecification(fetchSpec)
    objects_2=ec.fetch('Writer', 'lastName like "?a*"')
    self.failIf(len(objects_1)!=len(objects_2))
    self.failIf([o for o in objects_1 if o not in objects_2])
    objects_3=ec.fetch('Writer', qualifier)
    self.failIf(len(objects_1)!=len(objects_3))
    self.failIf([o for o in objects_1 if o not in objects_3])
    
  def test_02_toOneFaultTrigger(self):
    "[EditingContext] toOneFaultTrigger"
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1)

    dard=objects[0]
    self.failUnless(dard.getLastName()=='Dard')

    dard_pygmalion=dard.getPygmalion()

    self.failUnless(dard_pygmalion.isFault())
    dard_pygmalion.getLastName() #trigger
    self.failIf(dard_pygmalion.isFault())
    self.failUnless(dard._pygmalion==dard_pygmalion)

    # Last, check that the original object containing the toMany fault was not
    # marked as changed
    self.failIf(dard in ec.updatedObjects())
    self.failIf(ec.hasChanges())

  def test_02b_toOneFaultOrNone(self):
    "[EditingContext] toOne relationship are None, not fault"
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Rabelais"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    rabelais=ec.objectsWithFetchSpecification(fetchSpec)[0]

    self.failUnless(rabelais.getPygmalion() is None)
    
  def test_03_toManyFaultTrigger(self):
    "[EditingContext] toManyFaultTrigger"
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1)
    dard=objects[0]
    #print '--------- triggering toMany fault -----'
    #print dard.getBooks()
    #print len(dard.getBooks())
    #print type(dard.getBooks())
    #print dard.getBooks()[0]

    adaptorChannel=concreteAdaptorChannelForEntity('Writer', ec)
    books=dard.getBooks()
    self.failUnless(len(books)==3) # triggers the fault
    fetchCount=adaptorChannel._count_for_execute
    # all subsequent accesses to the AccessArrayFaultHandler should not
    # trigger the already-fired-fault
    self.failUnless(books[0]) # just test access w/ __getitem__
    self.failUnless(adaptorChannel._count_for_execute==fetchCount)
    
    # Last, check that the original object containing the toMany fault was not
    # marked as changed
    self.failIf(dard in ec.updatedObjects())

  def test_04_trackingChanges(self):
    "[EditingContext] tracking changes and processing changes"
    ##
    ## These tests was moved here from test_EditingContext.py
    ## We need the whole framework, somehow, so that fetch is available
    ##
    ec=EditingContext()
    w=Writer()
    ec.insertObject(w)
    self.failUnless(ec.hasChanges(), 'has changes')
    self.failUnless(ec.hasUnprocessedChanges(), 'has unprocessed changes')
    self.failIf(ec.insertedObjects())
    ec.processRecentChanges()
    self.failIf(ec.hasUnprocessedChanges(), 'no unprocessed changes anymore')
    self.failUnless(ec.hasChanges(), 'has changes')
    self.assertEqual(ec.insertedObjects(), [w])

    # new EC
    ec=EditingContext()
    fetchSpec=FetchSpecification(entityName='Writer')
    ws=ec.objectsWithFetchSpecification(fetchSpec)
    ws[0].setFirstName('Glop')
    self.failUnless(ec.hasChanges(), 'has changes')
    self.failUnless(ec.hasUnprocessedChanges(), 'has unprocessed changes')
    self.failIf(ec.updatedObjects())

    ec.processRecentChanges()
    self.failIf(ec.hasUnprocessedChanges(), 'no unprocessed changes anymore')
    self.failUnless(ec.hasChanges(), 'has changes')
    self.assertEqual(ec.updatedObjects(), [ws[0]])

    # new EC
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    dard=ec.objectsWithFetchSpecification(fetchSpec)[0]
    ec.deleteObject(dard)
    self.failIf(ec.deletedObjects())
    self.failUnless(ec.hasChanges(), 'has changes')
    self.failUnless(ec.hasUnprocessedChanges(), 'has unprocessed changes')
    dard.getPygmalion().willRead()
    #print '###############', dard.getPygmalion()
    #print '###############', dard.storedValueForKey('pygmalion')
    ec.processRecentChanges()
    self.failIf(ec.hasUnprocessedChanges(), 'no unprocessed changes anymore')
    self.failUnless(ec.hasChanges(), 'has changes')
    #self.assertEqual(ec.deletedObjects(), [dard])

  def test_05_deleteRule_cascade(self):
    "[EditingContext] processRecentChanges: CASCADE"
    # tests for CASCADE
    # __TBD we do not need to perform an actual fetch from the DB:
    # __TBD this is just a shortcut here, and it should be eventually moved to
    # __TBD test_EditingContext.py.
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    dard=ec.objectsWithFetchSpecification(fetchSpec)[0]

    # Delete object
    ec.deleteObject(dard)

    dard_books=dard.getBooks()
    ldb=len(dard_books) # trigger
    dard_books=dard.getBooks()
    self.failIf(dard_books[0] in ec.deletedObjects())
    ec.processRecentChanges()
    self.failUnless(len(ec.deletedObjects())==ldb+1)

    # Check that the author was also deleted [cascade]
    self.failUnless(dard in ec.deletedObjects())
    # AND that neither the author nor the book are marked as updated
    # [Bug #599602]
    self.failIf(dard_books[0] in ec.updatedObjects())
    self.failIf(dard in ec.updatedObjects())

    for book in dard_books:
      self.failUnless(book in ec.deletedObjects())
      
  def test_06_deleteRule_nullify(self):
    "[EditingContext] processRecentChanges: deleteRule: NULLIFY"
    # tests for NULLIFY
    # __TBD we do not need to perform an actual fetch from the DB:
    # __TBD this is just a shortcut here, and it should be eventually moved to
    # __TBD test_EditingContext.py.
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('author.lastName caseInsensitiveLike "dard"')
    fetchSpec=FetchSpecification(entityName='Book', qualifier=qualifier)
    dard_book1=ec.objectsWithFetchSpecification(fetchSpec)[0]
    dard=dard_book1.getAuthor()

    # We need to trigger the fault here: if not, it will be triggered AFTER
    # deleteObject(), and the triggerring will in turn filters the
    # deletedObject from the returned set of objectsWithFetchSpecification(),
    # hence, the result will be 2, instead of 3 as expected for this
    # test
    #               !!! NOT TRUE ANYMORE, see notes below !!!
    #len(dard.getBooks()) 

    # Second note: this is not true anymore since DBContext's objsW/FetchSpec.
    # no longer filters its content depending on the state of the EC.
    # (changes made for release 0.9)
    
    # Implementation note:
    # AccessArrayFaultHandler.completeInitializationOfObject() calls
    # DBContext.objectsForSourceGlobalID() which in turn calls
    # DBContext.objectsWithFetchSpecification(). It seems reasonable, since
    # the deleted objects are nont deleted yet in the EC. However, we need
    # additional tests before removing these comments, to be sure that the
    # pendingDeletedObjects and deletedObjects in EC are not treated the same
    # way (put differently: unprocessed deleted objects() should appear in a
    # toMany fault after the fault is cleared, but should NOT be there if
    # processRecentChanges() already processed the deleted object).
    
    ec.deleteObject(dard_book1)
    self.failUnless(len(dard.getBooks())==3)
    ec.processRecentChanges()
    # Now the dard.getBooks() should have been refreshed and the book
    # that was deleted should be removed from dard's books.
    self.failIf(len(dard.getBooks())!=2)
    self.failIf(dard_book1 in dard.getBooks())


  def test_07_savechanges_01(self):
    "[EditingContext] saveChanges part 01: insert + validation"
    ec=EditingContext()
    hugo=Writer_test07()
    ec.insertObject(hugo)
    hugo.setLastName(None)
    hugo.setFirstName('Victor')
    self.assertRaises(ValidationException, ec.saveChanges)
    hugo.setLastName('')
    self.assertRaises(ValidationException, ec.saveChanges)
    hugo.setLastName('Hugo')
    hugo_temp_gID=ec.globalIDForObject(hugo)
    self.failUnless(hugo_temp_gID.isTemporary())
    try:
      ec.saveChanges()
    except:
      raise #self.fail("saveChanges() shouldn't fail here")

    ## No more inserted, updated or deleted objects
    self.failIf(ec.insertedObjects())
    self.failIf(ec.updatedObjects())
    self.failIf(ec.deletedObjects())
      
    ## Check that the globalID has been changed
    hugo_gID=ec.globalIDForObject(hugo)
    self.failIf(hugo_gID.isTemporary())
    self.failIf(ec.objectForGlobalID(hugo_temp_gID))

    ## since we're here, check CustomObject.globalID
    self.assertEqual(hugo_gID, hugo.globalID())
    
    ## check that DB snapshots are updated as expected
    db=ec.parentObjectStore().objectStoreForGlobalID(hugo_gID).database()
    self.failUnless(db.snapshotForGlobalID(hugo_gID))
      
  def test_08_savechanges_02(self):
    "[EditingContext] saveChanges part 02: delete + validation"
    # create it: this implies that test_07 did pass, or this will fail as well
    ec=EditingContext()
    hugo=Writer_test07()
    ec.insertObject(hugo)
    hugo.setFirstName('Victor'); hugo.setLastName('Hugo')
    try:
      ec.saveChanges()
    except:
      self.fail("insert failed: test aborted")
    
    # 
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    hugo=ec.objectsWithFetchSpecification(fetchSpec)[0]

    ## 1st test: make sure a delete cannot be made persistent if an object
    ## has objects bound to a key corresponding to a relationship whose
    ## deleteRule() is DELETE_DENY

    # make sure that the relationship's rule is DELETE_DENY
    from Modeling.Relationship import DELETE_DENY
    writer=ModelSet.defaultModelSet().entityNamed('Writer')
    writer_books=writer.relationshipNamed('books')
    writer_books.setDeleteRule(DELETE_DENY)
    b=Book()
    hugo.__class__=Writer_test07 #so that we can easily fake custom validation
    hugo.addToBooks(b)
    ec.deleteObject(hugo)
    try:
      ec.saveChanges()
    except ValidationException, exc:
      self.failUnless(exc.errorsDict().has_key('books'))
      self.failUnless(DELETE_DENY_KEY in exc.errorsDict()['books'])
    else:
      self.fail('deleteObject() should have raised')

    ## 2nd test: check that custom validation works as expected
    hugo.removeFromBooks(b)
    hugo.refusesValidateForDelete=1
    try:
      ec.saveChanges()
    except ValidationException, exc:
      errorsDict=exc.errorsDict()
      self.failUnless(errorsDict.has_key(OBJECT_WIDE_KEY))
      self.failUnless(CUSTOM_OBJECT_VALIDATION in errorsDict[OBJECT_WIDE_KEY])
    else:
      self.fail('deleteObject() should have raised')

    ## 3rd test: check that 'objectsWithFetchSpecification()' does not
    ##           return objects marked for deletion
    fetchSpec=FetchSpecification(entityName='Writer')
    writers=ec.objectsWithFetchSpecification(fetchSpec)
    self.failIf(hugo in writers)
    
    ## 4rd test: now deleting it for real should be OK
    hugo.refusesValidateForDelete=0
    try:
      ec.saveChanges()
    except:
      self.fail("Error: deletion failed")

    ## 5th test: check that the object has been removed from the uniquing table
    self.failIf(hugo in ec.registeredObjects())
    
  def test_09_savechanges_03(self):
    "[EditingContext] saveChanges part 03: change + validation"
    # create it: this implies that test_07 did pass, or this will fail as well
    ec=EditingContext()
    hugo=Writer_test07()
    ec.insertObject(hugo)
    hugo.setFirstName('Victor'); hugo.setLastName('Hugo')
    hugo.setAge(46)
    ec.saveChanges()
    
    # TBD: invalidate to refault everything!
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    hugo=ec.objectsWithFetchSpecification(fetchSpec)[0]

    hugo.setLastName(None)
    self.assertRaises(ValidationException, ec.saveChanges)
    hugo.setLastName('Hugo') # Nothing changed
    try:
      ec.saveChanges()
    except:
      raise
      self.fail("saveChanges() should not fail here")
    hugo.setLastName('hugo')
    try:
      ec.saveChanges()
    except:
      self.fail("saveChanges() should not fail here")

  def test_10_saveChanges_04(self):
    "[EditingContext] saveChanges part 04"
    # create it: this implies that test_07 did pass, or this will fail as well
    ec=EditingContext()
    hugo=Writer_test07()
    ec.insertObject(hugo)
    hugo.setFirstName('Victor'); hugo.setLastName('hugo')
    hugo.setAge(46)

    #b1=Book()
    #b1.setTitle("Les Misrables")
    #ec.insertObject(b1)
    #b1.addObjectToBothSidesOfRelationshipWithKey(hugo, 'author')
    
    ec.saveChanges()
    ec.dispose()
    # TBD: invalidate to refault everything!
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName caseInsensitiveLike "Hugo"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    hugo=ec.objectsWithFetchSpecification(fetchSpec)[0]
    qualifier=qualifierWithQualifierFormat('lastName=="Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    dard=ec.objectsWithFetchSpecification(fetchSpec)[0]

    b1=Book()
    b1.setTitle("Les Misrables")
    b2=Book()
    b2.setTitle("Les Misrables: le retour")
    b2.setPrice(4.7)
    b3=Book()
    b3.setTitle("Les Misrables: la vengeance")
    ec.insertObject(b1) ; ec.insertObject(b2) ; ec.insertObject(b3)
    b1.addObjectToBothSidesOfRelationshipWithKey(hugo, 'author')
    b2.addObjectToBothSidesOfRelationshipWithKey(hugo, 'author')
    hugo.addObjectToBothSidesOfRelationshipWithKey(b3, 'books')
    hugo.setLastName('Hugo')
    b4=Book()
    b4.setTitle("Bloub bloub")
    dard.setAge(82)
    from mx.DateTime import DateFrom
    hugo.setBirthday(DateFrom('1802-02-26 14:28'))
    ec.insertObject(b4)
    dard.addToBooks(b4) ; b4.setAuthor(dard)
    
    ec.saveChanges()

    # Last, check that the Database has no snapshots left when the
    # EditingContext is finalized
    db=ec.rootObjectStore().objectStoreForObject(b4).database()
    #ec.dispose() # normally useless
    del ec
    self.failIf(db.snapshots())
    
  def test_11_allQualifierOperators(self):
    "[EditingContext] tests all qualifier operators"
    ec=EditingContext()
    qWQF=qualifierWithQualifierFormat
    # LIKE
    qualifier=qWQF('lastName like "Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(objects[0].getLastName()=='Dard')
    
    qualifier=qWQF('lastName like "Da?d"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(objects[0].getLastName()=='Dard')

    qualifier=qWQF('lastName like "Da*"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(objects[0].getLastName()=='Dard')

    qualifier=qWQF('lastName like "?a*"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless('Dard' in objects_names)
    self.failUnless('Rabelais' in objects_names)

    #CASE_INSENSITIVE_LIKE
    qualifier=qWQF('lastName caseInsensitiveLike "Dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(objects[0].getLastName()=='Dard')

    qualifier=qWQF('lastName caseInsensitiveLike "dard"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(objects[0].getLastName()=='Dard')

    qualifier=qWQF('lastName caseInsensitiveLike "?A*"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless('Dard' in objects_names)
    self.failUnless('Rabelais' in objects_names)

    # EQUAL
    qualifier=qWQF('age==508')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Rabelais' in objects_names)

    # DIFFERENT
    qualifier=qWQF('age != 508')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Dard' in objects_names)
    self.failUnless('Cleese' in objects_names)

    # GREATER THAN
    qualifier=qWQF('age>90')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Rabelais' in objects_names)

    # GREATER THAN, OR EQUAL
    qualifier=qWQF('age>=50')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Rabelais' in objects_names)
    self.failUnless('Dard' in objects_names)

    # LESSER THAN
    qualifier=qWQF('age<90')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Dard' in objects_names)
    self.failUnless('Cleese' in objects_names)

    # LESSER THAN, OR EQUAL
    qualifier=qWQF('age<=24')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Cleese' in objects_names)

    # AND
    qualifier=qWQF('age>=24 AND lastName like "???e*"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Cleese' in objects_names)
    self.failUnless('Rabelais' in objects_names)

    # OR
    qualifier=qWQF('age<50 OR lastName like "????"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Cleese' in objects_names)
    self.failUnless('Dard' in objects_names)

    qualifier=qWQF('age<50 OR age>200')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Cleese' in objects_names)
    self.failUnless('Rabelais' in objects_names)

    # NOT
    qualifier=qWQF('age>200 OR NOT age<50')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Dard' in objects_names)
    self.failUnless('Rabelais' in objects_names)

    qualifier=qWQF('NOT(age<50 OR age>200)')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Dard' in objects_names)

    # Some others, mixed
    qualifier=qWQF('NOT age<50 OR age>200') #!!! equiv. to NOT(...)
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Dard' in objects_names)

    qualifier=qWQF('(age<100 OR age>200) AND books.title like "G*"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Rabelais' in objects_names)

    ## This one was added after a fix was proposed for bug #614261
    ## to make sure that a complex qualifier is okay as far as MySQL
    ## is concerned
    qualifier=qWQF('(age<100) AND pygmalion.books.title like "G*"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Dard' in objects_names)

  def test_12_fetchingDoesNotUpdateSnapshot(self):
    "[EditingContext] Fetching should not update the Database's snapshot"
    ec=EditingContext()
    hugo=Writer_test07()
    ec.insertObject(hugo)
    hugo.setFirstName('Victor'); hugo.setLastName('Hugo')
    hugo.setAge(46)
    ec.saveChanges()
    del ec
    
    ec1=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec1.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1)
    hugo1=objects[0]
    gid1=ec1.globalIDForObject(hugo1)
    database1=ec1.rootObjectStore().objectStoreForObject(hugo1).database()
    timestamp1=database1.timestampForGlobalID(gid1)
    snapshot1=database1.snapshotForGlobalID(gid1)
    # 
    # Change in the meantime
    #
    model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
    pgAdaptor=Adaptor.adaptorWithModel(model)
    context=pgAdaptor.createAdaptorContext()
    channel=context.createAdaptorChannel()
  
    sqlExpr=pgAdaptor.expressionClass()()
    sqlExpr.setStatement("update WRITER set LAST_NAME='hugoo' where id=%i"\
                         %gid1.keyValues().values()[0])
    channel.evaluateExpression(sqlExpr)

    
    #
    ec2=EditingContext()
    qualifier=qualifierWithQualifierFormat('firstName=="Victor"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec2.objectsWithFetchSpecification(fetchSpec)
    self.failUnless(len(objects)==1)
    hugo2=objects[0]
    gid2=ec2.globalIDForObject(hugo2)
    database2=ec2.rootObjectStore().objectStoreForObject(hugo2).database()
    timestamp2=database2.timestampForGlobalID(gid2)
    snapshot2=database2.snapshotForGlobalID(gid2)
    # 

    # Since we're here, we can check that we got the same Database object
    # as well
    self.failIf(database1!=database2, 'Database objects should be the same')

    # Now check that the corresponding snapshot has NOT been updated, nor its
    # timestamp
    self.failIf(timestamp1!=timestamp2, 'timestamp shouldnt have been changed')
    self.failIf(snapshot1!=snapshot2, 'snapshot shouldnt have been changed')

  def test_13_objectsCountWithFetchSpecification(self):
    "[EditingContext] objectsCountWithFetchSpecification"
    fs=FetchSpecification(entityName='Writer')
    ec=EditingContext()
    nb=ec.objectsCountWithFetchSpecification(fs)
    self.failIf(nb!=3)

  def test_13b_objectsCountWithFetchSpecification_closes_channel(self):
    "[EditingContext] objectsCountWithFetchSpecification closes channel"
    fs=FetchSpecification(entityName='Writer')
    ec=EditingContext()
    nb=ec.objectsCountWithFetchSpecification(fs)
    self.failIf(nb!=3)
    objStore=ec.rootObjectStore().objectStoreForFetchSpecification(fs)
    adaptorChannel=objStore.availableChannel().adaptorChannel()
    self.failIf(adaptorChannel.isOpen())
    
  def test_13c_fetchCount(self):
    "[EditingContext] fetchCount == objectsCountWithFetchSpecification"
    q=qualifierWithQualifierFormat('lastName like "?a*"')
    fs=FetchSpecification(entityName='Writer', qualifier=q)
    ec=EditingContext()
    self.assertEqual(ec.objectsCountWithFetchSpecification(fs),
                     ec.fetchCount('Writer', 'lastName like "?a*"'))
    #print ec.fetchCount('Writer', 'lastName like "?a*"')

  def test_14_qualifierWithNullValue(self):
    "[EditingContext] objectsWithFetchSpec. & NULL value"
    ## Note: this used to fail w/ MySQL, cf bug #614261
    q=qualifierWithQualifierFormat('author.pygmalion.books.price == NULL')
    fs=FetchSpecification('Book', qualifier=q)
    ec=EditingContext()
    nb_objs=ec.objectsCountWithFetchSpecification(fs)
    self.assertEqual(nb_objs, 3)        # only 3: author 'Dard' is the only
                                        # one with a pygmalion
    
  def test_15_propagatesInsertionForRelatedObjects_01(self):
    "[EditingContext] propagatesInsertionForRelatedObjects (updated objects)"
    ec=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    cleese=ec.objectsWithFetchSpecification(fetchSpec)[0]
    pygmalion_cleese=Writer()
    book_cleese=Book()
    cleese.setPygmalion(pygmalion_cleese)
    cleese.addToBooks(book_cleese)
    ec.processRecentChanges()
    self.failIf(book_cleese in ec.insertedObjects())
    self.failIf(pygmalion_cleese in ec.insertedObjects())

    cleese.willChange()
    ec.setPropagatesInsertionForRelatedObjects(1)
    ec.processRecentChanges()
    self.failUnless(book_cleese in ec.insertedObjects())
    self.failUnless(pygmalion_cleese in ec.insertedObjects())
    
    # Last: check that subsequent changes are notified after
    #       ec.processRecentChanges()
    ec=EditingContext()
    ec.setPropagatesInsertionForRelatedObjects(1)
    qualifier=qualifierWithQualifierFormat('lastName=="Cleese"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    cleese=ec.objectsWithFetchSpecification(fetchSpec)[0]
    cleese.setLastName('Cleese')
    ec.processRecentChanges()
    
    pygmalion_cleese=Writer()
    book_cleese=Book()
    cleese.setPygmalion(pygmalion_cleese)
    cleese.addToBooks(book_cleese)
    ec.processRecentChanges()
    self.failUnless(book_cleese in ec.insertedObjects())
    self.failUnless(pygmalion_cleese in ec.insertedObjects())

  def test_15_propagatesInsertionForRelatedObjects_02(self):
    "[EditingContext] propagatesInsertionForRelatedObjects (inserted objects)"
    ec=EditingContext()
    ec.setPropagatesInsertionForRelatedObjects(1)
    hugo=Writer()
    hugo.setFirstName('Victor'); hugo.setLastName('hugo')
    hugo.setAge(46)
    ec.insertObject(hugo)

    b1=Book() ; b1.setTitle("Les Misrables")
    hugo.addObjectToBothSidesOfRelationshipWithKey(b1, 'books')

    ec.processRecentChanges()
    self.failUnless(b1 in ec.insertedObjects())
    
    #Check that subsequent changes are notified after ec.processRecentChanges()
    b2=Book() ; b2.setTitle("Les travailleurs de la mer")
    hugo.addObjectToBothSidesOfRelationshipWithKey(b2, 'books')
    ec.processRecentChanges()
    self.failUnless(b2 in ec.insertedObjects())
    
  def test_16_toManySnapshotsUpdated(self):
    "[EditingContext] checks that toManySnapshots are correctly updated"
    # Description of the bug tested here:
    # cf. comment in DatabaseContext.performChanges()
    ec=EditingContext(); ec.setPropagatesInsertionForRelatedObjects(1)

    # Create a new object, with rels.
    hugo=Writer()
    hugo.setFirstName('Victor'); hugo.setLastName('Hugo'); hugo.setAge(46)
    ec.insertObject(hugo)
    b1=Book() ; b1.setTitle("Les Misrables")
    b2=Book() ; b2.setTitle("Les travailleurs de la mer")
    hugo.addObjectToBothSidesOfRelationshipWithKey(b1, 'books')
    hugo.addObjectToBothSidesOfRelationshipWithKey(b2, 'books')
    ec.saveChanges()

    # Now delete one of these, on a new EditingContext, WITHOUT deleting the
    # former EC so that snapshots are not forgotten
    ec2=EditingContext()
    qualifier=qualifierWithQualifierFormat('lastName=="Hugo"')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    hugo2=ec2.objectsWithFetchSpecification(fetchSpec)[0]
    book=hugo2.getBooks()[0] # any of the two
    hugo2.removeObjectFromBothSidesOfRelationshipWithKey(book, 'books')
    ec2.deleteObject(book)
    ec2.saveChanges()

    # Check that the database snapshots were updated, as expected
    remaining_book=hugo2.getBooks()[0]
    db=ec2.rootObjectStore().objectStoreForObject(hugo2).database()
    hugo2_gID=ec2.globalIDForObject(hugo2)
    toManySnap=db.snapshotForSourceGlobalID(hugo2_gID, 'books')
    #print toManySnap
    self.failIf(len(toManySnap)!=1)
        
  def test_17_insertedObject_and_PK_as_classProperty(self):
    "[EC] Checks that a PK/class prop. gets its value after saving changes"
    ec=EditingContext()
    b=Book()
    b.setTitle('dummy title')
    ec.insertObject(b)
    ec.saveChanges()
    gid=ec.globalIDForObject(b)
    self.failUnless(b.getId() == gid.keyValues()['id'])
    
  def test_17b_insertedObject_and_FK_as_classProperty(self):
    "[EC] Checks that a FK/class prop. gets its value after saving changes"
    ec=EditingContext()
    w=Writer()
    w.setLastName('test author')
    ec.insertObject(w)
    ec.saveChanges()

    b=Book()
    ec.insertObject(b)
    b.setTitle('dummy title')
    b.setAuthor(w)
    w.addToBooks(b)
    self.failIf(b.getFK_Writer_Id())
    ec.saveChanges()
    gid=ec.globalIDForObject(w)
    # a FK defined as a class property is NOT updated by the framework
    # see the User's Guide, section B.1
    # so we get None instead of gid.keyValues()['id']
    self.assertEqual(b.getFK_Writer_Id(), None)
    
  def test_18_percent_in_qualifiers_should_be_correctly_escaped(self):
    "[EditingContext] percent in qualifiers should be correctly escaped"
    q=qualifierWithQualifierFormat('title like "G%"')
    fs=FetchSpecification('Book', qualifier=q)
    ec=EditingContext()
    objs=ec.objectsWithFetchSpecification(fs)
    self.failIf(objs)
   
  def test_19_in_not_in_operators_in_qualifiers(self):
    "[EditingContext] IN/NOT IN operators in qualifiers"
    ec=EditingContext()
    objects=ec.fetch('Writer', 'age in [24, 82, 81]')
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Cleese' in objects_names)
    self.failUnless('Dard' in objects_names)

    objects=ec.fetch('Writer', 'age not in [24, 82, 81]')
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Rabelais' in objects_names)

    objects=ec.fetch('Writer', 'age in [24]')
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Cleese' in objects_names)

  def test_19b_in_not_in_operators_in_qualifiers_with_long(self):
    "[EditingContext] IN/NOT IN operators in qualifiers with longs"
    # Bug #779775
    from Modeling.Qualifier import KeyValueQualifier,QualifierOperatorIn
    ec=EditingContext()
    q=KeyValueQualifier('age', QualifierOperatorIn, [24L, 81L, 82L])
    objects=ec.fetch('Writer', q)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Cleese' in objects_names)
    self.failUnless('Dard' in objects_names)

  def test_19c_in_not_in_operators_in_qualifiers_with_strings(self):
    "[EditingContext] IN/NOT IN operators in qualifiers with strings & date"
    # Bug #847212: strings not handled properly
    from Modeling.Qualifier import KeyValueQualifier,QualifierOperatorIn
    ec=EditingContext()
    q=KeyValueQualifier('lastName', QualifierOperatorIn, ['Dard', 'Cleese'])
    objects=ec.fetch('Writer', q)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==2)
    self.failUnless('Cleese' in objects_names)
    self.failUnless('Dard' in objects_names)

    # While we're at it, test date
    q=KeyValueQualifier('author.birthday',
                        QualifierOperatorIn, ['1484-07-02 18:16:12'])
    objects=ec.fetch('Book', q)
    objects_names=[b.getTitle() for b in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Gargantua' in objects_names)
    # prepare last test w/ floats
    g=objects[0]
    g.setPrice(12.7)
    ec.saveChanges()

    try:
      # Test float
      ec2=EditingContext()
      q=KeyValueQualifier('books.price', QualifierOperatorIn, [12.7, 12.2])
      objects=ec2.fetch('Writer', q)
      objects_names=[o.getLastName() for o in objects]
      #print objects_names
      self.failUnless(len(objects)==1)
      self.failUnless('Rabelais' in objects_names)
    finally:
      g.setPrice(None)
      ec.saveChanges()

  def test_20_snapshot(self):
    "[EditingContext] CustomObject.snapshot()"
    ec=EditingContext()

    ## single object, no object related
    w=Writer()
    w.setLastName('Test'); w.setFirstName('20')
    ec.insertObject(w)

    ws=w.snapshot()
    def check_single_object():
      self.assertEqual(ws['lastName'], 'Test')
      self.assertEqual(ws['firstName'], '20')
      self.assertEqual(ws['birthday'], None)
      self.assertEqual(ws['pygmalion'], None)
      self.assertEqual(ws['books'], [])
    check_single_object()
    ec.saveChanges()
    check_single_object()

    ## all objects saved, no faults here
    b1=Book(); b1.setTitle('b1')
    b2=Book(); b2.setTitle('b2')
    ec.insert(b1)
    ec.insert(b2)
    w.addToBooks(b1) ; b1.setAuthor(w)
    w.addToBooks(b2) ; b2.setAuthor(w)
    p=Writer(); p.setLastName('test_20 pygmalion')
    ec.insert(p)
    w.setPygmalion(p)
    ws=w.snapshot()
    self.assertEqual(ws['lastName'], 'Test')
    self.assertEqual(ws['firstName'], '20')
    self.assertEqual(ws['pygmalion'], p.globalID())
    self.assertEqual(len(ws['books']), 2)
    self.failUnless(b1.globalID() in ws['books'])
    self.failUnless(b2.globalID() in ws['books'])
    ec.saveChanges()
    p_globalID=p.globalID()
    del ec

    ## Now both relationships 'pygmalion' and 'books' are faults
    ec=EditingContext()
    w=ec.fetch('Writer', 'lastName=="Test" and firstName=="20"')[0]
    ws=w.snapshot()
    self.assertEqual(ws['lastName'], 'Test')
    self.assertEqual(ws['firstName'], '20')

    # for a faulted to-one rel, we get the corresponding globalID
    self.assertEqual(ws['pygmalion'], p_globalID)
    self.failUnless(w.getPygmalion().isFault())

    # for a faulted to-many rel, we get a CustomObject.Snapshot_ToManyFault
    self.failUnless(w.getBooks().isFault())
    from Modeling.FaultHandler import AccessArrayFaultHandler
    from Modeling.CustomObject import Snapshot_ToManyFault
    self.assertEqual(ws['books'].__class__, Snapshot_ToManyFault)

    # last, we check that the to-many fault is accessible, if we want to:
    self.assertEqual(ws['books'].sourceGlobalID, w.globalID())
    self.assertEqual(ws['books'].key, 'books')
    tomany_fault=ws['books'].getToManyFault(ec)
    from Modeling.FaultHandler import AccessArrayFaultHandler
    self.failUnless(isinstance(tomany_fault, AccessArrayFaultHandler))
    self.failUnless(tomany_fault.isFault())

    # check that we got the same set of objects
    adaptorChannel=concreteAdaptorChannelForEntity('Writer', ec)
    fetchCount=adaptorChannel._count_for_execute
    self.assertEqual(len(tomany_fault), 2)
    self.assertEqual(adaptorChannel._count_for_execute,
                     fetchCount+1) # fault was cleared
    for book in w.getBooks():
      self.failIf(book not in tomany_fault)
      
  def test_21_snapshot_raw(self):
    "[EditingContext] CustomObject.snapshot_raw()"
    ec=EditingContext()

    # saved objects, related to other saved objects
    raw_fdard=ec.fetch('Writer', 'age in [81, 82]', rawRows=1)[0]
    fdard=ec.fetch('Writer', 'age in [81, 82]')[0]
    rabelais=ec.fetch('Writer', 'lastName == "Rabelais"')[0]
    fdard_snapshot_raw=fdard.snapshot_raw()
    self.assertEqual(raw_fdard, fdard_snapshot_raw)
    self.assertEqual(fdard_snapshot_raw['FK_Writer_id'],
                     rabelais.globalID().keyValues()['id'])

    # new object, related to saved objects
    w=Writer(); w.setFirstName('F'), w.setLastName('L')
    ec.insert(w)
    w.setPygmalion(fdard)
    w_sr=w.snapshot_raw()
    self.assertEqual(w_sr.keys(), fdard_snapshot_raw.keys())
    self.assertEqual(w_sr['FK_Writer_id'], fdard.globalID().keyValues()['id'])
    self.assertEqual(w_sr['id'], w.globalID()) # we get a TemporaryGlobalID
    self.failUnless(w_sr['id'].isTemporary())
    w.setPygmalion(None)
    ec.delete(w)

    # saved object, related to a new object
    w=Writer(); w.setFirstName('F'), w.setLastName('L')
    ec.insert(w)
    rabelais=fdard.getPygmalion()
    fdard.setPygmalion(w)
    self.assertEqual(fdard.snapshot_raw().keys(), fdard_snapshot_raw.keys())
    self.assertEqual(fdard.snapshot_raw()['FK_Writer_id'], w.globalID())
    fdard.setPygmalion(rabelais)
    ec.delete(w)
    
    # new object, related to new objects
    w1=Writer(); w1.setFirstName('F'), w1.setLastName('L')
    w2=Writer(); w2.setFirstName('F'), w2.setLastName('L')
    ec.insert(w1)
    ec.insert(w2)
    w1.setPygmalion(w2)
    self.assertEqual(w1.snapshot_raw().keys(), fdard_snapshot_raw.keys())
    self.assertEqual(w1.snapshot_raw()['id'], w1.globalID())
    self.assertEqual(w1.snapshot_raw()['FK_Writer_id'], w2.globalID())
    ec.delete(w1); ec.delete(w2)

    # modified object
    fdard_name=fdard.getLastName()
    alternate_fdard_name=fdard_name+' -- testing'
    fdard.setLastName(alternate_fdard_name)
    self.assertEqual(fdard.snapshot_raw()['lastName'], alternate_fdard_name)

    # Faults
    fdard_gid=fdard.globalID()
    ec=EditingContext()
    fdard=ec.faultForGlobalID(fdard_gid, ec)
    self.failUnless(fdard.isFault())
    self.assertEqual(fdard.snapshot_raw()['lastName'], fdard_name)

  def test_22_cancel_delete(self):
    "[EditingContext] cancel a delete on an object"
    ## check: delete / insert
    ec=EditingContext()
    rabelais=ec.fetch('Writer', 'lastName == "Rabelais"')[0]
    ec.deleteObject(rabelais)
    self.failUnless(rabelais in ec.allDeletedObjects())
    self.failIf(rabelais in ec.deletedObjects()) # unprocessed
    ec.insertObject(rabelais) # cancel the deletion
    self.failIf(len(ec.fetch('Writer', 'lastName == "Rabelais"'))!=1)
    self.failIf(rabelais in ec.allDeletedObjects())
    self.failIf(rabelais in ec.allInsertedObjects())

    ## check: delete / processRecentChanges / insert
    ec=EditingContext()
    rabelais=ec.fetch('Writer', 'lastName == "Rabelais"')[0]
    ec.deleteObject(rabelais)
    self.failUnless(rabelais in ec.allDeletedObjects())
    ec.processRecentChanges()
    self.failUnless(rabelais in ec.deletedObjects())
    ec.insertObject(rabelais) # cancel the deletion
    self.failIf(rabelais in ec.allDeletedObjects())
    self.failIf(rabelais in ec.allInsertedObjects())

    ## check: delete / saveChanges / insert
    ec=EditingContext()
    new=Writer(); new.setLastName('test_22')
    ec.insert(new)
    ec.saveChanges()
    
    ec=EditingContext()
    new=ec.fetch('Writer', 'lastName == "test_22"')[0]
    new_gid=new.globalID()
    ec.deleteObject(new)
    self.failUnless(new in ec.allDeletedObjects())
    self.failIf(new in ec.deletedObjects())
    ec.saveChanges()
    ec.insertObject(new) # cancel the deletion? No, insertion of a new object
    self.failIf(new in ec.allDeletedObjects())
    self.failUnless(new in ec.allInsertedObjects())
    self.failIf(new.globalID()==new_gid)
    self.failIf(not new.globalID().isTemporary())
    
    ## check: modify / delete / processRecentChanges / insert -> in modify?
    ec=EditingContext()
    new=Writer(); new.setLastName('test_22')
    ec.insert(new)
    ec.saveChanges()
    
    ec=EditingContext()
    new=ec.fetch('Writer', 'lastName == "test_22"')[0]
    new_gid=new.globalID()
    new.setLastName('test_22 alternate value')
    self.failUnless(new in ec.allUpdatedObjects())
    ec.deleteObject(new)
    self.failUnless(new in ec.allDeletedObjects())
    self.failIf(new in ec.deletedObjects())
    self.failIf(new in ec.allUpdatedObjects())
    ec.processRecentChanges()
    self.failUnless(new in ec.deletedObjects())
    self.failIf(new in ec.allUpdatedObjects())
    ec.insertObject(new) # cancel the deletion
    self.failIf(new in ec.allDeletedObjects())
    self.failUnless(new in ec.allUpdatedObjects())
    self.failUnless(new.globalID()==new_gid)
    

    ## check: modify / delete / saveChanges / insert
    
  def test_23_fetch_star_and_interrogation_mark_chars(self):
    "[EditingContext] fetch real '?' and '*' w/ LIKE"
    ec=EditingContext()
    b1=Book(); b1.setTitle('abc?d')
    b2=Book(); b2.setTitle('abcXd')
    b3=Book(); b3.setTitle('abc*d')
    ec.insert(b1); ec.insert(b2); ec.insert(b3)
    ec.saveChanges()

    res=ec.fetch('Book', 'title like "abc?d"')
    self.assertEqual(len(res), 3)
                
    res=ec.fetch('Book', 'title like "abc*d"')
    self.assertEqual(len(res), 3)

    res=ec.fetch('Book', 'title like "abc\?d"')
    self.assertEqual(len(res), 1)
    self.assertEqual(res[0].getTitle(), 'abc?d')

    res=ec.fetch('Book', 'title like "abc\*d"')
    self.assertEqual(len(res), 1)
    self.assertEqual(res[0].getTitle(), 'abc*d')

  def test_23b_fetch_star_and_interrogation_mark_chars(self):
    "[EditingContext] fetch real '?' and '*' w/ ILIKE"
    ec=EditingContext()
    b1=Book(); b1.setTitle('abc?d')
    b2=Book(); b2.setTitle('aBC?d')
    b3=Book(); b3.setTitle('abcXd')
    b4=Book(); b4.setTitle('abc*d')
    b5=Book(); b5.setTitle('abc*D')
    ec.insert(b1); ec.insert(b2); ec.insert(b3); ec.insert(b4); ec.insert(b5)
    ec.saveChanges()

    res=ec.fetch('Book', 'title ilike "abc?d"')
    self.assertEqual(len(res), 5)
                
    res=ec.fetch('Book', 'title ilike "abc*d"')
    self.assertEqual(len(res), 5)

    res=ec.fetch('Book', 'title ilike "abc\?d"')
    self.assertEqual(len(res), 2)
    titles=[r.getTitle() for r in res]
    self.failIf('abc?d' not in titles)
    self.failIf('aBC?d' not in titles)

    res=ec.fetch('Book', 'title ilike "abc\*d"')
    self.assertEqual(len(res), 2)
    titles=[r.getTitle() for r in res]
    self.failIf('abc*d' not in titles)
    self.failIf('abc*D' not in titles)

  def test_24_fetch_does_not_return_duplicates(self):
    "[EditingContext] fetch does not return duplicates"
    # bug #780495
    ec=EditingContext()
    ws=ec.fetch('Writer', 'books.title like "*"')
    # explicitely look for duplicates
    for w in ws:
      _ws=list(ws) #[:-1])
      _ws.remove(w)
      self.failIf(w in _ws, 'Duplicate found: %s'%w)
    
  def test_25_fetchCount_is_not_affected_by_duplicates(self):
    "[EditingContext] fetchCount is not affected by duplicates"
    # bug #780495
    ec=EditingContext()
    ws=ec.fetchCount('Writer', 'books.title like "*"')
    self.assertEqual(ws, 2)

  def test_26_fetch_underscore_percent(self):
    "[EditingContext] fetch real '_' and '%'"
    # bug #785913
    ec=EditingContext()
    b1=Book(); b1.setTitle('abc_d')
    b2=Book(); b2.setTitle('abcXd')
    b3=Book(); b3.setTitle('abc%d')
    ec.insert(b1); ec.insert(b2); ec.insert(b3)
    ec.saveChanges()

    res=ec.fetch('Book', 'title like "*abc_d*"')
    self.assertEqual(len(res), 1)
    self.assertEqual(res[0].getTitle(), "abc_d")
    res=ec.fetch('Book', 'title like "*abc%d*"')
    self.assertEqual(len(res), 1)
    self.assertEqual(res[0].getTitle(), "abc%d")
    # TBD: with ILIKE
    
  def test_27_like_is_case_sensitive(self):
    "[EditingContext] check that LIKE is case-sensitive"
    # bug #786217
    ec=EditingContext()
    b1=Book(); b1.setTitle('abcd')
    b2=Book(); b2.setTitle('aBcd')
    b3=Book(); b3.setTitle('aBCd')
    ec.insert(b1); ec.insert(b2); ec.insert(b3)
    ec.saveChanges()

    res=ec.fetch('Book', 'title like "abcd"')
    self.assertEqual(len(res), 1)
    self.assertEqual(res[0].getTitle(), "abcd")
    res=ec.fetch('Book', 'title like "aBCd"')
    self.assertEqual(len(res), 1)
    self.assertEqual(res[0].getTitle(), "aBCd")
                
  def test_28_bug857803(self):
    "[EditingContext] Complex query bug #857803"
    # bug #857803
    #'(age<100) AND ((books.title like "G*") OR (pygmalion.books.title like "G*"))'
    ec=EditingContext()
    qWQF=qualifierWithQualifierFormat
    qualifier=qWQF('(age<100) AND ((books.title like "G*") OR (pygmalion.books.title like "G*"))')
    fetchSpec=FetchSpecification(entityName='Writer', qualifier=qualifier)
    objects=ec.objectsWithFetchSpecification(fetchSpec)
    objects_names=[o.getLastName() for o in objects]
    self.failUnless(len(objects)==1)
    self.failUnless('Dard' in objects_names)

  def test_999_customSQLQuery(self):
    "[EditingContext] custom SQL Query"
    fs=FetchSpecification(entityName='Writer')
    #dbContext=EditingContext().rootObjectStore().objectStoreForFetchSpecification(fs)
    ec=EditingContext()
    #ec.insertObject(Writer())
    model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
    from Modeling import DatabaseContext
    dbContext=DatabaseContext.registeredDatabaseContextForModel(model, ec)
    adaptorChannel=dbContext.availableChannel().adaptorChannel()
    sqlExpr=adaptorChannel.adaptorContext().adaptor().expressionClass()()
    sqlExpr.setStatement('select max(age) from WRITER')
    adaptorChannel.setAttributesToFetch([model.entityNamed('Writer').attributeNamed('age')])

    ## Note: added begin/endTransaction()
    ## The reason for this is: module pypgsql discards a connection's cursor
    ## status when the cnx is committed or rolled back ; this makes this test
    ## fail when using pypgsql, since evaluateExpression() automatically
    ## opens and closes transaction if no transaction is opened when it is
    ## called
    adaptorChannel.adaptorContext().beginTransaction()

    if database_cfg=='SQLite.cfg':
      sqlExpr2=adaptorChannel.adaptorContext().adaptor().expressionClass()()
      sqlExpr2.setStatement('-- types int')
      adaptorChannel.evaluateExpression(sqlExpr2)
    adaptorChannel.evaluateExpression(sqlExpr)
    row=adaptorChannel.fetchRow()

    adaptorChannel.adaptorContext().commitTransaction() ## See comments above

    self.failUnless(row.get('age', None))
    self.failUnless(int(row['age'])==508)
    adaptorChannel.cancelFetch()

  def tearDown(self):
    """
    Cleans up the Database after each tests
    """
    ec=EditingContext()
    w=Writer()
    dbChannel=ec.rootObjectStore().objectStoreForObject(w).availableChannel()
    channel=dbChannel.adaptorChannel()
    sqlExpr=channel.adaptorContext().adaptor().expressionClass()()
    sqlExpr.setStatement("delete from BOOK where id>4")
    channel.evaluateExpression(sqlExpr)
    sqlExpr.setStatement("delete from WRITER where id>3")
    channel.evaluateExpression(sqlExpr)

def reinitDB(database_cfg):
  """
  Reinitializes the database: recreates the db's schema needed for these tests
  to run. Also inserts some data in the created db.
  """
  from Modeling.SchemaGeneration import SchemaGeneration,\
       DropPrimaryKeySupportKey, DropForeignKeyConstraintsKey,       \
       DropPrimaryKeyConstraintsKey, DropTablesKey, DropDatabaseKey, \
       CreateDatabaseKey, CreateTablesKey, PrimaryKeyConstraintsKey, \
       ForeignKeyConstraintsKey, CreatePrimaryKeySupportKey
  options={
    DropPrimaryKeySupportKey     : 1,
    DropForeignKeyConstraintsKey : 1,
    DropPrimaryKeyConstraintsKey : 1,
    DropTablesKey                : 1,
    DropDatabaseKey              : 0,
    CreateDatabaseKey            : 0,
    CreateTablesKey              : 1,
    PrimaryKeyConstraintsKey     : 1,
    ForeignKeyConstraintsKey     : 1,
    CreatePrimaryKeySupportKey   : 1,
  }
  model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
  pgAdaptor=Adaptor.adaptorWithModel(model)
  schemaGeneration = pgAdaptor.schemaGenerationFactory()
  sqlExprs=schemaGeneration.schemaCreationStatementsForEntities(model.entities(), options)
  context=pgAdaptor.createAdaptorContext()
  channel=context.createAdaptorChannel()
  for sqlExpr in sqlExprs:
    try:   channel.evaluateExpression(sqlExpr)
    except Exception, exc:
      print 'error: %s'%str(exc)

  pks=channel.primaryKeysForNewRowsWithEntity(3, model.entityNamed('Writer'))
  pks=[pk.values()[0] for pk in pks]
  sqlExpr=pgAdaptor.expressionClass()()

  if database_cfg=='Oracle.cfg':
    sqlExpr.setStatement("alter session set nls_date_format = 'YYYY-MM-DD HH24:mi:ss'")
    channel.evaluateExpression(sqlExpr)
    

  sqlExpr.setStatement("insert into WRITER "\
                       "(id, age, last_name,first_name,birthday,fk_writer_id)"\
                       " values (%i,24,'Cleese','John','1939-10-27 08:31:15',null)"%pks[0])
  channel.evaluateExpression(sqlExpr)
  sqlExpr.setStatement("insert into WRITER "\
                       "(id, age, last_name,first_name,birthday,fk_writer_id)"\
                       " values (%i,508,'Rabelais','Francois','1484-07-02 18:16:12',null)"%pks[1])
  channel.evaluateExpression(sqlExpr)
  sqlExpr.setStatement("insert into WRITER "\
                       "(id, age, last_name,first_name,birthday,fk_writer_id)"\
                       " values (%i,81,'Dard','Frederic','1921-06-29 04:56:34', %i)"%(pks[2], pks[1]))
  channel.evaluateExpression(sqlExpr)
  # Book
  bookPKs=channel.primaryKeysForNewRowsWithEntity(4, model.entityNamed('Book'))
  bookPKs=[pk.values()[0] for pk in bookPKs]
  sqlExpr.setStatement("insert into BOOK "\
                       "(id, title, fk_writer_id) "\
                       "values (%i,'Gargantua',%i)"%(bookPKs[0], pks[1]))
  channel.evaluateExpression(sqlExpr)
  sqlExpr.setStatement("insert into BOOK "\
                       "(id, title, fk_writer_id) "\
                       "values (%i,'Bouge ton pied que je voie la mer',%i)"%\
                       (bookPKs[1], pks[2]))
  channel.evaluateExpression(sqlExpr)
  sqlExpr.setStatement("insert into BOOK "\
                       "(id, title, fk_writer_id) "\
                       "values (%i,'Le coup du pere Francois',%i)"%\
                       (bookPKs[2], pks[2]))
  channel.evaluateExpression(sqlExpr)
  sqlExpr.setStatement("insert into BOOK "\
                       "(id, title, fk_writer_id) "\
                       "values(%i,'T\'\'assieds pas sur le compte-gouttes',%i)"% (bookPKs[3], pks[2]))
  channel.evaluateExpression(sqlExpr)
  #Fautil tuer les petits garons qui ont les mains sur les hanches ?
  #

def usage(prgName, exitStatus=None):
  _usage="""%s [-vV] [-p] [-d <dbAdaptorName>] [-r]
Runs the tests for the Modeling package

Options
--------
  -v  Minimally verbose
  -V  Really verbose
  -h  Prints this message
  -p  enables profiling
  -d  sets the database adaptor to use: Postgresql (default), MySQL, Oracle
                                        or SQLite
  -r  reinitialize the (postgresql) database
         the database defined in the model 'model_AuthorBooks'
         (located in testPackages/AuthorBooks/) is altered: tables are
         dropped, then they are re-created along with constraints for PKs and
         FKs and PK support (sequences)
""" % prgName
  if exitStatus is not None:
    print _usage
    sys.exit(exitStatus)
  else:
    return _usage


verbose=0
database_cfg='Postgresql.cfg'

def main(args):
  me=args[0]
  import getopt
  options, args = getopt.getopt(sys.argv[1:], 'vVprd:')
  #except: usage(me, 1)
  global verbose, database_cfg

  profile=0; reinitDB_flag=0
  for k, v in options:
    if k=='-h': usage(me, 1)
    if k=='-v': verbose=1; continue
    if k=='-V': verbose="Y"; continue
    if k=='-p': profile=1; continue
    if k=='-d':
      if v not in ('Postgresql', 'MySQL', 'Oracle', 'SQLite'): usage(me, 1)
      database_cfg='%s.cfg'%v
      continue
    if k=='-r': reinitDB_flag=1; continue

  if args: usage(me, 1) #raise 'Unexpected arguments', args

  author_books_model=ModelSet.defaultModelSet().modelNamed('AuthorBooks')
  # Initialization of model's caracs.
  Model.updateModelWithCFG(author_books_model, database_cfg)

  # MySQL specifics: change TIMESTAMP to DATETIME
  if database_cfg=='MySQL.cfg':
    author_books_model.entityNamed('Writer').attributeNamed('birthday').setExternalType('DATETIME')

  # Oracle specifics: change TIMESTAMP to DATE
  if database_cfg=='Oracle.cfg':
    author_books_model.entityNamed('Writer').attributeNamed('birthday').setExternalType('DATE')
    from AuthorBooks import Writer
    reload(Writer)
 
  # SQLite specifics: test_12 requires that the framework closes the db
  # connection, so that the new context/channel can connect to the db (sqlite
  # does not allow concurrent access to the db).
  if database_cfg=='SQLite.cfg':
    import os
    os.environ['MDL_TRANSIENT_DB_CONNECTION']='yes'

  utils.enable_model_cache_and_compute()

  if reinitDB_flag: reinitDB(database_cfg); return
  
  if profile:
    import profile
    profile.run('utils.run_suite(test_suite(), verbosity=verbose)',
                'profile.out')
    return 
  else:
      return utils.run_suite(test_suite(), verbosity=verbose)

def test_suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(TestEditingContext_Global, "test_"))
    return suite

if __name__ == "__main__":
  errs = main(sys.argv)
  #errs = utils.run_suite(test_suite())
  sys.exit(errs and 1 or 0)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.