summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2020-09-17 17:33:06 +0200
committerJürg Billeter <j@bitron.ch>2020-12-14 10:41:51 +0100
commitfa5e01bbb0126e112df1b69ef6ed7d3a3beaa86b (patch)
tree43dd061762006d0f252cc84a9b4a7576dcb61c8b
parent2a1b85a203dd088f262aaa49c95d880027a4d367 (diff)
downloadbuildstream-fa5e01bbb0126e112df1b69ef6ed7d3a3beaa86b.tar.gz
Move source cache query to fetch job
-rw-r--r--src/buildstream/_elementsources.py14
-rw-r--r--src/buildstream/_frontend/widget.py2
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py22
-rw-r--r--src/buildstream/_stream.py8
-rw-r--r--src/buildstream/element.py37
-rw-r--r--tests/frontend/fetch.py8
-rw-r--r--tests/frontend/track.py12
-rw-r--r--tests/sourcecache/fetch.py6
-rw-r--r--tests/sourcecache/push.py2
-rw-r--r--tests/sourcecache/staging.py3
-rw-r--r--tests/sources/git.py6
11 files changed, 91 insertions, 29 deletions
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index a3e510543..f5aaa56d4 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -153,11 +153,13 @@ class ElementSources:
#
# Args:
# fetched_original (bool): Whether the original sources had been asked (and fetched) or not
+ # cached (bool): Whether the sources are now cached in CAS
#
- def fetch_done(self, fetched_original):
- self._proto = self._elementsourcescache.load_proto(self)
- assert self._proto
- self._cached = True
+ def fetch_done(self, fetched_original, cached):
+ self._cached = cached
+ if cached:
+ self._proto = self._elementsourcescache.load_proto(self)
+ assert self._proto
for source in self._sources:
source._fetch_done(fetched_original)
@@ -216,7 +218,7 @@ class ElementSources:
# Try to fetch staged sources from remote source cache
if self._elementsourcescache.has_fetch_remotes() and self._elementsourcescache.pull(self, self._plugin):
- self.fetch_done(False)
+ self.fetch_done(False, True)
return
# Otherwise, fetch individual sources
@@ -377,8 +379,6 @@ class ElementSources:
unique_key = self.get_unique_key()
self._cache_key = _cachekey.generate_key(unique_key)
- self.query_cache()
-
# preflight():
#
# A internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 626611dac..3ef64d3db 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -363,6 +363,8 @@ class LogLine(Widget):
line = p.fmt_subst(line, "state", "failed", fg="red")
elif element._cached_success():
line = p.fmt_subst(line, "state", "cached", fg="magenta")
+ elif not element._can_query_source_cache():
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._fetch_needed():
line = p.fmt_subst(line, "state", "fetch needed", fg="red")
elif element._buildable():
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 3a4183d83..3df1c6e43 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -32,15 +32,20 @@ class FetchQueue(Queue):
complete_name = "Sources Fetched"
resources = [ResourceType.DOWNLOAD]
- def __init__(self, scheduler, skip_cached=False, fetch_original=False):
+ def __init__(self, scheduler, skip_cached=False, check_only=False, fetch_original=False):
super().__init__(scheduler)
+ assert not (fetch_original and check_only)
+
self._skip_cached = skip_cached
+ self._check_only = check_only
self._should_fetch_original = fetch_original
def get_process_func(self):
if self._should_fetch_original:
return FetchQueue._fetch_original
+ elif self._check_only:
+ return FetchQueue._check
else:
return FetchQueue._fetch_not_original
@@ -63,10 +68,13 @@ class FetchQueue(Queue):
def done(self, _, element, result, status):
- if status is JobStatus.FAIL:
+ if status is not JobStatus.OK:
return
- element._fetch_done(self._should_fetch_original)
+ if not self._check_only:
+ assert result
+
+ element._fetch_done(self._should_fetch_original, result)
def register_pending_element(self, element):
# Set a "can_query_cache" callback for an element not yet ready
@@ -74,9 +82,13 @@ class FetchQueue(Queue):
element._set_can_query_cache_callback(self._enqueue_element)
@staticmethod
+ def _check(element):
+ return element._fetch(check_only=True)
+
+ @staticmethod
def _fetch_not_original(element):
- element._fetch(fetch_original=False)
+ return element._fetch(fetch_original=False)
@staticmethod
def _fetch_original(element):
- element._fetch(fetch_original=True)
+ return element._fetch(fetch_original=True)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index dc76edad9..9677a67a5 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -172,8 +172,12 @@ class Stream:
# elements (list of Element): The elements to check
#
def query_cache(self, elements):
- # Cache is implicitly queried on demand
- pass
+ with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+ # Artifact cache is implicitly queried on demand
+ self._scheduler.clear_queues()
+ self._add_queue(FetchQueue(self._scheduler, skip_cached=True, check_only=True))
+ self._enqueue_plan(elements)
+ self._run()
# shell()
#
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 0f2a01ced..6619f93dc 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1296,6 +1296,16 @@ class Element(Plugin):
# cache cannot be queried until strict cache key is available
return self.__artifact is not None
+ # _can_query_source_cache():
+ #
+ # Returns whether the source cache status is available.
+ #
+ # Returns:
+ # (bool): True if source cache can be queried
+ #
+ def _can_query_source_cache(self):
+ return self.__sources._cached is not None
+
# _initialize_state()
#
# Compute up the elment's initial state. Element state contains
@@ -1843,11 +1853,12 @@ class Element(Plugin):
#
# Args:
# fetched_original (bool): Whether the original sources had been asked (and fetched) or not
+ # cached (bool): Whether the sources are now cached in CAS
#
- def _fetch_done(self, fetched_original):
+ def _fetch_done(self, fetched_original, cached):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- self.__sources.fetch_done(fetched_original)
+ self.__sources.fetch_done(fetched_original, cached)
# _pull_pending()
#
@@ -1939,7 +1950,11 @@ class Element(Plugin):
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
return True
- return not (self.__sourcecache.has_push_remotes(plugin=self) and self._cached_sources())
+ return not (
+ self.__sourcecache.has_push_remotes(plugin=self)
+ and self._can_query_source_cache()
+ and self._cached_sources()
+ )
def _source_push(self):
return self.__sources.push()
@@ -2190,10 +2205,19 @@ class Element(Plugin):
# Raises:
# SourceError: If one of the element sources has an error
#
- def _fetch(self, fetch_original=False):
+ def _fetch(self, check_only=False, fetch_original=False):
+ assert not (check_only and fetch_original)
+
if fetch_original:
self.__sources.fetch_sources(fetch_original=True)
+ if self.__sources.query_cache():
+ # Already cached
+ return True
+
+ if check_only:
+ return False
+
self.__sources.fetch()
if not self.__sources.cached():
@@ -2205,6 +2229,8 @@ class Element(Plugin):
"Error trying to stage sources for {}: {}".format(self.name, e), reason="stage-sources-fail"
)
+ return True
+
# _calculate_cache_key():
#
# Calculates the cache key
@@ -2293,7 +2319,8 @@ class Element(Plugin):
def _should_fetch(self, fetch_original=False):
if fetch_original:
return not self.__sources.cached_original()
- return not self.__sources.cached()
+ else:
+ return True
# _set_required_callback()
#
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64c2..6c8a4b71c 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@ def test_fetch_deps(cli, datafiles, deps, expected_states):
def test_fetch_consistency_error(cli, datafiles):
project = str(datafiles)
- # When the error occurs outside of the scheduler at load time,
- # then the SourceError is reported directly as the main error.
result = cli.run(project=project, args=["source", "fetch", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@ def test_fetch_consistency_bug(cli, datafiles):
project = str(datafiles)
result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index bd8444973..d1a93242b 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag
def test_track_consistency_error(cli, datafiles):
project = str(datafiles)
- # Track the element causing a consistency error
+ # Track the element causing a consistency error in `is_cached()`
result = cli.run(project=project, args=["source", "track", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
def test_track_consistency_bug(cli, datafiles):
project = str(datafiles)
- # Track the element causing an unhandled exception
+ # Track the element causing an unhandled exception in `is_cached()`
result = cli.run(project=project, args=["source", "track", "bug.bst"])
- # We expect BuildStream to fail gracefully, with no recorded exception.
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 76f5508f9..1f8911f84 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -75,6 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._fetch(check_only=True)
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -117,6 +118,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element._initialize_state()
# check that we have the source in the cas now and it's not fetched
+ element._fetch(check_only=True)
assert element._cached_sources()
assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
@@ -135,6 +137,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._fetch(check_only=True)
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -153,6 +156,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
# Check that the source in both in the source dir and the local CAS
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._fetch(check_only=True)
assert element._cached_sources()
@@ -169,6 +173,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._fetch(check_only=True)
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -201,6 +206,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._fetch(check_only=True)
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 25a4309b8..9288446e5 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -85,6 +85,7 @@ def test_source_push_split(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
+ element._fetch(check_only=True)
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -135,6 +136,7 @@ def test_source_push(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
+ element._fetch(check_only=True)
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f05891..6d00c3eed 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -65,6 +65,7 @@ def test_source_staged(tmpdir, cli, datafiles):
# seems to be the only way to get the sources?
element = project.load_elements(["import-bin.bst"])[0]
element._initialize_state()
+ element._fetch(check_only=True)
source = list(element.sources())[0]
assert element._cached_sources()
assert sourcecache.contains(source)
@@ -100,6 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
element = project.load_elements(["import-dev.bst"])[0]
element._initialize_state()
+ element._fetch(check_only=True)
source = list(element.sources())[0]
assert element._cached_sources()
@@ -136,6 +138,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
element._initialize_state()
# check consistency of the source
+ element._fetch(check_only=True)
assert not element._cached_sources()
res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d825..861e70cd3 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- # Track will encounter an inconsistent submodule without any ref
+ # Track to update to the offending commit
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
+ # Fetch after track will encounter an inconsistent submodule without any ref
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+ result.assert_success()
+
# Assert that we are just fine without it, and emit a warning to the user.
assert "Ignoring inconsistent submodule" in result.stderr