summaryrefslogtreecommitdiff
path: root/test/engine/test_reconnect.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-07-11 13:15:51 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2010-07-11 13:15:51 -0400
commita6b62cc3fed5f06d3428b1f6ee13756175ded61b (patch)
tree6483de311229fae0b40e9caf36b579b4f1c58ff2 /test/engine/test_reconnect.py
parent5cce6bf2a86cb318d80953cd3712fcb77bbc52db (diff)
downloadsqlalchemy-a6b62cc3fed5f06d3428b1f6ee13756175ded61b.tar.gz
Python-tidy test/engine and test/aaa_profiling, 80% auto + 20% manual intervention
Diffstat (limited to 'test/engine/test_reconnect.py')
-rw-r--r--test/engine/test_reconnect.py86
1 files changed, 41 insertions, 45 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index dddc445ac..b92c88066 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -62,28 +62,33 @@ class MockReconnectTest(TestBase):
db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect)
def test_reconnect(self):
- """test that an 'is_disconnect' condition will invalidate the connection, and additionally
- dispose the previous connection pool and recreate."""
-
+ """test that an 'is_disconnect' condition will invalidate the
+ connection, and additionally dispose the previous connection
+ pool and recreate."""
pid = id(db.pool)
# make a connection
+
conn = db.connect()
# connection works
+
conn.execute(select([1]))
- # create a second connection within the pool, which we'll ensure also goes away
+ # create a second connection within the pool, which we'll ensure
+ # also goes away
+
conn2 = db.connect()
conn2.close()
# two connections opened total now
+
assert len(dbapi.connections) == 2
# set it to fail
- dbapi.shutdown()
+ dbapi.shutdown()
try:
conn.execute(select([1]))
assert False
@@ -91,19 +96,20 @@ class MockReconnectTest(TestBase):
pass
# assert was invalidated
+
assert not conn.closed
assert conn.invalidated
# close shouldnt break
- conn.close()
+ conn.close()
assert id(db.pool) != pid
# ensure all connections closed (pool was recycled)
+
gc_collect()
assert len(dbapi.connections) == 0
-
- conn =db.connect()
+ conn = db.connect()
conn.execute(select([1]))
conn.close()
assert len(dbapi.connections) == 1
@@ -112,7 +118,6 @@ class MockReconnectTest(TestBase):
conn = db.connect()
trans = conn.begin()
dbapi.shutdown()
-
try:
conn.execute(select([1]))
assert False
@@ -120,34 +125,32 @@ class MockReconnectTest(TestBase):
pass
# assert was invalidated
+
gc_collect()
assert len(dbapi.connections) == 0
assert not conn.closed
assert conn.invalidated
assert trans.is_active
-
try:
conn.execute(select([1]))
assert False
except tsa.exc.InvalidRequestError, e:
- assert str(e) == "Can't reconnect until invalid transaction is rolled back"
-
+ assert str(e) \
+ == "Can't reconnect until invalid transaction is "\
+ "rolled back"
assert trans.is_active
-
try:
trans.commit()
assert False
except tsa.exc.InvalidRequestError, e:
- assert str(e) == "Can't reconnect until invalid transaction is rolled back"
-
+ assert str(e) \
+ == "Can't reconnect until invalid transaction is "\
+ "rolled back"
assert trans.is_active
-
trans.rollback()
assert not trans.is_active
-
conn.execute(select([1]))
assert not conn.invalidated
-
assert len(dbapi.connections) == 1
def test_conn_reusable(self):
@@ -256,7 +259,8 @@ class RealReconnectTest(TestBase):
conn.close()
def test_null_pool(self):
- engine = engines.reconnecting_engine(options=dict(poolclass=pool.NullPool))
+ engine = \
+ engines.reconnecting_engine(options=dict(poolclass=pool.NullPool))
conn = engine.connect()
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.closed
@@ -292,60 +296,52 @@ class RealReconnectTest(TestBase):
def test_with_transaction(self):
conn = engine.connect()
-
trans = conn.begin()
-
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.closed
-
engine.test_shutdown()
-
try:
conn.execute(select([1]))
assert False
except tsa.exc.DBAPIError, e:
if not e.connection_invalidated:
raise
-
assert not conn.closed
assert conn.invalidated
assert trans.is_active
-
try:
conn.execute(select([1]))
assert False
except tsa.exc.InvalidRequestError, e:
- assert str(e) == "Can't reconnect until invalid transaction is rolled back"
-
+ assert str(e) \
+ == "Can't reconnect until invalid transaction is "\
+ "rolled back"
assert trans.is_active
-
try:
trans.commit()
assert False
except tsa.exc.InvalidRequestError, e:
- assert str(e) == "Can't reconnect until invalid transaction is rolled back"
-
+ assert str(e) \
+ == "Can't reconnect until invalid transaction is "\
+ "rolled back"
assert trans.is_active
-
trans.rollback()
assert not trans.is_active
-
assert conn.invalidated
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
class RecycleTest(TestBase):
+
def test_basic(self):
- for threadlocal in (False, True):
- engine = engines.reconnecting_engine(options={'pool_recycle':1, 'pool_threadlocal':threadlocal})
-
+ for threadlocal in False, True:
+ engine = engines.reconnecting_engine(options={'pool_recycle'
+ : 1, 'pool_threadlocal': threadlocal})
conn = engine.contextual_connect()
eq_(conn.execute(select([1])).scalar(), 1)
conn.close()
-
engine.test_shutdown()
time.sleep(2)
-
conn = engine.contextual_connect()
eq_(conn.execute(select([1])).scalar(), 1)
conn.close()
@@ -368,22 +364,22 @@ class InvalidateDuringResultTest(TestBase):
meta.drop_all()
engine.dispose()
- @testing.fails_on('+mysqldb', "Buffers the result set and doesn't check for connection close")
- @testing.fails_on('+pg8000', "Buffers the result set and doesn't check for connection close")
+ @testing.fails_on('+mysqldb',
+ "Buffers the result set and doesn't check for "
+ "connection close")
+ @testing.fails_on('+pg8000',
+ "Buffers the result set and doesn't check for "
+ "connection close")
def test_invalidate_on_results(self):
conn = engine.connect()
-
- result = conn.execute("select * from sometable")
+ result = conn.execute('select * from sometable')
for x in xrange(20):
result.fetchone()
-
engine.test_shutdown()
try:
- print "ghost result: %r" % result.fetchone()
+ print 'ghost result: %r' % result.fetchone()
assert False
except tsa.exc.DBAPIError, e:
if not e.connection_invalidated:
raise
-
assert conn.invalidated
-