diff options
author | Jonathan Maw <jonathan.maw@codethink.co.uk> | 2019-07-05 11:46:37 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-07-09 16:54:37 +0000 |
commit | fcce77761d118fb96a07afa32ae578fb7015a1ad (patch) | |
tree | 4605afec991b0369031e89f14be194b222b74936 | |
parent | bd0dada6189934b6f11fdc0cadfe9f36b67af077 (diff) | |
download | buildstream-fcce77761d118fb96a07afa32ae578fb7015a1ad.tar.gz |
Stream: Fix the existence of duplicate queues
It was possible for multiple Queues of the same type to exist.
Currently, there is no desired reason for this to happen.
These changes add an explicit function call to the Scheduler that
destroys the queues, to be used before constructing the next list of
queues to pass into the Scheduler.
It also calls this in all the places before we construct the queues.
Further, it alters Stream.fetch_subprojects because there is currently
no reason why we'd want to preserve the Stream's list of queues before
running.
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 11 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 21 |
2 files changed, 24 insertions, 8 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 0ee629338..17d655ce2 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -175,6 +175,17 @@ class Scheduler(): return status + # clear_queues() + # + # Forcibly destroys all the scheduler's queues + # This is needed because Queues register TaskGroups with State, + # which must be unique. As there is not yet any reason to have multiple + # Queues of the same type, old ones should be deleted. + # + def clear_queues(self): + if self.queues: + self.queues.clear() + # terminate_jobs() # # Forcefully terminates all ongoing jobs. diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index c6d748f91..594da997e 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -178,6 +178,7 @@ class Stream(): detail="\n" .join(list(map(lambda x: x._get_full_name(), missing_deps)))) self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts") + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan([element] + missing_deps) self._run() @@ -190,6 +191,7 @@ class Stream(): # Attempt a pull queue for the given element if remote and context allow it if require_buildtree: self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree") + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(require_buildtree) self._run() @@ -269,6 +271,7 @@ class Stream(): # Now construct the queues # + self._scheduler.clear_queues() track_queue = None if track_elements: track_queue = TrackQueue(self._scheduler) @@ -366,6 +369,7 @@ class Stream(): track_except_targets=except_targets, track_cross_junctions=cross_junctions) + self._scheduler.clear_queues() track_queue = TrackQueue(self._scheduler) self._add_queue(track_queue, track=True) self._enqueue_plan(elements, queue=track_queue) @@ -403,6 +407,7 @@ class Stream(): raise StreamError("No artifact caches available for pulling artifacts") self._pipeline.assert_consistent(elements) + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(elements) self._run() @@ -465,6 +470,7 @@ class Stream(): for element in elements: element._pull_done() + self._scheduler.clear_queues() push_queue = ArtifactPushQueue(self._scheduler) self._add_queue(push_queue) self._enqueue_plan(elements, queue=push_queue) @@ -510,6 +516,7 @@ class Stream(): uncached_elts = [elt for elt in elements if not elt._cached()] if uncached_elts and pull: self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact") + self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(uncached_elts) self._run() @@ -958,14 +965,11 @@ class Stream(): # junctions (list of Element): The junctions to fetch # def fetch_subprojects(self, junctions): - old_queues = self.queues - try: - queue = FetchQueue(self._scheduler) - queue.enqueue(junctions) - self.queues = [queue] - self._run() - finally: - self.queues = old_queues + self._scheduler.clear_queues() + queue = FetchQueue(self._scheduler) + queue.enqueue(junctions) + self.queues = [queue] + self._run() ############################################################# # Scheduler API forwarding # @@ -1277,6 +1281,7 @@ class Stream(): # Construct queues, enqueue and run # + self._scheduler.clear_queues() track_queue = None if track_elements: track_queue = TrackQueue(self._scheduler) |