diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-07-09 17:24:44 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-07-09 17:24:44 +0000 |
commit | e274b648ccbf0149b46ffdee51316bdd4fac7a2e (patch) | |
tree | 0d002654ef5dc555b3704b3cdeeeff18d7febbbd /src/buildstream/_stream.py | |
parent | 1e11d190562e641d10e2e435a8411c802eaedc81 (diff) | |
parent | 0379151c1185de654c3f3672fbec5c16ee7f49a6 (diff) | |
download | buildstream-e274b648ccbf0149b46ffdee51316bdd4fac7a2e.tar.gz |
Merge branch 'jonathan/frontend-separation' into 'master'
Separate frontend state handling from core state
See merge request BuildStream/buildstream!1409
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r-- | src/buildstream/_stream.py | 61 |
1 files changed, 40 insertions, 21 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index c6d748f91..2ad1a4fee 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -38,6 +38,7 @@ from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER +from ._state import State from .types import _KeyStrength from . import utils, _yaml, _site from . import Scope, Consistency @@ -53,17 +54,13 @@ from . import Scope, Consistency # session_start_callback (callable): A callback to invoke when the session starts # interrupt_callback (callable): A callback to invoke when we get interrupted # ticker_callback (callable): Invoked every second while running the scheduler -# job_start_callback (callable): Called when a job starts -# job_complete_callback (callable): Called when a job completes # class Stream(): def __init__(self, context, session_start, *, session_start_callback=None, interrupt_callback=None, - ticker_callback=None, - job_start_callback=None, - job_complete_callback=None): + ticker_callback=None): # # Public members @@ -76,19 +73,27 @@ class Stream(): # # Private members # - self._artifacts = context.artifactcache - self._sourcecache = context.sourcecache self._context = context + self._artifacts = None + self._sourcecache = None self._project = None self._pipeline = None - self._scheduler = Scheduler(context, session_start, + self._state = State() # Owned by Stream, used by Core to set state + self._scheduler = Scheduler(context, session_start, self._state, interrupt_callback=interrupt_callback, - ticker_callback=ticker_callback, - job_start_callback=job_start_callback, - job_complete_callback=job_complete_callback) + ticker_callback=ticker_callback) self._first_non_track_queue = None self._session_start_callback = session_start_callback + # init() + # + # Initialization of Stream that has side-effects that require it to be + # performed after the Stream is created. + # + def init(self): + self._artifacts = self._context.artifactcache + self._sourcecache = self._context.sourcecache + # cleanup() # # Cleans up application state @@ -178,8 +183,10 @@ class Stream(): detail="\n" .join(list(map(lambda x: x._get_full_name(), missing_deps)))) self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts") + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) - self._enqueue_plan([element] + missing_deps) + plan = self._pipeline.add_elements([element], missing_deps) + self._enqueue_plan(plan) self._run() buildtree = False @@ -190,6 +197,7 @@ class Stream(): # Attempt a pull queue for the given element if remote and context allow it if require_buildtree: self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree") + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(require_buildtree) self._run() @@ -269,6 +277,7 @@ class Stream(): # Now construct the queues # + self._scheduler.clear_queues() track_queue = None if track_elements: track_queue = TrackQueue(self._scheduler) @@ -366,6 +375,7 @@ class Stream(): track_except_targets=except_targets, track_cross_junctions=cross_junctions) + self._scheduler.clear_queues() track_queue = TrackQueue(self._scheduler) self._add_queue(track_queue, track=True) self._enqueue_plan(elements, queue=track_queue) @@ -403,6 +413,7 @@ class Stream(): raise StreamError("No artifact caches available for pulling artifacts") self._pipeline.assert_consistent(elements) + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(elements) self._run() @@ -465,6 +476,7 @@ class Stream(): for element in elements: element._pull_done() + self._scheduler.clear_queues() push_queue = ArtifactPushQueue(self._scheduler) self._add_queue(push_queue) self._enqueue_plan(elements, queue=push_queue) @@ -510,6 +522,7 @@ class Stream(): uncached_elts = [elt for elt in elements if not elt._cached()] if uncached_elts and pull: self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact") + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(uncached_elts) self._run() @@ -958,14 +971,20 @@ class Stream(): # junctions (list of Element): The junctions to fetch # def fetch_subprojects(self, junctions): - old_queues = self.queues - try: - queue = FetchQueue(self._scheduler) - queue.enqueue(junctions) - self.queues = [queue] - self._run() - finally: - self.queues = old_queues + self._scheduler.clear_queues() + queue = FetchQueue(self._scheduler) + queue.enqueue(junctions) + self.queues = [queue] + self._run() + + # get_state() + # + # Get the State object owned by Stream + # + # Returns: + # State: The State object + def get_state(self): + return self._state ############################################################# # Scheduler API forwarding # @@ -1208,7 +1227,6 @@ class Stream(): # def _add_queue(self, queue, *, track=False): self.queues.append(queue) - if not (track or self._first_non_track_queue): self._first_non_track_queue = queue self._first_non_track_queue.set_required_element_check() @@ -1277,6 +1295,7 @@ class Stream(): # Construct queues, enqueue and run # + self._scheduler.clear_queues() track_queue = None if track_elements: track_queue = TrackQueue(self._scheduler) |