diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-01-06 22:56:14 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2021-01-07 15:59:59 -0500 |
commit | b45aa7c4062bafae23286c3069571c2596aabc66 (patch) | |
tree | 12d496c61ab71e02ca0fa1c785ee8b34d577b73a /lib/sqlalchemy/ext/asyncio | |
parent | 7f92fdbd8ec479a61c53c11921ce0688ad4dd94b (diff) | |
download | sqlalchemy-b45aa7c4062bafae23286c3069571c2596aabc66.tar.gz |
Implement connection binding for AsyncSession
Implemented "connection-binding" for :class:`.AsyncSession`, the ability to
pass an :class:`.AsyncConnection` to create an :class:`.AsyncSession`.
Previously, this use case was not implemented and would use the associated
engine when the connection were passed. This fixes the issue where the
"join a session to an external transaction" use case would not work
correctly for the :class:`.AsyncSession`. Additionally, added methods
:meth:`.AsyncConnection.in_transaction`,
:meth:`.AsyncConnection.in_nested_transaction`,
:meth:`.AsyncConnection.get_transaction`.
The :class:`.AsyncEngine`, :class:`.AsyncConnection` and
:class:`.AsyncTransaction` objects may be compared using Python ``==`` or
``!=``, which will compare the two given objects based on the "sync" object
they are proxying towards. This is useful as there are cases particularly
for :class:`.AsyncTransaction` where multiple instances of
:class:`.AsyncTransaction` can be proxying towards the same sync
:class:`_engine.Transaction`, and are actually equivalent. The
:meth:`.AsyncConnection.get_transaction` method will currently return a new
proxying :class:`.AsyncTransaction` each time as the
:class:`.AsyncTransaction` is not otherwise statefully associated with its
originating :class:`.AsyncConnection`.
Fixes: #5811
Change-Id: I5a3a6b2f088541eee7b0e0f393510e61bc9f986b
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/base.py | 17 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/engine.py | 110 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/session.py | 5 |
3 files changed, 126 insertions, 6 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/base.py b/lib/sqlalchemy/ext/asyncio/base.py index 051f9e21a..fa8c5006e 100644 --- a/lib/sqlalchemy/ext/asyncio/base.py +++ b/lib/sqlalchemy/ext/asyncio/base.py @@ -23,3 +23,20 @@ class StartableContext(abc.ABC): "%s context has not been started and object has not been awaited." % (self.__class__.__name__) ) + + +class ProxyComparable: + def __hash__(self): + return id(self) + + def __eq__(self, other): + return ( + isinstance(other, self.__class__) + and self._proxied == other._proxied + ) + + def __ne__(self, other): + return ( + not isinstance(other, self.__class__) + or self._proxied != other._proxied + ) diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index 93adaf78a..5951abc1e 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -4,6 +4,7 @@ from typing import Mapping from typing import Optional from . import exc as async_exc +from .base import ProxyComparable from .base import StartableContext from .result import AsyncResult from ... import exc @@ -57,7 +58,7 @@ class AsyncConnectable: "default_isolation_level", ], ) -class AsyncConnection(StartableContext, AsyncConnectable): +class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable): """An asyncio proxy for a :class:`_engine.Connection`. :class:`_asyncio.AsyncConnection` is acquired using the @@ -131,6 +132,24 @@ class AsyncConnection(StartableContext, AsyncConnectable): def _proxied(self): return self.sync_connection + @property + def info(self): + """Return the :attr:`_engine.Connection.info` dictionary of the + underlying :class:`_engine.Connection`. + + This dictionary is freely writable for user-defined state to be + associated with the database connection. + + This attribute is only available if the :class:`.AsyncConnection` is + currently connected. If the :attr:`.AsyncConnection.closed` attribute + is ``True``, then accessing this attribute will raise + :class:`.ResourceClosedError`. + + .. versionadded:: 1.4.0b2 + + """ + return self.sync_connection.info + def _sync_connection(self): if not self.sync_connection: self._raise_for_not_started() @@ -166,6 +185,69 @@ class AsyncConnection(StartableContext, AsyncConnectable): conn = self._sync_connection() return await greenlet_spawn(conn.get_isolation_level) + def in_transaction(self): + """Return True if a transaction is in progress. + + .. versionadded:: 1.4.0b2 + + """ + + conn = self._sync_connection() + + return conn.in_transaction() + + def in_nested_transaction(self): + """Return True if a transaction is in progress. + + .. versionadded:: 1.4.0b2 + + """ + conn = self._sync_connection() + + return conn.in_nested_transaction() + + def get_transaction(self): + """Return an :class:`.AsyncTransaction` representing the current + transaction, if any. + + This makes use of the underlying synchronous connection's + :meth:`_engine.Connection.get_transaction` method to get the current + :class:`_engine.Transaction`, which is then proxied in a new + :class:`.AsyncTransaction` object. + + .. versionadded:: 1.4.0b2 + + """ + conn = self._sync_connection() + + trans = conn.get_transaction() + if trans is not None: + return AsyncTransaction._from_existing_transaction(self, trans) + else: + return None + + def get_nested_transaction(self): + """Return an :class:`.AsyncTransaction` representing the current + nested (savepoint) transaction, if any. + + This makes use of the underlying synchronous connection's + :meth:`_engine.Connection.get_nested_transaction` method to get the + current :class:`_engine.Transaction`, which is then proxied in a new + :class:`.AsyncTransaction` object. + + .. versionadded:: 1.4.0b2 + + """ + conn = self._sync_connection() + + trans = conn.get_nested_transaction() + if trans is not None: + return AsyncTransaction._from_existing_transaction( + self, trans, True + ) + else: + return None + async def execution_options(self, **opt): r"""Set non-SQL options for the connection which take effect during execution. @@ -391,7 +473,7 @@ class AsyncConnection(StartableContext, AsyncConnectable): ], attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"], ) -class AsyncEngine(AsyncConnectable): +class AsyncEngine(ProxyComparable, AsyncConnectable): """An asyncio proxy for a :class:`_engine.Engine`. :class:`_asyncio.AsyncEngine` is acquired using the @@ -513,7 +595,7 @@ class AsyncEngine(AsyncConnectable): return await greenlet_spawn(self.sync_engine.dispose) -class AsyncTransaction(StartableContext): +class AsyncTransaction(ProxyComparable, StartableContext): """An asyncio proxy for a :class:`_engine.Transaction`.""" __slots__ = ("connection", "sync_transaction", "nested") @@ -523,12 +605,29 @@ class AsyncTransaction(StartableContext): self.sync_transaction: Optional[Transaction] = None self.nested = nested + @classmethod + def _from_existing_transaction( + cls, + connection: AsyncConnection, + sync_transaction: Transaction, + nested: bool = False, + ): + obj = cls.__new__(cls) + obj.connection = connection + obj.sync_transaction = sync_transaction + obj.nested = nested + return obj + def _sync_transaction(self): if not self.sync_transaction: self._raise_for_not_started() return self.sync_transaction @property + def _proxied(self): + return self.sync_transaction + + @property def is_valid(self) -> bool: return self._sync_transaction().is_valid @@ -582,7 +681,10 @@ class AsyncTransaction(StartableContext): await self.rollback() -def _get_sync_engine(async_engine): +def _get_sync_engine_or_connection(async_engine): + if isinstance(async_engine, AsyncConnection): + return async_engine.sync_connection + try: return async_engine.sync_engine except AttributeError as e: diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py index bac2aa44b..9a8284e64 100644 --- a/lib/sqlalchemy/ext/asyncio/session.py +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -75,12 +75,13 @@ class AsyncSession: kw["future"] = True if bind: self.bind = engine - bind = engine._get_sync_engine(bind) + bind = engine._get_sync_engine_or_connection(bind) if binds: self.binds = binds binds = { - key: engine._get_sync_engine(b) for key, b in binds.items() + key: engine._get_sync_engine_or_connection(b) + for key, b in binds.items() } self.sync_session = self._proxied = Session( |