author | Georg Brandl <georg@python.org> | 2014-09-22 16:21:04 +0200
---|---|---
committer | Georg Brandl <georg@python.org> | 2014-09-22 16:21:04 +0200
commit | 1f23a5c369ba58c6ec5ab806e25c63dd615327dd (patch)
tree | be37a9d8b8bc0b94da85227318617f9889473b21 /sphinx/util/parallel.py
parent | 7bbaa4c73f9e0188d13360d981c25403393c13f6 (diff)
download | sphinx-git-1f23a5c369ba58c6ec5ab806e25c63dd615327dd.tar.gz
Factor out parallel building into a utility class. Better error handling
with traceback of the parallel process saved in the error log.
Diffstat (limited to 'sphinx/util/parallel.py')
-rw-r--r-- | sphinx/util/parallel.py | 106
1 file changed, 106 insertions, 0 deletions
```diff
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
new file mode 100644
index 000000000..44a69800d
--- /dev/null
+++ b/sphinx/util/parallel.py
@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+"""
+    sphinx.util.parallel
+    ~~~~~~~~~~~~~~~~~~~~
+
+    Parallel building utilities.
+
+    :copyright: Copyright 2007-2014 by the Sphinx team, see AUTHORS.
+    :license: BSD, see LICENSE for details.
+"""
+
+import os
+import traceback
+
+try:
+    import multiprocessing
+    import threading
+except ImportError:
+    multiprocessing = threading = None
+
+from six.moves import queue
+
+from sphinx.errors import SphinxParallelError
+
+# our parallel functionality only works for the forking Process
+parallel_available = multiprocessing and (os.name == 'posix')
+
+
+class ParallelProcess(object):
+
+    def __init__(self, process_func, result_func, nproc, maxbatch=10):
+        self.process_func = process_func
+        self.result_func = result_func
+        self.nproc = nproc
+        self.maxbatch = maxbatch
+        # list of threads to join when waiting for completion
+        self._threads = []
+        self._chunks = []
+        self.nchunks = 0
+        # queue of result objects to process
+        self.result_queue = queue.Queue()
+        self._nprocessed = 0
+
+    def set_arguments(self, arguments):
+        # determine how many documents to read in one go
+        nargs = len(arguments)
+        chunksize = min(nargs // self.nproc, self.maxbatch)
+        if chunksize == 0:
+            chunksize = 1
+        nchunks, rest = divmod(nargs, chunksize)
+        if rest:
+            nchunks += 1
+        # partition documents in "chunks" that will be written by one Process
+        self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+        self.nchunks = len(self._chunks)
+
+    def spawn(self):
+        assert self._chunks
+
+        def process(pipe, chunk):
+            try:
+                ret = self.process_func(chunk)
+                pipe.send((False, ret))
+            except BaseException as err:
+                pipe.send((True, (err, traceback.format_exc())))
+
+        def process_thread(chunk):
+            precv, psend = multiprocessing.Pipe(False)
+            proc = multiprocessing.Process(target=process, args=(psend, chunk))
+            proc.start()
+            result = precv.recv()
+            self.result_queue.put((chunk,) + result)
+            proc.join()
+            semaphore.release()
+
+        # allow only "nproc" worker processes at once
+        semaphore = threading.Semaphore(self.nproc)
+
+        for chunk in self._chunks:
+            yield chunk
+            semaphore.acquire()
+            t = threading.Thread(target=process_thread, args=(chunk,))
+            t.setDaemon(True)
+            t.start()
+            self._threads.append(t)
+            # try processing results already in parallel
+            try:
+                chunk, exc, result = self.result_queue.get(False)
+            except queue.Empty:
+                pass
+            else:
+                if exc:
+                    raise SphinxParallelError(*result)
+                self.result_func(chunk, result)
+                self._nprocessed += 1
+
+    def join(self):
+        while self._nprocessed < self.nchunks:
+            chunk, exc, result = self.result_queue.get()
+            if exc:
+                raise SphinxParallelError(*result)
+            self.result_func(chunk, result)
+            self._nprocessed += 1
+
+        for t in self._threads:
+            t.join()
```
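For orientation (not part of the commit): a minimal sketch of how a caller might drive the new class. The worker and collector callbacks here (`square_all`, `collect`) and the argument list are made-up stand-ins for the builder's real read/write callbacks. The one non-obvious part of the API is that `spawn()` is a generator: it yields each chunk back to the caller before dispatching it, so the caller must exhaust the generator to start all worker processes, then call `join()`.

```python
from sphinx.util.parallel import ParallelProcess, parallel_available


def square_all(chunk):
    # process_func: runs in a forked worker process; the return value
    # is pickled and sent back to the main process over a pipe
    return [n * n for n in chunk]


def collect(chunk, result):
    # result_func: runs in the main process as each chunk's result arrives
    print('chunk %r -> %r' % (chunk, result))


if parallel_available:
    proc = ParallelProcess(square_all, collect, nproc=4)
    # 25 arguments with nproc=4, maxbatch=10:
    # chunksize = min(25 // 4, 10) = 6, and divmod(25, 6) = (4, 1),
    # so the work is partitioned into 5 chunks (four of 6, one of 1)
    proc.set_arguments(list(range(25)))
    # exhausting the generator dispatches every chunk, at most
    # nproc worker processes running at any one time
    for chunk in proc.spawn():
        pass
    # drain the remaining results and wait for the helper threads
    proc.join()
```

The semaphore plus one helper thread per chunk is what caps concurrency: each thread blocks in `precv.recv()` until its forked process reports back, releasing the semaphore only then, while the main thread keeps handing out chunks and opportunistically consuming finished results. A failure in any worker surfaces in the main process as a `SphinxParallelError` carrying the worker's formatted traceback, which is what the commit message means by saving the traceback in the error log.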