author | Alex Grönholm <alex.gronholm@nextday.fi> | 2013-06-24 01:20:47 +0300
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2013-06-24 01:20:47 +0300
commit | acb8961109b61390bac0a0cd795d53924db20f4c (patch)
tree | 810b408eff45e7507e131e6f4f7eed6c798c7206 /concurrent
parent | 24ffe396c4da66cfa65b5d8641f9c39aa3ada5b1 (diff)
download | futures-acb8961109b61390bac0a0cd795d53924db20f4c.tar.gz
Redid the port from scratch using Python 3.2.5 as base (tag: 2.1.4)
Diffstat (limited to 'concurrent')
-rw-r--r-- | concurrent/futures/_base.py | 12
-rw-r--r-- | concurrent/futures/process.py | 170
-rw-r--r-- | concurrent/futures/thread.py | 60
3 files changed, 127 insertions, 115 deletions
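The change that runs through all three files below is the removal of polling: the old backport woke its worker and management threads every 0.1 seconds to check shutdown flags, while the 3.2.5-based code blocks on its queues indefinitely and is woken by a None sentinel pushed into the queue. Here is a minimal standalone sketch of that sentinel pattern, separate from the patch itself (the worker and work_queue names are invented for illustration):

# Sketch of the sentinel pattern (illustrative only; names are invented).
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, the backport's target

def worker(work_queue, results):
    while True:
        item = work_queue.get(block=True)   # blocks; no timeout, no polling
        if item is None:                    # sentinel: time to shut down
            work_queue.put(None)            # wake the next blocked worker
            return
        results.append(item * item)

work_queue, results = queue.Queue(), []
threads = [threading.Thread(target=worker, args=(work_queue, results))
           for _ in range(3)]
for t in threads:
    t.start()
for i in range(10):
    work_queue.put(i)
work_queue.put(None)        # a single sentinel fans out to all workers
for t in threads:
    t.join()
print(sorted(results))      # [0, 1, 4, ..., 81]

One sentinel suffices for any number of workers because each exiting worker re-posts it on the way out, exactly as the new thread.py _worker does with work_queue.put(None).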
diff --git a/concurrent/futures/_base.py b/concurrent/futures/_base.py
index aaefa2b..8ed69b7 100644
--- a/concurrent/futures/_base.py
+++ b/concurrent/futures/_base.py
@@ -2,7 +2,6 @@
 # Licensed to PSF under a Contributor Agreement.

 from __future__ import with_statement
-import functools
 import logging
 import threading
 import time
@@ -46,8 +45,6 @@ _STATE_TO_DESCRIPTION_MAP = {

 # Logger for internal use by the futures package.
 LOGGER = logging.getLogger("concurrent.futures")
-STDERR_HANDLER = logging.StreamHandler()
-LOGGER.addHandler(STDERR_HANDLER)

 class Error(Exception):
     """Base class for all future-related exceptions."""
@@ -119,11 +116,14 @@ class _AllCompletedWaiter(_Waiter):
     def __init__(self, num_pending_calls, stop_on_exception):
         self.num_pending_calls = num_pending_calls
         self.stop_on_exception = stop_on_exception
+        self.lock = threading.Lock()
         super(_AllCompletedWaiter, self).__init__()

     def _decrement_pending_calls(self):
-        if self.num_pending_calls == len(self.finished_futures):
-            self.event.set()
+        with self.lock:
+            self.num_pending_calls -= 1
+            if not self.num_pending_calls:
+                self.event.set()

     def add_result(self, future):
         super(_AllCompletedWaiter, self).add_result(future)
@@ -523,7 +523,7 @@ class Executor(object):
         """Returns a iterator equivalent to map(fn, iter).

         Args:
-            fn: A callable that will take take as many arguments as there are
+            fn: A callable that will take as many arguments as there are
                 passed iterables.
             timeout: The maximum number of seconds to wait. If None, then there
                 is no limit on the wait time.
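Beyond the logging and import cleanups, the _AllCompletedWaiter hunk above replaces a completion test based on the length of the shared finished_futures list with a private countdown guarded by a lock, so the event fires exactly once, when the last pending call completes, regardless of how the completing threads interleave. A standalone sketch of that countdown-latch shape (CountdownLatch is an invented name, not part of the library):

# Sketch of the lock-guarded countdown used by _AllCompletedWaiter.
import threading

class CountdownLatch(object):
    def __init__(self, count):
        self.count = count
        self.lock = threading.Lock()
        self.event = threading.Event()

    def decrement(self):
        # Decrement under the lock so concurrent callers cannot both
        # observe an intermediate count; the event is set exactly once.
        with self.lock:
            self.count -= 1
            if not self.count:
                self.event.set()

latch = CountdownLatch(5)
for _ in range(5):
    threading.Thread(target=latch.decrement).start()
latch.event.wait()   # returns once all five decrements have run
print('all done')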
""" while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return try: - call_item = call_queue.get(block=True, timeout=0.1) - except queue.Empty: - if shutdown.is_set(): - return + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException: + e = sys.exc_info()[1] + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) else: - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException: - e = sys.exc_info()[1] - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + result_queue.put(_ResultItem(call_item.work_id, + result=r)) def _add_call_item_to_queue(pending_work_items, work_ids, @@ -189,13 +177,12 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue -def _queue_manangement_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - shutdown_process_event): +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -213,37 +200,19 @@ def _queue_manangement_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. - shutdown_process_event: A multiprocessing.Event used to signal the - process workers that they should exit when their work queue is - empty. """ + nb_shutdown_processes = [0] + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + call_queue.put(None) + nb_shutdown_processes[0] += 1 while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - try: - result_item = result_queue.get(block=True, timeout=0.1) - except queue.Empty: - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_process_event.set() - - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - return - del executor - else: + result_item = result_queue.get(block=True) + if result_item is not None: work_item = pending_work_items[result_item.work_id] del pending_work_items[result_item.work_id] @@ -251,6 +220,51 @@ def _queue_manangement_worker(executor_reference, work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. 
+            if not pending_work_items:
+                while nb_shutdown_processes[0] < len(processes):
+                    shutdown_one_process()
+                # If .join() is not called on the created processes then
+                # some multiprocessing.Queue methods may deadlock on Mac OS
+                # X.
+                for p in processes:
+                    p.join()
+                call_queue.close()
+                return
+        del executor
+
+_system_limits_checked = False
+_system_limited = None
+def _check_system_limits():
+    global _system_limits_checked, _system_limited
+    if _system_limits_checked:
+        if _system_limited:
+            raise NotImplementedError(_system_limited)
+    _system_limits_checked = True
+    try:
+        import os
+        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
+    except (AttributeError, ValueError):
+        # sysconf not available or setting not available
+        return
+    if nsems_max == -1:
+        # indetermine limit, assume that limit is determined
+        # by available memory only
+        return
+    if nsems_max >= 256:
+        # minimum number of semaphores available
+        # according to POSIX
+        return
+    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+    raise NotImplementedError(_system_limited)

 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None):
@@ -261,7 +275,7 @@ class ProcessPoolExecutor(_base.Executor):
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
         """
-        _remove_dead_thread_references()
+        _check_system_limits()

         if max_workers is None:
             self._max_workers = multiprocessing.cpu_count()
@@ -280,33 +294,34 @@ class ProcessPoolExecutor(_base.Executor):

         # Shutdown is a two-step process.
         self._shutdown_thread = False
-        self._shutdown_process_event = multiprocessing.Event()
         self._shutdown_lock = threading.Lock()
         self._queue_count = 0
         self._pending_work_items = {}

     def _start_queue_management_thread(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the queue management thread.
+        def weakref_cb(_, q=self._result_queue):
+            q.put(None)
         if self._queue_management_thread is None:
             self._queue_management_thread = threading.Thread(
-                    target=_queue_manangement_worker,
-                    args=(weakref.ref(self),
+                    target=_queue_management_worker,
+                    args=(weakref.ref(self, weakref_cb),
                           self._processes,
                           self._pending_work_items,
                           self._work_ids,
                           self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
-            _thread_references.add(weakref.ref(self._queue_management_thread))
+            _threads_queues[self._queue_management_thread] = self._result_queue

     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
             p = multiprocessing.Process(
                     target=_process_worker,
                     args=(self._call_queue,
-                          self._result_queue,
-                          self._shutdown_process_event))
+                          self._result_queue))
             p.start()
             self._processes.add(p)

@@ -321,6 +336,8 @@ class ProcessPoolExecutor(_base.Executor):
             self._pending_work_items[self._queue_count] = w
             self._work_ids.put(self._queue_count)
             self._queue_count += 1
+            # Wake up queue management thread
+            self._result_queue.put(None)

             self._start_queue_management_thread()
             self._adjust_process_count()
@@ -330,15 +347,16 @@ class ProcessPoolExecutor(_base.Executor):
     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown_thread = True
-        if wait:
-            if self._queue_management_thread:
+        if self._queue_management_thread:
+            # Wake up queue management thread
+            self._result_queue.put(None)
+            if wait:
                 self._queue_management_thread.join()
         # To reduce the risk of openning too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
         self._call_queue = None
         self._result_queue = None
-        self._shutdown_process_event = None
         self._processes = None

     shutdown.__doc__ = _base.Executor.shutdown.__doc__
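Two things stand out in the process.py rewrite above: the worker and management loops now coordinate entirely through None sentinels (submit() and shutdown() both post one to wake the management thread), and the constructor now calls _check_system_limits(), which raises NotImplementedError on platforms exposing fewer than 256 POSIX semaphores. A hedged usage sketch, assuming the backport is installed so that concurrent.futures is importable (square is an invented helper; the error handling mirrors the file's own sys.exc_info() idiom):

# Usage sketch (assumes the 'futures' backport is installed).
import sys
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':   # required where children re-import this module
    try:
        executor = ProcessPoolExecutor(max_workers=2)
    except NotImplementedError:
        # _check_system_limits() found too few POSIX semaphores
        e = sys.exc_info()[1]
        print('process pools unsupported here: %s' % e)
    else:
        print(list(executor.map(square, range(8))))  # [0, 1, 4, ..., 49]
        executor.shutdown(wait=True)  # posts a sentinel, then joins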
diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py
index ce0dda0..a45959d 100644
--- a/concurrent/futures/thread.py
+++ b/concurrent/futures/thread.py
@@ -32,28 +32,17 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 # workers to exit when their work queues are empty and then waits until the
 # threads finish.

-_thread_references = set()
+_threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False

 def _python_exit():
     global _shutdown
     _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    ...    t = ThreadPoolExecutor(max_workers=5)
-    ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()

 atexit.register(_python_exit)

@@ -79,19 +68,20 @@ class _WorkItem(object):
 def _worker(executor_reference, work_queue):
     try:
         while True:
-            try:
-                work_item = work_queue.get(block=True, timeout=0.1)
-            except queue.Empty:
-                executor = executor_reference()
-                # Exit if:
-                #   - The interpreter is shutting down OR
-                #   - The executor that owns the worker has been collected OR
-                #   - The executor that owns the worker has been shutdown.
-                if _shutdown or executor is None or executor._shutdown:
-                    return
-                del executor
-            else:
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
                 work_item.run()
+                continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The interpreter is shutting down OR
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if _shutdown or executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
     except BaseException:
         _base.LOGGER.critical('Exception in worker', exc_info=True)

@@ -103,8 +93,6 @@ class ThreadPoolExecutor(_base.Executor):
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
         """
-        _remove_dead_thread_references()
-
         self._max_workers = max_workers
         self._work_queue = queue.Queue()
         self._threads = set()
@@ -125,19 +113,25 @@ class ThreadPoolExecutor(_base.Executor):
     submit.__doc__ = _base.Executor.submit.__doc__

     def _adjust_thread_count(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
         # TODO(bquinlan): Should avoid creating new threads if there are more
         # idle threads than items in the work queue.
         if len(self._threads) < self._max_workers:
             t = threading.Thread(target=_worker,
-                                 args=(weakref.ref(self), self._work_queue))
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
             t.daemon = True
             t.start()
             self._threads.add(t)
-            _thread_references.add(weakref.ref(t))
+            _threads_queues[t] = self._work_queue

     def shutdown(self, wait=True):
         with self._shutdown_lock:
             self._shutdown = True
+            self._work_queue.put(None)
         if wait:
             for t in self._threads:
                 t.join()
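Finally, the piece that makes the sentinels sufficient: both executors now create their worker references with weakref.ref(self, weakref_cb), so when user code drops its last reference to an executor, the callback posts None into the relevant queue, the blocked worker wakes, observes that its owner is gone, and exits. This is why the old _remove_dead_thread_references() housekeeping could be deleted. A minimal sketch of the wake-on-collect trick (Owner and worker are invented names; prompt collection at the del relies on CPython's reference counting):

# Sketch of the wake-on-collect trick (illustrative only).
import threading
import weakref
try:
    import queue
except ImportError:
    import Queue as queue

def worker(owner_ref, work_queue):
    while True:
        item = work_queue.get(block=True)
        if item is None and owner_ref() is None:
            return                 # owner collected: exit quietly
        # a real worker would run the item here

class Owner(object):
    pass

work_queue = queue.Queue()
owner = Owner()
# The callback fires when 'owner' is collected and wakes the worker.
ref = weakref.ref(owner, lambda _, q=work_queue: q.put(None))
t = threading.Thread(target=worker, args=(ref, work_queue))
t.start()
del owner          # on CPython this fires the callback immediately
t.join()
print('worker exited after its owner was collected')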