diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-07-04 12:21:36 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-08-13 18:41:53 -0400 |
commit | 5fb0138a3220161703e6ab1087319a669d14e7f4 (patch) | |
tree | 25d006b30830ce6bc71f7a69bed9b570e1ae9654 /lib/sqlalchemy/ext/asyncio/session.py | |
parent | cd03b8f0cecbf72ecd6c99c4d3a6338c8278b40d (diff) | |
download | sqlalchemy-5fb0138a3220161703e6ab1087319a669d14e7f4.tar.gz |
Implement rudimentary asyncio support w/ asyncpg
Using the approach introduced at
https://gist.github.com/zzzeek/6287e28054d3baddc07fa21a7227904e
We can now create asyncio endpoints that are then handled
in "implicit IO" form within the majority of the Core internals.
Then coroutines are re-exposed at the point at which we call
into asyncpg methods.
Patch includes:
* asyncpg dialect
* asyncio package
* engine, result, ORM session classes
* new test fixtures, tests
* some work with pep-484 and a short plugin for the
pyannotate package, which seems to have so-so results
Change-Id: Idbcc0eff72c4cad572914acdd6f40ddb1aef1a7d
Fixes: #3414
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/session.py')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/session.py | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py new file mode 100644 index 000000000..167301780 --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -0,0 +1,293 @@ +from typing import Any +from typing import Callable +from typing import List +from typing import Mapping +from typing import Optional + +from . import engine +from . import result as _result +from .base import StartableContext +from .engine import AsyncEngine +from ... import util +from ...engine import Result +from ...orm import Session +from ...sql import Executable +from ...util.concurrency import greenlet_spawn + + +class AsyncSession: + """Asyncio version of :class:`_orm.Session`. + + + .. versionadded:: 1.4 + + """ + + def __init__( + self, + bind: AsyncEngine = None, + binds: Mapping[object, AsyncEngine] = None, + **kw + ): + kw["future"] = True + if bind: + bind = engine._get_sync_engine(bind) + + if binds: + binds = { + key: engine._get_sync_engine(b) for key, b in binds.items() + } + + self.sync_session = Session(bind=bind, binds=binds, **kw) + + def add(self, instance: object) -> None: + """Place an object in this :class:`_asyncio.AsyncSession`. + + .. seealso:: + + :meth:`_orm.Session.add` + + """ + self.sync_session.add(instance) + + def add_all(self, instances: List[object]) -> None: + """Add the given collection of instances to this + :class:`_asyncio.AsyncSession`.""" + + self.sync_session.add_all(instances) + + def expire_all(self): + """Expires all persistent instances within this Session. + + See :meth:`_orm.Session.expire_all` for usage details. + + """ + self.sync_session.expire_all() + + def expire(self, instance, attribute_names=None): + """Expire the attributes on an instance. + + See :meth:`._orm.Session.expire` for usage details. + + """ + self.sync_session.expire() + + async def refresh( + self, instance, attribute_names=None, with_for_update=None + ): + """Expire and refresh the attributes on the given instance. + + A query will be issued to the database and all attributes will be + refreshed with their current database value. + + This is the async version of the :meth:`_orm.Session.refresh` method. + See that method for a complete description of all options. + + """ + + return await greenlet_spawn( + self.sync_session.refresh, + instance, + attribute_names=attribute_names, + with_for_update=with_for_update, + ) + + async def run_sync(self, fn: Callable, *arg, **kw) -> Any: + """Invoke the given sync callable passing sync self as the first + argument. + + This method maintains the asyncio event loop all the way through + to the database connection by running the given callable in a + specially instrumented greenlet. + + E.g.:: + + with AsyncSession(async_engine) as session: + await session.run_sync(some_business_method) + + """ + + return await greenlet_spawn(fn, self.sync_session, *arg, **kw) + + async def execute( + self, + statement: Executable, + params: Optional[Mapping] = None, + execution_options: Mapping = util.EMPTY_DICT, + bind_arguments: Optional[Mapping] = None, + **kw + ) -> Result: + """Execute a statement and return a buffered + :class:`_engine.Result` object.""" + + execution_options = execution_options.union({"prebuffer_rows": True}) + + return await greenlet_spawn( + self.sync_session.execute, + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw + ) + + async def stream( + self, + statement, + params=None, + execution_options=util.EMPTY_DICT, + bind_arguments=None, + **kw + ): + """Execute a statement and return a streaming + :class:`_asyncio.AsyncResult` object.""" + + execution_options = execution_options.union({"stream_results": True}) + + result = await greenlet_spawn( + self.sync_session.execute, + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw + ) + return _result.AsyncResult(result) + + async def merge(self, instance, load=True): + """Copy the state of a given instance into a corresponding instance + within this :class:`_asyncio.AsyncSession`. + + """ + return await greenlet_spawn( + self.sync_session.merge, instance, load=load + ) + + async def flush(self, objects=None): + """Flush all the object changes to the database. + + .. seealso:: + + :meth:`_orm.Session.flush` + + """ + await greenlet_spawn(self.sync_session.flush, objects=objects) + + async def connection(self): + r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to this + :class:`.Session` object's transactional state. + + """ + sync_connection = await greenlet_spawn(self.sync_session.connection) + return engine.AsyncConnection(sync_connection.engine, sync_connection) + + def begin(self, **kw): + """Return an :class:`_asyncio.AsyncSessionTransaction` object. + + The underlying :class:`_orm.Session` will perform the + "begin" action when the :class:`_asyncio.AsyncSessionTransaction` + object is entered:: + + async with async_session.begin(): + # .. ORM transaction is begun + + Note that database IO will not normally occur when the session-level + transaction is begun, as database transactions begin on an + on-demand basis. However, the begin block is async to accommodate + for a :meth:`_orm.SessionEvents.after_transaction_create` + event hook that may perform IO. + + For a general description of ORM begin, see + :meth:`_orm.Session.begin`. + + """ + + return AsyncSessionTransaction(self) + + def begin_nested(self, **kw): + """Return an :class:`_asyncio.AsyncSessionTransaction` object + which will begin a "nested" transaction, e.g. SAVEPOINT. + + Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`. + + For a general description of ORM begin nested, see + :meth:`_orm.Session.begin_nested`. + + """ + + return AsyncSessionTransaction(self, nested=True) + + async def rollback(self): + return await greenlet_spawn(self.sync_session.rollback) + + async def commit(self): + return await greenlet_spawn(self.sync_session.commit) + + async def close(self): + return await greenlet_spawn(self.sync_session.close) + + async def __aenter__(self): + return self + + async def __aexit__(self, type_, value, traceback): + await self.close() + + +class AsyncSessionTransaction(StartableContext): + """A wrapper for the ORM :class:`_orm.SessionTransaction` object. + + This object is provided so that a transaction-holding object + for the :meth:`_asyncio.AsyncSession.begin` may be returned. + + The object supports both explicit calls to + :meth:`_asyncio.AsyncSessionTransaction.commit` and + :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an + async context manager. + + + .. versionadded:: 1.4 + + """ + + __slots__ = ("session", "sync_transaction", "nested") + + def __init__(self, session, nested=False): + self.session = session + self.nested = nested + self.sync_transaction = None + + @property + def is_active(self): + return ( + self._sync_transaction() is not None + and self._sync_transaction().is_active + ) + + def _sync_transaction(self): + if not self.sync_transaction: + self._raise_for_not_started() + return self.sync_transaction + + async def rollback(self): + """Roll back this :class:`_asyncio.AsyncTransaction`. + + """ + await greenlet_spawn(self._sync_transaction().rollback) + + async def commit(self): + """Commit this :class:`_asyncio.AsyncTransaction`.""" + + await greenlet_spawn(self._sync_transaction().commit) + + async def start(self): + self.sync_transaction = await greenlet_spawn( + self.session.sync_session.begin_nested + if self.nested + else self.session.sync_session.begin + ) + return self + + async def __aexit__(self, type_, value, traceback): + return await greenlet_spawn( + self._sync_transaction().__exit__, type_, value, traceback + ) |