summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r--lib/sqlalchemy/ext/asyncio/__init__.py2
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py153
-rw-r--r--lib/sqlalchemy/ext/asyncio/events.py29
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py97
4 files changed, 233 insertions, 48 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/__init__.py b/lib/sqlalchemy/ext/asyncio/__init__.py
index fbbc958d4..9c7d6443c 100644
--- a/lib/sqlalchemy/ext/asyncio/__init__.py
+++ b/lib/sqlalchemy/ext/asyncio/__init__.py
@@ -2,6 +2,8 @@ from .engine import AsyncConnection # noqa
from .engine import AsyncEngine # noqa
from .engine import AsyncTransaction # noqa
from .engine import create_async_engine # noqa
+from .events import AsyncConnectionEvents # noqa
+from .events import AsyncSessionEvents # noqa
from .result import AsyncMappingResult # noqa
from .result import AsyncResult # noqa
from .result import AsyncScalarResult # noqa
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index 4a92fb1f2..9e4851dfc 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -8,12 +8,11 @@ from .base import StartableContext
from .result import AsyncResult
from ... import exc
from ... import util
-from ...engine import Connection
from ...engine import create_engine as _create_engine
-from ...engine import Engine
from ...engine import Result
from ...engine import Transaction
-from ...engine.base import OptionEngineMixin
+from ...future import Connection
+from ...future import Engine
from ...sql import Executable
from ...util.concurrency import greenlet_spawn
@@ -41,7 +40,24 @@ def create_async_engine(*arg, **kw):
return AsyncEngine(sync_engine)
-class AsyncConnection(StartableContext):
+class AsyncConnectable:
+ __slots__ = "_slots_dispatch"
+
+
+@util.create_proxy_methods(
+ Connection,
+ ":class:`_future.Connection`",
+ ":class:`_asyncio.AsyncConnection`",
+ classmethods=[],
+ methods=[],
+ attributes=[
+ "closed",
+ "invalidated",
+ "dialect",
+ "default_isolation_level",
+ ],
+)
+class AsyncConnection(StartableContext, AsyncConnectable):
"""An asyncio proxy for a :class:`_engine.Connection`.
:class:`_asyncio.AsyncConnection` is acquired using the
@@ -58,15 +74,23 @@ class AsyncConnection(StartableContext):
""" # noqa
+ # AsyncConnection is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncConnection that matches this one given only the
+ # "sync" elements.
__slots__ = (
"sync_engine",
"sync_connection",
)
def __init__(
- self, sync_engine: Engine, sync_connection: Optional[Connection] = None
+ self,
+ async_engine: "AsyncEngine",
+ sync_connection: Optional[Connection] = None,
):
- self.sync_engine = sync_engine
+ self.engine = async_engine
+ self.sync_engine = async_engine.sync_engine
self.sync_connection = sync_connection
async def start(self):
@@ -79,6 +103,34 @@ class AsyncConnection(StartableContext):
self.sync_connection = await (greenlet_spawn(self.sync_engine.connect))
return self
+ @property
+ def connection(self):
+ """Not implemented for async; call
+ :meth:`_asyncio.AsyncConnection.get_raw_connection`.
+
+ """
+ raise exc.InvalidRequestError(
+ "AsyncConnection.connection accessor is not implemented as the "
+ "attribute may need to reconnect on an invalidated connection. "
+ "Use the get_raw_connection() method."
+ )
+
+ async def get_raw_connection(self):
+ """Return the pooled DBAPI-level connection in use by this
+ :class:`_asyncio.AsyncConnection`.
+
+ This is typically the SQLAlchemy connection-pool proxied connection
+ which then has an attribute .connection that refers to the actual
+ DBAPI-level connection.
+ """
+ conn = self._sync_connection()
+
+ return await greenlet_spawn(getattr, conn, "connection")
+
+ @property
+ def _proxied(self):
+ return self.sync_connection
+
def _sync_connection(self):
if not self.sync_connection:
self._raise_for_not_started()
@@ -94,6 +146,43 @@ class AsyncConnection(StartableContext):
self._sync_connection()
return AsyncTransaction(self, nested=True)
+ async def invalidate(self, exception=None):
+ """Invalidate the underlying DBAPI connection associated with
+ this :class:`_engine.Connection`.
+
+ See the method :meth:`_engine.Connection.invalidate` for full
+ detail on this method.
+
+ """
+
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.invalidate, exception=exception)
+
+ async def get_isolation_level(self):
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.get_isolation_level)
+
+ async def set_isolation_level(self):
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.get_isolation_level)
+
+ async def execution_options(self, **opt):
+ r"""Set non-SQL options for the connection which take effect
+ during execution.
+
+ This returns this :class:`_asyncio.AsyncConnection` object with
+ the new options added.
+
+ See :meth:`_future.Connection.execution_options` for full details
+ on this method.
+
+ """
+
+ conn = self._sync_connection()
+ c2 = await greenlet_spawn(conn.execution_options, **opt)
+ assert c2 is conn
+ return self
+
async def commit(self):
"""Commit the transaction that is currently in progress.
@@ -287,7 +376,19 @@ class AsyncConnection(StartableContext):
await self.close()
-class AsyncEngine:
+@util.create_proxy_methods(
+ Engine,
+ ":class:`_future.Engine`",
+ ":class:`_asyncio.AsyncEngine`",
+ classmethods=[],
+ methods=[
+ "clear_compiled_cache",
+ "update_execution_options",
+ "get_execution_options",
+ ],
+ attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"],
+)
+class AsyncEngine(AsyncConnectable):
"""An asyncio proxy for a :class:`_engine.Engine`.
:class:`_asyncio.AsyncEngine` is acquired using the
@@ -301,7 +402,12 @@ class AsyncEngine:
""" # noqa
- __slots__ = ("sync_engine",)
+ # AsyncEngine is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncEngine that matches this one given only the
+ # "sync" elements.
+ __slots__ = ("sync_engine", "_proxied")
_connection_cls = AsyncConnection
@@ -327,7 +433,7 @@ class AsyncEngine:
await self.conn.close()
def __init__(self, sync_engine: Engine):
- self.sync_engine = sync_engine
+ self.sync_engine = self._proxied = sync_engine
def begin(self):
"""Return a context manager which when entered will deliver an
@@ -363,7 +469,7 @@ class AsyncEngine:
"""
- return self._connection_cls(self.sync_engine)
+ return self._connection_cls(self)
async def raw_connection(self) -> Any:
"""Return a "raw" DBAPI connection from the connection pool.
@@ -375,12 +481,33 @@ class AsyncEngine:
"""
return await greenlet_spawn(self.sync_engine.raw_connection)
+ def execution_options(self, **opt):
+ """Return a new :class:`_asyncio.AsyncEngine` that will provide
+ :class:`_asyncio.AsyncConnection` objects with the given execution
+ options.
+
+ Proxied from :meth:`_future.Engine.execution_options`. See that
+ method for details.
+
+ """
+
+ return AsyncEngine(self.sync_engine.execution_options(**opt))
-class AsyncOptionEngine(OptionEngineMixin, AsyncEngine):
- pass
+ async def dispose(self):
+ """Dispose of the connection pool used by this
+ :class:`_asyncio.AsyncEngine`.
+ This will close all connection pool connections that are
+ **currently checked in**. See the documentation for the underlying
+ :meth:`_future.Engine.dispose` method for further notes.
+
+ .. seealso::
+
+ :meth:`_future.Engine.dispose`
+
+ """
-AsyncEngine._option_cls = AsyncOptionEngine
+ return await greenlet_spawn(self.sync_engine.dispose)
class AsyncTransaction(StartableContext):
diff --git a/lib/sqlalchemy/ext/asyncio/events.py b/lib/sqlalchemy/ext/asyncio/events.py
new file mode 100644
index 000000000..a8daefc4b
--- /dev/null
+++ b/lib/sqlalchemy/ext/asyncio/events.py
@@ -0,0 +1,29 @@
+from .engine import AsyncConnectable
+from .session import AsyncSession
+from ...engine import events as engine_event
+from ...orm import events as orm_event
+
+
+class AsyncConnectionEvents(engine_event.ConnectionEvents):
+ _target_class_doc = "SomeEngine"
+ _dispatch_target = AsyncConnectable
+
+ @classmethod
+ def _listen(cls, event_key, retval=False):
+ raise NotImplementedError(
+ "asynchronous events are not implemented at this time. Apply "
+ "synchronous listeners to the AsyncEngine.sync_engine or "
+ "AsyncConnection.sync_connection attributes."
+ )
+
+
+class AsyncSessionEvents(orm_event.SessionEvents):
+ _target_class_doc = "SomeSession"
+ _dispatch_target = AsyncSession
+
+ @classmethod
+ def _listen(cls, event_key, retval=False):
+ raise NotImplementedError(
+ "asynchronous events are not implemented at this time. Apply "
+ "synchronous listeners to the AsyncSession.sync_session."
+ )
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
index cb06aa26d..4ae1fb385 100644
--- a/lib/sqlalchemy/ext/asyncio/session.py
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -1,6 +1,5 @@
from typing import Any
from typing import Callable
-from typing import List
from typing import Mapping
from typing import Optional
@@ -15,6 +14,35 @@ from ...sql import Executable
from ...util.concurrency import greenlet_spawn
+@util.create_proxy_methods(
+ Session,
+ ":class:`_orm.Session`",
+ ":class:`_asyncio.AsyncSession`",
+ classmethods=["object_session", "identity_key"],
+ methods=[
+ "__contains__",
+ "__iter__",
+ "add",
+ "add_all",
+ "delete",
+ "expire",
+ "expire_all",
+ "expunge",
+ "expunge_all",
+ "get_bind",
+ "is_modified",
+ ],
+ attributes=[
+ "dirty",
+ "deleted",
+ "new",
+ "identity_map",
+ "is_active",
+ "autoflush",
+ "no_autoflush",
+ "info",
+ ],
+)
class AsyncSession:
"""Asyncio version of :class:`_orm.Session`.
@@ -23,6 +51,16 @@ class AsyncSession:
"""
+ __slots__ = (
+ "binds",
+ "bind",
+ "sync_session",
+ "_proxied",
+ "_slots_dispatch",
+ )
+
+ dispatch = None
+
def __init__(
self,
bind: AsyncEngine = None,
@@ -31,46 +69,18 @@ class AsyncSession:
):
kw["future"] = True
if bind:
+ self.bind = engine
bind = engine._get_sync_engine(bind)
if binds:
+ self.binds = binds
binds = {
key: engine._get_sync_engine(b) for key, b in binds.items()
}
- self.sync_session = Session(bind=bind, binds=binds, **kw)
-
- def add(self, instance: object) -> None:
- """Place an object in this :class:`_asyncio.AsyncSession`.
-
- .. seealso::
-
- :meth:`_orm.Session.add`
-
- """
- self.sync_session.add(instance)
-
- def add_all(self, instances: List[object]) -> None:
- """Add the given collection of instances to this
- :class:`_asyncio.AsyncSession`."""
-
- self.sync_session.add_all(instances)
-
- def expire_all(self):
- """Expires all persistent instances within this Session.
-
- See :meth:`_orm.Session.expire_all` for usage details.
-
- """
- self.sync_session.expire_all()
-
- def expire(self, instance, attribute_names=None):
- """Expire the attributes on an instance.
-
- See :meth:`._orm.Session.expire` for usage details.
-
- """
- self.sync_session.expire()
+ self.sync_session = self._proxied = Session(
+ bind=bind, binds=binds, **kw
+ )
async def refresh(
self, instance, attribute_names=None, with_for_update=None
@@ -178,8 +188,17 @@ class AsyncSession:
:class:`.Session` object's transactional state.
"""
+
+ # POSSIBLY TODO: here, we see that the sync engine / connection
+ # that are generated from AsyncEngine / AsyncConnection don't
+ # provide any backlink from those sync objects back out to the
+ # async ones. it's not *too* big a deal since AsyncEngine/Connection
+ # are just proxies and all the state is actually in the sync
+ # version of things. However! it has to stay that way :)
sync_connection = await greenlet_spawn(self.sync_session.connection)
- return engine.AsyncConnection(sync_connection.engine, sync_connection)
+ return engine.AsyncConnection(
+ engine.AsyncEngine(sync_connection.engine), sync_connection
+ )
def begin(self, **kw):
"""Return an :class:`_asyncio.AsyncSessionTransaction` object.
@@ -218,14 +237,22 @@ class AsyncSession:
return AsyncSessionTransaction(self, nested=True)
async def rollback(self):
+ """Rollback the current transaction in progress."""
return await greenlet_spawn(self.sync_session.rollback)
async def commit(self):
+ """Commit the current transaction in progress."""
return await greenlet_spawn(self.sync_session.commit)
async def close(self):
+ """Close this :class:`_asyncio.AsyncSession`."""
return await greenlet_spawn(self.sync_session.close)
+ @classmethod
+ async def close_all(self):
+ """Close all :class:`_asyncio.AsyncSession` sessions."""
+ return await greenlet_spawn(self.sync_session.close_all)
+
async def __aenter__(self):
return self