summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2020-06-17 20:57:37 +0000
committerBenjamin Schubert <bschubert15@bloomberg.net>2020-12-01 10:57:44 +0000
commitc7fc96494d6e268a6a48e0ff68c35763200d5d77 (patch)
treec7bce97db489a4fe6a7964a26a289e2be88719b8
parent7a9735ddb73c2d1b94eaf1a73b277259fdc37628 (diff)
downloadbuildstream-c7fc96494d6e268a6a48e0ff68c35763200d5d77.tar.gz
_signals.py: allow calling signal handler from non-main threads
* This modifies the signal terminator so that it can be called from any thread. This checks that either: - The signal handler is already in place - Or the caller is in the main thread, allowing to set the signal handler. This also removes the exact callback that was added instead of removing the last one, and fixes the `suspend_handler` to do the same. This is required, as we don't know which interleaving of calls will be done, and we can't guarantee that the last one is the right one to remove
-rw-r--r--src/buildstream/_cas/casserver.py7
-rw-r--r--src/buildstream/_signals.py19
2 files changed, 18 insertions, 8 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 013fb07dd..04c5eb836 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -30,6 +30,7 @@ import grpc
import click
from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc
+from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import (
remote_execution_pb2,
remote_execution_pb2_grpc,
@@ -137,7 +138,11 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le
_ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server
)
- yield server
+ # Ensure we have the signal handler set for SIGTERM
+ # This allows threads from GRPC to call our methods that do register
+ # handlers at exit.
+ with _signals.terminator(lambda: None):
+ yield server
finally:
casd_channel.close()
diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index 03b55b052..8032752a8 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -33,6 +33,9 @@ from typing import Callable, Deque
terminator_stack: Deque[Callable] = deque()
suspendable_stack: Deque[Callable] = deque()
+terminator_lock = threading.Lock()
+suspendable_lock = threading.Lock()
+
# Per process SIGTERM handler
def terminator_handler(signal_, frame):
@@ -80,13 +83,10 @@ def terminator_handler(signal_, frame):
def terminator(terminate_func):
global terminator_stack # pylint: disable=global-statement
- # Signal handling only works in the main thread
- if threading.current_thread() != threading.main_thread():
- yield
- return
-
outermost = bool(not terminator_stack)
+ assert threading.current_thread() == threading.main_thread() or not outermost
+
terminator_stack.append(terminate_func)
if outermost:
original_handler = signal.signal(signal.SIGTERM, terminator_handler)
@@ -96,7 +96,9 @@ def terminator(terminate_func):
finally:
if outermost:
signal.signal(signal.SIGTERM, original_handler)
- terminator_stack.pop()
+
+ with terminator_lock:
+ terminator_stack.remove(terminate_func)
# Just a simple object for holding on to two callbacks
@@ -146,6 +148,8 @@ def suspendable(suspend_callback, resume_callback):
global suspendable_stack # pylint: disable=global-statement
outermost = bool(not suspendable_stack)
+ assert threading.current_thread() == threading.main_thread() or not outermost
+
suspender = Suspender(suspend_callback, resume_callback)
suspendable_stack.append(suspender)
@@ -158,7 +162,8 @@ def suspendable(suspend_callback, resume_callback):
if outermost:
signal.signal(signal.SIGTSTP, original_stop)
- suspendable_stack.pop()
+ with suspendable_lock:
+ suspendable_stack.remove(suspender)
# blocked()