summaryrefslogtreecommitdiff
path: root/Lib/concurrent/futures/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures/_base.py')
-rw-r--r--Lib/concurrent/futures/_base.py35
1 files changed, 20 insertions, 15 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 7929c3cc6a..d45a404d37 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -4,7 +4,6 @@
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import collections
-import functools
import logging
import threading
import time
@@ -182,7 +181,8 @@ def as_completed(fs, timeout=None):
Returns:
An iterator that yields the given Futures as they complete (finished or
- cancelled).
+ cancelled). If any given Futures are duplicated, they will be returned
+ once.
Raises:
TimeoutError: If the entire result iterator could not be generated
@@ -191,11 +191,12 @@ def as_completed(fs, timeout=None):
if timeout is not None:
end_time = timeout + time.time()
+ fs = set(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
- pending = set(fs) - finished
+ pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try:
@@ -333,7 +334,7 @@ class Future(object):
return True
def cancelled(self):
- """Return True if the future has cancelled."""
+ """Return True if the future was cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
@@ -471,8 +472,8 @@ class Future(object):
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
- id(self.future),
- self.future._state)
+ id(self),
+ self._state)
raise RuntimeError('Future in unexpected state')
def set_result(self, result):
@@ -538,15 +539,19 @@ class Executor(object):
fs = [self.submit(fn, *args) for args in zip(*iterables)]
- try:
- for future in fs:
- if timeout is None:
- yield future.result()
- else:
- yield future.result(end_time - time.time())
- finally:
- for future in fs:
- future.cancel()
+ # Yield must be hidden in closure so that the futures are submitted
+ # before the first iterator value is required.
+ def result_iterator():
+ try:
+ for future in fs:
+ if timeout is None:
+ yield future.result()
+ else:
+ yield future.result(end_time - time.time())
+ finally:
+ for future in fs:
+ future.cancel()
+ return result_iterator()
def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.