summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/util/_concurrency_py3k.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-02-15 23:43:51 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2022-02-17 14:45:04 -0500
commit5157e0aa542f390242dd7a6d27a6ce1663230e46 (patch)
tree113f0e5a83e8229c7d0cb9e9c47387e1d703cb29 /lib/sqlalchemy/util/_concurrency_py3k.py
parent20213fd1f27fea51015d753bf94c6f40674ae86f (diff)
downloadsqlalchemy-5157e0aa542f390242dd7a6d27a6ce1663230e46.tar.gz
pep-484 for pool
also extends into some areas of utils, events and others as needed. Formalizes a public hierarchy for pool API, with ManagesConnection -> PoolProxiedConnection / ConnectionPoolEntry for connectionfairy / connectionrecord, which are now what's exposed in the event API and other APIs. all public API docs moved to the new objects. Corrects the mypy plugin's check for sqlalchemy-stubs not being insatlled, which has to be imported using the dash in the name to be effective. Change-Id: I16c2cb43b2e840d28e70a015f370a768e70f3581
Diffstat (limited to 'lib/sqlalchemy/util/_concurrency_py3k.py')
-rw-r--r--lib/sqlalchemy/util/_concurrency_py3k.py78
1 files changed, 56 insertions, 22 deletions
diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py
index fa2066702..b17b408dd 100644
--- a/lib/sqlalchemy/util/_concurrency_py3k.py
+++ b/lib/sqlalchemy/util/_concurrency_py3k.py
@@ -10,13 +10,37 @@ from contextvars import copy_context as _copy_context
import sys
import typing
from typing import Any
+from typing import Awaitable
from typing import Callable
from typing import Coroutine
-
-import greenlet # type: ignore # noqa
+from typing import TypeVar
from .langhelpers import memoized_property
from .. import exc
+from ..util.typing import Protocol
+
+if typing.TYPE_CHECKING:
+
+ class greenlet(Protocol):
+
+ dead: bool
+
+ def __init__(self, fn: Callable[..., Any], driver: "greenlet"):
+ ...
+
+ def throw(self, *arg: Any) -> Any:
+ ...
+
+ def switch(self, value: Any) -> Any:
+ ...
+
+ def getcurrent() -> greenlet:
+ ...
+
+else:
+ from greenlet import getcurrent
+ from greenlet import greenlet
+
if not typing.TYPE_CHECKING:
try:
@@ -24,12 +48,14 @@ if not typing.TYPE_CHECKING:
# If greenlet.gr_context is present in current version of greenlet,
# it will be set with a copy of the current context on creation.
# Refs: https://github.com/python-greenlet/greenlet/pull/198
- getattr(greenlet.greenlet, "gr_context")
+ getattr(greenlet, "gr_context")
except (ImportError, AttributeError):
_copy_context = None # noqa
+_T = TypeVar("_T", bound=Any)
-def is_exit_exception(e):
+
+def is_exit_exception(e: BaseException) -> bool:
# note asyncio.CancelledError is already BaseException
# so was an exit exception in any case
return not isinstance(e, Exception) or isinstance(
@@ -42,15 +68,17 @@ def is_exit_exception(e):
# Issue for context: https://github.com/python-greenlet/greenlet/issues/173
-class _AsyncIoGreenlet(greenlet.greenlet): # type: ignore
- def __init__(self, fn, driver):
- greenlet.greenlet.__init__(self, fn, driver)
+class _AsyncIoGreenlet(greenlet): # type: ignore
+ dead: bool
+
+ def __init__(self, fn: Callable[..., Any], driver: greenlet):
+ greenlet.__init__(self, fn, driver)
self.driver = driver
if _copy_context is not None:
self.gr_context = _copy_context()
-def await_only(awaitable: Coroutine[Any, Any, Any]) -> Any:
+def await_only(awaitable: Awaitable[_T]) -> _T:
"""Awaits an async function in a sync method.
The sync method must be inside a :func:`greenlet_spawn` context.
@@ -60,7 +88,7 @@ def await_only(awaitable: Coroutine[Any, Any, Any]) -> Any:
"""
# this is called in the context greenlet while running fn
- current = greenlet.getcurrent()
+ current = getcurrent()
if not isinstance(current, _AsyncIoGreenlet):
raise exc.MissingGreenlet(
"greenlet_spawn has not been called; can't call await_() here. "
@@ -71,10 +99,10 @@ def await_only(awaitable: Coroutine[Any, Any, Any]) -> Any:
# a coroutine to run. Once the awaitable is done, the driver greenlet
# switches back to this greenlet with the result of awaitable that is
# then returned to the caller (or raised as error)
- return current.driver.switch(awaitable)
+ return current.driver.switch(awaitable) # type: ignore[no-any-return]
-def await_fallback(awaitable: Coroutine[Any, Any, Any]) -> Any:
+def await_fallback(awaitable: Awaitable[_T]) -> _T:
"""Awaits an async function in a sync method.
The sync method must be inside a :func:`greenlet_spawn` context.
@@ -83,8 +111,9 @@ def await_fallback(awaitable: Coroutine[Any, Any, Any]) -> Any:
:param awaitable: The coroutine to call.
"""
+
# this is called in the context greenlet while running fn
- current = greenlet.getcurrent()
+ current = getcurrent()
if not isinstance(current, _AsyncIoGreenlet):
loop = get_event_loop()
if loop.is_running():
@@ -93,9 +122,9 @@ def await_fallback(awaitable: Coroutine[Any, Any, Any]) -> Any:
"loop is already running; can't call await_() here. "
"Was IO attempted in an unexpected place?"
)
- return loop.run_until_complete(awaitable)
+ return loop.run_until_complete(awaitable) # type: ignore[no-any-return] # noqa E501
- return current.driver.switch(awaitable)
+ return current.driver.switch(awaitable) # type: ignore[no-any-return]
async def greenlet_spawn(
@@ -114,7 +143,7 @@ async def greenlet_spawn(
:param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.
"""
- context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
+ context = _AsyncIoGreenlet(fn, getcurrent())
# runs the function synchronously in gl greenlet. If the execution
# is interrupted by await_, context is not dead and result is a
# coroutine to wait. If the context is dead the function has
@@ -149,21 +178,23 @@ async def greenlet_spawn(
class AsyncAdaptedLock:
@memoized_property
- def mutex(self):
+ def mutex(self) -> asyncio.Lock:
# there should not be a race here for coroutines creating the
# new lock as we are not using await, so therefore no concurrency
return asyncio.Lock()
- def __enter__(self):
+ def __enter__(self) -> bool:
# await is used to acquire the lock only after the first calling
# coroutine has created the mutex.
return await_fallback(self.mutex.acquire())
- def __exit__(self, *arg, **kw):
+ def __exit__(self, *arg: Any, **kw: Any) -> None:
self.mutex.release()
-def _util_async_run_coroutine_function(fn, *args, **kwargs):
+def _util_async_run_coroutine_function(
+ fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
+) -> Any:
"""for test suite/ util only"""
loop = get_event_loop()
@@ -175,7 +206,10 @@ def _util_async_run_coroutine_function(fn, *args, **kwargs):
return loop.run_until_complete(fn(*args, **kwargs))
-def _util_async_run(fn, *args, **kwargs):
+def _util_async_run(
+ fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
+) -> Any:
+
"""for test suite/ util only"""
loop = get_event_loop()
@@ -183,11 +217,11 @@ def _util_async_run(fn, *args, **kwargs):
return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs))
else:
# allow for a wrapped test function to call another
- assert isinstance(greenlet.getcurrent(), _AsyncIoGreenlet)
+ assert isinstance(getcurrent(), _AsyncIoGreenlet)
return fn(*args, **kwargs)
-def get_event_loop():
+def get_event_loop() -> asyncio.AbstractEventLoop:
"""vendor asyncio.get_event_loop() for python 3.7 and above.
Python 3.10 deprecates get_event_loop() as a standalone.