summary refs log tree commit diff
diff options
context:
space:
mode:
author Jürg Billeter <j@bitron.ch> 2020-09-03 11:04:12 +0200
committer Jürg Billeter <j@bitron.ch> 2020-12-14 10:42:59 +0100
commit ab35b016cc2c06528d8713af3b2fe099cad67197 (patch)
tree 50ec496ae1061e016c81612334c0bfc2ae08c6dc
parent f0c5f4a0e4e7f1636ece9c13caad231a474f388a (diff)
download buildstream-ab35b016cc2c06528d8713af3b2fe099cad67197.tar.gz
Always schedule pull job
This is preparation for performing the artifact cache query as part of the same job as artifact pulling.
-rw-r--r-- src/buildstream/_scheduler/queues/pullqueue.py 20
-rw-r--r-- src/buildstream/_stream.py 37
-rw-r--r-- src/buildstream/element.py 65
3 files changed, 64 insertions, 58 deletions
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d69590f..29243a088 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,17 +33,22 @@ class PullQueue(Queue):
complete_name = "Artifacts Pulled"
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
+ def __init__(self, scheduler, *, check_remotes=True):
+ super().__init__(scheduler)
+
+ self._check_remotes = check_remotes
+
def get_process_func(self):
- return PullQueue._pull_or_skip
+ if self._check_remotes:
+ return PullQueue._pull_or_skip
+ else:
+ return PullQueue._check
def status(self, element):
if not element._can_query_cache():
return QueueStatus.PENDING
- if element._pull_pending():
- return QueueStatus.READY
- else:
- return QueueStatus.SKIP
+ return QueueStatus.READY
def done(self, _, element, result, status):
@@ -62,3 +67,8 @@ class PullQueue(Queue):
def _pull_or_skip(element):
if not element._pull():
raise SkipJob(PullQueue.action_name)
+
+ @staticmethod
+ def _check(element):
+ if not element._pull(check_remotes=False):
+ raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 9677a67a5..3adbca15f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -173,12 +173,20 @@ class Stream:
#
def query_cache(self, elements):
with self._context.messenger.timed_activity("Query cache", silent_nested=True):
- # Artifact cache is implicitly queried on demand
+ # Enqueue complete build plan as this is required to determine `buildable` status.
+ plan = self._pipeline.plan(elements)
+
self._scheduler.clear_queues()
+ self._add_queue(PullQueue(self._scheduler, check_remotes=False))
self._add_queue(FetchQueue(self._scheduler, skip_cached=True, check_only=True))
- self._enqueue_plan(elements)
+ self._enqueue_plan(plan)
self._run()
+ # In non-strict mode, the above may not calculate all strong cache keys.
+ if not self._context.get_strict():
+ for element in plan:
+ element._update_cache_key_non_strict()
+
# shell()
#
# Run a shell
@@ -225,12 +233,11 @@ class Stream:
element = self.targets[0]
element._set_required(scope)
- if pull_:
- self._scheduler.clear_queues()
- self._add_queue(PullQueue(self._scheduler))
- plan = self._pipeline.add_elements([element], elements)
- self._enqueue_plan(plan)
- self._run()
+ self._scheduler.clear_queues()
+ self._add_queue(PullQueue(self._scheduler, check_remotes=pull_))
+ plan = self._pipeline.add_elements([element], elements)
+ self._enqueue_plan(plan)
+ self._run()
missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
if missing_deps:
@@ -311,8 +318,7 @@ class Stream:
#
self._scheduler.clear_queues()
- if self._artifacts.has_fetch_remotes():
- self._add_queue(PullQueue(self._scheduler))
+ self._add_queue(PullQueue(self._scheduler))
self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
@@ -552,13 +558,10 @@ class Stream:
self._check_location_writable(location, force=force, tar=tar)
- uncached_elts = [elt for elt in elements if not elt._cached()]
- if uncached_elts and pull:
- self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact")
- self._scheduler.clear_queues()
- self._add_queue(PullQueue(self._scheduler))
- self._enqueue_plan(uncached_elts)
- self._run(announce_session=True)
+ self._scheduler.clear_queues()
+ self._add_queue(PullQueue(self._scheduler, check_remotes=pull))
+ self._enqueue_plan(elements)
+ self._run(announce_session=True)
try:
scope = {
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 6619f93dc..3c6995101 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1338,7 +1338,7 @@ class Element(Plugin):
# - __schedule_assembly_when_necessary()
# - Schedules assembly of an element, iff its current state
# allows/necessitates it
- # - __update_cache_key_non_strict()
+ # - _update_cache_key_non_strict()
# - Sets strict cache keys in non-strict builds
# - Some non-strict build actions can create artifacts
# compatible with strict mode (such as pulling), so
@@ -1637,7 +1637,7 @@ class Element(Plugin):
# Once we schedule an element for assembly, we know that our
# build dependencies have strong cache keys, so we can update
# our own strong cache key.
- self.__update_cache_key_non_strict()
+ self._update_cache_key_non_strict()
# _assemble_done():
#
@@ -1666,7 +1666,7 @@ class Element(Plugin):
# assembled everything to this point without a strong cache
# key. Once the element has been assembled, a strong cache key
# can be set, so we do so.
- self.__update_cache_key_non_strict()
+ self._update_cache_key_non_strict()
self._update_ready_for_runtime_and_cached()
if self._get_workspace() and self._cached():
@@ -1862,34 +1862,13 @@ class Element(Plugin):
# _pull_pending()
#
- # Check whether the artifact will be pulled. If the pull operation is to
- # include a specific subdir of the element artifact (from cli or user conf)
- # then the local cache is queried for the subdirs existence.
+ # Check whether the artifact will be pulled.
#
# Returns:
# (bool): Whether a pull operation is pending
#
def _pull_pending(self):
- if self._get_workspace():
- # Workspace builds are never pushed to artifact servers
- return False
-
- # Check whether the pull has been invoked with a specific subdir requested
- # in user context, as to complete a partial artifact
- pull_buildtrees = self._get_context().pull_buildtrees
-
- if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
- if pull_buildtrees:
- # If we've specified a subdir, check if the subdir is cached locally
- # or if it's possible to get
- if self._cached_buildtree() or not self._buildtree_exists():
- return False
- else:
- return False
-
- # Pull is pending if artifact remote server available
- # and pull has not been attempted yet
- return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
+ return not self.__pull_done
# _pull_done()
#
@@ -1916,7 +1895,7 @@ class Element(Plugin):
# If we've finished pulling, an artifact might now exist
# locally, so we might need to update a non-strict strong
# cache key.
- self.__update_cache_key_non_strict()
+ self._update_cache_key_non_strict()
self._update_ready_for_runtime_and_cached()
# _pull():
@@ -1925,12 +1904,26 @@ class Element(Plugin):
#
# Returns: True if the artifact has been downloaded, False otherwise
#
- def _pull(self):
+ def _pull(self, *, check_remotes=True):
context = self._get_context()
- # Get optional specific subdir to pull and optional list to not pull
- # based off of user context
- pull_buildtrees = context.pull_buildtrees
+ if self._get_workspace():
+ # Workspace builds are never pushed to artifact servers
+ return False
+
+ pull_buildtrees = self._get_context().pull_buildtrees
+
+ if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
+ if pull_buildtrees:
+ # If we want to pull buildtrees, also pull if we're only missing the buildtree
+ if self._cached_buildtree() or not self._buildtree_exists():
+ return False
+ else:
+ return False
+
+ # Pull is pending if artifact remote server available
+ if not check_remotes or not self.__artifacts.has_fetch_remotes(plugin=self):
+ return False
# Attempt to pull artifact without knowing whether it's available
strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
@@ -2418,7 +2411,7 @@ class Element(Plugin):
assert not rdep.__build_deps_uncached < 0
if rdep.__buildable_callback is not None and rdep._buildable():
- rdep.__update_cache_key_non_strict()
+ rdep._update_cache_key_non_strict()
rdep.__buildable_callback(rdep)
rdep.__buildable_callback = None
@@ -3201,7 +3194,7 @@ class Element(Plugin):
# Updates weak and strict cache keys
#
# Note that it does not update *all* cache keys - In non-strict mode, the
- # strong cache key is updated in __update_cache_key_non_strict()
+ # strong cache key is updated in _update_cache_key_non_strict()
#
# If the element is not resolved, this is a no-op (since inconsistent
# elements cannot have cache keys).
@@ -3318,7 +3311,7 @@ class Element(Plugin):
if not context.get_strict() and self.__artifact.cached():
# In non-strict mode, strong cache key becomes available when
# the artifact is cached
- self.__update_cache_key_non_strict()
+ self._update_cache_key_non_strict()
self.__schedule_assembly_when_necessary()
@@ -3326,7 +3319,7 @@ class Element(Plugin):
self.__can_query_cache_callback(self)
self.__can_query_cache_callback = None
- # __update_cache_key_non_strict()
+ # _update_cache_key_non_strict()
#
# Calculates the strong cache key if it hasn't already been set.
#
@@ -3338,7 +3331,7 @@ class Element(Plugin):
# as the cache key can be loaded from the cache (possibly pulling from
# a remote cache).
#
- def __update_cache_key_non_strict(self):
+ def _update_cache_key_non_strict(self):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
# The final cache key can be None here only in non-strict mode