summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-07-04 12:21:36 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-08-13 18:41:53 -0400
commit5fb0138a3220161703e6ab1087319a669d14e7f4 (patch)
tree25d006b30830ce6bc71f7a69bed9b570e1ae9654 /lib/sqlalchemy/ext/asyncio
parentcd03b8f0cecbf72ecd6c99c4d3a6338c8278b40d (diff)
downloadsqlalchemy-5fb0138a3220161703e6ab1087319a669d14e7f4.tar.gz
Implement rudimentary asyncio support w/ asyncpg
Using the approach introduced at https://gist.github.com/zzzeek/6287e28054d3baddc07fa21a7227904e We can now create asyncio endpoints that are then handled in "implicit IO" form within the majority of the Core internals. Then coroutines are re-exposed at the point at which we call into asyncpg methods. Patch includes: * asyncpg dialect * asyncio package * engine, result, ORM session classes * new test fixtures, tests * some work with pep-484 and a short plugin for the pyannotate package, which seems to have so-so results Change-Id: Idbcc0eff72c4cad572914acdd6f40ddb1aef1a7d Fixes: #3414
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r--lib/sqlalchemy/ext/asyncio/__init__.py9
-rw-r--r--lib/sqlalchemy/ext/asyncio/base.py25
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py461
-rw-r--r--lib/sqlalchemy/ext/asyncio/exc.py14
-rw-r--r--lib/sqlalchemy/ext/asyncio/result.py669
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py293
6 files changed, 1471 insertions, 0 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/__init__.py b/lib/sqlalchemy/ext/asyncio/__init__.py
new file mode 100644
index 000000000..fbbc958d4
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/__init__.py
@@ -0,0 +1,9 @@
+from .engine import AsyncConnection # noqa
+from .engine import AsyncEngine # noqa
+from .engine import AsyncTransaction # noqa
+from .engine import create_async_engine # noqa
+from .result import AsyncMappingResult # noqa
+from .result import AsyncResult # noqa
+from .result import AsyncScalarResult # noqa
+from .session import AsyncSession # noqa
+from .session import AsyncSessionTransaction # noqa
diff --git a/lib/sqlalchemy/ext/asyncio/base.py b/lib/sqlalchemy/ext/asyncio/base.py
new file mode 100644
index 000000000..051f9e21a
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/base.py
@@ -0,0 +1,25 @@
+import abc
+
+from . import exc as async_exc
+
+
+class StartableContext(abc.ABC):
+ @abc.abstractmethod
+ async def start(self) -> "StartableContext":
+ pass
+
+ def __await__(self):
+ return self.start().__await__()
+
+ async def __aenter__(self):
+ return await self.start()
+
+ @abc.abstractmethod
+ async def __aexit__(self, type_, value, traceback):
+ pass
+
+ def _raise_for_not_started(self):
+ raise async_exc.AsyncContextNotStarted(
+ "%s context has not been started and object has not been awaited."
+ % (self.__class__.__name__)
+ )
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
new file mode 100644
index 000000000..2d9198d16
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -0,0 +1,461 @@
+from typing import Any
+from typing import Callable
+from typing import Mapping
+from typing import Optional
+
+from . import exc as async_exc
+from .base import StartableContext
+from .result import AsyncResult
+from ... import exc
+from ... import util
+from ...engine import Connection
+from ...engine import create_engine as _create_engine
+from ...engine import Engine
+from ...engine import Result
+from ...engine import Transaction
+from ...engine.base import OptionEngineMixin
+from ...sql import Executable
+from ...util.concurrency import greenlet_spawn
+
+
+def create_async_engine(*arg, **kw):
+ """Create a new async engine instance.
+
+ Arguments passed to :func:`_asyncio.create_async_engine` are mostly
+ identical to those passed to the :func:`_sa.create_engine` function.
+ The specified dialect must be an asyncio-compatible dialect
+ such as :ref:`dialect-postgresql-asyncpg`.
+
+ .. versionadded:: 1.4
+
+ """
+
+ if kw.get("server_side_cursors", False):
+ raise exc.AsyncMethodRequired(
+ "Can't set server_side_cursors for async engine globally; "
+ "use the connection.stream() method for an async "
+ "streaming result set"
+ )
+ kw["future"] = True
+ sync_engine = _create_engine(*arg, **kw)
+ return AsyncEngine(sync_engine)
+
+
+class AsyncConnection(StartableContext):
+ """An asyncio proxy for a :class:`_engine.Connection`.
+
+ :class:`_asyncio.AsyncConnection` is acquired using the
+ :meth:`_asyncio.AsyncEngine.connect`
+ method of :class:`_asyncio.AsyncEngine`::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
+
+ async with engine.connect() as conn:
+ result = await conn.execute(select(table))
+
+ .. versionadded:: 1.4
+
+ """ # noqa
+
+ __slots__ = (
+ "sync_engine",
+ "sync_connection",
+ )
+
+ def __init__(
+ self, sync_engine: Engine, sync_connection: Optional[Connection] = None
+ ):
+ self.sync_engine = sync_engine
+ self.sync_connection = sync_connection
+
+ async def start(self):
+ """Start this :class:`_asyncio.AsyncConnection` object's context
+ outside of using a Python ``with:`` block.
+
+ """
+ if self.sync_connection:
+ raise exc.InvalidRequestError("connection is already started")
+ self.sync_connection = await (greenlet_spawn(self.sync_engine.connect))
+ return self
+
+ def _sync_connection(self):
+ if not self.sync_connection:
+ self._raise_for_not_started()
+ return self.sync_connection
+
+ def begin(self) -> "AsyncTransaction":
+ """Begin a transaction prior to autobegin occurring.
+
+ """
+ self._sync_connection()
+ return AsyncTransaction(self)
+
+ def begin_nested(self) -> "AsyncTransaction":
+ """Begin a nested transaction and return a transaction handle.
+
+ """
+ self._sync_connection()
+ return AsyncTransaction(self, nested=True)
+
+ async def commit(self):
+ """Commit the transaction that is currently in progress.
+
+ This method commits the current transaction if one has been started.
+ If no transaction was started, the method has no effect, assuming
+ the connection is in a non-invalidated state.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+ """
+ conn = self._sync_connection()
+ await greenlet_spawn(conn.commit)
+
+ async def rollback(self):
+ """Roll back the transaction that is currently in progress.
+
+ This method rolls back the current transaction if one has been started.
+ If no transaction was started, the method has no effect. If a
+ transaction was started and the connection is in an invalidated state,
+ the transaction is cleared using this method.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+
+ """
+ conn = self._sync_connection()
+ await greenlet_spawn(conn.rollback)
+
+ async def close(self):
+ """Close this :class:`_asyncio.AsyncConnection`.
+
+ This has the effect of also rolling back the transaction if one
+ is in place.
+
+ """
+ conn = self._sync_connection()
+ await greenlet_spawn(conn.close)
+
+ async def exec_driver_sql(
+ self,
+ statement: Executable,
+ parameters: Optional[Mapping] = None,
+ execution_options: Mapping = util.EMPTY_DICT,
+ ) -> Result:
+ r"""Executes a driver-level SQL string and return buffered
+ :class:`_engine.Result`.
+
+ """
+
+ conn = self._sync_connection()
+
+ result = await greenlet_spawn(
+ conn.exec_driver_sql, statement, parameters, execution_options,
+ )
+ if result.context._is_server_side:
+ raise async_exc.AsyncMethodRequired(
+ "Can't use the connection.exec_driver_sql() method with a "
+ "server-side cursor."
+ "Use the connection.stream() method for an async "
+ "streaming result set."
+ )
+
+ return result
+
+ async def stream(
+ self,
+ statement: Executable,
+ parameters: Optional[Mapping] = None,
+ execution_options: Mapping = util.EMPTY_DICT,
+ ) -> AsyncResult:
+ """Execute a statement and return a streaming
+ :class:`_asyncio.AsyncResult` object."""
+
+ conn = self._sync_connection()
+
+ result = await greenlet_spawn(
+ conn._execute_20,
+ statement,
+ parameters,
+ util.EMPTY_DICT.merge_with(
+ execution_options, {"stream_results": True}
+ ),
+ )
+ if not result.context._is_server_side:
+ # TODO: real exception here
+ assert False, "server side result expected"
+ return AsyncResult(result)
+
+ async def execute(
+ self,
+ statement: Executable,
+ parameters: Optional[Mapping] = None,
+ execution_options: Mapping = util.EMPTY_DICT,
+ ) -> Result:
+ r"""Executes a SQL statement construct and return a buffered
+ :class:`_engine.Result`.
+
+ :param object: The statement to be executed. This is always
+ an object that is in both the :class:`_expression.ClauseElement` and
+ :class:`_expression.Executable` hierarchies, including:
+
+ * :class:`_expression.Select`
+ * :class:`_expression.Insert`, :class:`_expression.Update`,
+ :class:`_expression.Delete`
+ * :class:`_expression.TextClause` and
+ :class:`_expression.TextualSelect`
+ * :class:`_schema.DDL` and objects which inherit from
+ :class:`_schema.DDLElement`
+
+ :param parameters: parameters which will be bound into the statement.
+ This may be either a dictionary of parameter names to values,
+ or a mutable sequence (e.g. a list) of dictionaries. When a
+ list of dictionaries is passed, the underlying statement execution
+ will make use of the DBAPI ``cursor.executemany()`` method.
+ When a single dictionary is passed, the DBAPI ``cursor.execute()``
+ method will be used.
+
+ :param execution_options: optional dictionary of execution options,
+ which will be associated with the statement execution. This
+ dictionary can provide a subset of the options that are accepted
+ by :meth:`_future.Connection.execution_options`.
+
+ :return: a :class:`_engine.Result` object.
+
+ """
+ conn = self._sync_connection()
+
+ result = await greenlet_spawn(
+ conn._execute_20, statement, parameters, execution_options,
+ )
+ if result.context._is_server_side:
+ raise async_exc.AsyncMethodRequired(
+ "Can't use the connection.execute() method with a "
+ "server-side cursor."
+ "Use the connection.stream() method for an async "
+ "streaming result set."
+ )
+ return result
+
+ async def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[Mapping] = None,
+ execution_options: Mapping = util.EMPTY_DICT,
+ ) -> Any:
+ r"""Executes a SQL statement construct and returns a scalar object.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.Result.scalar` method after invoking the
+ :meth:`_future.Connection.execute` method. Parameters are equivalent.
+
+ :return: a scalar Python value representing the first column of the
+ first row returned.
+
+ """
+ result = await self.execute(statement, parameters, execution_options)
+ return result.scalar()
+
+ async def run_sync(self, fn: Callable, *arg, **kw) -> Any:
+ """"Invoke the given sync callable passing self as the first argument.
+
+ This method maintains the asyncio event loop all the way through
+ to the database connection by running the given callable in a
+ specially instrumented greenlet.
+
+ E.g.::
+
+ with async_engine.begin() as conn:
+ await conn.run_sync(metadata.create_all)
+
+ """
+
+ conn = self._sync_connection()
+
+ return await greenlet_spawn(fn, conn, *arg, **kw)
+
+ def __await__(self):
+ return self.start().__await__()
+
+ async def __aexit__(self, type_, value, traceback):
+ await self.close()
+
+
+class AsyncEngine:
+ """An asyncio proxy for a :class:`_engine.Engine`.
+
+ :class:`_asyncio.AsyncEngine` is acquired using the
+ :func:`_asyncio.create_async_engine` function::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
+
+ .. versionadded:: 1.4
+
+
+ """ # noqa
+
+ __slots__ = ("sync_engine",)
+
+ _connection_cls = AsyncConnection
+
+ _option_cls: type
+
+ class _trans_ctx(StartableContext):
+ def __init__(self, conn):
+ self.conn = conn
+
+ async def start(self):
+ await self.conn.start()
+ self.transaction = self.conn.begin()
+ await self.transaction.__aenter__()
+
+ return self.conn
+
+ async def __aexit__(self, type_, value, traceback):
+ if type_ is not None:
+ await self.transaction.rollback()
+ else:
+ if self.transaction.is_active:
+ await self.transaction.commit()
+ await self.conn.close()
+
+ def __init__(self, sync_engine: Engine):
+ self.sync_engine = sync_engine
+
+ def begin(self):
+ """Return a context manager which when entered will deliver an
+ :class:`_asyncio.AsyncConnection` with an
+ :class:`_asyncio.AsyncTransaction` established.
+
+ E.g.::
+
+ async with async_engine.begin() as conn:
+ await conn.execute(
+ text("insert into table (x, y, z) values (1, 2, 3)")
+ )
+ await conn.execute(text("my_special_procedure(5)"))
+
+
+ """
+ conn = self.connect()
+ return self._trans_ctx(conn)
+
+ def connect(self) -> AsyncConnection:
+ """Return an :class:`_asyncio.AsyncConnection` object.
+
+ The :class:`_asyncio.AsyncConnection` will procure a database
+ connection from the underlying connection pool when it is entered
+ as an async context manager::
+
+ async with async_engine.connect() as conn:
+ result = await conn.execute(select(user_table))
+
+ The :class:`_asyncio.AsyncConnection` may also be started outside of a
+ context manager by invoking its :meth:`_asyncio.AsyncConnection.start`
+ method.
+
+ """
+
+ return self._connection_cls(self.sync_engine)
+
+ async def raw_connection(self) -> Any:
+ """Return a "raw" DBAPI connection from the connection pool.
+
+ .. seealso::
+
+ :ref:`dbapi_connections`
+
+ """
+ return await greenlet_spawn(self.sync_engine.raw_connection)
+
+
+class AsyncOptionEngine(OptionEngineMixin, AsyncEngine):
+ pass
+
+
+AsyncEngine._option_cls = AsyncOptionEngine
+
+
+class AsyncTransaction(StartableContext):
+ """An asyncio proxy for a :class:`_engine.Transaction`."""
+
+ __slots__ = ("connection", "sync_transaction", "nested")
+
+ def __init__(self, connection: AsyncConnection, nested: bool = False):
+ self.connection = connection
+ self.sync_transaction: Optional[Transaction] = None
+ self.nested = nested
+
+ def _sync_transaction(self):
+ if not self.sync_transaction:
+ self._raise_for_not_started()
+ return self.sync_transaction
+
+ @property
+ def is_valid(self) -> bool:
+ return self._sync_transaction().is_valid
+
+ @property
+ def is_active(self) -> bool:
+ return self._sync_transaction().is_active
+
+ async def close(self):
+ """Close this :class:`.Transaction`.
+
+ If this transaction is the base transaction in a begin/commit
+ nesting, the transaction will rollback(). Otherwise, the
+ method returns.
+
+ This is used to cancel a Transaction without affecting the scope of
+ an enclosing transaction.
+
+ """
+ await greenlet_spawn(self._sync_transaction().close)
+
+ async def rollback(self):
+ """Roll back this :class:`.Transaction`.
+
+ """
+ await greenlet_spawn(self._sync_transaction().rollback)
+
+ async def commit(self):
+ """Commit this :class:`.Transaction`."""
+
+ await greenlet_spawn(self._sync_transaction().commit)
+
+ async def start(self):
+ """Start this :class:`_asyncio.AsyncTransaction` object's context
+ outside of using a Python ``with:`` block.
+
+ """
+
+ self.sync_transaction = await greenlet_spawn(
+ self.connection._sync_connection().begin_nested
+ if self.nested
+ else self.connection._sync_connection().begin
+ )
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ if type_ is None and self.is_active:
+ try:
+ await self.commit()
+ except:
+ with util.safe_reraise():
+ await self.rollback()
+ else:
+ await self.rollback()
+
+
+def _get_sync_engine(async_engine):
+ try:
+ return async_engine.sync_engine
+ except AttributeError as e:
+ raise exc.ArgumentError(
+ "AsyncEngine expected, got %r" % async_engine
+ ) from e
diff --git a/lib/sqlalchemy/ext/asyncio/exc.py b/lib/sqlalchemy/ext/asyncio/exc.py
new file mode 100644
index 000000000..6137bf6df
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/exc.py
@@ -0,0 +1,14 @@
+from ... import exc
+
+
+class AsyncMethodRequired(exc.InvalidRequestError):
+ """an API can't be used because its result would not be
+ compatible with async"""
+
+
+class AsyncContextNotStarted(exc.InvalidRequestError):
+ """a startable context manager has not been started."""
+
+
+class AsyncContextAlreadyStarted(exc.InvalidRequestError):
+ """a startable context manager is already started."""
diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py
new file mode 100644
index 000000000..52b40acba
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/result.py
@@ -0,0 +1,669 @@
+import operator
+
+from ... import util
+from ...engine.result import _NO_ROW
+from ...engine.result import FilterResult
+from ...engine.result import FrozenResult
+from ...engine.result import MergedResult
+from ...util.concurrency import greenlet_spawn
+
+if util.TYPE_CHECKING:
+ from typing import Any
+ from typing import List
+ from typing import Optional
+ from typing import Int
+ from typing import Iterator
+ from typing import Mapping
+ from ...engine.result import Row
+
+
+class AsyncResult(FilterResult):
+ """An asyncio wrapper around a :class:`_result.Result` object.
+
+ The :class:`_asyncio.AsyncResult` only applies to statement executions that
+ use a server-side cursor. It is returned only from the
+ :meth:`_asyncio.AsyncConnection.stream` and
+ :meth:`_asyncio.AsyncSession.stream` methods.
+
+ .. versionadded:: 1.4
+
+ """
+
+ def __init__(self, real_result):
+ self._real_result = real_result
+
+ self._metadata = real_result._metadata
+ self._unique_filter_state = real_result._unique_filter_state
+
+ # BaseCursorResult pre-generates the "_row_getter". Use that
+ # if available rather than building a second one
+ if "_row_getter" in real_result.__dict__:
+ self._set_memoized_attribute(
+ "_row_getter", real_result.__dict__["_row_getter"]
+ )
+
+ def keys(self):
+ """Return the :meth:`_engine.Result.keys` collection from the
+ underlying :class:`_engine.Result`.
+
+ """
+ return self._metadata.keys
+
+ def unique(self, strategy=None):
+ """Apply unique filtering to the objects returned by this
+ :class:`_asyncio.AsyncResult`.
+
+ Refer to :meth:`_engine.Result.unique` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+
+ """
+ self._unique_filter_state = (set(), strategy)
+ return self
+
+ def columns(self, *col_expressions):
+ # type: (*object) -> AsyncResult
+ r"""Establish the columns that should be returned in each row.
+
+ Refer to :meth:`_engine.Result.columns` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+
+ """
+ return self._column_slices(col_expressions)
+
+ async def partitions(self, size=None):
+ # type: (Optional[Int]) -> Iterator[List[Any]]
+ """Iterate through sub-lists of rows of the size given.
+
+ An async iterator is returned::
+
+ async def scroll_results(connection):
+ result = await connection.stream(select(users_table))
+
+ async for partition in result.partitions(100):
+ print("list of rows: %s" % partition)
+
+ .. seealso::
+
+ :meth:`_engine.Result.partitions`
+
+ """
+
+ getter = self._manyrow_getter
+
+ while True:
+ partition = await greenlet_spawn(getter, self, size)
+ if partition:
+ yield partition
+ else:
+ break
+
+ async def fetchone(self):
+ # type: () -> Row
+ """Fetch one row.
+
+ When all rows are exhausted, returns None.
+
+ This method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
+
+ To fetch the first row of a result only, use the
+ :meth:`_engine.Result.first` method. To iterate through all
+ rows, iterate the :class:`_engine.Result` object directly.
+
+ :return: a :class:`.Row` object if no filters are applied, or None
+ if no rows remain.
+
+ """
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ return None
+ else:
+ return row
+
+ async def fetchmany(self, size=None):
+ # type: (Optional[Int]) -> List[Row]
+ """Fetch many rows.
+
+ When all rows are exhausted, returns an empty list.
+
+ This method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
+
+ To fetch rows in groups, use the
+ :meth:`._asyncio.AsyncResult.partitions` method.
+
+ :return: a list of :class:`.Row` objects.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.partitions`
+
+ """
+
+ return await greenlet_spawn(self._manyrow_getter, self, size)
+
+ async def all(self):
+ # type: () -> List[Row]
+ """Return all rows in a list.
+
+ Closes the result set after invocation. Subsequent invocations
+ will return an empty list.
+
+ :return: a list of :class:`.Row` objects.
+
+ """
+
+ return await greenlet_spawn(self._allrows)
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ raise StopAsyncIteration()
+ else:
+ return row
+
+ async def first(self):
+ # type: () -> Row
+ """Fetch the first row or None if no row is present.
+
+ Closes the result set and discards remaining rows.
+
+ .. note:: This method returns one **row**, e.g. tuple, by default. To
+ return exactly one single scalar value, that is, the first column of
+ the first row, use the :meth:`_asyncio.AsyncResult.scalar` method,
+ or combine :meth:`_asyncio.AsyncResult.scalars` and
+ :meth:`_asyncio.AsyncResult.first`.
+
+ :return: a :class:`.Row` object, or None
+ if no rows remain.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.scalar`
+
+ :meth:`_asyncio.AsyncResult.one`
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, False)
+
+ async def one_or_none(self):
+ # type: () -> Optional[Row]
+ """Return at most one result or raise an exception.
+
+ Returns ``None`` if the result has no rows.
+ Raises :class:`.MultipleResultsFound`
+ if multiple rows are returned.
+
+ .. versionadded:: 1.4
+
+ :return: The first :class:`.Row` or None if no row is available.
+
+ :raises: :class:`.MultipleResultsFound`
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.first`
+
+ :meth:`_asyncio.AsyncResult.one`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, False)
+
+ async def scalar_one(self):
+ # type: () -> Any
+ """Return exactly one scalar result or raise an exception.
+
+ This is equvalent to calling :meth:`_asyncio.AsyncResult.scalars` and
+ then :meth:`_asyncio.AsyncResult.one`.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.one`
+
+ :meth:`_asyncio.AsyncResult.scalars`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, True)
+
+ async def scalar_one_or_none(self):
+ # type: () -> Optional[Any]
+ """Return exactly one or no scalar result.
+
+ This is equvalent to calling :meth:`_asyncio.AsyncResult.scalars` and
+ then :meth:`_asyncio.AsyncResult.one_or_none`.
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.one_or_none`
+
+ :meth:`_asyncio.AsyncResult.scalars`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, True)
+
+ async def one(self):
+ # type: () -> Row
+ """Return exactly one row or raise an exception.
+
+ Raises :class:`.NoResultFound` if the result returns no
+ rows, or :class:`.MultipleResultsFound` if multiple rows
+ would be returned.
+
+ .. note:: This method returns one **row**, e.g. tuple, by default.
+ To return exactly one single scalar value, that is, the first
+ column of the first row, use the
+ :meth:`_asyncio.AsyncResult.scalar_one` method, or combine
+ :meth:`_asyncio.AsyncResult.scalars` and
+ :meth:`_asyncio.AsyncResult.one`.
+
+ .. versionadded:: 1.4
+
+ :return: The first :class:`.Row`.
+
+ :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound`
+
+ .. seealso::
+
+ :meth:`_asyncio.AsyncResult.first`
+
+ :meth:`_asyncio.AsyncResult.one_or_none`
+
+ :meth:`_asyncio.AsyncResult.scalar_one`
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, False)
+
+ async def scalar(self):
+ # type: () -> Optional[Any]
+ """Fetch the first column of the first row, and close the result set.
+
+ Returns None if there are no rows to fetch.
+
+ No validation is performed to test if additional rows remain.
+
+ After calling this method, the object is fully closed,
+ e.g. the :meth:`_engine.CursorResult.close`
+ method will have been called.
+
+ :return: a Python scalar value , or None if no rows remain.
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, True)
+
+ async def freeze(self):
+ """Return a callable object that will produce copies of this
+ :class:`_asyncio.AsyncResult` when invoked.
+
+ The callable object returned is an instance of
+ :class:`_engine.FrozenResult`.
+
+ This is used for result set caching. The method must be called
+ on the result when it has been unconsumed, and calling the method
+ will consume the result fully. When the :class:`_engine.FrozenResult`
+ is retrieved from a cache, it can be called any number of times where
+ it will produce a new :class:`_engine.Result` object each time
+ against its stored set of rows.
+
+ .. seealso::
+
+ :ref:`do_orm_execute_re_executing` - example usage within the
+ ORM to implement a result-set cache.
+
+ """
+
+ return await greenlet_spawn(FrozenResult, self)
+
+ def merge(self, *others):
+ """Merge this :class:`_asyncio.AsyncResult` with other compatible result
+ objects.
+
+ The object returned is an instance of :class:`_engine.MergedResult`,
+ which will be composed of iterators from the given result
+ objects.
+
+ The new result will use the metadata from this result object.
+ The subsequent result objects must be against an identical
+ set of result / cursor metadata, otherwise the behavior is
+ undefined.
+
+ """
+ return MergedResult(self._metadata, (self,) + others)
+
+ def scalars(self, index=0):
+ # type: (Int) -> AsyncScalarResult
+ """Return an :class:`_asyncio.AsyncScalarResult` filtering object which
+ will return single elements rather than :class:`_row.Row` objects.
+
+ Refer to :meth:`_result.Result.scalars` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ :param index: integer or row key indicating the column to be fetched
+ from each row, defaults to ``0`` indicating the first column.
+
+ :return: a new :class:`_asyncio.AsyncScalarResult` filtering object
+ referring to this :class:`_asyncio.AsyncResult` object.
+
+ """
+ return AsyncScalarResult(self._real_result, index)
+
+ def mappings(self):
+ # type() -> AsyncMappingResult
+ """Apply a mappings filter to returned rows, returning an instance of
+ :class:`_asyncio.AsyncMappingResult`.
+
+ When this filter is applied, fetching rows will return
+ :class:`.RowMapping` objects instead of :class:`.Row` objects.
+
+ Refer to :meth:`_result.Result.mappings` in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ :return: a new :class:`_asyncio.AsyncMappingResult` filtering object
+ referring to the underlying :class:`_result.Result` object.
+
+ """
+
+ return AsyncMappingResult(self._real_result)
+
+
+class AsyncScalarResult(FilterResult):
+ """A wrapper for a :class:`_asyncio.AsyncResult` that returns scalar values
+ rather than :class:`_row.Row` values.
+
+ The :class:`_asyncio.AsyncScalarResult` object is acquired by calling the
+ :meth:`_asyncio.AsyncResult.scalars` method.
+
+ Refer to the :class:`_result.ScalarResult` object in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ .. versionadded:: 1.4
+
+ """
+
+ _generate_rows = False
+
+ def __init__(self, real_result, index):
+ self._real_result = real_result
+
+ if real_result._source_supports_scalars:
+ self._metadata = real_result._metadata
+ self._post_creational_filter = None
+ else:
+ self._metadata = real_result._metadata._reduce([index])
+ self._post_creational_filter = operator.itemgetter(0)
+
+ self._unique_filter_state = real_result._unique_filter_state
+
+ def unique(self, strategy=None):
+ # type: () -> AsyncScalarResult
+ """Apply unique filtering to the objects returned by this
+ :class:`_asyncio.AsyncScalarResult`.
+
+ See :meth:`_asyncio.AsyncResult.unique` for usage details.
+
+ """
+ self._unique_filter_state = (set(), strategy)
+ return self
+
+ async def partitions(self, size=None):
+ # type: (Optional[Int]) -> Iterator[List[Any]]
+ """Iterate through sub-lists of elements of the size given.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ getter = self._manyrow_getter
+
+ while True:
+ partition = await greenlet_spawn(getter, self, size)
+ if partition:
+ yield partition
+ else:
+ break
+
+ async def fetchall(self):
+ # type: () -> List[Any]
+ """A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method."""
+
+ return await greenlet_spawn(self._allrows)
+
+ async def fetchmany(self, size=None):
+ # type: (Optional[Int]) -> List[Any]
+ """Fetch many objects.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._manyrow_getter, self, size)
+
+ async def all(self):
+ # type: () -> List[Any]
+ """Return all scalar values in a list.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.all` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._allrows)
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ raise StopAsyncIteration()
+ else:
+ return row
+
+ async def first(self):
+ # type: () -> Optional[Any]
+ """Fetch the first object or None if no object is present.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.first` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, False)
+
+ async def one_or_none(self):
+ # type: () -> Optional[Any]
+ """Return at most one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, False)
+
+ async def one(self):
+ # type: () -> Any
+ """Return exactly one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one` except that
+ scalar values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, False)
+
+
+class AsyncMappingResult(FilterResult):
+ """A wrapper for a :class:`_asyncio.AsyncResult` that returns dictionary values
+ rather than :class:`_engine.Row` values.
+
+ The :class:`_asyncio.AsyncMappingResult` object is acquired by calling the
+ :meth:`_asyncio.AsyncResult.mappings` method.
+
+ Refer to the :class:`_result.MappingResult` object in the synchronous
+ SQLAlchemy API for a complete behavioral description.
+
+ .. versionadded:: 1.4
+
+ """
+
+ _generate_rows = True
+
+ _post_creational_filter = operator.attrgetter("_mapping")
+
+ def __init__(self, result):
+ self._real_result = result
+ self._unique_filter_state = result._unique_filter_state
+ self._metadata = result._metadata
+ if result._source_supports_scalars:
+ self._metadata = self._metadata._reduce([0])
+
+ def keys(self):
+ """Return an iterable view which yields the string keys that would
+ be represented by each :class:`.Row`.
+
+ The view also can be tested for key containment using the Python
+ ``in`` operator, which will test both for the string keys represented
+ in the view, as well as for alternate keys such as column objects.
+
+ .. versionchanged:: 1.4 a key view object is returned rather than a
+ plain list.
+
+
+ """
+ return self._metadata.keys
+
+ def unique(self, strategy=None):
+ # type: () -> AsyncMappingResult
+ """Apply unique filtering to the objects returned by this
+ :class:`_asyncio.AsyncMappingResult`.
+
+ See :meth:`_asyncio.AsyncResult.unique` for usage details.
+
+ """
+ self._unique_filter_state = (set(), strategy)
+ return self
+
+ def columns(self, *col_expressions):
+ # type: (*object) -> AsyncMappingResult
+ r"""Establish the columns that should be returned in each row.
+
+
+ """
+ return self._column_slices(col_expressions)
+
+ async def partitions(self, size=None):
+ # type: (Optional[Int]) -> Iterator[List[Mapping]]
+ """Iterate through sub-lists of elements of the size given.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ getter = self._manyrow_getter
+
+ while True:
+ partition = await greenlet_spawn(getter, self, size)
+ if partition:
+ yield partition
+ else:
+ break
+
+ async def fetchall(self):
+ # type: () -> List[Mapping]
+ """A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method."""
+
+ return await greenlet_spawn(self._allrows)
+
+ async def fetchone(self):
+ # type: () -> Mapping
+ """Fetch one object.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.fetchone` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ return None
+ else:
+ return row
+
+ async def fetchmany(self, size=None):
+ # type: (Optional[Int]) -> List[Mapping]
+ """Fetch many objects.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ return await greenlet_spawn(self._manyrow_getter, self, size)
+
+ async def all(self):
+ # type: () -> List[Mapping]
+ """Return all scalar values in a list.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.all` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+
+ return await greenlet_spawn(self._allrows)
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ row = await greenlet_spawn(self._onerow_getter, self)
+ if row is _NO_ROW:
+ raise StopAsyncIteration()
+ else:
+ return row
+
+ async def first(self):
+ # type: () -> Optional[Mapping]
+ """Fetch the first object or None if no object is present.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.first` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+
+ """
+ return await greenlet_spawn(self._only_one_row, False, False, False)
+
+ async def one_or_none(self):
+ # type: () -> Optional[Mapping]
+ """Return at most one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, False, False)
+
+ async def one(self):
+ # type: () -> Mapping
+ """Return exactly one object or raise an exception.
+
+ Equivalent to :meth:`_asyncio.AsyncResult.one` except that
+ mapping values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ return await greenlet_spawn(self._only_one_row, True, True, False)
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
new file mode 100644
index 000000000..167301780
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -0,0 +1,293 @@
+from typing import Any
+from typing import Callable
+from typing import List
+from typing import Mapping
+from typing import Optional
+
+from . import engine
+from . import result as _result
+from .base import StartableContext
+from .engine import AsyncEngine
+from ... import util
+from ...engine import Result
+from ...orm import Session
+from ...sql import Executable
+from ...util.concurrency import greenlet_spawn
+
+
+class AsyncSession:
+ """Asyncio version of :class:`_orm.Session`.
+
+
+ .. versionadded:: 1.4
+
+ """
+
+ def __init__(
+ self,
+ bind: AsyncEngine = None,
+ binds: Mapping[object, AsyncEngine] = None,
+ **kw
+ ):
+ kw["future"] = True
+ if bind:
+ bind = engine._get_sync_engine(bind)
+
+ if binds:
+ binds = {
+ key: engine._get_sync_engine(b) for key, b in binds.items()
+ }
+
+ self.sync_session = Session(bind=bind, binds=binds, **kw)
+
+ def add(self, instance: object) -> None:
+ """Place an object in this :class:`_asyncio.AsyncSession`.
+
+ .. seealso::
+
+ :meth:`_orm.Session.add`
+
+ """
+ self.sync_session.add(instance)
+
+ def add_all(self, instances: List[object]) -> None:
+ """Add the given collection of instances to this
+ :class:`_asyncio.AsyncSession`."""
+
+ self.sync_session.add_all(instances)
+
+ def expire_all(self):
+ """Expires all persistent instances within this Session.
+
+ See :meth:`_orm.Session.expire_all` for usage details.
+
+ """
+ self.sync_session.expire_all()
+
+ def expire(self, instance, attribute_names=None):
+ """Expire the attributes on an instance.
+
+ See :meth:`._orm.Session.expire` for usage details.
+
+ """
+ self.sync_session.expire()
+
+ async def refresh(
+ self, instance, attribute_names=None, with_for_update=None
+ ):
+ """Expire and refresh the attributes on the given instance.
+
+ A query will be issued to the database and all attributes will be
+ refreshed with their current database value.
+
+ This is the async version of the :meth:`_orm.Session.refresh` method.
+ See that method for a complete description of all options.
+
+ """
+
+ return await greenlet_spawn(
+ self.sync_session.refresh,
+ instance,
+ attribute_names=attribute_names,
+ with_for_update=with_for_update,
+ )
+
+ async def run_sync(self, fn: Callable, *arg, **kw) -> Any:
+ """Invoke the given sync callable passing sync self as the first
+ argument.
+
+ This method maintains the asyncio event loop all the way through
+ to the database connection by running the given callable in a
+ specially instrumented greenlet.
+
+ E.g.::
+
+ with AsyncSession(async_engine) as session:
+ await session.run_sync(some_business_method)
+
+ """
+
+ return await greenlet_spawn(fn, self.sync_session, *arg, **kw)
+
+ async def execute(
+ self,
+ statement: Executable,
+ params: Optional[Mapping] = None,
+ execution_options: Mapping = util.EMPTY_DICT,
+ bind_arguments: Optional[Mapping] = None,
+ **kw
+ ) -> Result:
+ """Execute a statement and return a buffered
+ :class:`_engine.Result` object."""
+
+ execution_options = execution_options.union({"prebuffer_rows": True})
+
+ return await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+
+ async def stream(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return a streaming
+ :class:`_asyncio.AsyncResult` object."""
+
+ execution_options = execution_options.union({"stream_results": True})
+
+ result = await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return _result.AsyncResult(result)
+
+ async def merge(self, instance, load=True):
+ """Copy the state of a given instance into a corresponding instance
+ within this :class:`_asyncio.AsyncSession`.
+
+ """
+ return await greenlet_spawn(
+ self.sync_session.merge, instance, load=load
+ )
+
+ async def flush(self, objects=None):
+ """Flush all the object changes to the database.
+
+ .. seealso::
+
+ :meth:`_orm.Session.flush`
+
+ """
+ await greenlet_spawn(self.sync_session.flush, objects=objects)
+
+ async def connection(self):
+ r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to this
+ :class:`.Session` object's transactional state.
+
+ """
+ sync_connection = await greenlet_spawn(self.sync_session.connection)
+ return engine.AsyncConnection(sync_connection.engine, sync_connection)
+
+ def begin(self, **kw):
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object.
+
+ The underlying :class:`_orm.Session` will perform the
+ "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
+ object is entered::
+
+ async with async_session.begin():
+ # .. ORM transaction is begun
+
+ Note that database IO will not normally occur when the session-level
+ transaction is begun, as database transactions begin on an
+ on-demand basis. However, the begin block is async to accommodate
+ for a :meth:`_orm.SessionEvents.after_transaction_create`
+ event hook that may perform IO.
+
+ For a general description of ORM begin, see
+ :meth:`_orm.Session.begin`.
+
+ """
+
+ return AsyncSessionTransaction(self)
+
+ def begin_nested(self, **kw):
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object
+ which will begin a "nested" transaction, e.g. SAVEPOINT.
+
+ Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.
+
+ For a general description of ORM begin nested, see
+ :meth:`_orm.Session.begin_nested`.
+
+ """
+
+ return AsyncSessionTransaction(self, nested=True)
+
+ async def rollback(self):
+ return await greenlet_spawn(self.sync_session.rollback)
+
+ async def commit(self):
+ return await greenlet_spawn(self.sync_session.commit)
+
+ async def close(self):
+ return await greenlet_spawn(self.sync_session.close)
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ await self.close()
+
+
+class AsyncSessionTransaction(StartableContext):
+ """A wrapper for the ORM :class:`_orm.SessionTransaction` object.
+
+ This object is provided so that a transaction-holding object
+ for the :meth:`_asyncio.AsyncSession.begin` may be returned.
+
+ The object supports both explicit calls to
+ :meth:`_asyncio.AsyncSessionTransaction.commit` and
+ :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an
+ async context manager.
+
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = ("session", "sync_transaction", "nested")
+
+ def __init__(self, session, nested=False):
+ self.session = session
+ self.nested = nested
+ self.sync_transaction = None
+
+ @property
+ def is_active(self):
+ return (
+ self._sync_transaction() is not None
+ and self._sync_transaction().is_active
+ )
+
+ def _sync_transaction(self):
+ if not self.sync_transaction:
+ self._raise_for_not_started()
+ return self.sync_transaction
+
+ async def rollback(self):
+ """Roll back this :class:`_asyncio.AsyncTransaction`.
+
+ """
+ await greenlet_spawn(self._sync_transaction().rollback)
+
+ async def commit(self):
+ """Commit this :class:`_asyncio.AsyncTransaction`."""
+
+ await greenlet_spawn(self._sync_transaction().commit)
+
+ async def start(self):
+ self.sync_transaction = await greenlet_spawn(
+ self.session.sync_session.begin_nested
+ if self.nested
+ else self.session.sync_session.begin
+ )
+ return self
+
+ async def __aexit__(self, type_, value, traceback):
+ return await greenlet_spawn(
+ self._sync_transaction().__exit__, type_, value, traceback
+ )