diff options
| field     | value                                                        |           |
|-----------|--------------------------------------------------------------|-----------|
| author    | Valentin David <valentin.david@codethink.co.uk>              | 2020-03-10 14:34:57 +0000 |
| committer | Valentin David <valentin.david@codethink.co.uk>              | 2020-03-10 14:34:57 +0000 |
| commit    | 9f7b8671b701381fea8ee61f789a5d6aa1759c78 (patch)             |           |
| tree      | 7c83653319787ad96f7cc1e2729190998933ddf3                     |           |
| parent    | c12c7f596f15842028a46fff0ad062b3b4e2988f (diff)              |           |
| parent    | 884ab574593a04a4790e45aaf92f1c4fa6af4bb9 (diff)              |           |
| download  | buildstream-9f7b8671b701381fea8ee61f789a5d6aa1759c78.tar.gz  |           |
Merge branch 'valentindavid/bst-1/python3.8-with-backports' into 'bst-1'
[BuildStream 1.4] Backport changes to enable Python 3.8
See merge request BuildStream/buildstream!1830
| mode       | file                                        | changes |
|------------|---------------------------------------------|---------|
| -rw-r--r-- | buildstream/_scheduler/_multiprocessing.py  | 79      |
| -rw-r--r-- | buildstream/_scheduler/jobs/job.py          | 63      |
| -rw-r--r-- | buildstream/_scheduler/scheduler.py         | 24      |
| -rw-r--r-- | requirements/cov-requirements.in            | 3       |
| -rw-r--r-- | requirements/cov-requirements.txt           | 5       |
| -rw-r--r-- | requirements/dev-requirements.txt           | 2       |
| -rw-r--r-- | tox.ini                                     | 20      |

7 files changed, 124 insertions, 72 deletions
diff --git a/buildstream/_scheduler/_multiprocessing.py b/buildstream/_scheduler/_multiprocessing.py new file mode 100644 index 000000000..4864e140c --- /dev/null +++ b/buildstream/_scheduler/_multiprocessing.py @@ -0,0 +1,79 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# + +# TLDR: +# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process` +# +# +# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running. +# +# The main problem that affects us is that the parent and the child will share some file handlers. +# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received +# by the app so that the asyncio loop can treat them afterwards. +# +# This sharing means that when we send a signal to the child, the sighandler in the child will write +# it back to the parent sig_handler_fd, making the parent have to treat it too. +# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children, +# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to +# the children... 
+# +# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically +# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers. +# +# +# Relevant issues: +# - Asyncio: support fork (https://bugs.python.org/issue21998) +# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087) +# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489) +# +# + +import multiprocessing +import signal +import sys +from asyncio import set_event_loop_policy + + +# _AsyncioSafeForkAwareProcess() +# +# Process class that doesn't call waitpid on its own. +# This prevents conflicts with the asyncio child watcher. +# +# Also automatically close any running asyncio loop before calling +# the actual run target +# +class _AsyncioSafeForkAwareProcess(multiprocessing.Process): + # pylint: disable=attribute-defined-outside-init + def start(self): + self._popen = self._Popen(self) + self._sentinel = self._popen.sentinel + + def run(self): + signal.set_wakeup_fd(-1) + set_event_loop_policy(None) + + super().run() + + +if sys.platform != "win32": + # Set the default event loop policy to automatically close our asyncio loop in child processes + AsyncioSafeProcess = _AsyncioSafeForkAwareProcess + +else: + # Windows doesn't support ChildWatcher that way anyways, we'll need another + # implementation if we want it + AsyncioSafeProcess = multiprocessing.Process diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index b8b4a2c76..adb520088 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -1,5 +1,6 @@ # # Copyright (C) 2018 Codethink Limited +# Copyright (C) 2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -32,6 +33,7 @@ import multiprocessing from ..._exceptions import 
ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils +from .. import _multiprocessing # Return code values shutdown of job handling child processes # @@ -64,15 +66,6 @@ class _Envelope(): self.message = message -# Process class that doesn't call waitpid on its own. -# This prevents conflicts with the asyncio child watcher. -class Process(multiprocessing.Process): - # pylint: disable=attribute-defined-outside-init - def start(self): - self._popen = self._Popen(self) - self._sentinel = self._popen.sentinel - - # Job() # # The Job object represents a parallel task, when calling Job.spawn(), @@ -127,39 +120,23 @@ class Job(): self._parent_start_listening() # Spawn the process - self._process = Process(target=self._child_action, args=[self._queue]) + self._process = _multiprocessing.AsyncioSafeProcess(target=self._child_action, args=[self._queue]) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main # process will be notified of any signal after we launch the child. # with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - self._process.start() + with asyncio.get_child_watcher() as watcher: + self._process.start() + # Register the process to call `_parent_child_completed` once it is done - # Wait for the child task to complete. - # - # This is a tricky part of python which doesnt seem to - # make it to the online docs: - # - # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance - # which is the default type of watcher, and the instance belongs to the - # "event loop policy" in use (so there is only one in the main process). - # - # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio - # loop, and will selectively reap any child pids which have been - # terminated. 
- # - # o At registration time, the process will immediately be checked with - # `os.waitpid()` and will be reaped immediately, before add_child_handler() - # returns. - # - # The self._parent_child_completed callback passed here will normally - # be called after the child task has been reaped with `os.waitpid()`, in - # an event loop callback. Otherwise, if the job completes too fast, then - # the callback is called immediately. - # - self._watcher = asyncio.get_child_watcher() - self._watcher.add_child_handler(self._process.pid, self._parent_child_completed) + # Here we delay the call to the next loop tick. This is in order to be running + # in the main thread, as the callback itself must be thread safe. + def on_completion(pid, returncode): + asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode) + + watcher.add_child_handler(self._process.pid, on_completion) # terminate() # @@ -182,21 +159,15 @@ class Job(): self._terminated = True - # terminate_wait() + # get_terminated() # - # Wait for terminated jobs to complete - # - # Args: - # timeout (float): Seconds to wait + # Check if a job has been terminated. # # Returns: - # (bool): True if the process terminated cleanly, otherwise False + # (bool): True in the main process if Job.terminate() was called. # - def terminate_wait(self, timeout): - - # Join the child process after sending SIGTERM - self._process.join(timeout) - return self._process.exitcode is not None + def get_terminated(self): + return self._terminated # kill() # diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 68c115c1b..131cbb1d5 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -137,6 +137,12 @@ class Scheduler(): # Hold on to the queues to process self.queues = queues + # NOTE: Enforce use of `SafeChildWatcher` as we generally don't want + # background threads. 
+ # In Python 3.8+, `ThreadedChildWatcher` is the default watcher, and + # not `SafeChildWatcher`. + asyncio.set_child_watcher(asyncio.SafeChildWatcher()) + # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. self.loop = asyncio.new_event_loop() @@ -516,21 +522,15 @@ class Scheduler(): self.loop.remove_signal_handler(signal.SIGTERM) def _terminate_jobs_real(self): - # 20 seconds is a long time, it can take a while and sometimes - # we still fail, need to look deeper into this again. - wait_start = datetime.datetime.now() - wait_limit = 20.0 + def kill_jobs(): + for job_ in self._active_jobs: + job_.kill() - # First tell all jobs to terminate - for job in self._active_jobs: - job.terminate() + # Schedule all jobs to be killed if they have not exited in 20 sec + self.loop.call_later(20, kill_jobs) - # Now wait for them to really terminate for job in self._active_jobs: - elapsed = datetime.datetime.now() - wait_start - timeout = max(wait_limit - elapsed.total_seconds(), 0.0) - if not job.terminate_wait(timeout): - job.kill() + job.terminate() # Regular timeout for driving status in the UI def _tick(self): diff --git a/requirements/cov-requirements.in b/requirements/cov-requirements.in index 455b91ba6..1911f3506 100644 --- a/requirements/cov-requirements.in +++ b/requirements/cov-requirements.in @@ -1,2 +1,3 @@ -coverage == 4.4.0 +coverage == 4.4.0 ; python_version < '3.8' +coverage == 4.5.4 ; python_version >= '3.8' pytest-cov >= 2.5.0 diff --git a/requirements/cov-requirements.txt b/requirements/cov-requirements.txt index 46c70432d..51bc2bd28 100644 --- a/requirements/cov-requirements.txt +++ b/requirements/cov-requirements.txt @@ -1,4 +1,5 @@ -coverage==4.4 +coverage==4.4.0; python_version < '3.8' +coverage==4.5.4; python_version >= '3.8' pytest-cov==2.7.1 ## The following requirements were added by pip freeze: atomicwrites==1.3.0 @@ -6,7 +7,7 @@ attrs==19.1.0 importlib-metadata==0.20 more-itertools==7.2.0 
packaging==19.1 -pluggy==0.12.0 +pluggy==0.13.1 py==1.8.0 pyparsing==2.4.2 pytest==5.1.2 diff --git a/requirements/dev-requirements.txt b/requirements/dev-requirements.txt index e6f327284..dfc991e9b 100644 --- a/requirements/dev-requirements.txt +++ b/requirements/dev-requirements.txt @@ -19,7 +19,7 @@ lazy-object-proxy==1.4.2 mccabe==0.6.1 more-itertools==7.2.0 packaging==19.1 -pluggy==0.12.0 +pluggy==0.13.1 py==1.8.0 pyparsing==2.4.2 pytest-cache==1.0 @@ -2,7 +2,7 @@ # Tox global configuration # [tox] -envlist = py35,py36,py37 +envlist = py35,py36,py37,py38 skip_missing_interpreters = true # @@ -13,16 +13,16 @@ skip_missing_interpreters = true [testenv] commands = # Running with coverage reporting enabled - py{35,36,37}-!nocover: pytest --basetemp {envtmpdir} --cov=buildstream --cov-config .coveragerc {posargs} - py{35,36,37}-!nocover: mkdir -p .coverage-reports - py{35,36,37}-!nocover: mv {envtmpdir}/.coverage {toxinidir}/.coverage-reports/.coverage.{env:COVERAGE_PREFIX:}{envname} + py{35,36,37,38}-!nocover: pytest --basetemp {envtmpdir} --cov=buildstream --cov-config .coveragerc {posargs} + py{35,36,37,38}-!nocover: mkdir -p .coverage-reports + py{35,36,37,38}-!nocover: mv {envtmpdir}/.coverage {toxinidir}/.coverage-reports/.coverage.{env:COVERAGE_PREFIX:}{envname} # Running with coverage reporting disabled - py{35,36,37}-nocover: pytest --basetemp {envtmpdir} {posargs} + py{35,36,37,38}-nocover: pytest --basetemp {envtmpdir} {posargs} deps = - py{35,36,37}: -rrequirements/requirements.txt - py{35,36,37}: -rrequirements/dev-requirements.txt - py{35,36,37}: -rrequirements/plugin-requirements.txt + py{35,36,37,38}: -rrequirements/requirements.txt + py{35,36,37,38}: -rrequirements/dev-requirements.txt + py{35,36,37,38}: -rrequirements/plugin-requirements.txt # Only require coverage and pytest-cov when using it !nocover: -rrequirements/cov-requirements.txt @@ -35,9 +35,9 @@ passenv = # These keys are not inherited by any other sections # setenv = - py{35,36,37}: 
COVERAGE_FILE = {envtmpdir}/.coverage + py{35,36,37,38}: COVERAGE_FILE = {envtmpdir}/.coverage whitelist_externals = - py{35,36,37}: + py{35,36,37,38}: mv mkdir |