diff options
-rw-r--r-- | src/buildstream/_frontend/app.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 15 | ||||
-rw-r--r-- | src/buildstream/_state.py | 29 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 40 |
4 files changed, 28 insertions, 58 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 2f4235733..993747041 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -732,7 +732,7 @@ class App: elif choice == "retry": click.echo("\nRetrying failed job\n", err=True) unique_id = element[0] - self.stream._failure_retry(task.id, unique_id) + self.stream.retry_job(task.action_name, unique_id) # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 7acb062d0..23abbe46d 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -113,7 +113,6 @@ class Scheduler: self._interrupt_callback = interrupt_callback self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) - self._state.register_task_retry_callback(self._failure_retry) # Ensure that the forkserver is started before we start. # This is best run before we do any GRPC connections to casd or have @@ -517,20 +516,6 @@ class Scheduler: self._ticker_callback() self.loop.call_later(1, self._tick) - def _failure_retry(self, task_id, unique_id): - task = self._state.tasks[task_id] - - queue = None - for q in self.queues: - if q.action_name == task.action_name: - queue = q - break - # Assert queue found, we should only be retrying a queued job - assert queue - element = Plugin._lookup(unique_id) - queue._task_group.failed_tasks.remove(element._get_full_name()) - queue.enqueue([element]) - def _handle_exception(self, loop, context: dict) -> None: e = context.get("exception") exc = bool(e) diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index 0233dd323..773aa2146 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -112,7 +112,6 @@ class State: self._task_changed_cbs = [] self._task_groups_changed_cbs = [] self._task_failed_cbs = [] - self._task_retry_cbs = [] ##################################### # Frontend-facing notification APIs # @@ -216,20 +215,6 @@ class State: def unregister_task_failed_callback(self, callback): self._task_failed_cbs.remove(callback) - # register_task_retry_callback() - # - # Registers a callback to be notified when a task is to be retried - # - # Args: - # callback (function): The callback to be notified - # - # Callback Args: - # task_id (str): The unique identifier of the task - # unique_id: The unique id of the plugin instance to look up - # - def register_task_retry_callback(self, callback): - self._task_retry_cbs.append(callback) - ############################################## # Core-facing APIs for driving notifications # ############################################## @@ -334,20 +319,6 @@ class State: for cb in self._task_failed_cbs: cb(task_id, element) - # retry_task() - # - # Notify all registered callbacks that a task is to be retried. - # - # This is a core-facing API and should not be called from the frontend - # - # Args: - # task_id (str): The unique identifier of the task - # unique_id: The unique id of the plugin instance to look up - # - def retry_task(self, task_id: str, unique_id: str) -> None: - for cb in self._task_retry_cbs: - cb(task_id, unique_id) - # elapsed_time() # # Fetches the current session elapsed time diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index a5391562a..b50be2a0d 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1069,6 +1069,33 @@ class Stream: self._suspended = False self._scheduler.resume() + # retry_job() + # + # Retry the indicated job + # + # Args: + # action_name: The unique identifier of the task + # unique_id: A unique_id to load an Element instance + # + def retry_job(self, action_name: str, unique_id: str) -> None: + element = Plugin._lookup(unique_id) + + # + # Update the state task group, remove the failed element + # + group = self._state.task_groups[action_name] + group.failed_tasks.remove(element._get_full_name()) + + # + # Find the queue for this action name and requeue the element + # + queue = None + for q in self.queues: + if q.action_name == action_name: + queue = q + assert queue + queue.enqueue([element]) + ############################################################# # Private Methods # ############################################################# @@ -1346,19 +1373,6 @@ class Stream: self.session_elements += plan - # _failure_retry() - # - # Enqueues given element via unique_id to the specified queue - # matched against provided action_name & removes the related - # failed task from the tasks group. - # - # Args: - # task_id (str): The unique identifier of the task - # unique_id: A unique_id to load an Element instance - # - def _failure_retry(self, task_id: str, unique_id: str) -> None: - self._state.retry_task(task_id, unique_id) - # _run() # # Common function for running the scheduler |