diff options
author | Phil Dawson <phil.dawson@codethink.co.uk> | 2019-06-13 17:47:04 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-15 12:38:19 +0100 |
commit | 7f7c55f040b8bc42c5c66f0cbbb173d09bef856e (patch) | |
tree | 5215d6f0b96fe4a6e783e265f7c114b712e7358a | |
parent | cf3f925764cfe6949aedbdcd313a93654f7c7657 (diff) | |
download | buildstream-7f7c55f040b8bc42c5c66f0cbbb173d09bef856e.tar.gz |
WIP: Refactor scheduler-frontend communication
-rw-r--r-- | src/buildstream/_frontend/app.py | 2 |
-rw-r--r-- | src/buildstream/_scheduler/__init__.py | 2 |
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 72 |
-rw-r--r-- | src/buildstream/_stream.py | 25 |
4 files changed, 81 insertions, 20 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 87575b675..7fe71c55f 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -603,6 +603,7 @@ class App(): pass return + assert False # Interactive mode for element failures with self._interrupted(): @@ -642,6 +643,7 @@ class App(): # Handle choices which you can come back from # + assert choice != 'shell' # This won't work for now if choice == 'shell': click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True) try: diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d2f458fa5..d689d6e25 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue from .queues.artifactpushqueue import ArtifactPushQueue from .queues.pullqueue import PullQueue -from .scheduler import Scheduler, SchedStatus +from .scheduler import Scheduler, SchedStatus, Notification, NotificationType from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 9d7cf5d09..0ed5ada09 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -21,6 +21,7 @@ # System imports import os import asyncio +import enum from itertools import chain import signal import datetime @@ -45,6 +46,32 @@ _ACTION_NAME_CLEANUP = 'clean' _ACTION_NAME_CACHE_SIZE = 'size' +@enum.unique +class NotificationType(enum.Enum): + INTERRUPT = "interrupt" + JOB_START = "job_start" + JOB_COMPLETE = "job_complete" + TICK = "tick" + + +class Notification: + + def __init__(self, + notification_type, + *, + full_name=None, + job_action=None, + job_status=None, + elapsed_time=None, + element=None): + self.notification_type = notification_type + self.full_name = full_name + self.job_action = job_action + self.job_status = job_status 
+ self.elapsed_time = elapsed_time + self.element = element + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -68,7 +95,7 @@ _ACTION_NAME_CACHE_SIZE = 'size' class Scheduler(): def __init__(self, context, - start_time, state, + start_time, state, message_handler, interrupt_callback=None, ticker_callback=None): @@ -99,9 +126,17 @@ class Scheduler(): self._cleanup_scheduled = False # Whether we have a cleanup job scheduled self._cleanup_running = None # A running CleanupJob, or None - # Callbacks to report back to the Scheduler owner - self._interrupt_callback = interrupt_callback - self._ticker_callback = ticker_callback + # Callback to send messages to report back to the Scheduler's owner + self.message = message_handler + + # Whether our exclusive jobs, like 'cleanup' are currently already + # waiting or active. + # + # This is just a bit quicker than scanning the wait queue and active + # queue and comparing job action names. + # + self._exclusive_waiting = set() + self._exclusive_active = set() self.resources = Resources(context.sched_builders, context.sched_fetchers, @@ -131,8 +166,7 @@ class Scheduler(): asyncio.set_event_loop(self.loop) # Add timeouts - if self._ticker_callback: - self.loop.call_later(1, self._tick) + self.loop.call_later(1, self._tick) # Handle unix signals while running self._connect_signals() @@ -251,13 +285,17 @@ class Scheduler(): # Remove from the active jobs list self._active_jobs.remove(job) - self._state.remove_task(job.action_name, job.name) if status == JobStatus.FAIL: # If it's an elementjob, we want to compare against the failure messages # and send the Element() instance. 
Note this will change if the frontend # is run in a separate process for pickling element = job.get_element() - self._state.fail_task(job.action_name, job.name, element=element) + message = Notification(NotificationType.JOB_COMPLETE, + full_name=job.name, + job_action=job.action_name, + job_status=status, + element=element) + self.message(message) # Now check for more jobs self._sched() @@ -316,7 +354,11 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - self._state.add_task(job.action_name, job.name, self.elapsed_time()) + message = Notification(NotificationType.JOB_START, + full_name=job.name, + job_action=job.action_name, + elapsed_time=self.elapsed_time()) + self.message(message) job.start() # Callback for the cache size job @@ -535,13 +577,8 @@ class Scheduler(): if self.terminated: return - # Leave this to the frontend to decide, if no - # interrrupt callback was specified, then just terminate. - if self._interrupt_callback: - self._interrupt_callback() - else: - # Default without a frontend is just terminate - self.terminate_jobs() + message = Notification(NotificationType.INTERRUPT) + self.message(message) # _terminate_event(): # @@ -600,7 +637,8 @@ class Scheduler(): # Regular timeout for driving status in the UI def _tick(self): - self._ticker_callback() + message = Notification(NotificationType.TICK) + self.message(message) self.loop.call_later(1, self._tick) def __getstate__(self): diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index c54fee1a7..b09521b5b 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ - SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue + SourcePushQueue, 
BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER from ._state import State @@ -83,11 +83,13 @@ class Stream(): context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, + self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler, interrupt_callback=interrupt_callback, ticker_callback=ticker_callback) self._first_non_track_queue = None self._session_start_callback = session_start_callback + self._ticker_callback = ticker_callback + self._interrupt_callback = interrupt_callback # init() # @@ -1584,6 +1586,25 @@ class Stream(): return element_targets, artifact_refs + def _scheduler_notification_handler(self, notification): + if notification.notification_type == NotificationType.INTERRUPT: + self._interrupt_callback() + elif notification.notification_type == NotificationType.TICK: + self._ticker_callback() + elif notification.notification_type == NotificationType.JOB_START: + self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time) + + elif notification.notification_type == NotificationType.JOB_COMPLETE: + self._state.remove_task(notification.job_action, notification.full_name) + if notification.job_status == JobStatus.FAIL: + if notification.element: + unique_id = notification.full_name + else: + unique_id = None + self._state.fail_task(notification.job_action, notification.full_name, unique_id) + else: + raise StreamError("Unreccognised notification type recieved") + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and |