summaryrefslogtreecommitdiff
path: root/test/engine/test_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_pool.py')
-rw-r--r--test/engine/test_pool.py158
1 files changed, 84 insertions, 74 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 6b0b187e6..9db65d2ab 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -73,12 +73,13 @@ class PoolTest(PoolTestBase):
self.assert_(connection.cursor() is not None)
self.assert_(connection is not connection2)
- @testing.fails_on('+pyodbc', "pyodbc cursor doesn't implement tuple __eq__")
+ @testing.fails_on('+pyodbc',
+ "pyodbc cursor doesn't implement tuple __eq__")
def test_cursor_iterable(self):
conn = testing.db.raw_connection()
cursor = conn.cursor()
cursor.execute(str(select([1], bind=testing.db)))
- expected = [(1,)]
+ expected = [(1, )]
for row in cursor:
eq_(row, expected.pop(0))
@@ -108,10 +109,11 @@ class PoolTest(PoolTestBase):
self._do_testthreadlocal(useclose=True)
def _do_testthreadlocal(self, useclose=False):
- for p in (
- pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True),
- pool.SingletonThreadPool(creator = mock_dbapi.connect, use_threadlocal = True)
- ):
+ for p in pool.QueuePool(creator=mock_dbapi.connect,
+ pool_size=3, max_overflow=-1,
+ use_threadlocal=True), \
+ pool.SingletonThreadPool(creator=mock_dbapi.connect,
+ use_threadlocal=True):
c1 = p.connect()
c2 = p.connect()
self.assert_(c1 is c2)
@@ -129,7 +131,6 @@ class PoolTest(PoolTestBase):
else:
c2 = None
lazy_gc()
-
if useclose:
c1 = p.connect()
c2 = p.connect()
@@ -138,13 +139,13 @@ class PoolTest(PoolTestBase):
c2.close()
self.assert_(c1.connection is not None)
c1.close()
-
c1 = c2 = c3 = None
- # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced
+ # extra tests with QueuePool to ensure connections get
+ # __del__()ed when dereferenced
+
if isinstance(p, pool.QueuePool):
lazy_gc()
-
self.assert_(p.checkedout() == 0)
c1 = p.connect()
c2 = p.connect()
@@ -210,7 +211,8 @@ class PoolTest(PoolTestBase):
eq_(len(innerself.first_connected), fconn)
eq_(len(innerself.checked_out), cout)
eq_(len(innerself.checked_in), cin)
- def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin):
+ def assert_in(innerself, item, in_conn, in_fconn,
+ in_cout, in_cin):
self.assert_((item in innerself.connected) == in_conn)
self.assert_((item in innerself.first_connected) == in_fconn)
self.assert_((item in innerself.checked_out) == in_cout)
@@ -445,25 +447,28 @@ class QueuePoolTest(PoolTestBase):
self._do_testqueuepool(useclose=True)
def _do_testqueuepool(self, useclose=False):
- p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = False)
+ p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3,
+ max_overflow=-1, use_threadlocal=False)
def status(pool):
- tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout())
- print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
+ tup = pool.size(), pool.checkedin(), pool.overflow(), \
+ pool.checkedout()
+ print 'Pool size: %d Connections in pool: %d Current '\
+ 'Overflow: %d Current Checked out connections: %d' % tup
return tup
c1 = p.connect()
- self.assert_(status(p) == (3,0,-2,1))
+ self.assert_(status(p) == (3, 0, -2, 1))
c2 = p.connect()
- self.assert_(status(p) == (3,0,-1,2))
+ self.assert_(status(p) == (3, 0, -1, 2))
c3 = p.connect()
- self.assert_(status(p) == (3,0,0,3))
+ self.assert_(status(p) == (3, 0, 0, 3))
c4 = p.connect()
- self.assert_(status(p) == (3,0,1,4))
+ self.assert_(status(p) == (3, 0, 1, 4))
c5 = p.connect()
- self.assert_(status(p) == (3,0,2,5))
+ self.assert_(status(p) == (3, 0, 2, 5))
c6 = p.connect()
- self.assert_(status(p) == (3,0,3,6))
+ self.assert_(status(p) == (3, 0, 3, 6))
if useclose:
c4.close()
c3.close()
@@ -471,8 +476,7 @@ class QueuePoolTest(PoolTestBase):
else:
c4 = c3 = c2 = None
lazy_gc()
-
- self.assert_(status(p) == (3,3,3,3))
+ self.assert_(status(p) == (3, 3, 3, 3))
if useclose:
c1.close()
c5.close()
@@ -480,9 +484,7 @@ class QueuePoolTest(PoolTestBase):
else:
c1 = c5 = c6 = None
lazy_gc()
-
- self.assert_(status(p) == (3,3,0,0))
-
+ self.assert_(status(p) == (3, 3, 0, 0))
c1 = p.connect()
c2 = p.connect()
self.assert_(status(p) == (3, 1, 0, 2), status(p))
@@ -491,16 +493,15 @@ class QueuePoolTest(PoolTestBase):
else:
c2 = None
lazy_gc()
-
- self.assert_(status(p) == (3, 2, 0, 1))
-
+ self.assert_(status(p) == (3, 2, 0, 1))
c1.close()
-
lazy_gc()
assert not pool._refs
def test_timeout(self):
- p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
+ p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3,
+ max_overflow=0, use_threadlocal=False,
+ timeout=2)
c1 = p.connect()
c2 = p.connect()
c3 = p.connect()
@@ -591,7 +592,8 @@ class QueuePoolTest(PoolTestBase):
self._test_overflow(40, 5)
def test_mixed_close(self):
- p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
+ p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@@ -601,12 +603,12 @@ class QueuePoolTest(PoolTestBase):
c1 = None
lazy_gc()
assert p.checkedout() == 0
-
lazy_gc()
assert not pool._refs
-
+
def test_weakref_kaboom(self):
- p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
+ p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
c1.close()
@@ -617,12 +619,16 @@ class QueuePoolTest(PoolTestBase):
assert p.checkedout() == 0
c3 = p.connect()
assert c3 is not None
-
+
def test_trick_the_counter(self):
- """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread
- with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an
- ambiguous counter. i.e. its not true reference counting."""
- p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
+ """this is a "flaw" in the connection pool; since threadlocal
+ uses a single ConnectionFairy per thread with an open/close
+ counter, you can fool the counter into giving you a
+ ConnectionFairy with an ambiguous counter. i.e. its not true
+ reference counting."""
+
+ p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@@ -630,13 +636,13 @@ class QueuePoolTest(PoolTestBase):
c2 = p.connect()
c2.close()
self.assert_(p.checkedout() != 0)
-
c2.close()
self.assert_(p.checkedout() == 0)
def test_recycle(self):
- p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3)
-
+ p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=1,
+ max_overflow=0, use_threadlocal=False,
+ recycle=3)
c1 = p.connect()
c_id = id(c1.connection)
c1.close()
@@ -644,96 +650,101 @@ class QueuePoolTest(PoolTestBase):
assert id(c2.connection) == c_id
c2.close()
time.sleep(4)
- c3= p.connect()
+ c3 = p.connect()
assert id(c3.connection) != c_id
-
+
def test_invalidate(self):
dbapi = MockDBAPI()
- p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
+ p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'),
+ pool_size=1, max_overflow=0,
+ use_threadlocal=False)
c1 = p.connect()
c_id = c1.connection.id
- c1.close(); c1=None
+ c1.close()
+ c1 = None
c1 = p.connect()
assert c1.connection.id == c_id
c1.invalidate()
c1 = None
-
c1 = p.connect()
assert c1.connection.id != c_id
-
+
def test_recreate(self):
dbapi = MockDBAPI()
- p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
+ p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'),
+ pool_size=1, max_overflow=0,
+ use_threadlocal=False)
p2 = p.recreate()
assert p2.size() == 1
assert p2._use_threadlocal is False
assert p2._max_overflow == 0
-
+
def test_reconnect(self):
- """tests reconnect operations at the pool level. SA's engine/dialect includes another
- layer of reconnect support for 'database was lost' errors."""
-
+ """tests reconnect operations at the pool level. SA's
+ engine/dialect includes another layer of reconnect support for
+ 'database was lost' errors."""
+
dbapi = MockDBAPI()
- p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
+ p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'),
+ pool_size=1, max_overflow=0,
+ use_threadlocal=False)
c1 = p.connect()
c_id = c1.connection.id
- c1.close(); c1=None
-
+ c1.close()
+ c1 = None
c1 = p.connect()
assert c1.connection.id == c_id
dbapi.raise_error = True
c1.invalidate()
c1 = None
-
c1 = p.connect()
assert c1.connection.id != c_id
-
+
def test_detach(self):
dbapi = MockDBAPI()
- p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
-
+ p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'),
+ pool_size=1, max_overflow=0,
+ use_threadlocal=False)
c1 = p.connect()
c1.detach()
c_id = c1.connection.id
-
c2 = p.connect()
assert c2.connection.id != c1.connection.id
dbapi.raise_error = True
-
c2.invalidate()
c2 = None
-
c2 = p.connect()
assert c2.connection.id != c1.connection.id
-
con = c1.connection
-
assert not con.closed
c1.close()
assert con.closed
-
+
def test_threadfairy(self):
- p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True)
+ p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c1.close()
c2 = p.connect()
assert c2.connection is not None
class SingletonThreadPoolTest(PoolTestBase):
+
def test_cleanup(self):
- """test that the pool's connections are OK after cleanup() has been called."""
-
- p = pool.SingletonThreadPool(creator = mock_dbapi.connect, pool_size=3)
-
+ """test that the pool's connections are OK after cleanup() has
+ been called."""
+
+ p = pool.SingletonThreadPool(creator=mock_dbapi.connect,
+ pool_size=3)
+
def checkout():
for x in xrange(10):
c = p.connect()
assert c
c.cursor()
c.close()
-
time.sleep(.1)
-
+
threads = []
for i in xrange(10):
th = threading.Thread(target=checkout)
@@ -741,7 +752,6 @@ class SingletonThreadPoolTest(PoolTestBase):
threads.append(th)
for th in threads:
th.join()
-
assert len(p._all_conns) == 3
class NullPoolTest(PoolTestBase):