diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-10 15:42:35 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-04-11 22:11:07 -0400 |
commit | a45e2284dad17fbbba3bea9d5e5304aab21c8c94 (patch) | |
tree | ac31614f2d53059570e2edffe731baf384baea23 /lib/sqlalchemy/ext/asyncio/scoping.py | |
parent | aa9cd878e8249a4a758c7f968e929e92fede42a5 (diff) | |
download | sqlalchemy-a45e2284dad17fbbba3bea9d5e5304aab21c8c94.tar.gz |
pep-484: asyncio
in this patch the asyncio/events.py module, which
existed only to raise errors when trying to attach event
listeners, is removed, as we were already coding an asyncio-specific
workaround in upstream Pool / Session to raise this error,
just moved the error out to the target and did the same thing
for Engine.
We also add an async_sessionmaker class. The initial rationale
here is because sessionmaker() is hardcoded to Session subclasses,
and there's not a way to get the use case of
sessionmaker(class_=AsyncSession) to type correctly without changing
the sessionmaker() symbol itself to be a function and not a class,
which gets too complicated for what this is. Additionally,
_SessionClassMethods has only three methods on it, one of which
is not usable with asyncio (close_all()), the others
not generally used from the session class.
Change-Id: I064a5fa5d91cc8d5bbe9597437536e37b4e801fe
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/scoping.py')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/scoping.py | 241 |
1 files changed, 177 insertions, 64 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/scoping.py b/lib/sqlalchemy/ext/asyncio/scoping.py index 0503076aa..0d6ae92b4 100644 --- a/lib/sqlalchemy/ext/asyncio/scoping.py +++ b/lib/sqlalchemy/ext/asyncio/scoping.py @@ -8,12 +8,50 @@ from __future__ import annotations from typing import Any - +from typing import Callable +from typing import Iterable +from typing import Iterator +from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import Union + +from .session import async_sessionmaker from .session import AsyncSession +from ... import exc as sa_exc from ... import util -from ...orm.scoping import ScopedSessionMixin +from ...orm.session import Session from ...util import create_proxy_methods from ...util import ScopedRegistry +from ...util import warn +from ...util import warn_deprecated + +if TYPE_CHECKING: + from .engine import AsyncConnection + from .result import AsyncResult + from .result import AsyncScalarResult + from .session import AsyncSessionTransaction + from ...engine import Connection + from ...engine import Engine + from ...engine import Result + from ...engine import Row + from ...engine.interfaces import _CoreAnyExecuteParams + from ...engine.interfaces import _CoreSingleExecuteParams + from ...engine.interfaces import _ExecuteOptions + from ...engine.interfaces import _ExecuteOptionsParameter + from ...engine.result import ScalarResult + from ...orm._typing import _IdentityKeyType + from ...orm._typing import _O + from ...orm.interfaces import ORMOption + from ...orm.session import _BindArguments + from ...orm.session import _EntityBindKey + from ...orm.session import _PKIdentityArgument + from ...orm.session import _SessionBind + from ...sql.base import Executable + from ...sql.elements import ClauseElement + from ...sql.selectable import ForUpdateArg @create_proxy_methods( @@ -62,7 +100,7 @@ from ...util import ScopedRegistry "info", ], ) -class async_scoped_session(ScopedSessionMixin): +class async_scoped_session: """Provides scoped management of :class:`.AsyncSession` objects. See the section :ref:`asyncio_scoped_session` for usage details. @@ -74,17 +112,23 @@ class async_scoped_session(ScopedSessionMixin): _support_async = True - def __init__(self, session_factory, scopefunc): + session_factory: async_sessionmaker + """The `session_factory` provided to `__init__` is stored in this + attribute and may be accessed at a later time. This can be useful when + a new non-scoped :class:`.AsyncSession` is needed.""" + + registry: ScopedRegistry[AsyncSession] + + def __init__( + self, + session_factory: async_sessionmaker, + scopefunc: Callable[[], Any], + ): """Construct a new :class:`_asyncio.async_scoped_session`. :param session_factory: a factory to create new :class:`_asyncio.AsyncSession` instances. This is usually, but not necessarily, an instance - of :class:`_orm.sessionmaker` which itself was passed the - :class:`_asyncio.AsyncSession` to its :paramref:`_orm.sessionmaker.class_` - parameter:: - - async_session_factory = sessionmaker(some_async_engine, class_= AsyncSession) - AsyncSession = async_scoped_session(async_session_factory, scopefunc=current_task) + of :class:`_asyncio.async_sessionmaker`. :param scopefunc: function which defines the current scope. A function such as ``asyncio.current_task`` @@ -96,10 +140,59 @@ class async_scoped_session(ScopedSessionMixin): self.registry = ScopedRegistry(session_factory, scopefunc) @property - def _proxied(self): + def _proxied(self) -> AsyncSession: return self.registry() - async def remove(self): + def __call__(self, **kw: Any) -> AsyncSession: + r"""Return the current :class:`.AsyncSession`, creating it + using the :attr:`.scoped_session.session_factory` if not present. + + :param \**kw: Keyword arguments will be passed to the + :attr:`.scoped_session.session_factory` callable, if an existing + :class:`.AsyncSession` is not present. If the + :class:`.AsyncSession` is present + and keyword arguments have been passed, + :exc:`~sqlalchemy.exc.InvalidRequestError` is raised. + + """ + if kw: + if self.registry.has(): + raise sa_exc.InvalidRequestError( + "Scoped session is already present; " + "no new arguments may be specified." + ) + else: + sess = self.session_factory(**kw) + self.registry.set(sess) + else: + sess = self.registry() + if not self._support_async and sess._is_asyncio: + warn_deprecated( + "Using `scoped_session` with asyncio is deprecated and " + "will raise an error in a future version. " + "Please use `async_scoped_session` instead.", + "1.4.23", + ) + return sess + + def configure(self, **kwargs: Any) -> None: + """reconfigure the :class:`.sessionmaker` used by this + :class:`.scoped_session`. + + See :meth:`.sessionmaker.configure`. + + """ + + if self.registry.has(): + warn( + "At least one scoped session is already present. " + " configure() can not affect sessions that have " + "already been created." + ) + + self.session_factory.configure(**kwargs) + + async def remove(self) -> None: """Dispose of the current :class:`.AsyncSession`, if present. Different from scoped_session's remove method, this method would use @@ -152,7 +245,9 @@ class async_scoped_session(ScopedSessionMixin): Proxied for the :class:`_orm.Session` class on behalf of the :class:`_asyncio.AsyncSession` class. - """ + + + """ # noqa: E501 return self._proxied.__iter__() @@ -199,7 +294,7 @@ class async_scoped_session(ScopedSessionMixin): return self._proxied.add_all(instances) - def begin(self): + def begin(self) -> AsyncSessionTransaction: r"""Return an :class:`_asyncio.AsyncSessionTransaction` object. .. container:: class_bases @@ -228,7 +323,7 @@ class async_scoped_session(ScopedSessionMixin): return self._proxied.begin() - def begin_nested(self): + def begin_nested(self) -> AsyncSessionTransaction: r"""Return an :class:`_asyncio.AsyncSessionTransaction` object which will begin a "nested" transaction, e.g. SAVEPOINT. @@ -247,7 +342,7 @@ class async_scoped_session(ScopedSessionMixin): return self._proxied.begin_nested() - async def close(self): + async def close(self) -> None: r"""Close out the transactional resources and ORM objects used by this :class:`_asyncio.AsyncSession`. @@ -284,7 +379,7 @@ class async_scoped_session(ScopedSessionMixin): return await self._proxied.close() - async def commit(self): + async def commit(self) -> None: r"""Commit the current transaction in progress. .. container:: class_bases @@ -296,7 +391,7 @@ class async_scoped_session(ScopedSessionMixin): return await self._proxied.commit() - async def connection(self, **kw): + async def connection(self, **kw: Any) -> AsyncConnection: r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to this :class:`.Session` object's transactional state. @@ -321,7 +416,7 @@ class async_scoped_session(ScopedSessionMixin): return await self._proxied.connection(**kw) - async def delete(self, instance): + async def delete(self, instance: object) -> None: r"""Mark an instance as deleted. .. container:: class_bases @@ -345,12 +440,12 @@ class async_scoped_session(ScopedSessionMixin): async def execute( self, - statement, - params=None, - execution_options=util.EMPTY_DICT, - bind_arguments=None, - **kw, - ): + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Result: r"""Execute a statement and return a buffered :class:`_engine.Result` object. @@ -519,7 +614,7 @@ class async_scoped_session(ScopedSessionMixin): return self._proxied.expunge_all() - async def flush(self, objects=None): + async def flush(self, objects: Optional[Sequence[Any]] = None) -> None: r"""Flush all the object changes to the database. .. container:: class_bases @@ -538,13 +633,15 @@ class async_scoped_session(ScopedSessionMixin): async def get( self, - entity, - ident, - options=None, - populate_existing=False, - with_for_update=None, - identity_token=None, - ): + entity: _EntityBindKey[_O], + ident: _PKIdentityArgument, + *, + options: Optional[Sequence[ORMOption]] = None, + populate_existing: bool = False, + with_for_update: Optional[ForUpdateArg] = None, + identity_token: Optional[Any] = None, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + ) -> Optional[_O]: r"""Return an instance based on the given primary key identifier, or ``None`` if not found. @@ -568,9 +665,16 @@ class async_scoped_session(ScopedSessionMixin): populate_existing=populate_existing, with_for_update=with_for_update, identity_token=identity_token, + execution_options=execution_options, ) - def get_bind(self, mapper=None, clause=None, bind=None, **kw): + def get_bind( + self, + mapper: Optional[_EntityBindKey[_O]] = None, + clause: Optional[ClauseElement] = None, + bind: Optional[_SessionBind] = None, + **kw: Any, + ) -> Union[Engine, Connection]: r"""Return a "bind" to which the synchronous proxied :class:`_orm.Session` is bound. @@ -724,7 +828,7 @@ class async_scoped_session(ScopedSessionMixin): instance, include_collections=include_collections ) - async def invalidate(self): + async def invalidate(self) -> None: r"""Close this Session, using connection invalidation. .. container:: class_bases @@ -738,7 +842,13 @@ class async_scoped_session(ScopedSessionMixin): return await self._proxied.invalidate() - async def merge(self, instance, load=True, options=None): + async def merge( + self, + instance: _O, + *, + load: bool = True, + options: Optional[Sequence[ORMOption]] = None, + ) -> _O: r"""Copy the state of a given instance into a corresponding instance within this :class:`_asyncio.AsyncSession`. @@ -757,8 +867,11 @@ class async_scoped_session(ScopedSessionMixin): return await self._proxied.merge(instance, load=load, options=options) async def refresh( - self, instance, attribute_names=None, with_for_update=None - ): + self, + instance: object, + attribute_names: Optional[Iterable[str]] = None, + with_for_update: Optional[ForUpdateArg] = None, + ) -> None: r"""Expire and refresh the attributes on the given instance. .. container:: class_bases @@ -785,7 +898,7 @@ class async_scoped_session(ScopedSessionMixin): with_for_update=with_for_update, ) - async def rollback(self): + async def rollback(self) -> None: r"""Rollback the current transaction in progress. .. container:: class_bases @@ -799,12 +912,12 @@ class async_scoped_session(ScopedSessionMixin): async def scalar( self, - statement, - params=None, - execution_options=util.EMPTY_DICT, - bind_arguments=None, - **kw, - ): + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: r"""Execute a statement and return a scalar result. .. container:: class_bases @@ -829,12 +942,12 @@ class async_scoped_session(ScopedSessionMixin): async def scalars( self, - statement, - params=None, - execution_options=util.EMPTY_DICT, - bind_arguments=None, - **kw, - ): + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: r"""Execute a statement and return scalar results. .. container:: class_bases @@ -865,12 +978,12 @@ class async_scoped_session(ScopedSessionMixin): async def stream( self, - statement, - params=None, - execution_options=util.EMPTY_DICT, - bind_arguments=None, - **kw, - ): + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult: r"""Execute a statement and return a streaming :class:`_asyncio.AsyncResult` object. @@ -892,12 +1005,12 @@ class async_scoped_session(ScopedSessionMixin): async def stream_scalars( self, - statement, - params=None, - execution_options=util.EMPTY_DICT, - bind_arguments=None, - **kw, - ): + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: r"""Execute a statement and return a stream of scalar results. .. container:: class_bases @@ -1159,7 +1272,7 @@ class async_scoped_session(ScopedSessionMixin): return self._proxied.info @classmethod - async def close_all(self): + async def close_all(self) -> None: r"""Close all :class:`_asyncio.AsyncSession` sessions. .. container:: class_bases |