summaryrefslogtreecommitdiff
path: root/test/engine/test_reconnect.py
diff options
context:
space:
mode:
authorTony Locke <tlocke@tlocke.org.uk>2014-07-26 18:15:06 +0100
committerTony Locke <tlocke@tlocke.org.uk>2014-08-02 15:29:36 +0100
commitf586754c864056a7739ef515b8aece3001a377fc (patch)
tree7783cb5b857d1459d9e04787fa3554263ef113d2 /test/engine/test_reconnect.py
parentc2a00153f1a8d4591de6ebe4f75e8595d7193226 (diff)
downloadsqlalchemy-f586754c864056a7739ef515b8aece3001a377fc.tar.gz
PEP8 tidy of test/engine/test_reconnect
Diffstat (limited to 'test/engine/test_reconnect.py')
-rw-r--r--test/engine/test_reconnect.py110
1 files changed, 52 insertions, 58 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index f92b874da..c82cca5a1 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -1,23 +1,24 @@
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
import time
-from sqlalchemy import select, MetaData, Integer, String, create_engine, pool
+from sqlalchemy import (
+ select, MetaData, Integer, String, create_engine, pool, exc, util)
from sqlalchemy.testing.schema import Table, Column
import sqlalchemy as tsa
from sqlalchemy import testing
from sqlalchemy.testing import engines
-from sqlalchemy.testing.util import gc_collect
-from sqlalchemy import exc, util
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.engines import testing_engine
-from sqlalchemy.testing import is_not_
from sqlalchemy.testing.mock import Mock, call
+
class MockError(Exception):
pass
+
class MockDisconnect(MockError):
pass
+
def mock_connection():
def mock_cursor():
def execute(*args, **kwargs):
@@ -25,10 +26,12 @@ def mock_connection():
raise MockDisconnect("Lost the DB connection on execute")
elif conn.explode in ('execute_no_disconnect', ):
raise MockError(
- "something broke on execute but we didn't lose the connection")
+ "something broke on execute but we didn't lose the "
+ "connection")
elif conn.explode in ('rollback', 'rollback_no_disconnect'):
raise MockError(
- "something broke on execute but we didn't lose the connection")
+ "something broke on execute but we didn't lose the "
+ "connection")
elif args and "SELECT" in args[0]:
cursor.description = [('foo', None, None, None, None, None)]
else:
@@ -38,9 +41,8 @@ def mock_connection():
cursor.fetchall = cursor.fetchone = \
Mock(side_effect=MockError("cursor closed"))
cursor = Mock(
- execute=Mock(side_effect=execute),
- close=Mock(side_effect=close)
- )
+ execute=Mock(side_effect=execute),
+ close=Mock(side_effect=close))
return cursor
def cursor():
@@ -52,18 +54,20 @@ def mock_connection():
raise MockDisconnect("Lost the DB connection on rollback")
if conn.explode == 'rollback_no_disconnect':
raise MockError(
- "something broke on rollback but we didn't lose the connection")
+ "something broke on rollback but we didn't lose the "
+ "connection")
else:
return
conn = Mock(
- rollback=Mock(side_effect=rollback),
- cursor=Mock(side_effect=cursor())
- )
+ rollback=Mock(side_effect=rollback),
+ cursor=Mock(side_effect=cursor()))
return conn
+
def MockDBAPI():
connections = []
+
def connect():
while True:
conn = mock_connection()
@@ -80,13 +84,12 @@ def MockDBAPI():
connections[:] = []
return Mock(
- connect=Mock(side_effect=connect()),
- shutdown=Mock(side_effect=shutdown),
- dispose=Mock(side_effect=dispose),
- paramstyle='named',
- connections=connections,
- Error=MockError
- )
+ connect=Mock(side_effect=connect()),
+ shutdown=Mock(side_effect=shutdown),
+ dispose=Mock(side_effect=dispose),
+ paramstyle='named',
+ connections=connections,
+ Error=MockError)
class MockReconnectTest(fixtures.TestBase):
@@ -94,13 +97,14 @@ class MockReconnectTest(fixtures.TestBase):
self.dbapi = MockDBAPI()
self.db = testing_engine(
- 'postgresql://foo:bar@localhost/test',
- options=dict(module=self.dbapi, _initialize=False))
+ 'postgresql://foo:bar@localhost/test',
+ options=dict(module=self.dbapi, _initialize=False))
- self.mock_connect = call(host='localhost', password='bar',
- user='foo', database='test')
+ self.mock_connect = call(
+ host='localhost', password='bar', user='foo', database='test')
# monkeypatch disconnect checker
- self.db.dialect.is_disconnect = lambda e, conn, cursor: isinstance(e, MockDisconnect)
+ self.db.dialect.is_disconnect = \
+ lambda e, conn, cursor: isinstance(e, MockDisconnect)
def teardown(self):
self.dbapi.dispose()
@@ -194,10 +198,8 @@ class MockReconnectTest(fixtures.TestBase):
assert_raises_message(
tsa.exc.InvalidRequestError,
- "Can't reconnect until invalid transaction is "
- "rolled back",
- trans.commit
- )
+ "Can't reconnect until invalid transaction is rolled back",
+ trans.commit)
assert trans.is_active
trans.rollback()
@@ -351,16 +353,16 @@ class MockReconnectTest(fixtures.TestBase):
)
def test_dialect_initialize_once(self):
- from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.url import URL
from sqlalchemy.engine.default import DefaultDialect
- from sqlalchemy.pool import QueuePool
dbapi = self.dbapi
mock_dialect = Mock()
+
class MyURL(URL):
def get_dialect(self):
return Dialect
+
class Dialect(DefaultDialect):
initialize = Mock()
@@ -371,7 +373,6 @@ class MockReconnectTest(fixtures.TestBase):
eq_(Dialect.initialize.call_count, 1)
-
class CursorErrTest(fixtures.TestBase):
# this isn't really a "reconnect" test, it's more of
# a generic "recovery". maybe this test suite should have been
@@ -394,29 +395,24 @@ class CursorErrTest(fixtures.TestBase):
description=[],
close=Mock(side_effect=Exception("explode")),
)
+
def connect():
while True:
yield Mock(
- spec=['cursor', 'commit', 'rollback', 'close'],
- cursor=Mock(side_effect=cursor()),
- )
+ spec=['cursor', 'commit', 'rollback', 'close'],
+ cursor=Mock(side_effect=cursor()),)
return Mock(
- Error = DBAPIError,
- paramstyle='qmark',
- connect=Mock(side_effect=connect())
- )
+ Error=DBAPIError, paramstyle='qmark',
+ connect=Mock(side_effect=connect()))
dbapi = MockDBAPI()
from sqlalchemy.engine import default
url = Mock(
- get_dialect=lambda: default.DefaultDialect,
- translate_connect_args=lambda: {},
- query={},
- )
+ get_dialect=lambda: default.DefaultDialect,
+ translate_connect_args=lambda: {}, query={},)
eng = testing_engine(
- url,
- options=dict(module=dbapi, _initialize=initialize))
+ url, options=dict(module=dbapi, _initialize=initialize))
eng.pool.logger = Mock()
return eng
@@ -508,7 +504,6 @@ class RealReconnectTest(fixtures.TestBase):
# pool isn't replaced
assert self.engine.pool is p2
-
def test_ensure_is_disconnect_gets_connection(self):
def is_disconnect(e, conn, cursor):
# connection is still present
@@ -556,6 +551,7 @@ class RealReconnectTest(fixtures.TestBase):
"Crashes on py3k+cx_oracle")
def test_explode_in_initializer(self):
engine = engines.testing_engine()
+
def broken_initialize(connection):
connection.execute("select fake_stuff from _fake_table")
@@ -569,6 +565,7 @@ class RealReconnectTest(fixtures.TestBase):
"Crashes on py3k+cx_oracle")
def test_explode_in_initializer_disconnect(self):
engine = engines.testing_engine()
+
def broken_initialize(connection):
connection.execute("select fake_stuff from _fake_table")
@@ -584,7 +581,6 @@ class RealReconnectTest(fixtures.TestBase):
# invalidate() also doesn't screw up
assert_raises(exc.DBAPIError, engine.connect)
-
def test_null_pool(self):
engine = \
engines.reconnecting_engine(options=dict(poolclass=pool.NullPool))
@@ -623,10 +619,8 @@ class RealReconnectTest(fixtures.TestBase):
assert trans.is_active
assert_raises_message(
tsa.exc.StatementError,
- "Can't reconnect until invalid transaction is "\
- "rolled back",
- conn.execute, select([1])
- )
+ "Can't reconnect until invalid transaction is rolled back",
+ conn.execute, select([1]))
assert trans.is_active
assert_raises_message(
tsa.exc.InvalidRequestError,
@@ -640,13 +634,14 @@ class RealReconnectTest(fixtures.TestBase):
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
+
class RecycleTest(fixtures.TestBase):
__backend__ = True
def test_basic(self):
for threadlocal in False, True:
engine = engines.reconnecting_engine(
- options={'pool_threadlocal': threadlocal})
+ options={'pool_threadlocal': threadlocal})
conn = engine.contextual_connect()
eq_(conn.execute(select([1])).scalar(), 1)
@@ -671,13 +666,15 @@ class RecycleTest(fixtures.TestBase):
eq_(conn.execute(select([1])).scalar(), 1)
conn.close()
+
class InvalidateDuringResultTest(fixtures.TestBase):
__backend__ = True
def setup(self):
self.engine = engines.reconnecting_engine()
self.meta = MetaData(self.engine)
- table = Table('sometable', self.meta,
+ table = Table(
+ 'sometable', self.meta,
Column('id', Integer, primary_key=True),
Column('name', String(50)))
self.meta.create_all()
@@ -690,10 +687,8 @@ class InvalidateDuringResultTest(fixtures.TestBase):
self.engine.dispose()
@testing.fails_if([
- '+mysqlconnector', '+mysqldb',
- '+cymysql', '+pymysql', '+pg8000'
- ], "Buffers the result set and doesn't check for "
- "connection close")
+ '+mysqlconnector', '+mysqldb', '+cymysql', '+pymysql', '+pg8000'],
+ "Buffers the result set and doesn't check for connection close")
def test_invalidate_on_results(self):
conn = self.engine.connect()
result = conn.execute('select * from sometable')
@@ -702,4 +697,3 @@ class InvalidateDuringResultTest(fixtures.TestBase):
self.engine.test_shutdown()
_assert_invalidated(result.fetchone)
assert conn.invalidated
-