diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-28 17:15:44 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-09-10 10:12:30 +0100 |
commit | 50a1464ee9185ee737fe20455d9b73ca598575cc (patch) | |
tree | f0722500881ff0c7ec65dded8fc9363bffa293f9 | |
parent | 1c4b9842b52a147a4ec64e81bb5c93e196316cd3 (diff) | |
download | buildstream-50a1464ee9185ee737fe20455d9b73ca598575cc.tar.gz |
scheduler.py: Notification for interactive failure retry
Add a notifcation for RETRY. This moves the retry handling into
scheduler, which will be running in the process which has been
suspended for interactivity and as such will be able to load the
relevant Element. Note a failed job via the scheduler should never
not have a related queue, so the try except when matching the queue
via the action name should not be needed.
-rw-r--r-- | src/buildstream/_frontend/app.py | 6 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 16 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 17 |
3 files changed, 21 insertions, 18 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index f9729a7ce..45160afbc 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -663,11 +663,7 @@ class App(): elif choice == 'retry': click.echo("\nRetrying failed job\n", err=True) unique_id = element[0] - try: - self.stream._failure_retry(action_name, unique_id) - except StreamError: - click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True) - self.stream.terminate() + self.stream._failure_retry(action_name, unique_id) # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 3c491b92d..b892296f5 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -30,6 +30,7 @@ from .resources import Resources from .jobs import JobStatus from ..types import FastEnum from .._profile import Topics, PROFILER +from ..plugin import Plugin # A decent return code for Scheduler.run() @@ -59,6 +60,7 @@ class NotificationType(FastEnum): SUSPEND = "suspend" UNSUSPEND = "unsuspend" SUSPENDED = "suspended" + RETRY = "retry" # Notification() @@ -497,6 +499,18 @@ class Scheduler(): self._notify(Notification(NotificationType.TICK)) self.loop.call_later(1, self._tick) + def _failure_retry(self, action_name, unique_id): + queue = None + for q in self.queues: + if q.action_name == action_name: + queue = q + break + # Assert queue found, we should only be retrying a queued job + assert queue + element = Plugin._lookup(unique_id) + queue._task_group.failed_tasks.remove(element._get_full_name()) + queue.enqueue([element]) + def _notify(self, notification): # Scheduler to Stream notifcations on right side self._notification_queue.append(notification) @@ -512,6 +526,8 @@ class Scheduler(): self.jobs_suspended() elif notification.notification_type == NotificationType.UNSUSPEND: self.jobs_unsuspended() + elif notification.notification_type == NotificationType.RETRY: + self._failure_retry(notification.job_action, notification.element) else: # Do not raise exception once scheduler process is separated # as we don't want to pickle exceptions between processes diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index e3d5eee89..6d8d918dd 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1338,20 +1338,11 @@ class Stream(): # action_name (str): The name of the action being performed # unique_id (str): A unique_id to load an Element instance # - # Raises: - # (StreamError): If the related queue cannot be found - # def _failure_retry(self, action_name, unique_id): - queue = None - # Attempt to resolve the required queue - for queue in self.queues: - if queue.action_name == action_name: - queue = queue - if not queue: - raise StreamError() - element = Plugin._lookup(unique_id) - queue._task_group.failed_tasks.remove(element._get_full_name()) - queue.enqueue([element]) + notification = Notification(NotificationType.RETRY, + job_action=action_name, + element=unique_id) + self._notify(notification) # _run() # |