summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2020-09-09 17:07:03 +0200
committerJürg Billeter <j@bitron.ch>2020-12-14 10:42:59 +0100
commit592c04e44fb73151f5273f7125aaf7e0b60b925d (patch)
treea8496eab89bc3312401320fa307eeb2408a30e62
parentab35b016cc2c06528d8713af3b2fe099cad67197 (diff)
downloadbuildstream-592c04e44fb73151f5273f7125aaf7e0b60b925d.tar.gz
Move artifact cache query to pull job
-rw-r--r--src/buildstream/_artifact.py9
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py13
-rw-r--r--src/buildstream/element.py149
-rw-r--r--tests/artifactcache/push.py3
4 files changed, 67 insertions, 107 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index 9809ac822..309edd0e6 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -640,6 +640,15 @@ class Artifact:
def pull(self, *, pull_buildtrees):
artifacts = self._context.artifactcache
+ # Attempt pull only if anything is missing
+ if self.cached():
+ if pull_buildtrees:
+ # If we want to pull buildtrees, also pull if we're only missing the buildtree
+ if self.cached_buildtree() or not self.buildtree_exists():
+ return False
+ else:
+ return False
+
pull_key = self.get_extract_key()
if not artifacts.pull(self._element, pull_key, pull_buildtrees=pull_buildtrees):
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 29243a088..84335715f 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -23,6 +23,7 @@ from . import Queue, QueueStatus
from ..resources import ResourceType
from ..jobs import JobStatus
from ..._exceptions import SkipJob
+from ...types import _KeyStrength
# A queue which pulls element artifacts
@@ -45,8 +46,10 @@ class PullQueue(Queue):
return PullQueue._check
def status(self, element):
- if not element._can_query_cache():
- return QueueStatus.PENDING
+ if not element._get_cache_key(strength=_KeyStrength.WEAK):
+ # Strict and weak cache keys are unavailable if the element or
+ # a dependency has an unresolved source
+ return QueueStatus.SKIP
return QueueStatus.READY
@@ -57,12 +60,6 @@ class PullQueue(Queue):
element._pull_done()
- def register_pending_element(self, element):
- # Set a "can_query_cache"_callback for an element which is not
- # immediately ready to query the artifact cache so that it
- # may be pulled.
- element._set_can_query_cache_callback(self._enqueue_element)
-
@staticmethod
def _pull_or_skip(element):
if not element._pull():
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 3c6995101..cbacb7372 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -310,7 +310,7 @@ class Element(Plugin):
# Callbacks
self.__required_callback = None # Callback to Queues
- self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue
+ self.__can_query_cache_callback = None # Callback to FetchQueue
self.__buildable_callback = None # Callback to BuildQueue
self.__resolved_initial_state = False # Whether the initial state of the Element has been resolved
@@ -1308,7 +1308,7 @@ class Element(Plugin):
# _initialize_state()
#
- # Compute up the elment's initial state. Element state contains
+ # Compute up the element's initial state. Element state contains
# the following mutable sub-states:
#
# - Source state in `ElementSources`
@@ -1332,9 +1332,6 @@ class Element(Plugin):
#
# - __update_cache_keys()
# - Computes the strong and weak cache keys.
- # - _update_artifact_state()
- # - Computes the state of the element's artifact using the
- # cache key.
# - __schedule_assembly_when_necessary()
# - Schedules assembly of an element, iff its current state
# allows/necessitates it
@@ -1872,10 +1869,10 @@ class Element(Plugin):
# _pull_done()
#
- # Indicate that pull was attempted.
+ # Indicate that pull job completed successfully.
#
- # This needs to be called in the main process after a pull
- # succeeds or fails so that we properly update the main
+ # This needs to be called in the main thread after a pull job completes
+ # successfully or is skipped so that we properly update the main
# process data model
#
# This will result in updating the element state.
@@ -1883,21 +1880,25 @@ class Element(Plugin):
def _pull_done(self):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
+ context = self._get_context()
+
self.__pull_done = True
- # Artifact may become cached after pulling, so let it query the
- # filesystem again to check
- self.__artifact.query_cache()
+ assert self.__artifact
+
+ if not context.get_strict() and self.__artifact.cached():
+ # In non-strict mode, strong cache key becomes available when
+ # the artifact is cached
+ self._update_cache_key_non_strict()
- # We may not have actually pulled an artifact - the pull may
- # have failed. We might therefore need to schedule assembly.
- self.__schedule_assembly_when_necessary()
- # If we've finished pulling, an artifact might now exist
- # locally, so we might need to update a non-strict strong
- # cache key.
- self._update_cache_key_non_strict()
self._update_ready_for_runtime_and_cached()
+ self.__schedule_assembly_when_necessary()
+
+ if self.__can_query_cache_callback is not None:
+ self.__can_query_cache_callback(self)
+ self.__can_query_cache_callback = None
+
# _pull():
#
# Pull artifact from remote artifact repository into local artifact cache.
@@ -1907,38 +1908,38 @@ class Element(Plugin):
def _pull(self, *, check_remotes=True):
context = self._get_context()
- if self._get_workspace():
- # Workspace builds are never pushed to artifact servers
- return False
+ pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
- pull_buildtrees = self._get_context().pull_buildtrees
+ # Check remotes if requested and artifact remote server available.
+ # Workspace builds are never pushed to artifact servers
+ check_remotes = check_remotes and self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace()
- if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
- if pull_buildtrees:
- # If we want to pull buildtrees, also pull if we're only missing the buildtree
- if self._cached_buildtree() or not self._buildtree_exists():
- return False
- else:
- return False
+ # First check whether we already have the strict artifact in the local cache
+ artifact = Artifact(
+ self,
+ context,
+ strict_key=self.__strict_cache_key,
+ strong_key=self.__strict_cache_key,
+ weak_key=self.__weak_cache_key,
+ )
+ artifact.query_cache()
- # Pull is pending if artifact remote server available
- if not check_remotes or not self.__artifacts.has_fetch_remotes(plugin=self):
- return False
+ # Attempt to pull artifact with the strict cache key
+ pulled = check_remotes and artifact.pull(pull_buildtrees=pull_buildtrees)
- # Attempt to pull artifact without knowing whether it's available
- strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
- if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
- # Notify successful download
- return True
+ if artifact.cached() or context.get_strict():
+ self.__artifact = artifact
+ return pulled
- if not context.get_strict() and not self._cached():
- # In non-strict mode also try pulling weak artifact
- # if no weak artifact is cached yet.
- artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
- return artifact.pull(pull_buildtrees=pull_buildtrees)
- else:
- # No artifact has been downloaded
- return False
+ # In non-strict mode retry with weak cache key
+ artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+ artifact.query_cache()
+
+ # Attempt to pull artifact with the weak cache key
+ pulled = check_remotes and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+ self.__artifact = artifact
+ return pulled
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
@@ -2333,7 +2334,7 @@ class Element(Plugin):
# _set_can_query_cache_callback()
#
- # Notify the pull/fetch queue that the element is potentially
+ # Notify the fetch queue that the element is potentially
# ready to be processed.
#
# Set the _can_query_cache_callback - the _can_query_cache_callback is
@@ -2394,7 +2395,7 @@ class Element(Plugin):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if not self.__ready_for_runtime_and_cached:
- if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+ if self.__runtime_deps_uncached == 0 and self.__pull_done and self.__cache_key and self._cached_success():
self.__ready_for_runtime_and_cached = True
# Notify reverse dependencies
@@ -2881,6 +2882,7 @@ class Element(Plugin):
# Load bits which have been stored on the artifact
#
+ artifact.query_cache()
if artifact.cached():
self.__environment = artifact.load_environment()
self.__sandbox_config = artifact.load_sandbox_config()
@@ -3264,61 +3266,10 @@ class Element(Plugin):
# In strict mode, the strong cache key always matches the strict cache key
self.__cache_key = self.__strict_cache_key
- # If we've newly calculated a cache key, our artifact's
- # current state will also change - after all, we can now find
- # a potential existing artifact.
- self.__update_artifact_state()
-
# Update the message kwargs in use for this plugin to dispatch messages with
#
self._message_kwargs["element_key"] = self._get_display_key()
- # __update_artifact_state()
- #
- # Updates the data involved in knowing about the artifact corresponding
- # to this element.
- #
- # If the state changes, this will subsequently call
- # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
- # possible.
- #
- # Element.__update_cache_keys() must be called before this to have
- # meaningful results, because the element must know its cache key before
- # it can check whether an artifact exists for that cache key.
- #
- def __update_artifact_state(self):
- assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- assert self.__artifact is None
-
- context = self._get_context()
-
- strict_artifact = Artifact(
- self,
- context,
- strong_key=self.__strict_cache_key,
- strict_key=self.__strict_cache_key,
- weak_key=self.__weak_cache_key,
- )
- strict_artifact.query_cache()
- if context.get_strict() or strict_artifact.cached():
- self.__artifact = strict_artifact
- else:
- self.__artifact = Artifact(
- self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
- )
- self.__artifact.query_cache()
-
- if not context.get_strict() and self.__artifact.cached():
- # In non-strict mode, strong cache key becomes available when
- # the artifact is cached
- self._update_cache_key_non_strict()
-
- self.__schedule_assembly_when_necessary()
-
- if self.__can_query_cache_callback is not None:
- self.__can_query_cache_callback(self)
- self.__can_query_cache_callback = None
-
# _update_cache_key_non_strict()
#
# Calculates the strong cache key if it hasn't already been set.
@@ -3327,7 +3278,7 @@ class Element(Plugin):
# strict cache key, so no work needs to be done.
#
# When buildstream is not run in strict mode, this requires the artifact
- # state (as set in Element.__update_artifact_state()) to be set accordingly,
+ # state (as set in Element._pull_done()) to be set accordingly,
# as the cache key can be loaded from the cache (possibly pulling from
# a remote cache).
#
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 74062ce47..0bcf20502 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -43,6 +43,9 @@ def _push(cli, cache_dir, project_dir, config_file, target):
artifactcache.setup_remotes(use_config=True)
artifactcache.initialize_remotes()
+ # Query local cache
+ element._pull(check_remotes=False)
+
assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
assert element._push(), "Push operation failed"