diff options
Diffstat (limited to 'test/engine/reconnect.py')
-rw-r--r-- | test/engine/reconnect.py | 80 |
1 files changed, 40 insertions, 40 deletions
diff --git a/test/engine/reconnect.py b/test/engine/reconnect.py index 40b838e3f..2a18fca2c 100644 --- a/test/engine/reconnect.py +++ b/test/engine/reconnect.py @@ -1,4 +1,4 @@ -import testbase +import testenv; testenv.configure_for_tests() import sys, weakref from sqlalchemy import create_engine, exceptions, select from testlib import * @@ -17,7 +17,7 @@ class MockDBAPI(object): for c in self.connections: c.explode[0] = True Error = MockDisconnect - + class MockConnection(object): def __init__(self, dbapi): dbapi.connections[self] = True @@ -30,7 +30,7 @@ class MockConnection(object): return MockCursor(self) def close(self): pass - + class MockCursor(object): def __init__(self, parent): self.explode = parent.explode @@ -42,31 +42,31 @@ class MockCursor(object): return def close(self): pass - + class MockReconnectTest(PersistTest): def setUp(self): global db, dbapi dbapi = MockDBAPI() - + # create engine using our current dburi db = create_engine('postgres://foo:bar@localhost/test', module=dbapi) - + # monkeypatch disconnect checker db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect) - + def test_reconnect(self): """test that an 'is_disconnect' condition will invalidate the connection, and additionally dispose the previous connection pool and recreate.""" - - + + pid = id(db.pool) - + # make a connection conn = db.connect() - + # connection works conn.execute(select([1])) - + # create a second connection within the pool, which we'll ensure also goes away conn2 = db.connect() conn2.close() @@ -82,24 +82,24 @@ class MockReconnectTest(PersistTest): assert False except exceptions.DBAPIError: pass - + # assert was invalidated assert not conn.closed assert conn.invalidated - + # close shouldnt break conn.close() assert id(db.pool) != pid - + # ensure all connections closed (pool was recycled) assert len(dbapi.connections) == 0 - + conn =db.connect() conn.execute(select([1])) conn.close() assert len(dbapi.connections) == 1 - + def test_invalidate_trans(self): conn = db.connect() trans = conn.begin() @@ -110,7 +110,7 @@ class MockReconnectTest(PersistTest): assert False except exceptions.DBAPIError: pass - + # assert was invalidated assert len(dbapi.connections) == 0 assert not conn.closed @@ -135,19 +135,19 @@ class MockReconnectTest(PersistTest): trans.rollback() assert not trans.is_active - + conn.execute(select([1])) assert not conn.invalidated - + assert len(dbapi.connections) == 1 - + def test_conn_reusable(self): conn = db.connect() - + conn.execute(select([1])) assert len(dbapi.connections) == 1 - + dbapi.shutdown() # raises error @@ -162,25 +162,25 @@ class MockReconnectTest(PersistTest): # ensure all connections closed (pool was recycled) assert len(dbapi.connections) == 0 - + # test reconnects conn.execute(select([1])) assert not conn.invalidated assert len(dbapi.connections) == 1 - + class RealReconnectTest(PersistTest): def setUp(self): global engine engine = engines.reconnecting_engine() - + def tearDown(self): engine.dispose() - + def test_reconnect(self): conn = engine.connect() - self.assertEquals(conn.execute(select([1])).scalar(), 1) + self.assertEquals(conn.execute(select([1])).scalar(), 1) assert not conn.closed engine.test_shutdown() @@ -196,7 +196,7 @@ class RealReconnectTest(PersistTest): assert conn.invalidated assert conn.invalidated - self.assertEquals(conn.execute(select([1])).scalar(), 1) + self.assertEquals(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated # one more time @@ -208,14 +208,14 @@ class RealReconnectTest(PersistTest): if not e.connection_invalidated: raise assert conn.invalidated - self.assertEquals(conn.execute(select([1])).scalar(), 1) + self.assertEquals(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated conn.close() - + def test_close(self): conn = engine.connect() - self.assertEquals(conn.execute(select([1])).scalar(), 1) + self.assertEquals(conn.execute(select([1])).scalar(), 1) assert not conn.closed engine.test_shutdown() @@ -229,14 +229,14 @@ class RealReconnectTest(PersistTest): conn.close() conn = engine.connect() - self.assertEquals(conn.execute(select([1])).scalar(), 1) - + self.assertEquals(conn.execute(select([1])).scalar(), 1) + def test_with_transaction(self): conn = engine.connect() trans = conn.begin() - self.assertEquals(conn.execute(select([1])).scalar(), 1) + self.assertEquals(conn.execute(select([1])).scalar(), 1) assert not conn.closed engine.test_shutdown() @@ -267,14 +267,14 @@ class RealReconnectTest(PersistTest): assert str(e) == "Can't reconnect until invalid transaction is rolled back" assert trans.is_active - + trans.rollback() assert not trans.is_active assert conn.invalidated - self.assertEquals(conn.execute(select([1])).scalar(), 1) + self.assertEquals(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated - - + + if __name__ == '__main__': - testbase.main() + testenv.main() |