From 515eab747f44f4f19d0598d6b8e7cd9a339d52ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 14 Dec 2020 21:36:08 +0100 Subject: element.py: Combine cache query and pull into `_load_artifact()` In non-strict mode cache query and pull are intertwined as we prefer pulling the strict artifact to a cache lookup with the weak cache key. This replaces `__update_artifact_state()` and `_pull()` with a combined `_load_artifact()` method, which supports cache query with deferred pulling. This provides correct behavior with the flexibility of split or combined cache query and pull. --- src/buildstream/_artifact.py | 7 +- src/buildstream/_frontend/widget.py | 2 +- src/buildstream/_scheduler/queues/pullqueue.py | 13 +- src/buildstream/element.py | 170 ++++++++++--------------- 4 files changed, 72 insertions(+), 120 deletions(-) diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py index b63cff6e5..da1e03a86 100644 --- a/src/buildstream/_artifact.py +++ b/src/buildstream/_artifact.py @@ -591,9 +591,12 @@ class Artifact: # Returns: # (bool): Whether artifact is in local cache # - def cached(self): + def cached(self, *, buildtree=False): assert self._cached is not None - return self._cached + ret = self._cached + if buildtree: + ret = ret and (self.cached_buildtree() or not self.buildtree_exists()) + return ret # cached_logs() # diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py index ad6c813b0..3ef64d3db 100644 --- a/src/buildstream/_frontend/widget.py +++ b/src/buildstream/_frontend/widget.py @@ -358,7 +358,7 @@ class LogLine(Widget): if element.get_kind() == "junction": line = p.fmt_subst(line, "state", "junction", fg="magenta") elif not element._can_query_cache(): - line = p.fmt_subst(line, "state", "unknown", fg="bright_black") + line = p.fmt_subst(line, "state", "waiting", fg="blue") elif element._cached_failure(): line = p.fmt_subst(line, "state", "failed", fg="red") elif element._cached_success(): diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index e1d69590f..ecff02cec 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -37,9 +37,6 @@ class PullQueue(Queue): return PullQueue._pull_or_skip def status(self, element): - if not element._can_query_cache(): - return QueueStatus.PENDING - if element._pull_pending(): return QueueStatus.READY else: @@ -50,15 +47,9 @@ class PullQueue(Queue): if status is JobStatus.FAIL: return - element._pull_done() - - def register_pending_element(self, element): - # Set a "can_query_cache"_callback for an element which is not - # immediately ready to query the artifact cache so that it - # may be pulled. - element._set_can_query_cache_callback(self._enqueue_element) + element._load_artifact_done() @staticmethod def _pull_or_skip(element): - if not element._pull(): + if not element._load_artifact(pull=True): raise SkipJob(PullQueue.action_name) diff --git a/src/buildstream/element.py b/src/buildstream/element.py index c4f0479b6..f5f8a71bc 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -290,7 +290,7 @@ class Element(Plugin): self.__sourcecache = context.sourcecache # Source cache self.__assemble_scheduled = False # Element is scheduled to be assembled self.__assemble_done = False # Element is assembled - self.__pull_done = False # Whether pull was attempted + self.__pull_pending = False # Whether pull is pending self.__cached_successfully = None # If the Element is known to be successfully cached self.__splits = None # Resolved regex objects for computing split domains self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap @@ -1332,9 +1332,6 @@ class Element(Plugin): # # - __update_cache_keys() # - Computes the strong and weak cache keys. - # - _update_artifact_state() - # - Computes the state of the element's artifact using the - # cache key. # - __schedule_assembly_when_necessary() # - Schedules assembly of an element, iff its current state # allows/necessitates it @@ -1869,83 +1866,85 @@ class Element(Plugin): # (bool): Whether a pull operation is pending # def _pull_pending(self): - if self._get_workspace(): - # Workspace builds are never pushed to artifact servers - return False - - # Check whether the pull has been invoked with a specific subdir requested - # in user context, as to complete a partial artifact - pull_buildtrees = self._get_context().pull_buildtrees - - if self._cached() and self.__artifact._cache_key == self.__strict_cache_key: - if pull_buildtrees: - # If we've specified a subdir, check if the subdir is cached locally - # or if it's possible to get - if self._cached_buildtree() or not self._buildtree_exists(): - return False - else: - return False + return self.__pull_pending - # Pull is pending if artifact remote server available - # and pull has not been attempted yet - return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done - - # _pull_done() + # _load_artifact_done() # - # Indicate that pull was attempted. + # Indicate that `_load_artifact()` has completed. # - # This needs to be called in the main process after a pull + # This needs to be called in the main process after `_load_artifact()` # succeeds or fails so that we properly update the main # process data model # # This will result in updating the element state. # - def _pull_done(self): + def _load_artifact_done(self): assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" - self.__pull_done = True + assert self.__artifact - # Artifact may become cached after pulling, so let it query the - # filesystem again to check - self.__artifact.query_cache() + context = self._get_context() + + if not context.get_strict() and self.__artifact.cached(): + # In non-strict mode, strong cache key becomes available when + # the artifact is cached + self.__update_cache_key_non_strict() - # We may not have actually pulled an artifact - the pull may - # have failed. We might therefore need to schedule assembly. - self.__schedule_assembly_when_necessary() - # If we've finished pulling, an artifact might now exist - # locally, so we might need to update a non-strict strong - # cache key. - self.__update_cache_key_non_strict() self._update_ready_for_runtime_and_cached() - # _pull(): + self.__schedule_assembly_when_necessary() + + if self.__can_query_cache_callback is not None: + self.__can_query_cache_callback(self) + self.__can_query_cache_callback = None + + # _load_artifact(): # - # Pull artifact from remote artifact repository into local artifact cache. + # Load artifact from cache or pull it from remote artifact repository. # # Returns: True if the artifact has been downloaded, False otherwise # - def _pull(self): + def _load_artifact(self, *, pull): context = self._get_context() - # Get optional specific subdir to pull and optional list to not pull - # based off of user context - pull_buildtrees = context.pull_buildtrees + pull_buildtrees = context.pull_buildtrees and not self._get_workspace() - # Attempt to pull artifact without knowing whether it's available - strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key) - if strict_artifact.pull(pull_buildtrees=pull_buildtrees): - # Notify successful download - return True + # First check whether we already have the strict artifact in the local cache + artifact = Artifact( + self, + context, + strict_key=self.__strict_cache_key, + strong_key=self.__strict_cache_key, + weak_key=self.__weak_cache_key, + ) + artifact.query_cache() - if not context.get_strict() and not self._cached(): - # In non-strict mode also try pulling weak artifact - # if no weak artifact is cached yet. - artifact = Artifact(self, context, weak_key=self.__weak_cache_key) - return artifact.pull(pull_buildtrees=pull_buildtrees) - else: - # No artifact has been downloaded + self.__pull_pending = False + if not pull and not artifact.cached(buildtree=pull_buildtrees): + if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace(): + # Artifact is not completely available in cache and artifact remote server is available. + # Stop artifact loading here as pull is required to proceed. + self.__pull_pending = True + + # Attempt to pull artifact with the strict cache key + pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees) + + if artifact.cached() or context.get_strict(): + self.__artifact = artifact + return pulled + elif self.__pull_pending: return False + # In non-strict mode retry with weak cache key + artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key) + artifact.query_cache() + + # Attempt to pull artifact with the weak cache key + pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees) + + self.__artifact = artifact + return pulled + def _skip_source_push(self): if not self.sources() or self._get_workspace(): return True @@ -2384,7 +2383,7 @@ class Element(Plugin): assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if not self.__ready_for_runtime_and_cached: - if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success(): + if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success(): self.__ready_for_runtime_and_cached = True # Notify reverse dependencies @@ -2871,10 +2870,13 @@ class Element(Plugin): # Load bits which have been stored on the artifact # + artifact.query_cache() if artifact.cached(): self.__environment = artifact.load_environment() self.__sandbox_config = artifact.load_sandbox_config() self.__variables = artifact.load_variables() + else: + self.__pull_pending = True self.__cache_key = artifact.strong_key self.__strict_cache_key = artifact.strict_key @@ -3257,58 +3259,14 @@ class Element(Plugin): # If we've newly calculated a cache key, our artifact's # current state will also change - after all, we can now find # a potential existing artifact. - self.__update_artifact_state() + self._load_artifact(pull=False) + if not self._pull_pending(): + self._load_artifact_done() # Update the message kwargs in use for this plugin to dispatch messages with # self._message_kwargs["element_key"] = self._get_display_key() - # __update_artifact_state() - # - # Updates the data involved in knowing about the artifact corresponding - # to this element. - # - # If the state changes, this will subsequently call - # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes - # possible. - # - # Element.__update_cache_keys() must be called before this to have - # meaningful results, because the element must know its cache key before - # it can check whether an artifact exists for that cache key. - # - def __update_artifact_state(self): - assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" - assert self.__artifact is None - - context = self._get_context() - - strict_artifact = Artifact( - self, - context, - strong_key=self.__strict_cache_key, - strict_key=self.__strict_cache_key, - weak_key=self.__weak_cache_key, - ) - strict_artifact.query_cache() - if context.get_strict() or strict_artifact.cached(): - self.__artifact = strict_artifact - else: - self.__artifact = Artifact( - self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key - ) - self.__artifact.query_cache() - - if not context.get_strict() and self.__artifact.cached(): - # In non-strict mode, strong cache key becomes available when - # the artifact is cached - self.__update_cache_key_non_strict() - - self.__schedule_assembly_when_necessary() - - if self.__can_query_cache_callback is not None: - self.__can_query_cache_callback(self) - self.__can_query_cache_callback = None - # __update_cache_key_non_strict() # # Calculates the strong cache key if it hasn't already been set. @@ -3317,7 +3275,7 @@ class Element(Plugin): # strict cache key, so no work needs to be done. # # When buildstream is not run in strict mode, this requires the artifact - # state (as set in Element.__update_artifact_state()) to be set accordingly, + # state (as set in Element._load_artifact()) to be set accordingly, # as the cache key can be loaded from the cache (possibly pulling from # a remote cache). # -- cgit v1.2.1