summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-06-13 17:47:04 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-09-10 10:44:53 +0000
commit56365356a20540c3319107fe647d0852321a64cb (patch)
treee666faa9979c248506cd60da5ee96860527ee495
parenta200338e3b1947c0d8e7a89b1977f753988dc76e (diff)
downloadbuildstream-56365356a20540c3319107fe647d0852321a64cb.tar.gz
scheduler.py: Introduce a notification queue for stream interaction
For the 'backend' of BuildStream to run in a subprocess certain interactions between the 'frontend' need to be augmented over a queue between the processes, as well as adaptions for the differences in state between them. This commit implements the core of these interactions as notification objects over a deque with notifying callbacks on either side, which should be switched to a multiprocessing queue once there's process separation at which point 'both ends' can execute an event loop. This also removes the checks for the ticker & interrupt callbacks to be optional, as currently they're always implemented.
-rw-r--r--src/buildstream/_scheduler/__init__.py2
-rw-r--r--src/buildstream/_scheduler/scheduler.py101
-rw-r--r--src/buildstream/_stream.py42
3 files changed, 117 insertions, 28 deletions
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458fa5..d689d6e25 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 37295b285..e6a12e81c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -29,16 +29,59 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources
from .jobs import JobStatus
+from ..types import FastEnum
from .._profile import Topics, PROFILER
# A decent return code for Scheduler.run()
-class SchedStatus():
+class SchedStatus(FastEnum):
SUCCESS = 0
ERROR = -1
TERMINATED = 1
+# NotificationType()
+#
+# Type of notification for inter-process communication
+# between 'front' & 'back' end when a scheduler is executing.
+# This is used as a parameter for a Notification object,
+# to be used as a conditional for control or state handling.
+#
+class NotificationType(FastEnum):
+ INTERRUPT = "interrupt"
+ JOB_START = "job_start"
+ JOB_COMPLETE = "job_complete"
+ TICK = "tick"
+ TERMINATE = "terminate"
+ QUIT = "quit"
+
+
+# Notification()
+#
+# An object to be passed across a bidirectional queue between
+# Stream & Scheduler. A required NotificationType() parameter
+# with accompanying information can be added as a member if
+# required. NOTE: The notification object should be lightweight
+# and all attributes must be picklable.
+#
+class Notification():
+
+ def __init__(self,
+ notification_type,
+ *,
+ full_name=None,
+ job_action=None,
+ job_status=None,
+ elapsed_time=None,
+ element=None):
+ self.notification_type = notification_type
+ self.full_name = full_name
+ self.job_action = job_action
+ self.job_status = job_status
+ self.elapsed_time = elapsed_time
+ self.element = element
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -62,9 +105,7 @@ class SchedStatus():
class Scheduler():
def __init__(self, context,
- start_time, state,
- interrupt_callback=None,
- ticker_callback=None):
+ start_time, state, notification_queue, notifier):
#
# Public members
@@ -87,9 +128,9 @@ class Scheduler():
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
- # Callbacks to report back to the Scheduler owner
- self._interrupt_callback = interrupt_callback
- self._ticker_callback = ticker_callback
+ # Bidirectional queue to send notifications back to the Scheduler's owner
+ self._notification_queue = notification_queue
+ self._notifier = notifier
self.resources = Resources(context.sched_builders,
context.sched_fetchers,
@@ -121,8 +162,7 @@ class Scheduler():
asyncio.set_event_loop(self.loop)
# Add timeouts
- if self._ticker_callback:
- self.loop.call_later(1, self._tick)
+ self.loop.call_later(1, self._tick)
# Handle unix signals while running
self._connect_signals()
@@ -234,11 +274,10 @@ class Scheduler():
# status (JobStatus): The status of the completed job
#
def job_completed(self, job, status):
-
# Remove from the active jobs list
self._active_jobs.remove(job)
- self._state.remove_task(job.action_name, job.name)
+ element_info = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the unique_id and display key tuple of the Element. This can then
@@ -248,9 +287,14 @@ class Scheduler():
element_info = element._unique_id, element._get_display_key()
else:
element_info = None
- self._state.fail_task(job.action_name, job.name, element=element_info)
# Now check for more jobs
+ notification = Notification(NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ element=element_info)
+ self._notify(notification)
self._sched()
#######################################################
@@ -266,7 +310,11 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- self._state.add_task(job.action_name, job.name, self.elapsed_time())
+ notification = Notification(NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ elapsed_time=self.elapsed_time())
+ self._notify(notification)
job.start()
# _sched_queue_jobs()
@@ -379,13 +427,8 @@ class Scheduler():
if self.terminated:
return
- # Leave this to the frontend to decide, if no
- # interrrupt callback was specified, then just terminate.
- if self._interrupt_callback:
- self._interrupt_callback()
- else:
- # Default without a frontend is just terminate
- self.terminate_jobs()
+ notification = Notification(NotificationType.INTERRUPT)
+ self._notify(notification)
# _terminate_event():
#
@@ -444,9 +487,25 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
- self._ticker_callback()
+ self._notify(Notification(NotificationType.TICK))
self.loop.call_later(1, self._tick)
+ def _notify(self, notification):
+ # Scheduler to Stream notifcations on right side
+ self._notification_queue.append(notification)
+ self._notifier()
+
+ def _stream_notification_handler(self):
+ notification = self._notification_queue.popleft()
+ if notification.notification_type == NotificationType.TERMINATE:
+ self.terminate_jobs()
+ elif notification.notification_type == NotificationType.QUIT:
+ self.stop_queueing()
+ else:
+ # Do not raise exception once scheduler process is separated
+ # as we don't want to pickle exceptions between processes
+ raise ValueError("Unrecognised notification type received")
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2e43bb1a2..bbe60c1ec 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -28,12 +28,13 @@ import tarfile
import tempfile
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+from collections import deque
from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -78,14 +79,17 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
+ self._notification_queue = deque()
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state,
- interrupt_callback=interrupt_callback,
- ticker_callback=ticker_callback)
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
+ self._scheduler_notification_handler)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
+ self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
# init()
#
@@ -1103,7 +1107,8 @@ class Stream():
# Terminate jobs
#
def terminate(self):
- self._scheduler.terminate_jobs()
+ notification = Notification(NotificationType.TERMINATE)
+ self._notify(notification)
# quit()
#
@@ -1112,7 +1117,8 @@ class Stream():
# of ongoing jobs
#
def quit(self):
- self._scheduler.stop_queueing()
+ notification = Notification(NotificationType.QUIT)
+ self._notify(notification)
# suspend()
#
@@ -1642,6 +1648,30 @@ class Stream():
return element_targets, artifact_refs
+ def _scheduler_notification_handler(self):
+ # Check the queue is there and a scheduler is running
+ assert self._notification_queue and self.running
+ notification = self._notification_queue.pop()
+
+ if notification.notification_type == NotificationType.INTERRUPT:
+ self._interrupt_callback()
+ elif notification.notification_type == NotificationType.TICK:
+ self._ticker_callback()
+ elif notification.notification_type == NotificationType.JOB_START:
+ self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+ elif notification.notification_type == NotificationType.JOB_COMPLETE:
+ self._state.remove_task(notification.job_action, notification.full_name)
+ if notification.job_status == JobStatus.FAIL:
+ self._state.fail_task(notification.job_action, notification.full_name,
+ notification.element)
+ else:
+ raise StreamError("Unrecognised notification type received")
+
+ def _notify(self, notification):
+ # Stream to scheduler notifcations on left side
+ self._notification_queue.appendleft(notification)
+ self._notifier()
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and