summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py132
-rw-r--r--lib/sqlalchemy/ext/asyncio/result.py339
-rw-r--r--lib/sqlalchemy/ext/asyncio/scoping.py135
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py136
4 files changed, 697 insertions, 45 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index fb05f512e..95549ada6 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -12,8 +12,10 @@ from typing import Generator
from typing import NoReturn
from typing import Optional
from typing import overload
+from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import TypeVar
from typing import Union
from . import exc as async_exc
@@ -50,6 +52,9 @@ if TYPE_CHECKING:
from ...pool import PoolProxiedConnection
from ...sql._typing import _InfoType
from ...sql.base import Executable
+ from ...sql.selectable import TypedReturnsRows
+
+_T = TypeVar("_T", bound=Any)
class _SyncConnectionCallable(Protocol):
@@ -407,7 +412,7 @@ class AsyncConnection(
statement: str,
parameters: Optional[_DBAPIAnyExecuteParams] = None,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> CursorResult:
+ ) -> CursorResult[Any]:
r"""Executes a driver-level SQL string and return buffered
:class:`_engine.Result`.
@@ -423,12 +428,33 @@ class AsyncConnection(
return await _ensure_sync_result(result, self.exec_driver_sql)
+ @overload
+ async def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncResult[_T]:
+ ...
+
+ @overload
async def stream(
self,
statement: Executable,
parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> AsyncResult:
+ ) -> AsyncResult[Any]:
+ ...
+
+ async def stream(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncResult[Any]:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object."""
@@ -436,7 +462,7 @@ class AsyncConnection(
self._proxied.execute,
statement,
parameters,
- util.EMPTY_DICT.merge_with(
+ execution_options=util.EMPTY_DICT.merge_with(
execution_options, {"stream_results": True}
),
_require_await=True,
@@ -446,12 +472,33 @@ class AsyncConnection(
assert False, "server side result expected"
return AsyncResult(result)
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult[_T]:
+ ...
+
+ @overload
async def execute(
self,
statement: Executable,
parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> CursorResult:
+ ) -> CursorResult[Any]:
+ ...
+
+ async def execute(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult[Any]:
r"""Executes a SQL statement construct and return a buffered
:class:`_engine.Result`.
@@ -487,15 +534,36 @@ class AsyncConnection(
self._proxied.execute,
statement,
parameters,
- execution_options,
+ execution_options=execution_options,
_require_await=True,
)
return await _ensure_sync_result(result, self.execute)
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Optional[_T]:
+ ...
+
+ @overload
async def scalar(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Any:
+ ...
+
+ async def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> Any:
r"""Executes a SQL statement construct and returns a scalar object.
@@ -508,13 +576,36 @@ class AsyncConnection(
first row returned.
"""
- result = await self.execute(statement, parameters, execution_options)
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalar()
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult[_T]:
+ ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult[Any]:
+ ...
+
async def scalars(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
r"""Executes a SQL statement construct and returns a scalar objects.
@@ -528,13 +619,36 @@ class AsyncConnection(
.. versionadded:: 1.4.24
"""
- result = await self.execute(statement, parameters, execution_options)
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalars()
+ @overload
+ async def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
async def stream_scalars(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncScalarResult[Any]:
+ ...
+
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> AsyncScalarResult[Any]:
r"""Executes a SQL statement and returns a streaming scalar result
@@ -549,7 +663,9 @@ class AsyncConnection(
.. versionadded:: 1.4.24
"""
- result = await self.stream(statement, parameters, execution_options)
+ result = await self.stream(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalars()
async def run_sync(
diff --git a/lib/sqlalchemy/ext/asyncio/result.py b/lib/sqlalchemy/ext/asyncio/result.py
index d0337554c..ff3dcf417 100644
--- a/lib/sqlalchemy/ext/asyncio/result.py
+++ b/lib/sqlalchemy/ext/asyncio/result.py
@@ -9,12 +9,15 @@ from __future__ import annotations
import operator
from typing import Any
from typing import AsyncIterator
-from typing import List
from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
from . import exc as async_exc
+from ... import util
from ...engine.result import _NO_ROW
from ...engine.result import _R
from ...engine.result import FilterResult
@@ -24,6 +27,7 @@ from ...engine.result import ResultMetaData
from ...engine.row import Row
from ...engine.row import RowMapping
from ...util.concurrency import greenlet_spawn
+from ...util.typing import Literal
if TYPE_CHECKING:
from ...engine import CursorResult
@@ -32,9 +36,14 @@ if TYPE_CHECKING:
from ...engine.result import _UniqueFilterType
from ...engine.result import RMKeyView
+_T = TypeVar("_T", bound=Any)
+_TP = TypeVar("_TP", bound=Tuple[Any, ...])
+
class AsyncCommon(FilterResult[_R]):
- _real_result: Result
+ __slots__ = ()
+
+ _real_result: Result[Any]
_metadata: ResultMetaData
async def close(self) -> None:
@@ -43,10 +52,10 @@ class AsyncCommon(FilterResult[_R]):
await greenlet_spawn(self._real_result.close)
-SelfAsyncResult = TypeVar("SelfAsyncResult", bound="AsyncResult")
+SelfAsyncResult = TypeVar("SelfAsyncResult", bound="AsyncResult[Any]")
-class AsyncResult(AsyncCommon[Row]):
+class AsyncResult(AsyncCommon[Row[_TP]]):
"""An asyncio wrapper around a :class:`_result.Result` object.
The :class:`_asyncio.AsyncResult` only applies to statement executions that
@@ -67,11 +76,16 @@ class AsyncResult(AsyncCommon[Row]):
"""
- def __init__(self, real_result: Result):
+ __slots__ = ()
+
+ _real_result: Result[_TP]
+
+ def __init__(self, real_result: Result[_TP]):
self._real_result = real_result
self._metadata = real_result._metadata
self._unique_filter_state = real_result._unique_filter_state
+ self._post_creational_filter = None
# BaseCursorResult pre-generates the "_row_getter". Use that
# if available rather than building a second one
@@ -80,6 +94,43 @@ class AsyncResult(AsyncCommon[Row]):
"_row_getter", real_result.__dict__["_row_getter"]
)
+ @property
+ def t(self) -> AsyncTupleResult[_TP]:
+ """Apply a "typed tuple" typing filter to returned rows.
+
+ The :attr:`.AsyncResult.t` attribute is a synonym for calling the
+ :meth:`.AsyncResult.tuples` method.
+
+ .. versionadded:: 2.0
+
+ """
+ return self # type: ignore
+
+ def tuples(self) -> AsyncTupleResult[_TP]:
+ """Apply a "typed tuple" typing filter to returned rows.
+
+ This method returns the same :class:`.AsyncResult` object at runtime,
+ however annotates as returning a :class:`.AsyncTupleResult` object
+ that will indicate to :pep:`484` typing tools that plain typed
+ ``Tuple`` instances are returned rather than rows. This allows
+ tuple unpacking and ``__getitem__`` access of :class:`.Row` objects
+ to by typed, for those cases where the statement invoked itself
+ included typing information.
+
+ .. versionadded:: 2.0
+
+ :return: the :class:`_result.AsyncTupleResult` type at typing time.
+
+ .. seealso::
+
+ :attr:`.AsyncResult.t` - shorter synonym
+
+ :attr:`.Row.t` - :class:`.Row` version
+
+ """
+
+ return self # type: ignore
+
def keys(self) -> RMKeyView:
"""Return the :meth:`_engine.Result.keys` collection from the
underlying :class:`_engine.Result`.
@@ -115,7 +166,7 @@ class AsyncResult(AsyncCommon[Row]):
async def partitions(
self, size: Optional[int] = None
- ) -> AsyncIterator[List[Row]]:
+ ) -> AsyncIterator[Sequence[Row[_TP]]]:
"""Iterate through sub-lists of rows of the size given.
An async iterator is returned::
@@ -141,7 +192,16 @@ class AsyncResult(AsyncCommon[Row]):
else:
break
- async def fetchone(self) -> Optional[Row]:
+ async def fetchall(self) -> Sequence[Row[_TP]]:
+ """A synonym for the :meth:`.AsyncResult.all` method.
+
+ .. versionadded:: 2.0
+
+ """
+
+ return await greenlet_spawn(self._allrows)
+
+ async def fetchone(self) -> Optional[Row[_TP]]:
"""Fetch one row.
When all rows are exhausted, returns None.
@@ -163,7 +223,9 @@ class AsyncResult(AsyncCommon[Row]):
else:
return row
- async def fetchmany(self, size: Optional[int] = None) -> List[Row]:
+ async def fetchmany(
+ self, size: Optional[int] = None
+ ) -> Sequence[Row[_TP]]:
"""Fetch many rows.
When all rows are exhausted, returns an empty list.
@@ -184,7 +246,7 @@ class AsyncResult(AsyncCommon[Row]):
return await greenlet_spawn(self._manyrow_getter, self, size)
- async def all(self) -> List[Row]:
+ async def all(self) -> Sequence[Row[_TP]]:
"""Return all rows in a list.
Closes the result set after invocation. Subsequent invocations
@@ -196,17 +258,17 @@ class AsyncResult(AsyncCommon[Row]):
return await greenlet_spawn(self._allrows)
- def __aiter__(self) -> AsyncResult:
+ def __aiter__(self) -> AsyncResult[_TP]:
return self
- async def __anext__(self) -> Row:
+ async def __anext__(self) -> Row[_TP]:
row = await greenlet_spawn(self._onerow_getter, self)
if row is _NO_ROW:
raise StopAsyncIteration()
else:
return row
- async def first(self) -> Optional[Row]:
+ async def first(self) -> Optional[Row[_TP]]:
"""Fetch the first row or None if no row is present.
Closes the result set and discards remaining rows.
@@ -229,7 +291,7 @@ class AsyncResult(AsyncCommon[Row]):
"""
return await greenlet_spawn(self._only_one_row, False, False, False)
- async def one_or_none(self) -> Optional[Row]:
+ async def one_or_none(self) -> Optional[Row[_TP]]:
"""Return at most one result or raise an exception.
Returns ``None`` if the result has no rows.
@@ -251,6 +313,14 @@ class AsyncResult(AsyncCommon[Row]):
"""
return await greenlet_spawn(self._only_one_row, True, False, False)
+ @overload
+ async def scalar_one(self: AsyncResult[Tuple[_T]]) -> _T:
+ ...
+
+ @overload
+ async def scalar_one(self) -> Any:
+ ...
+
async def scalar_one(self) -> Any:
"""Return exactly one scalar result or raise an exception.
@@ -266,6 +336,16 @@ class AsyncResult(AsyncCommon[Row]):
"""
return await greenlet_spawn(self._only_one_row, True, True, True)
+ @overload
+ async def scalar_one_or_none(
+ self: AsyncResult[Tuple[_T]],
+ ) -> Optional[_T]:
+ ...
+
+ @overload
+ async def scalar_one_or_none(self) -> Optional[Any]:
+ ...
+
async def scalar_one_or_none(self) -> Optional[Any]:
"""Return exactly one or no scalar result.
@@ -281,7 +361,7 @@ class AsyncResult(AsyncCommon[Row]):
"""
return await greenlet_spawn(self._only_one_row, True, False, True)
- async def one(self) -> Row:
+ async def one(self) -> Row[_TP]:
"""Return exactly one row or raise an exception.
Raises :class:`.NoResultFound` if the result returns no
@@ -312,6 +392,14 @@ class AsyncResult(AsyncCommon[Row]):
"""
return await greenlet_spawn(self._only_one_row, True, True, False)
+ @overload
+ async def scalar(self: AsyncResult[Tuple[_T]]) -> Optional[_T]:
+ ...
+
+ @overload
+ async def scalar(self) -> Any:
+ ...
+
async def scalar(self) -> Any:
"""Fetch the first column of the first row, and close the result set.
@@ -328,7 +416,7 @@ class AsyncResult(AsyncCommon[Row]):
"""
return await greenlet_spawn(self._only_one_row, False, False, True)
- async def freeze(self) -> FrozenResult:
+ async def freeze(self) -> FrozenResult[_TP]:
"""Return a callable object that will produce copies of this
:class:`_asyncio.AsyncResult` when invoked.
@@ -351,7 +439,7 @@ class AsyncResult(AsyncCommon[Row]):
return await greenlet_spawn(FrozenResult, self)
- def merge(self, *others: AsyncResult) -> MergedResult:
+ def merge(self, *others: AsyncResult[_TP]) -> MergedResult[_TP]:
"""Merge this :class:`_asyncio.AsyncResult` with other compatible result
objects.
@@ -370,6 +458,20 @@ class AsyncResult(AsyncCommon[Row]):
(self._real_result,) + tuple(o._real_result for o in others),
)
+ @overload
+ def scalars(
+ self: AsyncResult[Tuple[_T]], index: Literal[0]
+ ) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
+ def scalars(self: AsyncResult[Tuple[_T]]) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
+ def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]:
+ ...
+
def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]:
"""Return an :class:`_asyncio.AsyncScalarResult` filtering object which
will return single elements rather than :class:`_row.Row` objects.
@@ -423,9 +525,11 @@ class AsyncScalarResult(AsyncCommon[_R]):
"""
+ __slots__ = ()
+
_generate_rows = False
- def __init__(self, real_result: Result, index: _KeyIndexType):
+ def __init__(self, real_result: Result[Any], index: _KeyIndexType):
self._real_result = real_result
if real_result._source_supports_scalars:
@@ -452,7 +556,7 @@ class AsyncScalarResult(AsyncCommon[_R]):
async def partitions(
self, size: Optional[int] = None
- ) -> AsyncIterator[List[_R]]:
+ ) -> AsyncIterator[Sequence[_R]]:
"""Iterate through sub-lists of elements of the size given.
Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that
@@ -470,12 +574,12 @@ class AsyncScalarResult(AsyncCommon[_R]):
else:
break
- async def fetchall(self) -> List[_R]:
+ async def fetchall(self) -> Sequence[_R]:
"""A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method."""
return await greenlet_spawn(self._allrows)
- async def fetchmany(self, size: Optional[int] = None) -> List[_R]:
+ async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
"""Fetch many objects.
Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
@@ -485,7 +589,7 @@ class AsyncScalarResult(AsyncCommon[_R]):
"""
return await greenlet_spawn(self._manyrow_getter, self, size)
- async def all(self) -> List[_R]:
+ async def all(self) -> Sequence[_R]:
"""Return all scalar values in a list.
Equivalent to :meth:`_asyncio.AsyncResult.all` except that
@@ -555,11 +659,13 @@ class AsyncMappingResult(AsyncCommon[RowMapping]):
"""
+ __slots__ = ()
+
_generate_rows = True
_post_creational_filter = operator.attrgetter("_mapping")
- def __init__(self, result: Result):
+ def __init__(self, result: Result[Any]):
self._real_result = result
self._unique_filter_state = result._unique_filter_state
self._metadata = result._metadata
@@ -602,7 +708,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]):
async def partitions(
self, size: Optional[int] = None
- ) -> AsyncIterator[List[RowMapping]]:
+ ) -> AsyncIterator[Sequence[RowMapping]]:
"""Iterate through sub-lists of elements of the size given.
@@ -621,7 +727,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]):
else:
break
- async def fetchall(self) -> List[RowMapping]:
+ async def fetchall(self) -> Sequence[RowMapping]:
"""A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method."""
return await greenlet_spawn(self._allrows)
@@ -641,7 +747,9 @@ class AsyncMappingResult(AsyncCommon[RowMapping]):
else:
return row
- async def fetchmany(self, size: Optional[int] = None) -> List[RowMapping]:
+ async def fetchmany(
+ self, size: Optional[int] = None
+ ) -> Sequence[RowMapping]:
"""Fetch many rows.
Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that
@@ -652,7 +760,7 @@ class AsyncMappingResult(AsyncCommon[RowMapping]):
return await greenlet_spawn(self._manyrow_getter, self, size)
- async def all(self) -> List[RowMapping]:
+ async def all(self) -> Sequence[RowMapping]:
"""Return all rows in a list.
Equivalent to :meth:`_asyncio.AsyncResult.all` except that
@@ -705,11 +813,186 @@ class AsyncMappingResult(AsyncCommon[RowMapping]):
return await greenlet_spawn(self._only_one_row, True, True, False)
-_RT = TypeVar("_RT", bound="Result")
+SelfAsyncTupleResult = TypeVar(
+ "SelfAsyncTupleResult", bound="AsyncTupleResult[Any]"
+)
+
+
+class AsyncTupleResult(AsyncCommon[_R], util.TypingOnly):
+ """a :class:`.AsyncResult` that's typed as returning plain Python tuples
+ instead of rows.
+
+ Since :class:`.Row` acts like a tuple in every way already,
+ this class is a typing only class, regular :class:`.AsyncResult` is
+ still used at runtime.
+
+ """
+
+ __slots__ = ()
+
+ if TYPE_CHECKING:
+
+ async def partitions(
+ self, size: Optional[int] = None
+ ) -> AsyncIterator[Sequence[_R]]:
+ """Iterate through sub-lists of elements of the size given.
+
+ Equivalent to :meth:`_result.Result.partitions` except that
+ tuple values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ ...
+
+ async def fetchone(self) -> Optional[_R]:
+ """Fetch one tuple.
+
+ Equivalent to :meth:`_result.Result.fetchone` except that
+ tuple values, rather than :class:`_result.Row`
+ objects, are returned.
+
+ """
+ ...
+
+ async def fetchall(self) -> Sequence[_R]:
+ """A synonym for the :meth:`_engine.ScalarResult.all` method."""
+ ...
+
+ async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
+ """Fetch many objects.
+
+ Equivalent to :meth:`_result.Result.fetchmany` except that
+ tuple values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ ...
+
+ async def all(self) -> Sequence[_R]: # noqa: A001
+ """Return all scalar values in a list.
+
+ Equivalent to :meth:`_result.Result.all` except that
+ tuple values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ ...
+
+ async def __aiter__(self) -> AsyncIterator[_R]:
+ ...
+
+ async def __anext__(self) -> _R:
+ ...
+
+ async def first(self) -> Optional[_R]:
+ """Fetch the first object or None if no object is present.
+
+ Equivalent to :meth:`_result.Result.first` except that
+ tuple values, rather than :class:`_result.Row` objects,
+ are returned.
+
+
+ """
+ ...
+
+ async def one_or_none(self) -> Optional[_R]:
+ """Return at most one object or raise an exception.
+
+ Equivalent to :meth:`_result.Result.one_or_none` except that
+ tuple values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ ...
+
+ async def one(self) -> _R:
+ """Return exactly one object or raise an exception.
+
+ Equivalent to :meth:`_result.Result.one` except that
+ tuple values, rather than :class:`_result.Row` objects,
+ are returned.
+
+ """
+ ...
+
+ @overload
+ async def scalar_one(self: AsyncTupleResult[Tuple[_T]]) -> _T:
+ ...
+
+ @overload
+ async def scalar_one(self) -> Any:
+ ...
+
+ async def scalar_one(self) -> Any:
+ """Return exactly one scalar result or raise an exception.
+
+ This is equivalent to calling :meth:`.Result.scalars` and then
+ :meth:`.Result.one`.
+
+ .. seealso::
+
+ :meth:`.Result.one`
+
+ :meth:`.Result.scalars`
+
+ """
+ ...
+
+ @overload
+ async def scalar_one_or_none(
+ self: AsyncTupleResult[Tuple[_T]],
+ ) -> Optional[_T]:
+ ...
+
+ @overload
+ async def scalar_one_or_none(self) -> Optional[Any]:
+ ...
+
+ async def scalar_one_or_none(self) -> Optional[Any]:
+ """Return exactly one or no scalar result.
+
+ This is equivalent to calling :meth:`.Result.scalars` and then
+ :meth:`.Result.one_or_none`.
+
+ .. seealso::
+
+ :meth:`.Result.one_or_none`
+
+ :meth:`.Result.scalars`
+
+ """
+ ...
+
+ @overload
+ async def scalar(self: AsyncTupleResult[Tuple[_T]]) -> Optional[_T]:
+ ...
+
+ @overload
+ async def scalar(self) -> Any:
+ ...
+
+ async def scalar(self) -> Any:
+ """Fetch the first column of the first row, and close the result set.
+
+ Returns None if there are no rows to fetch.
+
+ No validation is performed to test if additional rows remain.
+
+ After calling this method, the object is fully closed,
+ e.g. the :meth:`_engine.CursorResult.close`
+ method will have been called.
+
+ :return: a Python scalar value , or None if no rows remain.
+
+ """
+ ...
+
+
+_RT = TypeVar("_RT", bound="Result[Any]")
async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT:
- cursor_result: CursorResult
+ cursor_result: CursorResult[Any]
try:
is_cursor = result._is_cursor
diff --git a/lib/sqlalchemy/ext/asyncio/scoping.py b/lib/sqlalchemy/ext/asyncio/scoping.py
index c7a6e2ca0..22a060a0d 100644
--- a/lib/sqlalchemy/ext/asyncio/scoping.py
+++ b/lib/sqlalchemy/ext/asyncio/scoping.py
@@ -12,10 +12,12 @@ from typing import Callable
from typing import Iterable
from typing import Iterator
from typing import Optional
+from typing import overload
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import TypeVar
from typing import Union
from .session import async_sessionmaker
@@ -37,9 +39,9 @@ if TYPE_CHECKING:
from ...engine import Engine
from ...engine import Result
from ...engine import Row
+ from ...engine import RowMapping
from ...engine.interfaces import _CoreAnyExecuteParams
from ...engine.interfaces import _CoreSingleExecuteParams
- from ...engine.interfaces import _ExecuteOptions
from ...engine.interfaces import _ExecuteOptionsParameter
from ...engine.result import ScalarResult
from ...orm._typing import _IdentityKeyType
@@ -52,6 +54,9 @@ if TYPE_CHECKING:
from ...sql.base import Executable
from ...sql.elements import ClauseElement
from ...sql.selectable import ForUpdateArg
+ from ...sql.selectable import TypedReturnsRows
+
+_T = TypeVar("_T", bound=Any)
@create_proxy_methods(
@@ -480,6 +485,32 @@ class async_scoped_session:
return await self._proxied.delete(instance)
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[_T]:
+ ...
+
+ @overload
+ async def execute(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[Any]:
+ ...
+
async def execute(
self,
statement: Executable,
@@ -488,7 +519,7 @@ class async_scoped_session:
execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result:
+ ) -> Result[Any]:
r"""Execute a statement and return a buffered
:class:`_engine.Result` object.
@@ -916,6 +947,30 @@ class async_scoped_session:
return await self._proxied.rollback()
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Optional[_T]:
+ ...
+
+ @overload
+ async def scalar(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Any:
+ ...
+
async def scalar(
self,
statement: Executable,
@@ -947,6 +1002,30 @@ class async_scoped_session:
**kw,
)
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[_T]:
+ ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[Any]:
+ ...
+
async def scalars(
self,
statement: Executable,
@@ -984,6 +1063,19 @@ class async_scoped_session:
**kw,
)
+ @overload
+ async def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[_T]:
+ ...
+
+ @overload
async def stream(
self,
statement: Executable,
@@ -992,7 +1084,18 @@ class async_scoped_session:
execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult:
+ ) -> AsyncResult[Any]:
+ ...
+
+ async def stream(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[Any]:
r"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
@@ -1012,6 +1115,30 @@ class async_scoped_session:
**kw,
)
+ @overload
+ async def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[Any]:
+ ...
+
async def stream_scalars(
self,
statement: Executable,
@@ -1323,7 +1450,7 @@ class async_scoped_session:
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Row] = None,
+ row: Optional[Union[Row[Any], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
index 1422f99a3..f2a69e9cd 100644
--- a/lib/sqlalchemy/ext/asyncio/session.py
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -12,10 +12,12 @@ from typing import Iterable
from typing import Iterator
from typing import NoReturn
from typing import Optional
+from typing import overload
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import TypeVar
from typing import Union
from . import engine
@@ -39,11 +41,10 @@ if TYPE_CHECKING:
from ...engine import Engine
from ...engine import Result
from ...engine import Row
+ from ...engine import RowMapping
from ...engine import ScalarResult
- from ...engine import Transaction
from ...engine.interfaces import _CoreAnyExecuteParams
from ...engine.interfaces import _CoreSingleExecuteParams
- from ...engine.interfaces import _ExecuteOptions
from ...engine.interfaces import _ExecuteOptionsParameter
from ...event import dispatcher
from ...orm._typing import _IdentityKeyType
@@ -59,9 +60,12 @@ if TYPE_CHECKING:
from ...sql.base import Executable
from ...sql.elements import ClauseElement
from ...sql.selectable import ForUpdateArg
+ from ...sql.selectable import TypedReturnsRows
_AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"]
+_T = TypeVar("_T", bound=Any)
+
class _SyncSessionCallable(Protocol):
def __call__(self, session: Session, *arg: Any, **kw: Any) -> Any:
@@ -257,6 +261,32 @@ class AsyncSession(ReversibleProxy[Session]):
return await greenlet_spawn(fn, self.sync_session, *arg, **kw)
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[_T]:
+ ...
+
+ @overload
+ async def execute(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[Any]:
+ ...
+
async def execute(
self,
statement: Executable,
@@ -265,7 +295,7 @@ class AsyncSession(ReversibleProxy[Session]):
execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result:
+ ) -> Result[Any]:
"""Execute a statement and return a buffered
:class:`_engine.Result` object.
@@ -292,6 +322,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return await _ensure_sync_result(result, self.execute)
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Optional[_T]:
+ ...
+
+ @overload
+ async def scalar(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Any:
+ ...
+
async def scalar(
self,
statement: Executable,
@@ -326,6 +380,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return result
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[_T]:
+ ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[Any]:
+ ...
+
async def scalars(
self,
statement: Executable,
@@ -391,6 +469,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return result_obj
+ @overload
+ async def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[_T]:
+ ...
+
+ @overload
+ async def stream(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[Any]:
+ ...
+
async def stream(
self,
statement: Executable,
@@ -399,7 +501,7 @@ class AsyncSession(ReversibleProxy[Session]):
execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult:
+ ) -> AsyncResult[Any]:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
@@ -423,6 +525,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return AsyncResult(result)
+ @overload
+ async def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[Any]:
+ ...
+
async def stream_scalars(
self,
statement: Executable,
@@ -1215,7 +1341,7 @@ class AsyncSession(ReversibleProxy[Session]):
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Row] = None,
+ row: Optional[Union[Row[Any], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.