summaryrefslogtreecommitdiff
path: root/test/engine/reconnect.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/reconnect.py')
-rw-r--r--test/engine/reconnect.py80
1 files changed, 40 insertions, 40 deletions
diff --git a/test/engine/reconnect.py b/test/engine/reconnect.py
index 40b838e3f..2a18fca2c 100644
--- a/test/engine/reconnect.py
+++ b/test/engine/reconnect.py
@@ -1,4 +1,4 @@
-import testbase
+import testenv; testenv.configure_for_tests()
import sys, weakref
from sqlalchemy import create_engine, exceptions, select
from testlib import *
@@ -17,7 +17,7 @@ class MockDBAPI(object):
for c in self.connections:
c.explode[0] = True
Error = MockDisconnect
-
+
class MockConnection(object):
def __init__(self, dbapi):
dbapi.connections[self] = True
@@ -30,7 +30,7 @@ class MockConnection(object):
return MockCursor(self)
def close(self):
pass
-
+
class MockCursor(object):
def __init__(self, parent):
self.explode = parent.explode
@@ -42,31 +42,31 @@ class MockCursor(object):
return
def close(self):
pass
-
+
class MockReconnectTest(PersistTest):
def setUp(self):
global db, dbapi
dbapi = MockDBAPI()
-
+
# create engine using our current dburi
db = create_engine('postgres://foo:bar@localhost/test', module=dbapi)
-
+
# monkeypatch disconnect checker
db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect)
-
+
def test_reconnect(self):
"""test that an 'is_disconnect' condition will invalidate the connection, and additionally
dispose the previous connection pool and recreate."""
-
-
+
+
pid = id(db.pool)
-
+
# make a connection
conn = db.connect()
-
+
# connection works
conn.execute(select([1]))
-
+
# create a second connection within the pool, which we'll ensure also goes away
conn2 = db.connect()
conn2.close()
@@ -82,24 +82,24 @@ class MockReconnectTest(PersistTest):
assert False
except exceptions.DBAPIError:
pass
-
+
# assert was invalidated
assert not conn.closed
assert conn.invalidated
-
+
# close shouldnt break
conn.close()
assert id(db.pool) != pid
-
+
# ensure all connections closed (pool was recycled)
assert len(dbapi.connections) == 0
-
+
conn =db.connect()
conn.execute(select([1]))
conn.close()
assert len(dbapi.connections) == 1
-
+
def test_invalidate_trans(self):
conn = db.connect()
trans = conn.begin()
@@ -110,7 +110,7 @@ class MockReconnectTest(PersistTest):
assert False
except exceptions.DBAPIError:
pass
-
+
# assert was invalidated
assert len(dbapi.connections) == 0
assert not conn.closed
@@ -135,19 +135,19 @@ class MockReconnectTest(PersistTest):
trans.rollback()
assert not trans.is_active
-
+
conn.execute(select([1]))
assert not conn.invalidated
-
+
assert len(dbapi.connections) == 1
-
+
def test_conn_reusable(self):
conn = db.connect()
-
+
conn.execute(select([1]))
assert len(dbapi.connections) == 1
-
+
dbapi.shutdown()
# raises error
@@ -162,25 +162,25 @@ class MockReconnectTest(PersistTest):
# ensure all connections closed (pool was recycled)
assert len(dbapi.connections) == 0
-
+
# test reconnects
conn.execute(select([1]))
assert not conn.invalidated
assert len(dbapi.connections) == 1
-
+
class RealReconnectTest(PersistTest):
def setUp(self):
global engine
engine = engines.reconnecting_engine()
-
+
def tearDown(self):
engine.dispose()
-
+
def test_reconnect(self):
conn = engine.connect()
- self.assertEquals(conn.execute(select([1])).scalar(), 1)
+ self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.closed
engine.test_shutdown()
@@ -196,7 +196,7 @@ class RealReconnectTest(PersistTest):
assert conn.invalidated
assert conn.invalidated
- self.assertEquals(conn.execute(select([1])).scalar(), 1)
+ self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
# one more time
@@ -208,14 +208,14 @@ class RealReconnectTest(PersistTest):
if not e.connection_invalidated:
raise
assert conn.invalidated
- self.assertEquals(conn.execute(select([1])).scalar(), 1)
+ self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
conn.close()
-
+
def test_close(self):
conn = engine.connect()
- self.assertEquals(conn.execute(select([1])).scalar(), 1)
+ self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.closed
engine.test_shutdown()
@@ -229,14 +229,14 @@ class RealReconnectTest(PersistTest):
conn.close()
conn = engine.connect()
- self.assertEquals(conn.execute(select([1])).scalar(), 1)
-
+ self.assertEquals(conn.execute(select([1])).scalar(), 1)
+
def test_with_transaction(self):
conn = engine.connect()
trans = conn.begin()
- self.assertEquals(conn.execute(select([1])).scalar(), 1)
+ self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.closed
engine.test_shutdown()
@@ -267,14 +267,14 @@ class RealReconnectTest(PersistTest):
assert str(e) == "Can't reconnect until invalid transaction is rolled back"
assert trans.is_active
-
+
trans.rollback()
assert not trans.is_active
assert conn.invalidated
- self.assertEquals(conn.execute(select([1])).scalar(), 1)
+ self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
-
-
+
+
if __name__ == '__main__':
- testbase.main()
+ testenv.main()