summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio/engine.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/engine.py')
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py401
1 files changed, 231 insertions, 170 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index 3b54405c1..bb51a4d22 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -7,23 +7,56 @@
from __future__ import annotations
from typing import Any
+from typing import Dict
+from typing import Generator
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
from . import exc as async_exc
from .base import ProxyComparable
from .base import StartableContext
from .result import _ensure_sync_result
from .result import AsyncResult
+from .result import AsyncScalarResult
from ... import exc
from ... import inspection
from ... import util
+from ...engine import Connection
from ...engine import create_engine as _create_engine
+from ...engine import Engine
from ...engine.base import NestedTransaction
-from ...future import Connection
-from ...future import Engine
+from ...engine.base import Transaction
from ...util.concurrency import greenlet_spawn
-
-
-def create_async_engine(*arg, **kw):
+from ...util.typing import Protocol
+
+if TYPE_CHECKING:
+ from ...engine import Connection
+ from ...engine import Engine
+ from ...engine.cursor import CursorResult
+ from ...engine.interfaces import _CoreAnyExecuteParams
+ from ...engine.interfaces import _CoreSingleExecuteParams
+ from ...engine.interfaces import _DBAPIAnyExecuteParams
+ from ...engine.interfaces import _ExecuteOptions
+ from ...engine.interfaces import _ExecuteOptionsParameter
+ from ...engine.interfaces import _IsolationLevel
+ from ...engine.interfaces import Dialect
+ from ...engine.result import ScalarResult
+ from ...engine.url import URL
+ from ...pool import Pool
+ from ...pool import PoolProxiedConnection
+ from ...sql.base import Executable
+
+
+class _SyncConnectionCallable(Protocol):
+ def __call__(self, connection: Connection, *arg: Any, **kw: Any) -> Any:
+ ...
+
+
+def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine:
"""Create a new async engine instance.
Arguments passed to :func:`_asyncio.create_async_engine` are mostly
@@ -43,11 +76,13 @@ def create_async_engine(*arg, **kw):
)
kw["future"] = True
kw["_is_async"] = True
- sync_engine = _create_engine(*arg, **kw)
+ sync_engine = _create_engine(url, **kw)
return AsyncEngine(sync_engine)
-def async_engine_from_config(configuration, prefix="sqlalchemy.", **kwargs):
+def async_engine_from_config(
+ configuration: Dict[str, Any], prefix: str = "sqlalchemy.", **kwargs: Any
+) -> AsyncEngine:
"""Create a new AsyncEngine instance using a configuration dictionary.
This function is analogous to the :func:`_sa.engine_from_config` function
@@ -73,6 +108,14 @@ def async_engine_from_config(configuration, prefix="sqlalchemy.", **kwargs):
class AsyncConnectable:
__slots__ = "_slots_dispatch", "__weakref__"
+ @classmethod
+ def _no_async_engine_events(cls) -> NoReturn:
+ raise NotImplementedError(
+ "asynchronous events are not implemented at this time. Apply "
+ "synchronous listeners to the AsyncEngine.sync_engine or "
+ "AsyncConnection.sync_connection attributes."
+ )
+
@util.create_proxy_methods(
Connection,
@@ -87,7 +130,9 @@ class AsyncConnectable:
"default_isolation_level",
],
)
-class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
+class AsyncConnection(
+ ProxyComparable[Connection], StartableContext, AsyncConnectable
+):
"""An asyncio proxy for a :class:`_engine.Connection`.
:class:`_asyncio.AsyncConnection` is acquired using the
@@ -115,12 +160,16 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
"sync_connection",
)
- def __init__(self, async_engine, sync_connection=None):
+ def __init__(
+ self,
+ async_engine: AsyncEngine,
+ sync_connection: Optional[Connection] = None,
+ ):
self.engine = async_engine
self.sync_engine = async_engine.sync_engine
self.sync_connection = self._assign_proxied(sync_connection)
- sync_connection: Connection
+ sync_connection: Optional[Connection]
"""Reference to the sync-style :class:`_engine.Connection` this
:class:`_asyncio.AsyncConnection` proxies requests towards.
@@ -146,12 +195,14 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
"""
@classmethod
- def _regenerate_proxy_for_target(cls, target):
+ def _regenerate_proxy_for_target(
+ cls, target: Connection
+ ) -> AsyncConnection:
return AsyncConnection(
AsyncEngine._retrieve_proxy_for_target(target.engine), target
)
- async def start(self, is_ctxmanager=False):
+ async def start(self, is_ctxmanager: bool = False) -> AsyncConnection:
"""Start this :class:`_asyncio.AsyncConnection` object's context
outside of using a Python ``with:`` block.
@@ -164,7 +215,7 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
return self
@property
- def connection(self):
+ def connection(self) -> NoReturn:
"""Not implemented for async; call
:meth:`_asyncio.AsyncConnection.get_raw_connection`.
"""
@@ -174,7 +225,7 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
"Use the get_raw_connection() method."
)
- async def get_raw_connection(self):
+ async def get_raw_connection(self) -> PoolProxiedConnection:
"""Return the pooled DBAPI-level connection in use by this
:class:`_asyncio.AsyncConnection`.
@@ -187,16 +238,11 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
adapts the driver connection to the DBAPI protocol.
"""
- conn = self._sync_connection()
-
- return await greenlet_spawn(getattr, conn, "connection")
- @property
- def _proxied(self):
- return self.sync_connection
+ return await greenlet_spawn(getattr, self._proxied, "connection")
@property
- def info(self):
+ def info(self) -> Dict[str, Any]:
"""Return the :attr:`_engine.Connection.info` dictionary of the
underlying :class:`_engine.Connection`.
@@ -211,24 +257,28 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
.. versionadded:: 1.4.0b2
"""
- return self.sync_connection.info
+ return self._proxied.info
- def _sync_connection(self):
+ @util.ro_non_memoized_property
+ def _proxied(self) -> Connection:
if not self.sync_connection:
self._raise_for_not_started()
return self.sync_connection
- def begin(self):
+ def begin(self) -> AsyncTransaction:
"""Begin a transaction prior to autobegin occurring."""
- self._sync_connection()
+ assert self._proxied
return AsyncTransaction(self)
- def begin_nested(self):
+ def begin_nested(self) -> AsyncTransaction:
"""Begin a nested transaction and return a transaction handle."""
- self._sync_connection()
+ assert self._proxied
return AsyncTransaction(self, nested=True)
- async def invalidate(self, exception=None):
+ async def invalidate(
+ self, exception: Optional[BaseException] = None
+ ) -> None:
+
"""Invalidate the underlying DBAPI connection associated with
this :class:`_engine.Connection`.
@@ -237,39 +287,27 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
"""
- conn = self._sync_connection()
- return await greenlet_spawn(conn.invalidate, exception=exception)
-
- async def get_isolation_level(self):
- conn = self._sync_connection()
- return await greenlet_spawn(conn.get_isolation_level)
-
- async def set_isolation_level(self):
- conn = self._sync_connection()
- return await greenlet_spawn(conn.get_isolation_level)
-
- def in_transaction(self):
- """Return True if a transaction is in progress.
-
- .. versionadded:: 1.4.0b2
+ return await greenlet_spawn(
+ self._proxied.invalidate, exception=exception
+ )
- """
+ async def get_isolation_level(self) -> _IsolationLevel:
+ return await greenlet_spawn(self._proxied.get_isolation_level)
- conn = self._sync_connection()
+ def in_transaction(self) -> bool:
+ """Return True if a transaction is in progress."""
- return conn.in_transaction()
+ return self._proxied.in_transaction()
- def in_nested_transaction(self):
+ def in_nested_transaction(self) -> bool:
"""Return True if a transaction is in progress.
.. versionadded:: 1.4.0b2
"""
- conn = self._sync_connection()
-
- return conn.in_nested_transaction()
+ return self._proxied.in_nested_transaction()
- def get_transaction(self):
+ def get_transaction(self) -> Optional[AsyncTransaction]:
"""Return an :class:`.AsyncTransaction` representing the current
transaction, if any.
@@ -281,15 +319,14 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
.. versionadded:: 1.4.0b2
"""
- conn = self._sync_connection()
- trans = conn.get_transaction()
+ trans = self._proxied.get_transaction()
if trans is not None:
return AsyncTransaction._retrieve_proxy_for_target(trans)
else:
return None
- def get_nested_transaction(self):
+ def get_nested_transaction(self) -> Optional[AsyncTransaction]:
"""Return an :class:`.AsyncTransaction` representing the current
nested (savepoint) transaction, if any.
@@ -301,15 +338,14 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
.. versionadded:: 1.4.0b2
"""
- conn = self._sync_connection()
- trans = conn.get_nested_transaction()
+ trans = self._proxied.get_nested_transaction()
if trans is not None:
return AsyncTransaction._retrieve_proxy_for_target(trans)
else:
return None
- async def execution_options(self, **opt):
+ async def execution_options(self, **opt: Any) -> AsyncConnection:
r"""Set non-SQL options for the connection which take effect
during execution.
@@ -321,12 +357,12 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
"""
- conn = self._sync_connection()
+ conn = self._proxied
c2 = await greenlet_spawn(conn.execution_options, **opt)
assert c2 is conn
return self
- async def commit(self):
+ async def commit(self) -> None:
"""Commit the transaction that is currently in progress.
This method commits the current transaction if one has been started.
@@ -338,10 +374,9 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
:meth:`_future.Connection.begin` method is called.
"""
- conn = self._sync_connection()
- await greenlet_spawn(conn.commit)
+ await greenlet_spawn(self._proxied.commit)
- async def rollback(self):
+ async def rollback(self) -> None:
"""Roll back the transaction that is currently in progress.
This method rolls back the current transaction if one has been started.
@@ -355,34 +390,30 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
"""
- conn = self._sync_connection()
- await greenlet_spawn(conn.rollback)
+ await greenlet_spawn(self._proxied.rollback)
- async def close(self):
+ async def close(self) -> None:
"""Close this :class:`_asyncio.AsyncConnection`.
This has the effect of also rolling back the transaction if one
is in place.
"""
- conn = self._sync_connection()
- await greenlet_spawn(conn.close)
+ await greenlet_spawn(self._proxied.close)
async def exec_driver_sql(
self,
- statement,
- parameters=None,
- execution_options=util.EMPTY_DICT,
- ):
+ statement: str,
+ parameters: Optional[_DBAPIAnyExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult:
r"""Executes a driver-level SQL string and return buffered
:class:`_engine.Result`.
"""
- conn = self._sync_connection()
-
result = await greenlet_spawn(
- conn.exec_driver_sql,
+ self._proxied.exec_driver_sql,
statement,
parameters,
execution_options,
@@ -393,17 +424,15 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
async def stream(
self,
- statement,
- parameters=None,
- execution_options=util.EMPTY_DICT,
- ):
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncResult:
"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object."""
- conn = self._sync_connection()
-
result = await greenlet_spawn(
- conn.execute,
+ self._proxied.execute,
statement,
parameters,
util.EMPTY_DICT.merge_with(
@@ -418,10 +447,10 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
async def execute(
self,
- statement,
- parameters=None,
- execution_options=util.EMPTY_DICT,
- ):
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> CursorResult:
r"""Executes a SQL statement construct and return a buffered
:class:`_engine.Result`.
@@ -453,10 +482,8 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
:return: a :class:`_engine.Result` object.
"""
- conn = self._sync_connection()
-
result = await greenlet_spawn(
- conn.execute,
+ self._proxied.execute,
statement,
parameters,
execution_options,
@@ -466,10 +493,10 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
async def scalar(
self,
- statement,
- parameters=None,
- execution_options=util.EMPTY_DICT,
- ):
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Any:
r"""Executes a SQL statement construct and returns a scalar object.
This method is shorthand for invoking the
@@ -485,10 +512,10 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
async def scalars(
self,
- statement,
- parameters=None,
- execution_options=util.EMPTY_DICT,
- ):
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult[Any]:
r"""Executes a SQL statement construct and returns a scalar objects.
This method is shorthand for invoking the
@@ -505,10 +532,10 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
async def stream_scalars(
self,
- statement,
- parameters=None,
- execution_options=util.EMPTY_DICT,
- ):
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> AsyncScalarResult[Any]:
r"""Executes a SQL statement and returns a streaming scalar result
object.
@@ -524,7 +551,9 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
result = await self.stream(statement, parameters, execution_options)
return result.scalars()
- async def run_sync(self, fn, *arg, **kw):
+ async def run_sync(
+ self, fn: _SyncConnectionCallable, *arg: Any, **kw: Any
+ ) -> Any:
"""Invoke the given sync callable passing self as the first argument.
This method maintains the asyncio event loop all the way through
@@ -548,14 +577,12 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
:ref:`session_run_sync`
"""
- conn = self._sync_connection()
-
- return await greenlet_spawn(fn, conn, *arg, **kw)
+ return await greenlet_spawn(fn, self._proxied, *arg, **kw)
- def __await__(self):
+ def __await__(self) -> Generator[Any, None, AsyncConnection]:
return self.start().__await__()
- async def __aexit__(self, type_, value, traceback):
+ async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
await self.close()
# START PROXY METHODS AsyncConnection
@@ -661,7 +688,7 @@ class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
],
attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"],
)
-class AsyncEngine(ProxyComparable, AsyncConnectable):
+class AsyncEngine(ProxyComparable[Engine], AsyncConnectable):
"""An asyncio proxy for a :class:`_engine.Engine`.
:class:`_asyncio.AsyncEngine` is acquired using the
@@ -679,51 +706,60 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
# current transaction, info, etc. It should be possible to
# create a new AsyncEngine that matches this one given only the
# "sync" elements.
- __slots__ = ("sync_engine", "_proxied")
+ __slots__ = "sync_engine"
- _connection_cls = AsyncConnection
+ _connection_cls: Type[AsyncConnection] = AsyncConnection
- _option_cls: type
+ sync_engine: Engine
+ """Reference to the sync-style :class:`_engine.Engine` this
+ :class:`_asyncio.AsyncEngine` proxies requests towards.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+ """
class _trans_ctx(StartableContext):
- def __init__(self, conn):
+ __slots__ = ("conn", "transaction")
+
+ conn: AsyncConnection
+ transaction: AsyncTransaction
+
+ def __init__(self, conn: AsyncConnection):
self.conn = conn
- async def start(self, is_ctxmanager=False):
+ async def start(self, is_ctxmanager: bool = False) -> AsyncConnection:
await self.conn.start(is_ctxmanager=is_ctxmanager)
self.transaction = self.conn.begin()
await self.transaction.__aenter__()
return self.conn
- async def __aexit__(self, type_, value, traceback):
+ async def __aexit__(
+ self, type_: Any, value: Any, traceback: Any
+ ) -> None:
await self.transaction.__aexit__(type_, value, traceback)
await self.conn.close()
- def __init__(self, sync_engine):
+ def __init__(self, sync_engine: Engine):
if not sync_engine.dialect.is_async:
raise exc.InvalidRequestError(
"The asyncio extension requires an async driver to be used. "
f"The loaded {sync_engine.dialect.driver!r} is not async."
)
- self.sync_engine = self._proxied = self._assign_proxied(sync_engine)
-
- sync_engine: Engine
- """Reference to the sync-style :class:`_engine.Engine` this
- :class:`_asyncio.AsyncEngine` proxies requests towards.
+ self.sync_engine = self._assign_proxied(sync_engine)
- This instance can be used as an event target.
-
- .. seealso::
-
- :ref:`asyncio_events`
- """
+ @util.ro_non_memoized_property
+ def _proxied(self) -> Engine:
+ return self.sync_engine
@classmethod
- def _regenerate_proxy_for_target(cls, target):
+ def _regenerate_proxy_for_target(cls, target: Engine) -> AsyncEngine:
return AsyncEngine(target)
- def begin(self):
+ def begin(self) -> AsyncEngine._trans_ctx:
"""Return a context manager which when entered will deliver an
:class:`_asyncio.AsyncConnection` with an
:class:`_asyncio.AsyncTransaction` established.
@@ -741,7 +777,7 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
conn = self.connect()
return self._trans_ctx(conn)
- def connect(self):
+ def connect(self) -> AsyncConnection:
"""Return an :class:`_asyncio.AsyncConnection` object.
The :class:`_asyncio.AsyncConnection` will procure a database
@@ -759,7 +795,7 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
return self._connection_cls(self)
- async def raw_connection(self):
+ async def raw_connection(self) -> PoolProxiedConnection:
"""Return a "raw" DBAPI connection from the connection pool.
.. seealso::
@@ -769,7 +805,7 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
"""
return await greenlet_spawn(self.sync_engine.raw_connection)
- def execution_options(self, **opt):
+ def execution_options(self, **opt: Any) -> AsyncEngine:
"""Return a new :class:`_asyncio.AsyncEngine` that will provide
:class:`_asyncio.AsyncConnection` objects with the given execution
options.
@@ -781,21 +817,31 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
return AsyncEngine(self.sync_engine.execution_options(**opt))
- async def dispose(self):
+ async def dispose(self, close: bool = True) -> None:
+
"""Dispose of the connection pool used by this
:class:`_asyncio.AsyncEngine`.
- This will close all connection pool connections that are
- **currently checked in**. See the documentation for the underlying
- :meth:`_future.Engine.dispose` method for further notes.
+ :param close: if left at its default of ``True``, has the
+ effect of fully closing all **currently checked in**
+ database connections. Connections that are still checked out
+ will **not** be closed, however they will no longer be associated
+ with this :class:`_engine.Engine`,
+ so when they are closed individually, eventually the
+ :class:`_pool.Pool` which they are associated with will
+ be garbage collected and they will be closed out fully, if
+ not already closed on checkin.
+
+ If set to ``False``, the previous connection pool is de-referenced,
+ and otherwise not touched in any way.
.. seealso::
- :meth:`_future.Engine.dispose`
+ :meth:`_engine.Engine.dispose`
"""
- return await greenlet_spawn(self.sync_engine.dispose)
+ return await greenlet_spawn(self.sync_engine.dispose, close=close)
# START PROXY METHODS AsyncEngine
@@ -973,18 +1019,24 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
# END PROXY METHODS AsyncEngine
-class AsyncTransaction(ProxyComparable, StartableContext):
+class AsyncTransaction(ProxyComparable[Transaction], StartableContext):
"""An asyncio proxy for a :class:`_engine.Transaction`."""
__slots__ = ("connection", "sync_transaction", "nested")
- def __init__(self, connection, nested=False):
- self.connection = connection # AsyncConnection
- self.sync_transaction = None # sqlalchemy.engine.Transaction
+ sync_transaction: Optional[Transaction]
+ connection: AsyncConnection
+ nested: bool
+
+ def __init__(self, connection: AsyncConnection, nested: bool = False):
+ self.connection = connection
+ self.sync_transaction = None
self.nested = nested
@classmethod
- def _regenerate_proxy_for_target(cls, target):
+ def _regenerate_proxy_for_target(
+ cls, target: Transaction
+ ) -> AsyncTransaction:
sync_connection = target.connection
sync_transaction = target
nested = isinstance(target, NestedTransaction)
@@ -1000,25 +1052,22 @@ class AsyncTransaction(ProxyComparable, StartableContext):
obj.nested = nested
return obj
- def _sync_transaction(self):
+ @util.ro_non_memoized_property
+ def _proxied(self) -> Transaction:
if not self.sync_transaction:
self._raise_for_not_started()
return self.sync_transaction
@property
- def _proxied(self):
- return self.sync_transaction
+ def is_valid(self) -> bool:
+ return self._proxied.is_valid
@property
- def is_valid(self):
- return self._sync_transaction().is_valid
+ def is_active(self) -> bool:
+ return self._proxied.is_active
- @property
- def is_active(self):
- return self._sync_transaction().is_active
-
- async def close(self):
- """Close this :class:`.Transaction`.
+ async def close(self) -> None:
+ """Close this :class:`.AsyncTransaction`.
If this transaction is the base transaction in a begin/commit
nesting, the transaction will rollback(). Otherwise, the
@@ -1028,18 +1077,18 @@ class AsyncTransaction(ProxyComparable, StartableContext):
an enclosing transaction.
"""
- await greenlet_spawn(self._sync_transaction().close)
+ await greenlet_spawn(self._proxied.close)
- async def rollback(self):
- """Roll back this :class:`.Transaction`."""
- await greenlet_spawn(self._sync_transaction().rollback)
+ async def rollback(self) -> None:
+ """Roll back this :class:`.AsyncTransaction`."""
+ await greenlet_spawn(self._proxied.rollback)
- async def commit(self):
- """Commit this :class:`.Transaction`."""
+ async def commit(self) -> None:
+ """Commit this :class:`.AsyncTransaction`."""
- await greenlet_spawn(self._sync_transaction().commit)
+ await greenlet_spawn(self._proxied.commit)
- async def start(self, is_ctxmanager=False):
+ async def start(self, is_ctxmanager: bool = False) -> AsyncTransaction:
"""Start this :class:`_asyncio.AsyncTransaction` object's context
outside of using a Python ``with:`` block.
@@ -1047,24 +1096,36 @@ class AsyncTransaction(ProxyComparable, StartableContext):
self.sync_transaction = self._assign_proxied(
await greenlet_spawn(
- self.connection._sync_connection().begin_nested
+ self.connection._proxied.begin_nested
if self.nested
- else self.connection._sync_connection().begin
+ else self.connection._proxied.begin
)
)
if is_ctxmanager:
self.sync_transaction.__enter__()
return self
- async def __aexit__(self, type_, value, traceback):
- await greenlet_spawn(
- self._sync_transaction().__exit__, type_, value, traceback
- )
+ async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ await greenlet_spawn(self._proxied.__exit__, type_, value, traceback)
+
+
+@overload
+def _get_sync_engine_or_connection(async_engine: AsyncEngine) -> Engine:
+ ...
+
+
+@overload
+def _get_sync_engine_or_connection(
+ async_engine: AsyncConnection,
+) -> Connection:
+ ...
-def _get_sync_engine_or_connection(async_engine):
+def _get_sync_engine_or_connection(
+ async_engine: Union[AsyncEngine, AsyncConnection]
+) -> Union[Engine, Connection]:
if isinstance(async_engine, AsyncConnection):
- return async_engine.sync_connection
+ return async_engine._proxied
try:
return async_engine.sync_engine
@@ -1075,7 +1136,7 @@ def _get_sync_engine_or_connection(async_engine):
@inspection._inspects(AsyncConnection)
-def _no_insp_for_async_conn_yet(subject):
+def _no_insp_for_async_conn_yet(subject: AsyncConnection) -> NoReturn:
raise exc.NoInspectionAvailable(
"Inspection on an AsyncConnection is currently not supported. "
"Please use ``run_sync`` to pass a callable where it's possible "
@@ -1085,7 +1146,7 @@ def _no_insp_for_async_conn_yet(subject):
@inspection._inspects(AsyncEngine)
-def _no_insp_for_async_engine_xyet(subject):
+def _no_insp_for_async_engine_xyet(subject: AsyncEngine) -> NoReturn:
raise exc.NoInspectionAvailable(
"Inspection on an AsyncEngine is currently not supported. "
"Please obtain a connection then use ``conn.run_sync`` to pass a "