summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-10-08 18:12:14 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-10-08 18:12:14 +0000
commitd390fd62f28db1cb812554d386f2b3560e5e6628 (patch)
tree533baa5da4baf04b1e797e2ebb336dffe2fac3af
parentd268ff6fff7ab22e1080a500e0469f3dc9a54274 (diff)
parent5ea011c7ff420c6c9a4985088d4fadf2c066f7a2 (diff)
downloadbuildstream-d390fd62f28db1cb812554d386f2b3560e5e6628.tar.gz
Merge branch 'bschubert/casd-listen-failures' into 'master'
Listen for casd failures and abort the run when they happen Closes #1157 See merge request BuildStream/buildstream!1620
-rw-r--r--src/buildstream/_cas/cascache.py11
-rw-r--r--src/buildstream/_scheduler/scheduler.py33
-rw-r--r--src/buildstream/_stream.py2
3 files changed, 44 insertions, 2 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index f7855afc4..b6893503f 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -1049,6 +1049,17 @@ class CASCache():
return self._cache_usage_monitor.get_cache_usage()
+ # get_casd_process()
+ #
+ # Get the underlying buildbox-casd process
+ #
+ # Returns:
+ # (subprocess.Process): The casd process that is used for the current cascache
+ #
+ def get_casd_process(self):
+ assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
+ return self._casd_process
+
# _CASCacheUsage
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d0a189545..6133cbfd7 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -30,6 +30,7 @@ from .resources import Resources
from .jobs import JobStatus
from ..types import FastEnum
from .._profile import Topics, PROFILER
+from .._message import Message, MessageType
from ..plugin import Plugin
@@ -137,6 +138,7 @@ class Scheduler():
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
+ self._casd_process = None # handle to the casd process for monitoring purpose
# Bidirectional queue to send notifications back to the Scheduler's owner
self._notification_queue = notification_queue
@@ -150,6 +152,8 @@ class Scheduler():
#
# Args:
# queues (list): A list of Queue objects
+ # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
+ # of failures.
#
# Returns:
# (SchedStatus): How the scheduling terminated
@@ -159,7 +163,7 @@ class Scheduler():
# elements have been processed by each queue or when
# an error arises
#
- def run(self, queues):
+ def run(self, queues, casd_process):
assert self.context.is_fork_allowed()
@@ -180,6 +184,11 @@ class Scheduler():
# Handle unix signals while running
self._connect_signals()
+ # Watch casd while running to ensure it doesn't die
+ self._casd_process = casd_process
+ _watcher = asyncio.get_child_watcher()
+ _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
# Run the queues
@@ -187,6 +196,10 @@ class Scheduler():
self.loop.run_forever()
self.loop.close()
+ # Stop watching casd
+ _watcher.remove_child_handler(casd_process.pid)
+ self._casd_process = None
+
# Stop handling unix signals
self._disconnect_signals()
@@ -319,6 +332,24 @@ class Scheduler():
# Local Private Methods #
#######################################################
+ # _abort_on_casd_failure()
+ #
+ # Abort if casd failed while running.
+ #
+ # This will terminate immediately all jobs, since buildbox-casd is dead,
+ # we can't do anything with them anymore.
+ #
+ # Args:
+ # pid (int): the process id under which buildbox-casd was running
+ # returncode (int): the return code with which buildbox-casd exited
+ #
+ def _abort_on_casd_failure(self, pid, returncode):
+ message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
+ self._notify(Notification(NotificationType.MESSAGE, message=message))
+
+ self._casd_process.returncode = returncode
+ self.terminate_jobs()
+
# _start_job()
#
# Spanws a job
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 75b3dd84e..6e4e5caec 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1375,7 +1375,7 @@ class Stream():
if self._session_start_callback is not None:
self._session_start_callback()
- status = self._scheduler.run(self.queues)
+ status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
if status == SchedStatus.ERROR:
raise StreamError()