diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-07-04 12:21:36 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-08-13 18:41:53 -0400 |
commit | 5fb0138a3220161703e6ab1087319a669d14e7f4 (patch) | |
tree | 25d006b30830ce6bc71f7a69bed9b570e1ae9654 /lib/sqlalchemy/ext/asyncio | |
parent | cd03b8f0cecbf72ecd6c99c4d3a6338c8278b40d (diff) | |
download | sqlalchemy-5fb0138a3220161703e6ab1087319a669d14e7f4.tar.gz |
Implement rudimentary asyncio support w/ asyncpg
Using the approach introduced at
https://gist.github.com/zzzeek/6287e28054d3baddc07fa21a7227904e
We can now create asyncio endpoints that are then handled
in "implicit IO" form within the majority of the Core internals.
Then coroutines are re-exposed at the point at which we call
into asyncpg methods.
Patch includes:
* asyncpg dialect
* asyncio package
* engine, result, ORM session classes
* new test fixtures, tests
* some work with pep-484 and a short plugin for the
pyannotate package, which seems to have so-so results
Change-Id: Idbcc0eff72c4cad572914acdd6f40ddb1aef1a7d
Fixes: #3414
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/__init__.py | 9 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/base.py | 25 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/engine.py | 461 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/exc.py | 14 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/result.py | 669 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/session.py | 293 |
6 files changed, 1471 insertions, 0 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/__init__.py b/lib/sqlalchemy/ext/asyncio/__init__.py new file mode 100644 index 000000000..fbbc958d4 --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/__init__.py @@ -0,0 +1,9 @@ +from .engine import AsyncConnection # noqa +from .engine import AsyncEngine # noqa +from .engine import AsyncTransaction # noqa +from .engine import create_async_engine # noqa +from .result import AsyncMappingResult # noqa +from .result import AsyncResult # noqa +from .result import AsyncScalarResult # noqa +from .session import AsyncSession # noqa +from .session import AsyncSessionTransaction # noqa diff --git a/lib/sqlalchemy/ext/asyncio/base.py b/lib/sqlalchemy/ext/asyncio/base.py new file mode 100644 index 000000000..051f9e21a --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/base.py @@ -0,0 +1,25 @@ +import abc + +from . import exc as async_exc + + +class StartableContext(abc.ABC): + @abc.abstractmethod + async def start(self) -> "StartableContext": + pass + + def __await__(self): + return self.start().__await__() + + async def __aenter__(self): + return await self.start() + + @abc.abstractmethod + async def __aexit__(self, type_, value, traceback): + pass + + def _raise_for_not_started(self): + raise async_exc.AsyncContextNotStarted( + "%s context has not been started and object has not been awaited." + % (self.__class__.__name__) + ) diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py new file mode 100644 index 000000000..2d9198d16 --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -0,0 +1,461 @@ +from typing import Any +from typing import Callable +from typing import Mapping +from typing import Optional + +from . import exc as async_exc +from .base import StartableContext +from .result import AsyncResult +from ... import exc +from ... import util +from ...engine import Connection +from ...engine import create_engine as _create_engine +from ...engine import Engine +from ...engine import Result +from ...engine import Transaction +from ...engine.base import OptionEngineMixin +from ...sql import Executable +from ...util.concurrency import greenlet_spawn + + +def create_async_engine(*arg, **kw): + """Create a new async engine instance. + + Arguments passed to :func:`_asyncio.create_async_engine` are mostly + identical to those passed to the :func:`_sa.create_engine` function. + The specified dialect must be an asyncio-compatible dialect + such as :ref:`dialect-postgresql-asyncpg`. + + .. versionadded:: 1.4 + + """ + + if kw.get("server_side_cursors", False): + raise exc.AsyncMethodRequired( + "Can't set server_side_cursors for async engine globally; " + "use the connection.stream() method for an async " + "streaming result set" + ) + kw["future"] = True + sync_engine = _create_engine(*arg, **kw) + return AsyncEngine(sync_engine) + + +class AsyncConnection(StartableContext): + """An asyncio proxy for a :class:`_engine.Connection`. + + :class:`_asyncio.AsyncConnection` is acquired using the + :meth:`_asyncio.AsyncEngine.connect` + method of :class:`_asyncio.AsyncEngine`:: + + from sqlalchemy.ext.asyncio import create_async_engine + engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") + + async with engine.connect() as conn: + result = await conn.execute(select(table)) + + .. versionadded:: 1.4 + + """ # noqa + + __slots__ = ( + "sync_engine", + "sync_connection", + ) + + def __init__( + self, sync_engine: Engine, sync_connection: Optional[Connection] = None + ): + self.sync_engine = sync_engine + self.sync_connection = sync_connection + + async def start(self): + """Start this :class:`_asyncio.AsyncConnection` object's context + outside of using a Python ``with:`` block. + + """ + if self.sync_connection: + raise exc.InvalidRequestError("connection is already started") + self.sync_connection = await (greenlet_spawn(self.sync_engine.connect)) + return self + + def _sync_connection(self): + if not self.sync_connection: + self._raise_for_not_started() + return self.sync_connection + + def begin(self) -> "AsyncTransaction": + """Begin a transaction prior to autobegin occurring. + + """ + self._sync_connection() + return AsyncTransaction(self) + + def begin_nested(self) -> "AsyncTransaction": + """Begin a nested transaction and return a transaction handle. + + """ + self._sync_connection() + return AsyncTransaction(self, nested=True) + + async def commit(self): + """Commit the transaction that is currently in progress. + + This method commits the current transaction if one has been started. + If no transaction was started, the method has no effect, assuming + the connection is in a non-invalidated state. + + A transaction is begun on a :class:`_future.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_future.Connection.begin` method is called. + + """ + conn = self._sync_connection() + await greenlet_spawn(conn.commit) + + async def rollback(self): + """Roll back the transaction that is currently in progress. + + This method rolls back the current transaction if one has been started. + If no transaction was started, the method has no effect. If a + transaction was started and the connection is in an invalidated state, + the transaction is cleared using this method. + + A transaction is begun on a :class:`_future.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_future.Connection.begin` method is called. + + + """ + conn = self._sync_connection() + await greenlet_spawn(conn.rollback) + + async def close(self): + """Close this :class:`_asyncio.AsyncConnection`. + + This has the effect of also rolling back the transaction if one + is in place. + + """ + conn = self._sync_connection() + await greenlet_spawn(conn.close) + + async def exec_driver_sql( + self, + statement: Executable, + parameters: Optional[Mapping] = None, + execution_options: Mapping = util.EMPTY_DICT, + ) -> Result: + r"""Executes a driver-level SQL string and return buffered + :class:`_engine.Result`. + + """ + + conn = self._sync_connection() + + result = await greenlet_spawn( + conn.exec_driver_sql, statement, parameters, execution_options, + ) + if result.context._is_server_side: + raise async_exc.AsyncMethodRequired( + "Can't use the connection.exec_driver_sql() method with a " + "server-side cursor." + "Use the connection.stream() method for an async " + "streaming result set." + ) + + return result + + async def stream( + self, + statement: Executable, + parameters: Optional[Mapping] = None, + execution_options: Mapping = util.EMPTY_DICT, + ) -> AsyncResult: + """Execute a statement and return a streaming + :class:`_asyncio.AsyncResult` object.""" + + conn = self._sync_connection() + + result = await greenlet_spawn( + conn._execute_20, + statement, + parameters, + util.EMPTY_DICT.merge_with( + execution_options, {"stream_results": True} + ), + ) + if not result.context._is_server_side: + # TODO: real exception here + assert False, "server side result expected" + return AsyncResult(result) + + async def execute( + self, + statement: Executable, + parameters: Optional[Mapping] = None, + execution_options: Mapping = util.EMPTY_DICT, + ) -> Result: + r"""Executes a SQL statement construct and return a buffered + :class:`_engine.Result`. + + :param object: The statement to be executed. This is always + an object that is in both the :class:`_expression.ClauseElement` and + :class:`_expression.Executable` hierarchies, including: + + * :class:`_expression.Select` + * :class:`_expression.Insert`, :class:`_expression.Update`, + :class:`_expression.Delete` + * :class:`_expression.TextClause` and + :class:`_expression.TextualSelect` + * :class:`_schema.DDL` and objects which inherit from + :class:`_schema.DDLElement` + + :param parameters: parameters which will be bound into the statement. + This may be either a dictionary of parameter names to values, + or a mutable sequence (e.g. a list) of dictionaries. When a + list of dictionaries is passed, the underlying statement execution + will make use of the DBAPI ``cursor.executemany()`` method. + When a single dictionary is passed, the DBAPI ``cursor.execute()`` + method will be used. + + :param execution_options: optional dictionary of execution options, + which will be associated with the statement execution. This + dictionary can provide a subset of the options that are accepted + by :meth:`_future.Connection.execution_options`. + + :return: a :class:`_engine.Result` object. + + """ + conn = self._sync_connection() + + result = await greenlet_spawn( + conn._execute_20, statement, parameters, execution_options, + ) + if result.context._is_server_side: + raise async_exc.AsyncMethodRequired( + "Can't use the connection.execute() method with a " + "server-side cursor." + "Use the connection.stream() method for an async " + "streaming result set." + ) + return result + + async def scalar( + self, + statement: Executable, + parameters: Optional[Mapping] = None, + execution_options: Mapping = util.EMPTY_DICT, + ) -> Any: + r"""Executes a SQL statement construct and returns a scalar object. + + This method is shorthand for invoking the + :meth:`_engine.Result.scalar` method after invoking the + :meth:`_future.Connection.execute` method. Parameters are equivalent. + + :return: a scalar Python value representing the first column of the + first row returned. + + """ + result = await self.execute(statement, parameters, execution_options) + return result.scalar() + + async def run_sync(self, fn: Callable, *arg, **kw) -> Any: + """"Invoke the given sync callable passing self as the first argument. + + This method maintains the asyncio event loop all the way through + to the database connection by running the given callable in a + specially instrumented greenlet. + + E.g.:: + + with async_engine.begin() as conn: + await conn.run_sync(metadata.create_all) + + """ + + conn = self._sync_connection() + + return await greenlet_spawn(fn, conn, *arg, **kw) + + def __await__(self): + return self.start().__await__() + + async def __aexit__(self, type_, value, traceback): + await self.close() + + +class AsyncEngine: + """An asyncio proxy for a :class:`_engine.Engine`. + + :class:`_asyncio.AsyncEngine` is acquired using the + :func:`_asyncio.create_async_engine` function:: + + from sqlalchemy.ext.asyncio import create_async_engine + engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") + + .. versionadded:: 1.4 + + + """ # noqa + + __slots__ = ("sync_engine",) + + _connection_cls = AsyncConnection + + _option_cls: type + + class _trans_ctx(StartableContext): + def __init__(self, conn): + self.conn = conn + + async def start(self): + await self.conn.start() + self.transaction = self.conn.begin() + await self.transaction.__aenter__() + + return self.conn + + async def __aexit__(self, type_, value, traceback): + if type_ is not None: + await self.transaction.rollback() + else: + if self.transaction.is_active: + await self.transaction.commit() + await self.conn.close() + + def __init__(self, sync_engine: Engine): + self.sync_engine = sync_engine + + def begin(self): + """Return a context manager which when entered will deliver an + :class:`_asyncio.AsyncConnection` with an + :class:`_asyncio.AsyncTransaction` established. + + E.g.:: + + async with async_engine.begin() as conn: + await conn.execute( + text("insert into table (x, y, z) values (1, 2, 3)") + ) + await conn.execute(text("my_special_procedure(5)")) + + + """ + conn = self.connect() + return self._trans_ctx(conn) + + def connect(self) -> AsyncConnection: + """Return an :class:`_asyncio.AsyncConnection` object. + + The :class:`_asyncio.AsyncConnection` will procure a database + connection from the underlying connection pool when it is entered + as an async context manager:: + + async with async_engine.connect() as conn: + result = await conn.execute(select(user_table)) + + The :class:`_asyncio.AsyncConnection` may also be started outside of a + context manager by invoking its :meth:`_asyncio.AsyncConnection.start` + method. + + """ + + return self._connection_cls(self.sync_engine) + + async def raw_connection(self) -> Any: + """Return a "raw" DBAPI connection from the connection pool. + + .. seealso:: + + :ref:`dbapi_connections` + + """ + return await greenlet_spawn(self.sync_engine.raw_connection) + + +class AsyncOptionEngine(OptionEngineMixin, AsyncEngine): + pass + + +AsyncEngine._option_cls = AsyncOptionEngine + + +class AsyncTransaction(StartableContext): + """An asyncio proxy for a :class:`_engine.Transaction`.""" + + __slots__ = ("connection", "sync_transaction", "nested") + + def __init__(self, connection: AsyncConnection, nested: bool = False): + self.connection = connection + self.sync_transaction: Optional[Transaction] = None + self.nested = nested + + def _sync_transaction(self): + if not self.sync_transaction: + self._raise_for_not_started() + return self.sync_transaction + + @property + def is_valid(self) -> bool: + return self._sync_transaction().is_valid + + @property + def is_active(self) -> bool: + return self._sync_transaction().is_active + + async def close(self): + """Close this :class:`.Transaction`. + + If this transaction is the base transaction in a begin/commit + nesting, the transaction will rollback(). Otherwise, the + method returns. + + This is used to cancel a Transaction without affecting the scope of + an enclosing transaction. + + """ + await greenlet_spawn(self._sync_transaction().close) + + async def rollback(self): + """Roll back this :class:`.Transaction`. + + """ + await greenlet_spawn(self._sync_transaction().rollback) + + async def commit(self): + """Commit this :class:`.Transaction`.""" + + await greenlet_spawn(self._sync_transaction().commit) + + async def start(self): + """Start this :class:`_asyncio.AsyncTransaction` object's context + outside of using a Python ``with:`` block. + + """ + + self.sync_transaction = await greenlet_spawn( + self.connection._sync_connection().begin_nested + if self.nested + else self.connection._sync_connection().begin + ) + return self + + async def __aexit__(self, type_, value, traceback): + if type_ is None and self.is_active: + try: + await self.commit() + except: + with util.safe_reraise(): + await self.rollback() + else: + await self.rollback() + + +def _get_sync_engine(async_engine): + try: + return async_engine.sync_engine + except AttributeError as e: + raise exc.ArgumentError( + "AsyncEngine expected, got %r" % async_engine + ) from e diff --git a/lib/sqlalchemy/ext/asyncio/exc.py b/lib/sqlalchemy/ext/asyncio/exc.py new file mode 100644 index 000000000..6137bf6df --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/exc.py @@ -0,0 +1,14 @@ +from ... import exc + + +class AsyncMethodRequired(exc.InvalidRequestError): + """an API can't be used because its result would not be + compatible with async""" + + +class AsyncContextNotStarted(exc.InvalidRequestError): + """a startable context manager has not been started.""" + + +class AsyncContextAlreadyStarted(exc.InvalidRequestError): + """a startable context manager is already started.""" diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py new file mode 100644 index 000000000..52b40acba --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/result.py @@ -0,0 +1,669 @@ +import operator + +from ... import util +from ...engine.result import _NO_ROW +from ...engine.result import FilterResult +from ...engine.result import FrozenResult +from ...engine.result import MergedResult +from ...util.concurrency import greenlet_spawn + +if util.TYPE_CHECKING: + from typing import Any + from typing import List + from typing import Optional + from typing import Int + from typing import Iterator + from typing import Mapping + from ...engine.result import Row + + +class AsyncResult(FilterResult): + """An asyncio wrapper around a :class:`_result.Result` object. + + The :class:`_asyncio.AsyncResult` only applies to statement executions that + use a server-side cursor. It is returned only from the + :meth:`_asyncio.AsyncConnection.stream` and + :meth:`_asyncio.AsyncSession.stream` methods. + + .. versionadded:: 1.4 + + """ + + def __init__(self, real_result): + self._real_result = real_result + + self._metadata = real_result._metadata + self._unique_filter_state = real_result._unique_filter_state + + # BaseCursorResult pre-generates the "_row_getter". Use that + # if available rather than building a second one + if "_row_getter" in real_result.__dict__: + self._set_memoized_attribute( + "_row_getter", real_result.__dict__["_row_getter"] + ) + + def keys(self): + """Return the :meth:`_engine.Result.keys` collection from the + underlying :class:`_engine.Result`. + + """ + return self._metadata.keys + + def unique(self, strategy=None): + """Apply unique filtering to the objects returned by this + :class:`_asyncio.AsyncResult`. + + Refer to :meth:`_engine.Result.unique` in the synchronous + SQLAlchemy API for a complete behavioral description. + + + """ + self._unique_filter_state = (set(), strategy) + return self + + def columns(self, *col_expressions): + # type: (*object) -> AsyncResult + r"""Establish the columns that should be returned in each row. + + Refer to :meth:`_engine.Result.columns` in the synchronous + SQLAlchemy API for a complete behavioral description. + + + """ + return self._column_slices(col_expressions) + + async def partitions(self, size=None): + # type: (Optional[Int]) -> Iterator[List[Any]] + """Iterate through sub-lists of rows of the size given. + + An async iterator is returned:: + + async def scroll_results(connection): + result = await connection.stream(select(users_table)) + + async for partition in result.partitions(100): + print("list of rows: %s" % partition) + + .. seealso:: + + :meth:`_engine.Result.partitions` + + """ + + getter = self._manyrow_getter + + while True: + partition = await greenlet_spawn(getter, self, size) + if partition: + yield partition + else: + break + + async def fetchone(self): + # type: () -> Row + """Fetch one row. + + When all rows are exhausted, returns None. + + This method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch the first row of a result only, use the + :meth:`_engine.Result.first` method. To iterate through all + rows, iterate the :class:`_engine.Result` object directly. + + :return: a :class:`.Row` object if no filters are applied, or None + if no rows remain. + + """ + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + return None + else: + return row + + async def fetchmany(self, size=None): + # type: (Optional[Int]) -> List[Row] + """Fetch many rows. + + When all rows are exhausted, returns an empty list. + + This method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch rows in groups, use the + :meth:`._asyncio.AsyncResult.partitions` method. + + :return: a list of :class:`.Row` objects. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.partitions` + + """ + + return await greenlet_spawn(self._manyrow_getter, self, size) + + async def all(self): + # type: () -> List[Row] + """Return all rows in a list. + + Closes the result set after invocation. Subsequent invocations + will return an empty list. + + :return: a list of :class:`.Row` objects. + + """ + + return await greenlet_spawn(self._allrows) + + def __aiter__(self): + return self + + async def __anext__(self): + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + raise StopAsyncIteration() + else: + return row + + async def first(self): + # type: () -> Row + """Fetch the first row or None if no row is present. + + Closes the result set and discards remaining rows. + + .. note:: This method returns one **row**, e.g. tuple, by default. To + return exactly one single scalar value, that is, the first column of + the first row, use the :meth:`_asyncio.AsyncResult.scalar` method, + or combine :meth:`_asyncio.AsyncResult.scalars` and + :meth:`_asyncio.AsyncResult.first`. + + :return: a :class:`.Row` object, or None + if no rows remain. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.scalar` + + :meth:`_asyncio.AsyncResult.one` + + """ + return await greenlet_spawn(self._only_one_row, False, False, False) + + async def one_or_none(self): + # type: () -> Optional[Row] + """Return at most one result or raise an exception. + + Returns ``None`` if the result has no rows. + Raises :class:`.MultipleResultsFound` + if multiple rows are returned. + + .. versionadded:: 1.4 + + :return: The first :class:`.Row` or None if no row is available. + + :raises: :class:`.MultipleResultsFound` + + .. seealso:: + + :meth:`_asyncio.AsyncResult.first` + + :meth:`_asyncio.AsyncResult.one` + + """ + return await greenlet_spawn(self._only_one_row, True, False, False) + + async def scalar_one(self): + # type: () -> Any + """Return exactly one scalar result or raise an exception. + + This is equvalent to calling :meth:`_asyncio.AsyncResult.scalars` and + then :meth:`_asyncio.AsyncResult.one`. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.one` + + :meth:`_asyncio.AsyncResult.scalars` + + """ + return await greenlet_spawn(self._only_one_row, True, True, True) + + async def scalar_one_or_none(self): + # type: () -> Optional[Any] + """Return exactly one or no scalar result. + + This is equvalent to calling :meth:`_asyncio.AsyncResult.scalars` and + then :meth:`_asyncio.AsyncResult.one_or_none`. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.one_or_none` + + :meth:`_asyncio.AsyncResult.scalars` + + """ + return await greenlet_spawn(self._only_one_row, True, False, True) + + async def one(self): + # type: () -> Row + """Return exactly one row or raise an exception. + + Raises :class:`.NoResultFound` if the result returns no + rows, or :class:`.MultipleResultsFound` if multiple rows + would be returned. + + .. note:: This method returns one **row**, e.g. tuple, by default. + To return exactly one single scalar value, that is, the first + column of the first row, use the + :meth:`_asyncio.AsyncResult.scalar_one` method, or combine + :meth:`_asyncio.AsyncResult.scalars` and + :meth:`_asyncio.AsyncResult.one`. + + .. versionadded:: 1.4 + + :return: The first :class:`.Row`. + + :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound` + + .. seealso:: + + :meth:`_asyncio.AsyncResult.first` + + :meth:`_asyncio.AsyncResult.one_or_none` + + :meth:`_asyncio.AsyncResult.scalar_one` + + """ + return await greenlet_spawn(self._only_one_row, True, True, False) + + async def scalar(self): + # type: () -> Optional[Any] + """Fetch the first column of the first row, and close the result set. + + Returns None if there are no rows to fetch. + + No validation is performed to test if additional rows remain. + + After calling this method, the object is fully closed, + e.g. the :meth:`_engine.CursorResult.close` + method will have been called. + + :return: a Python scalar value , or None if no rows remain. + + """ + return await greenlet_spawn(self._only_one_row, False, False, True) + + async def freeze(self): + """Return a callable object that will produce copies of this + :class:`_asyncio.AsyncResult` when invoked. + + The callable object returned is an instance of + :class:`_engine.FrozenResult`. + + This is used for result set caching. The method must be called + on the result when it has been unconsumed, and calling the method + will consume the result fully. When the :class:`_engine.FrozenResult` + is retrieved from a cache, it can be called any number of times where + it will produce a new :class:`_engine.Result` object each time + against its stored set of rows. + + .. seealso:: + + :ref:`do_orm_execute_re_executing` - example usage within the + ORM to implement a result-set cache. + + """ + + return await greenlet_spawn(FrozenResult, self) + + def merge(self, *others): + """Merge this :class:`_asyncio.AsyncResult` with other compatible result + objects. + + The object returned is an instance of :class:`_engine.MergedResult`, + which will be composed of iterators from the given result + objects. + + The new result will use the metadata from this result object. + The subsequent result objects must be against an identical + set of result / cursor metadata, otherwise the behavior is + undefined. + + """ + return MergedResult(self._metadata, (self,) + others) + + def scalars(self, index=0): + # type: (Int) -> AsyncScalarResult + """Return an :class:`_asyncio.AsyncScalarResult` filtering object which + will return single elements rather than :class:`_row.Row` objects. + + Refer to :meth:`_result.Result.scalars` in the synchronous + SQLAlchemy API for a complete behavioral description. + + :param index: integer or row key indicating the column to be fetched + from each row, defaults to ``0`` indicating the first column. + + :return: a new :class:`_asyncio.AsyncScalarResult` filtering object + referring to this :class:`_asyncio.AsyncResult` object. + + """ + return AsyncScalarResult(self._real_result, index) + + def mappings(self): + # type() -> AsyncMappingResult + """Apply a mappings filter to returned rows, returning an instance of + :class:`_asyncio.AsyncMappingResult`. + + When this filter is applied, fetching rows will return + :class:`.RowMapping` objects instead of :class:`.Row` objects. + + Refer to :meth:`_result.Result.mappings` in the synchronous + SQLAlchemy API for a complete behavioral description. + + :return: a new :class:`_asyncio.AsyncMappingResult` filtering object + referring to the underlying :class:`_result.Result` object. + + """ + + return AsyncMappingResult(self._real_result) + + +class AsyncScalarResult(FilterResult): + """A wrapper for a :class:`_asyncio.AsyncResult` that returns scalar values + rather than :class:`_row.Row` values. + + The :class:`_asyncio.AsyncScalarResult` object is acquired by calling the + :meth:`_asyncio.AsyncResult.scalars` method. + + Refer to the :class:`_result.ScalarResult` object in the synchronous + SQLAlchemy API for a complete behavioral description. + + .. versionadded:: 1.4 + + """ + + _generate_rows = False + + def __init__(self, real_result, index): + self._real_result = real_result + + if real_result._source_supports_scalars: + self._metadata = real_result._metadata + self._post_creational_filter = None + else: + self._metadata = real_result._metadata._reduce([index]) + self._post_creational_filter = operator.itemgetter(0) + + self._unique_filter_state = real_result._unique_filter_state + + def unique(self, strategy=None): + # type: () -> AsyncScalarResult + """Apply unique filtering to the objects returned by this + :class:`_asyncio.AsyncScalarResult`. + + See :meth:`_asyncio.AsyncResult.unique` for usage details. + + """ + self._unique_filter_state = (set(), strategy) + return self + + async def partitions(self, size=None): + # type: (Optional[Int]) -> Iterator[List[Any]] + """Iterate through sub-lists of elements of the size given. + + Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that + scalar values, rather than :class:`_result.Row` objects, + are returned. + + """ + + getter = self._manyrow_getter + + while True: + partition = await greenlet_spawn(getter, self, size) + if partition: + yield partition + else: + break + + async def fetchall(self): + # type: () -> List[Any] + """A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method.""" + + return await greenlet_spawn(self._allrows) + + async def fetchmany(self, size=None): + # type: (Optional[Int]) -> List[Any] + """Fetch many objects. + + Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that + scalar values, rather than :class:`_result.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._manyrow_getter, self, size) + + async def all(self): + # type: () -> List[Any] + """Return all scalar values in a list. + + Equivalent to :meth:`_asyncio.AsyncResult.all` except that + scalar values, rather than :class:`_result.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._allrows) + + def __aiter__(self): + return self + + async def __anext__(self): + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + raise StopAsyncIteration() + else: + return row + + async def first(self): + # type: () -> Optional[Any] + """Fetch the first object or None if no object is present. + + Equivalent to :meth:`_asyncio.AsyncResult.first` except that + scalar values, rather than :class:`_result.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, False, False, False) + + async def one_or_none(self): + # type: () -> Optional[Any] + """Return at most one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that + scalar values, rather than :class:`_result.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, False, False) + + async def one(self): + # type: () -> Any + """Return exactly one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one` except that + scalar values, rather than :class:`_result.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, True, False) + + +class AsyncMappingResult(FilterResult): + """A wrapper for a :class:`_asyncio.AsyncResult` that returns dictionary values + rather than :class:`_engine.Row` values. + + The :class:`_asyncio.AsyncMappingResult` object is acquired by calling the + :meth:`_asyncio.AsyncResult.mappings` method. + + Refer to the :class:`_result.MappingResult` object in the synchronous + SQLAlchemy API for a complete behavioral description. + + .. versionadded:: 1.4 + + """ + + _generate_rows = True + + _post_creational_filter = operator.attrgetter("_mapping") + + def __init__(self, result): + self._real_result = result + self._unique_filter_state = result._unique_filter_state + self._metadata = result._metadata + if result._source_supports_scalars: + self._metadata = self._metadata._reduce([0]) + + def keys(self): + """Return an iterable view which yields the string keys that would + be represented by each :class:`.Row`. + + The view also can be tested for key containment using the Python + ``in`` operator, which will test both for the string keys represented + in the view, as well as for alternate keys such as column objects. + + .. versionchanged:: 1.4 a key view object is returned rather than a + plain list. + + + """ + return self._metadata.keys + + def unique(self, strategy=None): + # type: () -> AsyncMappingResult + """Apply unique filtering to the objects returned by this + :class:`_asyncio.AsyncMappingResult`. + + See :meth:`_asyncio.AsyncResult.unique` for usage details. + + """ + self._unique_filter_state = (set(), strategy) + return self + + def columns(self, *col_expressions): + # type: (*object) -> AsyncMappingResult + r"""Establish the columns that should be returned in each row. + + + """ + return self._column_slices(col_expressions) + + async def partitions(self, size=None): + # type: (Optional[Int]) -> Iterator[List[Mapping]] + """Iterate through sub-lists of elements of the size given. + + Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that + mapping values, rather than :class:`_result.Row` objects, + are returned. + + """ + + getter = self._manyrow_getter + + while True: + partition = await greenlet_spawn(getter, self, size) + if partition: + yield partition + else: + break + + async def fetchall(self): + # type: () -> List[Mapping] + """A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method.""" + + return await greenlet_spawn(self._allrows) + + async def fetchone(self): + # type: () -> Mapping + """Fetch one object. + + Equivalent to :meth:`_asyncio.AsyncResult.fetchone` except that + mapping values, rather than :class:`_result.Row` objects, + are returned. + + """ + + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + return None + else: + return row + + async def fetchmany(self, size=None): + # type: (Optional[Int]) -> List[Mapping] + """Fetch many objects. + + Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that + mapping values, rather than :class:`_result.Row` objects, + are returned. + + """ + + return await greenlet_spawn(self._manyrow_getter, self, size) + + async def all(self): + # type: () -> List[Mapping] + """Return all scalar values in a list. + + Equivalent to :meth:`_asyncio.AsyncResult.all` except that + mapping values, rather than :class:`_result.Row` objects, + are returned. + + """ + + return await greenlet_spawn(self._allrows) + + def __aiter__(self): + return self + + async def __anext__(self): + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + raise StopAsyncIteration() + else: + return row + + async def first(self): + # type: () -> Optional[Mapping] + """Fetch the first object or None if no object is present. + + Equivalent to :meth:`_asyncio.AsyncResult.first` except that + mapping values, rather than :class:`_result.Row` objects, + are returned. + + + """ + return await greenlet_spawn(self._only_one_row, False, False, False) + + async def one_or_none(self): + # type: () -> Optional[Mapping] + """Return at most one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that + mapping values, rather than :class:`_result.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, False, False) + + async def one(self): + # type: () -> Mapping + """Return exactly one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one` except that + mapping values, rather than :class:`_result.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, True, False) diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py new file mode 100644 index 000000000..167301780 --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -0,0 +1,293 @@ +from typing import Any +from typing import Callable +from typing import List +from typing import Mapping +from typing import Optional + +from . import engine +from . import result as _result +from .base import StartableContext +from .engine import AsyncEngine +from ... import util +from ...engine import Result +from ...orm import Session +from ...sql import Executable +from ...util.concurrency import greenlet_spawn + + +class AsyncSession: + """Asyncio version of :class:`_orm.Session`. + + + .. versionadded:: 1.4 + + """ + + def __init__( + self, + bind: AsyncEngine = None, + binds: Mapping[object, AsyncEngine] = None, + **kw + ): + kw["future"] = True + if bind: + bind = engine._get_sync_engine(bind) + + if binds: + binds = { + key: engine._get_sync_engine(b) for key, b in binds.items() + } + + self.sync_session = Session(bind=bind, binds=binds, **kw) + + def add(self, instance: object) -> None: + """Place an object in this :class:`_asyncio.AsyncSession`. + + .. seealso:: + + :meth:`_orm.Session.add` + + """ + self.sync_session.add(instance) + + def add_all(self, instances: List[object]) -> None: + """Add the given collection of instances to this + :class:`_asyncio.AsyncSession`.""" + + self.sync_session.add_all(instances) + + def expire_all(self): + """Expires all persistent instances within this Session. + + See :meth:`_orm.Session.expire_all` for usage details. + + """ + self.sync_session.expire_all() + + def expire(self, instance, attribute_names=None): + """Expire the attributes on an instance. + + See :meth:`._orm.Session.expire` for usage details. + + """ + self.sync_session.expire() + + async def refresh( + self, instance, attribute_names=None, with_for_update=None + ): + """Expire and refresh the attributes on the given instance. + + A query will be issued to the database and all attributes will be + refreshed with their current database value. + + This is the async version of the :meth:`_orm.Session.refresh` method. + See that method for a complete description of all options. + + """ + + return await greenlet_spawn( + self.sync_session.refresh, + instance, + attribute_names=attribute_names, + with_for_update=with_for_update, + ) + + async def run_sync(self, fn: Callable, *arg, **kw) -> Any: + """Invoke the given sync callable passing sync self as the first + argument. + + This method maintains the asyncio event loop all the way through + to the database connection by running the given callable in a + specially instrumented greenlet. + + E.g.:: + + with AsyncSession(async_engine) as session: + await session.run_sync(some_business_method) + + """ + + return await greenlet_spawn(fn, self.sync_session, *arg, **kw) + + async def execute( + self, + statement: Executable, + params: Optional[Mapping] = None, + execution_options: Mapping = util.EMPTY_DICT, + bind_arguments: Optional[Mapping] = None, + **kw + ) -> Result: + """Execute a statement and return a buffered + :class:`_engine.Result` object.""" + + execution_options = execution_options.union({"prebuffer_rows": True}) + + return await greenlet_spawn( + self.sync_session.execute, + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw + ) + + async def stream( + self, + statement, + params=None, + execution_options=util.EMPTY_DICT, + bind_arguments=None, + **kw + ): + """Execute a statement and return a streaming + :class:`_asyncio.AsyncResult` object.""" + + execution_options = execution_options.union({"stream_results": True}) + + result = await greenlet_spawn( + self.sync_session.execute, + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw + ) + return _result.AsyncResult(result) + + async def merge(self, instance, load=True): + """Copy the state of a given instance into a corresponding instance + within this :class:`_asyncio.AsyncSession`. + + """ + return await greenlet_spawn( + self.sync_session.merge, instance, load=load + ) + + async def flush(self, objects=None): + """Flush all the object changes to the database. + + .. seealso:: + + :meth:`_orm.Session.flush` + + """ + await greenlet_spawn(self.sync_session.flush, objects=objects) + + async def connection(self): + r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to this + :class:`.Session` object's transactional state. + + """ + sync_connection = await greenlet_spawn(self.sync_session.connection) + return engine.AsyncConnection(sync_connection.engine, sync_connection) + + def begin(self, **kw): + """Return an :class:`_asyncio.AsyncSessionTransaction` object. + + The underlying :class:`_orm.Session` will perform the + "begin" action when the :class:`_asyncio.AsyncSessionTransaction` + object is entered:: + + async with async_session.begin(): + # .. ORM transaction is begun + + Note that database IO will not normally occur when the session-level + transaction is begun, as database transactions begin on an + on-demand basis. However, the begin block is async to accommodate + for a :meth:`_orm.SessionEvents.after_transaction_create` + event hook that may perform IO. + + For a general description of ORM begin, see + :meth:`_orm.Session.begin`. + + """ + + return AsyncSessionTransaction(self) + + def begin_nested(self, **kw): + """Return an :class:`_asyncio.AsyncSessionTransaction` object + which will begin a "nested" transaction, e.g. SAVEPOINT. + + Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`. + + For a general description of ORM begin nested, see + :meth:`_orm.Session.begin_nested`. + + """ + + return AsyncSessionTransaction(self, nested=True) + + async def rollback(self): + return await greenlet_spawn(self.sync_session.rollback) + + async def commit(self): + return await greenlet_spawn(self.sync_session.commit) + + async def close(self): + return await greenlet_spawn(self.sync_session.close) + + async def __aenter__(self): + return self + + async def __aexit__(self, type_, value, traceback): + await self.close() + + +class AsyncSessionTransaction(StartableContext): + """A wrapper for the ORM :class:`_orm.SessionTransaction` object. + + This object is provided so that a transaction-holding object + for the :meth:`_asyncio.AsyncSession.begin` may be returned. + + The object supports both explicit calls to + :meth:`_asyncio.AsyncSessionTransaction.commit` and + :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an + async context manager. + + + .. versionadded:: 1.4 + + """ + + __slots__ = ("session", "sync_transaction", "nested") + + def __init__(self, session, nested=False): + self.session = session + self.nested = nested + self.sync_transaction = None + + @property + def is_active(self): + return ( + self._sync_transaction() is not None + and self._sync_transaction().is_active + ) + + def _sync_transaction(self): + if not self.sync_transaction: + self._raise_for_not_started() + return self.sync_transaction + + async def rollback(self): + """Roll back this :class:`_asyncio.AsyncTransaction`. + + """ + await greenlet_spawn(self._sync_transaction().rollback) + + async def commit(self): + """Commit this :class:`_asyncio.AsyncTransaction`.""" + + await greenlet_spawn(self._sync_transaction().commit) + + async def start(self): + self.sync_transaction = await greenlet_spawn( + self.session.sync_session.begin_nested + if self.nested + else self.session.sync_session.begin + ) + return self + + async def __aexit__(self, type_, value, traceback): + return await greenlet_spawn( + self._sync_transaction().__exit__, type_, value, traceback + ) |