From d640192877e4d1da75e8dea34d2374c404e80538 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Wed, 1 Sep 2021 08:58:06 -0400 Subject: add asyncio.gather() example; add connection opts while I dont like this approach very much, people will likely be asking for it a lot, so represent the most correct and efficient form we can handle right now. Added missing ``**kw`` arguments to the :meth:`_asyncio.AsyncSession.connection` method. Change-Id: Idadae2a02a4d96ecb96a5723ce64d017ab4c6217 References: https://github.com/sqlalchemy/sqlalchemy/discussions/6965 --- examples/asyncio/gather_orm_statements.py | 118 ++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 examples/asyncio/gather_orm_statements.py (limited to 'examples/asyncio') diff --git a/examples/asyncio/gather_orm_statements.py b/examples/asyncio/gather_orm_statements.py new file mode 100644 index 000000000..edcdc1fe8 --- /dev/null +++ b/examples/asyncio/gather_orm_statements.py @@ -0,0 +1,118 @@ +""" +Illustrates how to run many statements concurrently using ``asyncio.gather()`` +along many asyncio database connections, merging ORM results into a single +``AsyncSession``. + +Note that this pattern loses all transactional safety and is also not +necessarily any more performant than using a single Session, as it adds +significant CPU-bound work both to maintain more database connections +and sessions, as well as within the merging of results from external sessions +into one. + +Python is a CPU-intensive language even in trivial cases, so it is strongly +recommended that any workarounds for "speed" such as the one below are +carefully vetted to show that they do in fact improve performance vs a +traditional approach. + +""" + +import asyncio +import random + +from sqlalchemy import Column +from sqlalchemy import Integer +from sqlalchemy import String +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.future import select +from sqlalchemy.orm import merge_frozen_result +from sqlalchemy.orm import sessionmaker + +Base = declarative_base() + + +class A(Base): + __tablename__ = "a" + + id = Column(Integer, primary_key=True) + data = Column(String) + + +async def run_out_of_band( + sessionmaker, session, statement, merge_results=True +): + """run an ORM statement in a distinct session, merging the result + back into the given session. + + """ + + async with sessionmaker() as oob_session: + + # use AUTOCOMMIT for each connection to reduce transaction + # overhead / contention + await oob_session.connection( + execution_options={"isolation_level": "AUTOCOMMIT"} + ) + + # pre 1.4.24 + # await oob_session.run_sync( + # lambda sync_session: sync_session.connection( + # execution_options={"isolation_level": "AUTOCOMMIT"} + # ) + # ) + + result = await oob_session.execute(statement) + + if merge_results: + # merge_results means the ORM objects from the result + # will be merged back into the original session. + # load=False means we can use the objects directly without + # re-selecting them. however this merge operation is still + # more expensive CPU-wise than a regular ORM load because the + # objects are copied into new instances + return ( + await session.run_sync( + merge_frozen_result, + statement, + result.freeze(), + load=False, + ) + )() + else: + await result.close() + + +async def async_main(): + + engine = create_async_engine( + "postgresql+asyncpg://scott:tiger@localhost/test", + echo=True, + ) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + async_session = sessionmaker( + engine, expire_on_commit=False, class_=AsyncSession + ) + + async with async_session() as session, session.begin(): + session.add_all([A(data="a_%d" % i) for i in range(100)]) + + statements = [ + select(A).where(A.data == "a_%d" % random.choice(range(100))) + for i in range(30) + ] + + results = await asyncio.gather( + *( + run_out_of_band(async_session, session, statement) + for statement in statements + ) + ) + print(f"results: {[r.all() for r in results]}") + + +asyncio.run(async_main()) -- cgit v1.2.1