From ac2433491154c458e8d0c7cc68ccd6cded508153 Mon Sep 17 00:00:00 2001
From: Tom Pollard
Date: Tue, 12 Nov 2019 16:55:12 +0000
Subject: Apply AsyncioSafeProcess to Stream's multiprocess

Note this stops explicitly using the get_context object from
multiprocessing, which allows fork to be used in a process where
spawn is the default. This obviously breaks the Linux CI targets
for FORCE SPAWN.
---
 src/buildstream/_multiprocessing.py            | 79 ++++++++++++++++++++++++++
 src/buildstream/_scheduler/_multiprocessing.py | 79 --------------------------
 src/buildstream/_scheduler/jobs/job.py         |  3 +-
 src/buildstream/_stream.py                     |  6 +-
 4 files changed, 83 insertions(+), 84 deletions(-)
 create mode 100644 src/buildstream/_multiprocessing.py
 delete mode 100644 src/buildstream/_scheduler/_multiprocessing.py

diff --git a/src/buildstream/_multiprocessing.py b/src/buildstream/_multiprocessing.py
new file mode 100644
index 000000000..4864e140c
--- /dev/null
+++ b/src/buildstream/_multiprocessing.py
@@ -0,0 +1,79 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# TLDR:
+# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
+#
+#
+# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
+#
+# The main problem that affects us is that the parent and the child will share some file handlers.
+# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
+# by the app so that the asyncio loop can treat them afterwards.
+#
+# This sharing means that when we send a signal to the child, the sighandler in the child will write
+# it back to the parent sig_handler_fd, making the parent have to treat it too.
+# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
+# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
+# the children...
+#
+# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
+# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers.
+#
+#
+# Relevant issues:
+# - Asyncio: support fork (https://bugs.python.org/issue21998)
+# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
+# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
+#
+#
+
+import multiprocessing
+import signal
+import sys
+from asyncio import set_event_loop_policy
+
+
+# _AsyncioSafeForkAwareProcess()
+#
+# Process class that doesn't call waitpid on its own.
+# This prevents conflicts with the asyncio child watcher.
+#
+# Also automatically close any running asyncio loop before calling
+# the actual run target
+#
+class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
+    # pylint: disable=attribute-defined-outside-init
+    def start(self):
+        self._popen = self._Popen(self)
+        self._sentinel = self._popen.sentinel
+
+    def run(self):
+        signal.set_wakeup_fd(-1)
+        set_event_loop_policy(None)
+
+        super().run()
+
+
+if sys.platform != "win32":
+    # Set the default event loop policy to automatically close our asyncio loop in child processes
+    AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
+
+else:
+    # Windows doesn't support ChildWatcher that way anyways, we'll need another
+    # implementation if we want it
+    AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py
deleted file mode 100644
index 4864e140c..000000000
--- a/src/buildstream/_scheduler/_multiprocessing.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Copyright (C) 2019 Bloomberg Finance LP
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-
-# TLDR:
-# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
-#
-#
-# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
-#
-# The main problem that affects us is that the parent and the child will share some file handlers.
-# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
-# by the app so that the asyncio loop can treat them afterwards.
-#
-# This sharing means that when we send a signal to the child, the sighandler in the child will write
-# it back to the parent sig_handler_fd, making the parent have to treat it too.
-# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
-# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
-# the children...
-#
-# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
-# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers.
-#
-#
-# Relevant issues:
-# - Asyncio: support fork (https://bugs.python.org/issue21998)
-# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
-# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
-#
-#
-
-import multiprocessing
-import signal
-import sys
-from asyncio import set_event_loop_policy
-
-
-# _AsyncioSafeForkAwareProcess()
-#
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-#
-# Also automatically close any running asyncio loop before calling
-# the actual run target
-#
-class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
-    # pylint: disable=attribute-defined-outside-init
-    def start(self):
-        self._popen = self._Popen(self)
-        self._sentinel = self._popen.sentinel
-
-    def run(self):
-        signal.set_wakeup_fd(-1)
-        set_event_loop_policy(None)
-
-        super().run()
-
-
-if sys.platform != "win32":
-    # Set the default event loop policy to automatically close our asyncio loop in child processes
-    AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
-
-else:
-    # Windows doesn't support ChildWatcher that way anyways, we'll need another
-    # implementation if we want it
-    AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 460f63dbf..9d135c61b 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -33,8 +33,7 @@ import traceback
 from ..._exceptions import ImplError, BstError, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ... import _signals, utils, _multiprocessing
 from .jobpickler import pickle_child_job, do_pickled_child_job
 
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index ebabcac5e..86eea4905 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -65,7 +65,7 @@ from ._profile import Topics, PROFILER
 from ._state import State
 from .types import _KeyStrength, _SchedulerErrorAction
 from .plugin import Plugin
-from . import utils, _yaml, _site, _signals
+from . import utils, _yaml, _site, _signals, _multiprocessing
 from . import Scope
 
 
@@ -152,7 +152,7 @@ class Stream:
     def run_in_subprocess(self, func, *args, **kwargs):
         assert not self._subprocess
 
-        mp_context = mp.get_context(method="fork")
+        # mp_context = _multiprocessing.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
 
         self._notify_front_queue = mp.Queue()
@@ -165,7 +165,7 @@
         args.insert(0, self._notify_front_queue)
         args.insert(0, func)
 
-        self._subprocess = mp_context.Process(
+        self._subprocess = _multiprocessing.AsyncioSafeProcess(
             target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name
         )
 
--
cgit v1.2.1
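
For context, a minimal usage sketch of what the patch enables, not part of the patch itself: constructing a child through `_multiprocessing.AsyncioSafeProcess` while an asyncio event loop is running in the parent, much as `Stream.run_in_subprocess()` now does. Only `buildstream._multiprocessing.AsyncioSafeProcess` comes from the patch; the `_child_main` target, the queue, and the polling loop are illustrative assumptions, and the sketch assumes a platform where fork is the default start method.

import asyncio
import multiprocessing

from buildstream import _multiprocessing


def _child_main(queue):
    # Runs in the forked child. _AsyncioSafeForkAwareProcess.run() has already
    # cleared the inherited signal wakeup fd and reset the event loop policy.
    queue.put("hello from the child")


async def _main():
    queue = multiprocessing.Queue()

    # The event loop is running here, which is exactly the situation the
    # module comment warns about for a plain multiprocessing.Process.
    process = _multiprocessing.AsyncioSafeProcess(target=_child_main, args=(queue,))
    process.start()

    # Poll instead of process.join() so the running event loop is not blocked.
    while process.exitcode is None:
        await asyncio.sleep(0.1)

    print(queue.get())


if __name__ == "__main__":
    asyncio.run(_main())

Since the overridden start() skips the usual parent-side child bookkeeping, the parent learns about the child's exit either by polling exitcode as above or, in BuildStream's scheduler, through asyncio's child watcher.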