summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py380
1 files changed, 348 insertions, 32 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index e0bba0afa..b0256d325 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -25,6 +25,10 @@ from sqlalchemy.util import nested
users, metadata, users_autoinc = None, None, None
+class SomeException(Exception):
+ pass
+
+
class ExecuteTest(fixtures.TestBase):
__backend__ = True
@@ -170,7 +174,7 @@ class ExecuteTest(fixtures.TestBase):
@testing.skip_if(
lambda: testing.against('mysql+mysqldb'), 'db-api flaky')
@testing.fails_on_everything_except(
- 'postgresql+psycopg2',
+ 'postgresql+psycopg2', 'postgresql+psycopg2cffi',
'postgresql+pypostgresql', 'mysql+mysqlconnector',
'mysql+pymysql', 'mysql+cymysql')
def test_raw_python(self):
@@ -280,12 +284,13 @@ class ExecuteTest(fixtures.TestBase):
impl = Integer
def process_bind_param(self, value, dialect):
- raise Exception("nope")
+ raise SomeException("nope")
def _go(conn):
assert_raises_message(
tsa.exc.StatementError,
- r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
+ r"\(test.engine.test_execute.SomeException\) "
+ "nope \[SQL\: u?'SELECT 1 ",
conn.execute,
select([1]).
where(
@@ -561,7 +566,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
if is_transaction:
conn = conn.connection
conn.execute(self.table.insert().values(a=x, b=value))
- raise Exception("breakage")
+ raise SomeException("breakage")
return go
def _assert_no_data(self):
@@ -634,21 +639,21 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_connection_ctx_commit(self):
fn = self._trans_fn(True)
- conn = testing.db.connect()
- ctx = conn.begin()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- self._assert_fn(5, value=8)
+ with testing.db.connect() as conn:
+ ctx = conn.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
def test_transaction_connection_ctx_rollback(self):
fn = self._trans_rollback_fn(True)
- conn = testing.db.connect()
- ctx = conn.begin()
- assert_raises_message(
- Exception,
- "breakage",
- testing.run_as_contextmanager, ctx, fn, 5, value=8
- )
- self._assert_no_data()
+ with testing.db.connect() as conn:
+ ctx = conn.begin()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager, ctx, fn, 5, value=8
+ )
+ self._assert_no_data()
def test_connection_as_ctx(self):
fn = self._trans_fn()
@@ -661,10 +666,12 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_connect_as_ctx_noautocommit(self):
fn = self._trans_fn()
self._assert_no_data()
- ctx = testing.db.connect().execution_options(autocommit=False)
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- # autocommit is off
- self._assert_no_data()
+
+ with testing.db.connect() as conn:
+ ctx = conn.execution_options(autocommit=False)
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ # autocommit is off
+ self._assert_no_data()
def test_transaction_engine_fn_commit(self):
fn = self._trans_fn()
@@ -682,17 +689,17 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_connection_fn_commit(self):
fn = self._trans_fn()
- conn = testing.db.connect()
- conn.transaction(fn, 5, value=8)
- self._assert_fn(5, value=8)
+ with testing.db.connect() as conn:
+ conn.transaction(fn, 5, value=8)
+ self._assert_fn(5, value=8)
def test_transaction_connection_fn_rollback(self):
fn = self._trans_rollback_fn()
- conn = testing.db.connect()
- assert_raises(
- Exception,
- conn.transaction, fn, 5, value=8
- )
+ with testing.db.connect() as conn:
+ assert_raises(
+ Exception,
+ conn.transaction, fn, 5, value=8
+ )
self._assert_no_data()
@@ -1063,6 +1070,47 @@ class AlternateResultProxyTest(fixtures.TestBase):
rows = r.fetchmany(6)
eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ # result keeps going just fine with blank results...
+ eq_(r.fetchmany(2), [])
+
+ eq_(r.fetchmany(2), [])
+
+ eq_(r.fetchall(), [])
+
+ eq_(r.fetchone(), None)
+
+ # until we close
+ r.close()
+
+ self._assert_result_closed(r)
+
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.first(), (1, "t_1"))
+ self._assert_result_closed(r)
+
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.scalar(), 1)
+ self._assert_result_closed(r)
+
+ def _assert_result_closed(self, r):
+ assert_raises_message(
+ tsa.exc.ResourceClosedError,
+ "object is closed",
+ r.fetchone
+ )
+
+ assert_raises_message(
+ tsa.exc.ResourceClosedError,
+ "object is closed",
+ r.fetchmany, 2
+ )
+
+ assert_raises_message(
+ tsa.exc.ResourceClosedError,
+ "object is closed",
+ r.fetchall
+ )
+
def test_plain(self):
self._test_proxy(_result.ResultProxy)
@@ -1597,7 +1645,7 @@ class HandleErrorTest(fixtures.TestBase):
listener = Mock(return_value=None)
event.listen(engine, 'dbapi_error', listener)
- nope = Exception("nope")
+ nope = SomeException("nope")
class MyType(TypeDecorator):
impl = Integer
@@ -1608,7 +1656,8 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
assert_raises_message(
tsa.exc.StatementError,
- r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
+ r"\(test.engine.test_execute.SomeException\) "
+ "nope \[SQL\: u?'SELECT 1 ",
conn.execute,
select([1]).where(
column('foo') == literal('bar', MyType()))
@@ -1788,7 +1837,7 @@ class HandleErrorTest(fixtures.TestBase):
listener = Mock(return_value=None)
event.listen(engine, 'handle_error', listener)
- nope = Exception("nope")
+ nope = SomeException("nope")
class MyType(TypeDecorator):
impl = Integer
@@ -1799,7 +1848,8 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
assert_raises_message(
tsa.exc.StatementError,
- r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
+ r"\(test.engine.test_execute.SomeException\) "
+ "nope \[SQL\: u?'SELECT 1 ",
conn.execute,
select([1]).where(
column('foo') == literal('bar', MyType()))
@@ -1893,6 +1943,272 @@ class HandleErrorTest(fixtures.TestBase):
self._test_alter_disconnect(True, False)
self._test_alter_disconnect(False, False)
+ def test_handle_error_event_connect_isolation_level(self):
+ engine = engines.testing_engine()
+
+ class MySpecialException(Exception):
+ pass
+
+ @event.listens_for(engine, "handle_error")
+ def handle_error(ctx):
+ raise MySpecialException("failed operation")
+
+ ProgrammingError = engine.dialect.dbapi.ProgrammingError
+ with engine.connect() as conn:
+ with patch.object(
+ conn.dialect, "get_isolation_level",
+ Mock(side_effect=ProgrammingError("random error"))
+ ):
+ assert_raises(
+ MySpecialException,
+ conn.get_isolation_level
+ )
+
+
+class HandleInvalidatedOnConnectTest(fixtures.TestBase):
+ __requires__ = ('sqlite', )
+
+ def setUp(self):
+ e = create_engine('sqlite://')
+
+ connection = Mock(
+ get_server_version_info=Mock(return_value='5.0'))
+
+ def connect(*args, **kwargs):
+ return connection
+ dbapi = Mock(
+ sqlite_version_info=(99, 9, 9,),
+ version_info=(99, 9, 9,),
+ sqlite_version='99.9.9',
+ paramstyle='named',
+ connect=Mock(side_effect=connect)
+ )
+
+ sqlite3 = e.dialect.dbapi
+ dbapi.Error = sqlite3.Error,
+ dbapi.ProgrammingError = sqlite3.ProgrammingError
+
+ self.dbapi = dbapi
+ self.ProgrammingError = sqlite3.ProgrammingError
+
+ def test_wraps_connect_in_dbapi(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+ try:
+ create_engine('sqlite://', module=dbapi).connect()
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert not de.connection_invalidated
+
+ def test_handle_error_event_connect(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is None
+ raise MySpecialException("failed operation")
+
+ assert_raises(
+ MySpecialException,
+ eng.connect
+ )
+
+ def test_handle_error_event_revalidate(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi, _initialize=False)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is conn
+ assert isinstance(ctx.sqlalchemy_exception, tsa.exc.ProgrammingError)
+ raise MySpecialException("failed operation")
+
+ conn = eng.connect()
+ conn.invalidate()
+
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ assert_raises(
+ MySpecialException,
+ getattr, conn, 'connection'
+ )
+
+ def test_handle_error_event_implicit_revalidate(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi, _initialize=False)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is conn
+ assert isinstance(
+ ctx.sqlalchemy_exception, tsa.exc.ProgrammingError)
+ raise MySpecialException("failed operation")
+
+ conn = eng.connect()
+ conn.invalidate()
+
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ assert_raises(
+ MySpecialException,
+ conn.execute, select([1])
+ )
+
+ def test_handle_error_custom_connect(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ def custom_connect():
+ raise self.ProgrammingError("random error")
+
+ eng = create_engine('sqlite://', module=dbapi, creator=custom_connect)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is None
+ raise MySpecialException("failed operation")
+
+ assert_raises(
+ MySpecialException,
+ eng.connect
+ )
+
+ def test_handle_error_event_connect_invalidate_flag(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.is_disconnect
+ ctx.is_disconnect = False
+
+ try:
+ eng.connect()
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert not de.connection_invalidated
+
+ def test_cant_connect_stay_invalidated(self):
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://')
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.is_disconnect
+
+ conn = eng.connect()
+
+ conn.invalidate()
+
+ eng.pool._creator = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+
+ try:
+ conn.connection
+ assert False
+ except tsa.exc.DBAPIError:
+ assert conn.invalidated
+
+ def _test_dont_touch_non_dbapi_exception_on_connect(self, connect_fn):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error"))
+
+ e = create_engine('sqlite://', module=dbapi)
+ e.dialect.is_disconnect = is_disconnect = Mock()
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ connect_fn, e
+ )
+ eq_(is_disconnect.call_count, 0)
+
+ def test_dont_touch_non_dbapi_exception_on_connect(self):
+ self._test_dont_touch_non_dbapi_exception_on_connect(
+ lambda engine: engine.connect())
+
+ def test_dont_touch_non_dbapi_exception_on_contextual_connect(self):
+ self._test_dont_touch_non_dbapi_exception_on_connect(
+ lambda engine: engine.contextual_connect())
+
+ def test_ensure_dialect_does_is_disconnect_no_conn(self):
+ """test that is_disconnect() doesn't choke if no connection,
+ cursor given."""
+ dialect = testing.db.dialect
+ dbapi = dialect.dbapi
+ assert not dialect.is_disconnect(
+ dbapi.OperationalError("test"), None, None)
+
+ def _test_invalidate_on_connect(self, connect_fn):
+ """test that is_disconnect() is called during connect.
+
+ interpretation of connection failures are not supported by
+ every backend.
+
+ """
+
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+ try:
+ connect_fn(create_engine('sqlite://', module=dbapi))
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert de.connection_invalidated
+
+ def test_invalidate_on_connect(self):
+ """test that is_disconnect() is called during connect.
+
+ interpretation of connection failures are not supported by
+ every backend.
+
+ """
+ self._test_invalidate_on_connect(lambda engine: engine.connect())
+
+ def test_invalidate_on_contextual_connect(self):
+ """test that is_disconnect() is called during connect.
+
+ interpretation of connection failures are not supported by
+ every backend.
+
+ """
+ self._test_invalidate_on_connect(
+ lambda engine: engine.contextual_connect())
+
class ProxyConnectionTest(fixtures.TestBase):