test_pool.py :  » Database » SQLAlchemy » SQLAlchemy-0.6.0 » test » engine » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » SQLAlchemy 
SQLAlchemy » SQLAlchemy 0.6.0 » test » engine » test_pool.py
import threading, time
from sqlalchemy import pool,interfaces,create_engine,select
import sqlalchemy as tsa
from sqlalchemy.test import TestBase,testing
from sqlalchemy.test.util import gc_collect,lazy_gc
from sqlalchemy.test.testing import eq_

mcid = 1
class MockDBAPI(object):
    def __init__(self):
        self.throw_error = False
    def connect(self, *args, **kwargs):
        if self.throw_error:
            raise Exception("couldnt connect !")
        delay = kwargs.pop('delay', 0)
        if delay:
            time.sleep(delay)
        return MockConnection()
class MockConnection(object):
    def __init__(self):
        global mcid
        self.id = mcid
        self.closed = False
        mcid += 1
    def close(self):
        self.closed = True
    def rollback(self):
        pass
    def cursor(self):
        return MockCursor()
class MockCursor(object):
    def execute(self, *args, **kw):
        pass
    def close(self):
        pass
mock_dbapi = MockDBAPI()


class PoolTestBase(TestBase):    
    def setup(self):
        pool.clear_managers()
        
    @classmethod
    def teardown_class(cls):
       pool.clear_managers()

class PoolTest(PoolTestBase):
    def testmanager(self):
        manager = pool.manage(mock_dbapi, use_threadlocal=True)

        connection = manager.connect('foo.db')
        connection2 = manager.connect('foo.db')
        connection3 = manager.connect('bar.db')

        self.assert_(connection.cursor() is not None)
        self.assert_(connection is connection2)
        self.assert_(connection2 is not connection3)

    def testbadargs(self):
        manager = pool.manage(mock_dbapi)

        try:
            connection = manager.connect(None)
        except:
            pass

    def testnonthreadlocalmanager(self):
        manager = pool.manage(mock_dbapi, use_threadlocal = False)

        connection = manager.connect('foo.db')
        connection2 = manager.connect('foo.db')

        self.assert_(connection.cursor() is not None)
        self.assert_(connection is not connection2)

    @testing.fails_on('+pyodbc', "pyodbc cursor doesn't implement tuple __eq__")
    def test_cursor_iterable(self):
        conn = testing.db.raw_connection()
        cursor = conn.cursor()
        cursor.execute(str(select([1], bind=testing.db)))
        expected = [(1,)]
        for row in cursor:
            eq_(row, expected.pop(0))
    
    def test_no_connect_on_recreate(self):
        def creator():
            raise Exception("no creates allowed")
        
        for cls in (pool.SingletonThreadPool, pool.StaticPool, 
                    pool.QueuePool, pool.NullPool, pool.AssertionPool):
            p = cls(creator=creator)
            p.dispose()
            p.recreate()
        
            mock_dbapi = MockDBAPI()
            p = cls(creator=mock_dbapi.connect)
            conn = p.connect()
            conn.close()
            mock_dbapi.throw_error = True
            p.dispose()
            p.recreate()
            
            
    def testthreadlocal_del(self):
        self._do_testthreadlocal(useclose=False)

    def testthreadlocal_close(self):
        self._do_testthreadlocal(useclose=True)

    def _do_testthreadlocal(self, useclose=False):
        for p in (
            pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True),
            pool.SingletonThreadPool(creator = mock_dbapi.connect, use_threadlocal = True)
        ):
            c1 = p.connect()
            c2 = p.connect()
            self.assert_(c1 is c2)
            c3 = p.unique_connection()
            self.assert_(c3 is not c1)
            if useclose:
                c2.close()
            else:
                c2 = None
            c2 = p.connect()
            self.assert_(c1 is c2)
            self.assert_(c3 is not c1)
            if useclose:
                c2.close()
            else:
                c2 = None
                lazy_gc()
                
            if useclose:
                c1 = p.connect()
                c2 = p.connect()
                c3 = p.connect()
                c3.close()
                c2.close()
                self.assert_(c1.connection is not None)
                c1.close()

            c1 = c2 = c3 = None

            # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced
            if isinstance(p, pool.QueuePool):
                lazy_gc()

                self.assert_(p.checkedout() == 0)
                c1 = p.connect()
                c2 = p.connect()
                if useclose:
                    c2.close()
                    c1.close()
                else:
                    c2 = None
                    c1 = None
                    lazy_gc()
                self.assert_(p.checkedout() == 0)

    def test_properties(self):
        dbapi = MockDBAPI()
        p = pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
                           pool_size=1, max_overflow=0, use_threadlocal=False)

        c = p.connect()
        self.assert_(not c.info)
        self.assert_(c.info is c._connection_record.info)

        c.info['foo'] = 'bar'
        c.close()
        del c

        c = p.connect()
        self.assert_('foo' in c.info)

        c.invalidate()
        c = p.connect()
        self.assert_('foo' not in c.info)

        c.info['foo2'] = 'bar2'
        c.detach()
        self.assert_('foo2' in c.info)

        c2 = p.connect()
        self.assert_(c.connection is not c2.connection)
        self.assert_(not c2.info)
        self.assert_('foo2' in c.info)

    def test_listeners(self):
        dbapi = MockDBAPI()

        class InstrumentingListener(object):
            def __init__(self):
                if hasattr(self, 'connect'):
                    self.connect = self.inst_connect
                if hasattr(self, 'first_connect'):
                    self.first_connect = self.inst_first_connect
                if hasattr(self, 'checkout'):
                    self.checkout = self.inst_checkout
                if hasattr(self, 'checkin'):
                    self.checkin = self.inst_checkin
                self.clear()
            def clear(self):
                self.connected = []
                self.first_connected = []
                self.checked_out = []
                self.checked_in = []
            def assert_total(innerself, conn, fconn, cout, cin):
                eq_(len(innerself.connected), conn)
                eq_(len(innerself.first_connected), fconn)
                eq_(len(innerself.checked_out), cout)
                eq_(len(innerself.checked_in), cin)
            def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin):
                self.assert_((item in innerself.connected) == in_conn)
                self.assert_((item in innerself.first_connected) == in_fconn)
                self.assert_((item in innerself.checked_out) == in_cout)
                self.assert_((item in innerself.checked_in) == in_cin)
            def inst_connect(self, con, record):
                print "connect(%s, %s)" % (con, record)
                assert con is not None
                assert record is not None
                self.connected.append(con)
            def inst_first_connect(self, con, record):
                print "first_connect(%s, %s)" % (con, record)
                assert con is not None
                assert record is not None
                self.first_connected.append(con)
            def inst_checkout(self, con, record, proxy):
                print "checkout(%s, %s, %s)" % (con, record, proxy)
                assert con is not None
                assert record is not None
                assert proxy is not None
                self.checked_out.append(con)
            def inst_checkin(self, con, record):
                print "checkin(%s, %s)" % (con, record)
                # con can be None if invalidated
                assert record is not None
                self.checked_in.append(con)

        class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener):
            pass
        class ListenConnect(InstrumentingListener):
            def connect(self, con, record):
                pass
        class ListenFirstConnect(InstrumentingListener):
            def first_connect(self, con, record):
                pass
        class ListenCheckOut(InstrumentingListener):
            def checkout(self, con, record, proxy, num):
                pass
        class ListenCheckIn(InstrumentingListener):
            def checkin(self, con, record):
                pass

        def _pool(**kw):
            return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
                                  use_threadlocal=False, **kw)

        def assert_listeners(p, total, conn, fconn, cout, cin):
            for instance in (p, p.recreate()):
                self.assert_(len(instance.listeners) == total)
                self.assert_(len(instance._on_connect) == conn)
                self.assert_(len(instance._on_first_connect) == fconn)
                self.assert_(len(instance._on_checkout) == cout)
                self.assert_(len(instance._on_checkin) == cin)

        p = _pool()
        assert_listeners(p, 0, 0, 0, 0, 0)

        p.add_listener(ListenAll())
        assert_listeners(p, 1, 1, 1, 1, 1)

        p.add_listener(ListenConnect())
        assert_listeners(p, 2, 2, 1, 1, 1)

        p.add_listener(ListenFirstConnect())
        assert_listeners(p, 3, 2, 2, 1, 1)

        p.add_listener(ListenCheckOut())
        assert_listeners(p, 4, 2, 2, 2, 1)

        p.add_listener(ListenCheckIn())
        assert_listeners(p, 5, 2, 2, 2, 2)
        del p

        snoop = ListenAll()
        p = _pool(listeners=[snoop])
        assert_listeners(p, 1, 1, 1, 1, 1)

        c = p.connect()
        snoop.assert_total(1, 1, 1, 0)
        cc = c.connection
        snoop.assert_in(cc, True, True, True, False)
        c.close()
        snoop.assert_in(cc, True, True, True, True)
        del c, cc

        snoop.clear()

        # this one depends on immediate gc
        c = p.connect()
        cc = c.connection
        snoop.assert_in(cc, False, False, True, False)
        snoop.assert_total(0, 0, 1, 0)
        del c, cc
        lazy_gc()
        snoop.assert_total(0, 0, 1, 1)

        p.dispose()
        snoop.clear()

        c = p.connect()
        c.close()
        c = p.connect()
        snoop.assert_total(1, 0, 2, 1)
        c.close()
        snoop.assert_total(1, 0, 2, 2)

        # invalidation
        p.dispose()
        snoop.clear()

        c = p.connect()
        snoop.assert_total(1, 0, 1, 0)
        c.invalidate()
        snoop.assert_total(1, 0, 1, 1)
        c.close()
        snoop.assert_total(1, 0, 1, 1)
        del c
        lazy_gc()
        snoop.assert_total(1, 0, 1, 1)
        c = p.connect()
        snoop.assert_total(2, 0, 2, 1)
        c.close()
        del c
        lazy_gc()
        snoop.assert_total(2, 0, 2, 2)

        # detached
        p.dispose()
        snoop.clear()

        c = p.connect()
        snoop.assert_total(1, 0, 1, 0)
        c.detach()
        snoop.assert_total(1, 0, 1, 0)
        c.close()
        del c
        snoop.assert_total(1, 0, 1, 0)
        c = p.connect()
        snoop.assert_total(2, 0, 2, 0)
        c.close()
        del c
        snoop.assert_total(2, 0, 2, 1)

        # recreated
        p = p.recreate()
        snoop.clear()

        c = p.connect()
        snoop.assert_total(1, 1, 1, 0)
        c.close()
        snoop.assert_total(1, 1, 1, 1)
        c = p.connect()
        snoop.assert_total(1, 1, 2, 1)
        c.close()
        snoop.assert_total(1, 1, 2, 2)
    
    def test_listeners_callables(self):
        dbapi = MockDBAPI()

        def connect(dbapi_con, con_record):
            counts[0] += 1
        def checkout(dbapi_con, con_record, con_proxy):
            counts[1] += 1
        def checkin(dbapi_con, con_record):
            counts[2] += 1

        i_all = dict(connect=connect, checkout=checkout, checkin=checkin)
        i_connect = dict(connect=connect)
        i_checkout = dict(checkout=checkout)
        i_checkin = dict(checkin=checkin)

        for cls in (pool.QueuePool, pool.StaticPool):
            counts = [0, 0, 0]
            def _pool(**kw):
                return cls(creator=lambda: dbapi.connect('foo.db'),
                                      use_threadlocal=False, **kw)

            def assert_listeners(p, total, conn, cout, cin):
                for instance in (p, p.recreate()):
                    self.assert_(len(instance.listeners) == total)
                    self.assert_(len(instance._on_connect) == conn)
                    self.assert_(len(instance._on_checkout) == cout)
                    self.assert_(len(instance._on_checkin) == cin)

            p = _pool()
            assert_listeners(p, 0, 0, 0, 0)

            p.add_listener(i_all)
            assert_listeners(p, 1, 1, 1, 1)

            p.add_listener(i_connect)
            assert_listeners(p, 2, 2, 1, 1)

            p.add_listener(i_checkout)
            assert_listeners(p, 3, 2, 2, 1)

            p.add_listener(i_checkin)
            assert_listeners(p, 4, 2, 2, 2)
            del p

            p = _pool(listeners=[i_all])
            assert_listeners(p, 1, 1, 1, 1)

            c = p.connect()
            assert counts == [1, 1, 0]
            c.close()
            assert counts == [1, 1, 1]

            c = p.connect()
            assert counts == [1, 2, 1]
            p.add_listener(i_checkin)
            c.close()
            assert counts == [1, 2, 3]

    def test_listener_after_oninit(self):
        """Test that listeners are called after OnInit is removed"""
        called = []
        def listener(*args):
            called.append(True)
        listener.connect = listener
        engine = create_engine(testing.db.url)
        engine.pool.add_listener(listener)
        engine.execute(select([1])).close()
        assert called, "Listener not called on connect"


class QueuePoolTest(PoolTestBase):

    def testqueuepool_del(self):
        self._do_testqueuepool(useclose=False)

    def testqueuepool_close(self):
        self._do_testqueuepool(useclose=True)

    def _do_testqueuepool(self, useclose=False):
        p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = False)

        def status(pool):
            tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout())
            print "Pool size: %d  Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
            return tup

        c1 = p.connect()
        self.assert_(status(p) == (3,0,-2,1))
        c2 = p.connect()
        self.assert_(status(p) == (3,0,-1,2))
        c3 = p.connect()
        self.assert_(status(p) == (3,0,0,3))
        c4 = p.connect()
        self.assert_(status(p) == (3,0,1,4))
        c5 = p.connect()
        self.assert_(status(p) == (3,0,2,5))
        c6 = p.connect()
        self.assert_(status(p) == (3,0,3,6))
        if useclose:
            c4.close()
            c3.close()
            c2.close()
        else:
            c4 = c3 = c2 = None
            lazy_gc()
            
        self.assert_(status(p) == (3,3,3,3))
        if useclose:
            c1.close()
            c5.close()
            c6.close()
        else:
            c1 = c5 = c6 = None
            lazy_gc()
            
        self.assert_(status(p) == (3,3,0,0))
        
        c1 = p.connect()
        c2 = p.connect()
        self.assert_(status(p) == (3, 1, 0, 2), status(p))
        if useclose:
            c2.close()
        else:
            c2 = None
            lazy_gc()
            
        self.assert_(status(p) == (3, 2, 0, 1))  

        c1.close()
       
        lazy_gc()
        assert not pool._refs
       
    def test_timeout(self):
        p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
        c1 = p.connect()
        c2 = p.connect()
        c3 = p.connect()
        now = time.time()
        try:
            c4 = p.connect()
            assert False
        except tsa.exc.TimeoutError, e:
            assert int(time.time() - now) == 2

    def test_timeout_race(self):
        # test a race condition where the initial connecting threads all race
        # to queue.Empty, then block on the mutex.  each thread consumes a
        # connection as they go in.  when the limit is reached, the remaining
        # threads go in, and get TimeoutError; even though they never got to
        # wait for the timeout on queue.get().  the fix involves checking the
        # timeout again within the mutex, and if so, unlocking and throwing
        # them back to the start of do_get()
        p = pool.QueuePool(
                creator = lambda: mock_dbapi.connect(delay=.05), 
                pool_size = 2, 
                max_overflow = 1, use_threadlocal = False, timeout=3)
        timeouts = []
        def checkout():
            for x in xrange(1):
                now = time.time()
                try:
                    c1 = p.connect()
                except tsa.exc.TimeoutError, e:
                    timeouts.append(time.time() - now)
                    continue
                time.sleep(4)
                c1.close()  

        threads = []
        for i in xrange(10):
            th = threading.Thread(target=checkout)
            th.start()
            threads.append(th)
        for th in threads:
            th.join() 

        assert len(timeouts) > 0
        for t in timeouts:
            assert t >= 3, "Not all timeouts were >= 3 seconds %r" % timeouts
            # normally, the timeout should under 4 seconds,
            # but on a loaded down buildbot it can go up.
            assert t < 10, "Not all timeouts were < 10 seconds %r" % timeouts

    def _test_overflow(self, thread_count, max_overflow):
        gc_collect()
        
        def creator():
            time.sleep(.05)
            return mock_dbapi.connect()
 
        p = pool.QueuePool(creator=creator,
                           pool_size=3, timeout=2,
                           max_overflow=max_overflow)
        peaks = []
        def whammy():
            for i in range(10):
                try:
                    con = p.connect()
                    time.sleep(.005)
                    peaks.append(p.overflow())
                    con.close()
                    del con
                except tsa.exc.TimeoutError:
                    pass
        threads = []
        for i in xrange(thread_count):
            th = threading.Thread(target=whammy)
            th.start()
            threads.append(th)
        for th in threads:
            th.join()
 
        self.assert_(max(peaks) <= max_overflow)
        
        lazy_gc()
        assert not pool._refs
 
    def test_no_overflow(self):
        self._test_overflow(40, 0)
 
    def test_max_overflow(self):
        self._test_overflow(40, 5)
 
    def test_mixed_close(self):
        p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
        c1 = p.connect()
        c2 = p.connect()
        assert c1 is c2
        c1.close()
        c2 = None
        assert p.checkedout() == 1
        c1 = None
        lazy_gc()
        assert p.checkedout() == 0
        
        lazy_gc()
        assert not pool._refs
 
    def test_weakref_kaboom(self):
        p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
        c1 = p.connect()
        c2 = p.connect()
        c1.close()
        c2 = None
        del c1
        del c2
        gc_collect()
        assert p.checkedout() == 0
        c3 = p.connect()
        assert c3 is not None
 
    def test_trick_the_counter(self):
        """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread
        with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an
        ambiguous counter.  i.e. its not true reference counting."""
        p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
        c1 = p.connect()
        c2 = p.connect()
        assert c1 is c2
        c1.close()
        c2 = p.connect()
        c2.close()
        self.assert_(p.checkedout() != 0)
 
        c2.close()
        self.assert_(p.checkedout() == 0)
 
    def test_recycle(self):
        p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3)
 
        c1 = p.connect()
        c_id = id(c1.connection)
        c1.close()
        c2 = p.connect()
        assert id(c2.connection) == c_id
        c2.close()
        time.sleep(4)
        c3= p.connect()
        assert id(c3.connection) != c_id
 
    def test_invalidate(self):
        dbapi = MockDBAPI()
        p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
        c1 = p.connect()
        c_id = c1.connection.id
        c1.close(); c1=None
        c1 = p.connect()
        assert c1.connection.id == c_id
        c1.invalidate()
        c1 = None
 
        c1 = p.connect()
        assert c1.connection.id != c_id
 
    def test_recreate(self):
        dbapi = MockDBAPI()
        p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
        p2 = p.recreate()
        assert p2.size() == 1
        assert p2._use_threadlocal is False
        assert p2._max_overflow == 0
 
    def test_reconnect(self):
        """tests reconnect operations at the pool level.  SA's engine/dialect includes another
        layer of reconnect support for 'database was lost' errors."""
        
        dbapi = MockDBAPI()
        p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
        c1 = p.connect()
        c_id = c1.connection.id
        c1.close(); c1=None
 
        c1 = p.connect()
        assert c1.connection.id == c_id
        dbapi.raise_error = True
        c1.invalidate()
        c1 = None
 
        c1 = p.connect()
        assert c1.connection.id != c_id
 
    def test_detach(self):
        dbapi = MockDBAPI()
        p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
 
        c1 = p.connect()
        c1.detach()
        c_id = c1.connection.id
 
        c2 = p.connect()
        assert c2.connection.id != c1.connection.id
        dbapi.raise_error = True
 
        c2.invalidate()
        c2 = None
 
        c2 = p.connect()
        assert c2.connection.id != c1.connection.id
 
        con = c1.connection
 
        assert not con.closed
        c1.close()
        assert con.closed
 
    def test_threadfairy(self):
        p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
        c1 = p.connect()
        c1.close()
        c2 = p.connect()
        assert c2.connection is not None

class SingletonThreadPoolTest(PoolTestBase):
    def test_cleanup(self):
        """test that the pool's connections are OK after cleanup() has been called."""
        
        p = pool.SingletonThreadPool(creator = mock_dbapi.connect, pool_size=3)
        
        def checkout():
            for x in xrange(10):
                c = p.connect()
                assert c
                c.cursor()
                c.close()
                
                time.sleep(.1)
                
        threads = []
        for i in xrange(10):
            th = threading.Thread(target=checkout)
            th.start()
            threads.append(th)
        for th in threads:
            th.join()
            
        assert len(p._all_conns) == 3

class NullPoolTest(PoolTestBase):
    def test_reconnect(self):
        dbapi = MockDBAPI()
        p = pool.NullPool(creator = lambda: dbapi.connect('foo.db'))
        c1 = p.connect()
        c_id = c1.connection.id
        c1.close(); c1=None

        c1 = p.connect()
        dbapi.raise_error = True
        c1.invalidate()
        c1 = None

        c1 = p.connect()
        assert c1.connection.id != c_id


class StaticPoolTest(PoolTestBase):
    def test_recreate(self):
        dbapi = MockDBAPI()
        creator = lambda: dbapi.connect('foo.db')
        p = pool.StaticPool(creator)
        p2 = p.recreate()
        assert p._creator is p2._creator
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.