summaryrefslogtreecommitdiff
path: root/Lib/concurrent
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2014-10-04 20:20:10 +0200
committerAntoine Pitrou <solipsis@pitrou.net>2014-10-04 20:20:10 +0200
commit72425d2c7a2fff49e6126f575a385b0da2c1efc8 (patch)
tree661fedd1da777aa159411442c97327481656f07b /Lib/concurrent
parent99087ce79d2ad6fcc366064b872e4b312fdfe836 (diff)
downloadcpython-72425d2c7a2fff49e6126f575a385b0da2c1efc8.tar.gz
Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize*
argument to allow batching of tasks in child processes and improve performance of ProcessPoolExecutor. Patch by Dan O'Reilly.
Diffstat (limited to 'Lib/concurrent')
-rw-r--r--Lib/concurrent/futures/_base.py6
-rw-r--r--Lib/concurrent/futures/process.py51
2 files changed, 56 insertions, 1 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index c13b3b6b29..9e447137ad 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -520,7 +520,7 @@ class Executor(object):
"""
raise NotImplementedError()
- def map(self, fn, *iterables, timeout=None):
+ def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns a iterator equivalent to map(fn, iter).
Args:
@@ -528,6 +528,10 @@ class Executor(object):
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
+ chunksize: The size of the chunks the iterable will be broken into
+ before being passed to a child process. This argument is only
+ used by ProcessPoolExecutor; it is ignored by
+ ThreadPoolExecutor.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 12993901e3..fc64dbe84b 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -55,6 +55,8 @@ from multiprocessing import SimpleQueue
from multiprocessing.connection import wait
import threading
import weakref
+from functools import partial
+import itertools
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
@@ -108,6 +110,26 @@ class _CallItem(object):
self.args = args
self.kwargs = kwargs
+def _get_chunks(*iterables, chunksize):
+ """ Iterates over zip()ed iterables in chunks. """
+ it = zip(*iterables)
+ while True:
+ chunk = tuple(itertools.islice(it, chunksize))
+ if not chunk:
+ return
+ yield chunk
+
+def _process_chunk(fn, chunk):
+ """ Processes a chunk of an iterable passed to map.
+
+ Runs the function passed to map() on a chunk of the
+ iterable passed to map.
+
+ This function is run in a separate process.
+
+ """
+ return [fn(*args) for args in chunk]
+
def _process_worker(call_queue, result_queue):
"""Evaluates calls from call_queue and places the results in result_queue.
@@ -411,6 +433,35 @@ class ProcessPoolExecutor(_base.Executor):
return f
submit.__doc__ = _base.Executor.submit.__doc__
+ def map(self, fn, *iterables, timeout=None, chunksize=1):
+ """Returns a iterator equivalent to map(fn, iter).
+
+ Args:
+ fn: A callable that will take as many arguments as there are
+ passed iterables.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ chunksize: If greater than one, the iterables will be chopped into
+ chunks of size chunksize and submitted to the process pool.
+ If set to one, the items in the list will be sent one at a time.
+
+ Returns:
+ An iterator equivalent to: map(func, *iterables) but the calls may
+ be evaluated out-of-order.
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ Exception: If fn(*args) raises for any values.
+ """
+ if chunksize < 1:
+ raise ValueError("chunksize must be >= 1.")
+
+ results = super().map(partial(_process_chunk, fn),
+ _get_chunks(*iterables, chunksize=chunksize),
+ timeout=timeout)
+ return itertools.chain.from_iterable(results)
+
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True