diff options
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r-- | test/engine/test_execute.py | 380 |
1 files changed, 348 insertions, 32 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index e0bba0afa..b0256d325 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -25,6 +25,10 @@ from sqlalchemy.util import nested users, metadata, users_autoinc = None, None, None +class SomeException(Exception): + pass + + class ExecuteTest(fixtures.TestBase): __backend__ = True @@ -170,7 +174,7 @@ class ExecuteTest(fixtures.TestBase): @testing.skip_if( lambda: testing.against('mysql+mysqldb'), 'db-api flaky') @testing.fails_on_everything_except( - 'postgresql+psycopg2', + 'postgresql+psycopg2', 'postgresql+psycopg2cffi', 'postgresql+pypostgresql', 'mysql+mysqlconnector', 'mysql+pymysql', 'mysql+cymysql') def test_raw_python(self): @@ -280,12 +284,13 @@ class ExecuteTest(fixtures.TestBase): impl = Integer def process_bind_param(self, value, dialect): - raise Exception("nope") + raise SomeException("nope") def _go(conn): assert_raises_message( tsa.exc.StatementError, - r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", + r"\(test.engine.test_execute.SomeException\) " + "nope \[SQL\: u?'SELECT 1 ", conn.execute, select([1]). where( @@ -561,7 +566,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest): if is_transaction: conn = conn.connection conn.execute(self.table.insert().values(a=x, b=value)) - raise Exception("breakage") + raise SomeException("breakage") return go def _assert_no_data(self): @@ -634,21 +639,21 @@ class ConvenienceExecuteTest(fixtures.TablesTest): def test_transaction_connection_ctx_commit(self): fn = self._trans_fn(True) - conn = testing.db.connect() - ctx = conn.begin() - testing.run_as_contextmanager(ctx, fn, 5, value=8) - self._assert_fn(5, value=8) + with testing.db.connect() as conn: + ctx = conn.begin() + testing.run_as_contextmanager(ctx, fn, 5, value=8) + self._assert_fn(5, value=8) def test_transaction_connection_ctx_rollback(self): fn = self._trans_rollback_fn(True) - conn = testing.db.connect() - ctx = conn.begin() - assert_raises_message( - Exception, - "breakage", - testing.run_as_contextmanager, ctx, fn, 5, value=8 - ) - self._assert_no_data() + with testing.db.connect() as conn: + ctx = conn.begin() + assert_raises_message( + Exception, + "breakage", + testing.run_as_contextmanager, ctx, fn, 5, value=8 + ) + self._assert_no_data() def test_connection_as_ctx(self): fn = self._trans_fn() @@ -661,10 +666,12 @@ class ConvenienceExecuteTest(fixtures.TablesTest): def test_connect_as_ctx_noautocommit(self): fn = self._trans_fn() self._assert_no_data() - ctx = testing.db.connect().execution_options(autocommit=False) - testing.run_as_contextmanager(ctx, fn, 5, value=8) - # autocommit is off - self._assert_no_data() + + with testing.db.connect() as conn: + ctx = conn.execution_options(autocommit=False) + testing.run_as_contextmanager(ctx, fn, 5, value=8) + # autocommit is off + self._assert_no_data() def test_transaction_engine_fn_commit(self): fn = self._trans_fn() @@ -682,17 +689,17 @@ class ConvenienceExecuteTest(fixtures.TablesTest): def test_transaction_connection_fn_commit(self): fn = self._trans_fn() - conn = testing.db.connect() - conn.transaction(fn, 5, value=8) - self._assert_fn(5, value=8) + with testing.db.connect() as conn: + conn.transaction(fn, 5, value=8) + self._assert_fn(5, value=8) def test_transaction_connection_fn_rollback(self): fn = self._trans_rollback_fn() - conn = testing.db.connect() - assert_raises( - Exception, - conn.transaction, fn, 5, value=8 - ) + with testing.db.connect() as conn: + assert_raises( + Exception, + conn.transaction, fn, 5, value=8 + ) self._assert_no_data() @@ -1063,6 +1070,47 @@ class AlternateResultProxyTest(fixtures.TestBase): rows = r.fetchmany(6) eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + # result keeps going just fine with blank results... + eq_(r.fetchmany(2), []) + + eq_(r.fetchmany(2), []) + + eq_(r.fetchall(), []) + + eq_(r.fetchone(), None) + + # until we close + r.close() + + self._assert_result_closed(r) + + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.first(), (1, "t_1")) + self._assert_result_closed(r) + + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.scalar(), 1) + self._assert_result_closed(r) + + def _assert_result_closed(self, r): + assert_raises_message( + tsa.exc.ResourceClosedError, + "object is closed", + r.fetchone + ) + + assert_raises_message( + tsa.exc.ResourceClosedError, + "object is closed", + r.fetchmany, 2 + ) + + assert_raises_message( + tsa.exc.ResourceClosedError, + "object is closed", + r.fetchall + ) + def test_plain(self): self._test_proxy(_result.ResultProxy) @@ -1597,7 +1645,7 @@ class HandleErrorTest(fixtures.TestBase): listener = Mock(return_value=None) event.listen(engine, 'dbapi_error', listener) - nope = Exception("nope") + nope = SomeException("nope") class MyType(TypeDecorator): impl = Integer @@ -1608,7 +1656,8 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as conn: assert_raises_message( tsa.exc.StatementError, - r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", + r"\(test.engine.test_execute.SomeException\) " + "nope \[SQL\: u?'SELECT 1 ", conn.execute, select([1]).where( column('foo') == literal('bar', MyType())) @@ -1788,7 +1837,7 @@ class HandleErrorTest(fixtures.TestBase): listener = Mock(return_value=None) event.listen(engine, 'handle_error', listener) - nope = Exception("nope") + nope = SomeException("nope") class MyType(TypeDecorator): impl = Integer @@ -1799,7 +1848,8 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as conn: assert_raises_message( tsa.exc.StatementError, - r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", + r"\(test.engine.test_execute.SomeException\) " + "nope \[SQL\: u?'SELECT 1 ", conn.execute, select([1]).where( column('foo') == literal('bar', MyType())) @@ -1893,6 +1943,272 @@ class HandleErrorTest(fixtures.TestBase): self._test_alter_disconnect(True, False) self._test_alter_disconnect(False, False) + def test_handle_error_event_connect_isolation_level(self): + engine = engines.testing_engine() + + class MySpecialException(Exception): + pass + + @event.listens_for(engine, "handle_error") + def handle_error(ctx): + raise MySpecialException("failed operation") + + ProgrammingError = engine.dialect.dbapi.ProgrammingError + with engine.connect() as conn: + with patch.object( + conn.dialect, "get_isolation_level", + Mock(side_effect=ProgrammingError("random error")) + ): + assert_raises( + MySpecialException, + conn.get_isolation_level + ) + + +class HandleInvalidatedOnConnectTest(fixtures.TestBase): + __requires__ = ('sqlite', ) + + def setUp(self): + e = create_engine('sqlite://') + + connection = Mock( + get_server_version_info=Mock(return_value='5.0')) + + def connect(*args, **kwargs): + return connection + dbapi = Mock( + sqlite_version_info=(99, 9, 9,), + version_info=(99, 9, 9,), + sqlite_version='99.9.9', + paramstyle='named', + connect=Mock(side_effect=connect) + ) + + sqlite3 = e.dialect.dbapi + dbapi.Error = sqlite3.Error, + dbapi.ProgrammingError = sqlite3.ProgrammingError + + self.dbapi = dbapi + self.ProgrammingError = sqlite3.ProgrammingError + + def test_wraps_connect_in_dbapi(self): + dbapi = self.dbapi + dbapi.connect = Mock( + side_effect=self.ProgrammingError("random error")) + try: + create_engine('sqlite://', module=dbapi).connect() + assert False + except tsa.exc.DBAPIError as de: + assert not de.connection_invalidated + + def test_handle_error_event_connect(self): + dbapi = self.dbapi + dbapi.connect = Mock( + side_effect=self.ProgrammingError("random error")) + + class MySpecialException(Exception): + pass + + eng = create_engine('sqlite://', module=dbapi) + + @event.listens_for(eng, "handle_error") + def handle_error(ctx): + assert ctx.engine is eng + assert ctx.connection is None + raise MySpecialException("failed operation") + + assert_raises( + MySpecialException, + eng.connect + ) + + def test_handle_error_event_revalidate(self): + dbapi = self.dbapi + + class MySpecialException(Exception): + pass + + eng = create_engine('sqlite://', module=dbapi, _initialize=False) + + @event.listens_for(eng, "handle_error") + def handle_error(ctx): + assert ctx.engine is eng + assert ctx.connection is conn + assert isinstance(ctx.sqlalchemy_exception, tsa.exc.ProgrammingError) + raise MySpecialException("failed operation") + + conn = eng.connect() + conn.invalidate() + + dbapi.connect = Mock( + side_effect=self.ProgrammingError("random error")) + + assert_raises( + MySpecialException, + getattr, conn, 'connection' + ) + + def test_handle_error_event_implicit_revalidate(self): + dbapi = self.dbapi + + class MySpecialException(Exception): + pass + + eng = create_engine('sqlite://', module=dbapi, _initialize=False) + + @event.listens_for(eng, "handle_error") + def handle_error(ctx): + assert ctx.engine is eng + assert ctx.connection is conn + assert isinstance( + ctx.sqlalchemy_exception, tsa.exc.ProgrammingError) + raise MySpecialException("failed operation") + + conn = eng.connect() + conn.invalidate() + + dbapi.connect = Mock( + side_effect=self.ProgrammingError("random error")) + + assert_raises( + MySpecialException, + conn.execute, select([1]) + ) + + def test_handle_error_custom_connect(self): + dbapi = self.dbapi + + class MySpecialException(Exception): + pass + + def custom_connect(): + raise self.ProgrammingError("random error") + + eng = create_engine('sqlite://', module=dbapi, creator=custom_connect) + + @event.listens_for(eng, "handle_error") + def handle_error(ctx): + assert ctx.engine is eng + assert ctx.connection is None + raise MySpecialException("failed operation") + + assert_raises( + MySpecialException, + eng.connect + ) + + def test_handle_error_event_connect_invalidate_flag(self): + dbapi = self.dbapi + dbapi.connect = Mock( + side_effect=self.ProgrammingError( + "Cannot operate on a closed database.")) + + class MySpecialException(Exception): + pass + + eng = create_engine('sqlite://', module=dbapi) + + @event.listens_for(eng, "handle_error") + def handle_error(ctx): + assert ctx.is_disconnect + ctx.is_disconnect = False + + try: + eng.connect() + assert False + except tsa.exc.DBAPIError as de: + assert not de.connection_invalidated + + def test_cant_connect_stay_invalidated(self): + class MySpecialException(Exception): + pass + + eng = create_engine('sqlite://') + + @event.listens_for(eng, "handle_error") + def handle_error(ctx): + assert ctx.is_disconnect + + conn = eng.connect() + + conn.invalidate() + + eng.pool._creator = Mock( + side_effect=self.ProgrammingError( + "Cannot operate on a closed database.")) + + try: + conn.connection + assert False + except tsa.exc.DBAPIError: + assert conn.invalidated + + def _test_dont_touch_non_dbapi_exception_on_connect(self, connect_fn): + dbapi = self.dbapi + dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error")) + + e = create_engine('sqlite://', module=dbapi) + e.dialect.is_disconnect = is_disconnect = Mock() + assert_raises_message( + TypeError, + "I'm not a DBAPI error", + connect_fn, e + ) + eq_(is_disconnect.call_count, 0) + + def test_dont_touch_non_dbapi_exception_on_connect(self): + self._test_dont_touch_non_dbapi_exception_on_connect( + lambda engine: engine.connect()) + + def test_dont_touch_non_dbapi_exception_on_contextual_connect(self): + self._test_dont_touch_non_dbapi_exception_on_connect( + lambda engine: engine.contextual_connect()) + + def test_ensure_dialect_does_is_disconnect_no_conn(self): + """test that is_disconnect() doesn't choke if no connection, + cursor given.""" + dialect = testing.db.dialect + dbapi = dialect.dbapi + assert not dialect.is_disconnect( + dbapi.OperationalError("test"), None, None) + + def _test_invalidate_on_connect(self, connect_fn): + """test that is_disconnect() is called during connect. + + interpretation of connection failures are not supported by + every backend. + + """ + + dbapi = self.dbapi + dbapi.connect = Mock( + side_effect=self.ProgrammingError( + "Cannot operate on a closed database.")) + try: + connect_fn(create_engine('sqlite://', module=dbapi)) + assert False + except tsa.exc.DBAPIError as de: + assert de.connection_invalidated + + def test_invalidate_on_connect(self): + """test that is_disconnect() is called during connect. + + interpretation of connection failures are not supported by + every backend. + + """ + self._test_invalidate_on_connect(lambda engine: engine.connect()) + + def test_invalidate_on_contextual_connect(self): + """test that is_disconnect() is called during connect. + + interpretation of connection failures are not supported by + every backend. + + """ + self._test_invalidate_on_connect( + lambda engine: engine.contextual_connect()) + class ProxyConnectionTest(fixtures.TestBase): |