summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: bst-marge-bot <marge-bot@buildstream.build> 2019-11-11 18:36:08 +0000
committer: bst-marge-bot <marge-bot@buildstream.build> 2019-11-11 18:36:08 +0000
commit: 74fa2537788d0933c7d0b229504a1bd30dd7a451 (patch)
tree: e599e792e5d435130574c945fb1f8d47d23921d9
parent: 4001b46e1e198b7c84bd52448eff2a59a7cf94cd (diff)
parent: 3d1fb3bd3fe68826ee59bc466364e2407f75cfd8 (diff)
download: buildstream-74fa2537788d0933c7d0b229504a1bd30dd7a451.tar.gz
Merge branch 'bschubert/fix-children-termination' into 'master'
scheduler.py: Prevent the asyncio loop from leaking into subprocesses. See merge request BuildStream/buildstream!1691
-rw-r--r-- src/buildstream/_scheduler/_multiprocessing.py 79
-rw-r--r-- src/buildstream/_scheduler/jobs/job.py 14
-rw-r--r-- src/buildstream/_scheduler/scheduler.py 8
3 files changed, 85 insertions, 16 deletions
diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py
new file mode 100644
index 000000000..4864e140c
--- /dev/null
+++ b/src/buildstream/_scheduler/_multiprocessing.py
@@ -0,0 +1,79 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# TLDR:
+# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
+#
+#
+# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
+#
+# The main problem that affects us is that the parent and the child will share some file descriptors.
+# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
+# by the app so that the asyncio loop can handle them afterwards.
+#
+# This sharing means that when we send a signal to the child, the sighandler in the child will write
+# it back to the parent sig_handler_fd, making the parent have to handle it too.
+# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
+# which in turn will make the scheduler receive N SIGTERMs (one per child). The scheduler will then send sigterms
+# to the children again, and so on...
+#
+# We therefore provide an `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
+# tries to clean up the loop and never calls `waitpid` on the child process, as doing so would break our child watchers.
+#
+#
+# Relevant issues:
+# - Asyncio: support fork (https://bugs.python.org/issue21998)
+# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
+# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
+#
+#
+
+import multiprocessing
+import signal
+import sys
+from asyncio import set_event_loop_policy
+
+
+# _AsyncioSafeForkAwareProcess()
+#
+# Process class that doesn't call waitpid on its own.
+# This prevents conflicts with the asyncio child watcher.
+#
+# Also automatically disables the inherited signal wakeup fd and resets the
+# event loop policy in the child before calling the actual run target.
+#
+class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
+ # pylint: disable=attribute-defined-outside-init
+ def start(self):
+ self._popen = self._Popen(self)
+ self._sentinel = self._popen.sentinel
+
+ def run(self):
+ signal.set_wakeup_fd(-1)
+ set_event_loop_policy(None)
+
+ super().run()
+
+
+if sys.platform != "win32":
+ # Use the fork-aware process class, which resets the asyncio state inherited by child processes
+ AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
+
+else:
+ # Windows doesn't support ChildWatcher that way anyways, we'll need another
+ # implementation if we want it
+ AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 1d7697b02..4e6199e16 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -33,6 +33,7 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
from ... import _signals, utils
+from .. import _multiprocessing
from .jobpickler import pickle_child_job, do_pickled_child_job
@@ -69,15 +70,6 @@ class _Envelope():
self.message = message
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-class Process(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
-
class _MessageType(FastEnum):
LOG_MESSAGE = 1
ERROR = 2
@@ -184,12 +176,12 @@ class Job():
child_job,
self._scheduler.context.get_projects(),
)
- self._process = Process(
+ self._process = _multiprocessing.AsyncioSafeProcess(
target=do_pickled_child_job,
args=[pickled, self._queue],
)
else:
- self._process = Process(
+ self._process = _multiprocessing.AsyncioSafeProcess(
target=child_job.child_action,
args=[self._queue],
)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a8e..7ef5c5fe3 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -478,11 +478,9 @@ class Scheduler():
#
def _interrupt_event(self):
- # FIXME: This should not be needed, but for some reason we receive an
- # additional SIGINT event when the user hits ^C a second time
- # to inform us that they really intend to terminate; even though
- # we have disconnected our handlers at this time.
- #
+ # The event loop receives a copy of all signals that are sent while it is running.
+ # This means that even though we catch the SIGINT in the question to the user,
+ # the loop will receive it too, and thus we need to skip it here.
if self.terminated:
return