From 0beb754040c1e2807f1bdef0ccf6676ce2572efb Mon Sep 17 00:00:00 2001
From: "brian.quinlan"
Date: Sun, 25 Oct 2009 03:48:12 +0000
Subject: Improved ProcessPoolExecutor module documentation. Ensure that
 inactive threads are periodically collected rather than leaking.

---
 python2/futures/process.py | 162 +++++++++++++++++++++++++++++++++++++++-----
 python3/futures/process.py | 164 ++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 291 insertions(+), 35 deletions(-)
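For orientation, the data flow documented in the new module docstrings below
can be exercised end to end. The sketch is illustrative only and is not part
of the patch: it assumes the package exports ProcessPoolExecutor at the top
level and, per the new docstring, that run_to_futures() accepts an iterable of
no-argument callables; the function name square is hypothetical.
functools.partial is used because multiprocessing must pickle each call.

    import functools
    from futures import ProcessPoolExecutor

    def square(x):
        # Hypothetical work function; it must be picklable, so it is defined
        # at module level rather than as a lambda.
        return x * x

    if __name__ == '__main__':
        executor = ProcessPoolExecutor(max_processes=2)
        # Each call becomes a uniquely numbered _WorkItem; its id flows
        # through the "Work Ids" queue to the local worker thread, then as a
        # _CallItem through the "Call Q" to a worker process, and the result
        # returns on the "Result Q".
        calls = [functools.partial(square, x) for x in (2, 3, 4)]
        future_list = executor.run_to_futures(calls)
        executor.shutdown()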
diff --git a/python2/futures/process.py b/python2/futures/process.py
index fdea346..7f1b153 100644
--- a/python2/futures/process.py
+++ b/python2/futures/process.py
@@ -1,7 +1,48 @@
 # Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
 
-"""Implements ProcessPoolExecutor."""
+"""Implements ProcessPoolExecutor.
+
+The following diagram and text describe the data flow through the system:
+
+|======================= In-process =====================|== Out-of-process ==|
+
++----------+     +----------+       +--------+     +-----------+    +---------+
+|          |  => | Work Ids |  =>   |        |  => | Call Q    | => |         |
+|          |     +----------+       |        |     +-----------+    |         |
+|          |     | ...      |       |        |     | ...       |    |         |
+|          |     | 6        |       |        |     | 5, call() |    |         |
+|          |     | 7        |       |        |     | ...       |    |         |
+| Process  |     | ...      |       | Local  |     +-----------+    | Process |
+|  Pool    |     +----------+       | Worker |                      |  #1..n  |
+| Executor |                        | Thread |                      |         |
+|          |     +------------+     |        |     +-----------+    |         |
+|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
+|          |     +------------+     |        |     +-----------+    |         |
+|          |     | 6: call() |      |        |     | ...       |    |         |
+|          |     |    future |      |        |     | 4, result |    |         |
+|          |     | ...       |      |        |     | 3, except |    |         |
++----------+     +------------+     +--------+     +-----------+    +---------+
+
+Executor.run_to_futures() called:
+- creates a uniquely numbered _WorkItem for each call and adds it to the
+  "Work Items" dict
+- adds the id of each _WorkItem to the "Work Ids" queue
+
+Local worker thread:
+- reads work ids from the "Work Ids" queue and looks up the corresponding
+  _WorkItem in the "Work Items" dict: if the work item has been cancelled then
+  it is simply removed from the dict, otherwise it is repackaged as a
+  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
+  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
+  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
+- reads _ResultItems from the "Result Q", updates the future stored in the
+  "Work Items" dict and deletes the dict entry
+
+Process #1..n:
+- reads _CallItems from the "Call Q", executes the calls, and puts the
+  resulting _ResultItems in the "Result Q"
+"""
+
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
 from futures._base import (PENDING, RUNNING, CANCELLED,
@@ -15,9 +56,22 @@ import multiprocessing
 import threading
 import weakref
 
+# Workers are created as daemon threads and processes. This is done to allow the
+# interpreter to exit when there are still idle processes in a
+# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
+# allowing workers to die with the interpreter has two undesirable properties:
+#   - The workers would still be running during interpreter shutdown,
+#     meaning that they would fail in unpredictable ways.
+#   - The workers could be killed while evaluating a work item, which could
+#     be bad if the function being evaluated has external side-effects e.g.
+#     writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads/processes finish.
+
 _thread_references = set()
 _shutdown = False
 
-EXTRA_QUEUED_CALLS = 1
 
 def _python_exit():
     global _shutdown
@@ -27,6 +81,24 @@ def _python_exit():
         if thread is not None:
             thread.join()
 
+def _remove_dead_thread_references():
+    """Remove inactive threads from _thread_references.
+
+    Should be called periodically to prevent memory leaks in scenarios such as:
+    >>> while True:
+    ...     t = ProcessPoolExecutor(max_processes=5)
+    ...     t.map(int, ['1', '2', '3', '4', '5'])
+    """
+    for thread_reference in set(_thread_references):
+        if thread_reference() is None:
+            _thread_references.discard(thread_reference)
+
+# Controls how many more calls than processes will be queued in the call queue.
+# A smaller number will mean that processes spend more time idle waiting for
+# work while a larger number will make Future.cancel() succeed less frequently
+# (Futures in the call queue cannot be cancelled).
+EXTRA_QUEUED_CALLS = 1
+
 class _WorkItem(object):
     def __init__(self, call, future, completion_tracker):
         self.call = call
@@ -45,6 +117,18 @@ class _CallItem(object):
         self.call = call
 
 def _process_worker(call_queue, result_queue, shutdown):
+    """Evaluates calls from call_queue and places the results in result_queue.
+
+    This worker is run in a separate process.
+
+    Args:
+        call_queue: A multiprocessing.Queue of _CallItems that will be read and
+            evaluated by the worker.
+        result_queue: A multiprocessing.Queue of _ResultItems that will be
+            written to by the worker.
+        shutdown: A multiprocessing.Event that will be set as a signal to the
+            worker that it should exit when call_queue is empty.
+    """
     while True:
         try:
             call_item = call_queue.get(block=True, timeout=0.1)
@@ -64,6 +148,20 @@ def _process_worker(call_queue, result_queue, shutdown):
 def _add_call_item_to_queue(pending_work_items,
                             work_ids,
                             call_queue):
+    """Fills call_queue with _CallItems derived from pending_work_items.
+
+    This function never blocks.
+
+    Args:
+        pending_work_items: A dict mapping work ids to _WorkItems e.g.
+            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
+            are consumed and the corresponding _WorkItems from
+            pending_work_items are transformed into _CallItems and put in
+            call_queue.
+        call_queue: A multiprocessing.Queue that will be filled with _CallItems
+            derived from _WorkItems.
+    """
     while True:
         if call_queue.full():
             return
@@ -88,13 +186,34 @@ def _add_call_item_to_queue(pending_work_items,
             work_item.future._condition.release()
             call_queue.put(_CallItem(work_id, work_item.call), block=True)
 
-def _result(executor_reference,
-            processes,
-            pending_work_items,
-            work_ids_queue,
-            call_queue,
-            result_queue,
-            shutdown_process_event):
+def _queue_management_worker(executor_reference,
+                             processes,
+                             pending_work_items,
+                             work_ids_queue,
+                             call_queue,
+                             result_queue,
+                             shutdown_process_event):
+    """Manages the communication between this process and the worker processes.
+
+    This function is run in a local thread.
+
+    Args:
+        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
+            this thread. Used to determine if the ProcessPoolExecutor has been
+            garbage collected, in which case this function can exit.
+        processes: A list of the multiprocessing.Process instances used as
+            workers.
+        pending_work_items: A dict mapping work ids to _WorkItems e.g.
+            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
+        call_queue: A multiprocessing.Queue that will be filled with _CallItems
+            derived from _WorkItems for processing by the process workers.
+        result_queue: A multiprocessing.Queue of _ResultItems generated by the
+            process workers.
+        shutdown_process_event: A multiprocessing.Event used to signal the
+            process workers that they should exit when their work queue is
+            empty.
+    """
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
@@ -135,16 +254,23 @@
 
 class ProcessPoolExecutor(Executor):
     def __init__(self, max_processes=None):
-        import warnings
-        warnings.warn('ProcessPoolExecutor has known deadlocking behavior')
+        """Initializes a new ProcessPoolExecutor instance.
+
+        Args:
+            max_processes: The maximum number of processes that can be used to
+                execute the given calls. If None or not given then as many
+                worker processes will be created as the machine has processors.
+        """
+        _remove_dead_thread_references()
 
         if max_processes is None:
-            max_processes = multiprocessing.cpu_count()
+            self._max_processes = multiprocessing.cpu_count()
+        else:
+            self._max_processes = max_processes
 
-        self._max_processes = max_processes
         # Make the call queue slightly larger than the number of processes to
-        # prevent the worker processes from starving but to make future.cancel()
-        # responsive.
+        # prevent the worker processes from idling. But don't make it too big
+        # because futures in the call queue cannot be cancelled.
         self._call_queue = multiprocessing.Queue(self._max_processes +
                                                  EXTRA_QUEUED_CALLS)
         self._result_queue = multiprocessing.Queue()
@@ -159,10 +285,10 @@ class ProcessPoolExecutor(Executor):
         self._queue_count = 0
         self._pending_work_items = {}
 
-    def _adjust_process_count(self):
+    def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             self._queue_management_thread = threading.Thread(
-                    target=_result,
+                    target=_queue_management_worker,
                     args=(weakref.ref(self),
                           self._processes,
                           self._pending_work_items,
@@ -174,6 +300,7 @@ class ProcessPoolExecutor(Executor):
             self._queue_management_thread.start()
             _thread_references.add(weakref.ref(self._queue_management_thread))
 
+    def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_processes):
             p = multiprocessing.Process(
                     target=_process_worker,
@@ -200,6 +327,7 @@ class ProcessPoolExecutor(Executor):
             futures.append(f)
             self._queue_count += 1
 
+        self._start_queue_management_thread()
         self._adjust_process_count()
         fl = FutureList(futures, event_sink)
         fl.wait(timeout=timeout, return_when=return_when)
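Before the Python 3 port of the same change, it is worth picturing the
EXTRA_QUEUED_CALLS trade-off that the new comment describes. The following toy
model is illustrative only and is not the package's code; it uses just the
standard library:

    import Queue  # named 'queue' on Python 3

    EXTRA_QUEUED_CALLS = 1
    max_processes = 2

    # Ids waiting in the executor, as in the "Work Ids" queue.
    work_ids = Queue.Queue()
    for work_id in range(5):
        work_ids.put(work_id)

    # The bounded "Call Q": its size caps how many calls are exposed to the
    # worker processes ahead of their execution.
    call_queue = Queue.Queue(maxsize=max_processes + EXTRA_QUEUED_CALLS)
    promoted = set()

    # The local worker thread's promotion step: drain work ids until the
    # call queue is full.
    while not call_queue.full() and not work_ids.empty():
        work_id = work_ids.get()
        call_queue.put(work_id)
        promoted.add(work_id)

    def cancel(work_id):
        # Mirrors the Future.cancel() rule: cancellation can only succeed
        # while the call still sits in "Work Ids", never after promotion.
        return work_id not in promoted

    print([cancel(i) for i in range(5)])
    # -> [False, False, False, True, True]: with 2 processes and 1 extra
    #    queued call, 3 calls are already uncancellable.

A larger EXTRA_QUEUED_CALLS keeps the processes busier between results but
widens that uncancellable window, which is why both ports keep it at 1.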
diff --git a/python3/futures/process.py b/python3/futures/process.py
index af6e6be..e888a14 100644
--- a/python3/futures/process.py
+++ b/python3/futures/process.py
@@ -1,6 +1,47 @@
 # Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file.
 
-"""Implements ProcessPoolExecutor."""
+"""Implements ProcessPoolExecutor.
+
+The following diagram and text describe the data flow through the system:
+
+|======================= In-process =====================|== Out-of-process ==|
+
++----------+     +----------+       +--------+     +-----------+    +---------+
+|          |  => | Work Ids |  =>   |        |  => | Call Q    | => |         |
+|          |     +----------+       |        |     +-----------+    |         |
+|          |     | ...      |       |        |     | ...       |    |         |
+|          |     | 6        |       |        |     | 5, call() |    |         |
+|          |     | 7        |       |        |     | ...       |    |         |
+| Process  |     | ...      |       | Local  |     +-----------+    | Process |
+|  Pool    |     +----------+       | Worker |                      |  #1..n  |
+| Executor |                        | Thread |                      |         |
+|          |     +------------+     |        |     +-----------+    |         |
+|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
+|          |     +------------+     |        |     +-----------+    |         |
+|          |     | 6: call() |      |        |     | ...       |    |         |
+|          |     |    future |      |        |     | 4, result |    |         |
+|          |     | ...       |      |        |     | 3, except |    |         |
++----------+     +------------+     +--------+     +-----------+    +---------+
+
+Executor.run_to_futures() called:
+- creates a uniquely numbered _WorkItem for each call and adds it to the
+  "Work Items" dict
+- adds the id of each _WorkItem to the "Work Ids" queue
+
+Local worker thread:
+- reads work ids from the "Work Ids" queue and looks up the corresponding
+  _WorkItem in the "Work Items" dict: if the work item has been cancelled then
+  it is simply removed from the dict, otherwise it is repackaged as a
+  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
+  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
+  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
+- reads _ResultItems from the "Result Q", updates the future stored in the
+  "Work Items" dict and deletes the dict entry
+
+Process #1..n:
+- reads _CallItems from the "Call Q", executes the calls, and puts the
+  resulting _ResultItems in the "Result Q"
+"""
 
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
@@ -13,12 +54,24 @@ import atexit
 import queue
 import multiprocessing
 import threading
-import warnings
 import weakref
 
+# Workers are created as daemon threads and processes. This is done to allow the
+# interpreter to exit when there are still idle processes in a
+# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
+# allowing workers to die with the interpreter has two undesirable properties:
+#   - The workers would still be running during interpreter shutdown,
+#     meaning that they would fail in unpredictable ways.
+#   - The workers could be killed while evaluating a work item, which could
+#     be bad if the function being evaluated has external side-effects e.g.
+#     writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads/processes finish.
+
 _thread_references = set()
 _shutdown = False
 
-EXTRA_QUEUED_CALLS = 1
 
 def _python_exit():
     global _shutdown
@@ -28,6 +81,24 @@ def _python_exit():
         if thread is not None:
             thread.join()
 
+def _remove_dead_thread_references():
+    """Remove inactive threads from _thread_references.
+
+    Should be called periodically to prevent memory leaks in scenarios such as:
+    >>> while True:
+    ...     t = ProcessPoolExecutor(max_processes=5)
+    ...     t.map(int, ['1', '2', '3', '4', '5'])
+    """
+    for thread_reference in set(_thread_references):
+        if thread_reference() is None:
+            _thread_references.discard(thread_reference)
+
+# Controls how many more calls than processes will be queued in the call queue.
+# A smaller number will mean that processes spend more time idle waiting for
+# work while a larger number will make Future.cancel() succeed less frequently
+# (Futures in the call queue cannot be cancelled).
+EXTRA_QUEUED_CALLS = 1
+
 class _WorkItem(object):
     def __init__(self, call, future, completion_tracker):
         self.call = call
@@ -46,6 +117,18 @@ class _CallItem(object):
         self.call = call
 
 def _process_worker(call_queue, result_queue, shutdown):
+    """Evaluates calls from call_queue and places the results in result_queue.
+
+    This worker is run in a separate process.
+
+    Args:
+        call_queue: A multiprocessing.Queue of _CallItems that will be read and
+            evaluated by the worker.
+        result_queue: A multiprocessing.Queue of _ResultItems that will be
+            written to by the worker.
+        shutdown: A multiprocessing.Event that will be set as a signal to the
+            worker that it should exit when call_queue is empty.
+    """
     while True:
         try:
             call_item = call_queue.get(block=True, timeout=0.1)
@@ -65,6 +148,20 @@ def _process_worker(call_queue, result_queue, shutdown):
 def _add_call_item_to_queue(pending_work_items,
                             work_ids,
                             call_queue):
+    """Fills call_queue with _CallItems derived from pending_work_items.
+
+    This function never blocks.
+
+    Args:
+        pending_work_items: A dict mapping work ids to _WorkItems e.g.
+            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
+            are consumed and the corresponding _WorkItems from
+            pending_work_items are transformed into _CallItems and put in
+            call_queue.
+        call_queue: A multiprocessing.Queue that will be filled with _CallItems
+            derived from _WorkItems.
+    """
     while True:
         if call_queue.full():
             return
@@ -87,13 +184,34 @@
             call_queue.put(_CallItem(work_id, work_item.call), block=True)
 
-def _result(executor_reference,
-            processes,
-            pending_work_items,
-            work_ids_queue,
-            call_queue,
-            result_queue,
-            shutdown_process_event):
+def _queue_management_worker(executor_reference,
+                             processes,
+                             pending_work_items,
+                             work_ids_queue,
+                             call_queue,
+                             result_queue,
+                             shutdown_process_event):
+    """Manages the communication between this process and the worker processes.
+
+    This function is run in a local thread.
+
+    Args:
+        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
+            this thread. Used to determine if the ProcessPoolExecutor has been
+            garbage collected, in which case this function can exit.
+        processes: A list of the multiprocessing.Process instances used as
+            workers.
+        pending_work_items: A dict mapping work ids to _WorkItems e.g.
+            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
+        call_queue: A multiprocessing.Queue that will be filled with _CallItems
+            derived from _WorkItems for processing by the process workers.
+        result_queue: A multiprocessing.Queue of _ResultItems generated by the
+            process workers.
+        shutdown_process_event: A multiprocessing.Event used to signal the
+            process workers that they should exit when their work queue is
+            empty.
+    """
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
@@ -135,15 +253,23 @@
 
 class ProcessPoolExecutor(Executor):
     def __init__(self, max_processes=None):
-        warnings.warn('ProcessPoolExecutor has known deadlocking behavior')
+        """Initializes a new ProcessPoolExecutor instance.
+
+        Args:
+            max_processes: The maximum number of processes that can be used to
+                execute the given calls. If None or not given then as many
+                worker processes will be created as the machine has processors.
+        """
+        _remove_dead_thread_references()
 
         if max_processes is None:
-            max_processes = multiprocessing.cpu_count()
+            self._max_processes = multiprocessing.cpu_count()
+        else:
+            self._max_processes = max_processes
 
-        self._max_processes = max_processes
         # Make the call queue slightly larger than the number of processes to
-        # prevent the worker processes from starving but to make future.cancel()
-        # responsive.
+        # prevent the worker processes from idling. But don't make it too big
+        # because futures in the call queue cannot be cancelled.
         self._call_queue = multiprocessing.Queue(self._max_processes +
                                                  EXTRA_QUEUED_CALLS)
         self._result_queue = multiprocessing.Queue()
@@ -158,10 +284,10 @@ class ProcessPoolExecutor(Executor):
         self._queue_count = 0
         self._pending_work_items = {}
 
-    def _adjust_process_count(self):
+    def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             self._queue_management_thread = threading.Thread(
-                    target=_result,
+                    target=_queue_management_worker,
                     args=(weakref.ref(self),
                           self._processes,
                           self._pending_work_items,
@@ -169,10 +295,11 @@ class ProcessPoolExecutor(Executor):
                           self._call_queue,
                           self._result_queue,
                           self._shutdown_process_event))
-            self._queue_management_thread.setDaemon(True)
+            self._queue_management_thread.daemon = True
             self._queue_management_thread.start()
             _thread_references.add(weakref.ref(self._queue_management_thread))
 
+    def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_processes):
             p = multiprocessing.Process(
                     target=_process_worker,
@@ -198,6 +325,7 @@ class ProcessPoolExecutor(Executor):
             futures.append(f)
             self._queue_count += 1
 
+        self._start_queue_management_thread()
         self._adjust_process_count()
         fl = FutureList(futures, event_sink)
         fl.wait(timeout=timeout, return_when=return_when)
-- 
cgit v1.2.1
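As a closing illustration of why __init__ now calls
_remove_dead_thread_references(), the underlying weakref pattern can be
demonstrated with the standard library alone. This sketch is not the package's
code, and the exact counts printed depend on CPython's prompt reference
counting:

    import threading
    import weakref

    _thread_references = set()

    def _remove_dead_thread_references():
        # Iterate over a copy so entries can be discarded while looping; a
        # weakref whose referent has been collected returns None when called.
        for ref in set(_thread_references):
            if ref() is None:
                _thread_references.discard(ref)

    def start_and_track():
        # Start a short-lived worker and keep only a weak reference to it,
        # as ProcessPoolExecutor does for its queue management thread.
        t = threading.Thread(target=lambda: None)
        t.start()
        t.join()
        _thread_references.add(weakref.ref(t))

    for _ in range(100):
        start_and_track()

    print(len(_thread_references))  # dead references accumulate (about 100)
    _remove_dead_thread_references()
    print(len(_thread_references))  # the purge shrinks the set (about 0)

Calling the purge from each constructor bounds the set's growth in loops like
the one shown in the _remove_dead_thread_references() docstring.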