diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-07-15 11:51:18 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-06 14:54:14 +0100 |
commit | 07bb725bbf4817a4982f3eaeb5a4268bade3e9c7 (patch) | |
tree | 0f7867da87390f061be7b3eacb14d3cb83a0f64a | |
parent | c2dafd2dc11af73838ae19e337f94687244f60cc (diff) | |
download | buildstream-07bb725bbf4817a4982f3eaeb5a4268bade3e9c7.tar.gz |
Support exception handling from the subprocess
-rw-r--r-- | src/buildstream/_exceptions.py | 3 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 27 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 59 | ||||
-rw-r--r-- | src/buildstream/testing/runcli.py | 2 | ||||
-rw-r--r-- | src/buildstream/utils.py | 4 |
7 files changed, 80 insertions, 18 deletions
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py index d5b87a85e..e9776606d 100644 --- a/src/buildstream/_exceptions.py +++ b/src/buildstream/_exceptions.py @@ -238,7 +238,8 @@ class LoadErrorReason(Enum): # interpreting project YAML # class LoadError(BstError): - def __init__(self, message, reason, *, detail=None): + def __init__(self, message, reason=None, *, detail=None): + # Second parameter needs to be a default arg due to unpickling issue, unpleasant. super().__init__(message, detail=detail, domain=ErrorDomain.LOAD, reason=reason) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index d80d1a9e1..9e3ca1369 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -488,6 +488,8 @@ class Job(): # is currently managed in _exceptions.py set_last_task_error(envelope.message['domain'], envelope.message['reason']) + self._scheduler.set_last_task_error(envelope.message['domain'], + envelope.message['reason']) elif envelope.message_type is _MessageType.RESULT: assert self._result is None self._result = envelope.message diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 8c81ce21d..3435e3f20 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -316,6 +316,7 @@ class Queue(): # This just allows us stronger testing capability # set_last_task_error(e.domain, e.reason) + self._scheduler.set_last_task_error(e.domain, e.reason) except Exception: # pylint: disable=broad-except diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 2e9c7408e..664986534 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -53,6 +53,8 @@ class NotificationType(enum.Enum): JOB_COMPLETE = "job_complete" TICK = "tick" EXCEPTION = "exception" + TASK_ERROR = "task_error" + SCHED_TERMINATE = "sched_terminate" class Notification: @@ -66,7 +68,9 @@ class Notification: failed_element=False, elapsed_time=None, element=None, - exception=None): + exception=None, + domain=None, + reason=None): self.notification_type = notification_type self.full_name = full_name @@ -76,6 +80,8 @@ class Notification: self.elapsed_time = elapsed_time self.element = element self.exception = exception + self.domain = domain + self.reason = reason # Scheduler() @@ -331,6 +337,12 @@ class Scheduler(): # self._cache_size_scheduled = True + def set_last_task_error(self, domain, reason): + notification = Notification(NotificationType.TASK_ERROR, + domain=domain, + reason=reason) + self._notify(notification) + ####################################################### # Local Private Methods # ####################################################### @@ -673,3 +685,16 @@ class Scheduler(): # a new use-case arises. # raise TypeError("Scheduler objects should not be pickled.") + + def _loop(self): + assert self._notification_queue + # Check for and process new messages + while True: + try: + notification = self._notification_queue.get_nowait() + if notification.notification_type == NotificationType.SCHED_TERMINATE: + print("handling notifications") + self.terminate_jobs() + except queue.Empty: + notification = None + break diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index afbeb2066..4de975e9b 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -35,7 +35,7 @@ import queue from ._artifact import Artifact from ._artifactelement import verify_artifact_ref, ArtifactElement -from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError +from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus @@ -107,6 +107,16 @@ class Stream(): self._artifacts = self._context.artifactcache self._sourcecache = self._context.sourcecache + @staticmethod + def _subprocess_main(func, queue, *args, **kwargs): + # Set main process + utils._reset_main_pid() + try: + func(*args, **kwargs) + except Exception as e: + from ._scheduler.scheduler import Notification, NotificationType + queue.put(Notification(NotificationType.EXCEPTION, exception=e)) + def run_in_subprocess(self, func, *args, **kwargs): print("Args: {}".format([*args])) print("Kwargs: {}".format(list(kwargs.items()))) @@ -114,11 +124,16 @@ class Stream(): mp_context = mp.get_context(method='fork') process_name = "stream-{}".format(func.__name__) - print("launchinglaunching subprocess:", process_name) - self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) + args = list(args) + args.insert(0, self._notification_queue) + args.insert(0, func) + print("launching subprocess:", process_name) + self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args, + kwargs=kwargs, name=process_name) + self._subprocess.start() - # TODO connect signal handlers + # TODO connect signal handlers with asyncio while self._subprocess.exitcode is None: # check every given time interval on subprocess state self._subprocess.join(0.1) @@ -126,13 +141,17 @@ class Stream(): self._loop() print("Stopping loop...") - # try: - # while True: - # notification = self._notification_queue.get_nowait() - # self._scheduler_notification_handler(notification) - # except queue.Empty: - # print("Finished processing notifications") - # pass + # Set main process back + utils._reset_main_pid() + + # Ensure no more notifcations to process + try: + while True: + notification = self._notification_queue.get_nowait() + self._scheduler_notification_handler(notification) + except queue.Empty: + print("Finished processing notifications") + pass # cleanup() # @@ -1086,7 +1105,15 @@ class Stream(): # Terminate jobs # def terminate(self): + #if self._scheduler.loop: + # Scheduler not in subprocess self._scheduler.terminate_jobs() + #else: + # # Handle calling subprocessed scheduler outside of main process + # assert self._notification_queue + # from ._scheduler.scheduler import Notification, NotificationType + # self._notifiaction_queue.put(Notification(NotificationType.SCHED_TERMINATE)) + # self._scheduler.terminate_jobs() # quit() # @@ -1625,8 +1652,9 @@ class Stream(): unique_id = None self._state.fail_task(notification.job_action, notification.full_name, unique_id) elif notification.notification_type == NotificationType.EXCEPTION: - # TODO - pass + raise notification.exception + elif notification.notification_type == NotificationType.TASK_ERROR: + set_last_task_error(notification.domain, notification.reason) else: raise StreamError("Unreccognised notification type recieved") @@ -1653,8 +1681,9 @@ class Stream(): while True: try: notification = self._notification_queue.get_nowait() - print("handling notifications") - self._scheduler_notification_handler(notification) + if notification.notification_type != NotificationType.SCHED_TERMINATE: + print("handling notifications") + self._scheduler_notification_handler(notification) except queue.Empty: notification = None break diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py index 95bf83eff..ad717056b 100644 --- a/src/buildstream/testing/runcli.py +++ b/src/buildstream/testing/runcli.py @@ -85,7 +85,6 @@ class Result(): # in the case that the exit code reported is 0 (success). # if self.exit_code != 0: - # Check if buildstream failed to handle an # exception, topevel CLI exit should always # be a SystemExit exception. @@ -149,6 +148,7 @@ class Result(): self.exception.domain, self.exception.reason )) + assert self.exit_code == -1, fail_message assert self.exc is not None, fail_message assert self.exception is not None, fail_message diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 2c57925d4..ed1e1232f 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -711,6 +711,10 @@ def _is_main_process(): assert _main_pid is not None return os.getpid() == _main_pid +def _reset_main_pid(): + global _main_pid + _main_pid = os.getpid() + # Recursively remove directories, ignoring file permissions as much as # possible. |