author     Georg Brandl <georg@python.org>  2014-09-22 17:29:52 +0200
committer  Georg Brandl <georg@python.org>  2014-09-22 17:29:52 +0200
commit     0488306cad87d948c89f00f2968eabc3dc5caec0 (patch)
tree       c3fabb7129b30d77bbae2304bddd1c03cce01d09
parent     1d8e60fd86bc5297ad419fac45c5ea2a6b23513f (diff)
download   sphinx-0488306cad87d948c89f00f2968eabc3dc5caec0.tar.gz
Refactor ParallelProcess into a base class (ParallelTasks) that executes any single task, and a derived class (ParallelChunked) that executes a batch of the same task.
-rw-r--r--  sphinx/builders/__init__.py   10
-rw-r--r--  sphinx/builders/html.py        4
-rw-r--r--  sphinx/environment.py         10
-rw-r--r--  sphinx/util/parallel.py      145
4 files changed, 103 insertions, 66 deletions
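
The refactor splits the old ParallelProcess in two: ParallelTasks forks one worker per add_task() call (any callable, any single argument), and ParallelChunked layers the old chunk-a-list behaviour on top of it. A minimal sketch of the new base-class API as it appears in the diff below, on a POSIX system where parallel_available is true; square and store are illustrative callbacks, not part of the commit:

    from sphinx.util.parallel import ParallelTasks, parallel_available

    def square(n):             # task function: runs in a forked child process
        return n * n

    results = {}

    def store(arg, result):    # result function: runs in the parent
        results[arg] = result

    if parallel_available:
        tasks = ParallelTasks(2)           # at most two workers at once
        for n in (1, 2, 3):
            tasks.add_task(square, n, store)
        tasks.join()                       # raises SphinxParallelError on failure
        # results == {1: 1, 2: 4, 3: 9}
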
diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index 2fe45db2..33120a2b 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -23,7 +23,7 @@ from docutils import nodes
from sphinx.util import i18n, path_stabilize
from sphinx.util.osutil import SEP, relative_uri, find_catalog
from sphinx.util.console import bold, darkgreen
-from sphinx.util.parallel import ParallelProcess, parallel_available
+from sphinx.util.parallel import ParallelChunked, parallel_available
# side effect: registers roles and directives
from sphinx import roles
@@ -361,18 +361,18 @@ class Builder(object):
self.write_doc_serialized(firstname, doctree)
self.write_doc(firstname, doctree)
- proc = ParallelProcess(write_process, process_warnings, nproc)
+ proc = ParallelChunked(write_process, process_warnings, nproc)
proc.set_arguments(docnames)
- for chunk in self.app.status_iterator(proc.spawn(), 'writing output... ',
- darkgreen, proc.nchunks):
+ for chunk in self.app.status_iterator(
+ proc.iter_chunks(), 'writing output... ', darkgreen, proc.nchunks):
for i, docname in enumerate(chunk):
doctree = self.env.get_and_resolve_doctree(docname, self)
self.write_doc_serialized(docname, doctree)
chunk[i] = (docname, doctree)
# make sure all threads have finished
- self.info(bold('waiting for workers... '))
+ self.info(bold('waiting for workers...'))
proc.join()
def prepare_writing(self, docnames):
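
The write loop above relies on generator semantics: each chunk is yielded so the caller can rewrite it in place (docnames become (docname, doctree) pairs), and only when the loop advances does iter_chunks hand the now-mutated chunk to add_task. A self-contained model of that hand-off, with invented names:

    def iter_chunks(chunks, start_task):
        # simplified ParallelChunked.iter_chunks: the caller may rewrite
        # the chunk between the yield and the resumption of this loop
        for chunk in chunks:
            yield chunk
            start_task(chunk)   # sees whatever the caller wrote into it

    started = []
    for chunk in iter_chunks([['a', 'b'], ['c']], started.append):
        for i, name in enumerate(chunk):
            chunk[i] = (name, '<doctree for %s>' % name)
    # started == [[('a', '<doctree for a>'), ('b', '<doctree for b>')],
    #             [('c', '<doctree for c>')]]
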
diff --git a/sphinx/builders/html.py b/sphinx/builders/html.py
index e364a4da..8f853310 100644
--- a/sphinx/builders/html.py
+++ b/sphinx/builders/html.py
@@ -29,7 +29,7 @@ from docutils.readers.doctree import Reader as DoctreeReader
from sphinx import package_dir, __version__
from sphinx.util import jsonimpl, copy_static_entry
from sphinx.util.osutil import SEP, os_path, relative_uri, ensuredir, \
- movefile, ustrftime, copyfile
+ movefile, ustrftime, copyfile
from sphinx.util.nodes import inline_all_toctrees
from sphinx.util.matching import patmatch, compile_matchers
from sphinx.locale import _
@@ -40,7 +40,7 @@ from sphinx.application import ENV_PICKLE_FILENAME
from sphinx.highlighting import PygmentsBridge
from sphinx.util.console import bold, darkgreen, brown
from sphinx.writers.html import HTMLWriter, HTMLTranslator, \
- SmartyPantsHTMLTranslator
+ SmartyPantsHTMLTranslator
#: the filename for the inventory of objects
INVENTORY_FILENAME = 'objects.inv'
diff --git a/sphinx/environment.py b/sphinx/environment.py
index 86718ed8..c5031528 100644
--- a/sphinx/environment.py
+++ b/sphinx/environment.py
@@ -42,7 +42,7 @@ from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
from sphinx.util.console import bold, purple
from sphinx.util.matching import compile_matchers
-from sphinx.util.parallel import ParallelProcess, parallel_available
+from sphinx.util.parallel import ParallelChunked, parallel_available
from sphinx.util.websupport import is_commentable
from sphinx.errors import SphinxError, ExtensionError
from sphinx.locale import _
@@ -619,16 +619,16 @@ class BuildEnvironment:
warnings.extend(otherenv.warnings)
self.merge_info_from(docs, otherenv, app)
- proc = ParallelProcess(read_process, merge, nproc)
+ proc = ParallelChunked(read_process, merge, nproc)
proc.set_arguments(docnames)
warnings = []
- for chunk in app.status_iterator(proc.spawn(), 'reading sources... ',
- purple, proc.nchunks):
+ for chunk in app.status_iterator(
+ proc.iter_chunks(), 'reading sources... ', purple, proc.nchunks):
pass # spawning in the iterator
# make sure all threads have finished
- app.info(bold('waiting for workers... '))
+ app.info(bold('waiting for workers...'))
proc.join()
for warning in warnings:
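
Because the workers are forked processes, read_process cannot append to the parent's warnings list directly; anything the parent needs must travel back through the result pipe, and merge (the result function) then runs in the parent. A sketch of the read-side pattern under that constraint; the docnames and warning strings are made up:

    from sphinx.util.parallel import ParallelChunked, parallel_available

    def read_process(chunk):
        # forked child: mutations of parent state are invisible here,
        # so warnings are returned rather than appended
        return ['dummy warning for %s' % docname for docname in chunk]

    warnings = []

    def merge(chunk, result):
        warnings.extend(result)   # runs in the parent process

    if parallel_available:
        proc = ParallelChunked(read_process, merge, 2)
        proc.set_arguments(['index', 'intro', 'api', 'changes'])
        for chunk in proc.iter_chunks():
            pass                  # the iteration itself spawns the workers
        proc.join()
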
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index 44a69800..3fb40895 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -26,20 +26,102 @@ from sphinx.errors import SphinxParallelError
parallel_available = multiprocessing and (os.name == 'posix')
-class ParallelProcess(object):
+class SerialTasks(object):
+ """Has the same interface as ParallelTasks, but executes tasks directly."""
- def __init__(self, process_func, result_func, nproc, maxbatch=10):
- self.process_func = process_func
- self.result_func = result_func
+ def __init__(self, nproc=1):
+ pass
+
+ def add_task(self, task_func, arg=None, result_func=None):
+ if arg is not None:
+ res = task_func(arg)
+ else:
+ res = task_func()
+ if result_func:
+ result_func(res)
+
+ def join(self):
+ pass
+
+
+class ParallelTasks(object):
+ """Executes *nproc* tasks in parallel after forking."""
+
+ def __init__(self, nproc):
self.nproc = nproc
- self.maxbatch = maxbatch
# list of threads to join when waiting for completion
self._threads = []
- self._chunks = []
- self.nchunks = 0
+ self._nthreads = 0
# queue of result objects to process
self.result_queue = queue.Queue()
self._nprocessed = 0
+ # maps tasks to result functions
+ self._result_funcs = {}
+ # allow only "nproc" worker processes at once
+ self._semaphore = threading.Semaphore(self.nproc)
+
+ def _process_thread(self, tid, func, arg):
+ def process(pipe, arg):
+ try:
+ if arg is None:
+ ret = func()
+ else:
+ ret = func(arg)
+ pipe.send((False, ret))
+ except BaseException as err:
+ pipe.send((True, (err, traceback.format_exc())))
+
+ precv, psend = multiprocessing.Pipe(False)
+ proc = multiprocessing.Process(target=process, args=(psend, arg))
+ proc.start()
+ result = precv.recv()
+ self.result_queue.put((tid, arg) + result)
+ proc.join()
+ self._semaphore.release()
+
+ def add_task(self, task_func, arg=None, result_func=None):
+ tid = len(self._threads)
+ self._semaphore.acquire()
+ t = threading.Thread(target=self._process_thread,
+ args=(tid, task_func, arg))
+ t.setDaemon(True)
+ t.start()
+ self._nthreads += 1
+ self._threads.append(t)
+ self._result_funcs[tid] = result_func or (lambda *x: None)
+ # try processing results already in parallel
+ try:
+ tid, arg, exc, result = self.result_queue.get(False)
+ except queue.Empty:
+ pass
+ else:
+ if exc:
+ raise SphinxParallelError(*result)
+ self._result_funcs.pop(tid)(arg, result)
+ self._nprocessed += 1
+
+ def join(self):
+ while self._nprocessed < self._nthreads:
+ tid, arg, exc, result = self.result_queue.get()
+ if exc:
+ raise SphinxParallelError(*result)
+ self._result_funcs.pop(tid)(arg, result)
+ self._nprocessed += 1
+
+ for t in self._threads:
+ t.join()
+
+
+class ParallelChunked(ParallelTasks):
+ """Executes chunks of a list of arguments in parallel."""
+
+ def __init__(self, process_func, result_func, nproc, maxbatch=10):
+ ParallelTasks.__init__(self, nproc)
+ self.process_func = process_func
+ self.result_func = result_func
+ self.maxbatch = maxbatch
+ self._chunks = []
+ self.nchunks = 0
def set_arguments(self, arguments):
# determine how many documents to read in one go
@@ -54,53 +136,8 @@ class ParallelProcess(object):
self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
self.nchunks = len(self._chunks)
- def spawn(self):
+ def iter_chunks(self):
assert self._chunks
-
- def process(pipe, chunk):
- try:
- ret = self.process_func(chunk)
- pipe.send((False, ret))
- except BaseException as err:
- pipe.send((True, (err, traceback.format_exc())))
-
- def process_thread(chunk):
- precv, psend = multiprocessing.Pipe(False)
- proc = multiprocessing.Process(target=process, args=(psend, chunk))
- proc.start()
- result = precv.recv()
- self.result_queue.put((chunk,) + result)
- proc.join()
- semaphore.release()
-
- # allow only "nproc" worker processes at once
- semaphore = threading.Semaphore(self.nproc)
-
for chunk in self._chunks:
yield chunk
- semaphore.acquire()
- t = threading.Thread(target=process_thread, args=(chunk,))
- t.setDaemon(True)
- t.start()
- self._threads.append(t)
- # try processing results already in parallel
- try:
- chunk, exc, result = self.result_queue.get(False)
- except queue.Empty:
- pass
- else:
- if exc:
- raise SphinxParallelError(*result)
- self.result_func(chunk, result)
- self._nprocessed += 1
-
- def join(self):
- while self._nprocessed < self.nchunks:
- chunk, exc, result = self.result_queue.get()
- if exc:
- raise SphinxParallelError(*result)
- self.result_func(chunk, result)
- self._nprocessed += 1
-
- for t in self._threads:
- t.join()
+ self.add_task(self.process_func, chunk, self.result_func)
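
SerialTasks gives non-parallel builds the same add_task()/join() surface with no forking, though note one asymmetry visible above: its result function is called with the result only, while ParallelTasks passes (arg, result). A tiny usage sketch under that caveat:

    from sphinx.util.parallel import SerialTasks

    lines = []
    tasks = SerialTasks()
    tasks.add_task(str.upper, 'hello', lines.append)  # result only, no arg
    tasks.join()                                      # no-op for serial tasks
    # lines == ['HELLO']
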