summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio/scoping.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-04-10 15:42:35 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-04-11 22:11:07 -0400
commita45e2284dad17fbbba3bea9d5e5304aab21c8c94 (patch)
treeac31614f2d53059570e2edffe731baf384baea23 /lib/sqlalchemy/ext/asyncio/scoping.py
parentaa9cd878e8249a4a758c7f968e929e92fede42a5 (diff)
downloadsqlalchemy-a45e2284dad17fbbba3bea9d5e5304aab21c8c94.tar.gz
pep-484: asyncio
in this patch the asyncio/events.py module, which existed only to raise errors when trying to attach event listeners, is removed, as we were already coding an asyncio-specific workaround in upstream Pool / Session to raise this error, just moved the error out to the target and did the same thing for Engine. We also add an async_sessionmaker class. The initial rationale here is because sessionmaker() is hardcoded to Session subclasses, and there's not a way to get the use case of sessionmaker(class_=AsyncSession) to type correctly without changing the sessionmaker() symbol itself to be a function and not a class, which gets too complicated for what this is. Additionally, _SessionClassMethods has only three methods on it, one of which is not usable with asyncio (close_all()), the others not generally used from the session class. Change-Id: I064a5fa5d91cc8d5bbe9597437536e37b4e801fe
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/scoping.py')
-rw-r--r--lib/sqlalchemy/ext/asyncio/scoping.py241
1 files changed, 177 insertions, 64 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/scoping.py b/lib/sqlalchemy/ext/asyncio/scoping.py
index 0503076aa..0d6ae92b4 100644
--- a/lib/sqlalchemy/ext/asyncio/scoping.py
+++ b/lib/sqlalchemy/ext/asyncio/scoping.py
@@ -8,12 +8,50 @@
from __future__ import annotations
from typing import Any
-
+from typing import Callable
+from typing import Iterable
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+
+from .session import async_sessionmaker
from .session import AsyncSession
+from ... import exc as sa_exc
from ... import util
-from ...orm.scoping import ScopedSessionMixin
+from ...orm.session import Session
from ...util import create_proxy_methods
from ...util import ScopedRegistry
+from ...util import warn
+from ...util import warn_deprecated
+
+if TYPE_CHECKING:
+ from .engine import AsyncConnection
+ from .result import AsyncResult
+ from .result import AsyncScalarResult
+ from .session import AsyncSessionTransaction
+ from ...engine import Connection
+ from ...engine import Engine
+ from ...engine import Result
+ from ...engine import Row
+ from ...engine.interfaces import _CoreAnyExecuteParams
+ from ...engine.interfaces import _CoreSingleExecuteParams
+ from ...engine.interfaces import _ExecuteOptions
+ from ...engine.interfaces import _ExecuteOptionsParameter
+ from ...engine.result import ScalarResult
+ from ...orm._typing import _IdentityKeyType
+ from ...orm._typing import _O
+ from ...orm.interfaces import ORMOption
+ from ...orm.session import _BindArguments
+ from ...orm.session import _EntityBindKey
+ from ...orm.session import _PKIdentityArgument
+ from ...orm.session import _SessionBind
+ from ...sql.base import Executable
+ from ...sql.elements import ClauseElement
+ from ...sql.selectable import ForUpdateArg
@create_proxy_methods(
@@ -62,7 +100,7 @@ from ...util import ScopedRegistry
"info",
],
)
-class async_scoped_session(ScopedSessionMixin):
+class async_scoped_session:
"""Provides scoped management of :class:`.AsyncSession` objects.
See the section :ref:`asyncio_scoped_session` for usage details.
@@ -74,17 +112,23 @@ class async_scoped_session(ScopedSessionMixin):
_support_async = True
- def __init__(self, session_factory, scopefunc):
+ session_factory: async_sessionmaker
+ """The `session_factory` provided to `__init__` is stored in this
+ attribute and may be accessed at a later time. This can be useful when
+ a new non-scoped :class:`.AsyncSession` is needed."""
+
+ registry: ScopedRegistry[AsyncSession]
+
+ def __init__(
+ self,
+ session_factory: async_sessionmaker,
+ scopefunc: Callable[[], Any],
+ ):
"""Construct a new :class:`_asyncio.async_scoped_session`.
:param session_factory: a factory to create new :class:`_asyncio.AsyncSession`
instances. This is usually, but not necessarily, an instance
- of :class:`_orm.sessionmaker` which itself was passed the
- :class:`_asyncio.AsyncSession` to its :paramref:`_orm.sessionmaker.class_`
- parameter::
-
- async_session_factory = sessionmaker(some_async_engine, class_= AsyncSession)
- AsyncSession = async_scoped_session(async_session_factory, scopefunc=current_task)
+ of :class:`_asyncio.async_sessionmaker`.
:param scopefunc: function which defines
the current scope. A function such as ``asyncio.current_task``
@@ -96,10 +140,59 @@ class async_scoped_session(ScopedSessionMixin):
self.registry = ScopedRegistry(session_factory, scopefunc)
@property
- def _proxied(self):
+ def _proxied(self) -> AsyncSession:
return self.registry()
- async def remove(self):
+ def __call__(self, **kw: Any) -> AsyncSession:
+ r"""Return the current :class:`.AsyncSession`, creating it
+ using the :attr:`.scoped_session.session_factory` if not present.
+
+ :param \**kw: Keyword arguments will be passed to the
+ :attr:`.scoped_session.session_factory` callable, if an existing
+ :class:`.AsyncSession` is not present. If the
+ :class:`.AsyncSession` is present
+ and keyword arguments have been passed,
+ :exc:`~sqlalchemy.exc.InvalidRequestError` is raised.
+
+ """
+ if kw:
+ if self.registry.has():
+ raise sa_exc.InvalidRequestError(
+ "Scoped session is already present; "
+ "no new arguments may be specified."
+ )
+ else:
+ sess = self.session_factory(**kw)
+ self.registry.set(sess)
+ else:
+ sess = self.registry()
+ if not self._support_async and sess._is_asyncio:
+ warn_deprecated(
+ "Using `scoped_session` with asyncio is deprecated and "
+ "will raise an error in a future version. "
+ "Please use `async_scoped_session` instead.",
+ "1.4.23",
+ )
+ return sess
+
+ def configure(self, **kwargs: Any) -> None:
+ """reconfigure the :class:`.sessionmaker` used by this
+ :class:`.scoped_session`.
+
+ See :meth:`.sessionmaker.configure`.
+
+ """
+
+ if self.registry.has():
+ warn(
+ "At least one scoped session is already present. "
+ " configure() can not affect sessions that have "
+ "already been created."
+ )
+
+ self.session_factory.configure(**kwargs)
+
+ async def remove(self) -> None:
"""Dispose of the current :class:`.AsyncSession`, if present.
Different from scoped_session's remove method, this method would use
@@ -152,7 +245,9 @@ class async_scoped_session(ScopedSessionMixin):
Proxied for the :class:`_orm.Session` class on
behalf of the :class:`_asyncio.AsyncSession` class.
- """
+
+
+ """ # noqa: E501
return self._proxied.__iter__()
@@ -199,7 +294,7 @@ class async_scoped_session(ScopedSessionMixin):
return self._proxied.add_all(instances)
- def begin(self):
+ def begin(self) -> AsyncSessionTransaction:
r"""Return an :class:`_asyncio.AsyncSessionTransaction` object.
.. container:: class_bases
@@ -228,7 +323,7 @@ class async_scoped_session(ScopedSessionMixin):
return self._proxied.begin()
- def begin_nested(self):
+ def begin_nested(self) -> AsyncSessionTransaction:
r"""Return an :class:`_asyncio.AsyncSessionTransaction` object
which will begin a "nested" transaction, e.g. SAVEPOINT.
@@ -247,7 +342,7 @@ class async_scoped_session(ScopedSessionMixin):
return self._proxied.begin_nested()
- async def close(self):
+ async def close(self) -> None:
r"""Close out the transactional resources and ORM objects used by this
:class:`_asyncio.AsyncSession`.
@@ -284,7 +379,7 @@ class async_scoped_session(ScopedSessionMixin):
return await self._proxied.close()
- async def commit(self):
+ async def commit(self) -> None:
r"""Commit the current transaction in progress.
.. container:: class_bases
@@ -296,7 +391,7 @@ class async_scoped_session(ScopedSessionMixin):
return await self._proxied.commit()
- async def connection(self, **kw):
+ async def connection(self, **kw: Any) -> AsyncConnection:
r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
this :class:`.Session` object's transactional state.
@@ -321,7 +416,7 @@ class async_scoped_session(ScopedSessionMixin):
return await self._proxied.connection(**kw)
- async def delete(self, instance):
+ async def delete(self, instance: object) -> None:
r"""Mark an instance as deleted.
.. container:: class_bases
@@ -345,12 +440,12 @@ class async_scoped_session(ScopedSessionMixin):
async def execute(
self,
- statement,
- params=None,
- execution_options=util.EMPTY_DICT,
- bind_arguments=None,
- **kw,
- ):
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Result:
r"""Execute a statement and return a buffered
:class:`_engine.Result` object.
@@ -519,7 +614,7 @@ class async_scoped_session(ScopedSessionMixin):
return self._proxied.expunge_all()
- async def flush(self, objects=None):
+ async def flush(self, objects: Optional[Sequence[Any]] = None) -> None:
r"""Flush all the object changes to the database.
.. container:: class_bases
@@ -538,13 +633,15 @@ class async_scoped_session(ScopedSessionMixin):
async def get(
self,
- entity,
- ident,
- options=None,
- populate_existing=False,
- with_for_update=None,
- identity_token=None,
- ):
+ entity: _EntityBindKey[_O],
+ ident: _PKIdentityArgument,
+ *,
+ options: Optional[Sequence[ORMOption]] = None,
+ populate_existing: bool = False,
+ with_for_update: Optional[ForUpdateArg] = None,
+ identity_token: Optional[Any] = None,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ ) -> Optional[_O]:
r"""Return an instance based on the given primary key identifier,
or ``None`` if not found.
@@ -568,9 +665,16 @@ class async_scoped_session(ScopedSessionMixin):
populate_existing=populate_existing,
with_for_update=with_for_update,
identity_token=identity_token,
+ execution_options=execution_options,
)
- def get_bind(self, mapper=None, clause=None, bind=None, **kw):
+ def get_bind(
+ self,
+ mapper: Optional[_EntityBindKey[_O]] = None,
+ clause: Optional[ClauseElement] = None,
+ bind: Optional[_SessionBind] = None,
+ **kw: Any,
+ ) -> Union[Engine, Connection]:
r"""Return a "bind" to which the synchronous proxied :class:`_orm.Session`
is bound.
@@ -724,7 +828,7 @@ class async_scoped_session(ScopedSessionMixin):
instance, include_collections=include_collections
)
- async def invalidate(self):
+ async def invalidate(self) -> None:
r"""Close this Session, using connection invalidation.
.. container:: class_bases
@@ -738,7 +842,13 @@ class async_scoped_session(ScopedSessionMixin):
return await self._proxied.invalidate()
- async def merge(self, instance, load=True, options=None):
+ async def merge(
+ self,
+ instance: _O,
+ *,
+ load: bool = True,
+ options: Optional[Sequence[ORMOption]] = None,
+ ) -> _O:
r"""Copy the state of a given instance into a corresponding instance
within this :class:`_asyncio.AsyncSession`.
@@ -757,8 +867,11 @@ class async_scoped_session(ScopedSessionMixin):
return await self._proxied.merge(instance, load=load, options=options)
async def refresh(
- self, instance, attribute_names=None, with_for_update=None
- ):
+ self,
+ instance: object,
+ attribute_names: Optional[Iterable[str]] = None,
+ with_for_update: Optional[ForUpdateArg] = None,
+ ) -> None:
r"""Expire and refresh the attributes on the given instance.
.. container:: class_bases
@@ -785,7 +898,7 @@ class async_scoped_session(ScopedSessionMixin):
with_for_update=with_for_update,
)
- async def rollback(self):
+ async def rollback(self) -> None:
r"""Rollback the current transaction in progress.
.. container:: class_bases
@@ -799,12 +912,12 @@ class async_scoped_session(ScopedSessionMixin):
async def scalar(
self,
- statement,
- params=None,
- execution_options=util.EMPTY_DICT,
- bind_arguments=None,
- **kw,
- ):
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Any:
r"""Execute a statement and return a scalar result.
.. container:: class_bases
@@ -829,12 +942,12 @@ class async_scoped_session(ScopedSessionMixin):
async def scalars(
self,
- statement,
- params=None,
- execution_options=util.EMPTY_DICT,
- bind_arguments=None,
- **kw,
- ):
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[Any]:
r"""Execute a statement and return scalar results.
.. container:: class_bases
@@ -865,12 +978,12 @@ class async_scoped_session(ScopedSessionMixin):
async def stream(
self,
- statement,
- params=None,
- execution_options=util.EMPTY_DICT,
- bind_arguments=None,
- **kw,
- ):
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult:
r"""Execute a statement and return a streaming
:class:`_asyncio.AsyncResult` object.
@@ -892,12 +1005,12 @@ class async_scoped_session(ScopedSessionMixin):
async def stream_scalars(
self,
- statement,
- params=None,
- execution_options=util.EMPTY_DICT,
- bind_arguments=None,
- **kw,
- ):
+ statement: Executable,
+ params: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: _ExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[Any]:
r"""Execute a statement and return a stream of scalar results.
.. container:: class_bases
@@ -1159,7 +1272,7 @@ class async_scoped_session(ScopedSessionMixin):
return self._proxied.info
@classmethod
- async def close_all(self):
+ async def close_all(self) -> None:
r"""Close all :class:`_asyncio.AsyncSession` sessions.
.. container:: class_bases