summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio/engine.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-04-19 21:06:41 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-04-27 14:46:36 -0400
commitad11c482e2233f44e8747d4d5a2b17a995fff1fa (patch)
tree57f8ddd30928951519fd6ac0f418e9cbf8e65610 /lib/sqlalchemy/ext/asyncio/engine.py
parent033d1a16e7a220555d7611a5b8cacb1bd83822ae (diff)
downloadsqlalchemy-ad11c482e2233f44e8747d4d5a2b17a995fff1fa.tar.gz
pep484 ORM / SQL result support
after some experimentation it seems mypy is more amenable to the generic types being fully integrated rather than having separate spin-off types. so key structures like Result, Row, Select become generic. For DML Insert, Update, Delete, these are spun into type-specific subclasses ReturningInsert, ReturningUpdate, ReturningDelete, which is fine since the "row-ness" of these constructs doesn't happen until returning() is called in any case. a Tuple based model is then integrated so that these objects can carry along information about their return types. Overloads at the .execute() level carry through the Tuple from the invoked object to the result. To suit the issue of AliasedClass generating attributes that are dynamic, experimented with a custom subclass AsAliased, but then just settled on having aliased() lie to the type checker and return `Type[_O]`, essentially. will need some type-related accessors for with_polymorphic() also. Additionally, identified an issue in Update when used "mysql style" against a join(), it basically doesn't work if asked to UPDATE two tables on the same column name. added an error message to the specific condition where it happens with a very non-specific error message that we hit a thing we can't do right now, suggest multi-table update as a possible cause. Change-Id: I5eff7eefe1d6166ee74160b2785c5e6a81fa8b95
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/engine.py')
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py132
1 files changed, 124 insertions, 8 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index fb05f512e..95549ada6 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -12,8 +12,10 @@ from typing import Generator
from typing import NoReturn
from typing import Optional
from typing import overload
+from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
+from typing import TypeVar
from typing import Union
from . import exc as async_exc
@@ -50,6 +52,9 @@ if TYPE_CHECKING:
from ...pool import PoolProxiedConnection
from ...sql._typing import _InfoType
from ...sql.base import Executable
+ from ...sql.selectable import TypedReturnsRows
+
+_T = TypeVar("_T", bound=Any)
class _SyncConnectionCallable(Protocol):
@@ -407,7 +412,7 @@ class AsyncConnection(
statement: str,
parameters: Optional[_DBAPIAnyExecuteParams] = None,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> CursorResult:
+ ) -> CursorResult[Any]:
r"""Executes a driver-level SQL string and return buffered
:class:`_engine.Result`.
@@ -423,12 +428,33 @@ class AsyncConnection(
return await _ensure_sync_result(result, self.exec_driver_sql)
+ @overload
+ async def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncResult[_T]:
+ ...
+
+ @overload
async def stream(
self,
statement: Executable,
parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> AsyncResult:
+ ) -> AsyncResult[Any]:
+ ...
+
+ async def stream(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncResult[Any]:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object."""
@@ -436,7 +462,7 @@ class AsyncConnection(
self._proxied.execute,
statement,
parameters,
- util.EMPTY_DICT.merge_with(
+ execution_options=util.EMPTY_DICT.merge_with(
execution_options, {"stream_results": True}
),
_require_await=True,
@@ -446,12 +472,33 @@ class AsyncConnection(
assert False, "server side result expected"
return AsyncResult(result)
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult[_T]:
+ ...
+
+ @overload
async def execute(
self,
statement: Executable,
parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
- ) -> CursorResult:
+ ) -> CursorResult[Any]:
+ ...
+
+ async def execute(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult[Any]:
r"""Executes a SQL statement construct and return a buffered
:class:`_engine.Result`.
@@ -487,15 +534,36 @@ class AsyncConnection(
self._proxied.execute,
statement,
parameters,
- execution_options,
+ execution_options=execution_options,
_require_await=True,
)
return await _ensure_sync_result(result, self.execute)
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Optional[_T]:
+ ...
+
+ @overload
async def scalar(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Any:
+ ...
+
+ async def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> Any:
r"""Executes a SQL statement construct and returns a scalar object.
@@ -508,13 +576,36 @@ class AsyncConnection(
first row returned.
"""
- result = await self.execute(statement, parameters, execution_options)
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalar()
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult[_T]:
+ ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult[Any]:
+ ...
+
async def scalars(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
r"""Executes a SQL statement construct and returns a scalar objects.
@@ -528,13 +619,36 @@ class AsyncConnection(
.. versionadded:: 1.4.24
"""
- result = await self.execute(statement, parameters, execution_options)
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalars()
+ @overload
+ async def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncScalarResult[_T]:
+ ...
+
+ @overload
async def stream_scalars(
self,
statement: Executable,
parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncScalarResult[Any]:
+ ...
+
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
execution_options: Optional[_ExecuteOptionsParameter] = None,
) -> AsyncScalarResult[Any]:
r"""Executes a SQL statement and returns a streaming scalar result
@@ -549,7 +663,9 @@ class AsyncConnection(
.. versionadded:: 1.4.24
"""
- result = await self.stream(statement, parameters, execution_options)
+ result = await self.stream(
+ statement, parameters, execution_options=execution_options
+ )
return result.scalars()
async def run_sync(