diff options
author | Georg Brandl <georg@python.org> | 2015-12-18 21:02:46 +0100 |
---|---|---|
committer | Georg Brandl <georg@python.org> | 2015-12-18 21:37:20 +0100 |
commit | 580405c714fc338f45e6694120a52bfdb84f8ca9 (patch) | |
tree | 071afcab2bb2d630478809754ada5fdabf45e768 /sphinx/util/parallel.py | |
parent | 9cdaf3406f4201b6d2c1116dcb19b12bfc481393 (diff) | |
download | sphinx-git-580405c714fc338f45e6694120a52bfdb84f8ca9.tar.gz |
More parallel code optimizations.
Diffstat (limited to 'sphinx/util/parallel.py')
-rw-r--r-- | sphinx/util/parallel.py | 67 |
1 file changed, 31 insertions, 36 deletions
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py index a6985c86b..618fbdc38 100644 --- a/sphinx/util/parallel.py +++ b/sphinx/util/parallel.py @@ -10,14 +10,16 @@ """ import os +import time import traceback +from math import sqrt try: import multiprocessing except ImportError: multiprocessing = None -from math import sqrt +from six import iteritems from sphinx.errors import SphinxParallelError @@ -48,10 +50,8 @@ class ParallelTasks(object): def __init__(self, nproc): self.nproc = nproc - # main task performed by each task, returning result - self._task_func = 0 # (optional) function performed by each task on the result of main task - self._result_func = 0 + self._result_funcs = {} # task arguments self._args = {} # list of subprocesses (both started and waiting) @@ -75,55 +75,50 @@ class ParallelTasks(object): except BaseException as err: pipe.send((True, (err, traceback.format_exc()))) - def _result_func_wrapper(self, arg, result): - result_func = self._result_func(arg, result) - if result_func: - result_func(result) - def add_task(self, task_func, arg=None, result_func=None): - self._task_func = task_func # dummy code after first call - self._result_func = result_func or (lambda *x: None) # dummy code after first call tid = self._taskid self._taskid += 1 + self._result_funcs[tid] = result_func or (lambda arg: None) self._args[tid] = arg precv, psend = multiprocessing.Pipe(False) proc = multiprocessing.Process(target=self._process, - args=(psend, self._task_func, arg)) + args=(psend, task_func, arg)) self._procs[tid] = proc - if self._pworking < self.nproc: - self._precvs[tid] = precv - self._pworking += 1 - proc.start() - else: - self._precvsWaiting[tid] = precv + self._precvsWaiting[tid] = precv + self._join_one() def join(self): while self._pworking: - for tid, pipe in self._precvs.items(): - if pipe.poll(): - exc, result = pipe.recv() - if exc: - raise SphinxParallelError(*result) - self._result_func_wrapper(self._args[tid], result) - 
self._procs[tid].join() - if len(self._precvsWaiting) > 0: - newtid, newprecv = self._precvsWaiting.popitem() - self._precvs[newtid] = newprecv - self._procs[newtid].start() - break - else: - self._pworking -= 1 + self._join_one() + + def _join_one(self): + for tid, pipe in iteritems(self._precvs): + if pipe.poll(): + exc, result = pipe.recv() + if exc: + raise SphinxParallelError(*result) + self._result_funcs.pop(tid)(self._args.pop(tid), result) + self._procs[tid].join() + self._pworking -= 1 + break + else: + time.sleep(0.02) + while self._precvsWaiting and self._pworking < self.nproc: + newtid, newprecv = self._precvsWaiting.popitem() + self._precvs[newtid] = newprecv + self._procs[newtid].start() + self._pworking += 1 def make_chunks(arguments, nproc, maxbatch=10): # determine how many documents to read in one go nargs = len(arguments) - chunksize = min(nargs // nproc, maxbatch) - if chunksize == 0: - chunksize = 1 - if chunksize == maxbatch: + chunksize = nargs // nproc + if chunksize >= maxbatch: # try to improve batch size vs. number of batches chunksize = int(sqrt(nargs/nproc * maxbatch)) + if chunksize == 0: + chunksize = 1 nchunks, rest = divmod(nargs, chunksize) if rest: nchunks += 1 |