summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/util/_concurrency_py3k.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/util/_concurrency_py3k.py')
-rw-r--r--lib/sqlalchemy/util/_concurrency_py3k.py181
1 files changed, 87 insertions, 94 deletions
diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py
index ee3abe5fa..5d11bf92c 100644
--- a/lib/sqlalchemy/util/_concurrency_py3k.py
+++ b/lib/sqlalchemy/util/_concurrency_py3k.py
@@ -4,110 +4,103 @@ from typing import Any
from typing import Callable
from typing import Coroutine
+import greenlet
+
from .. import exc
-try:
- import greenlet
- # implementation based on snaury gist at
- # https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef
- # Issue for context: https://github.com/python-greenlet/greenlet/issues/173
+# implementation based on snaury gist at
+# https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef
+# Issue for context: https://github.com/python-greenlet/greenlet/issues/173
+
+
+class _AsyncIoGreenlet(greenlet.greenlet):
+ def __init__(self, fn, driver):
+ greenlet.greenlet.__init__(self, fn, driver)
+ self.driver = driver
+
+
+def await_only(awaitable: Coroutine) -> Any:
+ """Awaits an async function in a sync method.
+
+ The sync method must be insice a :func:`greenlet_spawn` context.
+ :func:`await_` calls cannot be nested.
- class _AsyncIoGreenlet(greenlet.greenlet):
- def __init__(self, fn, driver):
- greenlet.greenlet.__init__(self, fn, driver)
- self.driver = driver
+ :param awaitable: The coroutine to call.
- def await_only(awaitable: Coroutine) -> Any:
- """Awaits an async function in a sync method.
+ """
+ # this is called in the context greenlet while running fn
+ current = greenlet.getcurrent()
+ if not isinstance(current, _AsyncIoGreenlet):
+ raise exc.InvalidRequestError(
+ "greenlet_spawn has not been called; can't call await_() here."
+ )
- The sync method must be insice a :func:`greenlet_spawn` context.
- :func:`await_` calls cannot be nested.
+ # returns the control to the driver greenlet passing it
+ # a coroutine to run. Once the awaitable is done, the driver greenlet
+ # switches back to this greenlet with the result of awaitable that is
+ # then returned to the caller (or raised as error)
+ return current.driver.switch(awaitable)
- :param awaitable: The coroutine to call.
- """
- # this is called in the context greenlet while running fn
- current = greenlet.getcurrent()
- if not isinstance(current, _AsyncIoGreenlet):
+def await_fallback(awaitable: Coroutine) -> Any:
+ """Awaits an async function in a sync method.
+
+ The sync method must be insice a :func:`greenlet_spawn` context.
+ :func:`await_` calls cannot be nested.
+
+ :param awaitable: The coroutine to call.
+
+ """
+
+ # this is called in the context greenlet while running fn
+ current = greenlet.getcurrent()
+ if not isinstance(current, _AsyncIoGreenlet):
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
raise exc.InvalidRequestError(
- "greenlet_spawn has not been called; can't call await_() here."
+ "greenlet_spawn has not been called and asyncio event "
+ "loop is already running; can't call await_() here."
)
-
- # returns the control to the driver greenlet passing it
- # a coroutine to run. Once the awaitable is done, the driver greenlet
- # switches back to this greenlet with the result of awaitable that is
- # then returned to the caller (or raised as error)
- return current.driver.switch(awaitable)
-
- def await_fallback(awaitable: Coroutine) -> Any:
- """Awaits an async function in a sync method.
-
- The sync method must be insice a :func:`greenlet_spawn` context.
- :func:`await_` calls cannot be nested.
-
- :param awaitable: The coroutine to call.
-
- """
- # this is called in the context greenlet while running fn
- current = greenlet.getcurrent()
- if not isinstance(current, _AsyncIoGreenlet):
- loop = asyncio.get_event_loop()
- if loop.is_running():
- raise exc.InvalidRequestError(
- "greenlet_spawn has not been called and asyncio event "
- "loop is already running; can't call await_() here."
- )
- return loop.run_until_complete(awaitable)
-
- return current.driver.switch(awaitable)
-
- async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any:
- """Runs a sync function ``fn`` in a new greenlet.
-
- The sync function can then use :func:`await_` to wait for async
- functions.
-
- :param fn: The sync callable to call.
- :param \\*args: Positional arguments to pass to the ``fn`` callable.
- :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.
- """
- context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
- # runs the function synchronously in gl greenlet. If the execution
- # is interrupted by await_, context is not dead and result is a
- # coroutine to wait. If the context is dead the function has
- # returned, and its result can be returned.
- try:
- result = context.switch(*args, **kwargs)
- while not context.dead:
- try:
- # wait for a coroutine from await_ and then return its
- # result back to it.
- value = await result
- except Exception:
- # this allows an exception to be raised within
- # the moderated greenlet so that it can continue
- # its expected flow.
- result = context.throw(*sys.exc_info())
- else:
- result = context.switch(value)
- finally:
- # clean up to avoid cycle resolution by gc
- del context.driver
- return result
-
-
-except ImportError: # pragma: no cover
- greenlet = None
-
- def await_fallback(awaitable):
- return asyncio.get_event_loop().run_until_complete(awaitable)
-
- def await_only(awaitable):
- raise ValueError("Greenlet is required to use this function")
-
- async def greenlet_spawn(fn, *args, **kw):
- raise ValueError("Greenlet is required to use this function")
+ return loop.run_until_complete(awaitable)
+
+ return current.driver.switch(awaitable)
+
+
+async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any:
+ """Runs a sync function ``fn`` in a new greenlet.
+
+ The sync function can then use :func:`await_` to wait for async
+ functions.
+
+ :param fn: The sync callable to call.
+ :param \\*args: Positional arguments to pass to the ``fn`` callable.
+ :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.
+ """
+
+ context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
+ # runs the function synchronously in gl greenlet. If the execution
+ # is interrupted by await_, context is not dead and result is a
+ # coroutine to wait. If the context is dead the function has
+ # returned, and its result can be returned.
+ try:
+ result = context.switch(*args, **kwargs)
+ while not context.dead:
+ try:
+ # wait for a coroutine from await_ and then return its
+ # result back to it.
+ value = await result
+ except Exception:
+ # this allows an exception to be raised within
+ # the moderated greenlet so that it can continue
+ # its expected flow.
+ result = context.throw(*sys.exc_info())
+ else:
+ result = context.switch(value)
+ finally:
+ # clean up to avoid cycle resolution by gc
+ del context.driver
+ return result
class AsyncAdaptedLock: