diff options
author | Jürg Billeter <j@bitron.ch> | 2020-09-03 11:04:12 +0200 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2020-10-15 17:32:02 +0200 |
commit | 94e4866e29778eb1e16e4a79d756bab765235286 (patch) | |
tree | c8d4ea316bf24fba7a9bedf20c4bee256c07e280 | |
parent | c9071f6418cae837462fd374a923779b78dd5d73 (diff) | |
download | buildstream-94e4866e29778eb1e16e4a79d756bab765235286.tar.gz |
Always schedule pull job
This is preparation to perform artifact cache query as part of the same
job as artifact pulling.
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 20 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 37 | ||||
-rw-r--r-- | src/buildstream/element.py | 65 |
3 files changed, 64 insertions, 58 deletions
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index e1d69590f..29243a088 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -33,17 +33,22 @@ class PullQueue(Queue): complete_name = "Artifacts Pulled" resources = [ResourceType.DOWNLOAD, ResourceType.CACHE] + def __init__(self, scheduler, *, check_remotes=True): + super().__init__(scheduler) + + self._check_remotes = check_remotes + def get_process_func(self): - return PullQueue._pull_or_skip + if self._check_remotes: + return PullQueue._pull_or_skip + else: + return PullQueue._check def status(self, element): if not element._can_query_cache(): return QueueStatus.PENDING - if element._pull_pending(): - return QueueStatus.READY - else: - return QueueStatus.SKIP + return QueueStatus.READY def done(self, _, element, result, status): @@ -62,3 +67,8 @@ class PullQueue(Queue): def _pull_or_skip(element): if not element._pull(): raise SkipJob(PullQueue.action_name) + + @staticmethod + def _check(element): + if not element._pull(check_remotes=False): + raise SkipJob(PullQueue.action_name) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index ccebd4889..6ceba8485 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -171,12 +171,20 @@ class Stream: # elements (list of Element): The elements to check # def query_cache(self, elements): - # Artifact cache is implicitly queried on demand + # Enqueue complete build plan as this is required to determine `buildable` status. + plan = self._pipeline.plan(elements) + self._scheduler.clear_queues() + self._add_queue(PullQueue(self._scheduler, check_remotes=False)) self._add_queue(FetchQueue(self._scheduler, skip_cached=True, check_only=True)) - self._enqueue_plan(elements) + self._enqueue_plan(plan) self._run() + # In non-strict mode, the above may not calculate all strong cache keys. + if not self._context.get_strict(): + for element in plan: + element._update_cache_key_non_strict() + # shell() # # Run a shell @@ -223,12 +231,11 @@ class Stream: element = self.targets[0] element._set_required(scope) - if pull_: - self._scheduler.clear_queues() - self._add_queue(PullQueue(self._scheduler)) - plan = self._pipeline.add_elements([element], elements) - self._enqueue_plan(plan) - self._run() + self._scheduler.clear_queues() + self._add_queue(PullQueue(self._scheduler, check_remotes=pull_)) + plan = self._pipeline.add_elements([element], elements) + self._enqueue_plan(plan) + self._run() missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] if missing_deps: @@ -309,8 +316,7 @@ class Stream: # self._scheduler.clear_queues() - if self._artifacts.has_fetch_remotes(): - self._add_queue(PullQueue(self._scheduler)) + self._add_queue(PullQueue(self._scheduler)) self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) @@ -550,13 +556,10 @@ class Stream: self._check_location_writable(location, force=force, tar=tar) - uncached_elts = [elt for elt in elements if not elt._cached()] - if uncached_elts and pull: - self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact") - self._scheduler.clear_queues() - self._add_queue(PullQueue(self._scheduler)) - self._enqueue_plan(uncached_elts) - self._run(announce_session=True) + self._scheduler.clear_queues() + self._add_queue(PullQueue(self._scheduler, check_remotes=pull)) + self._enqueue_plan(elements) + self._run(announce_session=True) try: scope = { diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 445ad5193..a748de89b 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -1368,7 +1368,7 @@ class Element(Plugin): # - __schedule_assembly_when_necessary() # - Schedules assembly of an element, iff its current state # allows/necessitates it - # - __update_cache_key_non_strict() + # - _update_cache_key_non_strict() # - Sets strict cache keys in non-strict builds # - Some non-strict build actions can create artifacts # compatible with strict mode (such as pulling), so @@ -1673,7 +1673,7 @@ class Element(Plugin): # Once we schedule an element for assembly, we know that our # build dependencies have strong cache keys, so we can update # our own strong cache key. - self.__update_cache_key_non_strict() + self._update_cache_key_non_strict() # _assemble_done(): # @@ -1702,7 +1702,7 @@ class Element(Plugin): # assembled everything to this point without a strong cache # key. Once the element has been assembled, a strong cache key # can be set, so we do so. - self.__update_cache_key_non_strict() + self._update_cache_key_non_strict() self._update_ready_for_runtime_and_cached() if self._get_workspace() and self._cached(): @@ -1889,34 +1889,13 @@ class Element(Plugin): # _pull_pending() # - # Check whether the artifact will be pulled. If the pull operation is to - # include a specific subdir of the element artifact (from cli or user conf) - # then the local cache is queried for the subdirs existence. + # Check whether the artifact will be pulled. # # Returns: # (bool): Whether a pull operation is pending # def _pull_pending(self): - if self._get_workspace(): - # Workspace builds are never pushed to artifact servers - return False - - # Check whether the pull has been invoked with a specific subdir requested - # in user context, as to complete a partial artifact - pull_buildtrees = self._get_context().pull_buildtrees - - if self._cached() and self.__artifact._cache_key == self.__strict_cache_key: - if pull_buildtrees: - # If we've specified a subdir, check if the subdir is cached locally - # or if it's possible to get - if self._cached_buildtree() or not self._buildtree_exists(): - return False - else: - return False - - # Pull is pending if artifact remote server available - # and pull has not been attempted yet - return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done + return not self.__pull_done # _pull_done() # @@ -1943,7 +1922,7 @@ class Element(Plugin): # If we've finished pulling, an artifact might now exist # locally, so we might need to update a non-strict strong # cache key. - self.__update_cache_key_non_strict() + self._update_cache_key_non_strict() self._update_ready_for_runtime_and_cached() # _pull(): @@ -1952,12 +1931,26 @@ class Element(Plugin): # # Returns: True if the artifact has been downloaded, False otherwise # - def _pull(self): + def _pull(self, *, check_remotes=True): context = self._get_context() - # Get optional specific subdir to pull and optional list to not pull - # based off of user context - pull_buildtrees = context.pull_buildtrees + if self._get_workspace(): + # Workspace builds are never pushed to artifact servers + return False + + pull_buildtrees = self._get_context().pull_buildtrees + + if self._cached() and self.__artifact._cache_key == self.__strict_cache_key: + if pull_buildtrees: + # If we want to pull buildtrees, also pull if we're only missing the buildtree + if self._cached_buildtree() or not self._buildtree_exists(): + return False + else: + return False + + # Pull is pending if artifact remote server available + if not check_remotes or not self.__artifacts.has_fetch_remotes(plugin=self): + return False # Attempt to pull artifact without knowing whether it's available strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key) @@ -2445,7 +2438,7 @@ class Element(Plugin): assert not rdep.__build_deps_uncached < 0 if rdep.__buildable_callback is not None and rdep._buildable(): - rdep.__update_cache_key_non_strict() + rdep._update_cache_key_non_strict() rdep.__buildable_callback(rdep) rdep.__buildable_callback = None @@ -3152,7 +3145,7 @@ class Element(Plugin): # Updates weak and strict cache keys # # Note that it does not update *all* cache keys - In non-strict mode, the - # strong cache key is updated in __update_cache_key_non_strict() + # strong cache key is updated in _update_cache_key_non_strict() # # If the element is not resolved, this is a no-op (since inconsistent # elements cannot have cache keys). @@ -3257,7 +3250,7 @@ class Element(Plugin): if not context.get_strict() and self.__artifact.cached(): # In non-strict mode, strong cache key becomes available when # the artifact is cached - self.__update_cache_key_non_strict() + self._update_cache_key_non_strict() self.__schedule_assembly_when_necessary() @@ -3265,7 +3258,7 @@ class Element(Plugin): self.__can_query_cache_callback(self) self.__can_query_cache_callback = None - # __update_cache_key_non_strict() + # _update_cache_key_non_strict() # # Calculates the strong cache key if it hasn't already been set. # @@ -3277,7 +3270,7 @@ class Element(Plugin): # as the cache key can be loaded from the cache (possibly pulling from # a remote cache). # - def __update_cache_key_non_strict(self): + def _update_cache_key_non_strict(self): assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" # The final cache key can be None here only in non-strict mode |