diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-02-15 23:43:51 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-02-17 14:45:04 -0500 |
commit | 5157e0aa542f390242dd7a6d27a6ce1663230e46 (patch) | |
tree | 113f0e5a83e8229c7d0cb9e9c47387e1d703cb29 /lib/sqlalchemy/util/_concurrency_py3k.py | |
parent | 20213fd1f27fea51015d753bf94c6f40674ae86f (diff) | |
download | sqlalchemy-5157e0aa542f390242dd7a6d27a6ce1663230e46.tar.gz |
pep-484 for pool
also extends into some areas of utils, events and others
as needed.
Formalizes a public hierarchy for pool API,
with ManagesConnection -> PoolProxiedConnection /
ConnectionPoolEntry for connectionfairy / connectionrecord,
which are now what's exposed in the event API and other
APIs. all public API docs moved to the new objects.
Corrects the mypy plugin's check for sqlalchemy-stubs
not being insatlled, which has to be imported using the
dash in the name to be effective.
Change-Id: I16c2cb43b2e840d28e70a015f370a768e70f3581
Diffstat (limited to 'lib/sqlalchemy/util/_concurrency_py3k.py')
-rw-r--r-- | lib/sqlalchemy/util/_concurrency_py3k.py | 78 |
1 files changed, 56 insertions, 22 deletions
diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py index fa2066702..b17b408dd 100644 --- a/lib/sqlalchemy/util/_concurrency_py3k.py +++ b/lib/sqlalchemy/util/_concurrency_py3k.py @@ -10,13 +10,37 @@ from contextvars import copy_context as _copy_context import sys import typing from typing import Any +from typing import Awaitable from typing import Callable from typing import Coroutine - -import greenlet # type: ignore # noqa +from typing import TypeVar from .langhelpers import memoized_property from .. import exc +from ..util.typing import Protocol + +if typing.TYPE_CHECKING: + + class greenlet(Protocol): + + dead: bool + + def __init__(self, fn: Callable[..., Any], driver: "greenlet"): + ... + + def throw(self, *arg: Any) -> Any: + ... + + def switch(self, value: Any) -> Any: + ... + + def getcurrent() -> greenlet: + ... + +else: + from greenlet import getcurrent + from greenlet import greenlet + if not typing.TYPE_CHECKING: try: @@ -24,12 +48,14 @@ if not typing.TYPE_CHECKING: # If greenlet.gr_context is present in current version of greenlet, # it will be set with a copy of the current context on creation. # Refs: https://github.com/python-greenlet/greenlet/pull/198 - getattr(greenlet.greenlet, "gr_context") + getattr(greenlet, "gr_context") except (ImportError, AttributeError): _copy_context = None # noqa +_T = TypeVar("_T", bound=Any) -def is_exit_exception(e): + +def is_exit_exception(e: BaseException) -> bool: # note asyncio.CancelledError is already BaseException # so was an exit exception in any case return not isinstance(e, Exception) or isinstance( @@ -42,15 +68,17 @@ def is_exit_exception(e): # Issue for context: https://github.com/python-greenlet/greenlet/issues/173 -class _AsyncIoGreenlet(greenlet.greenlet): # type: ignore - def __init__(self, fn, driver): - greenlet.greenlet.__init__(self, fn, driver) +class _AsyncIoGreenlet(greenlet): # type: ignore + dead: bool + + def __init__(self, fn: Callable[..., Any], driver: greenlet): + greenlet.__init__(self, fn, driver) self.driver = driver if _copy_context is not None: self.gr_context = _copy_context() -def await_only(awaitable: Coroutine[Any, Any, Any]) -> Any: +def await_only(awaitable: Awaitable[_T]) -> _T: """Awaits an async function in a sync method. The sync method must be inside a :func:`greenlet_spawn` context. @@ -60,7 +88,7 @@ def await_only(awaitable: Coroutine[Any, Any, Any]) -> Any: """ # this is called in the context greenlet while running fn - current = greenlet.getcurrent() + current = getcurrent() if not isinstance(current, _AsyncIoGreenlet): raise exc.MissingGreenlet( "greenlet_spawn has not been called; can't call await_() here. " @@ -71,10 +99,10 @@ def await_only(awaitable: Coroutine[Any, Any, Any]) -> Any: # a coroutine to run. Once the awaitable is done, the driver greenlet # switches back to this greenlet with the result of awaitable that is # then returned to the caller (or raised as error) - return current.driver.switch(awaitable) + return current.driver.switch(awaitable) # type: ignore[no-any-return] -def await_fallback(awaitable: Coroutine[Any, Any, Any]) -> Any: +def await_fallback(awaitable: Awaitable[_T]) -> _T: """Awaits an async function in a sync method. The sync method must be inside a :func:`greenlet_spawn` context. @@ -83,8 +111,9 @@ def await_fallback(awaitable: Coroutine[Any, Any, Any]) -> Any: :param awaitable: The coroutine to call. """ + # this is called in the context greenlet while running fn - current = greenlet.getcurrent() + current = getcurrent() if not isinstance(current, _AsyncIoGreenlet): loop = get_event_loop() if loop.is_running(): @@ -93,9 +122,9 @@ def await_fallback(awaitable: Coroutine[Any, Any, Any]) -> Any: "loop is already running; can't call await_() here. " "Was IO attempted in an unexpected place?" ) - return loop.run_until_complete(awaitable) + return loop.run_until_complete(awaitable) # type: ignore[no-any-return] # noqa E501 - return current.driver.switch(awaitable) + return current.driver.switch(awaitable) # type: ignore[no-any-return] async def greenlet_spawn( @@ -114,7 +143,7 @@ async def greenlet_spawn( :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable. """ - context = _AsyncIoGreenlet(fn, greenlet.getcurrent()) + context = _AsyncIoGreenlet(fn, getcurrent()) # runs the function synchronously in gl greenlet. If the execution # is interrupted by await_, context is not dead and result is a # coroutine to wait. If the context is dead the function has @@ -149,21 +178,23 @@ async def greenlet_spawn( class AsyncAdaptedLock: @memoized_property - def mutex(self): + def mutex(self) -> asyncio.Lock: # there should not be a race here for coroutines creating the # new lock as we are not using await, so therefore no concurrency return asyncio.Lock() - def __enter__(self): + def __enter__(self) -> bool: # await is used to acquire the lock only after the first calling # coroutine has created the mutex. return await_fallback(self.mutex.acquire()) - def __exit__(self, *arg, **kw): + def __exit__(self, *arg: Any, **kw: Any) -> None: self.mutex.release() -def _util_async_run_coroutine_function(fn, *args, **kwargs): +def _util_async_run_coroutine_function( + fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any +) -> Any: """for test suite/ util only""" loop = get_event_loop() @@ -175,7 +206,10 @@ def _util_async_run_coroutine_function(fn, *args, **kwargs): return loop.run_until_complete(fn(*args, **kwargs)) -def _util_async_run(fn, *args, **kwargs): +def _util_async_run( + fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any +) -> Any: + """for test suite/ util only""" loop = get_event_loop() @@ -183,11 +217,11 @@ def _util_async_run(fn, *args, **kwargs): return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs)) else: # allow for a wrapped test function to call another - assert isinstance(greenlet.getcurrent(), _AsyncIoGreenlet) + assert isinstance(getcurrent(), _AsyncIoGreenlet) return fn(*args, **kwargs) -def get_event_loop(): +def get_event_loop() -> asyncio.AbstractEventLoop: """vendor asyncio.get_event_loop() for python 3.7 and above. Python 3.10 deprecates get_event_loop() as a standalone. |