diff options
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/__init__.py | 2 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/engine.py | 153 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/events.py | 29 | ||||
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/session.py | 97 |
4 files changed, 233 insertions, 48 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/__init__.py b/lib/sqlalchemy/ext/asyncio/__init__.py index fbbc958d4..9c7d6443c 100644 --- a/lib/sqlalchemy/ext/asyncio/__init__.py +++ b/lib/sqlalchemy/ext/asyncio/__init__.py @@ -2,6 +2,8 @@ from .engine import AsyncConnection # noqa from .engine import AsyncEngine # noqa from .engine import AsyncTransaction # noqa from .engine import create_async_engine # noqa +from .events import AsyncConnectionEvents # noqa +from .events import AsyncSessionEvents # noqa from .result import AsyncMappingResult # noqa from .result import AsyncResult # noqa from .result import AsyncScalarResult # noqa diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index 4a92fb1f2..9e4851dfc 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -8,12 +8,11 @@ from .base import StartableContext from .result import AsyncResult from ... import exc from ... import util -from ...engine import Connection from ...engine import create_engine as _create_engine -from ...engine import Engine from ...engine import Result from ...engine import Transaction -from ...engine.base import OptionEngineMixin +from ...future import Connection +from ...future import Engine from ...sql import Executable from ...util.concurrency import greenlet_spawn @@ -41,7 +40,24 @@ def create_async_engine(*arg, **kw): return AsyncEngine(sync_engine) -class AsyncConnection(StartableContext): +class AsyncConnectable: + __slots__ = "_slots_dispatch" + + +@util.create_proxy_methods( + Connection, + ":class:`_future.Connection`", + ":class:`_asyncio.AsyncConnection`", + classmethods=[], + methods=[], + attributes=[ + "closed", + "invalidated", + "dialect", + "default_isolation_level", + ], +) +class AsyncConnection(StartableContext, AsyncConnectable): """An asyncio proxy for a :class:`_engine.Connection`. :class:`_asyncio.AsyncConnection` is acquired using the @@ -58,15 +74,23 @@ class AsyncConnection(StartableContext): """ # noqa + # AsyncConnection is a thin proxy; no state should be added here + # that is not retrievable from the "sync" engine / connection, e.g. + # current transaction, info, etc. It should be possible to + # create a new AsyncConnection that matches this one given only the + # "sync" elements. __slots__ = ( "sync_engine", "sync_connection", ) def __init__( - self, sync_engine: Engine, sync_connection: Optional[Connection] = None + self, + async_engine: "AsyncEngine", + sync_connection: Optional[Connection] = None, ): - self.sync_engine = sync_engine + self.engine = async_engine + self.sync_engine = async_engine.sync_engine self.sync_connection = sync_connection async def start(self): @@ -79,6 +103,34 @@ class AsyncConnection(StartableContext): self.sync_connection = await (greenlet_spawn(self.sync_engine.connect)) return self + @property + def connection(self): + """Not implemented for async; call + :meth:`_asyncio.AsyncConnection.get_raw_connection`. + + """ + raise exc.InvalidRequestError( + "AsyncConnection.connection accessor is not implemented as the " + "attribute may need to reconnect on an invalidated connection. " + "Use the get_raw_connection() method." + ) + + async def get_raw_connection(self): + """Return the pooled DBAPI-level connection in use by this + :class:`_asyncio.AsyncConnection`. + + This is typically the SQLAlchemy connection-pool proxied connection + which then has an attribute .connection that refers to the actual + DBAPI-level connection. + """ + conn = self._sync_connection() + + return await greenlet_spawn(getattr, conn, "connection") + + @property + def _proxied(self): + return self.sync_connection + def _sync_connection(self): if not self.sync_connection: self._raise_for_not_started() @@ -94,6 +146,43 @@ class AsyncConnection(StartableContext): self._sync_connection() return AsyncTransaction(self, nested=True) + async def invalidate(self, exception=None): + """Invalidate the underlying DBAPI connection associated with + this :class:`_engine.Connection`. + + See the method :meth:`_engine.Connection.invalidate` for full + detail on this method. + + """ + + conn = self._sync_connection() + return await greenlet_spawn(conn.invalidate, exception=exception) + + async def get_isolation_level(self): + conn = self._sync_connection() + return await greenlet_spawn(conn.get_isolation_level) + + async def set_isolation_level(self): + conn = self._sync_connection() + return await greenlet_spawn(conn.get_isolation_level) + + async def execution_options(self, **opt): + r"""Set non-SQL options for the connection which take effect + during execution. + + This returns this :class:`_asyncio.AsyncConnection` object with + the new options added. + + See :meth:`_future.Connection.execution_options` for full details + on this method. + + """ + + conn = self._sync_connection() + c2 = await greenlet_spawn(conn.execution_options, **opt) + assert c2 is conn + return self + async def commit(self): """Commit the transaction that is currently in progress. @@ -287,7 +376,19 @@ class AsyncConnection(StartableContext): await self.close() -class AsyncEngine: +@util.create_proxy_methods( + Engine, + ":class:`_future.Engine`", + ":class:`_asyncio.AsyncEngine`", + classmethods=[], + methods=[ + "clear_compiled_cache", + "update_execution_options", + "get_execution_options", + ], + attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"], +) +class AsyncEngine(AsyncConnectable): """An asyncio proxy for a :class:`_engine.Engine`. :class:`_asyncio.AsyncEngine` is acquired using the @@ -301,7 +402,12 @@ class AsyncEngine: """ # noqa - __slots__ = ("sync_engine",) + # AsyncEngine is a thin proxy; no state should be added here + # that is not retrievable from the "sync" engine / connection, e.g. + # current transaction, info, etc. It should be possible to + # create a new AsyncEngine that matches this one given only the + # "sync" elements. + __slots__ = ("sync_engine", "_proxied") _connection_cls = AsyncConnection @@ -327,7 +433,7 @@ class AsyncEngine: await self.conn.close() def __init__(self, sync_engine: Engine): - self.sync_engine = sync_engine + self.sync_engine = self._proxied = sync_engine def begin(self): """Return a context manager which when entered will deliver an @@ -363,7 +469,7 @@ class AsyncEngine: """ - return self._connection_cls(self.sync_engine) + return self._connection_cls(self) async def raw_connection(self) -> Any: """Return a "raw" DBAPI connection from the connection pool. @@ -375,12 +481,33 @@ class AsyncEngine: """ return await greenlet_spawn(self.sync_engine.raw_connection) + def execution_options(self, **opt): + """Return a new :class:`_asyncio.AsyncEngine` that will provide + :class:`_asyncio.AsyncConnection` objects with the given execution + options. + + Proxied from :meth:`_future.Engine.execution_options`. See that + method for details. + + """ + + return AsyncEngine(self.sync_engine.execution_options(**opt)) -class AsyncOptionEngine(OptionEngineMixin, AsyncEngine): - pass + async def dispose(self): + """Dispose of the connection pool used by this + :class:`_asyncio.AsyncEngine`. + This will close all connection pool connections that are + **currently checked in**. See the documentation for the underlying + :meth:`_future.Engine.dispose` method for further notes. + + .. seealso:: + + :meth:`_future.Engine.dispose` + + """ -AsyncEngine._option_cls = AsyncOptionEngine + return await greenlet_spawn(self.sync_engine.dispose) class AsyncTransaction(StartableContext): diff --git a/lib/sqlalchemy/ext/asyncio/events.py b/lib/sqlalchemy/ext/asyncio/events.py new file mode 100644 index 000000000..a8daefc4b --- /dev/null +++ b/lib/sqlalchemy/ext/asyncio/events.py @@ -0,0 +1,29 @@ +from .engine import AsyncConnectable +from .session import AsyncSession +from ...engine import events as engine_event +from ...orm import events as orm_event + + +class AsyncConnectionEvents(engine_event.ConnectionEvents): + _target_class_doc = "SomeEngine" + _dispatch_target = AsyncConnectable + + @classmethod + def _listen(cls, event_key, retval=False): + raise NotImplementedError( + "asynchronous events are not implemented at this time. Apply " + "synchronous listeners to the AsyncEngine.sync_engine or " + "AsyncConnection.sync_connection attributes." + ) + + +class AsyncSessionEvents(orm_event.SessionEvents): + _target_class_doc = "SomeSession" + _dispatch_target = AsyncSession + + @classmethod + def _listen(cls, event_key, retval=False): + raise NotImplementedError( + "asynchronous events are not implemented at this time. Apply " + "synchronous listeners to the AsyncSession.sync_session." + ) diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py index cb06aa26d..4ae1fb385 100644 --- a/lib/sqlalchemy/ext/asyncio/session.py +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -1,6 +1,5 @@ from typing import Any from typing import Callable -from typing import List from typing import Mapping from typing import Optional @@ -15,6 +14,35 @@ from ...sql import Executable from ...util.concurrency import greenlet_spawn +@util.create_proxy_methods( + Session, + ":class:`_orm.Session`", + ":class:`_asyncio.AsyncSession`", + classmethods=["object_session", "identity_key"], + methods=[ + "__contains__", + "__iter__", + "add", + "add_all", + "delete", + "expire", + "expire_all", + "expunge", + "expunge_all", + "get_bind", + "is_modified", + ], + attributes=[ + "dirty", + "deleted", + "new", + "identity_map", + "is_active", + "autoflush", + "no_autoflush", + "info", + ], +) class AsyncSession: """Asyncio version of :class:`_orm.Session`. @@ -23,6 +51,16 @@ class AsyncSession: """ + __slots__ = ( + "binds", + "bind", + "sync_session", + "_proxied", + "_slots_dispatch", + ) + + dispatch = None + def __init__( self, bind: AsyncEngine = None, @@ -31,46 +69,18 @@ class AsyncSession: ): kw["future"] = True if bind: + self.bind = engine bind = engine._get_sync_engine(bind) if binds: + self.binds = binds binds = { key: engine._get_sync_engine(b) for key, b in binds.items() } - self.sync_session = Session(bind=bind, binds=binds, **kw) - - def add(self, instance: object) -> None: - """Place an object in this :class:`_asyncio.AsyncSession`. - - .. seealso:: - - :meth:`_orm.Session.add` - - """ - self.sync_session.add(instance) - - def add_all(self, instances: List[object]) -> None: - """Add the given collection of instances to this - :class:`_asyncio.AsyncSession`.""" - - self.sync_session.add_all(instances) - - def expire_all(self): - """Expires all persistent instances within this Session. - - See :meth:`_orm.Session.expire_all` for usage details. - - """ - self.sync_session.expire_all() - - def expire(self, instance, attribute_names=None): - """Expire the attributes on an instance. - - See :meth:`._orm.Session.expire` for usage details. - - """ - self.sync_session.expire() + self.sync_session = self._proxied = Session( + bind=bind, binds=binds, **kw + ) async def refresh( self, instance, attribute_names=None, with_for_update=None @@ -178,8 +188,17 @@ class AsyncSession: :class:`.Session` object's transactional state. """ + + # POSSIBLY TODO: here, we see that the sync engine / connection + # that are generated from AsyncEngine / AsyncConnection don't + # provide any backlink from those sync objects back out to the + # async ones. it's not *too* big a deal since AsyncEngine/Connection + # are just proxies and all the state is actually in the sync + # version of things. However! it has to stay that way :) sync_connection = await greenlet_spawn(self.sync_session.connection) - return engine.AsyncConnection(sync_connection.engine, sync_connection) + return engine.AsyncConnection( + engine.AsyncEngine(sync_connection.engine), sync_connection + ) def begin(self, **kw): """Return an :class:`_asyncio.AsyncSessionTransaction` object. @@ -218,14 +237,22 @@ class AsyncSession: return AsyncSessionTransaction(self, nested=True) async def rollback(self): + """Rollback the current transaction in progress.""" return await greenlet_spawn(self.sync_session.rollback) async def commit(self): + """Commit the current transaction in progress.""" return await greenlet_spawn(self.sync_session.commit) async def close(self): + """Close this :class:`_asyncio.AsyncSession`.""" return await greenlet_spawn(self.sync_session.close) + @classmethod + async def close_all(self): + """Close all :class:`_asyncio.AsyncSession` sessions.""" + return await greenlet_spawn(self.sync_session.close_all) + async def __aenter__(self): return self |