summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Maw <jonathan.maw@codethink.co.uk>2019-07-05 11:46:37 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-07-09 16:54:37 +0000
commitfcce77761d118fb96a07afa32ae578fb7015a1ad (patch)
tree4605afec991b0369031e89f14be194b222b74936
parentbd0dada6189934b6f11fdc0cadfe9f36b67af077 (diff)
downloadbuildstream-fcce77761d118fb96a07afa32ae578fb7015a1ad.tar.gz
Stream: Fix the existence of duplicate queues
It was possible for multiple Queues of the same type to exist. Currently, there is no desired reason for this to happen. These changes add an explicit function call to the Scheduler that destroys the queues, to be used before constructing the next list of queues to pass into the Scheduler. It also calls this in all the places before we construct the queues. Further, it alters Stream.fetch_subprojects because there is currently no reason why we'd want to preserve the Stream's list of queues before running.
-rw-r--r--src/buildstream/_scheduler/scheduler.py11
-rw-r--r--src/buildstream/_stream.py21
2 files changed, 24 insertions, 8 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0ee629338..17d655ce2 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -175,6 +175,17 @@ class Scheduler():
return status
+ # clear_queues()
+ #
+ # Forcibly destroys all the scheduler's queues
+ # This is needed because Queues register TaskGroups with State,
+ # which must be unique. As there is not yet any reason to have multiple
+ # Queues of the same type, old ones should be deleted.
+ #
+ def clear_queues(self):
+ if self.queues:
+ self.queues.clear()
+
# terminate_jobs()
#
# Forcefully terminates all ongoing jobs.
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c6d748f91..594da997e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -178,6 +178,7 @@ class Stream():
detail="\n"
.join(list(map(lambda x: x._get_full_name(), missing_deps))))
self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts")
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan([element] + missing_deps)
self._run()
@@ -190,6 +191,7 @@ class Stream():
# Attempt a pull queue for the given element if remote and context allow it
if require_buildtree:
self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree")
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(require_buildtree)
self._run()
@@ -269,6 +271,7 @@ class Stream():
# Now construct the queues
#
+ self._scheduler.clear_queues()
track_queue = None
if track_elements:
track_queue = TrackQueue(self._scheduler)
@@ -366,6 +369,7 @@ class Stream():
track_except_targets=except_targets,
track_cross_junctions=cross_junctions)
+ self._scheduler.clear_queues()
track_queue = TrackQueue(self._scheduler)
self._add_queue(track_queue, track=True)
self._enqueue_plan(elements, queue=track_queue)
@@ -403,6 +407,7 @@ class Stream():
raise StreamError("No artifact caches available for pulling artifacts")
self._pipeline.assert_consistent(elements)
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
self._run()
@@ -465,6 +470,7 @@ class Stream():
for element in elements:
element._pull_done()
+ self._scheduler.clear_queues()
push_queue = ArtifactPushQueue(self._scheduler)
self._add_queue(push_queue)
self._enqueue_plan(elements, queue=push_queue)
@@ -510,6 +516,7 @@ class Stream():
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact")
+ self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(uncached_elts)
self._run()
@@ -958,14 +965,11 @@ class Stream():
# junctions (list of Element): The junctions to fetch
#
def fetch_subprojects(self, junctions):
- old_queues = self.queues
- try:
- queue = FetchQueue(self._scheduler)
- queue.enqueue(junctions)
- self.queues = [queue]
- self._run()
- finally:
- self.queues = old_queues
+ self._scheduler.clear_queues()
+ queue = FetchQueue(self._scheduler)
+ queue.enqueue(junctions)
+ self.queues = [queue]
+ self._run()
#############################################################
# Scheduler API forwarding #
@@ -1277,6 +1281,7 @@ class Stream():
# Construct queues, enqueue and run
#
+ self._scheduler.clear_queues()
track_queue = None
if track_elements:
track_queue = TrackQueue(self._scheduler)