From ca909d9c41f074858dd4d24d9b3cc226b7a1c3f8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrg=20Billeter?=
Date: Mon, 14 Dec 2020 19:43:40 +0100
Subject: Move artifact and source cache query to a job thread

This allows parallelization of cache queries.
---
 src/buildstream/_elementsources.py              |  5 +-
 src/buildstream/_frontend/cli.py                |  2 +
 src/buildstream/_loader/loader.py               |  1 +
 src/buildstream/_scheduler/__init__.py          |  1 +
 .../_scheduler/queues/cachequeryqueue.py        | 66 ++++++++++++++++++++++
 src/buildstream/_scheduler/queues/fetchqueue.py |  2 +-
 src/buildstream/_scheduler/queues/pullqueue.py  |  4 --
 src/buildstream/_stream.py                      | 44 +++++++++++++++
 src/buildstream/element.py                      | 10 +---
 tests/artifactcache/push.py                     |  3 +
 tests/frontend/fetch.py                         |  8 +--
 tests/frontend/track.py                         | 12 ++--
 tests/sourcecache/fetch.py                      |  6 ++
 tests/sourcecache/push.py                       |  2 +
 tests/sourcecache/staging.py                    |  3 +
 tests/sources/git.py                            |  6 +-
 16 files changed, 151 insertions(+), 24 deletions(-)
 create mode 100644 src/buildstream/_scheduler/queues/cachequeryqueue.py

diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index 9b4afe4eb..d426ee657 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@ class ElementSources:
     #    SourceError: If one of the element sources has an error
     #
     def fetch(self):
+        if self._cached is None:
+            self.query_cache()
+
         if self.cached():
             return
 
@@ -387,8 +390,6 @@ class ElementSources:
         unique_key = self.get_unique_key()
         self._cache_key = _cachekey.generate_key(unique_key)
 
-        self.query_cache()
-
     # preflight():
     #
     # An internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index ab06e8a8a..cad1ee0ca 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -551,6 +551,8 @@ def show(app, elements, deps, except_, order, format_):
 
         dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
 
+        app.stream.query_cache(dependencies)
+
         if order == "alpha":
             dependencies = sorted(dependencies)
 
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 6ace3624b..3d0fb654b 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -830,6 +830,7 @@ class Loader:
 
         # Handle the case where a subproject needs to be fetched
         #
+        element._query_source_cache()
         if element._should_fetch():
             self.load_context.fetch_subprojects([element])
 
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458fa5..fcde00dff 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -25,6 +25,7 @@ from .queues.trackqueue import TrackQueue
 from .queues.buildqueue import BuildQueue
 from .queues.artifactpushqueue import ArtifactPushQueue
 from .queues.pullqueue import PullQueue
+from .queues.cachequeryqueue import CacheQueryQueue
 
 from .scheduler import Scheduler, SchedStatus
 from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/queues/cachequeryqueue.py b/src/buildstream/_scheduler/queues/cachequeryqueue.py
new file mode 100644
index 000000000..f447ab5e9
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/cachequeryqueue.py
@@ -0,0 +1,66 @@
+#
+#  Copyright (C) 2020 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+from ...types import _KeyStrength
+
+
+# A queue which queries the cache for artifacts and sources
+#
+class CacheQueryQueue(Queue):
+
+    action_name = "Cache-query"
+    complete_name = "Cache queried"
+    resources = [ResourceType.PROCESS, ResourceType.CACHE]
+
+    def __init__(self, scheduler, *, sources=False):
+        super().__init__(scheduler)
+
+        self._query_sources = sources
+
+    def get_process_func(self):
+        if not self._query_sources:
+            return CacheQueryQueue._query_artifacts_or_sources
+        else:
+            return CacheQueryQueue._query_sources
+
+    def status(self, element):
+        if not element._get_cache_key(strength=_KeyStrength.WEAK):
+            # Strict and weak cache keys are unavailable if the element or
+            # a dependency has an unresolved source
+            return QueueStatus.SKIP
+
+        return QueueStatus.READY
+
+    def done(self, _, element, result, status):
+        if status is JobStatus.FAIL:
+            return
+
+        if not self._query_sources:
+            if not element._pull_pending():
+                element._load_artifact_done()
+
+    @staticmethod
+    def _query_artifacts_or_sources(element):
+        element._load_artifact(pull=False)
+        if not element._can_query_cache() or not element._cached_success():
+            element._query_source_cache()
+
+    @staticmethod
+    def _query_sources(element):
+        element._query_source_cache()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 3a4183d83..ee84982e2 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -56,7 +56,7 @@ class FetchQueue(Queue):
 
         # This will automatically skip elements which
         # have no sources.
-        if not element._should_fetch(self._should_fetch_original):
+        if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
             return QueueStatus.SKIP
 
         return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index ecff02cec..9860256a6 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -43,10 +43,6 @@ class PullQueue(Queue):
             return QueueStatus.SKIP
 
     def done(self, _, element, result, status):
-
-        if status is JobStatus.FAIL:
-            return
-
         element._load_artifact_done()
 
     @staticmethod
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e05100f24..0558a12b1 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -37,6 +37,7 @@ from ._scheduler import (
     Scheduler,
     SchedStatus,
     TrackQueue,
+    CacheQueryQueue,
     FetchQueue,
     SourcePushQueue,
     BuildQueue,
@@ -162,6 +163,25 @@ class Stream:
 
         return target_objects
 
+    # query_cache()
+    #
+    # Query the artifact and source caches to determine the cache status
+    # of the specified elements.
+    #
+    # Args:
+    #    elements (list of Element): The elements to check
+    #
+    def query_cache(self, elements, *, sources=False):
+        with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+            # Enqueue complete build plan as this is required to determine `buildable` status.
+            plan = list(self._pipeline.dependencies(elements, _Scope.ALL))
+
+            self._scheduler.clear_queues()
+            self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
+            self._enqueue_plan(plan)
+            self._run()
+            self._scheduler.clear_queues()
+
     # shell()
     #
     # Run a shell
@@ -208,6 +228,8 @@ class Stream:
             element = self.targets[0]
             element._set_required(scope)
 
+            self.query_cache([element] + elements)
+
             if pull_:
                 self._scheduler.clear_queues()
                 self._add_queue(PullQueue(self._scheduler))
@@ -244,6 +266,7 @@ class Stream:
 
         # Ensure we have our sources if we are launching a build shell
         if scope == _Scope.BUILD and not usebuildtree:
+            self.query_cache([element], sources=True)
            self._fetch([element])
             self._pipeline.assert_sources_cached([element])
 
@@ -294,6 +317,8 @@ class Stream:
         for element in self.targets:
             element._set_artifact_files_required(scope=scope)
 
+        self.query_cache(elements)
+
         # Now construct the queues
         #
         self._scheduler.clear_queues()
@@ -339,6 +364,8 @@ class Stream:
             source_remote_url=remote,
         )
 
+        self.query_cache(elements, sources=True)
+
         # Delegated to a shared fetch method
         self._fetch(elements, announce_session=True)
 
@@ -403,6 +430,8 @@ class Stream:
             load_artifacts=True,
         )
 
+        self.query_cache(elements, sources=True)
+
         if not self._sourcecache.has_push_remotes():
             raise StreamError("No source caches available for pushing sources")
 
@@ -447,6 +476,9 @@ class Stream:
             raise StreamError("No artifact caches available for pulling artifacts")
 
         self._pipeline.assert_consistent(elements)
+
+        self.query_cache(elements)
+
         self._scheduler.clear_queues()
         self._add_queue(PullQueue(self._scheduler))
         self._enqueue_plan(elements)
@@ -489,6 +521,8 @@ class Stream:
 
         self._pipeline.assert_consistent(elements)
 
+        self.query_cache(elements)
+
         self._scheduler.clear_queues()
         self._add_queue(PullQueue(self._scheduler))
         self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -539,6 +573,8 @@ class Stream:
 
         self._check_location_writable(location, force=force, tar=tar)
 
+        self.query_cache(elements)
+
         uncached_elts = [elt for elt in elements if not elt._cached()]
         if uncached_elts and pull:
             self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -617,6 +653,8 @@ class Stream:
             targets, selection=selection, use_artifact_config=True, load_artifacts=True
         )
 
+        self.query_cache(target_objects)
+
         if self._artifacts.has_fetch_remotes():
             self._pipeline.check_remotes(target_objects)
 
@@ -636,6 +674,8 @@ class Stream:
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         artifact_logs = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -664,6 +704,8 @@ class Stream:
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         elements_to_files = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -742,6 +784,7 @@ class Stream:
         elements = self._load((target,), selection=deps, except_targets=except_targets)
 
         # Assert all sources are cached in the source dir
+        self.query_cache(elements, sources=True)
         self._fetch(elements)
         self._pipeline.assert_sources_cached(elements)
 
@@ -775,6 +818,7 @@ class Stream:
         # If we're going to checkout, we need at least a fetch,
         #
         if not no_checkout:
+            self.query_cache(elements, sources=True)
             self._fetch(elements, fetch_original=True)
 
         expanded_directories = []
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index f5f8a71bc..7dcbc3283 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1945,6 +1945,9 @@ class Element(Plugin):
             self.__artifact = artifact
         return pulled
 
+    def _query_source_cache(self):
+        self.__sources.query_cache()
+
     def _skip_source_push(self):
         if not self.sources() or self._get_workspace():
             return True
@@ -3256,13 +3259,6 @@ class Element(Plugin):
             # In strict mode, the strong cache key always matches the strict cache key
             self.__cache_key = self.__strict_cache_key
 
-            # If we've newly calculated a cache key, our artifact's
-            # current state will also change - after all, we can now find
-            # a potential existing artifact.
-            self._load_artifact(pull=False)
-            if not self._pull_pending():
-                self._load_artifact_done()
-
         # Update the message kwargs in use for this plugin to dispatch messages with
         #
         self._message_kwargs["element_key"] = self._get_display_key()
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 74062ce47..17ad2e2a7 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -43,6 +43,9 @@ def _push(cli, cache_dir, project_dir, config_file, target):
         artifactcache.setup_remotes(use_config=True)
         artifactcache.initialize_remotes()
 
+        # Query local cache
+        element._load_artifact(pull=False)
+
         assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
         assert element._push(), "Push operation failed"
 
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64c2..6c8a4b71c 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@ def test_fetch_deps(cli, datafiles, deps, expected_states):
 def test_fetch_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # When the error occurs outside of the scheduler at load time,
-    # then the SourceError is reported directly as the main error.
     result = cli.run(project=project, args=["source", "fetch", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@ def test_fetch_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
     result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index bd8444973..d1a93242b 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storage):
 def test_track_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing a consistency error
+    # Track the element causing a consistency error in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
 def test_track_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing an unhandled exception
+    # Track the element causing an unhandled exception in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "bug.bst"])
 
-    # We expect BuildStream to fail gracefully, with no recorded exception.
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 76f5508f9..40076e4c0 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -75,6 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
 
         element = project.load_elements([element_name])[0]
         element._initialize_state()
+        element._query_source_cache()
         assert not element._cached_sources()
 
         source = list(element.sources())[0]
@@ -117,6 +118,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
         element._initialize_state()
 
         # check that we have the source in the cas now and it's not fetched
+        element._query_source_cache()
         assert element._cached_sources()
         assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
 
@@ -135,6 +137,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
 
         element = project.load_elements([element_name])[0]
         element._initialize_state()
+        element._query_source_cache()
         assert not element._cached_sources()
 
         source = list(element.sources())[0]
@@ -153,6 +156,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
 
    # Check that the source is both in the source dir and the local CAS
    element = project.load_elements([element_name])[0]
    element._initialize_state()
+   element._query_source_cache()
    assert element._cached_sources()
 
@@ -169,6 +173,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
 
         element = project.load_elements([element_name])[0]
         element._initialize_state()
+        element._query_source_cache()
         assert not element._cached_sources()
 
         source = list(element.sources())[0]
@@ -201,6 +206,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
 
         element = project.load_elements([element_name])[0]
         element._initialize_state()
+        element._query_source_cache()
         assert not element._cached_sources()
 
         source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 25a4309b8..aa703de1b 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -85,6 +85,7 @@ def test_source_push_split(cli, tmpdir, datafiles):
 
         element = project.load_elements(["push.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         assert not element._cached_sources()
 
         source = list(element.sources())[0]
@@ -135,6 +136,7 @@ def test_source_push(cli, tmpdir, datafiles):
 
         element = project.load_elements(["push.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         assert not element._cached_sources()
 
         source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f05891..e0e7002bf 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -65,6 +65,7 @@ def test_source_staged(tmpdir, cli, datafiles):
         # seems to be the only way to get the sources?
         element = project.load_elements(["import-bin.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
         assert sourcecache.contains(source)
@@ -100,6 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
 
         element = project.load_elements(["import-dev.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
 
         assert element._cached_sources()
@@ -136,6 +138,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
         element._initialize_state()
 
         # check consistency of the source
+        element._query_source_cache()
         assert not element._cached_sources()
 
         res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d825..861e70cd3 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
     result = cli.run(project=project, args=["source", "fetch", "target.bst"])
     result.assert_success()
 
-    # Track will encounter an inconsistent submodule without any ref
+    # Track to update to the offending commit
     result = cli.run(project=project, args=["source", "track", "target.bst"])
     result.assert_success()
 
+    # Fetch after track will encounter an inconsistent submodule without any ref
+    result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+    result.assert_success()
+
     # Assert that we are just fine without it, and emit a warning to the user.
     assert "Ignoring inconsistent submodule" in result.stderr
 
-- 
cgit v1.2.1
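
Usage note: the calling convention this patch establishes is that Stream
methods first load a selection of elements and then explicitly query cache
status before acting on it, with the queries fanned out as parallel
CacheQueryQueue jobs. Below is a minimal sketch of that convention, not code
from the patch: the `stream` instance and the element name are hypothetical,
and the `_PipelineSelection` import path is an assumption based on the
modules the patch touches.

    # Sketch only; assumes an already-initialized Stream instance `stream`.
    from buildstream.types import _PipelineSelection  # import path assumed

    # Loading no longer resolves cache status implicitly; it only resolves
    # the elements and their cache keys.
    elements = stream.load_selection(("hello.bst",), selection=_PipelineSelection.NONE)

    # Resolve artifact cache status (falling back to a source cache query on
    # a cache miss); one CacheQueryQueue job runs per element, in job threads.
    stream.query_cache(elements)

    # Or resolve source cache status only, e.g. ahead of a fetch.
    stream.query_cache(elements, sources=True)

    # Cache predicates such as _cached() are only meaningful once the
    # query has run.
    uncached = [element for element in elements if not element._cached()]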