summaryrefslogtreecommitdiff
path: root/sphinx/util/parallel.py
diff options
context:
space:
mode:
authorTakeshi KOMIYA <i.tkomiya@gmail.com>2016-12-22 21:57:29 +0900
committerTakeshi KOMIYA <i.tkomiya@gmail.com>2017-01-02 12:59:50 +0900
commitd8ad3d063c25278b22c20a46cb2b0b465b4047bf (patch)
tree30bf3790884600edad552dcc94558f974e6d6f48 /sphinx/util/parallel.py
parentb43523fcbecda58ffd1ed31d29d0c9363a42be86 (diff)
downloadsphinx-git-d8ad3d063c25278b22c20a46cb2b0b465b4047bf.tar.gz
sphinx.util.parallel supports logging in child workers
Diffstat (limited to 'sphinx/util/parallel.py')
-rw-r--r--sphinx/util/parallel.py27
1 files changed, 19 insertions, 8 deletions
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index 814af09b1..a92dd9639 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -21,11 +21,15 @@ except ImportError:
multiprocessing = None
from sphinx.errors import SphinxParallelError
+from sphinx.util import logging
if False:
# For type annotation
from typing import Any, Callable, Sequence # NOQA
+logger = logging.getLogger(__name__)
+
+
# our parallel functionality only works for the forking Process
parallel_available = multiprocessing and (os.name == 'posix')
@@ -75,19 +79,24 @@ class ParallelTasks(object):
def _process(self, pipe, func, arg):
# type: (Any, Callable, Any) -> None
try:
- if arg is None:
- ret = func()
- else:
- ret = func(arg)
- pipe.send((False, ret))
+ collector = logging.LogCollector()
+ with collector.collect():
+ if arg is None:
+ ret = func()
+ else:
+ ret = func(arg)
+ failed = False
except BaseException as err:
- pipe.send((True, (err, traceback.format_exc())))
+ failed = True
+ ret = (err, traceback.format_exc())
+ logging.convert_serializable(collector.logs)
+ pipe.send((failed, collector.logs, ret))
def add_task(self, task_func, arg=None, result_func=None):
# type: (Callable, Any, Callable) -> None
tid = self._taskid
self._taskid += 1
- self._result_funcs[tid] = result_func or (lambda arg: None)
+ self._result_funcs[tid] = result_func or (lambda arg, result: None)
self._args[tid] = arg
precv, psend = multiprocessing.Pipe(False)
proc = multiprocessing.Process(target=self._process,
@@ -105,9 +114,11 @@ class ParallelTasks(object):
# type: () -> None
for tid, pipe in iteritems(self._precvs):
if pipe.poll():
- exc, result = pipe.recv()
+ exc, logs, result = pipe.recv()
if exc:
raise SphinxParallelError(*result)
+ for log in logs:
+ logger.handle(log)
self._result_funcs.pop(tid)(self._args.pop(tid), result)
self._procs[tid].join()
self._pworking -= 1