summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-08-28 17:15:44 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-09-10 10:12:30 +0100
commit50a1464ee9185ee737fe20455d9b73ca598575cc (patch)
treef0722500881ff0c7ec65dded8fc9363bffa293f9
parent1c4b9842b52a147a4ec64e81bb5c93e196316cd3 (diff)
downloadbuildstream-50a1464ee9185ee737fe20455d9b73ca598575cc.tar.gz
scheduler.py: Notification for interactive failure retry
Add a notifcation for RETRY. This moves the retry handling into scheduler, which will be running in the process which has been suspended for interactivity and as such will be able to load the relevant Element. Note a failed job via the scheduler should never not have a related queue, so the try except when matching the queue via the action name should not be needed.
-rw-r--r--src/buildstream/_frontend/app.py6
-rw-r--r--src/buildstream/_scheduler/scheduler.py16
-rw-r--r--src/buildstream/_stream.py17
3 files changed, 21 insertions, 18 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index f9729a7ce..45160afbc 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -663,11 +663,7 @@ class App():
elif choice == 'retry':
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- try:
- self.stream._failure_retry(action_name, unique_id)
- except StreamError:
- click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True)
- self.stream.terminate()
+ self.stream._failure_retry(action_name, unique_id)
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3c491b92d..b892296f5 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -30,6 +30,7 @@ from .resources import Resources
from .jobs import JobStatus
from ..types import FastEnum
from .._profile import Topics, PROFILER
+from ..plugin import Plugin
# A decent return code for Scheduler.run()
@@ -59,6 +60,7 @@ class NotificationType(FastEnum):
SUSPEND = "suspend"
UNSUSPEND = "unsuspend"
SUSPENDED = "suspended"
+ RETRY = "retry"
# Notification()
@@ -497,6 +499,18 @@ class Scheduler():
self._notify(Notification(NotificationType.TICK))
self.loop.call_later(1, self._tick)
+ def _failure_retry(self, action_name, unique_id):
+ queue = None
+ for q in self.queues:
+ if q.action_name == action_name:
+ queue = q
+ break
+ # Assert queue found, we should only be retrying a queued job
+ assert queue
+ element = Plugin._lookup(unique_id)
+ queue._task_group.failed_tasks.remove(element._get_full_name())
+ queue.enqueue([element])
+
def _notify(self, notification):
# Scheduler to Stream notifcations on right side
self._notification_queue.append(notification)
@@ -512,6 +526,8 @@ class Scheduler():
self.jobs_suspended()
elif notification.notification_type == NotificationType.UNSUSPEND:
self.jobs_unsuspended()
+ elif notification.notification_type == NotificationType.RETRY:
+ self._failure_retry(notification.job_action, notification.element)
else:
# Do not raise exception once scheduler process is separated
# as we don't want to pickle exceptions between processes
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e3d5eee89..6d8d918dd 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1338,20 +1338,11 @@ class Stream():
# action_name (str): The name of the action being performed
# unique_id (str): A unique_id to load an Element instance
#
- # Raises:
- # (StreamError): If the related queue cannot be found
- #
def _failure_retry(self, action_name, unique_id):
- queue = None
- # Attempt to resolve the required queue
- for queue in self.queues:
- if queue.action_name == action_name:
- queue = queue
- if not queue:
- raise StreamError()
- element = Plugin._lookup(unique_id)
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
+ notification = Notification(NotificationType.RETRY,
+ job_action=action_name,
+ element=unique_id)
+ self._notify(notification)
# _run()
#