summaryrefslogtreecommitdiff
path: root/src/buildstream/_signals.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_signals.py')
-rw-r--r--src/buildstream/_signals.py83
1 files changed, 63 insertions, 20 deletions
diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index 03b55b052..ef2ba1965 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -33,6 +33,23 @@ from typing import Callable, Deque
terminator_stack: Deque[Callable] = deque()
suspendable_stack: Deque[Callable] = deque()
+terminator_lock = threading.Lock()
+suspendable_lock = threading.Lock()
+
+# This event is used to block all the threads while we wait for user
+# interaction. This is because we can't stop all the pythin threads but the
+# one easily when waiting for user input. However, most performance intensive
+# tasks will pass through a subprocess or a multiprocess.Process and all of
+# those are guarded by the signal handling. Thus, by setting and unsetting this
+# event in the scheduler, we can enable and disable the launching of processes
+# and ensure we don't do anything resource intensive while being interrupted.
+is_not_suspended = threading.Event()
+is_not_suspended.set()
+
+
+class TerminateException(BaseException):
+ pass
+
# Per process SIGTERM handler
def terminator_handler(signal_, frame):
@@ -68,6 +85,9 @@ def terminator_handler(signal_, frame):
# that while the code block is running, the supplied function
# will be called upon process termination.
#
+# /!\ The callbacks passed must only contain code that does not acces thread
+# local variables. Those will run in the main thread.
+#
# Note that after handlers are called, the termination will be handled by
# terminating immediately with os._exit(). This means that SystemExit will not
# be raised and 'finally' clauses will not be executed.
@@ -80,23 +100,27 @@ def terminator_handler(signal_, frame):
def terminator(terminate_func):
global terminator_stack # pylint: disable=global-statement
- # Signal handling only works in the main thread
- if threading.current_thread() != threading.main_thread():
- yield
- return
-
outermost = bool(not terminator_stack)
- terminator_stack.append(terminate_func)
+ assert threading.current_thread() == threading.main_thread() or not outermost
+
+ with terminator_lock:
+ terminator_stack.append(terminate_func)
+
if outermost:
original_handler = signal.signal(signal.SIGTERM, terminator_handler)
try:
yield
+ except TerminateException:
+ terminate_func()
+ raise
finally:
if outermost:
signal.signal(signal.SIGTERM, original_handler)
- terminator_stack.pop()
+
+ with terminator_lock:
+ terminator_stack.remove(terminate_func)
# Just a simple object for holding on to two callbacks
@@ -108,21 +132,25 @@ class Suspender:
# Per process SIGTSTP handler
def suspend_handler(sig, frame):
+ is_not_suspended.clear()
# Suspend callbacks from innermost frame first
- for suspender in reversed(suspendable_stack):
- suspender.suspend()
+ with suspendable_lock:
+ for suspender in reversed(suspendable_stack):
+ suspender.suspend()
- # Use SIGSTOP directly now on self, dont introduce more SIGTSTP
- #
- # Here the process sleeps until SIGCONT, which we simply
- # dont handle. We know we'll pickup execution right here
- # when we wake up.
- os.kill(os.getpid(), signal.SIGSTOP)
+ # Use SIGSTOP directly now on self, dont introduce more SIGTSTP
+ #
+ # Here the process sleeps until SIGCONT, which we simply
+ # dont handle. We know we'll pickup execution right here
+ # when we wake up.
+ os.kill(os.getpid(), signal.SIGSTOP)
- # Resume callbacks from outermost frame inwards
- for suspender in suspendable_stack:
- suspender.resume()
+ # Resume callbacks from outermost frame inwards
+ for suspender in suspendable_stack:
+ suspender.resume()
+
+ is_not_suspended.set()
# suspendable()
@@ -133,6 +161,9 @@ def suspend_handler(sig, frame):
# suspend_callback (callable): A function to call as process suspend time.
# resume_callback (callable): A function to call as process resume time.
#
+# /!\ The callbacks passed must only contain code that does not acces thread
+# local variables. Those will run in the main thread.
+#
# This must be used in code blocks which start processes that become
# their own session leader. In these cases, SIGSTOP and SIGCONT need
# to be propagated to the child process group.
@@ -146,8 +177,19 @@ def suspendable(suspend_callback, resume_callback):
global suspendable_stack # pylint: disable=global-statement
outermost = bool(not suspendable_stack)
+ assert threading.current_thread() == threading.main_thread() or not outermost
+
+ # If we are not in the main thread, ensure that we are not suspended
+ # before running.
+ # If we are in the main thread, never block on this, to ensure we
+ # don't deadlock.
+ if threading.current_thread() != threading.main_thread():
+ is_not_suspended.wait()
+
suspender = Suspender(suspend_callback, resume_callback)
- suspendable_stack.append(suspender)
+
+ with suspendable_lock:
+ suspendable_stack.append(suspender)
if outermost:
original_stop = signal.signal(signal.SIGTSTP, suspend_handler)
@@ -158,7 +200,8 @@ def suspendable(suspend_callback, resume_callback):
if outermost:
signal.signal(signal.SIGTSTP, original_stop)
- suspendable_stack.pop()
+ with suspendable_lock:
+ suspendable_stack.remove(suspender)
# blocked()