diff options
author | Georg Brandl <georg@python.org> | 2014-09-22 17:29:52 +0200 |
---|---|---|
committer | Georg Brandl <georg@python.org> | 2014-09-22 17:29:52 +0200 |
commit | 0488306cad87d948c89f00f2968eabc3dc5caec0 (patch) | |
tree | c3fabb7129b30d77bbae2304bddd1c03cce01d09 | |
parent | 1d8e60fd86bc5297ad419fac45c5ea2a6b23513f (diff) | |
download | sphinx-0488306cad87d948c89f00f2968eabc3dc5caec0.tar.gz |
Refactor parallel process into a base class that executes any task, and a derived class that executes a batch of the same task.
-rw-r--r-- | sphinx/builders/__init__.py | 10 | ||||
-rw-r--r-- | sphinx/builders/html.py | 4 | ||||
-rw-r--r-- | sphinx/environment.py | 10 | ||||
-rw-r--r-- | sphinx/util/parallel.py | 145 |
4 files changed, 103 insertions, 66 deletions
diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py index 2fe45db2..33120a2b 100644 --- a/sphinx/builders/__init__.py +++ b/sphinx/builders/__init__.py @@ -23,7 +23,7 @@ from docutils import nodes from sphinx.util import i18n, path_stabilize from sphinx.util.osutil import SEP, relative_uri, find_catalog from sphinx.util.console import bold, darkgreen -from sphinx.util.parallel import ParallelProcess, parallel_available +from sphinx.util.parallel import ParallelChunked, parallel_available # side effect: registers roles and directives from sphinx import roles @@ -361,18 +361,18 @@ class Builder(object): self.write_doc_serialized(firstname, doctree) self.write_doc(firstname, doctree) - proc = ParallelProcess(write_process, process_warnings, nproc) + proc = ParallelChunked(write_process, process_warnings, nproc) proc.set_arguments(docnames) - for chunk in self.app.status_iterator(proc.spawn(), 'writing output... ', - darkgreen, proc.nchunks): + for chunk in self.app.status_iterator( + proc.iter_chunks(), 'writing output... ', darkgreen, proc.nchunks): for i, docname in enumerate(chunk): doctree = self.env.get_and_resolve_doctree(docname, self) self.write_doc_serialized(docname, doctree) chunk[i] = (docname, doctree) # make sure all threads have finished - self.info(bold('waiting for workers... ')) + self.info(bold('waiting for workers...')) proc.join() def prepare_writing(self, docnames): diff --git a/sphinx/builders/html.py b/sphinx/builders/html.py index e364a4da..8f853310 100644 --- a/sphinx/builders/html.py +++ b/sphinx/builders/html.py @@ -29,7 +29,7 @@ from docutils.readers.doctree import Reader as DoctreeReader from sphinx import package_dir, __version__ from sphinx.util import jsonimpl, copy_static_entry from sphinx.util.osutil import SEP, os_path, relative_uri, ensuredir, \ - movefile, ustrftime, copyfile + movefile, ustrftime, copyfile from sphinx.util.nodes import inline_all_toctrees from sphinx.util.matching import patmatch, compile_matchers from sphinx.locale import _ @@ -40,7 +40,7 @@ from sphinx.application import ENV_PICKLE_FILENAME from sphinx.highlighting import PygmentsBridge from sphinx.util.console import bold, darkgreen, brown from sphinx.writers.html import HTMLWriter, HTMLTranslator, \ - SmartyPantsHTMLTranslator + SmartyPantsHTMLTranslator #: the filename for the inventory of objects INVENTORY_FILENAME = 'objects.inv' diff --git a/sphinx/environment.py b/sphinx/environment.py index 86718ed8..c5031528 100644 --- a/sphinx/environment.py +++ b/sphinx/environment.py @@ -42,7 +42,7 @@ from sphinx.util.nodes import clean_astext, make_refnode, WarningStream from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding from sphinx.util.console import bold, purple from sphinx.util.matching import compile_matchers -from sphinx.util.parallel import ParallelProcess, parallel_available +from sphinx.util.parallel import ParallelChunked, parallel_available from sphinx.util.websupport import is_commentable from sphinx.errors import SphinxError, ExtensionError from sphinx.locale import _ @@ -619,16 +619,16 @@ class BuildEnvironment: warnings.extend(otherenv.warnings) self.merge_info_from(docs, otherenv, app) - proc = ParallelProcess(read_process, merge, nproc) + proc = ParallelChunked(read_process, merge, nproc) proc.set_arguments(docnames) warnings = [] - for chunk in app.status_iterator(proc.spawn(), 'reading sources... ', - purple, proc.nchunks): + for chunk in app.status_iterator( + proc.iter_chunks(), 'reading sources... ', purple, proc.nchunks): pass # spawning in the iterator # make sure all threads have finished - app.info(bold('waiting for workers... ')) + app.info(bold('waiting for workers...')) proc.join() for warning in warnings: diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py index 44a69800..3fb40895 100644 --- a/sphinx/util/parallel.py +++ b/sphinx/util/parallel.py @@ -26,20 +26,102 @@ from sphinx.errors import SphinxParallelError parallel_available = multiprocessing and (os.name == 'posix') -class ParallelProcess(object): +class SerialTasks(object): + """Has the same interface as ParallelTasks, but executes tasks directly.""" - def __init__(self, process_func, result_func, nproc, maxbatch=10): - self.process_func = process_func - self.result_func = result_func + def __init__(self, nproc=1): + pass + + def add_task(self, task_func, arg=None, result_func=None): + if arg is not None: + res = task_func(arg) + else: + res = task_func() + if result_func: + result_func(res) + + def join(self): + pass + + +class ParallelTasks(object): + """Executes *nproc* tasks in parallel after forking.""" + + def __init__(self, nproc): self.nproc = nproc - self.maxbatch = maxbatch # list of threads to join when waiting for completion self._threads = [] - self._chunks = [] - self.nchunks = 0 + self._nthreads = 0 # queue of result objects to process self.result_queue = queue.Queue() self._nprocessed = 0 + # maps tasks to result functions + self._result_funcs = {} + # allow only "nproc" worker processes at once + self._semaphore = threading.Semaphore(self.nproc) + + def _process_thread(self, tid, func, arg): + def process(pipe, arg): + try: + if arg is None: + ret = func() + else: + ret = func(arg) + pipe.send((False, ret)) + except BaseException as err: + pipe.send((True, (err, traceback.format_exc()))) + + precv, psend = multiprocessing.Pipe(False) + proc = multiprocessing.Process(target=process, args=(psend, arg)) + proc.start() + result = precv.recv() + self.result_queue.put((tid, arg) + result) + proc.join() + self._semaphore.release() + + def add_task(self, task_func, arg=None, result_func=None): + tid = len(self._threads) + self._semaphore.acquire() + t = threading.Thread(target=self._process_thread, + args=(tid, task_func, arg)) + t.setDaemon(True) + t.start() + self._nthreads += 1 + self._threads.append(t) + self._result_funcs[tid] = result_func or (lambda *x: None) + # try processing results already in parallel + try: + tid, arg, exc, result = self.result_queue.get(False) + except queue.Empty: + pass + else: + if exc: + raise SphinxParallelError(*result) + self._result_funcs.pop(tid)(arg, result) + self._nprocessed += 1 + + def join(self): + while self._nprocessed < self._nthreads: + tid, arg, exc, result = self.result_queue.get() + if exc: + raise SphinxParallelError(*result) + self._result_funcs.pop(tid)(arg, result) + self._nprocessed += 1 + + for t in self._threads: + t.join() + + +class ParallelChunked(ParallelTasks): + """Executes chunks of a list of arguments in parallel.""" + + def __init__(self, process_func, result_func, nproc, maxbatch=10): + ParallelTasks.__init__(self, nproc) + self.process_func = process_func + self.result_func = result_func + self.maxbatch = maxbatch + self._chunks = [] + self.nchunks = 0 def set_arguments(self, arguments): # determine how many documents to read in one go @@ -54,53 +136,8 @@ class ParallelProcess(object): self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)] self.nchunks = len(self._chunks) - def spawn(self): + def iter_chunks(self): assert self._chunks - - def process(pipe, chunk): - try: - ret = self.process_func(chunk) - pipe.send((False, ret)) - except BaseException as err: - pipe.send((True, (err, traceback.format_exc()))) - - def process_thread(chunk): - precv, psend = multiprocessing.Pipe(False) - proc = multiprocessing.Process(target=process, args=(psend, chunk)) - proc.start() - result = precv.recv() - self.result_queue.put((chunk,) + result) - proc.join() - semaphore.release() - - # allow only "nproc" worker processes at once - semaphore = threading.Semaphore(self.nproc) - for chunk in self._chunks: yield chunk - semaphore.acquire() - t = threading.Thread(target=process_thread, args=(chunk,)) - t.setDaemon(True) - t.start() - self._threads.append(t) - # try processing results already in parallel - try: - chunk, exc, result = self.result_queue.get(False) - except queue.Empty: - pass - else: - if exc: - raise SphinxParallelError(*result) - self.result_func(chunk, result) - self._nprocessed += 1 - - def join(self): - while self._nprocessed < self.nchunks: - chunk, exc, result = self.result_queue.get() - if exc: - raise SphinxParallelError(*result) - self.result_func(chunk, result) - self._nprocessed += 1 - - for t in self._threads: - t.join() + self.add_task(self.process_func, chunk, self.result_func) |