diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-09-01 08:58:06 -0400 |
|---|---|---|
| committer | mike bayer <mike_mp@zzzcomputing.com> | 2021-09-02 14:19:20 +0000 |
| commit | d640192877e4d1da75e8dea34d2374c404e80538 (patch) | |
| tree | 9bb87b162c50ad6385949bc47369388752f020f5 /examples/asyncio | |
| parent | bf1fe670513abeb1596bc5266f50db1ffe62f3bd (diff) | |
| download | sqlalchemy-d640192877e4d1da75e8dea34d2374c404e80538.tar.gz | |
add asyncio.gather() example; add connection opts
While I don't like this approach very much, people will likely
be asking for it a lot, so this represents the most correct and
efficient form we can handle right now.
Added missing ``**kw`` arguments to the
:meth:`_asyncio.AsyncSession.connection` method.
Change-Id: Idadae2a02a4d96ecb96a5723ce64d017ab4c6217
References: https://github.com/sqlalchemy/sqlalchemy/discussions/6965
Diffstat (limited to 'examples/asyncio')
| -rw-r--r-- | examples/asyncio/gather_orm_statements.py | 118 |
1 file changed, 118 insertions, 0 deletions
"""
Illustrates running many ORM statements concurrently with
``asyncio.gather()`` across multiple asyncio database connections,
merging the ORM results back into a single ``AsyncSession``.

Note that this pattern loses all transactional safety and is also not
necessarily any more performant than using a single Session, as it adds
significant CPU-bound work both to maintain more database connections
and sessions, as well as within the merging of results from external
sessions into one.

Python is a CPU-intensive language even in trivial cases, so it is
strongly recommended that any workarounds for "speed" such as the one
below are carefully vetted to show that they do in fact improve
performance vs a traditional approach.

"""

import asyncio
import random

from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import merge_frozen_result
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class A(Base):
    # Minimal mapped class used to demonstrate concurrent SELECTs.
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(String)


async def run_out_of_band(
    sessionmaker, session, statement, merge_results=True
):
    """Execute an ORM statement on its own short-lived session.

    The statement runs in a session produced by ``sessionmaker``; when
    ``merge_results`` is True, the frozen result is merged back into the
    caller-supplied ``session`` and the merged ``Result`` is returned.
    """

    async with sessionmaker() as side_session:

        # run each connection in AUTOCOMMIT mode, cutting down on
        # transaction overhead / contention between the gathered tasks
        await side_session.connection(
            execution_options={"isolation_level": "AUTOCOMMIT"}
        )

        # equivalent for SQLAlchemy versions before 1.4.24:
        # await side_session.run_sync(
        #     lambda sync_session: sync_session.connection(
        #         execution_options={"isolation_level": "AUTOCOMMIT"}
        #     )
        # )

        result = await side_session.execute(statement)

        if not merge_results:
            await result.close()
            return

        # Merge the ORM objects from the out-of-band result into the
        # original session.  load=False lets the objects be used without
        # re-SELECTing them, though the merge still costs more CPU than a
        # plain ORM load because each object is copied into a new
        # instance.
        frozen = result.freeze()
        merged = await session.run_sync(
            merge_frozen_result,
            statement,
            frozen,
            load=False,
        )
        return merged()


async def async_main():
    """Set up the schema and fan 30 SELECTs out over gathered sessions."""

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    # rebuild the demo table from scratch on each run
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async_session = sessionmaker(
        engine, expire_on_commit=False, class_=AsyncSession
    )

    # seed 100 rows inside a single committed transaction
    async with async_session() as session, session.begin():
        session.add_all([A(data="a_%d" % n) for n in range(100)])

    statements = [
        select(A).where(A.data == "a_%d" % random.choice(range(100)))
        for _ in range(30)
    ]

    # each statement gets its own out-of-band session; results are
    # merged back into `session`
    results = await asyncio.gather(
        *(
            run_out_of_band(async_session, session, statement)
            for statement in statements
        )
    )
    print(f"results: {[r.all() for r in results]}")


asyncio.run(async_main())
