diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-01-01 13:21:55 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-01-02 22:09:39 -0500 |
commit | dd41a5e61a30a2d05ee09f583fdfde1f1c204807 (patch) | |
tree | 0d03b3f15c3b76ad3c0da5a75ddf524b326d58e2 /test/dialect/postgresql/test_async_pg_py3k.py | |
parent | 1c35a3790c1f186b91d556fb161c1958cbf718bb (diff) | |
download | sqlalchemy-dd41a5e61a30a2d05ee09f583fdfde1f1c204807.tar.gz |
Cache asyncpg prepared statements
Enhanced the performance of the asyncpg dialect by caching the asyncpg
PreparedStatement objects on a per-connection basis. For a test case that
makes use of the same statement on a set of pooled connections this appears
to grant a 10-20% speed improvement. The cache size is adjustable and may
also be disabled.
Unfortunately the caching gets more complicated when there are
schema changes present. An invalidation scheme is now also added
to accommodate prepared statements as well as asyncpg cached OIDs.
However, the raised exceptions cannot be prevented if DDL has changed
database structures that were cached for a particular asyncpg
connection. Logic is added to clear the caches when these errors occur.
Change-Id: Icf02aa4871eb192f245690f28be4e9f9c35656c6
Diffstat (limited to 'test/dialect/postgresql/test_async_pg_py3k.py')
-rw-r--r-- | test/dialect/postgresql/test_async_pg_py3k.py | 182 |
1 files changed, 182 insertions, 0 deletions
import random

from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.testing import async_test
from sqlalchemy.testing import engines
from sqlalchemy.testing import fixtures


class AsyncPgTest(fixtures.TestBase):
    """Tests for the asyncpg dialect's prepared-statement / OID cache
    invalidation: after DDL changes the stale caches on a pooled
    connection, the dialect raises once, clears its caches, and the
    next use of the connection succeeds.
    """

    __requires__ = ("async_dialect",)
    __only_on__ = "postgresql+asyncpg"

    @testing.fixture
    def async_engine(self):
        """Provide a fresh AsyncEngine against the configured test DB."""
        return create_async_engine(testing.db.url)

    @testing.fixture()
    def metadata(self):
        """Yield an empty MetaData and drop all its tables on teardown.

        # TODO: remove when Iae6ab95938a7e92b6d42086aec534af27b5577d3
        # merges
        """
        # MetaData and engines are already imported at module level;
        # the original local re-imports were redundant shadows.
        metadata = MetaData()

        try:
            yield metadata
        finally:
            engines.drop_all_tables(metadata, testing.db)

    @async_test
    async def test_detect_stale_ddl_cache_raise_recover(
        self, metadata, async_engine
    ):
        """DDL altering a column type invalidates asyncpg's cached
        statement plan; the dialect surfaces NotSupportedError once,
        then recovers on the next connection checkout.
        """

        async def async_setup(engine, strlen):
            # (Re)create t1 with a VARCHAR(strlen) column and seed rows;
            # changing strlen between calls is the DDL change under test.
            metadata.clear()
            t1 = Table(
                "t1",
                metadata,
                Column("id", Integer, primary_key=True),
                Column("name", String(strlen)),
            )

            # conn is an instance of AsyncConnection
            async with engine.begin() as conn:
                await conn.run_sync(metadata.drop_all)
                await conn.run_sync(metadata.create_all)
                await conn.execute(
                    t1.insert(),
                    [{"name": "some name %d" % i} for i in range(500)],
                )

        # separate MetaData so this Table definition is independent of
        # the one async_setup drops/creates
        meta = MetaData()

        t1 = Table(
            "t1",
            meta,
            Column("id", Integer, primary_key=True),
            Column("name", String),
        )

        await async_setup(async_engine, 30)

        second_engine = engines.testing_engine(asyncio=True)

        # prime second_engine's connection caches with the first schema
        async with second_engine.connect() as conn:
            result = await conn.execute(
                t1.select()
                .where(t1.c.name.like("some name%"))
                .where(t1.c.id % 17 == 6)
            )

            rows = result.fetchall()
            assert len(rows) >= 29

        # DDL change: same statement, different column definition
        await async_setup(async_engine, 20)

        async with second_engine.connect() as conn:
            with testing.expect_raises_message(
                exc.NotSupportedError,
                r"cached statement plan is invalid due to a database schema "
                r"or configuration change \(SQLAlchemy asyncpg dialect "
                r"will now invalidate all prepared caches in response "
                r"to this exception\)",
            ):

                result = await conn.execute(
                    t1.select()
                    .where(t1.c.name.like("some name%"))
                    .where(t1.c.id % 17 == 6)
                )

        # works again
        async with second_engine.connect() as conn:
            result = await conn.execute(
                t1.select()
                .where(t1.c.name.like("some name%"))
                .where(t1.c.id % 17 == 6)
            )

            rows = result.fetchall()
            assert len(rows) >= 29

    @async_test
    async def test_detect_stale_type_cache_raise_recover(
        self, metadata, async_engine
    ):
        """Dropping/recreating an ENUM type invalidates asyncpg's OID
        cache; the dialect surfaces InternalError ("cache lookup failed
        for type") once, then recovers on the next checkout.
        """

        async def async_setup(engine, enums):
            # (Re)create t1 with my_enum holding the given members;
            # swapping the member set recreates the pg_type row (new OID).
            metadata = MetaData()
            Table(
                "t1",
                metadata,
                Column("id", Integer, primary_key=True),
                Column("name", ENUM(*enums, name="my_enum")),
            )

            # conn is an instance of AsyncConnection
            async with engine.begin() as conn:
                await conn.run_sync(metadata.drop_all)
                await conn.run_sync(metadata.create_all)

        t1 = Table(
            "t1",
            metadata,
            Column("id", Integer, primary_key=True),
            Column(
                "name",
                ENUM(
                    *("beans", "means", "keens", "faux", "beau", "flow"),
                    name="my_enum"
                ),
            ),
        )

        await async_setup(async_engine, ("beans", "means", "keens"))

        # prepared_statement_cache_size=0 isolates the OID cache from the
        # statement-plan cache exercised by the other test
        second_engine = engines.testing_engine(
            asyncio=True,
            options={"connect_args": {"prepared_statement_cache_size": 0}},
        )

        async with second_engine.connect() as conn:
            await conn.execute(
                t1.insert(),
                [
                    {"name": random.choice(("beans", "means", "keens"))}
                    for i in range(10)
                ],
            )

        # recreate the ENUM type with different members -> new OID
        await async_setup(async_engine, ("faux", "beau", "flow"))

        async with second_engine.connect() as conn:
            with testing.expect_raises_message(
                exc.InternalError, "cache lookup failed for type"
            ):
                await conn.execute(
                    t1.insert(),
                    [
                        {"name": random.choice(("faux", "beau", "flow"))}
                        for i in range(10)
                    ],
                )

        # works again
        async with second_engine.connect() as conn:
            await conn.execute(
                t1.insert(),
                [
                    {"name": random.choice(("faux", "beau", "flow"))}
                    for i in range(10)
                ],
            )