diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-19 21:06:41 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-27 14:46:36 -0400 |
commit | ad11c482e2233f44e8747d4d5a2b17a995fff1fa (patch) | |
tree | 57f8ddd30928951519fd6ac0f418e9cbf8e65610 /lib/sqlalchemy/ext/asyncio | |
parent | 033d1a16e7a220555d7611a5b8cacb1bd83822ae (diff) | |
download | sqlalchemy-ad11c482e2233f44e8747d4d5a2b17a995fff1fa.tar.gz |
pep484 ORM / SQL result support
after some experimentation it seems mypy is more amenable
to the generic types being fully integrated rather than
having separate spin-off types. so key structures
like Result, Row, Select become generic. For DML
Insert, Update, Delete, these are spun into type-specific
subclasses ReturningInsert, ReturningUpdate, ReturningDelete,
which is fine since the "row-ness" of these constructs
doesn't happen until returning() is called in any case.
a Tuple based model is then integrated so that these
objects can carry along information about their return
types. Overloads at the .execute() level carry through
the Tuple from the invoked object to the result.
To suit the issue of AliasedClass generating attributes
that are dynamic, experimented with a custom subclass
AsAliased, but then just settled on having aliased()
lie to the type checker and return `Type[_O]`, essentially.
will need some type-related accessors for with_polymorphic()
also.
Additionally, identified an issue in Update when used
"mysql style" against a join(), it basically doesn't work
if asked to UPDATE two tables on the same column name.
added an error message to the specific condition where
it happens with a very non-specific error message that we
hit a thing we can't do right now, suggest multi-table
update as a possible cause.
Change-Id: I5eff7eefe1d6166ee74160b2785c5e6a81fa8b95
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/engine.py | 132 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/result.py | 339 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/scoping.py | 135 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/session.py | 136 |
4 files changed, 697 insertions, 45 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index fb05f512e..95549ada6 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -12,8 +12,10 @@ from typing import Generator from typing import NoReturn from typing import Optional from typing import overload +from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union from . import exc as async_exc @@ -50,6 +52,9 @@ if TYPE_CHECKING: from ...pool import PoolProxiedConnection from ...sql._typing import _InfoType from ...sql.base import Executable + from ...sql.selectable import TypedReturnsRows + +_T = TypeVar("_T", bound=Any) class _SyncConnectionCallable(Protocol): @@ -407,7 +412,7 @@ class AsyncConnection( statement: str, parameters: Optional[_DBAPIAnyExecuteParams] = None, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> CursorResult: + ) -> CursorResult[Any]: r"""Executes a driver-level SQL string and return buffered :class:`_engine.Result`. @@ -423,12 +428,33 @@ class AsyncConnection( return await _ensure_sync_result(result, self.exec_driver_sql) + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncResult[_T]: + ... + + @overload async def stream( self, statement: Executable, parameters: Optional[_CoreAnyExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> AsyncResult: + ) -> AsyncResult[Any]: + ... + + async def stream( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncResult[Any]: """Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object.""" @@ -436,7 +462,7 @@ class AsyncConnection( self._proxied.execute, statement, parameters, - util.EMPTY_DICT.merge_with( + execution_options=util.EMPTY_DICT.merge_with( execution_options, {"stream_results": True} ), _require_await=True, @@ -446,12 +472,33 @@ class AsyncConnection( assert False, "server side result expected" return AsyncResult(result) + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> CursorResult[_T]: + ... + + @overload async def execute( self, statement: Executable, parameters: Optional[_CoreAnyExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> CursorResult: + ) -> CursorResult[Any]: + ... + + async def execute( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> CursorResult[Any]: r"""Executes a SQL statement construct and return a buffered :class:`_engine.Result`. @@ -487,15 +534,36 @@ class AsyncConnection( self._proxied.execute, statement, parameters, - execution_options, + execution_options=execution_options, _require_await=True, ) return await _ensure_sync_result(result, self.execute) + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> Optional[_T]: + ... + + @overload async def scalar( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> Any: + ... + + async def scalar( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> Any: r"""Executes a SQL statement construct and returns a scalar object. @@ -508,13 +576,36 @@ class AsyncConnection( first row returned. """ - result = await self.execute(statement, parameters, execution_options) + result = await self.execute( + statement, parameters, execution_options=execution_options + ) return result.scalar() + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> ScalarResult[_T]: + ... + + @overload + async def scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> ScalarResult[Any]: + ... + async def scalars( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> ScalarResult[Any]: r"""Executes a SQL statement construct and returns a scalar objects. @@ -528,13 +619,36 @@ class AsyncConnection( .. versionadded:: 1.4.24 """ - result = await self.execute(statement, parameters, execution_options) + result = await self.execute( + statement, parameters, execution_options=execution_options + ) return result.scalars() + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncScalarResult[_T]: + ... + + @overload async def stream_scalars( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncScalarResult[Any]: + ... + + async def stream_scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> AsyncScalarResult[Any]: r"""Executes a SQL statement and returns a streaming scalar result @@ -549,7 +663,9 @@ class AsyncConnection( .. versionadded:: 1.4.24 """ - result = await self.stream(statement, parameters, execution_options) + result = await self.stream( + statement, parameters, execution_options=execution_options + ) return result.scalars() async def run_sync( diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py index d0337554c..ff3dcf417 100644 --- a/lib/sqlalchemy/ext/asyncio/result.py +++ b/lib/sqlalchemy/ext/asyncio/result.py @@ -9,12 +9,15 @@ from __future__ import annotations import operator from typing import Any from typing import AsyncIterator -from typing import List from typing import Optional +from typing import overload +from typing import Sequence +from typing import Tuple from typing import TYPE_CHECKING from typing import TypeVar from . import exc as async_exc +from ... import util from ...engine.result import _NO_ROW from ...engine.result import _R from ...engine.result import FilterResult @@ -24,6 +27,7 @@ from ...engine.result import ResultMetaData from ...engine.row import Row from ...engine.row import RowMapping from ...util.concurrency import greenlet_spawn +from ...util.typing import Literal if TYPE_CHECKING: from ...engine import CursorResult @@ -32,9 +36,14 @@ if TYPE_CHECKING: from ...engine.result import _UniqueFilterType from ...engine.result import RMKeyView +_T = TypeVar("_T", bound=Any) +_TP = TypeVar("_TP", bound=Tuple[Any, ...]) + class AsyncCommon(FilterResult[_R]): - _real_result: Result + __slots__ = () + + _real_result: Result[Any] _metadata: ResultMetaData async def close(self) -> None: @@ -43,10 +52,10 @@ class AsyncCommon(FilterResult[_R]): await greenlet_spawn(self._real_result.close) -SelfAsyncResult = TypeVar("SelfAsyncResult", bound="AsyncResult") +SelfAsyncResult = TypeVar("SelfAsyncResult", bound="AsyncResult[Any]") -class AsyncResult(AsyncCommon[Row]): +class AsyncResult(AsyncCommon[Row[_TP]]): """An asyncio wrapper around a :class:`_result.Result` object. The :class:`_asyncio.AsyncResult` only applies to statement executions that @@ -67,11 +76,16 @@ class AsyncResult(AsyncCommon[Row]): """ - def __init__(self, real_result: Result): + __slots__ = () + + _real_result: Result[_TP] + + def __init__(self, real_result: Result[_TP]): self._real_result = real_result self._metadata = real_result._metadata self._unique_filter_state = real_result._unique_filter_state + self._post_creational_filter = None # BaseCursorResult pre-generates the "_row_getter". Use that # if available rather than building a second one @@ -80,6 +94,43 @@ class AsyncResult(AsyncCommon[Row]): "_row_getter", real_result.__dict__["_row_getter"] ) + @property + def t(self) -> AsyncTupleResult[_TP]: + """Apply a "typed tuple" typing filter to returned rows. + + The :attr:`.AsyncResult.t` attribute is a synonym for calling the + :meth:`.AsyncResult.tuples` method. + + .. versionadded:: 2.0 + + """ + return self # type: ignore + + def tuples(self) -> AsyncTupleResult[_TP]: + """Apply a "typed tuple" typing filter to returned rows. + + This method returns the same :class:`.AsyncResult` object at runtime, + however annotates as returning a :class:`.AsyncTupleResult` object + that will indicate to :pep:`484` typing tools that plain typed + ``Tuple`` instances are returned rather than rows. This allows + tuple unpacking and ``__getitem__`` access of :class:`.Row` objects + to by typed, for those cases where the statement invoked itself + included typing information. + + .. versionadded:: 2.0 + + :return: the :class:`_result.AsyncTupleResult` type at typing time. + + .. seealso:: + + :attr:`.AsyncResult.t` - shorter synonym + + :attr:`.Row.t` - :class:`.Row` version + + """ + + return self # type: ignore + def keys(self) -> RMKeyView: """Return the :meth:`_engine.Result.keys` collection from the underlying :class:`_engine.Result`. @@ -115,7 +166,7 @@ class AsyncResult(AsyncCommon[Row]): async def partitions( self, size: Optional[int] = None - ) -> AsyncIterator[List[Row]]: + ) -> AsyncIterator[Sequence[Row[_TP]]]: """Iterate through sub-lists of rows of the size given. An async iterator is returned:: @@ -141,7 +192,16 @@ class AsyncResult(AsyncCommon[Row]): else: break - async def fetchone(self) -> Optional[Row]: + async def fetchall(self) -> Sequence[Row[_TP]]: + """A synonym for the :meth:`.AsyncResult.all` method. + + .. versionadded:: 2.0 + + """ + + return await greenlet_spawn(self._allrows) + + async def fetchone(self) -> Optional[Row[_TP]]: """Fetch one row. When all rows are exhausted, returns None. @@ -163,7 +223,9 @@ class AsyncResult(AsyncCommon[Row]): else: return row - async def fetchmany(self, size: Optional[int] = None) -> List[Row]: + async def fetchmany( + self, size: Optional[int] = None + ) -> Sequence[Row[_TP]]: """Fetch many rows. When all rows are exhausted, returns an empty list. @@ -184,7 +246,7 @@ class AsyncResult(AsyncCommon[Row]): return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self) -> List[Row]: + async def all(self) -> Sequence[Row[_TP]]: """Return all rows in a list. Closes the result set after invocation. Subsequent invocations @@ -196,17 +258,17 @@ class AsyncResult(AsyncCommon[Row]): return await greenlet_spawn(self._allrows) - def __aiter__(self) -> AsyncResult: + def __aiter__(self) -> AsyncResult[_TP]: return self - async def __anext__(self) -> Row: + async def __anext__(self) -> Row[_TP]: row = await greenlet_spawn(self._onerow_getter, self) if row is _NO_ROW: raise StopAsyncIteration() else: return row - async def first(self) -> Optional[Row]: + async def first(self) -> Optional[Row[_TP]]: """Fetch the first row or None if no row is present. Closes the result set and discards remaining rows. @@ -229,7 +291,7 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, False, False, False) - async def one_or_none(self) -> Optional[Row]: + async def one_or_none(self) -> Optional[Row[_TP]]: """Return at most one result or raise an exception. Returns ``None`` if the result has no rows. @@ -251,6 +313,14 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, False, False) + @overload + async def scalar_one(self: AsyncResult[Tuple[_T]]) -> _T: + ... + + @overload + async def scalar_one(self) -> Any: + ... + async def scalar_one(self) -> Any: """Return exactly one scalar result or raise an exception. @@ -266,6 +336,16 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, True, True) + @overload + async def scalar_one_or_none( + self: AsyncResult[Tuple[_T]], + ) -> Optional[_T]: + ... + + @overload + async def scalar_one_or_none(self) -> Optional[Any]: + ... + async def scalar_one_or_none(self) -> Optional[Any]: """Return exactly one or no scalar result. @@ -281,7 +361,7 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, False, True) - async def one(self) -> Row: + async def one(self) -> Row[_TP]: """Return exactly one row or raise an exception. Raises :class:`.NoResultFound` if the result returns no @@ -312,6 +392,14 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, True, False) + @overload + async def scalar(self: AsyncResult[Tuple[_T]]) -> Optional[_T]: + ... + + @overload + async def scalar(self) -> Any: + ... + async def scalar(self) -> Any: """Fetch the first column of the first row, and close the result set. @@ -328,7 +416,7 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, False, False, True) - async def freeze(self) -> FrozenResult: + async def freeze(self) -> FrozenResult[_TP]: """Return a callable object that will produce copies of this :class:`_asyncio.AsyncResult` when invoked. @@ -351,7 +439,7 @@ class AsyncResult(AsyncCommon[Row]): return await greenlet_spawn(FrozenResult, self) - def merge(self, *others: AsyncResult) -> MergedResult: + def merge(self, *others: AsyncResult[_TP]) -> MergedResult[_TP]: """Merge this :class:`_asyncio.AsyncResult` with other compatible result objects. @@ -370,6 +458,20 @@ class AsyncResult(AsyncCommon[Row]): (self._real_result,) + tuple(o._real_result for o in others), ) + @overload + def scalars( + self: AsyncResult[Tuple[_T]], index: Literal[0] + ) -> AsyncScalarResult[_T]: + ... + + @overload + def scalars(self: AsyncResult[Tuple[_T]]) -> AsyncScalarResult[_T]: + ... + + @overload + def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]: + ... + def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]: """Return an :class:`_asyncio.AsyncScalarResult` filtering object which will return single elements rather than :class:`_row.Row` objects. @@ -423,9 +525,11 @@ class AsyncScalarResult(AsyncCommon[_R]): """ + __slots__ = () + _generate_rows = False - def __init__(self, real_result: Result, index: _KeyIndexType): + def __init__(self, real_result: Result[Any], index: _KeyIndexType): self._real_result = real_result if real_result._source_supports_scalars: @@ -452,7 +556,7 @@ class AsyncScalarResult(AsyncCommon[_R]): async def partitions( self, size: Optional[int] = None - ) -> AsyncIterator[List[_R]]: + ) -> AsyncIterator[Sequence[_R]]: """Iterate through sub-lists of elements of the size given. Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that @@ -470,12 +574,12 @@ class AsyncScalarResult(AsyncCommon[_R]): else: break - async def fetchall(self) -> List[_R]: + async def fetchall(self) -> Sequence[_R]: """A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method.""" return await greenlet_spawn(self._allrows) - async def fetchmany(self, size: Optional[int] = None) -> List[_R]: + async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]: """Fetch many objects. Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that @@ -485,7 +589,7 @@ class AsyncScalarResult(AsyncCommon[_R]): """ return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self) -> List[_R]: + async def all(self) -> Sequence[_R]: """Return all scalar values in a list. Equivalent to :meth:`_asyncio.AsyncResult.all` except that @@ -555,11 +659,13 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): """ + __slots__ = () + _generate_rows = True _post_creational_filter = operator.attrgetter("_mapping") - def __init__(self, result: Result): + def __init__(self, result: Result[Any]): self._real_result = result self._unique_filter_state = result._unique_filter_state self._metadata = result._metadata @@ -602,7 +708,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): async def partitions( self, size: Optional[int] = None - ) -> AsyncIterator[List[RowMapping]]: + ) -> AsyncIterator[Sequence[RowMapping]]: """Iterate through sub-lists of elements of the size given. @@ -621,7 +727,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): else: break - async def fetchall(self) -> List[RowMapping]: + async def fetchall(self) -> Sequence[RowMapping]: """A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method.""" return await greenlet_spawn(self._allrows) @@ -641,7 +747,9 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): else: return row - async def fetchmany(self, size: Optional[int] = None) -> List[RowMapping]: + async def fetchmany( + self, size: Optional[int] = None + ) -> Sequence[RowMapping]: """Fetch many rows. Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that @@ -652,7 +760,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self) -> List[RowMapping]: + async def all(self) -> Sequence[RowMapping]: """Return all rows in a list. Equivalent to :meth:`_asyncio.AsyncResult.all` except that @@ -705,11 +813,186 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): return await greenlet_spawn(self._only_one_row, True, True, False) -_RT = TypeVar("_RT", bound="Result") +SelfAsyncTupleResult = TypeVar( + "SelfAsyncTupleResult", bound="AsyncTupleResult[Any]" +) + + +class AsyncTupleResult(AsyncCommon[_R], util.TypingOnly): + """a :class:`.AsyncResult` that's typed as returning plain Python tuples + instead of rows. + + Since :class:`.Row` acts like a tuple in every way already, + this class is a typing only class, regular :class:`.AsyncResult` is + still used at runtime. + + """ + + __slots__ = () + + if TYPE_CHECKING: + + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[Sequence[_R]]: + """Iterate through sub-lists of elements of the size given. + + Equivalent to :meth:`_result.Result.partitions` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def fetchone(self) -> Optional[_R]: + """Fetch one tuple. + + Equivalent to :meth:`_result.Result.fetchone` except that + tuple values, rather than :class:`_result.Row` + objects, are returned. + + """ + ... + + async def fetchall(self) -> Sequence[_R]: + """A synonym for the :meth:`_engine.ScalarResult.all` method.""" + ... + + async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]: + """Fetch many objects. + + Equivalent to :meth:`_result.Result.fetchmany` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def all(self) -> Sequence[_R]: # noqa: A001 + """Return all scalar values in a list. + + Equivalent to :meth:`_result.Result.all` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def __aiter__(self) -> AsyncIterator[_R]: + ... + + async def __anext__(self) -> _R: + ... + + async def first(self) -> Optional[_R]: + """Fetch the first object or None if no object is present. + + Equivalent to :meth:`_result.Result.first` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + + """ + ... + + async def one_or_none(self) -> Optional[_R]: + """Return at most one object or raise an exception. + + Equivalent to :meth:`_result.Result.one_or_none` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def one(self) -> _R: + """Return exactly one object or raise an exception. + + Equivalent to :meth:`_result.Result.one` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + @overload + async def scalar_one(self: AsyncTupleResult[Tuple[_T]]) -> _T: + ... + + @overload + async def scalar_one(self) -> Any: + ... + + async def scalar_one(self) -> Any: + """Return exactly one scalar result or raise an exception. + + This is equivalent to calling :meth:`.Result.scalars` and then + :meth:`.Result.one`. + + .. seealso:: + + :meth:`.Result.one` + + :meth:`.Result.scalars` + + """ + ... + + @overload + async def scalar_one_or_none( + self: AsyncTupleResult[Tuple[_T]], + ) -> Optional[_T]: + ... + + @overload + async def scalar_one_or_none(self) -> Optional[Any]: + ... + + async def scalar_one_or_none(self) -> Optional[Any]: + """Return exactly one or no scalar result. + + This is equivalent to calling :meth:`.Result.scalars` and then + :meth:`.Result.one_or_none`. + + .. seealso:: + + :meth:`.Result.one_or_none` + + :meth:`.Result.scalars` + + """ + ... + + @overload + async def scalar(self: AsyncTupleResult[Tuple[_T]]) -> Optional[_T]: + ... + + @overload + async def scalar(self) -> Any: + ... + + async def scalar(self) -> Any: + """Fetch the first column of the first row, and close the result set. + + Returns None if there are no rows to fetch. + + No validation is performed to test if additional rows remain. + + After calling this method, the object is fully closed, + e.g. the :meth:`_engine.CursorResult.close` + method will have been called. + + :return: a Python scalar value , or None if no rows remain. + + """ + ... + + +_RT = TypeVar("_RT", bound="Result[Any]") async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT: - cursor_result: CursorResult + cursor_result: CursorResult[Any] try: is_cursor = result._is_cursor diff --git a/lib/sqlalchemy/ext/asyncio/scoping.py b/lib/sqlalchemy/ext/asyncio/scoping.py index c7a6e2ca0..22a060a0d 100644 --- a/lib/sqlalchemy/ext/asyncio/scoping.py +++ b/lib/sqlalchemy/ext/asyncio/scoping.py @@ -12,10 +12,12 @@ from typing import Callable from typing import Iterable from typing import Iterator from typing import Optional +from typing import overload from typing import Sequence from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union from .session import async_sessionmaker @@ -37,9 +39,9 @@ if TYPE_CHECKING: from ...engine import Engine from ...engine import Result from ...engine import Row + from ...engine import RowMapping from ...engine.interfaces import _CoreAnyExecuteParams from ...engine.interfaces import _CoreSingleExecuteParams - from ...engine.interfaces import _ExecuteOptions from ...engine.interfaces import _ExecuteOptionsParameter from ...engine.result import ScalarResult from ...orm._typing import _IdentityKeyType @@ -52,6 +54,9 @@ if TYPE_CHECKING: from ...sql.base import Executable from ...sql.elements import ClauseElement from ...sql.selectable import ForUpdateArg + from ...sql.selectable import TypedReturnsRows + +_T = TypeVar("_T", bound=Any) @create_proxy_methods( @@ -480,6 +485,32 @@ class async_scoped_session: return await self._proxied.delete(instance) + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[_T]: + ... + + @overload + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: + ... + async def execute( self, statement: Executable, @@ -488,7 +519,7 @@ class async_scoped_session: execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> Result: + ) -> Result[Any]: r"""Execute a statement and return a buffered :class:`_engine.Result` object. @@ -916,6 +947,30 @@ class async_scoped_session: return await self._proxied.rollback() + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Optional[_T]: + ... + + @overload + async def scalar( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: + ... + async def scalar( self, statement: Executable, @@ -947,6 +1002,30 @@ class async_scoped_session: **kw, ) + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[_T]: + ... + + @overload + async def scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: + ... + async def scalars( self, statement: Executable, @@ -984,6 +1063,19 @@ class async_scoped_session: **kw, ) + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[_T]: + ... + + @overload async def stream( self, statement: Executable, @@ -992,7 +1084,18 @@ class async_scoped_session: execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> AsyncResult: + ) -> AsyncResult[Any]: + ... + + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: r"""Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object. @@ -1012,6 +1115,30 @@ class async_scoped_session: **kw, ) + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[_T]: + ... + + @overload + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: + ... + async def stream_scalars( self, statement: Executable, @@ -1323,7 +1450,7 @@ class async_scoped_session: ident: Union[Any, Tuple[Any, ...]] = None, *, instance: Optional[Any] = None, - row: Optional[Row] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, identity_token: Optional[Any] = None, ) -> _IdentityKeyType[Any]: r"""Return an identity key. diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py index 1422f99a3..f2a69e9cd 100644 --- a/lib/sqlalchemy/ext/asyncio/session.py +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -12,10 +12,12 @@ from typing import Iterable from typing import Iterator from typing import NoReturn from typing import Optional +from typing import overload from typing import Sequence from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union from . import engine @@ -39,11 +41,10 @@ if TYPE_CHECKING: from ...engine import Engine from ...engine import Result from ...engine import Row + from ...engine import RowMapping from ...engine import ScalarResult - from ...engine import Transaction from ...engine.interfaces import _CoreAnyExecuteParams from ...engine.interfaces import _CoreSingleExecuteParams - from ...engine.interfaces import _ExecuteOptions from ...engine.interfaces import _ExecuteOptionsParameter from ...event import dispatcher from ...orm._typing import _IdentityKeyType @@ -59,9 +60,12 @@ if TYPE_CHECKING: from ...sql.base import Executable from ...sql.elements import ClauseElement from ...sql.selectable import ForUpdateArg + from ...sql.selectable import TypedReturnsRows _AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"] +_T = TypeVar("_T", bound=Any) + class _SyncSessionCallable(Protocol): def __call__(self, session: Session, *arg: Any, **kw: Any) -> Any: @@ -257,6 +261,32 @@ class AsyncSession(ReversibleProxy[Session]): return await greenlet_spawn(fn, self.sync_session, *arg, **kw) + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[_T]: + ... + + @overload + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: + ... + async def execute( self, statement: Executable, @@ -265,7 +295,7 @@ class AsyncSession(ReversibleProxy[Session]): execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> Result: + ) -> Result[Any]: """Execute a statement and return a buffered :class:`_engine.Result` object. @@ -292,6 +322,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return await _ensure_sync_result(result, self.execute) + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Optional[_T]: + ... + + @overload + async def scalar( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: + ... + async def scalar( self, statement: Executable, @@ -326,6 +380,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return result + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[_T]: + ... + + @overload + async def scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: + ... + async def scalars( self, statement: Executable, @@ -391,6 +469,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return result_obj + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[_T]: + ... + + @overload + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: + ... + async def stream( self, statement: Executable, @@ -399,7 +501,7 @@ class AsyncSession(ReversibleProxy[Session]): execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> AsyncResult: + ) -> AsyncResult[Any]: """Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object. @@ -423,6 +525,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return AsyncResult(result) + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[_T]: + ... + + @overload + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: + ... + async def stream_scalars( self, statement: Executable, @@ -1215,7 +1341,7 @@ class AsyncSession(ReversibleProxy[Session]): ident: Union[Any, Tuple[Any, ...]] = None, *, instance: Optional[Any] = None, - row: Optional[Row] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, identity_token: Optional[Any] = None, ) -> _IdentityKeyType[Any]: r"""Return an identity key. |