summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio/engine.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/engine.py')
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py132
1 files changed, 124 insertions, 8 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index fb05f512e..95549ada6 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -12,8 +12,10 @@ from typing import Generator
from typing import NoReturn
from typing import Optional
from typing import overload
+from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import TypeVar
from typing import Union
from . import exc as async_exc
@@ -50,6 +52,9 @@ if TYPE_CHECKING:
from ...pool import PoolProxiedConnection
from ...sql._typing import _InfoType
from ...sql.base import Executable
+ from ...sql.selectable import TypedReturnsRows
+
+_T = TypeVar("_T", bound=Any)
class _SyncConnectionCallable(Protocol):
@@ -407,7 +412,7 @@ class AsyncConnection(
statement: str,
parameters: Optional[_DBAPIAnyExecuteParams] = None,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> CursorResult:
+ ) -> CursorResult[Any]:
r"""Executes a driver-level SQL string and return buffered
:class:`_engine.Result`.
@@ -423,12 +428,33 @@ class AsyncConnection(
return await _ensure_sync_result(result, self.exec_driver_sql)
+ @overload
+ async def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncResult[_T]:
+ ...
+
+ @overload
async def stream(
self,
statement: Executable,
parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> AsyncResult:
+ ) -> AsyncResult[Any]:
+ ...
+
+ async def stream(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncResult[Any]:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object."""
@@ -436,7 +462,7 @@ class AsyncConnection(
self._proxied.execute,
statement,
parameters,
- util.EMPTY_DICT.merge_with(
+ execution_options=util.EMPTY_DICT.merge_with(
execution_options, {"stream_results": True}
),
_require_await=True,
@@ -446,12 +472,33 @@ class AsyncConnection(
assert False, "server side result expected"
return AsyncResult(result)
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult[_T]:
+ ...
+
+ @overload
async def execute(
self,
statement: Executable,
parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> CursorResult:
+ ) -> CursorResult[Any]:
+ ...
+
+ async def execute(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult[Any]:
r"""Executes a SQL statement construct and return a buffered
:class:`_engine.Result`.
@@ -487,15 +534,36 @@ class AsyncConnection(
self._proxied.execute,
statement,
parameters,
- execution_options,
+ execution_options=execution_options,
_require_await=True,
)
return await _ensure_sync_result(result, self.execute)
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Optional[_T]:
+ ...
+
+ @overload
async def scalar(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Any:
+ ...
+
+ async def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> Any:
r"""Executes a SQL statement construct and returns a scalar object.
@@ -508,13 +576,36 @@ class AsyncConnection(
first row returned.
"""
- result = await self.execute(statement, parameters, execution_options)
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalar()
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult[_T]:
+ ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult[Any]:
+ ...
+
async def scalars(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
r"""Executes a SQL statement construct and returns a scalar objects.
@@ -528,13 +619,36 @@ class AsyncConnection(
.. versionadded:: 1.4.24
"""
- result = await self.execute(statement, parameters, execution_options)
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalars()
+ @overload
+ async def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
async def stream_scalars(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncScalarResult[Any]:
+ ...
+
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> AsyncScalarResult[Any]:
r"""Executes a SQL statement and returns a streaming scalar result
@@ -549,7 +663,9 @@ class AsyncConnection(
.. versionadded:: 1.4.24
"""
- result = await self.stream(statement, parameters, execution_options)
+ result = await self.stream(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalars()
async def run_sync(