diff options
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/engine.py')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/engine.py | 132 |
1 files changed, 124 insertions, 8 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index fb05f512e..95549ada6 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -12,8 +12,10 @@ from typing import Generator from typing import NoReturn from typing import Optional from typing import overload +from typing import Tuple from typing import Type from typing import TYPE_CHECKING +from typing import TypeVar from typing import Union from . import exc as async_exc @@ -50,6 +52,9 @@ if TYPE_CHECKING: from ...pool import PoolProxiedConnection from ...sql._typing import _InfoType from ...sql.base import Executable + from ...sql.selectable import TypedReturnsRows + +_T = TypeVar("_T", bound=Any) class _SyncConnectionCallable(Protocol): @@ -407,7 +412,7 @@ class AsyncConnection( statement: str, parameters: Optional[_DBAPIAnyExecuteParams] = None, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> CursorResult: + ) -> CursorResult[Any]: r"""Executes a driver-level SQL string and return buffered :class:`_engine.Result`. @@ -423,12 +428,33 @@ class AsyncConnection( return await _ensure_sync_result(result, self.exec_driver_sql) + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncResult[_T]: + ... + + @overload async def stream( self, statement: Executable, parameters: Optional[_CoreAnyExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> AsyncResult: + ) -> AsyncResult[Any]: + ... + + async def stream( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncResult[Any]: """Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object.""" @@ -436,7 +462,7 @@ class AsyncConnection( self._proxied.execute, statement, parameters, - util.EMPTY_DICT.merge_with( + execution_options=util.EMPTY_DICT.merge_with( execution_options, {"stream_results": True} ), _require_await=True, @@ -446,12 +472,33 @@ class AsyncConnection( assert False, "server side result expected" return AsyncResult(result) + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> CursorResult[_T]: + ... + + @overload async def execute( self, statement: Executable, parameters: Optional[_CoreAnyExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, - ) -> CursorResult: + ) -> CursorResult[Any]: + ... + + async def execute( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> CursorResult[Any]: r"""Executes a SQL statement construct and return a buffered :class:`_engine.Result`. @@ -487,15 +534,36 @@ class AsyncConnection( self._proxied.execute, statement, parameters, - execution_options, + execution_options=execution_options, _require_await=True, ) return await _ensure_sync_result(result, self.execute) + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> Optional[_T]: + ... + + @overload async def scalar( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> Any: + ... + + async def scalar( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> Any: r"""Executes a SQL statement construct and returns a scalar object. @@ -508,13 +576,36 @@ class AsyncConnection( first row returned. """ - result = await self.execute(statement, parameters, execution_options) + result = await self.execute( + statement, parameters, execution_options=execution_options + ) return result.scalar() + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> ScalarResult[_T]: + ... + + @overload + async def scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> ScalarResult[Any]: + ... + async def scalars( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> ScalarResult[Any]: r"""Executes a SQL statement construct and returns a scalar objects. @@ -528,13 +619,36 @@ class AsyncConnection( .. versionadded:: 1.4.24 """ - result = await self.execute(statement, parameters, execution_options) + result = await self.execute( + statement, parameters, execution_options=execution_options + ) return result.scalars() + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncScalarResult[_T]: + ... + + @overload async def stream_scalars( self, statement: Executable, parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[_ExecuteOptionsParameter] = None, + ) -> AsyncScalarResult[Any]: + ... + + async def stream_scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, execution_options: Optional[_ExecuteOptionsParameter] = None, ) -> AsyncScalarResult[Any]: r"""Executes a SQL statement and returns a streaming scalar result @@ -549,7 +663,9 @@ class AsyncConnection( .. versionadded:: 1.4.24 """ - result = await self.stream(statement, parameters, execution_options) + result = await self.stream( + statement, parameters, execution_options=execution_options + ) return result.scalars() async def run_sync( |