diff options
Diffstat (limited to 'concurrent/futures/_base.py')
-rw-r--r-- | concurrent/futures/_base.py | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py index aaefa2b..8ed69b7 100644 --- a/concurrent/futures/_base.py +++ b/concurrent/futures/_base.py @@ -2,7 +2,6 @@ # Licensed to PSF under a Contributor Agreement. from __future__ import with_statement -import functools import logging import threading import time @@ -46,8 +45,6 @@ _STATE_TO_DESCRIPTION_MAP = { # Logger for internal use by the futures package. LOGGER = logging.getLogger("concurrent.futures") -STDERR_HANDLER = logging.StreamHandler() -LOGGER.addHandler(STDERR_HANDLER) class Error(Exception): """Base class for all future-related exceptions.""" @@ -119,11 +116,14 @@ class _AllCompletedWaiter(_Waiter): def __init__(self, num_pending_calls, stop_on_exception): self.num_pending_calls = num_pending_calls self.stop_on_exception = stop_on_exception + self.lock = threading.Lock() super(_AllCompletedWaiter, self).__init__() def _decrement_pending_calls(self): - if self.num_pending_calls == len(self.finished_futures): - self.event.set() + with self.lock: + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() def add_result(self, future): super(_AllCompletedWaiter, self).add_result(future) @@ -523,7 +523,7 @@ class Executor(object): """Returns a iterator equivalent to map(fn, iter). Args: - fn: A callable that will take take as many arguments as there are + fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. |