diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-10 11:17:39 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-10 11:17:39 +0000 |
commit | 826e12a2804dfa49d55eb59f07dd2d116ada10c2 (patch) | |
tree | b6c38c31da12c85d486bf4333f9559ea86f85f7a | |
parent | a200338e3b1947c0d8e7a89b1977f753988dc76e (diff) | |
parent | a97af8edd7e1e9385c45fca2761f7327f9180059 (diff) | |
download | buildstream-826e12a2804dfa49d55eb59f07dd2d116ada10c2.tar.gz |
Merge branch 'tpollard/notificationhandler' into 'master'
Stream - Scheduler notification handler
See merge request BuildStream/buildstream!1550
-rw-r--r-- | src/buildstream/_frontend/app.py | 6 | ||||
-rw-r--r-- | src/buildstream/_scheduler/__init__.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 6 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 3 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 182 | ||||
-rw-r--r-- | src/buildstream/_state.py | 24 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 90 |
7 files changed, 234 insertions, 79 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index f9729a7ce..45160afbc 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -663,11 +663,7 @@ class App(): elif choice == 'retry': click.echo("\nRetrying failed job\n", err=True) unique_id = element[0] - try: - self.stream._failure_retry(action_name, unique_id) - except StreamError: - click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True) - self.stream.terminate() + self.stream._failure_retry(action_name, unique_id) # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d2f458fa5..d689d6e25 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue from .queues.artifactpushqueue import ArtifactPushQueue from .queues.pullqueue import PullQueue -from .scheduler import Scheduler, SchedStatus +from .scheduler import Scheduler, SchedStatus, Notification, NotificationType from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 9af08df92..913e27ea2 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -393,8 +393,8 @@ class Job(): element_name = self._message_element_name if element_key is None: element_key = self._message_element_key - self._scheduler.context.messenger.message( - Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)) + message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) + self._scheduler.notify_messenger(message) # get_element() # @@ -536,7 +536,7 @@ class Job(): if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the 
context. - self._scheduler.context.messenger.message(envelope.message) + self._scheduler.notify_messenger(envelope.message) elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 745b59417..6c6dfdc4f 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -345,9 +345,8 @@ class Queue(): # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): - context = element._get_context() message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) - context.messenger.message(message) + self._scheduler.notify_messenger(message) def _element_log_path(self, element): project = element._get_project() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 37295b285..d0a189545 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -24,21 +24,74 @@ import asyncio from itertools import chain import signal import datetime -from contextlib import contextmanager # Local imports from .resources import Resources from .jobs import JobStatus +from ..types import FastEnum from .._profile import Topics, PROFILER +from ..plugin import Plugin # A decent return code for Scheduler.run() -class SchedStatus(): +class SchedStatus(FastEnum): SUCCESS = 0 ERROR = -1 TERMINATED = 1 +# NotificationType() +# +# Type of notification for inter-process communication +# between 'front' & 'back' end when a scheduler is executing. +# This is used as a parameter for a Notification object, +# to be used as a conditional for control or state handling. 
+# +class NotificationType(FastEnum): + INTERRUPT = "interrupt" + JOB_START = "job_start" + JOB_COMPLETE = "job_complete" + TICK = "tick" + TERMINATE = "terminate" + QUIT = "quit" + SCHED_START_TIME = "sched_start_time" + RUNNING = "running" + TERMINATED = "terminated" + SUSPEND = "suspend" + UNSUSPEND = "unsuspend" + SUSPENDED = "suspended" + RETRY = "retry" + MESSAGE = "message" + + +# Notification() +# +# An object to be passed across a bidirectional queue between +# Stream & Scheduler. A required NotificationType() parameter +# with accompanying information can be added as a member if +# required. NOTE: The notification object should be lightweight +# and all attributes must be picklable. +# +class Notification(): + + def __init__(self, + notification_type, + *, + full_name=None, + job_action=None, + job_status=None, + time=None, + element=None, + message=None): + self.notification_type = notification_type + self.full_name = full_name + self.job_action = job_action + self.job_status = job_status + self.time = time + self.element = element + self.message = message + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -62,9 +115,7 @@ class SchedStatus(): class Scheduler(): def __init__(self, context, - start_time, state, - interrupt_callback=None, - ticker_callback=None): + start_time, state, notification_queue, notifier): # # Public members @@ -87,9 +138,9 @@ class Scheduler(): self._queue_jobs = True # Whether we should continue to queue jobs self._state = state - # Callbacks to report back to the Scheduler owner - self._interrupt_callback = interrupt_callback - self._ticker_callback = ticker_callback + # Bidirectional queue to send notifications back to the Scheduler's owner + self._notification_queue = notification_queue + self._notifier = notifier self.resources = Resources(context.sched_builders, context.sched_fetchers, @@ -120,9 +171,11 @@ class Scheduler(): self.loop = asyncio.new_event_loop() 
asyncio.set_event_loop(self.loop) + # Notify that the loop has been created + self._notify(Notification(NotificationType.RUNNING)) + # Add timeouts - if self._ticker_callback: - self.loop.call_later(1, self._tick) + self.loop.call_later(1, self._tick) # Handle unix signals while running self._connect_signals() @@ -140,6 +193,9 @@ class Scheduler(): failed = any(queue.any_failed_elements() for queue in self.queues) self.loop = None + # Notify that the loop has been reset + self._notify(Notification(NotificationType.RUNNING)) + if failed: status = SchedStatus.ERROR elif self.terminated: @@ -181,6 +237,10 @@ class Scheduler(): # attribute to decide whether or not to print status info # etc and the following code block will trigger some callbacks. self.terminated = True + + # Notify the frontend that we're terminated as it might be + # from an interactive prompt callback or SIGTERM + self._notify(Notification(NotificationType.TERMINATED)) self.loop.call_soon(self._terminate_jobs_real) # Block this until we're finished terminating jobs, @@ -189,15 +249,17 @@ class Scheduler(): # jobs_suspended() # - # A context manager for running with jobs suspended + # Suspend jobs after being notified # - @contextmanager def jobs_suspended(self): self._disconnect_signals() self._suspend_jobs() - yield - + # jobs_unsuspended() + # + # Unsuspend jobs after being notified + # + def jobs_unsuspended(self): self._resume_jobs() self._connect_signals() @@ -209,21 +271,6 @@ class Scheduler(): def stop_queueing(self): self._queue_jobs = False - # elapsed_time() - # - # Fetches the current session elapsed time - # - # Returns: - # (timedelta): The amount of time since the start of the session, - # discounting any time spent while jobs were suspended. 
- # - def elapsed_time(self): - timenow = datetime.datetime.now() - starttime = self._starttime - if not starttime: - starttime = timenow - return timenow - starttime - # job_completed(): # # Called when a Job completes @@ -234,11 +281,10 @@ class Scheduler(): # status (JobStatus): The status of the completed job # def job_completed(self, job, status): - # Remove from the active jobs list self._active_jobs.remove(job) - self._state.remove_task(job.action_name, job.name) + element_info = None if status == JobStatus.FAIL: # If it's an elementjob, we want to compare against the failure messages # and send the unique_id and display key tuple of the Element. This can then @@ -248,11 +294,27 @@ class Scheduler(): element_info = element._unique_id, element._get_display_key() else: element_info = None - self._state.fail_task(job.action_name, job.name, element=element_info) # Now check for more jobs + notification = Notification(NotificationType.JOB_COMPLETE, + full_name=job.name, + job_action=job.action_name, + job_status=status, + element=element_info) + self._notify(notification) self._sched() + # notify_messenger() + # + # Send message over notification queue to Messenger callback + # + # Args: + # message (Message): A Message() to be sent to the frontend message + # handler, as assigned by context's messenger. 
+ # + def notify_messenger(self, message): + self._notify(Notification(NotificationType.MESSAGE, message=message)) + ####################################################### # Local Private Methods # ####################################################### @@ -266,7 +328,11 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - self._state.add_task(job.action_name, job.name, self.elapsed_time()) + notification = Notification(NotificationType.JOB_START, + full_name=job.name, + job_action=job.action_name, + time=self._state.elapsed_time(start_time=self._starttime)) + self._notify(notification) job.start() # _sched_queue_jobs() @@ -350,6 +416,8 @@ class Scheduler(): if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True + # Notify that we're suspended + self._notify(Notification(NotificationType.SUSPENDED)) for job in self._active_jobs: job.suspend() @@ -362,7 +430,10 @@ class Scheduler(): for job in self._active_jobs: job.resume() self.suspended = False + # Notify that we're unsuspended + self._notify(Notification(NotificationType.SUSPENDED)) self._starttime += (datetime.datetime.now() - self._suspendtime) + self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) self._suspendtime = None # _interrupt_event(): @@ -379,13 +450,8 @@ class Scheduler(): if self.terminated: return - # Leave this to the frontend to decide, if no - # interrrupt callback was specified, then just terminate. 
- if self._interrupt_callback: - self._interrupt_callback() - else: - # Default without a frontend is just terminate - self.terminate_jobs() + notification = Notification(NotificationType.INTERRUPT) + self._notify(notification) # _terminate_event(): # @@ -444,9 +510,43 @@ class Scheduler(): # Regular timeout for driving status in the UI def _tick(self): - self._ticker_callback() + self._notify(Notification(NotificationType.TICK)) self.loop.call_later(1, self._tick) + def _failure_retry(self, action_name, unique_id): + queue = None + for q in self.queues: + if q.action_name == action_name: + queue = q + break + # Assert queue found, we should only be retrying a queued job + assert queue + element = Plugin._lookup(unique_id) + queue._task_group.failed_tasks.remove(element._get_full_name()) + queue.enqueue([element]) + + def _notify(self, notification): + # Scheduler to Stream notifcations on right side + self._notification_queue.append(notification) + self._notifier() + + def _stream_notification_handler(self): + notification = self._notification_queue.popleft() + if notification.notification_type == NotificationType.TERMINATE: + self.terminate_jobs() + elif notification.notification_type == NotificationType.QUIT: + self.stop_queueing() + elif notification.notification_type == NotificationType.SUSPEND: + self.jobs_suspended() + elif notification.notification_type == NotificationType.UNSUSPEND: + self.jobs_unsuspended() + elif notification.notification_type == NotificationType.RETRY: + self._failure_retry(notification.job_action, notification.element) + else: + # Do not raise exception once scheduler process is separated + # as we don't want to pickle exceptions between processes + raise ValueError("Unrecognised notification type received") + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py 
index df3bceff2..c99434018 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -272,7 +272,9 @@ class State(): # it from other tasks with the same action name # e.g. an element's name. # elapsed_offset (timedelta): (Optional) The time the task started, relative - # to buildstream's start time. + # to buildstream's start time. Note scheduler tasks + # use this as they don't report relative to wallclock time + # if the Scheduler has been suspended. # def add_task(self, action_name, full_name, elapsed_offset=None): task_key = (action_name, full_name) @@ -280,7 +282,7 @@ class State(): "Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks) if not elapsed_offset: - elapsed_offset = datetime.datetime.now() - self._session_start + elapsed_offset = self.elapsed_time() task = _Task(self, action_name, full_name, elapsed_offset) self.tasks[task_key] = task @@ -330,6 +332,24 @@ class State(): for cb in self._task_failed_cbs: cb(action_name, full_name, element) + # elapsed_time() + # + # Fetches the current session elapsed time + # + # Args: + # start_time(time): Optional explicit start time, relative to caller. 
+ # + # Returns: + # (timedelta): The amount of time since the start of the session, + # discounting any time spent while jobs were suspended if + # start_time given relative to the Scheduler + # + def elapsed_time(self, start_time=None): + time_now = datetime.datetime.now() + if start_time is None: + start_time = self._session_start or time_now + return time_now - start_time + # _Task # diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 2e43bb1a2..293ba051d 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -28,12 +28,13 @@ import tarfile import tempfile from contextlib import contextmanager, suppress from fnmatch import fnmatch +from collections import deque from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ - SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue + SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER from ._state import State @@ -78,14 +79,21 @@ class Stream(): self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state + self._notification_queue = deque() + self._starttime = session_start # Synchronised with Scheduler's relative start time context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, - interrupt_callback=interrupt_callback, - ticker_callback=ticker_callback) + self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue, + self._scheduler_notification_handler) self._first_non_track_queue = None self._session_start_callback = session_start_callback + self._ticker_callback = 
ticker_callback + self._interrupt_callback = interrupt_callback + self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler + self._scheduler_running = False + self._scheduler_terminated = False + self._scheduler_suspended = False # init() # @@ -1072,7 +1080,7 @@ class Stream(): # @property def running(self): - return self._scheduler.loop is not None + return self._scheduler_running # suspended # @@ -1080,7 +1088,7 @@ class Stream(): # @property def suspended(self): - return self._scheduler.suspended + return self._scheduler_suspended # terminated # @@ -1088,7 +1096,7 @@ class Stream(): # @property def terminated(self): - return self._scheduler.terminated + return self._scheduler_terminated # elapsed_time # @@ -1096,14 +1104,15 @@ class Stream(): # @property def elapsed_time(self): - return self._scheduler.elapsed_time() + return self._state.elapsed_time(start_time=self._starttime) # terminate() # # Terminate jobs # def terminate(self): - self._scheduler.terminate_jobs() + notification = Notification(NotificationType.TERMINATE) + self._notify(notification) # quit() # @@ -1112,7 +1121,8 @@ class Stream(): # of ongoing jobs # def quit(self): - self._scheduler.stop_queueing() + notification = Notification(NotificationType.QUIT) + self._notify(notification) # suspend() # @@ -1120,8 +1130,13 @@ class Stream(): # @contextmanager def suspend(self): - with self._scheduler.jobs_suspended(): - yield + # Send the notification to suspend jobs + notification = Notification(NotificationType.SUSPEND) + self._notify(notification) + yield + # Unsuspend jobs on context exit + notification = Notification(NotificationType.UNSUSPEND) + self._notify(notification) ############################################################# # Private Methods # @@ -1323,20 +1338,11 @@ class Stream(): # action_name (str): The name of the action being performed # unique_id (str): A unique_id to load an Element instance # - # Raises: - # (StreamError): If the 
related queue cannot be found - # def _failure_retry(self, action_name, unique_id): - queue = None - # Attempt to resolve the required queue - for queue in self.queues: - if queue.action_name == action_name: - queue = queue - if not queue: - raise StreamError() - element = Plugin._lookup(unique_id) - queue._task_group.failed_tasks.remove(element._get_full_name()) - queue.enqueue([element]) + notification = Notification(NotificationType.RETRY, + job_action=action_name, + element=unique_id) + self._notify(notification) # _run() # @@ -1642,6 +1648,40 @@ class Stream(): return element_targets, artifact_refs + def _scheduler_notification_handler(self): + # Check the queue is there + assert self._notification_queue + notification = self._notification_queue.pop() + + if notification.notification_type == NotificationType.MESSAGE: + self._context.messenger.message(notification.message) + elif notification.notification_type == NotificationType.INTERRUPT: + self._interrupt_callback() + elif notification.notification_type == NotificationType.TICK: + self._ticker_callback() + elif notification.notification_type == NotificationType.JOB_START: + self._state.add_task(notification.job_action, notification.full_name, notification.time) + elif notification.notification_type == NotificationType.JOB_COMPLETE: + self._state.remove_task(notification.job_action, notification.full_name) + if notification.job_status == JobStatus.FAIL: + self._state.fail_task(notification.job_action, notification.full_name, + notification.element) + elif notification.notification_type == NotificationType.SCHED_START_TIME: + self._starttime = notification.time + elif notification.notification_type == NotificationType.RUNNING: + self._scheduler_running = not self._scheduler_running + elif notification.notification_type == NotificationType.TERMINATED: + self._scheduler_terminated = True + elif notification.notification_type == NotificationType.SUSPENDED: + self._scheduler_suspended = not 
self._scheduler_suspended + else: + raise StreamError("Unrecognised notification type received") + + def _notify(self, notification): + # Stream to scheduler notifications on left side + self._notification_queue.appendleft(notification) + self._notifier() + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and |