diff options
author | brian.quinlan <devnull@localhost> | 2010-01-08 10:14:29 +0000 |
---|---|---|
committer | brian.quinlan <devnull@localhost> | 2010-01-08 10:14:29 +0000 |
commit | 3af3ab0315f411fa282a73041ac279b765512911 (patch) | |
tree | dd55c4af833dec04f5a3818cb994e48478bbf42c | |
parent | 0e394dbfc733282258d6433248672e5e5cf9e2f9 (diff) | |
download | futures-3af3ab0315f411fa282a73041ac279b765512911.tar.gz |
More efficient iter_as_completed implementation
-rw-r--r-- | python3/crawl.py | 2 | ||||
-rw-r--r-- | python3/futures/__init__.py | 12 | ||||
-rw-r--r-- | python3/futures/_base.py | 146 |
3 files changed, 90 insertions, 70 deletions
diff --git a/python3/crawl.py b/python3/crawl.py
index 3423801..96dfc67 100644
--- a/python3/crawl.py
+++ b/python3/crawl.py
@@ -37,7 +37,7 @@ def download_urls_with_executor(urls, executor, timeout=60):
         future_to_url = dict((executor.submit(load_url, url, timeout), url)
                              for url in urls)
 
-        for future in futures.iter_as_completed(future_to_url):
+        for future in futures.as_completed(future_to_url):
             try:
                 url_to_content[future_to_url[future]] = future.result()
             except:
diff --git a/python3/futures/__init__.py b/python3/futures/__init__.py
index 934ce8e..dfa0da6 100644
--- a/python3/futures/__init__.py
+++ b/python3/futures/__init__.py
@@ -4,10 +4,14 @@
 
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
-from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION,
-                           ALL_COMPLETED, RETURN_IMMEDIATELY,
-                           CancelledError, TimeoutError,
-                           Future, wait, iter_as_completed)
+from futures._base import (FIRST_COMPLETED,
+                           FIRST_EXCEPTION,
+                           ALL_COMPLETED,
+                           CancelledError,
+                           TimeoutError,
+                           Future,
+                           wait,
+                           as_completed)
 from futures.thread import ThreadPoolExecutor
 from futures.process import ProcessPoolExecutor
 
diff --git a/python3/futures/_base.py b/python3/futures/_base.py
index 5c1c9ef..8f0ef03 100644
--- a/python3/futures/_base.py
+++ b/python3/futures/_base.py
@@ -10,7 +10,6 @@ import time
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
 ALL_COMPLETED = 'ALL_COMPLETED'
-RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY'
 
 # Possible future states (for internal use by the futures package).
 PENDING = 'PENDING'
@@ -47,32 +46,30 @@ class Error(Exception):
     pass
 
 class CancelledError(Error):
+    """The Future was cancelled."""
     pass
 
 class TimeoutError(Error):
+    """The operation exceeded the given deadline."""
     pass
 
 class _Waiter(object):
-    """Provides the event that FutureList.wait(...) blocks on.
-
-    """
+    """Provides the event that wait() and as_completed() block on."""
     def __init__(self):
         self.event = threading.Event()
-        self.result_futures = []
-        self.exception_futures = []
-        self.cancelled_futures = []
+        self.finished_futures = []
 
     def add_result(self, future):
-        self.result_futures.append(future)
+        self.finished_futures.append(future)
 
     def add_exception(self, future):
-        self.exception_futures.append(future)
+        self.finished_futures.append(future)
 
     def add_cancelled(self, future):
-        self.cancelled_futures.append(future)
+        self.finished_futures.append(future)
 
 class _FirstCompletedWaiter(_Waiter):
-    """Used by wait(return_when=FIRST_COMPLETED)."""
+    """Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
 
     def add_result(self, future):
         super().add_result(future)
@@ -94,27 +91,29 @@ class _AllCompletedWaiter(_Waiter):
         self.stop_on_exception = stop_on_exception
         super().__init__()
 
-    def _XXX(self):
+    def _decrement_pending_calls(self):
         self.num_pending_calls -= 1
         if not self.num_pending_calls:
             self.event.set()
 
     def add_result(self, future):
         super().add_result(future)
-        self._XXX()
+        self._decrement_pending_calls()
 
     def add_exception(self, future):
         super().add_exception(future)
         if self.stop_on_exception:
             self.event.set()
         else:
-            self._XXX()
+            self._decrement_pending_calls()
 
     def add_cancelled(self, future):
         super().add_cancelled(future)
-        self._XXX()
+        self._decrement_pending_calls()
+
+class _AcquireFutures(object):
+    """A context manager that does an ordered acquire of Future conditions."""
 
-class YYY(object):
     def __init__(self, futures):
         self.futures = sorted(futures, key=id)
 
@@ -126,31 +125,74 @@ class YYY(object):
         for future in self.futures:
             future._condition.release()
 
-def iter_as_completed(fs, timeout=None):
+def _create_and_install_waiters(fs, return_when):
+    if return_when == FIRST_COMPLETED:
+        waiter = _FirstCompletedWaiter()
+    else:
+        pending_count = sum(
+                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
+
+        if return_when == FIRST_EXCEPTION:
+            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
+        elif return_when == ALL_COMPLETED:
+            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
+        else:
+            raise ValueError("Invalid return condition: %r" % return_when)
+
+    for f in fs:
+        f._waiters.append(waiter)
+
+    return waiter
+
+def as_completed(fs, timeout=None):
+    """An iterator over the given futures that yields each as it completes.
+
+    Args:
+        fs: The sequence of Futures to iterate over.
+        timeout: The maximum number of seconds to wait. If None, then there
+            is no limit on the wait time.
+
+    Returns:
+        An iterator that yields the given Futures as they complete (finish or
+        are cancelled).
+
+    Raises:
+        TimeoutError: If the entire result iterator could not be generated
+            before the given timeout.
+    """
     if timeout is not None:
         end_time = timeout + time.time()
 
-    pending = set(fs)
+    with _AcquireFutures(fs):
+        finished = set(
+                f for f in fs
+                if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
+        pending = set(fs) - finished
+        waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
+
+    try:
+        for future in finished:
+            yield finished
 
-    while pending:
-        if timeout is None:
-            wait_timeout = None
-        else:
-            wait_timeout = end_time - time.time()
-            if wait_timeout < 0:
-                raise TimeoutError()
+        while pending:
+            if timeout is None:
+                wait_timeout = None
+            else:
+                wait_timeout = end_time - time.time()
+                if wait_timeout < 0:
+                    raise TimeoutError('timeout with %d unfinished futures' %
+                                       len(pending))
 
-        print('HERE 2')
-        # TODO(brian@sweetapp.com): wait() involves a lot of setup and
-        # tear-down - check to see if that makes this implementation
-        # unreasonably expensive.
-        completed, pending = wait(pending,
-                                  timeout=wait_timeout,
-                                  return_when=FIRST_COMPLETED)
+            waiter.event.wait(timeout)
 
-        for future in completed:
-            print('HERE 5')
-            yield future
+            for future in waiter.finished_futures:
+                yield future
+                waiter.finished_futures.remove(future)
+                pending.remove(future)
+
+    finally:
+        for f in fs:
+            f._waiters.remove(waiter)
 
 def wait(fs, timeout=None, return_when=ALL_COMPLETED):
     """Wait for the futures in the list to complete.
@@ -172,7 +214,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
         TimeoutError: If the wait condition wasn't satisfied before the given
             timeout.
     """
-    with YYY(fs):
+    with _AcquireFutures(fs):
         finished = set(
                 f for f in fs
                 if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED])
@@ -188,45 +230,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
         if len(finished) == len(fs):
             return finished, not_finished
 
-        if return_when == FIRST_COMPLETED:
-            waiter = _FirstCompletedWaiter()
-        else:
-            pending_count = sum(
-                    f._state not in [CANCELLED_AND_NOTIFIED, FINISHED]
-                    for f in fs)
-
-            if return_when == FIRST_EXCEPTION:
-                waiter = _AllCompletedWaiter(
-                        pending_count, stop_on_exception=True)
-            elif return_when == ALL_COMPLETED:
-                waiter = _AllCompletedWaiter(
-                        pending_count, stop_on_exception=False)
-            else:
-                raise Exception("XXX")
-
-        for f in fs:
-            f._waiters.append(waiter)
+        waiter = _create_and_install_waiters(fs, return_when)
 
     waiter.event.wait(timeout)
     for f in fs:
         f._waiters.remove(waiter)
 
-    finished.update(waiter.result_futures)
-    finished.update(waiter.exception_futures)
-    finished.update(waiter.cancelled_futures)
-
+    finished.update(waiter.finished_futures)
     return finished, set(fs) - finished
 
 class Future(object):
     """Represents the result of an asynchronous computation."""
 
-    # Transitions into the CANCELLED_AND_NOTIFIED and FINISHED states trigger notifications to the ThreadEventSink
-    # belonging to the Future's FutureList and must be made with ThreadEventSink._condition held to prevent a race
-    # condition when the transition is made concurrently with the addition of a new _WaitTracker to the ThreadEventSink.
-    # Other state transitions need only have the Future._condition held.
-    # When ThreadEventSink._condition and Future._condition must both be held then Future._condition is always acquired
-    # first.
-
     def __init__(self):
         """Initializes the future. Should not be called by clients."""
         self._condition = threading.Condition()
@@ -269,6 +284,7 @@ class Future(object):
             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
 
     def running(self):
+        """Return True if the future is currently executing."""
         with self._condition:
             return self._state == RUNNING
 