diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/buildstream/_messenger.py | 2 | ||||
-rw-r--r-- | src/buildstream/_state.py | 344 |
2 files changed, 215 insertions, 131 deletions
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 69a309f91..f18d3dc92 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -212,7 +212,7 @@ class Messenger: self.message(message) task = self._state.add_task(task_name, activity_name, task_name) - task.set_render_cb(self._render_status) + task.set_task_changed_callback(self._render_status) self._active_simple_tasks += 1 if not self._next_render: self._next_render = datetime.datetime.now() + _RENDER_INTERVAL diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index 773aa2146..ddcbb09b9 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -16,7 +16,8 @@ # import datetime -from collections import OrderedDict +from typing import Optional, Tuple, List, Dict, Callable +from .types import _DisplayKey # TaskGroup @@ -24,23 +25,27 @@ from collections import OrderedDict # The state data stored for a group of tasks (usually scheduler queues) # # Args: -# name (str): The name of the Task Group, e.g. 'build' -# state (State): The state object -# complete_name (str): Optional name for frontend status rendering, e.g. 'built' +# name: The name of the Task Group, e.g. 'build' +# state: The state object +# complete_name: Optional name for frontend status rendering, e.g. 'built' # class TaskGroup: - def __init__(self, name, state, complete_name=None): - self.name = name - self.complete_name = complete_name - self.processed_tasks = 0 - self.skipped_tasks = 0 - # NOTE: failed_tasks is a list of strings instead of an integer count - # because the frontend requires the full list of failed tasks to - # know whether to print failure messages for a given element. - self.failed_tasks = [] - - self._state = state - self._update_task_group_cbs = [] + def __init__(self, name: str, state: "State", complete_name: Optional[str] = None) -> None: + + # + # Public members + # + self.name: str = name # The name of tasks in this group + self.complete_name: Optional[str] = complete_name # Optional name for frontend status rendering, e.g. 'built' + + self.processed_tasks: int = 0 # Number of processed tasks + self.skipped_tasks: int = 0 # Number of skipped tasks + self.failed_tasks: List[str] = [] # List of element full names which failed + + # + # Private members + # + self._state: "State" = state ########################################### # Core-facing APIs to drive notifications # @@ -52,7 +57,7 @@ class TaskGroup: # # This is a core-facing API and should not be called from the frontend # - def add_processed_task(self): + def add_processed_task(self) -> None: self.processed_tasks += 1 for cb in self._state._task_groups_changed_cbs: cb() @@ -63,7 +68,7 @@ class TaskGroup: # # This is a core-facing API and should not be called from the frontend # - def add_skipped_task(self): + def add_skipped_task(self) -> None: self.skipped_tasks += 1 for cb in self._state._task_groups_changed_cbs: @@ -74,19 +79,120 @@ class TaskGroup: # Update the TaskGroup's list of failed tasks and notify of changes # # Args: - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. + # full_name: The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. # # This is a core-facing API and should not be called from the frontend # - def add_failed_task(self, full_name): + def add_failed_task(self, full_name: str) -> None: self.failed_tasks.append(full_name) for cb in self._state._task_groups_changed_cbs: cb() +# Task +# +# The state data stored for an individual task +# +# Args: +# state: The State object +# task_id: The unique identifier of the task +# action_name: The name of the action, e.g. 'build' +# full_name: The full name of the task, distinguishing +# it from other tasks with the same action name +# e.g. an element's name. +# elapsed_offset: The time the task started, relative to +# buildstream's start time. +class Task: + def __init__( + self, state: "State", task_id: str, action_name: str, full_name: str, elapsed_offset: datetime.timedelta + ) -> None: + + # + # Public members + # + self.id: str = task_id + self.action_name: str = action_name + self.full_name: str = full_name + self.elapsed_offset: datetime.timedelta = elapsed_offset + self.current_progress: Optional[int] = None + self.maximum_progress: Optional[int] = None + + # + # Private members + # + self._state: "State" = state + self._task_changed_cb: Optional[Callable[[], None]] = None # Callback to call when something could be rendered + + ############################################## + # Core-facing APIs for driving notifications # + ############################################## + + # set_task_changed_callback() + # + # Sets the callback to be called when this task has + # changed. + # + # This is just a convenience codepath for the Messenger object + # run simple tasks outside of the scheduler context, rather + # than connecting to the State callbacks which are there for the + # purpose of the frontend to get notifications of task progress. + # + # Args: + # callback: The callback to call when progress changed + # + def set_task_changed_callback(self, callback: Optional[Callable[[], None]]) -> None: + self._task_changed_cb = callback + + # set_maximum_progress() + # + # Sets the maximum progress possible for this task. + # + # Args: + # progress: The maximum progress possible for this task + # + def set_maximum_progress(self, progress: int) -> None: + self.maximum_progress = progress + self._notify_task_changed() + + # set_current_progress() + # + # Sets the current progress of the task, this should + # be a number between 0 and the maximum progress, if a + # maximum progress has been set. + # + # Args: + # progress: The current progress + # + def set_current_progress(self, progress: int) -> None: + self.current_progress = progress + self._notify_task_changed() + + # add_current_progress() + # + # A convenience function for incrementing the current + # progress of this task by 1. + # + def add_current_progress(self) -> None: + if self.current_progress is None: + new_progress = 1 + else: + new_progress = self.current_progress + 1 + self.set_current_progress(new_progress) + + ############################################## + # Private methods # + ############################################## + def _notify_task_changed(self) -> None: + for cb in self._state._task_changed_cbs: + cb(self.id) + + if self._task_changed_cb: + self._task_changed_cb() + + # State # # The state data that is stored for the purpose of sharing with the frontend. @@ -96,22 +202,26 @@ class TaskGroup: # when parts of State change, and read State to know what has changed. # # Args: -# session_start (datetime): The time the session started +# session_start: The time the session started # class State: - def __init__(self, session_start): - self._session_start = session_start - - self.task_groups = OrderedDict() # key is TaskGroup name - - # Note: A Task's full_name is technically unique, but only accidentally. - self.tasks = OrderedDict() # key is a tuple of action_name and full_name - - self._task_added_cbs = [] - self._task_removed_cbs = [] - self._task_changed_cbs = [] - self._task_groups_changed_cbs = [] - self._task_failed_cbs = [] + def __init__(self, session_start: datetime.datetime) -> None: + + # + # Public members + # + self.task_groups: Dict[str, TaskGroup] = {} # Dictionary of active task groups by group name + self.tasks: Dict[str, Task] = {} # Dictionary of active tasks by unique task ID + + # + # Private members + # + self._session_start: datetime.datetime = session_start + self._task_added_cbs: List[Callable[[str], None]] = [] + self._task_removed_cbs: List[Callable[[str], None]] = [] + self._task_changed_cbs: List[Callable[[str], None]] = [] + self._task_failed_cbs: List[Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]] = [] + self._task_groups_changed_cbs: List[Callable[[], None]] = [] ##################################### # Frontend-facing notification APIs # @@ -122,12 +232,12 @@ class State: # Registers a callback to be notified when a task is added # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def register_task_added_callback(self, callback): + def register_task_added_callback(self, callback: Callable[[str], None]) -> None: self._task_added_cbs.append(callback) # unregister_task_added_callback() @@ -136,9 +246,9 @@ class State: # register_task_added_callback() # # Args: - # callback (function): The callback to be removed + # callback: The callback to be removed # - def unregister_task_added_callback(self, callback): + def unregister_task_added_callback(self, callback: Callable[[str], None]) -> None: self._task_added_cbs.remove(callback) # register_task_removed_callback() @@ -146,12 +256,12 @@ class State: # Registers a callback to be notified when a task is removed # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def register_task_removed_callback(self, callback): + def register_task_removed_callback(self, callback: Callable[[str], None]) -> None: self._task_removed_cbs.append(callback) # unregister_task_removed_callback() @@ -160,9 +270,9 @@ class State: # register_task_removed_callback() # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # - def unregister_task_removed_callback(self, callback): + def unregister_task_removed_callback(self, callback: Callable[[str], None]) -> None: self._task_removed_cbs.remove(callback) # register_task_changed_callback() @@ -170,12 +280,12 @@ class State: # Register a callback to be notified when a task has changed # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def register_task_changed_callback(self, callback): + def register_task_changed_callback(self, callback: Callable[[str], None]) -> None: self._task_changed_cbs.append(callback) # unregister_task_changed_callback() @@ -184,9 +294,9 @@ class State: # register_task_changed_callback() # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # - def unregister_task_changed_callback(self, callback): + def unregister_task_changed_callback(self, callback: Callable[[str], None]) -> None: self._task_changed_cbs.remove(callback) # register_task_failed_callback() @@ -197,11 +307,12 @@ class State: # callback (function): The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task - # element (tuple): (optionally) The element unique_id and display keys if an - # element job + # task_id: The unique identifier of the task + # element: (optionally) The element unique_id and DisplayKey of an element job # - def register_task_failed_callback(self, callback): + def register_task_failed_callback( + self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None] + ) -> None: self._task_failed_cbs.append(callback) # unregister_task_failed_callback() @@ -212,9 +323,35 @@ class State: # Args: # callback (function): The callback to be removed # - def unregister_task_failed_callback(self, callback): + def unregister_task_failed_callback( + self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None] + ) -> None: self._task_failed_cbs.remove(callback) + # register_task_groups_changed_callback() + # + # Registers a callback to be notified whenever the task groups info has changed + # + # Args: + # callback: The callback to be notified + # + # Callback Args: + # task_id: The unique identifier of the task + # element: (optionally) The element unique_id and DisplayKey of an element job + # + def register_task_groups_changed_callback(self, callback: Callable[[], None]) -> None: + self._task_groups_changed_cbs.append(callback) + + # unregister_task_groups_changed_callback() + # + # Unregisters a callback previously registered by register_task_groups_changed_callback() + # + # Args: + # callback (function): The callback to be removed + # + def unregister_task_groups_changed_callback(self, callback: Callable[[], None]) -> None: + self._task_groups_changed_cbs.remove(callback) + ############################################## # Core-facing APIs for driving notifications # ############################################## @@ -232,7 +369,7 @@ class State: # Returns: # TaskGroup: The task group created # - def add_task_group(self, name, complete_name=None): + def add_task_group(self, name, complete_name=None) -> TaskGroup: assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups) group = TaskGroup(name, self, complete_name) self.task_groups[name] = group @@ -248,7 +385,7 @@ class State: # Args: # name (str): The name of the task group, e.g. 'build' # - def remove_task_group(self, name): + def remove_task_group(self, name) -> None: # Rely on 'del' to raise an error when removing nonexistent task groups del self.task_groups[name] @@ -259,9 +396,9 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # task_id (str): The unique identifier of the task - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing + # task_id: The unique identifier of the task + # action_name: The name of the action, e.g. 'build' + # full_name: The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # elapsed_offset (timedelta): (Optional) The time the task started, relative @@ -269,7 +406,12 @@ class State: # use this as they don't report relative to wallclock time # if the Scheduler has been suspended. # - def add_task(self, task_id, action_name, full_name, elapsed_offset=None): + # Returns: + # The new task + # + def add_task( + self, task_id: str, action_name: str, full_name: str, elapsed_offset: Optional[datetime.timedelta] = None + ) -> Task: assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format( action_name, full_name, task_id, self.tasks ) @@ -292,9 +434,9 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def remove_task(self, task_id): + def remove_task(self, task_id: str) -> None: # Rely on 'del' to raise an error when removing nonexistent tasks del self.tasks[task_id] @@ -311,11 +453,11 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # task_id (str): The unique identifier of the task - # element (tuple): (optionally) The element unique_id and display keys if an + # task_id: The unique identifier of the task + # element: (optionally) The element unique_id and display keys if an # element job # - def fail_task(self, task_id, element=None): + def fail_task(self, task_id: str, element: Optional[Tuple[int, _DisplayKey]] = None) -> None: for cb in self._task_failed_cbs: cb(task_id, element) @@ -324,14 +466,14 @@ class State: # Fetches the current session elapsed time # # Args: - # start_time(time): Optional explicit start time, relative to caller. + # start_time: Optional explicit start time, relative to caller. # # Returns: - # (timedelta): The amount of time since the start of the session, - # discounting any time spent while jobs were suspended if - # start_time given relative to the Scheduler + # The amount of time since the start of the session, + # discounting any time spent while jobs were suspended if + # start_time given relative to the Scheduler # - def elapsed_time(self, start_time=None): + def elapsed_time(self, start_time: Optional[datetime.datetime] = None) -> datetime.timedelta: time_now = datetime.datetime.now() if start_time is None: start_time = self._session_start or time_now @@ -350,61 +492,3 @@ class State: # def offset_start_time(self, offset: datetime.timedelta) -> None: self._session_start += offset - - -# Task -# -# The state data stored for an individual task -# -# Args: -# state (State): The State object -# task_id (str): The unique identifier of the task -# action_name (str): The name of the action, e.g. 'build' -# full_name (str): The full name of the task, distinguishing -# it from other tasks with the same action name -# e.g. an element's name. -# elapsed_offset (timedelta): The time the task started, relative to -# buildstream's start time. -class Task: - def __init__(self, state, task_id, action_name, full_name, elapsed_offset): - self._state = state - self.id = task_id - self.action_name = action_name - self.full_name = full_name - self.elapsed_offset = elapsed_offset - self.current_progress = None - self.maximum_progress = None - - self._render_cb = None # Callback to call when something could be rendered - - # set_render_cb() - # - # Sets the callback to be called when the Task has changed and should be rendered - # - # NOTE: This should probably be removed once the frontend is running - # separately from the scheduler, since renders could be triggered - # by the scheduler. - def set_render_cb(self, callback): - self._render_cb = callback - - def set_current_progress(self, progress): - self.current_progress = progress - for cb in self._state._task_changed_cbs: - cb(self.id) - if self._render_cb: - self._render_cb() - - def set_maximum_progress(self, progress): - self.maximum_progress = progress - for cb in self._state._task_changed_cbs: - cb(self.id) - - if self._render_cb: - self._render_cb() - - def add_current_progress(self): - if self.current_progress is None: - new_progress = 1 - else: - new_progress = self.current_progress + 1 - self.set_current_progress(new_progress) |