summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-07-15 11:51:18 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-08-06 14:54:14 +0100
commit07bb725bbf4817a4982f3eaeb5a4268bade3e9c7 (patch)
tree0f7867da87390f061be7b3eacb14d3cb83a0f64a
parentc2dafd2dc11af73838ae19e337f94687244f60cc (diff)
downloadbuildstream-07bb725bbf4817a4982f3eaeb5a4268bade3e9c7.tar.gz
Support exception handling from the subprocess
-rw-r--r--src/buildstream/_exceptions.py3
-rw-r--r--src/buildstream/_scheduler/jobs/job.py2
-rw-r--r--src/buildstream/_scheduler/queues/queue.py1
-rw-r--r--src/buildstream/_scheduler/scheduler.py27
-rw-r--r--src/buildstream/_stream.py59
-rw-r--r--src/buildstream/testing/runcli.py2
-rw-r--r--src/buildstream/utils.py4
7 files changed, 80 insertions, 18 deletions
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index d5b87a85e..e9776606d 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -238,7 +238,8 @@ class LoadErrorReason(Enum):
# interpreting project YAML
#
class LoadError(BstError):
- def __init__(self, message, reason, *, detail=None):
+ def __init__(self, message, reason=None, *, detail=None):
+ # Second parameter needs to be a default arg due to unpickling issue, unpleasant.
super().__init__(message, detail=detail, domain=ErrorDomain.LOAD, reason=reason)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d80d1a9e1..9e3ca1369 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -488,6 +488,8 @@ class Job():
# is currently managed in _exceptions.py
set_last_task_error(envelope.message['domain'],
envelope.message['reason'])
+ self._scheduler.set_last_task_error(envelope.message['domain'],
+ envelope.message['reason'])
elif envelope.message_type is _MessageType.RESULT:
assert self._result is None
self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 8c81ce21d..3435e3f20 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -316,6 +316,7 @@ class Queue():
# This just allows us stronger testing capability
#
set_last_task_error(e.domain, e.reason)
+ self._scheduler.set_last_task_error(e.domain, e.reason)
except Exception: # pylint: disable=broad-except
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2e9c7408e..664986534 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -53,6 +53,8 @@ class NotificationType(enum.Enum):
JOB_COMPLETE = "job_complete"
TICK = "tick"
EXCEPTION = "exception"
+ TASK_ERROR = "task_error"
+ SCHED_TERMINATE = "sched_terminate"
class Notification:
@@ -66,7 +68,9 @@ class Notification:
failed_element=False,
elapsed_time=None,
element=None,
- exception=None):
+ exception=None,
+ domain=None,
+ reason=None):
self.notification_type = notification_type
self.full_name = full_name
@@ -76,6 +80,8 @@ class Notification:
self.elapsed_time = elapsed_time
self.element = element
self.exception = exception
+ self.domain = domain
+ self.reason = reason
# Scheduler()
@@ -331,6 +337,12 @@ class Scheduler():
#
self._cache_size_scheduled = True
+ def set_last_task_error(self, domain, reason):
+ notification = Notification(NotificationType.TASK_ERROR,
+ domain=domain,
+ reason=reason)
+ self._notify(notification)
+
#######################################################
# Local Private Methods #
#######################################################
@@ -673,3 +685,16 @@ class Scheduler():
# a new use-case arises.
#
raise TypeError("Scheduler objects should not be pickled.")
+
+ def _loop(self):
+ assert self._notification_queue
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notification_queue.get_nowait()
+ if notification.notification_type == NotificationType.SCHED_TERMINATE:
+ print("handling notifications")
+ self.terminate_jobs()
+ except queue.Empty:
+ notification = None
+ break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index afbeb2066..4de975e9b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -35,7 +35,7 @@ import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
@@ -107,6 +107,16 @@ class Stream():
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
+ @staticmethod
+ def _subprocess_main(func, queue, *args, **kwargs):
+ # Set main process
+ utils._reset_main_pid()
+ try:
+ func(*args, **kwargs)
+ except Exception as e:
+ from ._scheduler.scheduler import Notification, NotificationType
+ queue.put(Notification(NotificationType.EXCEPTION, exception=e))
+
def run_in_subprocess(self, func, *args, **kwargs):
print("Args: {}".format([*args]))
print("Kwargs: {}".format(list(kwargs.items())))
@@ -114,11 +124,16 @@ class Stream():
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
- print("launchinglaunching subprocess:", process_name)
- self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+ args = list(args)
+ args.insert(0, self._notification_queue)
+ args.insert(0, func)
+ print("launching subprocess:", process_name)
+ self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
+ kwargs=kwargs, name=process_name)
+
self._subprocess.start()
- # TODO connect signal handlers
+ # TODO connect signal handlers with asyncio
while self._subprocess.exitcode is None:
# check every given time interval on subprocess state
self._subprocess.join(0.1)
@@ -126,13 +141,17 @@ class Stream():
self._loop()
print("Stopping loop...")
- # try:
- # while True:
- # notification = self._notification_queue.get_nowait()
- # self._scheduler_notification_handler(notification)
- # except queue.Empty:
- # print("Finished processing notifications")
- # pass
+ # Set main process back
+ utils._reset_main_pid()
+
+ # Ensure no more notifcations to process
+ try:
+ while True:
+ notification = self._notification_queue.get_nowait()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ print("Finished processing notifications")
+ pass
# cleanup()
#
@@ -1086,7 +1105,15 @@ class Stream():
# Terminate jobs
#
def terminate(self):
+ #if self._scheduler.loop:
+ # Scheduler not in subprocess
self._scheduler.terminate_jobs()
+ #else:
+ # # Handle calling subprocessed scheduler outside of main process
+ # assert self._notification_queue
+ # from ._scheduler.scheduler import Notification, NotificationType
+ # self._notifiaction_queue.put(Notification(NotificationType.SCHED_TERMINATE))
+ # self._scheduler.terminate_jobs()
# quit()
#
@@ -1625,8 +1652,9 @@ class Stream():
unique_id = None
self._state.fail_task(notification.job_action, notification.full_name, unique_id)
elif notification.notification_type == NotificationType.EXCEPTION:
- # TODO
- pass
+ raise notification.exception
+ elif notification.notification_type == NotificationType.TASK_ERROR:
+ set_last_task_error(notification.domain, notification.reason)
else:
raise StreamError("Unreccognised notification type recieved")
@@ -1653,8 +1681,9 @@ class Stream():
while True:
try:
notification = self._notification_queue.get_nowait()
- print("handling notifications")
- self._scheduler_notification_handler(notification)
+ if notification.notification_type != NotificationType.SCHED_TERMINATE:
+ print("handling notifications")
+ self._scheduler_notification_handler(notification)
except queue.Empty:
notification = None
break
diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py
index 95bf83eff..ad717056b 100644
--- a/src/buildstream/testing/runcli.py
+++ b/src/buildstream/testing/runcli.py
@@ -85,7 +85,6 @@ class Result():
# in the case that the exit code reported is 0 (success).
#
if self.exit_code != 0:
-
# Check if buildstream failed to handle an
# exception, topevel CLI exit should always
# be a SystemExit exception.
@@ -149,6 +148,7 @@ class Result():
self.exception.domain,
self.exception.reason
))
+
assert self.exit_code == -1, fail_message
assert self.exc is not None, fail_message
assert self.exception is not None, fail_message
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 2c57925d4..ed1e1232f 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -711,6 +711,10 @@ def _is_main_process():
assert _main_pid is not None
return os.getpid() == _main_pid
+def _reset_main_pid():
+ global _main_pid
+ _main_pid = os.getpid()
+
# Recursively remove directories, ignoring file permissions as much as
# possible.