author     Benjamin Schubert <contact@benschubert.me>   2019-11-08 12:18:43 +0000
committer  Benjamin Schubert <contact@benschubert.me>   2019-11-08 14:58:08 +0000
commit     ef40868d959b68040493d0b36bddfca64ae5b562 (patch)
tree       c9ad270cdfb3971fe1032baf0cce6d160408e6e1
parent     a6c9c52a578e953bc4af64e300ca24cbde8eea4e (diff)
scheduler.py: Prevent the asyncio loop from leaking into subprocesses
Having a running asyncio loop while forking a program is not supported in Python and doesn't work as expected. It leads to file descriptors leaking and to the subprocesses sharing the same loop as their parent, and it also makes the parent receive every signal the children receive. This ensures we don't leak our asyncio loop into the workers we fork.
-rw-r--r--  src/buildstream/_scheduler/_asyncio.py   | 134
-rw-r--r--  src/buildstream/_scheduler/jobs/job.py   |  17
-rw-r--r--  src/buildstream/_scheduler/scheduler.py  |   8
3 files changed, 142 insertions, 17 deletions
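Note: the signal feedback described in the commit message can be reproduced outside of BuildStream. Below is a minimal, standalone sketch (not part of this commit; it assumes Linux and multiprocessing's default 'fork' start method): the forked child inherits the parent loop's signal wakeup pipe, so a SIGTERM delivered to the child is reported to the parent's event loop as well (see https://bugs.python.org/issue31489).

import asyncio
import multiprocessing
import os
import signal


def child():
    # The fork inherited the parent's Python-level SIGTERM handler and the
    # loop's signal wakeup pipe, so this signal ends up in the shared pipe.
    os.kill(os.getpid(), signal.SIGTERM)


async def main():
    process = multiprocessing.Process(target=child)
    process.start()           # forks while the parent's loop is running
    process.join()
    await asyncio.sleep(0.5)  # give the parent's loop time to drain the pipe


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.add_signal_handler(signal.SIGTERM, lambda: print("parent's loop saw the child's SIGTERM"))
loop.run_until_complete(main())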
diff --git a/src/buildstream/_scheduler/_asyncio.py b/src/buildstream/_scheduler/_asyncio.py
new file mode 100644
index 000000000..3c83dae64
--- /dev/null
+++ b/src/buildstream/_scheduler/_asyncio.py
@@ -0,0 +1,134 @@
+#
+# Copyright (C) 2019 Bloomberg LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# TLDR:
+# NEVER use 'asyncio' in BuildStream directly; use the equivalent methods from _asyncio instead. This module exposes the same interface.
+# ALWAYS use `.AsyncioSafeProcess` when you have an event loop running and need a `multiprocessing.Process`
+#
+#
+# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
+#
+# The main problem that affects us is that the parent and the child end up sharing some file descriptors.
+# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
+# by the app so that the asyncio loop can handle them afterwards.
+#
+# This sharing means that when we send a signal to the child, the signal handler in the child writes
+# it back to the parent's sig_handler_fd, forcing the parent to handle it too.
+# This is a problem, for example, when we SIGTERM the process: the scheduler sends SIGTERM to all its
+# children, which in turn makes the scheduler receive N SIGTERMs (one per child), which in turn sends
+# more SIGTERMs to the children...
+#
+# We therefore implement a new default loop, used on Unix systems, that closes the relevant
+# loop resources and ensures the asyncio machinery is in a sane state before actually executing
+# the child's target.
+#
+# We also provide an `AsyncioSafeProcess`, derived from multiprocessing.Process, that automatically
+# cleans up the loop in the child and never calls `waitpid` on the child, since that would break our child watchers.
+#
+#
+# Relevant issues:
+# - Asyncio: support fork (https://bugs.python.org/issue21998)
+# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
+# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
+#
+#
+
+import multiprocessing
+import sys
+from asyncio import * # pylint: disable=redefined-builtin,wildcard-import,unused-wildcard-import
+
+
+# _ForkableUnixSelectorEventLoop()
+#
+# Inherits `SelectorEventLoop` and exposes a cleanup method to be called
+# in a child process after it has been forked.
+#
+# This closes fds that would be shared with the parent otherwise.
+#
+class _ForkableUnixSelectorEventLoop(SelectorEventLoop): # type: ignore
+
+ # _cleanup_on_fork()
+ #
+    # Ensure that the loop's resources are in a clean state so that it can be deleted,
+    # and ensure we don't leak fds from our parent into a new loop
+ #
+ def _cleanup_on_fork(self):
+ parent_selector = self._selector # pylint: disable=access-member-before-definition
+ self._selector = type(parent_selector)() # pylint: disable=attribute-defined-outside-init
+
+ for key in parent_selector.get_map().values():
+ self._selector.register(key.fileobj, key.events, key.data)
+
+ parent_selector.close()
+ self._close_self_pipe()
+ self._make_self_pipe()
+
+
+# _DefaultEventLoopPolicy()
+#
+# Provides a `cleanup_on_fork` method on top of what `DefaultEventLoopPolicy`
+# provides, in order to stop and remove the asyncio loop inherited from the parent as cleanly as possible
+#
+class _DefaultEventLoopPolicy(DefaultEventLoopPolicy): # type: ignore
+ _loop_factory = _ForkableUnixSelectorEventLoop
+
+ def cleanup_on_fork(self):
+ loop = self._local._loop
+
+ if loop is None:
+ # We don't have a loop, we're fine
+ return
+
+ assert not Task.all_tasks(loop), "Some asyncio tasks leaked to a child"
+ assert Task.current_task(loop) is None, "We are in the child, we should have no running task"
+
+ loop._cleanup_on_fork()
+ loop.stop()
+ del self._local._loop
+ self._local._loop = None
+
+
+# _AsyncioSafeForkAwareProcess()
+#
+# Process class that doesn't call waitpid on its own.
+# This prevents conflicts with the asyncio child watcher.
+#
+# Also automatically cleans up any running asyncio loop in the child before
+# calling the actual run target
+#
+class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
+ # pylint: disable=attribute-defined-outside-init
+ def start(self):
+ self._popen = self._Popen(self)
+ self._sentinel = self._popen.sentinel
+
+ def run(self):
+ get_event_loop_policy().cleanup_on_fork()
+ super().run()
+
+
+if sys.platform != "win32":
+ # Set the default event loop policy to automatically close our asyncio loop in child processes
+ DefaultEventLoopPolicy = _DefaultEventLoopPolicy # type: ignore
+ set_event_loop_policy(DefaultEventLoopPolicy()) # type: ignore
+
+ AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
+
+else:
+    # Windows doesn't support child watchers this way anyway; we'll need another
+    # implementation if we want one
+ AsyncioSafeProcess = multiprocessing.Process
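
The intended usage pattern, taken from the TLDR above, is to import this module wherever asyncio would be used and to spawn workers through AsyncioSafeProcess. The following is an illustrative sketch only (the worker function and queue are hypothetical, not part of this commit):

import multiprocessing

from buildstream._scheduler import _asyncio


def worker(queue):
    # By the time the target runs, _AsyncioSafeForkAwareProcess.run() has
    # already called get_event_loop_policy().cleanup_on_fork(), so the child
    # no longer shares the parent's selector or signal wakeup pipe.
    queue.put("done")


queue = multiprocessing.Queue()

# Use the wrapper module instead of asyncio whenever a loop may be running.
loop = _asyncio.new_event_loop()   # a _ForkableUnixSelectorEventLoop on Unix
_asyncio.set_event_loop(loop)

process = _asyncio.AsyncioSafeProcess(target=worker, args=[queue])
process.start()   # registers no waitpid bookkeeping in the parent

# Reaping is left to asyncio's child watcher, as job.py and scheduler.py do below.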
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 1d7697b02..0d95b8e7e 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -20,7 +20,6 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
# System imports
-import asyncio
import datetime
import multiprocessing
import os
@@ -33,6 +32,7 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
from ... import _signals, utils
+from .. import _asyncio
from .jobpickler import pickle_child_job, do_pickled_child_job
@@ -69,15 +69,6 @@ class _Envelope():
self.message = message
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-class Process(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
-
class _MessageType(FastEnum):
LOG_MESSAGE = 1
ERROR = 2
@@ -184,12 +175,12 @@ class Job():
child_job,
self._scheduler.context.get_projects(),
)
- self._process = Process(
+ self._process = _asyncio.AsyncioSafeProcess(
target=do_pickled_child_job,
args=[pickled, self._queue],
)
else:
- self._process = Process(
+ self._process = _asyncio.AsyncioSafeProcess(
target=child_job.child_action,
args=[self._queue],
)
@@ -223,7 +214,7 @@ class Job():
# an event loop callback. Otherwise, if the job completes too fast, then
# the callback is called immediately.
#
- self._watcher = asyncio.get_child_watcher()
+ self._watcher = _asyncio.get_child_watcher()
self._watcher.add_child_handler(self._process.pid, self._parent_child_completed)
# terminate()
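
For reference, the parent-side pattern that the job.py hunks above switch to can be summarized as follows. This is an illustrative sketch, not code from this commit; spawn() and on_completed are hypothetical names:

from buildstream._scheduler import _asyncio


def spawn(child_action, queue, on_completed):
    # AsyncioSafeProcess.start() deliberately skips multiprocessing's own
    # waitpid bookkeeping, so asyncio's child watcher below is the only reaper.
    process = _asyncio.AsyncioSafeProcess(target=child_action, args=[queue])
    process.start()

    # The watcher calls on_completed(pid, returncode) from the event loop once
    # the child exits.
    watcher = _asyncio.get_child_watcher()
    watcher.add_child_handler(process.pid, on_completed)
    return process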
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a8e..78af639fa 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,12 @@
# System imports
import os
-import asyncio
from itertools import chain
import signal
import datetime
# Local imports
+from . import _asyncio
from .resources import Resources
from .jobs import JobStatus
from ..types import FastEnum
@@ -170,8 +170,8 @@ class Scheduler():
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
- self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(self.loop)
+ self.loop = _asyncio.new_event_loop()
+ _asyncio.set_event_loop(self.loop)
# Notify that the loop has been created
self._notify(Notification(NotificationType.RUNNING))
@@ -184,7 +184,7 @@ class Scheduler():
# Watch casd while running to ensure it doesn't die
self._casd_process = casd_process
- _watcher = asyncio.get_child_watcher()
+ _watcher = _asyncio.get_child_watcher()
_watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
# Start the profiler