diff options
Diffstat (limited to 'src/buildstream/_signals.py')
-rw-r--r-- | src/buildstream/_signals.py | 83 |
1 files changed, 63 insertions, 20 deletions
diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py index 03b55b052..ef2ba1965 100644 --- a/src/buildstream/_signals.py +++ b/src/buildstream/_signals.py @@ -33,6 +33,23 @@ from typing import Callable, Deque terminator_stack: Deque[Callable] = deque() suspendable_stack: Deque[Callable] = deque() +terminator_lock = threading.Lock() +suspendable_lock = threading.Lock() + +# This event is used to block all the threads while we wait for user +# interaction. This is because we can't stop all the pythin threads but the +# one easily when waiting for user input. However, most performance intensive +# tasks will pass through a subprocess or a multiprocess.Process and all of +# those are guarded by the signal handling. Thus, by setting and unsetting this +# event in the scheduler, we can enable and disable the launching of processes +# and ensure we don't do anything resource intensive while being interrupted. +is_not_suspended = threading.Event() +is_not_suspended.set() + + +class TerminateException(BaseException): + pass + # Per process SIGTERM handler def terminator_handler(signal_, frame): @@ -68,6 +85,9 @@ def terminator_handler(signal_, frame): # that while the code block is running, the supplied function # will be called upon process termination. # +# /!\ The callbacks passed must only contain code that does not acces thread +# local variables. Those will run in the main thread. +# # Note that after handlers are called, the termination will be handled by # terminating immediately with os._exit(). This means that SystemExit will not # be raised and 'finally' clauses will not be executed. @@ -80,23 +100,27 @@ def terminator_handler(signal_, frame): def terminator(terminate_func): global terminator_stack # pylint: disable=global-statement - # Signal handling only works in the main thread - if threading.current_thread() != threading.main_thread(): - yield - return - outermost = bool(not terminator_stack) - terminator_stack.append(terminate_func) + assert threading.current_thread() == threading.main_thread() or not outermost + + with terminator_lock: + terminator_stack.append(terminate_func) + if outermost: original_handler = signal.signal(signal.SIGTERM, terminator_handler) try: yield + except TerminateException: + terminate_func() + raise finally: if outermost: signal.signal(signal.SIGTERM, original_handler) - terminator_stack.pop() + + with terminator_lock: + terminator_stack.remove(terminate_func) # Just a simple object for holding on to two callbacks @@ -108,21 +132,25 @@ class Suspender: # Per process SIGTSTP handler def suspend_handler(sig, frame): + is_not_suspended.clear() # Suspend callbacks from innermost frame first - for suspender in reversed(suspendable_stack): - suspender.suspend() + with suspendable_lock: + for suspender in reversed(suspendable_stack): + suspender.suspend() - # Use SIGSTOP directly now on self, dont introduce more SIGTSTP - # - # Here the process sleeps until SIGCONT, which we simply - # dont handle. We know we'll pickup execution right here - # when we wake up. - os.kill(os.getpid(), signal.SIGSTOP) + # Use SIGSTOP directly now on self, dont introduce more SIGTSTP + # + # Here the process sleeps until SIGCONT, which we simply + # dont handle. We know we'll pickup execution right here + # when we wake up. + os.kill(os.getpid(), signal.SIGSTOP) - # Resume callbacks from outermost frame inwards - for suspender in suspendable_stack: - suspender.resume() + # Resume callbacks from outermost frame inwards + for suspender in suspendable_stack: + suspender.resume() + + is_not_suspended.set() # suspendable() @@ -133,6 +161,9 @@ def suspend_handler(sig, frame): # suspend_callback (callable): A function to call as process suspend time. # resume_callback (callable): A function to call as process resume time. # +# /!\ The callbacks passed must only contain code that does not acces thread +# local variables. Those will run in the main thread. +# # This must be used in code blocks which start processes that become # their own session leader. In these cases, SIGSTOP and SIGCONT need # to be propagated to the child process group. @@ -146,8 +177,19 @@ def suspendable(suspend_callback, resume_callback): global suspendable_stack # pylint: disable=global-statement outermost = bool(not suspendable_stack) + assert threading.current_thread() == threading.main_thread() or not outermost + + # If we are not in the main thread, ensure that we are not suspended + # before running. + # If we are in the main thread, never block on this, to ensure we + # don't deadlock. + if threading.current_thread() != threading.main_thread(): + is_not_suspended.wait() + suspender = Suspender(suspend_callback, resume_callback) - suspendable_stack.append(suspender) + + with suspendable_lock: + suspendable_stack.append(suspender) if outermost: original_stop = signal.signal(signal.SIGTSTP, suspend_handler) @@ -158,7 +200,8 @@ def suspendable(suspend_callback, resume_callback): if outermost: signal.signal(signal.SIGTSTP, original_stop) - suspendable_stack.pop() + with suspendable_lock: + suspendable_stack.remove(suspender) # blocked() |