From 592c04e44fb73151f5273f7125aaf7e0b60b925d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Wed, 9 Sep 2020 17:07:03 +0200 Subject: Move artifact cache query to pull job --- src/buildstream/_artifact.py | 9 ++ src/buildstream/_scheduler/queues/pullqueue.py | 13 +-- src/buildstream/element.py | 149 +++++++++---------------- tests/artifactcache/push.py | 3 + 4 files changed, 67 insertions(+), 107 deletions(-) diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py index 9809ac822..309edd0e6 100644 --- a/src/buildstream/_artifact.py +++ b/src/buildstream/_artifact.py @@ -640,6 +640,15 @@ class Artifact: def pull(self, *, pull_buildtrees): artifacts = self._context.artifactcache + # Attempt pull only if anything is missing + if self.cached(): + if pull_buildtrees: + # If we want to pull buildtrees, also pull if we're only missing the buildtree + if self.cached_buildtree() or not self.buildtree_exists(): + return False + else: + return False + pull_key = self.get_extract_key() if not artifacts.pull(self._element, pull_key, pull_buildtrees=pull_buildtrees): diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 29243a088..84335715f 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -23,6 +23,7 @@ from . import Queue, QueueStatus from ..resources import ResourceType from ..jobs import JobStatus from ..._exceptions import SkipJob +from ...types import _KeyStrength # A queue which pulls element artifacts @@ -45,8 +46,10 @@ class PullQueue(Queue): return PullQueue._check def status(self, element): - if not element._can_query_cache(): - return QueueStatus.PENDING + if not element._get_cache_key(strength=_KeyStrength.WEAK): + # Strict and weak cache keys are unavailable if the element or + # a dependency has an unresolved source + return QueueStatus.SKIP return QueueStatus.READY @@ -57,12 +60,6 @@ class PullQueue(Queue): element._pull_done() - def register_pending_element(self, element): - # Set a "can_query_cache"_callback for an element which is not - # immediately ready to query the artifact cache so that it - # may be pulled. - element._set_can_query_cache_callback(self._enqueue_element) - @staticmethod def _pull_or_skip(element): if not element._pull(): diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 3c6995101..cbacb7372 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -310,7 +310,7 @@ class Element(Plugin): # Callbacks self.__required_callback = None # Callback to Queues - self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue + self.__can_query_cache_callback = None # Callback to FetchQueue self.__buildable_callback = None # Callback to BuildQueue self.__resolved_initial_state = False # Whether the initial state of the Element has been resolved @@ -1308,7 +1308,7 @@ class Element(Plugin): # _initialize_state() # - # Compute up the elment's initial state. Element state contains + # Compute up the element's initial state. Element state contains # the following mutable sub-states: # # - Source state in `ElementSources` @@ -1332,9 +1332,6 @@ class Element(Plugin): # # - __update_cache_keys() # - Computes the strong and weak cache keys. - # - _update_artifact_state() - # - Computes the state of the element's artifact using the - # cache key. # - __schedule_assembly_when_necessary() # - Schedules assembly of an element, iff its current state # allows/necessitates it @@ -1872,10 +1869,10 @@ class Element(Plugin): # _pull_done() # - # Indicate that pull was attempted. + # Indicate that pull job completed successfully. # - # This needs to be called in the main process after a pull - # succeeds or fails so that we properly update the main + # This needs to be called in the main thread after a pull job completes + # successfully or is skipped so that we properly update the main # process data model # # This will result in updating the element state. @@ -1883,21 +1880,25 @@ class Element(Plugin): def _pull_done(self): assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" + context = self._get_context() + self.__pull_done = True - # Artifact may become cached after pulling, so let it query the - # filesystem again to check - self.__artifact.query_cache() + assert self.__artifact + + if not context.get_strict() and self.__artifact.cached(): + # In non-strict mode, strong cache key becomes available when + # the artifact is cached + self._update_cache_key_non_strict() - # We may not have actually pulled an artifact - the pull may - # have failed. We might therefore need to schedule assembly. - self.__schedule_assembly_when_necessary() - # If we've finished pulling, an artifact might now exist - # locally, so we might need to update a non-strict strong - # cache key. - self._update_cache_key_non_strict() self._update_ready_for_runtime_and_cached() + self.__schedule_assembly_when_necessary() + + if self.__can_query_cache_callback is not None: + self.__can_query_cache_callback(self) + self.__can_query_cache_callback = None + # _pull(): # # Pull artifact from remote artifact repository into local artifact cache. @@ -1907,38 +1908,38 @@ class Element(Plugin): def _pull(self, *, check_remotes=True): context = self._get_context() - if self._get_workspace(): - # Workspace builds are never pushed to artifact servers - return False + pull_buildtrees = context.pull_buildtrees and not self._get_workspace() - pull_buildtrees = self._get_context().pull_buildtrees + # Check remotes if requested and artifact remote server available. + # Workspace builds are never pushed to artifact servers + check_remotes = check_remotes and self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace() - if self._cached() and self.__artifact._cache_key == self.__strict_cache_key: - if pull_buildtrees: - # If we want to pull buildtrees, also pull if we're only missing the buildtree - if self._cached_buildtree() or not self._buildtree_exists(): - return False - else: - return False + # First check whether we already have the strict artifact in the local cache + artifact = Artifact( + self, + context, + strict_key=self.__strict_cache_key, + strong_key=self.__strict_cache_key, + weak_key=self.__weak_cache_key, + ) + artifact.query_cache() - # Pull is pending if artifact remote server available - if not check_remotes or not self.__artifacts.has_fetch_remotes(plugin=self): - return False + # Attempt to pull artifact with the strict cache key + pulled = check_remotes and artifact.pull(pull_buildtrees=pull_buildtrees) - # Attempt to pull artifact without knowing whether it's available - strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key) - if strict_artifact.pull(pull_buildtrees=pull_buildtrees): - # Notify successful download - return True + if artifact.cached() or context.get_strict(): + self.__artifact = artifact + return pulled - if not context.get_strict() and not self._cached(): - # In non-strict mode also try pulling weak artifact - # if no weak artifact is cached yet. - artifact = Artifact(self, context, weak_key=self.__weak_cache_key) - return artifact.pull(pull_buildtrees=pull_buildtrees) - else: - # No artifact has been downloaded - return False + # In non-strict mode retry with weak cache key + artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key) + artifact.query_cache() + + # Attempt to pull artifact with the weak cache key + pulled = check_remotes and artifact.pull(pull_buildtrees=pull_buildtrees) + + self.__artifact = artifact + return pulled def _skip_source_push(self): if not self.sources() or self._get_workspace(): @@ -2333,7 +2334,7 @@ class Element(Plugin): # _set_can_query_cache_callback() # - # Notify the pull/fetch queue that the element is potentially + # Notify the fetch queue that the element is potentially # ready to be processed. # # Set the _can_query_cache_callback - the _can_query_cache_callback is @@ -2394,7 +2395,7 @@ class Element(Plugin): assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if not self.__ready_for_runtime_and_cached: - if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success(): + if self.__runtime_deps_uncached == 0 and self.__pull_done and self.__cache_key and self._cached_success(): self.__ready_for_runtime_and_cached = True # Notify reverse dependencies @@ -2881,6 +2882,7 @@ class Element(Plugin): # Load bits which have been stored on the artifact # + artifact.query_cache() if artifact.cached(): self.__environment = artifact.load_environment() self.__sandbox_config = artifact.load_sandbox_config() @@ -3264,61 +3266,10 @@ class Element(Plugin): # In strict mode, the strong cache key always matches the strict cache key self.__cache_key = self.__strict_cache_key - # If we've newly calculated a cache key, our artifact's - # current state will also change - after all, we can now find - # a potential existing artifact. - self.__update_artifact_state() - # Update the message kwargs in use for this plugin to dispatch messages with # self._message_kwargs["element_key"] = self._get_display_key() - # __update_artifact_state() - # - # Updates the data involved in knowing about the artifact corresponding - # to this element. - # - # If the state changes, this will subsequently call - # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes - # possible. - # - # Element.__update_cache_keys() must be called before this to have - # meaningful results, because the element must know its cache key before - # it can check whether an artifact exists for that cache key. - # - def __update_artifact_state(self): - assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" - assert self.__artifact is None - - context = self._get_context() - - strict_artifact = Artifact( - self, - context, - strong_key=self.__strict_cache_key, - strict_key=self.__strict_cache_key, - weak_key=self.__weak_cache_key, - ) - strict_artifact.query_cache() - if context.get_strict() or strict_artifact.cached(): - self.__artifact = strict_artifact - else: - self.__artifact = Artifact( - self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key - ) - self.__artifact.query_cache() - - if not context.get_strict() and self.__artifact.cached(): - # In non-strict mode, strong cache key becomes available when - # the artifact is cached - self._update_cache_key_non_strict() - - self.__schedule_assembly_when_necessary() - - if self.__can_query_cache_callback is not None: - self.__can_query_cache_callback(self) - self.__can_query_cache_callback = None - # _update_cache_key_non_strict() # # Calculates the strong cache key if it hasn't already been set. @@ -3327,7 +3278,7 @@ class Element(Plugin): # strict cache key, so no work needs to be done. # # When buildstream is not run in strict mode, this requires the artifact - # state (as set in Element.__update_artifact_state()) to be set accordingly, + # state (as set in Element._pull_done()) to be set accordingly, # as the cache key can be loaded from the cache (possibly pulling from # a remote cache). # diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py index 74062ce47..0bcf20502 100644 --- a/tests/artifactcache/push.py +++ b/tests/artifactcache/push.py @@ -43,6 +43,9 @@ def _push(cli, cache_dir, project_dir, config_file, target): artifactcache.setup_remotes(use_config=True) artifactcache.initialize_remotes() + # Query local cache + element._pull(check_remotes=False) + assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst" assert element._push(), "Push operation failed" -- cgit v1.2.1