summaryrefslogtreecommitdiff
path: root/src/buildstream/_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--src/buildstream/_stream.py61
1 files changed, 40 insertions, 21 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c6d748f91..2ad1a4fee 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,6 +38,7 @@ from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
+from ._state import State
from .types import _KeyStrength
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -53,17 +54,13 @@ from . import Scope, Consistency
# session_start_callback (callable): A callback to invoke when the session starts
# interrupt_callback (callable): A callback to invoke when we get interrupted
# ticker_callback (callable): Invoked every second while running the scheduler
-# job_start_callback (callable): Called when a job starts
-# job_complete_callback (callable): Called when a job completes
#
class Stream():
def __init__(self, context, session_start, *,
session_start_callback=None,
interrupt_callback=None,
- ticker_callback=None,
- job_start_callback=None,
- job_complete_callback=None):
+ ticker_callback=None):
#
# Public members
@@ -76,19 +73,27 @@ class Stream():
#
# Private members
#
- self._artifacts = context.artifactcache
- self._sourcecache = context.sourcecache
self._context = context
+ self._artifacts = None
+ self._sourcecache = None
self._project = None
self._pipeline = None
- self._scheduler = Scheduler(context, session_start,
+ self._state = State() # Owned by Stream, used by Core to set state
+ self._scheduler = Scheduler(context, session_start, self._state,
interrupt_callback=interrupt_callback,
- ticker_callback=ticker_callback,
- job_start_callback=job_start_callback,
- job_complete_callback=job_complete_callback)
+ ticker_callback=ticker_callback)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ # init()
+ #
+ # Initialization of Stream that has side-effects that require it to be
+ # performed after the Stream is created.
+ #
+ def init(self):
+ self._artifacts = self._context.artifactcache
+ self._sourcecache = self._context.sourcecache
+
# cleanup()
#
# Cleans up application state
@@ -178,8 +183,10 @@ class Stream():
detail="\n"
.join(list(map(lambda x: x._get_full_name(), missing_deps))))
self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts")
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
- self._enqueue_plan([element] + missing_deps)
+ plan = self._pipeline.add_elements([element], missing_deps)
+ self._enqueue_plan(plan)
self._run()
buildtree = False
@@ -190,6 +197,7 @@ class Stream():
# Attempt a pull queue for the given element if remote and context allow it
if require_buildtree:
self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree")
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(require_buildtree)
self._run()
@@ -269,6 +277,7 @@ class Stream():
# Now construct the queues
#
+ self._scheduler.clear_queues()
track_queue = None
if track_elements:
track_queue = TrackQueue(self._scheduler)
@@ -366,6 +375,7 @@ class Stream():
track_except_targets=except_targets,
track_cross_junctions=cross_junctions)
+ self._scheduler.clear_queues()
track_queue = TrackQueue(self._scheduler)
self._add_queue(track_queue, track=True)
self._enqueue_plan(elements, queue=track_queue)
@@ -403,6 +413,7 @@ class Stream():
raise StreamError("No artifact caches available for pulling artifacts")
self._pipeline.assert_consistent(elements)
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
self._run()
@@ -465,6 +476,7 @@ class Stream():
for element in elements:
element._pull_done()
+ self._scheduler.clear_queues()
push_queue = ArtifactPushQueue(self._scheduler)
self._add_queue(push_queue)
self._enqueue_plan(elements, queue=push_queue)
@@ -510,6 +522,7 @@ class Stream():
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact")
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(uncached_elts)
self._run()
@@ -958,14 +971,20 @@ class Stream():
# junctions (list of Element): The junctions to fetch
#
def fetch_subprojects(self, junctions):
- old_queues = self.queues
- try:
- queue = FetchQueue(self._scheduler)
- queue.enqueue(junctions)
- self.queues = [queue]
- self._run()
- finally:
- self.queues = old_queues
+ self._scheduler.clear_queues()
+ queue = FetchQueue(self._scheduler)
+ queue.enqueue(junctions)
+ self.queues = [queue]
+ self._run()
+
+ # get_state()
+ #
+ # Get the State object owned by Stream
+ #
+ # Returns:
+ # State: The State object
+ def get_state(self):
+ return self._state
#############################################################
# Scheduler API forwarding #
@@ -1208,7 +1227,6 @@ class Stream():
#
def _add_queue(self, queue, *, track=False):
self.queues.append(queue)
-
if not (track or self._first_non_track_queue):
self._first_non_track_queue = queue
self._first_non_track_queue.set_required_element_check()
@@ -1277,6 +1295,7 @@ class Stream():
# Construct queues, enqueue and run
#
+ self._scheduler.clear_queues()
track_queue = None
if track_elements:
track_queue = TrackQueue(self._scheduler)