diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-01-15 17:23:52 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-01-15 17:51:34 -0500 |
commit | f898ef162da65a3755d25ee194885677c880c91f (patch) | |
tree | 2d45d310f62fd53bb69e73285b2d95f0e2f6c70e /test/dialect/postgresql/test_async_pg_py3k.py | |
parent | 5be3c030b321ecc342bf4cd84cbb0838efacd155 (diff) | |
download | sqlalchemy-f898ef162da65a3755d25ee194885677c880c91f.tar.gz |
run handle error for commit/rollback fail and cancel transaction
Fixed bug in asyncpg dialect where a failure during a "commit" or less
likely a "rollback" should cancel the entire transaction; it's no longer
possible to emit rollback. Previously the connection would continue to
await a rollback that could not succeed as asyncpg would reject it.
Fixes: #5824
Change-Id: I5a4916740c269b410f4d1a78ed25191de344b9d0
Diffstat (limited to 'test/dialect/postgresql/test_async_pg_py3k.py')
-rw-r--r-- | test/dialect/postgresql/test_async_pg_py3k.py | 135 |
1 files changed, 103 insertions, 32 deletions
diff --git a/test/dialect/postgresql/test_async_pg_py3k.py b/test/dialect/postgresql/test_async_pg_py3k.py index f6d48f3c6..62c8f5dde 100644 --- a/test/dialect/postgresql/test_async_pg_py3k.py +++ b/test/dialect/postgresql/test_async_pg_py3k.py @@ -2,15 +2,16 @@ import random from sqlalchemy import Column from sqlalchemy import exc +from sqlalchemy import ForeignKey from sqlalchemy import Integer from sqlalchemy import MetaData +from sqlalchemy import select from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy.dialects.postgresql import ENUM -from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.testing import async_test -from sqlalchemy.testing import engines +from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures @@ -18,28 +19,9 @@ class AsyncPgTest(fixtures.TestBase): __requires__ = ("async_dialect",) __only_on__ = "postgresql+asyncpg" - @testing.fixture - def async_engine(self): - return create_async_engine(testing.db.url) - - @testing.fixture() - def metadata(self): - # TODO: remove when Iae6ab95938a7e92b6d42086aec534af27b5577d3 - # merges - - from sqlalchemy.testing import util as testing_util - from sqlalchemy.sql import schema - - metadata = schema.MetaData() - - try: - yield metadata - finally: - testing_util.drop_all_tables_from_metadata(metadata, testing.db) - @async_test async def test_detect_stale_ddl_cache_raise_recover( - self, metadata, async_engine + self, metadata, async_testing_engine ): async def async_setup(engine, strlen): metadata.clear() @@ -68,9 +50,10 @@ class AsyncPgTest(fixtures.TestBase): Column("name", String), ) - await async_setup(async_engine, 30) + first_engine = async_testing_engine() + second_engine = async_testing_engine() - second_engine = engines.testing_engine(asyncio=True) + await async_setup(first_engine, 30) async with second_engine.connect() as conn: result = await conn.execute( @@ -82,7 +65,7 @@ class AsyncPgTest(fixtures.TestBase): rows = result.fetchall() assert len(rows) >= 29 - await async_setup(async_engine, 20) + await async_setup(first_engine, 20) async with second_engine.connect() as conn: with testing.expect_raises_message( @@ -112,7 +95,7 @@ class AsyncPgTest(fixtures.TestBase): @async_test async def test_detect_stale_type_cache_raise_recover( - self, metadata, async_engine + self, metadata, async_testing_engine ): async def async_setup(engine, enums): metadata = MetaData() @@ -141,13 +124,13 @@ class AsyncPgTest(fixtures.TestBase): ), ) - await async_setup(async_engine, ("beans", "means", "keens")) - - second_engine = engines.testing_engine( - asyncio=True, - options={"connect_args": {"prepared_statement_cache_size": 0}}, + first_engine = async_testing_engine() + second_engine = async_testing_engine( + options={"connect_args": {"prepared_statement_cache_size": 0}} ) + await async_setup(first_engine, ("beans", "means", "keens")) + async with second_engine.connect() as conn: await conn.execute( t1.insert(), @@ -157,7 +140,7 @@ class AsyncPgTest(fixtures.TestBase): ], ) - await async_setup(async_engine, ("faux", "beau", "flow")) + await async_setup(first_engine, ("faux", "beau", "flow")) async with second_engine.connect() as conn: with testing.expect_raises_message( @@ -180,3 +163,91 @@ class AsyncPgTest(fixtures.TestBase): for i in range(10) ], ) + + @async_test + async def test_failed_commit_recover(self, metadata, async_testing_engine): + + Table("t1", metadata, Column("id", Integer, primary_key=True)) + + t2 = Table( + "t2", + metadata, + Column("id", Integer, primary_key=True), + Column( + "t1_id", + Integer, + ForeignKey("t1.id", deferrable=True, initially="deferred"), + ), + ) + + engine = async_testing_engine() + + async with engine.connect() as conn: + await conn.run_sync(metadata.create_all) + + await conn.execute(t2.insert().values(id=1, t1_id=2)) + + with testing.expect_raises_message( + exc.IntegrityError, 'insert or update on table "t2"' + ): + await conn.commit() + + await conn.rollback() + + eq_((await conn.execute(select(1))).scalar(), 1) + + @async_test + async def test_rollback_twice_no_problem( + self, metadata, async_testing_engine + ): + + engine = async_testing_engine() + + async with engine.connect() as conn: + + trans = await conn.begin() + + await trans.rollback() + + await conn.rollback() + + @async_test + async def test_closed_during_execute(self, metadata, async_testing_engine): + + engine = async_testing_engine() + + async with engine.connect() as conn: + await conn.begin() + + with testing.expect_raises_message( + exc.DBAPIError, "connection was closed" + ): + await conn.exec_driver_sql( + "select pg_terminate_backend(pg_backend_pid())" + ) + + @async_test + async def test_failed_rollback_recover( + self, metadata, async_testing_engine + ): + + engine = async_testing_engine() + + async with engine.connect() as conn: + await conn.begin() + + (await conn.execute(select(1))).scalar() + + raw_connection = await conn.get_raw_connection() + # close the asyncpg transaction directly + await raw_connection._transaction.rollback() + + with testing.expect_raises_message( + exc.InterfaceError, "already rolled back" + ): + await conn.rollback() + + # recovers no problem + + await conn.begin() + await conn.rollback() |