diff options
author | Jürg Billeter <j@bitron.ch> | 2020-09-17 17:33:06 +0200 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2020-12-14 10:41:51 +0100 |
commit | fa5e01bbb0126e112df1b69ef6ed7d3a3beaa86b (patch) | |
tree | 43dd061762006d0f252cc84a9b4a7576dcb61c8b | |
parent | 2a1b85a203dd088f262aaa49c95d880027a4d367 (diff) | |
download | buildstream-fa5e01bbb0126e112df1b69ef6ed7d3a3beaa86b.tar.gz |
Move source cache query to fetch job
-rw-r--r-- | src/buildstream/_elementsources.py | 14 | ||||
-rw-r--r-- | src/buildstream/_frontend/widget.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/fetchqueue.py | 22 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 8 | ||||
-rw-r--r-- | src/buildstream/element.py | 37 | ||||
-rw-r--r-- | tests/frontend/fetch.py | 8 | ||||
-rw-r--r-- | tests/frontend/track.py | 12 | ||||
-rw-r--r-- | tests/sourcecache/fetch.py | 6 | ||||
-rw-r--r-- | tests/sourcecache/push.py | 2 | ||||
-rw-r--r-- | tests/sourcecache/staging.py | 3 | ||||
-rw-r--r-- | tests/sources/git.py | 6 |
11 files changed, 91 insertions, 29 deletions
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py index a3e510543..f5aaa56d4 100644 --- a/src/buildstream/_elementsources.py +++ b/src/buildstream/_elementsources.py @@ -153,11 +153,13 @@ class ElementSources: # # Args: # fetched_original (bool): Whether the original sources had been asked (and fetched) or not + # cached (bool): Whether the sources are now cached in CAS # - def fetch_done(self, fetched_original): - self._proto = self._elementsourcescache.load_proto(self) - assert self._proto - self._cached = True + def fetch_done(self, fetched_original, cached): + self._cached = cached + if cached: + self._proto = self._elementsourcescache.load_proto(self) + assert self._proto for source in self._sources: source._fetch_done(fetched_original) @@ -216,7 +218,7 @@ class ElementSources: # Try to fetch staged sources from remote source cache if self._elementsourcescache.has_fetch_remotes() and self._elementsourcescache.pull(self, self._plugin): - self.fetch_done(False) + self.fetch_done(False, True) return # Otherwise, fetch individual sources @@ -377,8 +379,6 @@ class ElementSources: unique_key = self.get_unique_key() self._cache_key = _cachekey.generate_key(unique_key) - self.query_cache() - # preflight(): # # A internal wrapper for calling the abstract preflight() method on diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py index 626611dac..3ef64d3db 100644 --- a/src/buildstream/_frontend/widget.py +++ b/src/buildstream/_frontend/widget.py @@ -363,6 +363,8 @@ class LogLine(Widget): line = p.fmt_subst(line, "state", "failed", fg="red") elif element._cached_success(): line = p.fmt_subst(line, "state", "cached", fg="magenta") + elif not element._can_query_source_cache(): + line = p.fmt_subst(line, "state", "waiting", fg="blue") elif element._fetch_needed(): line = p.fmt_subst(line, "state", "fetch needed", fg="red") elif element._buildable(): diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index 3a4183d83..3df1c6e43 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -32,15 +32,20 @@ class FetchQueue(Queue): complete_name = "Sources Fetched" resources = [ResourceType.DOWNLOAD] - def __init__(self, scheduler, skip_cached=False, fetch_original=False): + def __init__(self, scheduler, skip_cached=False, check_only=False, fetch_original=False): super().__init__(scheduler) + assert not (fetch_original and check_only) + self._skip_cached = skip_cached + self._check_only = check_only self._should_fetch_original = fetch_original def get_process_func(self): if self._should_fetch_original: return FetchQueue._fetch_original + elif self._check_only: + return FetchQueue._check else: return FetchQueue._fetch_not_original @@ -63,10 +68,13 @@ class FetchQueue(Queue): def done(self, _, element, result, status): - if status is JobStatus.FAIL: + if status is not JobStatus.OK: return - element._fetch_done(self._should_fetch_original) + if not self._check_only: + assert result + + element._fetch_done(self._should_fetch_original, result) def register_pending_element(self, element): # Set a "can_query_cache" callback for an element not yet ready @@ -74,9 +82,13 @@ class FetchQueue(Queue): element._set_can_query_cache_callback(self._enqueue_element) @staticmethod + def _check(element): + return element._fetch(check_only=True) + + @staticmethod def _fetch_not_original(element): - element._fetch(fetch_original=False) + return element._fetch(fetch_original=False) @staticmethod def _fetch_original(element): - element._fetch(fetch_original=True) + return element._fetch(fetch_original=True) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index dc76edad9..9677a67a5 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -172,8 +172,12 @@ class Stream: # elements (list of Element): The elements to check # def query_cache(self, elements): - # Cache is implicitly queried on demand - pass + with self._context.messenger.timed_activity("Query cache", silent_nested=True): + # Artifact cache is implicitly queried on demand + self._scheduler.clear_queues() + self._add_queue(FetchQueue(self._scheduler, skip_cached=True, check_only=True)) + self._enqueue_plan(elements) + self._run() # shell() # diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 0f2a01ced..6619f93dc 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -1296,6 +1296,16 @@ class Element(Plugin): # cache cannot be queried until strict cache key is available return self.__artifact is not None + # _can_query_source_cache(): + # + # Returns whether the source cache status is available. + # + # Returns: + # (bool): True if source cache can be queried + # + def _can_query_source_cache(self): + return self.__sources._cached is not None + # _initialize_state() # # Compute up the elment's initial state. Element state contains @@ -1843,11 +1853,12 @@ class Element(Plugin): # # Args: # fetched_original (bool): Whether the original sources had been asked (and fetched) or not + # cached (bool): Whether the sources are now cached in CAS # - def _fetch_done(self, fetched_original): + def _fetch_done(self, fetched_original, cached): assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" - self.__sources.fetch_done(fetched_original) + self.__sources.fetch_done(fetched_original, cached) # _pull_pending() # @@ -1939,7 +1950,11 @@ class Element(Plugin): def _skip_source_push(self): if not self.sources() or self._get_workspace(): return True - return not (self.__sourcecache.has_push_remotes(plugin=self) and self._cached_sources()) + return not ( + self.__sourcecache.has_push_remotes(plugin=self) + and self._can_query_source_cache() + and self._cached_sources() + ) def _source_push(self): return self.__sources.push() @@ -2190,10 +2205,19 @@ class Element(Plugin): # Raises: # SourceError: If one of the element sources has an error # - def _fetch(self, fetch_original=False): + def _fetch(self, check_only=False, fetch_original=False): + assert not (check_only and fetch_original) + if fetch_original: self.__sources.fetch_sources(fetch_original=True) + if self.__sources.query_cache(): + # Already cached + return True + + if check_only: + return False + self.__sources.fetch() if not self.__sources.cached(): @@ -2205,6 +2229,8 @@ class Element(Plugin): "Error trying to stage sources for {}: {}".format(self.name, e), reason="stage-sources-fail" ) + return True + # _calculate_cache_key(): # # Calculates the cache key @@ -2293,7 +2319,8 @@ class Element(Plugin): def _should_fetch(self, fetch_original=False): if fetch_original: return not self.__sources.cached_original() - return not self.__sources.cached() + else: + return True # _set_required_callback() # diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py index b2c9d64c2..6c8a4b71c 100644 --- a/tests/frontend/fetch.py +++ b/tests/frontend/fetch.py @@ -62,10 +62,9 @@ def test_fetch_deps(cli, datafiles, deps, expected_states): def test_fetch_consistency_error(cli, datafiles): project = str(datafiles) - # When the error occurs outside of the scheduler at load time, - # then the SourceError is reported directly as the main error. result = cli.run(project=project, args=["source", "fetch", "error.bst"]) - result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error") + result.assert_main_error(ErrorDomain.STREAM, None) + result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error") @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror")) @@ -73,7 +72,8 @@ def test_fetch_consistency_bug(cli, datafiles): project = str(datafiles) result = cli.run(project=project, args=["source", "fetch", "bug.bst"]) - result.assert_main_error(ErrorDomain.PLUGIN, "source-bug") + result.assert_main_error(ErrorDomain.STREAM, None) + result.assert_task_error(ErrorDomain.PLUGIN, "source-bug") @pytest.mark.datafiles(DATA_DIR) diff --git a/tests/frontend/track.py b/tests/frontend/track.py index bd8444973..d1a93242b 100644 --- a/tests/frontend/track.py +++ b/tests/frontend/track.py @@ -248,20 +248,22 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag def test_track_consistency_error(cli, datafiles): project = str(datafiles) - # Track the element causing a consistency error + # Track the element causing a consistency error in `is_cached()` result = cli.run(project=project, args=["source", "track", "error.bst"]) - result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error") + + # We expect tracking to succeed as `is_cached()` is not required for tracking. + result.assert_success() @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror")) def test_track_consistency_bug(cli, datafiles): project = str(datafiles) - # Track the element causing an unhandled exception + # Track the element causing an unhandled exception in `is_cached()` result = cli.run(project=project, args=["source", "track", "bug.bst"]) - # We expect BuildStream to fail gracefully, with no recorded exception. - result.assert_main_error(ErrorDomain.PLUGIN, "source-bug") + # We expect tracking to succeed as `is_cached()` is not required for tracking. + result.assert_success() @pytest.mark.datafiles(DATA_DIR) diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py index 76f5508f9..1f8911f84 100644 --- a/tests/sourcecache/fetch.py +++ b/tests/sourcecache/fetch.py @@ -75,6 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles): element = project.load_elements([element_name])[0] element._initialize_state() + element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] @@ -117,6 +118,7 @@ def test_source_fetch(cli, tmpdir, datafiles): element._initialize_state() # check that we have the source in the cas now and it's not fetched + element._fetch(check_only=True) assert element._cached_sources() assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == [] @@ -135,6 +137,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles): element = project.load_elements([element_name])[0] element._initialize_state() + element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] @@ -153,6 +156,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles): # Check that the source in both in the source dir and the local CAS element = project.load_elements([element_name])[0] element._initialize_state() + element._fetch(check_only=True) assert element._cached_sources() @@ -169,6 +173,7 @@ def test_pull_fail(cli, tmpdir, datafiles): element = project.load_elements([element_name])[0] element._initialize_state() + element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] @@ -201,6 +206,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles): element = project.load_elements([element_name])[0] element._initialize_state() + element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py index 25a4309b8..9288446e5 100644 --- a/tests/sourcecache/push.py +++ b/tests/sourcecache/push.py @@ -85,6 +85,7 @@ def test_source_push_split(cli, tmpdir, datafiles): element = project.load_elements(["push.bst"])[0] element._initialize_state() + element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] @@ -135,6 +136,7 @@ def test_source_push(cli, tmpdir, datafiles): element = project.load_elements(["push.bst"])[0] element._initialize_state() + element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py index 0f2f05891..6d00c3eed 100644 --- a/tests/sourcecache/staging.py +++ b/tests/sourcecache/staging.py @@ -65,6 +65,7 @@ def test_source_staged(tmpdir, cli, datafiles): # seems to be the only way to get the sources? element = project.load_elements(["import-bin.bst"])[0] element._initialize_state() + element._fetch(check_only=True) source = list(element.sources())[0] assert element._cached_sources() assert sourcecache.contains(source) @@ -100,6 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles): element = project.load_elements(["import-dev.bst"])[0] element._initialize_state() + element._fetch(check_only=True) source = list(element.sources())[0] assert element._cached_sources() @@ -136,6 +138,7 @@ def test_staged_source_build(tmpdir, datafiles, cli): element._initialize_state() # check consistency of the source + element._fetch(check_only=True) assert not element._cached_sources() res = cli.run(project=project_dir, args=["build", "target.bst"]) diff --git a/tests/sources/git.py b/tests/sources/git.py index 30657d825..861e70cd3 100644 --- a/tests/sources/git.py +++ b/tests/sources/git.py @@ -401,10 +401,14 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles): result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_success() - # Track will encounter an inconsistent submodule without any ref + # Track to update to the offending commit result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() + # Fetch after track will encounter an inconsistent submodule without any ref + result = cli.run(project=project, args=["source", "fetch", "target.bst"]) + result.assert_success() + # Assert that we are just fine without it, and emit a warning to the user. assert "Ignoring inconsistent submodule" in result.stderr |