diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-06-13 17:47:04 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-10 10:44:53 +0000 |
commit | 56365356a20540c3319107fe647d0852321a64cb (patch) | |
tree | e666faa9979c248506cd60da5ee96860527ee495 | |
parent | a200338e3b1947c0d8e7a89b1977f753988dc76e (diff) | |
download | buildstream-56365356a20540c3319107fe647d0852321a64cb.tar.gz |
scheduler.py: Introduce a notification queue for stream interaction
For the 'backend' of BuildStream to run in a subprocess certain
interactions between the 'frontend' need to be augmented over a
queue between the processes, as well as adaptions for the differences
in state between them. This commit implements the core of these
interactions as notification objects over a deque with notifying
callbacks on either side, which should be switched to a
multiprocessing queue once there's process separation at which
point 'both ends' can execute an event loop.
This also removes the checks for the ticker & interrupt callbacks
to be optional, as currently they're always implemented.
-rw-r--r-- | src/buildstream/_scheduler/__init__.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 101 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 42 |
3 files changed, 117 insertions, 28 deletions
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d2f458fa5..d689d6e25 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue from .queues.artifactpushqueue import ArtifactPushQueue from .queues.pullqueue import PullQueue -from .scheduler import Scheduler, SchedStatus +from .scheduler import Scheduler, SchedStatus, Notification, NotificationType from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 37295b285..e6a12e81c 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -29,16 +29,59 @@ from contextlib import contextmanager # Local imports from .resources import Resources from .jobs import JobStatus +from ..types import FastEnum from .._profile import Topics, PROFILER # A decent return code for Scheduler.run() -class SchedStatus(): +class SchedStatus(FastEnum): SUCCESS = 0 ERROR = -1 TERMINATED = 1 +# NotificationType() +# +# Type of notification for inter-process communication +# between 'front' & 'back' end when a scheduler is executing. +# This is used as a parameter for a Notification object, +# to be used as a conditional for control or state handling. +# +class NotificationType(FastEnum): + INTERRUPT = "interrupt" + JOB_START = "job_start" + JOB_COMPLETE = "job_complete" + TICK = "tick" + TERMINATE = "terminate" + QUIT = "quit" + + +# Notification() +# +# An object to be passed across a bidirectional queue between +# Stream & Scheduler. A required NotificationType() parameter +# with accompanying information can be added as a member if +# required. NOTE: The notification object should be lightweight +# and all attributes must be picklable. +# +class Notification(): + + def __init__(self, + notification_type, + *, + full_name=None, + job_action=None, + job_status=None, + elapsed_time=None, + element=None): + self.notification_type = notification_type + self.full_name = full_name + self.job_action = job_action + self.job_status = job_status + self.elapsed_time = elapsed_time + self.element = element + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -62,9 +105,7 @@ class SchedStatus(): class Scheduler(): def __init__(self, context, - start_time, state, - interrupt_callback=None, - ticker_callback=None): + start_time, state, notification_queue, notifier): # # Public members @@ -87,9 +128,9 @@ class Scheduler(): self._queue_jobs = True # Whether we should continue to queue jobs self._state = state - # Callbacks to report back to the Scheduler owner - self._interrupt_callback = interrupt_callback - self._ticker_callback = ticker_callback + # Bidirectional queue to send notifications back to the Scheduler's owner + self._notification_queue = notification_queue + self._notifier = notifier self.resources = Resources(context.sched_builders, context.sched_fetchers, @@ -121,8 +162,7 @@ class Scheduler(): asyncio.set_event_loop(self.loop) # Add timeouts - if self._ticker_callback: - self.loop.call_later(1, self._tick) + self.loop.call_later(1, self._tick) # Handle unix signals while running self._connect_signals() @@ -234,11 +274,10 @@ class Scheduler(): # status (JobStatus): The status of the completed job # def job_completed(self, job, status): - # Remove from the active jobs list self._active_jobs.remove(job) - self._state.remove_task(job.action_name, job.name) + element_info = None if status == JobStatus.FAIL: # If it's an elementjob, we want to compare against the failure messages # and send the unique_id and display key tuple of the Element. This can then @@ -248,9 +287,14 @@ class Scheduler(): element_info = element._unique_id, element._get_display_key() else: element_info = None - self._state.fail_task(job.action_name, job.name, element=element_info) # Now check for more jobs + notification = Notification(NotificationType.JOB_COMPLETE, + full_name=job.name, + job_action=job.action_name, + job_status=status, + element=element_info) + self._notify(notification) self._sched() ####################################################### @@ -266,7 +310,11 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - self._state.add_task(job.action_name, job.name, self.elapsed_time()) + notification = Notification(NotificationType.JOB_START, + full_name=job.name, + job_action=job.action_name, + elapsed_time=self.elapsed_time()) + self._notify(notification) job.start() # _sched_queue_jobs() @@ -379,13 +427,8 @@ class Scheduler(): if self.terminated: return - # Leave this to the frontend to decide, if no - # interrrupt callback was specified, then just terminate. - if self._interrupt_callback: - self._interrupt_callback() - else: - # Default without a frontend is just terminate - self.terminate_jobs() + notification = Notification(NotificationType.INTERRUPT) + self._notify(notification) # _terminate_event(): # @@ -444,9 +487,25 @@ class Scheduler(): # Regular timeout for driving status in the UI def _tick(self): - self._ticker_callback() + self._notify(Notification(NotificationType.TICK)) self.loop.call_later(1, self._tick) + def _notify(self, notification): + # Scheduler to Stream notifcations on right side + self._notification_queue.append(notification) + self._notifier() + + def _stream_notification_handler(self): + notification = self._notification_queue.popleft() + if notification.notification_type == NotificationType.TERMINATE: + self.terminate_jobs() + elif notification.notification_type == NotificationType.QUIT: + self.stop_queueing() + else: + # Do not raise exception once scheduler process is separated + # as we don't want to pickle exceptions between processes + raise ValueError("Unrecognised notification type received") + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 2e43bb1a2..bbe60c1ec 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -28,12 +28,13 @@ import tarfile import tempfile from contextlib import contextmanager, suppress from fnmatch import fnmatch +from collections import deque from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ - SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue + SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER from ._state import State @@ -78,14 +79,17 @@ class Stream(): self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state + self._notification_queue = deque() context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, - interrupt_callback=interrupt_callback, - ticker_callback=ticker_callback) + self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue, + self._scheduler_notification_handler) self._first_non_track_queue = None self._session_start_callback = session_start_callback + self._ticker_callback = ticker_callback + self._interrupt_callback = interrupt_callback + self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler # init() # @@ -1103,7 +1107,8 @@ class Stream(): # Terminate jobs # def terminate(self): - self._scheduler.terminate_jobs() + notification = Notification(NotificationType.TERMINATE) + self._notify(notification) # quit() # @@ -1112,7 +1117,8 @@ class Stream(): # of ongoing jobs # def quit(self): - self._scheduler.stop_queueing() + notification = Notification(NotificationType.QUIT) + self._notify(notification) # suspend() # @@ -1642,6 +1648,30 @@ class Stream(): return element_targets, artifact_refs + def _scheduler_notification_handler(self): + # Check the queue is there and a scheduler is running + assert self._notification_queue and self.running + notification = self._notification_queue.pop() + + if notification.notification_type == NotificationType.INTERRUPT: + self._interrupt_callback() + elif notification.notification_type == NotificationType.TICK: + self._ticker_callback() + elif notification.notification_type == NotificationType.JOB_START: + self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time) + elif notification.notification_type == NotificationType.JOB_COMPLETE: + self._state.remove_task(notification.job_action, notification.full_name) + if notification.job_status == JobStatus.FAIL: + self._state.fail_task(notification.job_action, notification.full_name, + notification.element) + else: + raise StreamError("Unrecognised notification type received") + + def _notify(self, notification): + # Stream to scheduler notifcations on left side + self._notification_queue.appendleft(notification) + self._notifier() + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and |