diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-10 15:42:35 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-11 22:11:07 -0400 |
commit | a45e2284dad17fbbba3bea9d5e5304aab21c8c94 (patch) | |
tree | ac31614f2d53059570e2edffe731baf384baea23 /lib/sqlalchemy/ext/asyncio/result.py | |
parent | aa9cd878e8249a4a758c7f968e929e92fede42a5 (diff) | |
download | sqlalchemy-a45e2284dad17fbbba3bea9d5e5304aab21c8c94.tar.gz |
pep-484: asyncio
in this patch the asyncio/events.py module, which
existed only to raise errors when trying to attach event
listeners, is removed, as we were already coding an asyncio-specific
workaround in upstream Pool / Session to raise this error,
just moved the error out to the target and did the same thing
for Engine.
We also add an async_sessionmaker class. The initial rationale
here is because sessionmaker() is hardcoded to Session subclasses,
and there's not a way to get the use case of
sessionmaker(class_=AsyncSession) to type correctly without changing
the sessionmaker() symbol itself to be a function and not a class,
which gets too complicated for what this is. Additionally,
_SessionClassMethods has only three methods on it, one of which
is not usable with asyncio (close_all()), the others
not generally used from the session class.
Change-Id: I064a5fa5d91cc8d5bbe9597437536e37b4e801fe
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/result.py')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/result.py | 180 |
1 files changed, 119 insertions, 61 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py index 39718735c..a9db822a6 100644 --- a/lib/sqlalchemy/ext/asyncio/result.py +++ b/lib/sqlalchemy/ext/asyncio/result.py @@ -4,25 +4,49 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import annotations import operator +from typing import Any +from typing import AsyncIterator +from typing import List +from typing import Optional +from typing import TYPE_CHECKING +from typing import TypeVar from . import exc as async_exc from ...engine.result import _NO_ROW +from ...engine.result import _R from ...engine.result import FilterResult from ...engine.result import FrozenResult from ...engine.result import MergedResult +from ...engine.result import ResultMetaData +from ...engine.row import Row +from ...engine.row import RowMapping from ...util.concurrency import greenlet_spawn +if TYPE_CHECKING: + from ...engine import CursorResult + from ...engine import Result + from ...engine.result import _KeyIndexType + from ...engine.result import _UniqueFilterType + from ...engine.result import RMKeyView -class AsyncCommon(FilterResult): - async def close(self): + +class AsyncCommon(FilterResult[_R]): + _real_result: Result + _metadata: ResultMetaData + + async def close(self) -> None: """Close this result.""" await greenlet_spawn(self._real_result.close) -class AsyncResult(AsyncCommon): +SelfAsyncResult = TypeVar("SelfAsyncResult", bound="AsyncResult") + + +class AsyncResult(AsyncCommon[Row]): """An asyncio wrapper around a :class:`_result.Result` object. The :class:`_asyncio.AsyncResult` only applies to statement executions that @@ -43,7 +67,7 @@ class AsyncResult(AsyncCommon): """ - def __init__(self, real_result): + def __init__(self, real_result: Result): self._real_result = real_result self._metadata = real_result._metadata @@ -56,14 +80,16 @@ class AsyncResult(AsyncCommon): "_row_getter", real_result.__dict__["_row_getter"] ) - def keys(self): + def keys(self) -> RMKeyView: """Return the :meth:`_engine.Result.keys` collection from the underlying :class:`_engine.Result`. """ return self._metadata.keys - def unique(self, strategy=None): + def unique( + self: SelfAsyncResult, strategy: Optional[_UniqueFilterType] = None + ) -> SelfAsyncResult: """Apply unique filtering to the objects returned by this :class:`_asyncio.AsyncResult`. @@ -75,7 +101,9 @@ class AsyncResult(AsyncCommon): self._unique_filter_state = (set(), strategy) return self - def columns(self, *col_expressions): + def columns( + self: SelfAsyncResult, *col_expressions: _KeyIndexType + ) -> SelfAsyncResult: r"""Establish the columns that should be returned in each row. Refer to :meth:`_engine.Result.columns` in the synchronous @@ -85,7 +113,9 @@ class AsyncResult(AsyncCommon): """ return self._column_slices(col_expressions) - async def partitions(self, size=None): + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[List[Row]]: """Iterate through sub-lists of rows of the size given. An async iterator is returned:: @@ -111,7 +141,7 @@ class AsyncResult(AsyncCommon): else: break - async def fetchone(self): + async def fetchone(self) -> Optional[Row]: """Fetch one row. When all rows are exhausted, returns None. @@ -131,9 +161,9 @@ class AsyncResult(AsyncCommon): if row is _NO_ROW: return None else: - return row # type: ignore[return-value] + return row - async def fetchmany(self, size=None): + async def fetchmany(self, size: Optional[int] = None) -> List[Row]: """Fetch many rows. When all rows are exhausted, returns an empty list. @@ -152,11 +182,9 @@ class AsyncResult(AsyncCommon): """ - return await greenlet_spawn( - self._manyrow_getter, self, size # type: ignore - ) + return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self): + async def all(self) -> List[Row]: """Return all rows in a list. Closes the result set after invocation. Subsequent invocations @@ -166,19 +194,19 @@ class AsyncResult(AsyncCommon): """ - return await greenlet_spawn(self._allrows) # type: ignore + return await greenlet_spawn(self._allrows) - def __aiter__(self): + def __aiter__(self) -> AsyncResult: return self - async def __anext__(self): + async def __anext__(self) -> Row: row = await greenlet_spawn(self._onerow_getter, self) if row is _NO_ROW: raise StopAsyncIteration() else: return row - async def first(self): + async def first(self) -> Optional[Row]: """Fetch the first row or None if no row is present. Closes the result set and discards remaining rows. @@ -201,7 +229,7 @@ class AsyncResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, False, False, False) - async def one_or_none(self): + async def one_or_none(self) -> Optional[Row]: """Return at most one result or raise an exception. Returns ``None`` if the result has no rows. @@ -223,7 +251,7 @@ class AsyncResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, True, False, False) - async def scalar_one(self): + async def scalar_one(self) -> Any: """Return exactly one scalar result or raise an exception. This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and @@ -238,7 +266,7 @@ class AsyncResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, True, True, True) - async def scalar_one_or_none(self): + async def scalar_one_or_none(self) -> Optional[Any]: """Return exactly one or no scalar result. This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and @@ -253,7 +281,7 @@ class AsyncResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, True, False, True) - async def one(self): + async def one(self) -> Row: """Return exactly one row or raise an exception. Raises :class:`.NoResultFound` if the result returns no @@ -284,7 +312,7 @@ class AsyncResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, True, True, False) - async def scalar(self): + async def scalar(self) -> Any: """Fetch the first column of the first row, and close the result set. Returns None if there are no rows to fetch. @@ -300,7 +328,7 @@ class AsyncResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, False, False, True) - async def freeze(self): + async def freeze(self) -> FrozenResult: """Return a callable object that will produce copies of this :class:`_asyncio.AsyncResult` when invoked. @@ -323,7 +351,7 @@ class AsyncResult(AsyncCommon): return await greenlet_spawn(FrozenResult, self) - def merge(self, *others): + def merge(self, *others: AsyncResult) -> MergedResult: """Merge this :class:`_asyncio.AsyncResult` with other compatible result objects. @@ -337,9 +365,12 @@ class AsyncResult(AsyncCommon): undefined. """ - return MergedResult(self._metadata, (self,) + others) + return MergedResult( + self._metadata, + (self._real_result,) + tuple(o._real_result for o in others), + ) - def scalars(self, index=0): + def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]: """Return an :class:`_asyncio.AsyncScalarResult` filtering object which will return single elements rather than :class:`_row.Row` objects. @@ -355,7 +386,7 @@ class AsyncResult(AsyncCommon): """ return AsyncScalarResult(self._real_result, index) - def mappings(self): + def mappings(self) -> AsyncMappingResult: """Apply a mappings filter to returned rows, returning an instance of :class:`_asyncio.AsyncMappingResult`. @@ -373,7 +404,12 @@ class AsyncResult(AsyncCommon): return AsyncMappingResult(self._real_result) -class AsyncScalarResult(AsyncCommon): +SelfAsyncScalarResult = TypeVar( + "SelfAsyncScalarResult", bound="AsyncScalarResult[Any]" +) + + +class AsyncScalarResult(AsyncCommon[_R]): """A wrapper for a :class:`_asyncio.AsyncResult` that returns scalar values rather than :class:`_row.Row` values. @@ -389,7 +425,7 @@ class AsyncScalarResult(AsyncCommon): _generate_rows = False - def __init__(self, real_result, index): + def __init__(self, real_result: Result, index: _KeyIndexType): self._real_result = real_result if real_result._source_supports_scalars: @@ -401,7 +437,10 @@ class AsyncScalarResult(AsyncCommon): self._unique_filter_state = real_result._unique_filter_state - def unique(self, strategy=None): + def unique( + self: SelfAsyncScalarResult, + strategy: Optional[_UniqueFilterType] = None, + ) -> SelfAsyncScalarResult: """Apply unique filtering to the objects returned by this :class:`_asyncio.AsyncScalarResult`. @@ -411,7 +450,9 @@ class AsyncScalarResult(AsyncCommon): self._unique_filter_state = (set(), strategy) return self - async def partitions(self, size=None): + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[List[_R]]: """Iterate through sub-lists of elements of the size given. Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that @@ -429,12 +470,12 @@ class AsyncScalarResult(AsyncCommon): else: break - async def fetchall(self): + async def fetchall(self) -> List[_R]: """A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method.""" return await greenlet_spawn(self._allrows) - async def fetchmany(self, size=None): + async def fetchmany(self, size: Optional[int] = None) -> List[_R]: """Fetch many objects. Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that @@ -444,7 +485,7 @@ class AsyncScalarResult(AsyncCommon): """ return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self): + async def all(self) -> List[_R]: """Return all scalar values in a list. Equivalent to :meth:`_asyncio.AsyncResult.all` except that @@ -454,17 +495,17 @@ class AsyncScalarResult(AsyncCommon): """ return await greenlet_spawn(self._allrows) - def __aiter__(self): + def __aiter__(self) -> AsyncScalarResult[_R]: return self - async def __anext__(self): + async def __anext__(self) -> _R: row = await greenlet_spawn(self._onerow_getter, self) if row is _NO_ROW: raise StopAsyncIteration() else: return row - async def first(self): + async def first(self) -> Optional[_R]: """Fetch the first object or None if no object is present. Equivalent to :meth:`_asyncio.AsyncResult.first` except that @@ -474,7 +515,7 @@ class AsyncScalarResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, False, False, False) - async def one_or_none(self): + async def one_or_none(self) -> Optional[_R]: """Return at most one object or raise an exception. Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that @@ -484,7 +525,7 @@ class AsyncScalarResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, True, False, False) - async def one(self): + async def one(self) -> _R: """Return exactly one object or raise an exception. Equivalent to :meth:`_asyncio.AsyncResult.one` except that @@ -495,7 +536,12 @@ class AsyncScalarResult(AsyncCommon): return await greenlet_spawn(self._only_one_row, True, True, False) -class AsyncMappingResult(AsyncCommon): +SelfAsyncMappingResult = TypeVar( + "SelfAsyncMappingResult", bound="AsyncMappingResult" +) + + +class AsyncMappingResult(AsyncCommon[RowMapping]): """A wrapper for a :class:`_asyncio.AsyncResult` that returns dictionary values rather than :class:`_engine.Row` values. @@ -513,14 +559,14 @@ class AsyncMappingResult(AsyncCommon): _post_creational_filter = operator.attrgetter("_mapping") - def __init__(self, result): + def __init__(self, result: Result): self._real_result = result self._unique_filter_state = result._unique_filter_state self._metadata = result._metadata if result._source_supports_scalars: self._metadata = self._metadata._reduce([0]) - def keys(self): + def keys(self) -> RMKeyView: """Return an iterable view which yields the string keys that would be represented by each :class:`.Row`. @@ -535,7 +581,10 @@ class AsyncMappingResult(AsyncCommon): """ return self._metadata.keys - def unique(self, strategy=None): + def unique( + self: SelfAsyncMappingResult, + strategy: Optional[_UniqueFilterType] = None, + ) -> SelfAsyncMappingResult: """Apply unique filtering to the objects returned by this :class:`_asyncio.AsyncMappingResult`. @@ -545,11 +594,16 @@ class AsyncMappingResult(AsyncCommon): self._unique_filter_state = (set(), strategy) return self - def columns(self, *col_expressions): + def columns( + self: SelfAsyncMappingResult, *col_expressions: _KeyIndexType + ) -> SelfAsyncMappingResult: r"""Establish the columns that should be returned in each row.""" return self._column_slices(col_expressions) - async def partitions(self, size=None): + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[List[RowMapping]]: + """Iterate through sub-lists of elements of the size given. Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that @@ -567,12 +621,12 @@ class AsyncMappingResult(AsyncCommon): else: break - async def fetchall(self): + async def fetchall(self) -> List[RowMapping]: """A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method.""" return await greenlet_spawn(self._allrows) - async def fetchone(self): + async def fetchone(self) -> Optional[RowMapping]: """Fetch one object. Equivalent to :meth:`_asyncio.AsyncResult.fetchone` except that @@ -587,8 +641,8 @@ class AsyncMappingResult(AsyncCommon): else: return row - async def fetchmany(self, size=None): - """Fetch many objects. + async def fetchmany(self, size: Optional[int] = None) -> List[RowMapping]: + """Fetch many rows. Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that :class:`_result.RowMapping` values, rather than :class:`_result.Row` @@ -598,8 +652,8 @@ class AsyncMappingResult(AsyncCommon): return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self): - """Return all scalar values in a list. + async def all(self) -> List[RowMapping]: + """Return all rows in a list. Equivalent to :meth:`_asyncio.AsyncResult.all` except that :class:`_result.RowMapping` values, rather than :class:`_result.Row` @@ -609,17 +663,17 @@ class AsyncMappingResult(AsyncCommon): return await greenlet_spawn(self._allrows) - def __aiter__(self): + def __aiter__(self) -> AsyncMappingResult: return self - async def __anext__(self): + async def __anext__(self) -> RowMapping: row = await greenlet_spawn(self._onerow_getter, self) if row is _NO_ROW: raise StopAsyncIteration() else: return row - async def first(self): + async def first(self) -> Optional[RowMapping]: """Fetch the first object or None if no object is present. Equivalent to :meth:`_asyncio.AsyncResult.first` except that @@ -630,7 +684,7 @@ class AsyncMappingResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, False, False, False) - async def one_or_none(self): + async def one_or_none(self) -> Optional[RowMapping]: """Return at most one object or raise an exception. Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that @@ -640,7 +694,7 @@ class AsyncMappingResult(AsyncCommon): """ return await greenlet_spawn(self._only_one_row, True, False, False) - async def one(self): + async def one(self) -> RowMapping: """Return exactly one object or raise an exception. Equivalent to :meth:`_asyncio.AsyncResult.one` except that @@ -651,11 +705,15 @@ class AsyncMappingResult(AsyncCommon): return await greenlet_spawn(self._only_one_row, True, True, False) -async def _ensure_sync_result(result, calling_method): +_RT = TypeVar("_RT", bound="Result") + + +async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT: + cursor_result: CursorResult if not result._is_cursor: - cursor_result = getattr(result, "raw", None) + cursor_result = getattr(result, "raw", None) # type: ignore else: - cursor_result = result + cursor_result = result # type: ignore if cursor_result and cursor_result.context._is_server_side: await greenlet_spawn(cursor_result.close) raise async_exc.AsyncMethodRequired( |