summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrian.quinlan <devnull@localhost>2010-01-08 10:14:29 +0000
committerbrian.quinlan <devnull@localhost>2010-01-08 10:14:29 +0000
commit3af3ab0315f411fa282a73041ac279b765512911 (patch)
treedd55c4af833dec04f5a3818cb994e48478bbf42c
parent0e394dbfc733282258d6433248672e5e5cf9e2f9 (diff)
downloadfutures-3af3ab0315f411fa282a73041ac279b765512911.tar.gz
More efficient iter_as_completed implementation
-rw-r--r--python3/crawl.py2
-rw-r--r--python3/futures/__init__.py12
-rw-r--r--python3/futures/_base.py146
3 files changed, 90 insertions, 70 deletions
diff --git a/python3/crawl.py b/python3/crawl.py
index 3423801..96dfc67 100644
--- a/python3/crawl.py
+++ b/python3/crawl.py
@@ -37,7 +37,7 @@ def download_urls_with_executor(urls, executor, timeout=60):
future_to_url = dict((executor.submit(load_url, url, timeout), url)
for url in urls)
- for future in futures.iter_as_completed(future_to_url):
+ for future in futures.as_completed(future_to_url):
try:
url_to_content[future_to_url[future]] = future.result()
except:
diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py
index 934ce8e..dfa0da6 100644
--- a/python3/futures/__init__.py
+++ b/python3/futures/__init__.py
@@ -4,10 +4,14 @@
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
- ALL_COMPLETED, RETURN_IMMEDIATELY,
- CancelledError, TimeoutError,
- Future, wait, iter_as_completed)
+from futures._base import (FIRST_COMPLETED,
+ FIRST_EXCEPTION,
+ ALL_COMPLETED,
+ CancelledError,
+ TimeoutError,
+ Future,
+ wait,
+ as_completed)
from futures.thread import ThreadPoolExecutor
from futures.process import ProcessPoolExecutor
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 5c1c9ef..8f0ef03 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -10,7 +10,6 @@ import time
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
-RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
@@ -47,32 +46,30 @@ class Error(Exception):
pass
class CancelledError(Error):
+ """The Future was cancelled."""
pass
class TimeoutError(Error):
+ """The operation exceeded the given deadline."""
pass
class _Waiter(object):
- """Provides the event that FutureList.wait(...) blocks on.
-
- """
+ """Provides the event that wait() and as_completed() block on."""
def __init__(self):
self.event = threading.Event()
- self.result_futures = []
- self.exception_futures = []
- self.cancelled_futures = []
+ self.finished_futures = []
def add_result(self, future):
- self.result_futures.append(future)
+ self.finished_futures.append(future)
def add_exception(self, future):
- self.exception_futures.append(future)
+ self.finished_futures.append(future)
def add_cancelled(self, future):
- self.cancelled_futures.append(future)
+ self.finished_futures.append(future)
class _FirstCompletedWaiter(_Waiter):
- """Used by wait(return_when=FIRST_COMPLETED)."""
+ """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
def add_result(self, future):
super().add_result(future)
@@ -94,27 +91,29 @@ class _AllCompletedWaiter(_Waiter):
self.stop_on_exception = stop_on_exception
super().__init__()
- def _XXX(self):
+ def _decrement_pending_calls(self):
self.num_pending_calls -= 1
if not self.num_pending_calls:
self.event.set()
def add_result(self, future):
super().add_result(future)
- self._XXX()
+ self._decrement_pending_calls()
def add_exception(self, future):
super().add_exception(future)
if self.stop_on_exception:
self.event.set()
else:
- self._XXX()
+ self._decrement_pending_calls()
def add_cancelled(self, future):
super().add_cancelled(future)
- self._XXX()
+ self._decrement_pending_calls()
+
+class _AcquireFutures(object):
+ """A context manager that does an ordered acquire of Future conditions."""
-class YYY(object):
def __init__(self, futures):
self.futures = sorted(futures, key=id)
@@ -126,31 +125,74 @@ class YYY(object):
for future in self.futures:
future._condition.release()
-def iter_as_completed(fs, timeout=None):
+def _create_and_install_waiters(fs, return_when):
+ if return_when == FIRST_COMPLETED:
+ waiter = _FirstCompletedWaiter()
+ else:
+ pending_count = sum(
+ f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
+
+ if return_when == FIRST_EXCEPTION:
+ waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
+ elif return_when == ALL_COMPLETED:
+ waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
+ else:
+ raise ValueError("Invalid return condition: %r" % return_when)
+
+ for f in fs:
+ f._waiters.append(waiter)
+
+ return waiter
+
+def as_completed(fs, timeout=None):
+ """An iterator over the given futures that yields each as it completes.
+
+ Args:
+ fs: The sequence of Futures to iterate over.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+
+ Returns:
+ An iterator that yields the given Futures as they complete (finish or
+ are cancelled).
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ """
if timeout is not None:
end_time = timeout + time.time()
- pending = set(fs)
+ with _AcquireFutures(fs):
+ finished = set(
+ f for f in fs
+ if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
+ pending = set(fs) - finished
+ waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
+
+ try:
+ for future in finished:
+ yield finished
- while pending:
- if timeout is None:
- wait_timeout = None
- else:
- wait_timeout = end_time - time.time()
- if wait_timeout < 0:
- raise TimeoutError()
+ while pending:
+ if timeout is None:
+ wait_timeout = None
+ else:
+ wait_timeout = end_time - time.time()
+ if wait_timeout < 0:
+ raise TimeoutError('timeout with %d unfinished futures' %
+ len(pending))
- print('HERE 2')
- # TODO(brian@sweetapp.com): wait() involves a lot of setup and
- # tear-down - check to see if that makes this implementation
- # unreasonably expensive.
- completed, pending = wait(pending,
- timeout=wait_timeout,
- return_when=FIRST_COMPLETED)
+ waiter.event.wait(timeout)
- for future in completed:
- print('HERE 5')
- yield future
+ for future in waiter.finished_futures:
+ yield future
+ waiter.finished_futures.remove(future)
+ pending.remove(future)
+
+ finally:
+ for f in fs:
+ f._waiters.remove(waiter)
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the futures in the list to complete.
@@ -172,7 +214,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
TimeoutError: If the wait condition wasn't satisfied before the
given timeout.
"""
- with YYY(fs):
+ with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
@@ -188,45 +230,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
if len(finished) == len(fs):
return finished, not_finished
- if return_when == FIRST_COMPLETED:
- waiter = _FirstCompletedWaiter()
- else:
- pending_count = sum(
- f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
- for f in fs)
-
- if return_when == FIRST_EXCEPTION:
- waiter = _AllCompletedWaiter(
- pending_count, stop_on_exception=True)
- elif return_when == ALL_COMPLETED:
- waiter = _AllCompletedWaiter(
- pending_count, stop_on_exception=False)
- else:
- raise Exception("XXX")
-
- for f in fs:
- f._waiters.append(waiter)
+ waiter = _create_and_install_waiters(fs, return_when)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
- finished.update(waiter.result_futures)
- finished.update(waiter.exception_futures)
- finished.update(waiter.cancelled_futures)
-
+ finished.update(waiter.finished_futures)
return finished, set(fs) - finished
class Future(object):
"""Represents the result of an asynchronous computation."""
- # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink
- # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race
- # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink.
- # Other state transitions need only have the Future._condition held.
- # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired
- # first.
-
def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
@@ -269,6 +284,7 @@ class Future(object):
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def running(self):
+ """Return True if the future is currently executing."""
with self._condition:
return self._state == RUNNING