test_memusage.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » aaa_profiling » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » aaa_profiling » test_memusage.py
from sqlalchemy.test.testing import eq_
from sqlalchemy.orm import mapper,relationship,create_session,clear_mappers,sessionmaker
from sqlalchemy.orm.mapper import _mapper_registry
from sqlalchemy.orm.session import _sessions
from sqlalchemy.util import jython
import operator
from sqlalchemy.test import testing
from sqlalchemy import MetaData,Integer,String,ForeignKey,PickleType
from sqlalchemy.test.schema import Table,Column
import sqlalchemy as sa
from sqlalchemy.sql import column
from sqlalchemy.test.util import gc_collect
import gc
from test.orm import _base

if jython:
    from nose import SkipTest
    raise SkipTest("Profiling not supported on this platform")


class A(_base.ComparableEntity):
    pass
class B(_base.ComparableEntity):
    pass

def profile_memory(func):
    # run the test 50 times.  if length of gc.get_objects()
    # keeps growing, assert false
    def profile(*args):
        gc_collect()
        samples = [0 for x in range(0, 50)]
        for x in range(0, 50):
            func(*args)
            gc_collect()
            samples[x] = len(gc.get_objects())
        print "sample gc sizes:", samples

        assert len(_sessions) == 0
        
        for x in samples[-4:]:
            if x != samples[-5]:
                flatline = False
                break
        else:
            flatline = True

        if not flatline and samples[-1] > samples[0]:  # object count is bigger than when it started
            for x in samples[1:-2]:
                if x > samples[-1]:     # see if a spike bigger than the endpoint exists
                    break
            else:
                assert False, repr(samples) + " " + repr(flatline)

    return profile

def assert_no_mappers():
    clear_mappers()
    gc_collect()
    assert len(_mapper_registry) == 0

class EnsureZeroed(_base.ORMTest):
    def setup(self):
        _sessions.clear()
        _mapper_registry.clear()

class MemUsageTest(EnsureZeroed):
    
    # ensure a pure growing test trips the assertion
    @testing.fails_if(lambda: True)
    def test_fixture(self):
        class Foo(object):
            pass
            
        x = []
        @profile_memory
        def go():
            x[-1:] = [Foo(), Foo(), Foo(), Foo(), Foo(), Foo()]
        go()
            
    def test_session(self):
        metadata = MetaData(testing.db)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', String(30)))

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', String(30)),
            Column('col3', Integer, ForeignKey("mytable.col1")))

        metadata.create_all()

        m1 = mapper(A, table1, properties={
            "bs":relationship(B, cascade="all, delete", order_by=table2.c.col1)},
            order_by=table1.c.col1)
        m2 = mapper(B, table2)

        m3 = mapper(A, table1, non_primary=True)

        @profile_memory
        def go():
            sess = create_session()
            a1 = A(col2="a1")
            a2 = A(col2="a2")
            a3 = A(col2="a3")
            a1.bs.append(B(col2="b1"))
            a1.bs.append(B(col2="b2"))
            a3.bs.append(B(col2="b3"))
            for x in [a1,a2,a3]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).all()
            eq_(
                [
                    A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                    A(col2="a2", bs=[]),
                    A(col2="a3", bs=[B(col2="b3")])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()
        go()

        metadata.drop_all()
        del m1, m2, m3
        assert_no_mappers()

    def test_mapper_reset(self):
        metadata = MetaData(testing.db)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', String(30)))

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', String(30)),
            Column('col3', Integer, ForeignKey("mytable.col1")))

        @profile_memory
        def go():
            m1 = mapper(A, table1, properties={
                "bs":relationship(B, order_by=table2.c.col1)
            })
            m2 = mapper(B, table2)

            m3 = mapper(A, table1, non_primary=True)

            sess = create_session()
            a1 = A(col2="a1")
            a2 = A(col2="a2")
            a3 = A(col2="a3")
            a1.bs.append(B(col2="b1"))
            a1.bs.append(B(col2="b2"))
            a3.bs.append(B(col2="b3"))
            for x in [a1,a2,a3]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).order_by(A.col1).all()
            eq_(
                [
                    A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                    A(col2="a2", bs=[]),
                    A(col2="a3", bs=[B(col2="b3")])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()
            sess.close()
            clear_mappers()

        metadata.create_all()
        try:
            go()
        finally:
            metadata.drop_all()
        assert_no_mappers()

    def test_with_inheritance(self):
        metadata = MetaData(testing.db)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', String(30))
            )

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, ForeignKey('mytable.col1'),
                   primary_key=True, test_needs_autoincrement=True),
            Column('col3', String(30)),
            )

        @profile_memory
        def go():
            class A(_base.ComparableEntity):
                pass
            class B(A):
                pass

            mapper(A, table1,
                   polymorphic_on=table1.c.col2,
                   polymorphic_identity='a')
            mapper(B, table2,
                   inherits=A,
                   polymorphic_identity='b')

            sess = create_session()
            a1 = A()
            a2 = A()
            b1 = B(col3='b1')
            b2 = B(col3='b2')
            for x in [a1,a2,b1, b2]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).order_by(A.col1).all()
            eq_(
                [
                    A(), A(), B(col3='b1'), B(col3='b2')
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()

            # dont need to clear_mappers()
            del B
            del A

        metadata.create_all()
        try:
            go()
        finally:
            metadata.drop_all()
        assert_no_mappers()

    def test_with_manytomany(self):
        metadata = MetaData(testing.db)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', String(30))
            )

        table2 = Table("mytable2", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', String(30)),
            )

        table3 = Table('t1tot2', metadata,
            Column('t1', Integer, ForeignKey('mytable.col1')),
            Column('t2', Integer, ForeignKey('mytable2.col1')),
            )

        @profile_memory
        def go():
            class A(_base.ComparableEntity):
                pass
            class B(_base.ComparableEntity):
                pass

            mapper(A, table1, properties={
                'bs':relationship(B, secondary=table3, backref='as', order_by=table3.c.t1)
            })
            mapper(B, table2)

            sess = create_session()
            a1 = A(col2='a1')
            a2 = A(col2='a2')
            b1 = B(col2='b1')
            b2 = B(col2='b2')
            a1.bs.append(b1)
            a2.bs.append(b2)
            for x in [a1,a2]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).order_by(A.col1).all()
            eq_(
                [
                    A(bs=[B(col2='b1')]), A(bs=[B(col2='b2')])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()

            # dont need to clear_mappers()
            del B
            del A
            
        metadata.create_all()
        try:
            go()
        finally:
            metadata.drop_all()
        assert_no_mappers()

    # fails on newer versions of pysqlite due to unusual memory 
    # behvior in pysqlite itself.
    # background at: http://thread.gmane.org/gmane.comp.python.db.pysqlite.user/2290
    @testing.fails_if(lambda: 
                        testing.db.dialect.name == 'sqlite' and 
                        testing.db.dialect.dbapi.version > '2.5')
    def test_join_cache(self):
        metadata = MetaData(testing.db)

        table1 = Table("table1", metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(30))
            )

        table2 = Table("table2", metadata,
            Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('data', String(30)),
            Column('t1id', Integer, ForeignKey('table1.id'))
            )
        
        class Foo(object):
            pass
            
        class Bar(object):
            pass
            
        mapper(Foo, table1, properties={
            'bars':relationship(mapper(Bar, table2))
        })
        metadata.create_all()

        session = sessionmaker()
        
        @profile_memory
        def go():
            s = table2.select()
            sess = session()
            sess.query(Foo).join((s, Foo.bars)).all()
            sess.rollback()
        try:
            go()
        finally:
            metadata.drop_all()
            
            
    def test_mutable_identity(self):
        metadata = MetaData(testing.db)

        table1 = Table("mytable", metadata,
            Column('col1', Integer, primary_key=True, test_needs_autoincrement=True),
            Column('col2', PickleType(comparator=operator.eq))
            )
        
        class Foo(object):
            def __init__(self, col2):
                self.col2 = col2
        
        mapper(Foo, table1)
        metadata.create_all()
        
        session = sessionmaker()()
        
        def go():
            obj = [
                Foo({'a':1}),
                Foo({'b':1}),
                Foo({'c':1}),
                Foo({'d':1}),
                Foo({'e':1}),
                Foo({'f':1}),
                Foo({'g':1}),
                Foo({'h':1}),
                Foo({'i':1}),
                Foo({'j':1}),
                Foo({'k':1}),
                Foo({'l':1}),
            ]
            
            session.add_all(obj)
            session.commit()
            
            testing.eq_(len(session.identity_map._mutable_attrs), 12)
            testing.eq_(len(session.identity_map), 12)
            obj = None
            gc_collect()
            testing.eq_(len(session.identity_map._mutable_attrs), 0)
            testing.eq_(len(session.identity_map), 0)
            
        try:
            go()
        finally:
            metadata.drop_all()

    def test_type_compile(self):
        from sqlalchemy.dialects.sqlite.base import dialect
        cast = sa.cast(column('x'), sa.Integer)
        @profile_memory
        def go():
            dialect = SQLiteDialect()
            cast.compile(dialect=dialect)
        go()
        
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.