summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2021-01-06 22:56:14 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2021-01-07 15:59:59 -0500
commitb45aa7c4062bafae23286c3069571c2596aabc66 (patch)
tree12d496c61ab71e02ca0fa1c785ee8b34d577b73a /lib/sqlalchemy/ext/asyncio
parent7f92fdbd8ec479a61c53c11921ce0688ad4dd94b (diff)
downloadsqlalchemy-b45aa7c4062bafae23286c3069571c2596aabc66.tar.gz
Implement connection binding for AsyncSession
Implemented "connection-binding" for :class:`.AsyncSession`, the ability to pass an :class:`.AsyncConnection` to create an :class:`.AsyncSession`. Previously, this use case was not implemented and would use the associated engine when the connection were passed. This fixes the issue where the "join a session to an external transaction" use case would not work correctly for the :class:`.AsyncSession`. Additionally, added methods :meth:`.AsyncConnection.in_transaction`, :meth:`.AsyncConnection.in_nested_transaction`, :meth:`.AsyncConnection.get_transaction`. The :class:`.AsyncEngine`, :class:`.AsyncConnection` and :class:`.AsyncTransaction` objects may be compared using Python ``==`` or ``!=``, which will compare the two given objects based on the "sync" object they are proxying towards. This is useful as there are cases particularly for :class:`.AsyncTransaction` where multiple instances of :class:`.AsyncTransaction` can be proxying towards the same sync :class:`_engine.Transaction`, and are actually equivalent. The :meth:`.AsyncConnection.get_transaction` method will currently return a new proxying :class:`.AsyncTransaction` each time as the :class:`.AsyncTransaction` is not otherwise statefully associated with its originating :class:`.AsyncConnection`. Fixes: #5811 Change-Id: I5a3a6b2f088541eee7b0e0f393510e61bc9f986b
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r--lib/sqlalchemy/ext/asyncio/base.py17
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py110
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py5
3 files changed, 126 insertions, 6 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/base.py b/lib/sqlalchemy/ext/asyncio/base.py
index 051f9e21a..fa8c5006e 100644
--- a/lib/sqlalchemy/ext/asyncio/base.py
+++ b/lib/sqlalchemy/ext/asyncio/base.py
@@ -23,3 +23,20 @@ class StartableContext(abc.ABC):
"%s context has not been started and object has not been awaited."
% (self.__class__.__name__)
)
+
+
+class ProxyComparable:
+ def __hash__(self):
+ return id(self)
+
+ def __eq__(self, other):
+ return (
+ isinstance(other, self.__class__)
+ and self._proxied == other._proxied
+ )
+
+ def __ne__(self, other):
+ return (
+ not isinstance(other, self.__class__)
+ or self._proxied != other._proxied
+ )
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index 93adaf78a..5951abc1e 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -4,6 +4,7 @@ from typing import Mapping
from typing import Optional
from . import exc as async_exc
+from .base import ProxyComparable
from .base import StartableContext
from .result import AsyncResult
from ... import exc
@@ -57,7 +58,7 @@ class AsyncConnectable:
"default_isolation_level",
],
)
-class AsyncConnection(StartableContext, AsyncConnectable):
+class AsyncConnection(ProxyComparable, StartableContext, AsyncConnectable):
"""An asyncio proxy for a :class:`_engine.Connection`.
:class:`_asyncio.AsyncConnection` is acquired using the
@@ -131,6 +132,24 @@ class AsyncConnection(StartableContext, AsyncConnectable):
def _proxied(self):
return self.sync_connection
+ @property
+ def info(self):
+ """Return the :attr:`_engine.Connection.info` dictionary of the
+ underlying :class:`_engine.Connection`.
+
+ This dictionary is freely writable for user-defined state to be
+ associated with the database connection.
+
+ This attribute is only available if the :class:`.AsyncConnection` is
+ currently connected. If the :attr:`.AsyncConnection.closed` attribute
+ is ``True``, then accessing this attribute will raise
+ :class:`.ResourceClosedError`.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ return self.sync_connection.info
+
def _sync_connection(self):
if not self.sync_connection:
self._raise_for_not_started()
@@ -166,6 +185,69 @@ class AsyncConnection(StartableContext, AsyncConnectable):
conn = self._sync_connection()
return await greenlet_spawn(conn.get_isolation_level)
+ def in_transaction(self):
+ """Return True if a transaction is in progress.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+
+ conn = self._sync_connection()
+
+ return conn.in_transaction()
+
+ def in_nested_transaction(self):
+ """Return True if a transaction is in progress.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ conn = self._sync_connection()
+
+ return conn.in_nested_transaction()
+
+ def get_transaction(self):
+ """Return an :class:`.AsyncTransaction` representing the current
+ transaction, if any.
+
+ This makes use of the underlying synchronous connection's
+ :meth:`_engine.Connection.get_transaction` method to get the current
+ :class:`_engine.Transaction`, which is then proxied in a new
+ :class:`.AsyncTransaction` object.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ conn = self._sync_connection()
+
+ trans = conn.get_transaction()
+ if trans is not None:
+ return AsyncTransaction._from_existing_transaction(self, trans)
+ else:
+ return None
+
+ def get_nested_transaction(self):
+ """Return an :class:`.AsyncTransaction` representing the current
+ nested (savepoint) transaction, if any.
+
+ This makes use of the underlying synchronous connection's
+ :meth:`_engine.Connection.get_nested_transaction` method to get the
+ current :class:`_engine.Transaction`, which is then proxied in a new
+ :class:`.AsyncTransaction` object.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ conn = self._sync_connection()
+
+ trans = conn.get_nested_transaction()
+ if trans is not None:
+ return AsyncTransaction._from_existing_transaction(
+ self, trans, True
+ )
+ else:
+ return None
+
async def execution_options(self, **opt):
r"""Set non-SQL options for the connection which take effect
during execution.
@@ -391,7 +473,7 @@ class AsyncConnection(StartableContext, AsyncConnectable):
],
attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"],
)
-class AsyncEngine(AsyncConnectable):
+class AsyncEngine(ProxyComparable, AsyncConnectable):
"""An asyncio proxy for a :class:`_engine.Engine`.
:class:`_asyncio.AsyncEngine` is acquired using the
@@ -513,7 +595,7 @@ class AsyncEngine(AsyncConnectable):
return await greenlet_spawn(self.sync_engine.dispose)
-class AsyncTransaction(StartableContext):
+class AsyncTransaction(ProxyComparable, StartableContext):
"""An asyncio proxy for a :class:`_engine.Transaction`."""
__slots__ = ("connection", "sync_transaction", "nested")
@@ -523,12 +605,29 @@ class AsyncTransaction(StartableContext):
self.sync_transaction: Optional[Transaction] = None
self.nested = nested
+ @classmethod
+ def _from_existing_transaction(
+ cls,
+ connection: AsyncConnection,
+ sync_transaction: Transaction,
+ nested: bool = False,
+ ):
+ obj = cls.__new__(cls)
+ obj.connection = connection
+ obj.sync_transaction = sync_transaction
+ obj.nested = nested
+ return obj
+
def _sync_transaction(self):
if not self.sync_transaction:
self._raise_for_not_started()
return self.sync_transaction
@property
+ def _proxied(self):
+ return self.sync_transaction
+
+ @property
def is_valid(self) -> bool:
return self._sync_transaction().is_valid
@@ -582,7 +681,10 @@ class AsyncTransaction(StartableContext):
await self.rollback()
-def _get_sync_engine(async_engine):
+def _get_sync_engine_or_connection(async_engine):
+ if isinstance(async_engine, AsyncConnection):
+ return async_engine.sync_connection
+
try:
return async_engine.sync_engine
except AttributeError as e:
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
index bac2aa44b..9a8284e64 100644
--- a/lib/sqlalchemy/ext/asyncio/session.py
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -75,12 +75,13 @@ class AsyncSession:
kw["future"] = True
if bind:
self.bind = engine
- bind = engine._get_sync_engine(bind)
+ bind = engine._get_sync_engine_or_connection(bind)
if binds:
self.binds = binds
binds = {
- key: engine._get_sync_engine(b) for key, b in binds.items()
+ key: engine._get_sync_engine_or_connection(b)
+ for key, b in binds.items()
}
self.sync_session = self._proxied = Session(