summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/session.py')
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py136
1 files changed, 131 insertions, 5 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
index 1422f99a3..f2a69e9cd 100644
--- a/lib/sqlalchemy/ext/asyncio/session.py
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -12,10 +12,12 @@ from typing import Iterable
from typing import Iterator
from typing import NoReturn
from typing import Optional
+from typing import overload
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import TypeVar
from typing import Union
from . import engine
@@ -39,11 +41,10 @@ if TYPE_CHECKING:
from ...engine import Engine
from ...engine import Result
from ...engine import Row
+ from ...engine import RowMapping
from ...engine import ScalarResult
- from ...engine import Transaction
from ...engine.interfaces import _CoreAnyExecuteParams
from ...engine.interfaces import _CoreSingleExecuteParams
- from ...engine.interfaces import _ExecuteOptions
from ...engine.interfaces import _ExecuteOptionsParameter
from ...event import dispatcher
from ...orm._typing import _IdentityKeyType
@@ -59,9 +60,12 @@ if TYPE_CHECKING:
from ...sql.base import Executable
from ...sql.elements import ClauseElement
from ...sql.selectable import ForUpdateArg
+ from ...sql.selectable import TypedReturnsRows
_AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"]
+_T = TypeVar("_T", bound=Any)
+
class _SyncSessionCallable(Protocol):
def __call__(self, session: Session, *arg: Any, **kw: Any) -> Any:
@@ -257,6 +261,32 @@ class AsyncSession(ReversibleProxy[Session]):
return await greenlet_spawn(fn, self.sync_session, *arg, **kw)
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[_T]:
+ ...
+
+ @overload
+ async def execute(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[Any]:
+ ...
+
async def execute(
self,
statement: Executable,
@@ -265,7 +295,7 @@ class AsyncSession(ReversibleProxy[Session]):
execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> Result:
+ ) -> Result[Any]:
"""Execute a statement and return a buffered
:class:`_engine.Result` object.
@@ -292,6 +322,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return await _ensure_sync_result(result, self.execute)
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Optional[_T]:
+ ...
+
+ @overload
+ async def scalar(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Any:
+ ...
+
async def scalar(
self,
statement: Executable,
@@ -326,6 +380,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return result
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[_T]:
+ ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[Any]:
+ ...
+
async def scalars(
self,
statement: Executable,
@@ -391,6 +469,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return result_obj
+ @overload
+ async def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[_T]:
+ ...
+
+ @overload
+ async def stream(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[Any]:
+ ...
+
async def stream(
self,
statement: Executable,
@@ -399,7 +501,7 @@ class AsyncSession(ReversibleProxy[Session]):
execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
- ) -> AsyncResult:
+ ) -> AsyncResult[Any]:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
@@ -423,6 +525,30 @@ class AsyncSession(ReversibleProxy[Session]):
)
return AsyncResult(result)
+ @overload
+ async def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[Any]:
+ ...
+
async def stream_scalars(
self,
statement: Executable,
@@ -1215,7 +1341,7 @@ class AsyncSession(ReversibleProxy[Session]):
ident: Union[Any, Tuple[Any, ...]] = None,
*,
instance: Optional[Any] = None,
- row: Optional[Row] = None,
+ row: Optional[Union[Row[Any], RowMapping]] = None,
identity_token: Optional[Any] = None,
) -> _IdentityKeyType[Any]:
r"""Return an identity key.