summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio/session.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-07-04 12:21:36 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-08-13 18:41:53 -0400
commit5fb0138a3220161703e6ab1087319a669d14e7f4 (patch)
tree25d006b30830ce6bc71f7a69bed9b570e1ae9654 /lib/sqlalchemy/ext/asyncio/session.py
parentcd03b8f0cecbf72ecd6c99c4d3a6338c8278b40d (diff)
downloadsqlalchemy-5fb0138a3220161703e6ab1087319a669d14e7f4.tar.gz
Implement rudimentary asyncio support w/ asyncpg
Using the approach introduced at https://gist.github.com/zzzeek/6287e28054d3baddc07fa21a7227904e We can now create asyncio endpoints that are then handled in "implicit IO" form within the majority of the Core internals. Then coroutines are re-exposed at the point at which we call into asyncpg methods. Patch includes: * asyncpg dialect * asyncio package * engine, result, ORM session classes * new test fixtures, tests * some work with pep-484 and a short plugin for the pyannotate package, which seems to have so-so results Change-Id: Idbcc0eff72c4cad572914acdd6f40ddb1aef1a7d Fixes: #3414
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/session.py')
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py293
1 files changed, 293 insertions, 0 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
new file mode 100644
index 000000000..167301780
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -0,0 +1,293 @@
+from typing import Any
+from typing import Callable
+from typing import List
+from typing import Mapping
+from typing import Optional
+
+from . import engine
+from . import result as _result
+from .base import StartableContext
+from .engine import AsyncEngine
+from ... import util
+from ...engine import Result
+from ...orm import Session
+from ...sql import Executable
+from ...util.concurrency import greenlet_spawn
+
+
+class AsyncSession:
+ """Asyncio version of :class:`_orm.Session`.
+
+
+ .. versionadded:: 1.4
+
+ """
+
+ def __init__(
+ self,
+ bind: AsyncEngine = None,
+ binds: Mapping[object, AsyncEngine] = None,
+ **kw
+ ):
+ kw["future"] = True
+ if bind:
+ bind = engine._get_sync_engine(bind)
+
+ if binds:
+ binds = {
+ key: engine._get_sync_engine(b) for key, b in binds.items()
+ }
+
+ self.sync_session = Session(bind=bind, binds=binds, **kw)
+
+ def add(self, instance: object) -> None:
+ """Place an object in this :class:`_asyncio.AsyncSession`.
+
+ .. seealso::
+
+ :meth:`_orm.Session.add`
+
+ """
+ self.sync_session.add(instance)
+
+ def add_all(self, instances: List[object]) -> None:
+ """Add the given collection of instances to this
+ :class:`_asyncio.AsyncSession`."""
+
+ self.sync_session.add_all(instances)
+
+ def expire_all(self):
+ """Expires all persistent instances within this Session.
+
+ See :meth:`_orm.Session.expire_all` for usage details.
+
+ """
+ self.sync_session.expire_all()
+
+ def expire(self, instance, attribute_names=None):
+ """Expire the attributes on an instance.
+
+ See :meth:`._orm.Session.expire` for usage details.
+
+ """
+ self.sync_session.expire()
+
+ async def refresh(
+ self, instance, attribute_names=None, with_for_update=None
+ ):
+ """Expire and refresh the attributes on the given instance.
+
+ A query will be issued to the database and all attributes will be
+ refreshed with their current database value.
+
+ This is the async version of the :meth:`_orm.Session.refresh` method.
+ See that method for a complete description of all options.
+
+ """
+
+ return await greenlet_spawn(
+ self.sync_session.refresh,
+ instance,
+ attribute_names=attribute_names,
+ with_for_update=with_for_update,
+ )
+
+ async def run_sync(self, fn: Callable, *arg, **kw) -> Any:
+ """Invoke the given sync callable passing sync self as the first
+ argument.
+
+ This method maintains the asyncio event loop all the way through
+ to the database connection by running the given callable in a
+ specially instrumented greenlet.
+
+ E.g.::
+
+ with AsyncSession(async_engine) as session:
+ await session.run_sync(some_business_method)
+
+ """
+
+ return await greenlet_spawn(fn, self.sync_session, *arg, **kw)
+
+ async def execute(
+ self,
+ statement: Executable,
+ params: Optional[Mapping] = None,
+ execution_options: Mapping = util.EMPTY_DICT,
+ bind_arguments: Optional[Mapping] = None,
+ **kw
+ ) -> Result:
+ """Execute a statement and return a buffered
+ :class:`_engine.Result` object."""
+
+ execution_options = execution_options.union({"prebuffer_rows": True})
+
+ return await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+
+ async def stream(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return a streaming
+ :class:`_asyncio.AsyncResult` object."""
+
+ execution_options = execution_options.union({"stream_results": True})
+
+ result = await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return _result.AsyncResult(result)
+
+ async def merge(self, instance, load=True):
+ """Copy the state of a given instance into a corresponding instance
+ within this :class:`_asyncio.AsyncSession`.
+
+ """
+ return await greenlet_spawn(
+ self.sync_session.merge, instance, load=load
+ )
+
+ async def flush(self, objects=None):
+ """Flush all the object changes to the database.
+
+ .. seealso::
+
+ :meth:`_orm.Session.flush`
+
+ """
+ await greenlet_spawn(self.sync_session.flush, objects=objects)
+
+ async def connection(self):
+ r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to this
+ :class:`.Session` object's transactional state.
+
+ """
+ sync_connection = await greenlet_spawn(self.sync_session.connection)
+ return engine.AsyncConnection(sync_connection.engine, sync_connection)
+
+ def begin(self, **kw):
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object.
+
+ The underlying :class:`_orm.Session` will perform the
+ "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
+ object is entered::
+
+ async with async_session.begin():
+ # .. ORM transaction is begun
+
+ Note that database IO will not normally occur when the session-level
+ transaction is begun, as database transactions begin on an
+ on-demand basis. However, the begin block is async to accommodate
+ for a :meth:`_orm.SessionEvents.after_transaction_create`
+ event hook that may perform IO.
+
+ For a general description of ORM begin, see
+ :meth:`_orm.Session.begin`.
+
+ """
+
+ return AsyncSessionTransaction(self)
+
+ def begin_nested(self, **kw):
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object
+ which will begin a "nested" transaction, e.g. SAVEPOINT.
+
+ Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.
+
+ For a general description of ORM begin nested, see
+ :meth:`_orm.Session.begin_nested`.
+
+ """
+
+ return AsyncSessionTransaction(self, nested=True)
+
+ async def rollback(self):
+ return await greenlet_spawn(self.sync_session.rollback)
+
+ async def commit(self):
+ return await greenlet_spawn(self.sync_session.commit)
+
+ async def close(self):
+ return await greenlet_spawn(self.sync_session.close)
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ await self.close()
+
+
+class AsyncSessionTransaction(StartableContext):
+ """A wrapper for the ORM :class:`_orm.SessionTransaction` object.
+
+ This object is provided so that a transaction-holding object
+ for the :meth:`_asyncio.AsyncSession.begin` may be returned.
+
+ The object supports both explicit calls to
+ :meth:`_asyncio.AsyncSessionTransaction.commit` and
+ :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an
+ async context manager.
+
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = ("session", "sync_transaction", "nested")
+
+ def __init__(self, session, nested=False):
+ self.session = session
+ self.nested = nested
+ self.sync_transaction = None
+
+ @property
+ def is_active(self):
+ return (
+ self._sync_transaction() is not None
+ and self._sync_transaction().is_active
+ )
+
+ def _sync_transaction(self):
+ if not self.sync_transaction:
+ self._raise_for_not_started()
+ return self.sync_transaction
+
+ async def rollback(self):
+ """Roll back this :class:`_asyncio.AsyncTransaction`.
+
+ """
+ await greenlet_spawn(self._sync_transaction().rollback)
+
+ async def commit(self):
+ """Commit this :class:`_asyncio.AsyncTransaction`."""
+
+ await greenlet_spawn(self._sync_transaction().commit)
+
+ async def start(self):
+ self.sync_transaction = await greenlet_spawn(
+ self.session.sync_session.begin_nested
+ if self.nested
+ else self.session.sync_session.begin
+ )
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ return await greenlet_spawn(
+ self._sync_transaction().__exit__, type_, value, traceback
+ )