summaryrefslogtreecommitdiff
path: root/concurrent/futures/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'concurrent/futures/_base.py')
-rw-r--r--concurrent/futures/_base.py12
1 files changed, 6 insertions, 6 deletions
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index aaefa2b..8ed69b7 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -2,7 +2,6 @@
# Licensed to PSF under a Contributor Agreement.
from __future__ import with_statement
-import functools
import logging
import threading
import time
@@ -46,8 +45,6 @@ _STATE_TO_DESCRIPTION_MAP = {
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
-STDERR_HANDLER = logging.StreamHandler()
-LOGGER.addHandler(STDERR_HANDLER)
class Error(Exception):
"""Base class for all future-related exceptions."""
@@ -119,11 +116,14 @@ class _AllCompletedWaiter(_Waiter):
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
+ self.lock = threading.Lock()
super(_AllCompletedWaiter, self).__init__()
def _decrement_pending_calls(self):
- if self.num_pending_calls == len(self.finished_futures):
- self.event.set()
+ with self.lock:
+ self.num_pending_calls -= 1
+ if not self.num_pending_calls:
+ self.event.set()
def add_result(self, future):
super(_AllCompletedWaiter, self).add_result(future)
@@ -523,7 +523,7 @@ class Executor(object):
"""Returns a iterator equivalent to map(fn, iter).
Args:
- fn: A callable that will take take as many arguments as there are
+ fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.