summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan van Berkom <tristan@codethink.co.uk>2020-12-11 19:18:54 +0900
committerTristan van Berkom <tristan@codethink.co.uk>2020-12-13 12:54:44 +0900
commit11e495b753145d15ac9c20aa141087c57ae75124 (patch)
treece16ce1d3f0b0a2daa93e4bbdcff50b7c00e3425
parent57293c8b47ca15937df8a5464582b6a58ef10dc7 (diff)
downloadbuildstream-11e495b753145d15ac9c20aa141087c57ae75124.tar.gz
_stream.py: Added new _reset() function.
Instead of calling self._scheduler.clear_queues(), add a focus point for resetting state related to a scheduler run. I suspect that the session element counts have been flawed in cases where we run the scheduler multiple times but fail to clear that part of the state. This should all around be a better approach for multiple scheduler runs.
-rw-r--r--src/buildstream/_stream.py36
1 files changed, 27 insertions, 9 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 3feac677e..14fa42f5f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -210,7 +210,7 @@ class Stream:
element._set_required(scope)
if pull_:
- self._scheduler.clear_queues()
+ self._reset()
self._add_queue(PullQueue(self._scheduler))
plan = self._pipeline.add_elements([element], elements)
self._enqueue_plan(plan)
@@ -293,7 +293,7 @@ class Stream:
# Now construct the queues
#
- self._scheduler.clear_queues()
+ self._reset()
if self._artifacts.has_fetch_remotes():
self._add_queue(PullQueue(self._scheduler))
@@ -364,7 +364,7 @@ class Stream:
# is tracked, its state must be fully updated in either case,
# and we anyway don't do anything else with it.
- self._scheduler.clear_queues()
+ self._reset()
track_queue = TrackQueue(self._scheduler)
self._add_queue(track_queue, track=True)
self._enqueue_plan(elements, queue=track_queue)
@@ -445,7 +445,7 @@ class Stream:
raise StreamError("No artifact caches available for pulling artifacts")
self._pipeline.assert_consistent(elements)
- self._scheduler.clear_queues()
+ self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
self._run(announce_session=True)
@@ -487,7 +487,7 @@ class Stream:
self._pipeline.assert_consistent(elements)
- self._scheduler.clear_queues()
+ self._reset()
self._add_queue(PullQueue(self._scheduler))
self._add_queue(ArtifactPushQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -546,7 +546,7 @@ class Stream:
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact")
- self._scheduler.clear_queues()
+ self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(uncached_elts)
self._run(announce_session=True)
@@ -1109,7 +1109,7 @@ class Stream:
# junctions (list of Element): The junctions to fetch
#
def _fetch_subprojects(self, junctions):
- self._scheduler.clear_queues()
+ self._reset()
queue = FetchQueue(self._scheduler)
queue.enqueue(junctions)
self.queues = [queue]
@@ -1342,7 +1342,7 @@ class Stream:
for element in artifacts:
element._set_required(_Scope.NONE)
- self._scheduler.clear_queues()
+ self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(artifacts)
self._run()
@@ -1394,6 +1394,24 @@ class Stream:
args = dict(kwargs)
self._context.messenger.message(Message(message_type, message, **args))
+ # _reset()
+ #
+ # Resets the internal state related to a given scheduler run.
+ #
+ # Invocations to the scheduler should start with a _reset() and end
+ # with _run() like so:
+ #
+ # self._reset()
+ # self._add_queue(...)
+ # self._add_queue(...)
+ # self._enqueue_plan(...)
+ # self._run()
+ #
+ def _reset(self):
+ self._scheduler.clear_queues()
+ self.session_elements = []
+ self.total_elements = []
+
# _add_queue()
#
# Adds a queue to the stream
@@ -1468,7 +1486,7 @@ class Stream:
# Construct queues, enqueue and run
#
- self._scheduler.clear_queues()
+ self._reset()
self._add_queue(FetchQueue(self._scheduler, fetch_original=fetch_original))
self._enqueue_plan(elements)
self._run(announce_session=announce_session)