summaryrefslogtreecommitdiff
path: root/Lib/asyncio/futures.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2015-10-03 08:31:42 -0700
committerGuido van Rossum <guido@python.org>2015-10-03 08:31:42 -0700
commit0a568de66e0c1b4c674289aed77b596526311dec (patch)
tree4d452efaecee44d799611dbf201d6ab09422f71b /Lib/asyncio/futures.py
parent08c285751cbb0992da44881b1b42a66f6fe79de5 (diff)
downloadcpython-0a568de66e0c1b4c674289aed77b596526311dec.tar.gz
Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel.
Diffstat (limited to 'Lib/asyncio/futures.py')
-rw-r--r--Lib/asyncio/futures.py74
1 files changed, 58 insertions, 16 deletions
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index dbe06c4a98..166bc8047b 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -390,22 +390,64 @@ class Future:
__await__ = __iter__ # make compatible with 'await' expression
-def wrap_future(fut, *, loop=None):
- """Wrap concurrent.futures.Future object."""
- if isinstance(fut, Future):
- return fut
- assert isinstance(fut, concurrent.futures.Future), \
- 'concurrent.futures.Future is expected, got {!r}'.format(fut)
- if loop is None:
- loop = events.get_event_loop()
- new_future = Future(loop=loop)
+def _set_concurrent_future_state(concurrent, source):
+ """Copy state from a future to a concurrent.futures.Future."""
+ assert source.done()
+ if source.cancelled():
+ concurrent.cancel()
+ if not concurrent.set_running_or_notify_cancel():
+ return
+ exception = source.exception()
+ if exception is not None:
+ concurrent.set_exception(exception)
+ else:
+ result = source.result()
+ concurrent.set_result(result)
+
+
+def _chain_future(source, destination):
+ """Chain two futures so that when one completes, so does the other.
+
+ The result (or exception) of source will be copied to destination.
+ If destination is cancelled, source gets cancelled too.
+ Compatible with both asyncio.Future and concurrent.futures.Future.
+ """
+ if not isinstance(source, (Future, concurrent.futures.Future)):
+ raise TypeError('A future is required for source argument')
+ if not isinstance(destination, (Future, concurrent.futures.Future)):
+ raise TypeError('A future is required for destination argument')
+ source_loop = source._loop if isinstance(source, Future) else None
+ dest_loop = destination._loop if isinstance(destination, Future) else None
+
+ def _set_state(future, other):
+ if isinstance(future, Future):
+ future._copy_state(other)
+ else:
+ _set_concurrent_future_state(future, other)
- def _check_cancel_other(f):
- if f.cancelled():
- fut.cancel()
+ def _call_check_cancel(destination):
+ if destination.cancelled():
+ if source_loop is None or source_loop is dest_loop:
+ source.cancel()
+ else:
+ source_loop.call_soon_threadsafe(source.cancel)
- new_future.add_done_callback(_check_cancel_other)
- fut.add_done_callback(
- lambda future: loop.call_soon_threadsafe(
- new_future._copy_state, future))
+ def _call_set_state(source):
+ if dest_loop is None or dest_loop is source_loop:
+ _set_state(destination, source)
+ else:
+ dest_loop.call_soon_threadsafe(_set_state, destination, source)
+
+ destination.add_done_callback(_call_check_cancel)
+ source.add_done_callback(_call_set_state)
+
+
+def wrap_future(future, *, loop=None):
+ """Wrap concurrent.futures.Future object."""
+ if isinstance(future, Future):
+ return future
+ assert isinstance(future, concurrent.futures.Future), \
+ 'concurrent.futures.Future is expected, got {!r}'.format(future)
+ new_future = Future(loop=loop)
+ _chain_future(future, new_future)
return new_future