 src/buildstream/_frontend/app.py        |  2
 src/buildstream/_scheduler/scheduler.py | 15
 src/buildstream/_state.py               | 29
 src/buildstream/_stream.py              | 40
 4 files changed, 28 insertions(+), 58 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 2f4235733..993747041 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -732,7 +732,7 @@ class App:
elif choice == "retry":
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- self.stream._failure_retry(task.id, unique_id)
+ self.stream.retry_job(task.action_name, unique_id)
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 7acb062d0..23abbe46d 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -113,7 +113,6 @@ class Scheduler:
self._interrupt_callback = interrupt_callback
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
- self._state.register_task_retry_callback(self._failure_retry)
# Ensure that the forkserver is started before we start.
# This is best run before we do any GRPC connections to casd or have
@@ -517,20 +516,6 @@ class Scheduler:
self._ticker_callback()
self.loop.call_later(1, self._tick)
- def _failure_retry(self, task_id, unique_id):
- task = self._state.tasks[task_id]
-
- queue = None
- for q in self.queues:
- if q.action_name == task.action_name:
- queue = q
- break
- # Assert queue found, we should only be retrying a queued job
- assert queue
- element = Plugin._lookup(unique_id)
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
-
def _handle_exception(self, loop, context: dict) -> None:
e = context.get("exception")
exc = bool(e)
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 0233dd323..773aa2146 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -112,7 +112,6 @@ class State:
self._task_changed_cbs = []
self._task_groups_changed_cbs = []
self._task_failed_cbs = []
- self._task_retry_cbs = []
#####################################
# Frontend-facing notification APIs #
@@ -216,20 +215,6 @@ class State:
def unregister_task_failed_callback(self, callback):
self._task_failed_cbs.remove(callback)
- # register_task_retry_callback()
- #
- # Registers a callback to be notified when a task is to be retried
- #
- # Args:
- # callback (function): The callback to be notified
- #
- # Callback Args:
- # task_id (str): The unique identifier of the task
- # unique_id: The unique id of the plugin instance to look up
- #
- def register_task_retry_callback(self, callback):
- self._task_retry_cbs.append(callback)
-
##############################################
# Core-facing APIs for driving notifications #
##############################################
@@ -334,20 +319,6 @@ class State:
for cb in self._task_failed_cbs:
cb(task_id, element)
- # retry_task()
- #
- # Notify all registered callbacks that a task is to be retried.
- #
- # This is a core-facing API and should not be called from the frontend
- #
- # Args:
- # task_id (str): The unique identifier of the task
- # unique_id: The unique id of the plugin instance to look up
- #
- def retry_task(self, task_id: str, unique_id: str) -> None:
- for cb in self._task_retry_cbs:
- cb(task_id, unique_id)
-
# elapsed_time()
#
# Fetches the current session elapsed time
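A minimal sketch, not part of this change, of the notification pattern that remains in State after the retry callback is removed: a frontend subscribes to task failures with the register/unregister pair shown above and remembers what failed so a retry can be requested later. The register_task_failed_callback() name and the (task_id, element) callback signature are assumed from the surrounding code; element[0] carrying the plugin unique_id follows the app.py hunk above.

# Hypothetical frontend wiring; the State instance comes from the application.
class FailureTracker:
    def __init__(self, state):
        self._state = state
        self._failed = []  # (action_name, unique_id) pairs kept for later retry
        # Subscribe to task failures (assumed to mirror the unregister API above)
        self._state.register_task_failed_callback(self._on_task_failed)

    def shutdown(self):
        # Stop receiving notifications when the session ends
        self._state.unregister_task_failed_callback(self._on_task_failed)

    def _on_task_failed(self, task_id, element):
        task = self._state.tasks[task_id]
        # element[0] is the plugin unique_id, as in App's retry handling above
        self._failed.append((task.action_name, element[0]))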
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a5391562a..b50be2a0d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1069,6 +1069,33 @@ class Stream:
self._suspended = False
self._scheduler.resume()
+ # retry_job()
+ #
+ # Retry the indicated job
+ #
+ # Args:
+    #    action_name: The action name of the task, used to find its queue
+ # unique_id: A unique_id to load an Element instance
+ #
+ def retry_job(self, action_name: str, unique_id: str) -> None:
+ element = Plugin._lookup(unique_id)
+
+ #
+ # Update the state task group, remove the failed element
+ #
+ group = self._state.task_groups[action_name]
+ group.failed_tasks.remove(element._get_full_name())
+
+ #
+ # Find the queue for this action name and requeue the element
+ #
+ queue = None
+ for q in self.queues:
+ if q.action_name == action_name:
+ queue = q
+ assert queue
+ queue.enqueue([element])
+
#############################################################
# Private Methods #
#############################################################
@@ -1346,19 +1373,6 @@ class Stream:
self.session_elements += plan
- # _failure_retry()
- #
- # Enqueues given element via unique_id to the specified queue
- # matched against provided action_name & removes the related
- # failed task from the tasks group.
- #
- # Args:
- # task_id (str): The unique identifier of the task
- # unique_id: A unique_id to load an Element instance
- #
- def _failure_retry(self, task_id: str, unique_id: str) -> None:
- self._state.retry_task(task_id, unique_id)
-
# _run()
#
# Common function for running the scheduler
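A short usage sketch of the new Stream.retry_job() API, assuming a stream with active queues and the (action_name, unique_id) pairs collected in the sketch after _state.py above. Per the implementation added in the first hunk, the element's full name must still be listed in the matching task group's failed_tasks and a queue with that action name must be active, otherwise the remove() or the assert will fail.

# Hypothetical retry loop driving the new API
def retry_failed_jobs(stream, failed):
    for action_name, unique_id in failed:
        # Clears the failed_tasks entry for the element and re-enqueues it
        # on the queue whose action_name matches (e.g. "build")
        stream.retry_job(action_name, unique_id)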