diff options
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/engine.py | 132 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/result.py | 339 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/scoping.py | 135 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/session.py | 136 |
4 files changed, 697 insertions, 45 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index fb05f512e..95549ada6 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -12,8 +12,10 @@ from typing import Generator from typing import NoReturn from typing import Optional from typing import overload +from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union from . import exc as async_exc @@ -50,6 +52,9 @@ if TYPE_CHECKING: from ...pool import PoolProxiedConnection from ...sql._typing import _InfoType from ...sql.base import Executable + from ...sql.selectable import TypedReturnsRows + +_T = TypeVar("_T", bound=Any) class _SyncConnectionCallable(Protocol): @@ -407,7 +412,7 @@ class AsyncConnection( statement: str, parameters: Optional[_DBAPIAnyExecuteParams] = None, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> CursorResult: + ) -> CursorResult[Any]: r"""Executes a driver-level SQL string and return buffered :class:`_engine.Result`. @@ -423,12 +428,33 @@ class AsyncConnection( return await _ensure_sync_result(result, self.exec_driver_sql) + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncResult[_T]: + ... + + @overload async def stream( self, statement: Executable, parameters: Optional[_CoreAnyExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> AsyncResult: + ) -> AsyncResult[Any]: + ... + + async def stream( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncResult[Any]: """Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object.""" @@ -436,7 +462,7 @@ class AsyncConnection( self._proxied.execute, statement, parameters, - util.EMPTY_DICT.merge_with( + execution_options=util.EMPTY_DICT.merge_with( execution_options, {"stream_results": True} ), _require_await=True, @@ -446,12 +472,33 @@ class AsyncConnection( assert False, "server side result expected" return AsyncResult(result) + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> CursorResult[_T]: + ... + + @overload async def execute( self, statement: Executable, parameters: Optional[_CoreAnyExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> CursorResult: + ) -> CursorResult[Any]: + ... + + async def execute( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> CursorResult[Any]: r"""Executes a SQL statement construct and return a buffered :class:`_engine.Result`. @@ -487,15 +534,36 @@ class AsyncConnection( self._proxied.execute, statement, parameters, - execution_options, + execution_options=execution_options, _require_await=True, ) return await _ensure_sync_result(result, self.execute) + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> Optional[_T]: + ... + + @overload async def scalar( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> Any: + ... + + async def scalar( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> Any: r"""Executes a SQL statement construct and returns a scalar object. @@ -508,13 +576,36 @@ class AsyncConnection( first row returned. """ - result = await self.execute(statement, parameters, execution_options) + result = await self.execute( + statement, parameters, execution_options=execution_options + ) return result.scalar() + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> ScalarResult[_T]: + ... + + @overload + async def scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> ScalarResult[Any]: + ... + async def scalars( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> ScalarResult[Any]: r"""Executes a SQL statement construct and returns a scalar objects. @@ -528,13 +619,36 @@ class AsyncConnection( .. versionadded:: 1.4.24 """ - result = await self.execute(statement, parameters, execution_options) + result = await self.execute( + statement, parameters, execution_options=execution_options + ) return result.scalars() + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncScalarResult[_T]: + ... + + @overload async def stream_scalars( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncScalarResult[Any]: + ... + + async def stream_scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> AsyncScalarResult[Any]: r"""Executes a SQL statement and returns a streaming scalar result @@ -549,7 +663,9 @@ class AsyncConnection( .. versionadded:: 1.4.24 """ - result = await self.stream(statement, parameters, execution_options) + result = await self.stream( + statement, parameters, execution_options=execution_options + ) return result.scalars() async def run_sync( diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py index d0337554c..ff3dcf417 100644 --- a/lib/sqlalchemy/ext/asyncio/result.py +++ b/lib/sqlalchemy/ext/asyncio/result.py @@ -9,12 +9,15 @@ from __future__ import annotations import operator from typing import Any from typing import AsyncIterator -from typing import List from typing import Optional +from typing import overload +from typing import Sequence +from typing import Tuple from typing import TYPE_CHECKING from typing import TypeVar from . import exc as async_exc +from ... import util from ...engine.result import _NO_ROW from ...engine.result import _R from ...engine.result import FilterResult @@ -24,6 +27,7 @@ from ...engine.result import ResultMetaData from ...engine.row import Row from ...engine.row import RowMapping from ...util.concurrency import greenlet_spawn +from ...util.typing import Literal if TYPE_CHECKING: from ...engine import CursorResult @@ -32,9 +36,14 @@ if TYPE_CHECKING: from ...engine.result import _UniqueFilterType from ...engine.result import RMKeyView +_T = TypeVar("_T", bound=Any) +_TP = TypeVar("_TP", bound=Tuple[Any, ...]) + class AsyncCommon(FilterResult[_R]): - _real_result: Result + __slots__ = () + + _real_result: Result[Any] _metadata: ResultMetaData async def close(self) -> None: @@ -43,10 +52,10 @@ class AsyncCommon(FilterResult[_R]): await greenlet_spawn(self._real_result.close) -SelfAsyncResult = TypeVar("SelfAsyncResult", bound="AsyncResult") +SelfAsyncResult = TypeVar("SelfAsyncResult", bound="AsyncResult[Any]") -class AsyncResult(AsyncCommon[Row]): +class AsyncResult(AsyncCommon[Row[_TP]]): """An asyncio wrapper around a :class:`_result.Result` object. The :class:`_asyncio.AsyncResult` only applies to statement executions that @@ -67,11 +76,16 @@ class AsyncResult(AsyncCommon[Row]): """ - def __init__(self, real_result: Result): + __slots__ = () + + _real_result: Result[_TP] + + def __init__(self, real_result: Result[_TP]): self._real_result = real_result self._metadata = real_result._metadata self._unique_filter_state = real_result._unique_filter_state + self._post_creational_filter = None # BaseCursorResult pre-generates the "_row_getter". Use that # if available rather than building a second one @@ -80,6 +94,43 @@ class AsyncResult(AsyncCommon[Row]): "_row_getter", real_result.__dict__["_row_getter"] ) + @property + def t(self) -> AsyncTupleResult[_TP]: + """Apply a "typed tuple" typing filter to returned rows. + + The :attr:`.AsyncResult.t` attribute is a synonym for calling the + :meth:`.AsyncResult.tuples` method. + + .. versionadded:: 2.0 + + """ + return self # type: ignore + + def tuples(self) -> AsyncTupleResult[_TP]: + """Apply a "typed tuple" typing filter to returned rows. + + This method returns the same :class:`.AsyncResult` object at runtime, + however annotates as returning a :class:`.AsyncTupleResult` object + that will indicate to :pep:`484` typing tools that plain typed + ``Tuple`` instances are returned rather than rows. This allows + tuple unpacking and ``__getitem__`` access of :class:`.Row` objects + to by typed, for those cases where the statement invoked itself + included typing information. + + .. versionadded:: 2.0 + + :return: the :class:`_result.AsyncTupleResult` type at typing time. + + .. seealso:: + + :attr:`.AsyncResult.t` - shorter synonym + + :attr:`.Row.t` - :class:`.Row` version + + """ + + return self # type: ignore + def keys(self) -> RMKeyView: """Return the :meth:`_engine.Result.keys` collection from the underlying :class:`_engine.Result`. @@ -115,7 +166,7 @@ class AsyncResult(AsyncCommon[Row]): async def partitions( self, size: Optional[int] = None - ) -> AsyncIterator[List[Row]]: + ) -> AsyncIterator[Sequence[Row[_TP]]]: """Iterate through sub-lists of rows of the size given. An async iterator is returned:: @@ -141,7 +192,16 @@ class AsyncResult(AsyncCommon[Row]): else: break - async def fetchone(self) -> Optional[Row]: + async def fetchall(self) -> Sequence[Row[_TP]]: + """A synonym for the :meth:`.AsyncResult.all` method. + + .. versionadded:: 2.0 + + """ + + return await greenlet_spawn(self._allrows) + + async def fetchone(self) -> Optional[Row[_TP]]: """Fetch one row. When all rows are exhausted, returns None. @@ -163,7 +223,9 @@ class AsyncResult(AsyncCommon[Row]): else: return row - async def fetchmany(self, size: Optional[int] = None) -> List[Row]: + async def fetchmany( + self, size: Optional[int] = None + ) -> Sequence[Row[_TP]]: """Fetch many rows. When all rows are exhausted, returns an empty list. @@ -184,7 +246,7 @@ class AsyncResult(AsyncCommon[Row]): return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self) -> List[Row]: + async def all(self) -> Sequence[Row[_TP]]: """Return all rows in a list. Closes the result set after invocation. Subsequent invocations @@ -196,17 +258,17 @@ class AsyncResult(AsyncCommon[Row]): return await greenlet_spawn(self._allrows) - def __aiter__(self) -> AsyncResult: + def __aiter__(self) -> AsyncResult[_TP]: return self - async def __anext__(self) -> Row: + async def __anext__(self) -> Row[_TP]: row = await greenlet_spawn(self._onerow_getter, self) if row is _NO_ROW: raise StopAsyncIteration() else: return row - async def first(self) -> Optional[Row]: + async def first(self) -> Optional[Row[_TP]]: """Fetch the first row or None if no row is present. Closes the result set and discards remaining rows. @@ -229,7 +291,7 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, False, False, False) - async def one_or_none(self) -> Optional[Row]: + async def one_or_none(self) -> Optional[Row[_TP]]: """Return at most one result or raise an exception. Returns ``None`` if the result has no rows. @@ -251,6 +313,14 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, False, False) + @overload + async def scalar_one(self: AsyncResult[Tuple[_T]]) -> _T: + ... + + @overload + async def scalar_one(self) -> Any: + ... + async def scalar_one(self) -> Any: """Return exactly one scalar result or raise an exception. @@ -266,6 +336,16 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, True, True) + @overload + async def scalar_one_or_none( + self: AsyncResult[Tuple[_T]], + ) -> Optional[_T]: + ... + + @overload + async def scalar_one_or_none(self) -> Optional[Any]: + ... + async def scalar_one_or_none(self) -> Optional[Any]: """Return exactly one or no scalar result. @@ -281,7 +361,7 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, False, True) - async def one(self) -> Row: + async def one(self) -> Row[_TP]: """Return exactly one row or raise an exception. Raises :class:`.NoResultFound` if the result returns no @@ -312,6 +392,14 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, True, True, False) + @overload + async def scalar(self: AsyncResult[Tuple[_T]]) -> Optional[_T]: + ... + + @overload + async def scalar(self) -> Any: + ... + async def scalar(self) -> Any: """Fetch the first column of the first row, and close the result set. @@ -328,7 +416,7 @@ class AsyncResult(AsyncCommon[Row]): """ return await greenlet_spawn(self._only_one_row, False, False, True) - async def freeze(self) -> FrozenResult: + async def freeze(self) -> FrozenResult[_TP]: """Return a callable object that will produce copies of this :class:`_asyncio.AsyncResult` when invoked. @@ -351,7 +439,7 @@ class AsyncResult(AsyncCommon[Row]): return await greenlet_spawn(FrozenResult, self) - def merge(self, *others: AsyncResult) -> MergedResult: + def merge(self, *others: AsyncResult[_TP]) -> MergedResult[_TP]: """Merge this :class:`_asyncio.AsyncResult` with other compatible result objects. @@ -370,6 +458,20 @@ class AsyncResult(AsyncCommon[Row]): (self._real_result,) + tuple(o._real_result for o in others), ) + @overload + def scalars( + self: AsyncResult[Tuple[_T]], index: Literal[0] + ) -> AsyncScalarResult[_T]: + ... + + @overload + def scalars(self: AsyncResult[Tuple[_T]]) -> AsyncScalarResult[_T]: + ... + + @overload + def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]: + ... + def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]: """Return an :class:`_asyncio.AsyncScalarResult` filtering object which will return single elements rather than :class:`_row.Row` objects. @@ -423,9 +525,11 @@ class AsyncScalarResult(AsyncCommon[_R]): """ + __slots__ = () + _generate_rows = False - def __init__(self, real_result: Result, index: _KeyIndexType): + def __init__(self, real_result: Result[Any], index: _KeyIndexType): self._real_result = real_result if real_result._source_supports_scalars: @@ -452,7 +556,7 @@ class AsyncScalarResult(AsyncCommon[_R]): async def partitions( self, size: Optional[int] = None - ) -> AsyncIterator[List[_R]]: + ) -> AsyncIterator[Sequence[_R]]: """Iterate through sub-lists of elements of the size given. Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that @@ -470,12 +574,12 @@ class AsyncScalarResult(AsyncCommon[_R]): else: break - async def fetchall(self) -> List[_R]: + async def fetchall(self) -> Sequence[_R]: """A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method.""" return await greenlet_spawn(self._allrows) - async def fetchmany(self, size: Optional[int] = None) -> List[_R]: + async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]: """Fetch many objects. Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that @@ -485,7 +589,7 @@ class AsyncScalarResult(AsyncCommon[_R]): """ return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self) -> List[_R]: + async def all(self) -> Sequence[_R]: """Return all scalar values in a list. Equivalent to :meth:`_asyncio.AsyncResult.all` except that @@ -555,11 +659,13 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): """ + __slots__ = () + _generate_rows = True _post_creational_filter = operator.attrgetter("_mapping") - def __init__(self, result: Result): + def __init__(self, result: Result[Any]): self._real_result = result self._unique_filter_state = result._unique_filter_state self._metadata = result._metadata @@ -602,7 +708,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): async def partitions( self, size: Optional[int] = None - ) -> AsyncIterator[List[RowMapping]]: + ) -> AsyncIterator[Sequence[RowMapping]]: """Iterate through sub-lists of elements of the size given. @@ -621,7 +727,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): else: break - async def fetchall(self) -> List[RowMapping]: + async def fetchall(self) -> Sequence[RowMapping]: """A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method.""" return await greenlet_spawn(self._allrows) @@ -641,7 +747,9 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): else: return row - async def fetchmany(self, size: Optional[int] = None) -> List[RowMapping]: + async def fetchmany( + self, size: Optional[int] = None + ) -> Sequence[RowMapping]: """Fetch many rows. Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that @@ -652,7 +760,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): return await greenlet_spawn(self._manyrow_getter, self, size) - async def all(self) -> List[RowMapping]: + async def all(self) -> Sequence[RowMapping]: """Return all rows in a list. Equivalent to :meth:`_asyncio.AsyncResult.all` except that @@ -705,11 +813,186 @@ class AsyncMappingResult(AsyncCommon[RowMapping]): return await greenlet_spawn(self._only_one_row, True, True, False) -_RT = TypeVar("_RT", bound="Result") +SelfAsyncTupleResult = TypeVar( + "SelfAsyncTupleResult", bound="AsyncTupleResult[Any]" +) + + +class AsyncTupleResult(AsyncCommon[_R], util.TypingOnly): + """a :class:`.AsyncResult` that's typed as returning plain Python tuples + instead of rows. + + Since :class:`.Row` acts like a tuple in every way already, + this class is a typing only class, regular :class:`.AsyncResult` is + still used at runtime. + + """ + + __slots__ = () + + if TYPE_CHECKING: + + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[Sequence[_R]]: + """Iterate through sub-lists of elements of the size given. + + Equivalent to :meth:`_result.Result.partitions` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def fetchone(self) -> Optional[_R]: + """Fetch one tuple. + + Equivalent to :meth:`_result.Result.fetchone` except that + tuple values, rather than :class:`_result.Row` + objects, are returned. + + """ + ... + + async def fetchall(self) -> Sequence[_R]: + """A synonym for the :meth:`_engine.ScalarResult.all` method.""" + ... + + async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]: + """Fetch many objects. + + Equivalent to :meth:`_result.Result.fetchmany` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def all(self) -> Sequence[_R]: # noqa: A001 + """Return all scalar values in a list. + + Equivalent to :meth:`_result.Result.all` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def __aiter__(self) -> AsyncIterator[_R]: + ... + + async def __anext__(self) -> _R: + ... + + async def first(self) -> Optional[_R]: + """Fetch the first object or None if no object is present. + + Equivalent to :meth:`_result.Result.first` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + + """ + ... + + async def one_or_none(self) -> Optional[_R]: + """Return at most one object or raise an exception. + + Equivalent to :meth:`_result.Result.one_or_none` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + async def one(self) -> _R: + """Return exactly one object or raise an exception. + + Equivalent to :meth:`_result.Result.one` except that + tuple values, rather than :class:`_result.Row` objects, + are returned. + + """ + ... + + @overload + async def scalar_one(self: AsyncTupleResult[Tuple[_T]]) -> _T: + ... + + @overload + async def scalar_one(self) -> Any: + ... + + async def scalar_one(self) -> Any: + """Return exactly one scalar result or raise an exception. + + This is equivalent to calling :meth:`.Result.scalars` and then + :meth:`.Result.one`. + + .. seealso:: + + :meth:`.Result.one` + + :meth:`.Result.scalars` + + """ + ... + + @overload + async def scalar_one_or_none( + self: AsyncTupleResult[Tuple[_T]], + ) -> Optional[_T]: + ... + + @overload + async def scalar_one_or_none(self) -> Optional[Any]: + ... + + async def scalar_one_or_none(self) -> Optional[Any]: + """Return exactly one or no scalar result. + + This is equivalent to calling :meth:`.Result.scalars` and then + :meth:`.Result.one_or_none`. + + .. seealso:: + + :meth:`.Result.one_or_none` + + :meth:`.Result.scalars` + + """ + ... + + @overload + async def scalar(self: AsyncTupleResult[Tuple[_T]]) -> Optional[_T]: + ... + + @overload + async def scalar(self) -> Any: + ... + + async def scalar(self) -> Any: + """Fetch the first column of the first row, and close the result set. + + Returns None if there are no rows to fetch. + + No validation is performed to test if additional rows remain. + + After calling this method, the object is fully closed, + e.g. the :meth:`_engine.CursorResult.close` + method will have been called. + + :return: a Python scalar value , or None if no rows remain. + + """ + ... + + +_RT = TypeVar("_RT", bound="Result[Any]") async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT: - cursor_result: CursorResult + cursor_result: CursorResult[Any] try: is_cursor = result._is_cursor diff --git a/lib/sqlalchemy/ext/asyncio/scoping.py b/lib/sqlalchemy/ext/asyncio/scoping.py index c7a6e2ca0..22a060a0d 100644 --- a/lib/sqlalchemy/ext/asyncio/scoping.py +++ b/lib/sqlalchemy/ext/asyncio/scoping.py @@ -12,10 +12,12 @@ from typing import Callable from typing import Iterable from typing import Iterator from typing import Optional +from typing import overload from typing import Sequence from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union from .session import async_sessionmaker @@ -37,9 +39,9 @@ if TYPE_CHECKING: from ...engine import Engine from ...engine import Result from ...engine import Row + from ...engine import RowMapping from ...engine.interfaces import _CoreAnyExecuteParams from ...engine.interfaces import _CoreSingleExecuteParams - from ...engine.interfaces import _ExecuteOptions from ...engine.interfaces import _ExecuteOptionsParameter from ...engine.result import ScalarResult from ...orm._typing import _IdentityKeyType @@ -52,6 +54,9 @@ if TYPE_CHECKING: from ...sql.base import Executable from ...sql.elements import ClauseElement from ...sql.selectable import ForUpdateArg + from ...sql.selectable import TypedReturnsRows + +_T = TypeVar("_T", bound=Any) @create_proxy_methods( @@ -480,6 +485,32 @@ class async_scoped_session: return await self._proxied.delete(instance) + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[_T]: + ... + + @overload + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: + ... + async def execute( self, statement: Executable, @@ -488,7 +519,7 @@ class async_scoped_session: execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> Result: + ) -> Result[Any]: r"""Execute a statement and return a buffered :class:`_engine.Result` object. @@ -916,6 +947,30 @@ class async_scoped_session: return await self._proxied.rollback() + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Optional[_T]: + ... + + @overload + async def scalar( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: + ... + async def scalar( self, statement: Executable, @@ -947,6 +1002,30 @@ class async_scoped_session: **kw, ) + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[_T]: + ... + + @overload + async def scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: + ... + async def scalars( self, statement: Executable, @@ -984,6 +1063,19 @@ class async_scoped_session: **kw, ) + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[_T]: + ... + + @overload async def stream( self, statement: Executable, @@ -992,7 +1084,18 @@ class async_scoped_session: execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> AsyncResult: + ) -> AsyncResult[Any]: + ... + + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: r"""Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object. @@ -1012,6 +1115,30 @@ class async_scoped_session: **kw, ) + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[_T]: + ... + + @overload + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: + ... + async def stream_scalars( self, statement: Executable, @@ -1323,7 +1450,7 @@ class async_scoped_session: ident: Union[Any, Tuple[Any, ...]] = None, *, instance: Optional[Any] = None, - row: Optional[Row] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, identity_token: Optional[Any] = None, ) -> _IdentityKeyType[Any]: r"""Return an identity key. diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py index 1422f99a3..f2a69e9cd 100644 --- a/lib/sqlalchemy/ext/asyncio/session.py +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -12,10 +12,12 @@ from typing import Iterable from typing import Iterator from typing import NoReturn from typing import Optional +from typing import overload from typing import Sequence from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union from . import engine @@ -39,11 +41,10 @@ if TYPE_CHECKING: from ...engine import Engine from ...engine import Result from ...engine import Row + from ...engine import RowMapping from ...engine import ScalarResult - from ...engine import Transaction from ...engine.interfaces import _CoreAnyExecuteParams from ...engine.interfaces import _CoreSingleExecuteParams - from ...engine.interfaces import _ExecuteOptions from ...engine.interfaces import _ExecuteOptionsParameter from ...event import dispatcher from ...orm._typing import _IdentityKeyType @@ -59,9 +60,12 @@ if TYPE_CHECKING: from ...sql.base import Executable from ...sql.elements import ClauseElement from ...sql.selectable import ForUpdateArg + from ...sql.selectable import TypedReturnsRows _AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"] +_T = TypeVar("_T", bound=Any) + class _SyncSessionCallable(Protocol): def __call__(self, session: Session, *arg: Any, **kw: Any) -> Any: @@ -257,6 +261,32 @@ class AsyncSession(ReversibleProxy[Session]): return await greenlet_spawn(fn, self.sync_session, *arg, **kw) + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[_T]: + ... + + @overload + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: + ... + async def execute( self, statement: Executable, @@ -265,7 +295,7 @@ class AsyncSession(ReversibleProxy[Session]): execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> Result: + ) -> Result[Any]: """Execute a statement and return a buffered :class:`_engine.Result` object. @@ -292,6 +322,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return await _ensure_sync_result(result, self.execute) + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Optional[_T]: + ... + + @overload + async def scalar( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: + ... + async def scalar( self, statement: Executable, @@ -326,6 +380,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return result + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[_T]: + ... + + @overload + async def scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: + ... + async def scalars( self, statement: Executable, @@ -391,6 +469,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return result_obj + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[_T]: + ... + + @overload + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: + ... + async def stream( self, statement: Executable, @@ -399,7 +501,7 @@ class AsyncSession(ReversibleProxy[Session]): execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, bind_arguments: Optional[_BindArguments] = None, **kw: Any, - ) -> AsyncResult: + ) -> AsyncResult[Any]: """Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object. @@ -423,6 +525,30 @@ class AsyncSession(ReversibleProxy[Session]): ) return AsyncResult(result) + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[_T]: + ... + + @overload + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: + ... + async def stream_scalars( self, statement: Executable, @@ -1215,7 +1341,7 @@ class AsyncSession(ReversibleProxy[Session]): ident: Union[Any, Tuple[Any, ...]] = None, *, instance: Optional[Any] = None, - row: Optional[Row] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, identity_token: Optional[Any] = None, ) -> _IdentityKeyType[Any]: r"""Return an identity key. |