summary refs log tree commit diff
diff options
context:
space:
mode:
author Phil Dawson <phil.dawson@codethink.co.uk> 2019-06-13 17:47:04 +0100
committer Tom Pollard <tom.pollard@codethink.co.uk> 2019-08-15 12:38:19 +0100
commit 7f7c55f040b8bc42c5c66f0cbbb173d09bef856e (patch)
tree 5215d6f0b96fe4a6e783e265f7c114b712e7358a
parent cf3f925764cfe6949aedbdcd313a93654f7c7657 (diff)
download buildstream-7f7c55f040b8bc42c5c66f0cbbb173d09bef856e.tar.gz
WIP: Refactor scheduler-frontend communication
-rw-r--r-- src/buildstream/_frontend/app.py 2
-rw-r--r-- src/buildstream/_scheduler/__init__.py 2
-rw-r--r-- src/buildstream/_scheduler/scheduler.py 72
-rw-r--r-- src/buildstream/_stream.py 25
4 files changed, 81 insertions, 20 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 87575b675..7fe71c55f 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -603,6 +603,7 @@ class App():
pass
return
+ assert False
# Interactive mode for element failures
with self._interrupted():
@@ -642,6 +643,7 @@ class App():
# Handle choices which you can come back from
#
+ assert choice != 'shell' # This won't work for now
if choice == 'shell':
click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
try:
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458fa5..d689d6e25 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d7cf5d09..0ed5ada09 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -21,6 +21,7 @@
# System imports
import os
import asyncio
+import enum
from itertools import chain
import signal
import datetime
@@ -45,6 +46,32 @@ _ACTION_NAME_CLEANUP = 'clean'
_ACTION_NAME_CACHE_SIZE = 'size'
+# NotificationType()
+#
+# The kinds of notification which the Scheduler passes to its
+# message handler.
+#
+@enum.unique
+class NotificationType(enum.Enum):
+ # Sent from the scheduler's interrupt handling, unless already terminated
+ INTERRUPT = "interrupt"
+ # Sent when a job is added to the active jobs, just before it is started
+ JOB_START = "job_start"
+ # Sent from the scheduler's job completion handling
+ JOB_COMPLETE = "job_complete"
+ # Sent from the scheduler's tick timeout, which reschedules itself
+ # with a one second delay
+ TICK = "tick"
+
+
+# Notification()
+#
+# A message passed from the Scheduler to its message handler.
+#
+# Everything except the notification type is keyword-only and defaults
+# to None; the scheduler only fills in what is relevant to the type.
+#
+# Args:
+#    notification_type (NotificationType): The kind of notification
+#    full_name: The name of the job (the scheduler passes job.name)
+#    job_action: The action name of the job (the scheduler passes job.action_name)
+#    job_status: The status the job completed with
+#    elapsed_time: The scheduler's elapsed time when the job was started
+#    element: The result of job.get_element() for the completed job
+#
+class Notification:
+
+ def __init__(self,
+ notification_type,
+ *,
+ full_name=None,
+ job_action=None,
+ job_status=None,
+ elapsed_time=None,
+ element=None):
+ self.notification_type = notification_type
+ self.full_name = full_name
+ self.job_action = job_action
+ self.job_status = job_status
+ self.elapsed_time = elapsed_time
+ self.element = element
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -68,7 +95,7 @@ _ACTION_NAME_CACHE_SIZE = 'size'
class Scheduler():
def __init__(self, context,
- start_time, state,
+ start_time, state, message_handler,
interrupt_callback=None,
ticker_callback=None):
@@ -99,9 +126,17 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callbacks to report back to the Scheduler owner
- self._interrupt_callback = interrupt_callback
- self._ticker_callback = ticker_callback
+ # Callback to send messages to report back to the Scheduler's owner
+ self.message = message_handler
+
+ # Whether our exclusive jobs, like 'cleanup' are currently already
+ # waiting or active.
+ #
+ # This is just a bit quicker than scanning the wait queue and active
+ # queue and comparing job action names.
+ #
+ self._exclusive_waiting = set()
+ self._exclusive_active = set()
self.resources = Resources(context.sched_builders,
context.sched_fetchers,
@@ -131,8 +166,7 @@ class Scheduler():
asyncio.set_event_loop(self.loop)
# Add timeouts
- if self._ticker_callback:
- self.loop.call_later(1, self._tick)
+ self.loop.call_later(1, self._tick)
# Handle unix signals while running
self._connect_signals()
@@ -251,13 +285,17 @@ class Scheduler():
# Remove from the active jobs list
self._active_jobs.remove(job)
- self._state.remove_task(job.action_name, job.name)
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance. Note this will change if the frontend
# is run in a separate process for pickling
element = job.get_element()
- self._state.fail_task(job.action_name, job.name, element=element)
+ message = Notification(NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ element=element)
+ self.message(message)
# Now check for more jobs
self._sched()
@@ -316,7 +354,11 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- self._state.add_task(job.action_name, job.name, self.elapsed_time())
+ message = Notification(NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ elapsed_time=self.elapsed_time())
+ self.message(message)
job.start()
# Callback for the cache size job
@@ -535,13 +577,8 @@ class Scheduler():
if self.terminated:
return
- # Leave this to the frontend to decide, if no
- # interrrupt callback was specified, then just terminate.
- if self._interrupt_callback:
- self._interrupt_callback()
- else:
- # Default without a frontend is just terminate
- self.terminate_jobs()
+ message = Notification(NotificationType.INTERRUPT)
+ self.message(message)
# _terminate_event():
#
@@ -600,7 +637,8 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
- self._ticker_callback()
+ message = Notification(NotificationType.TICK)
+ self.message(message)
self.loop.call_later(1, self._tick)
def __getstate__(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c54fee1a7..b09521b5b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -83,11 +83,13 @@ class Stream():
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state,
+ self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
# init()
#
@@ -1584,6 +1586,25 @@ class Stream():
return element_targets, artifact_refs
+    # _scheduler_notification_handler()
+    #
+    # Handle a Notification sent by the Scheduler, dispatching on
+    # its notification type.
+    #
+    # Args:
+    #    notification (Notification): The notification to handle
+    #
+    # Raises:
+    #    (StreamError): If the notification type is not recognised
+    #
+    def _scheduler_notification_handler(self, notification):
+        notification_type = notification.notification_type
+        if notification_type == NotificationType.INTERRUPT:
+            # Leave this to the frontend to decide, if no interrupt
+            # callback was specified, then just terminate.
+            if self._interrupt_callback:
+                self._interrupt_callback()
+            else:
+                self._scheduler.terminate_jobs()
+        elif notification_type == NotificationType.TICK:
+            # The scheduler ticks unconditionally, so the ticker
+            # callback may not have been provided.
+            if self._ticker_callback:
+                self._ticker_callback()
+        elif notification_type == NotificationType.JOB_START:
+            self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+        elif notification_type == NotificationType.JOB_COMPLETE:
+            self._state.remove_task(notification.job_action, notification.full_name)
+            if notification.job_status == JobStatus.FAIL:
+                # NOTE(review): the scheduler used to call
+                # fail_task(..., element=element); confirm that State
+                # expects the job name (or None) here instead.
+                unique_id = notification.full_name if notification.element else None
+                self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+        else:
+            raise StreamError("Unrecognised notification type received")
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and