diff options
Diffstat (limited to 'lib/sqlalchemy/util/_concurrency_py3k.py')
-rw-r--r-- | lib/sqlalchemy/util/_concurrency_py3k.py | 181 |
1 files changed, 87 insertions, 94 deletions
diff --git a/lib/sqlalchemy/util/_concurrency_py3k.py b/lib/sqlalchemy/util/_concurrency_py3k.py index ee3abe5fa..5d11bf92c 100644 --- a/lib/sqlalchemy/util/_concurrency_py3k.py +++ b/lib/sqlalchemy/util/_concurrency_py3k.py @@ -4,110 +4,103 @@ from typing import Any from typing import Callable from typing import Coroutine +import greenlet + from .. import exc -try: - import greenlet - # implementation based on snaury gist at - # https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef - # Issue for context: https://github.com/python-greenlet/greenlet/issues/173 +# implementation based on snaury gist at +# https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef +# Issue for context: https://github.com/python-greenlet/greenlet/issues/173 + + +class _AsyncIoGreenlet(greenlet.greenlet): + def __init__(self, fn, driver): + greenlet.greenlet.__init__(self, fn, driver) + self.driver = driver + + +def await_only(awaitable: Coroutine) -> Any: + """Awaits an async function in a sync method. + + The sync method must be insice a :func:`greenlet_spawn` context. + :func:`await_` calls cannot be nested. - class _AsyncIoGreenlet(greenlet.greenlet): - def __init__(self, fn, driver): - greenlet.greenlet.__init__(self, fn, driver) - self.driver = driver + :param awaitable: The coroutine to call. - def await_only(awaitable: Coroutine) -> Any: - """Awaits an async function in a sync method. + """ + # this is called in the context greenlet while running fn + current = greenlet.getcurrent() + if not isinstance(current, _AsyncIoGreenlet): + raise exc.InvalidRequestError( + "greenlet_spawn has not been called; can't call await_() here." + ) - The sync method must be insice a :func:`greenlet_spawn` context. - :func:`await_` calls cannot be nested. + # returns the control to the driver greenlet passing it + # a coroutine to run. Once the awaitable is done, the driver greenlet + # switches back to this greenlet with the result of awaitable that is + # then returned to the caller (or raised as error) + return current.driver.switch(awaitable) - :param awaitable: The coroutine to call. - """ - # this is called in the context greenlet while running fn - current = greenlet.getcurrent() - if not isinstance(current, _AsyncIoGreenlet): +def await_fallback(awaitable: Coroutine) -> Any: + """Awaits an async function in a sync method. + + The sync method must be insice a :func:`greenlet_spawn` context. + :func:`await_` calls cannot be nested. + + :param awaitable: The coroutine to call. + + """ + + # this is called in the context greenlet while running fn + current = greenlet.getcurrent() + if not isinstance(current, _AsyncIoGreenlet): + loop = asyncio.get_event_loop() + if loop.is_running(): raise exc.InvalidRequestError( - "greenlet_spawn has not been called; can't call await_() here." + "greenlet_spawn has not been called and asyncio event " + "loop is already running; can't call await_() here." ) - - # returns the control to the driver greenlet passing it - # a coroutine to run. Once the awaitable is done, the driver greenlet - # switches back to this greenlet with the result of awaitable that is - # then returned to the caller (or raised as error) - return current.driver.switch(awaitable) - - def await_fallback(awaitable: Coroutine) -> Any: - """Awaits an async function in a sync method. - - The sync method must be insice a :func:`greenlet_spawn` context. - :func:`await_` calls cannot be nested. - - :param awaitable: The coroutine to call. - - """ - # this is called in the context greenlet while running fn - current = greenlet.getcurrent() - if not isinstance(current, _AsyncIoGreenlet): - loop = asyncio.get_event_loop() - if loop.is_running(): - raise exc.InvalidRequestError( - "greenlet_spawn has not been called and asyncio event " - "loop is already running; can't call await_() here." - ) - return loop.run_until_complete(awaitable) - - return current.driver.switch(awaitable) - - async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any: - """Runs a sync function ``fn`` in a new greenlet. - - The sync function can then use :func:`await_` to wait for async - functions. - - :param fn: The sync callable to call. - :param \\*args: Positional arguments to pass to the ``fn`` callable. - :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable. - """ - context = _AsyncIoGreenlet(fn, greenlet.getcurrent()) - # runs the function synchronously in gl greenlet. If the execution - # is interrupted by await_, context is not dead and result is a - # coroutine to wait. If the context is dead the function has - # returned, and its result can be returned. - try: - result = context.switch(*args, **kwargs) - while not context.dead: - try: - # wait for a coroutine from await_ and then return its - # result back to it. - value = await result - except Exception: - # this allows an exception to be raised within - # the moderated greenlet so that it can continue - # its expected flow. - result = context.throw(*sys.exc_info()) - else: - result = context.switch(value) - finally: - # clean up to avoid cycle resolution by gc - del context.driver - return result - - -except ImportError: # pragma: no cover - greenlet = None - - def await_fallback(awaitable): - return asyncio.get_event_loop().run_until_complete(awaitable) - - def await_only(awaitable): - raise ValueError("Greenlet is required to use this function") - - async def greenlet_spawn(fn, *args, **kw): - raise ValueError("Greenlet is required to use this function") + return loop.run_until_complete(awaitable) + + return current.driver.switch(awaitable) + + +async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any: + """Runs a sync function ``fn`` in a new greenlet. + + The sync function can then use :func:`await_` to wait for async + functions. + + :param fn: The sync callable to call. + :param \\*args: Positional arguments to pass to the ``fn`` callable. + :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable. + """ + + context = _AsyncIoGreenlet(fn, greenlet.getcurrent()) + # runs the function synchronously in gl greenlet. If the execution + # is interrupted by await_, context is not dead and result is a + # coroutine to wait. If the context is dead the function has + # returned, and its result can be returned. + try: + result = context.switch(*args, **kwargs) + while not context.dead: + try: + # wait for a coroutine from await_ and then return its + # result back to it. + value = await result + except Exception: + # this allows an exception to be raised within + # the moderated greenlet so that it can continue + # its expected flow. + result = context.throw(*sys.exc_info()) + else: + result = context.switch(value) + finally: + # clean up to avoid cycle resolution by gc + del context.driver + return result class AsyncAdaptedLock: |