diff options
author | Tristan van Berkom <tristan@codethink.co.uk> | 2020-12-11 19:18:54 +0900 |
---|---|---|
committer | Tristan van Berkom <tristan@codethink.co.uk> | 2020-12-13 12:54:44 +0900 |
commit | 11e495b753145d15ac9c20aa141087c57ae75124 (patch) | |
tree | ce16ce1d3f0b0a2daa93e4bbdcff50b7c00e3425 | |
parent | 57293c8b47ca15937df8a5464582b6a58ef10dc7 (diff) | |
download | buildstream-11e495b753145d15ac9c20aa141087c57ae75124.tar.gz |
_stream.py: Added new _reset() function.
Instead of calling self._scheduler.clear_queues(), add a focus point
for resetting state related to a scheduler run. I suspect that the
session element counts have been flawed in cases where we run the
scheduler multiple times but fail to clear that part of the state.
This should all around be a better approach for multiple scheduler runs.
-rw-r--r-- | src/buildstream/_stream.py | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 3feac677e..14fa42f5f 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -210,7 +210,7 @@ class Stream: element._set_required(scope) if pull_: - self._scheduler.clear_queues() + self._reset() self._add_queue(PullQueue(self._scheduler)) plan = self._pipeline.add_elements([element], elements) self._enqueue_plan(plan) @@ -293,7 +293,7 @@ class Stream: # Now construct the queues # - self._scheduler.clear_queues() + self._reset() if self._artifacts.has_fetch_remotes(): self._add_queue(PullQueue(self._scheduler)) @@ -364,7 +364,7 @@ class Stream: # is tracked, its state must be fully updated in either case, # and we anyway don't do anything else with it. - self._scheduler.clear_queues() + self._reset() track_queue = TrackQueue(self._scheduler) self._add_queue(track_queue, track=True) self._enqueue_plan(elements, queue=track_queue) @@ -445,7 +445,7 @@ class Stream: raise StreamError("No artifact caches available for pulling artifacts") self._pipeline.assert_consistent(elements) - self._scheduler.clear_queues() + self._reset() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(elements) self._run(announce_session=True) @@ -487,7 +487,7 @@ class Stream: self._pipeline.assert_consistent(elements) - self._scheduler.clear_queues() + self._reset() self._add_queue(PullQueue(self._scheduler)) self._add_queue(ArtifactPushQueue(self._scheduler)) self._enqueue_plan(elements) @@ -546,7 +546,7 @@ class Stream: uncached_elts = [elt for elt in elements if not elt._cached()] if uncached_elts and pull: self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact") - self._scheduler.clear_queues() + self._reset() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(uncached_elts) self._run(announce_session=True) @@ -1109,7 +1109,7 @@ class Stream: # junctions (list of Element): The junctions to fetch # def _fetch_subprojects(self, junctions): - self._scheduler.clear_queues() + self._reset() queue = FetchQueue(self._scheduler) queue.enqueue(junctions) self.queues = [queue] @@ -1342,7 +1342,7 @@ class Stream: for element in artifacts: element._set_required(_Scope.NONE) - self._scheduler.clear_queues() + self._reset() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(artifacts) self._run() @@ -1394,6 +1394,24 @@ class Stream: args = dict(kwargs) self._context.messenger.message(Message(message_type, message, **args)) + # _reset() + # + # Resets the internal state related to a given scheduler run. + # + # Invocations to the scheduler should start with a _reset() and end + # with _run() like so: + # + # self._reset() + # self._add_queue(...) + # self._add_queue(...) + # self._enqueue_plan(...) + # self._run() + # + def _reset(self): + self._scheduler.clear_queues() + self.session_elements = [] + self.total_elements = [] + # _add_queue() # # Adds a queue to the stream @@ -1468,7 +1486,7 @@ class Stream: # Construct queues, enqueue and run # - self._scheduler.clear_queues() + self._reset() self._add_queue(FetchQueue(self._scheduler, fetch_original=fetch_original)) self._enqueue_plan(elements) self._run(announce_session=announce_session) |