author     Georg Brandl <georg@python.org>  2015-12-18 21:02:46 +0100
committer  Georg Brandl <georg@python.org>  2015-12-18 21:37:20 +0100
commit     580405c714fc338f45e6694120a52bfdb84f8ca9 (patch)
tree       071afcab2bb2d630478809754ada5fdabf45e768 /sphinx/util/parallel.py
parent     9cdaf3406f4201b6d2c1116dcb19b12bfc481393 (diff)
download   sphinx-git-580405c714fc338f45e6694120a52bfdb84f8ca9.tar.gz
More parallel code optimizations.
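
This commit replaces the shared _task_func/_result_func pair (which was
overwritten on every add_task() call) with a per-task _result_funcs dict,
and funnels all subprocess scheduling through a new _join_one() helper
that both add_task() and join() invoke. A minimal usage sketch of the
resulting API, assuming a fork-capable platform (the render/report
helpers are hypothetical, not part of this commit):

    from sphinx.util.parallel import ParallelTasks

    def render(docname):
        # stand-in for real per-document work done in a subprocess
        return docname.upper()

    def report(arg, result):
        # per-task result callback; called in the parent as (arg, result)
        print('%s -> %s' % (arg, result))

    if __name__ == '__main__':
        tasks = ParallelTasks(4)
        for name in ['index', 'intro', 'api']:
            tasks.add_task(render, name, result_func=report)
        tasks.join()  # drains both running and waiting subprocesses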
Diffstat (limited to 'sphinx/util/parallel.py')
-rw-r--r--  sphinx/util/parallel.py | 67
1 file changed, 31 insertions(+), 36 deletions(-)
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index a6985c86b..618fbdc38 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -10,14 +10,16 @@
 """
 
 import os
+import time
 import traceback
+from math import sqrt
 
 try:
     import multiprocessing
 except ImportError:
     multiprocessing = None
 
-from math import sqrt
+from six import iteritems
 
 from sphinx.errors import SphinxParallelError
@@ -48,10 +50,8 @@ class ParallelTasks(object):
     def __init__(self, nproc):
         self.nproc = nproc
-        # main task performed by each task, returning result
-        self._task_func = 0
         # (optional) function performed by each task on the result of main task
-        self._result_func = 0
+        self._result_funcs = {}
         # task arguments
         self._args = {}
         # list of subprocesses (both started and waiting)
@@ -75,55 +75,50 @@ class ParallelTasks(object):
         except BaseException as err:
             pipe.send((True, (err, traceback.format_exc())))
 
-    def _result_func_wrapper(self, arg, result):
-        result_func = self._result_func(arg, result)
-        if result_func:
-            result_func(result)
-
     def add_task(self, task_func, arg=None, result_func=None):
-        self._task_func = task_func  # dummy code after first call
-        self._result_func = result_func or (lambda *x: None)  # dummy code after first call
         tid = self._taskid
         self._taskid += 1
+        self._result_funcs[tid] = result_func or (lambda arg, result: None)
         self._args[tid] = arg
         precv, psend = multiprocessing.Pipe(False)
         proc = multiprocessing.Process(target=self._process,
-                                       args=(psend, self._task_func, arg))
+                                       args=(psend, task_func, arg))
         self._procs[tid] = proc
-        if self._pworking < self.nproc:
-            self._precvs[tid] = precv
-            self._pworking += 1
-            proc.start()
-        else:
-            self._precvsWaiting[tid] = precv
+        self._precvsWaiting[tid] = precv
+        self._join_one()
 
     def join(self):
         while self._pworking:
-            for tid, pipe in self._precvs.items():
-                if pipe.poll():
-                    exc, result = pipe.recv()
-                    if exc:
-                        raise SphinxParallelError(*result)
-                    self._result_func_wrapper(self._args[tid], result)
-                    self._procs[tid].join()
-                    if len(self._precvsWaiting) > 0:
-                        newtid, newprecv = self._precvsWaiting.popitem()
-                        self._precvs[newtid] = newprecv
-                        self._procs[newtid].start()
-                        break
-                    else:
-                        self._pworking -= 1
+            self._join_one()
+
+    def _join_one(self):
+        for tid, pipe in iteritems(self._precvs):
+            if pipe.poll():
+                exc, result = pipe.recv()
+                if exc:
+                    raise SphinxParallelError(*result)
+                self._result_funcs.pop(tid)(self._args.pop(tid), result)
+                self._procs[tid].join()
+                self._pworking -= 1
+                break
+        else:
+            time.sleep(0.02)
+        while self._precvsWaiting and self._pworking < self.nproc:
+            newtid, newprecv = self._precvsWaiting.popitem()
+            self._precvs[newtid] = newprecv
+            self._procs[newtid].start()
+            self._pworking += 1
 
 def make_chunks(arguments, nproc, maxbatch=10):
     # determine how many documents to read in one go
     nargs = len(arguments)
-    chunksize = min(nargs // nproc, maxbatch)
-    if chunksize == 0:
-        chunksize = 1
-    if chunksize == maxbatch:
+    chunksize = nargs // nproc
+    if chunksize >= maxbatch:
         # try to improve batch size vs. number of batches
         chunksize = int(sqrt(nargs/nproc * maxbatch))
+    if chunksize == 0:
+        chunksize = 1
     nchunks, rest = divmod(nargs, chunksize)
     if rest:
         nchunks += 1
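
The reworked heuristic always starts from nargs // nproc and applies the
square-root smoothing only once that quotient reaches maxbatch; the zero
guard now runs last, catching the case where there are fewer arguments
than processes. A standalone sketch of the same arithmetic (illustrative
only, mirroring the logic above; chunk_size is not a Sphinx function):

    from math import sqrt

    def chunk_size(nargs, nproc, maxbatch=10):
        chunksize = nargs // nproc
        if chunksize >= maxbatch:
            # balance batch size against number of batches
            chunksize = int(sqrt(nargs / nproc * maxbatch))
        return chunksize or 1

    # 100 documents on 4 processes: 100 // 4 = 25 >= 10, so the batch
    # size becomes int(sqrt(25.0 * 10)) = 15, giving 7 chunks of <= 15
    print(chunk_size(100, 4))  # 15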