diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2020-04-20 14:57:21 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-04-20 14:57:21 +0000 |
commit | 3d77f1583fd020ce5c18da1fa3c01d40784c2abd (patch) | |
tree | e6887c58808b83140bbb459568c38d0591b0a23a | |
parent | 74bbafa4c29caff91574316fb25fe2a5f9ff5fbf (diff) | |
parent | f42205787e33174408012e5de3e3576acaab9f9b (diff) | |
download | buildstream-3d77f1583fd020ce5c18da1fa3c01d40784c2abd.tar.gz |
Merge branch 'juerg/artifact-blob-not-found' into 'master'
Fix handling of missing blobs in `ArtifactCache.pull()`
Closes #1276
See merge request BuildStream/buildstream!1843
-rw-r--r-- | src/buildstream/_artifact.py | 43 | ||||
-rw-r--r-- | src/buildstream/_artifactcache.py | 10 | ||||
-rw-r--r-- | src/buildstream/_cas/cascache.py | 19 | ||||
-rw-r--r-- | src/buildstream/_sourcecache.py | 6 | ||||
-rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 7 | ||||
-rw-r--r-- | src/buildstream/storage/_casbaseddirectory.py | 2 | ||||
-rw-r--r-- | tests/frontend/project/elements/random.bst | 1 | ||||
-rw-r--r-- | tests/frontend/project/plugins/randomelement.py | 36 | ||||
-rw-r--r-- | tests/frontend/pull.py | 92 | ||||
-rw-r--r-- | tests/frontend/push.py | 38 |
10 files changed, 180 insertions, 74 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py index 659facba4..0a70d096f 100644 --- a/src/buildstream/_artifact.py +++ b/src/buildstream/_artifact.py @@ -423,8 +423,7 @@ class Artifact: context = self._context - artifact = self._get_proto() - + artifact = self._load_proto() if not artifact: self._cached = False return False @@ -443,11 +442,14 @@ class Artifact: self._cached = False return False - # Check whether public data is available - if not self._cas.contains_file(artifact.public_data): + # Check whether public data and logs are available + logfile_digests = [logfile.digest for logfile in artifact.logs] + digests = [artifact.public_data] + logfile_digests + if not self._cas.contains_files(digests): self._cached = False return False + self._proto = artifact self._cached = True return True @@ -460,16 +462,9 @@ class Artifact: # element not cached or missing logs. # def cached_logs(self): - if not self._element._cached(): - return False - - artifact = self._get_proto() - - for logfile in artifact.logs: - if not self._cas.contains_file(logfile.digest): - return False - - return True + # Log files are currently considered an essential part of an artifact. + # If the artifact is cached, its log files are available as well. + return self._element._cached() # reset_cached() # @@ -477,6 +472,7 @@ class Artifact: # is cached or not. # def reset_cached(self): + self._proto = None self._cached = None # set_cached() @@ -485,18 +481,15 @@ class Artifact: # This is used as optimization when we know the artifact is available. # def set_cached(self): + self._proto = self._load_proto() self._cached = True - # _get_proto() + # load_proto() # # Returns: # (Artifact): Artifact proto # - def _get_proto(self): - # Check if we've already cached the proto object - if self._proto is not None: - return self._proto - + def _load_proto(self): key = self.get_extract_key() proto_path = os.path.join(self._artifactdir, self._element.get_artifact_name(key=key)) @@ -508,9 +501,15 @@ class Artifact: return None os.utime(proto_path) - # Cache the proto object - self._proto = artifact + return artifact + + # _get_proto() + # + # Returns: + # (Artifact): Artifact proto + # + def _get_proto(self): return self._proto # _get_field_digest() diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py index f1648e947..9cebeb1a3 100644 --- a/src/buildstream/_artifactcache.py +++ b/src/buildstream/_artifactcache.py @@ -22,6 +22,7 @@ import os import grpc from ._basecache import BaseCache +from ._cas.casremote import BlobNotFound from ._exceptions import ArtifactError, CASError, CacheError, CASRemoteError, RemoteError from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, artifact_pb2, artifact_pb2_grpc @@ -281,7 +282,6 @@ class ArtifactCache(BaseCache): element.status("Pulling artifact {} <- {}".format(display_key, remote)) artifact = self._pull_artifact_proto(element, key, remote) if artifact: - element.info("Pulled artifact {} <- {}".format(display_key, remote)) break element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key)) @@ -307,10 +307,14 @@ class ArtifactCache(BaseCache): element.status("Pulling data for artifact {} <- {}".format(display_key, remote)) if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees): - element.info("Pulled data for artifact {} <- {}".format(display_key, remote)) + element.info("Pulled artifact {} <- {}".format(display_key, remote)) return True element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key)) + except BlobNotFound as e: + # Not all blobs are available on this remote + element.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob)) + continue except CASError as e: element.warn("Could not pull from remote {}: {}".format(remote, e)) errors.append(e) @@ -401,7 +405,7 @@ class ArtifactCache(BaseCache): remote.init() # fetch_blobs() will return the blobs that are still missing - missing_blobs = self.cas.fetch_blobs(remote, missing_blobs) + missing_blobs = self.cas.fetch_blobs(remote, missing_blobs, allow_partial=True) if missing_blobs: raise ArtifactError("Blobs not found on configured artifact servers") diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 03be75c72..74912c4e2 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -165,20 +165,20 @@ class CASCache: self._casd_process_manager.release_resources(messenger) self._casd_process_manager = None - # contains_file(): + # contains_files(): # - # Check whether a digest corresponds to a file which exists in CAS + # Check whether file digests exist in the local CAS cache # # Args: # digest (Digest): The file digest to check # - # Returns: True if the file is in the cache, False otherwise + # Returns: True if the files are in the cache, False otherwise # - def contains_file(self, digest): + def contains_files(self, digests): cas = self.get_cas() request = remote_execution_pb2.FindMissingBlobsRequest() - request.blob_digests.append(digest) + request.blob_digests.extend(digests) response = cas.FindMissingBlobs(request) return len(response.missing_blob_digests) == 0 @@ -647,16 +647,19 @@ class CASCache: # fetch_blobs(): # - # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched. + # Fetch blobs from remote CAS. Optionally returns missing blobs that could + # not be fetched. # # Args: # remote (CASRemote): The remote repository to fetch from # digests (list): The Digests of blobs to fetch + # allow_partial (bool): True to return missing blobs, False to raise a + # BlobNotFound error if a blob is missing # # Returns: The Digests of the blobs that were not available on the remote CAS # - def fetch_blobs(self, remote, digests): - missing_blobs = [] + def fetch_blobs(self, remote, digests, *, allow_partial=False): + missing_blobs = [] if allow_partial else None remote.init() diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py index e485fbd47..4533a2580 100644 --- a/src/buildstream/_sourcecache.py +++ b/src/buildstream/_sourcecache.py @@ -242,11 +242,7 @@ class SourceCache(BaseCache): self.cas._fetch_directory(remote, source_proto.files) required_blobs = self.cas.required_blobs_for_directory(source_proto.files) missing_blobs = self.cas.local_missing_blobs(required_blobs) - missing_blobs = self.cas.fetch_blobs(remote, missing_blobs) - - if missing_blobs: - source.info("Remote cas ({}) does not have source {} cached".format(remote, display_key)) - continue + self.cas.fetch_blobs(remote, missing_blobs) source.info("Pulled source {} <- {}".format(display_key, remote)) return True diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 3dcbb2ccc..5b03852f6 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -291,12 +291,7 @@ class SandboxRemote(SandboxREAPI): blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs) with CASRemote(self.storage_remote_spec, cascache) as casremote: - remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) - - if remote_missing_blobs: - raise SandboxError( - "{} output files are missing on the CAS server".format(len(remote_missing_blobs)) - ) + cascache.fetch_blobs(casremote, blobs_to_fetch) def _execute_action(self, action, flags): stdout, stderr = self._get_output() diff --git a/src/buildstream/storage/_casbaseddirectory.py b/src/buildstream/storage/_casbaseddirectory.py index b8b5ca09c..e33bdc3d7 100644 --- a/src/buildstream/storage/_casbaseddirectory.py +++ b/src/buildstream/storage/_casbaseddirectory.py @@ -754,6 +754,8 @@ class CasBasedDirectory(Directory): raise FileExistsError("{} already exists in {}".format(path[-1], str(subdir))) with utils._tempnamedfile(mode, encoding=encoding, dir=self.cas_cache.tmpdir) as f: + # Make sure the temporary file is readable by buildbox-casd + os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) yield f # Import written temporary file into CAS f.flush() diff --git a/tests/frontend/project/elements/random.bst b/tests/frontend/project/elements/random.bst new file mode 100644 index 000000000..2478dfa2a --- /dev/null +++ b/tests/frontend/project/elements/random.bst @@ -0,0 +1 @@ +kind: randomelement diff --git a/tests/frontend/project/plugins/randomelement.py b/tests/frontend/project/plugins/randomelement.py new file mode 100644 index 000000000..b36b75c8a --- /dev/null +++ b/tests/frontend/project/plugins/randomelement.py @@ -0,0 +1,36 @@ +import os + +from buildstream import Element + + +class RandomElement(Element): + BST_VIRTUAL_DIRECTORY = True + + def configure(self, node): + pass + + def preflight(self): + pass + + def get_unique_key(self): + pass + + def configure_sandbox(self, sandbox): + pass + + def stage(self, sandbox): + pass + + def assemble(self, sandbox): + rootdir = sandbox.get_virtual_directory() + outputdir = rootdir.descend("output", create=True) + + # Generate non-reproducible output + with outputdir.open_file("random", mode="wb") as f: + f.write(os.urandom(64)) + + return "/output" + + +def setup(): + return RandomElement diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py index 3ae394fd1..1845f320e 100644 --- a/tests/frontend/pull.py +++ b/tests/frontend/pull.py @@ -8,7 +8,13 @@ import pytest from buildstream import utils, _yaml from buildstream.testing import cli # pylint: disable=unused-import from buildstream.testing import create_repo -from tests.testutils import create_artifact_share, generate_junction, assert_shared, assert_not_shared +from tests.testutils import ( + create_artifact_share, + create_split_share, + generate_junction, + assert_shared, + assert_not_shared, +) # Project directory @@ -227,46 +233,74 @@ def test_push_pull_cross_junction(cli, tmpdir, datafiles): assert cli.get_element_state(project, "junction.bst:import-etc.bst") == "cached" +def _test_pull_missing_blob(cli, project, index, storage): + # First build the target element and push to the remote. + result = cli.run(project=project, args=["build", "target.bst"]) + result.assert_success() + assert cli.get_element_state(project, "target.bst") == "cached" + + # Assert that everything is now cached in the remote. + all_elements = ["target.bst", "import-bin.bst", "import-dev.bst", "compose-all.bst"] + for element_name in all_elements: + project_name = "test" + artifact_name = cli.get_artifact_name(project, project_name, element_name) + artifact_proto = index.get_artifact_proto(artifact_name) + assert artifact_proto + assert storage.get_cas_files(artifact_proto) + + # Now we've pushed, delete the user's local artifact cache + # directory and try to redownload it from the share + # + casdir = os.path.join(cli.directory, "cas") + shutil.rmtree(casdir) + artifactdir = os.path.join(cli.directory, "artifacts") + shutil.rmtree(artifactdir) + + # Assert that nothing is cached locally anymore + for element_name in all_elements: + assert cli.get_element_state(project, element_name) != "cached" + + # Now delete blobs in the remote without deleting the artifact ref. + # This simulates scenarios with concurrent artifact expiry. + remote_objdir = os.path.join(storage.repodir, "cas", "objects") + shutil.rmtree(remote_objdir) + + # Now try bst build + result = cli.run(project=project, args=["build", "target.bst"]) + result.assert_success() + + # Assert that no artifacts were pulled + assert not result.get_pulled_elements() + + @pytest.mark.datafiles(DATA_DIR) def test_pull_missing_blob(cli, tmpdir, datafiles): project = str(datafiles) with create_artifact_share(os.path.join(str(tmpdir), "artifactshare")) as share: - - # First build the target element and push to the remote. cli.configure({"artifacts": {"url": share.repo, "push": True}}) - result = cli.run(project=project, args=["build", "target.bst"]) - result.assert_success() - assert cli.get_element_state(project, "target.bst") == "cached" - # Assert that everything is now cached in the remote. - all_elements = ["target.bst", "import-bin.bst", "import-dev.bst", "compose-all.bst"] - for element_name in all_elements: - assert_shared(cli, share, project, element_name) + _test_pull_missing_blob(cli, project, share, share) - # Now we've pushed, delete the user's local artifact cache - # directory and try to redownload it from the share - # - casdir = os.path.join(cli.directory, "cas") - shutil.rmtree(casdir) - artifactdir = os.path.join(cli.directory, "artifacts") - shutil.rmtree(artifactdir) - # Assert that nothing is cached locally anymore - for element_name in all_elements: - assert cli.get_element_state(project, element_name) != "cached" +@pytest.mark.datafiles(DATA_DIR) +def test_pull_missing_blob_split_share(cli, tmpdir, datafiles): + project = str(datafiles) - # Now delete blobs in the remote without deleting the artifact ref. - # This simulates scenarios with concurrent artifact expiry. - remote_objdir = os.path.join(share.repodir, "cas", "objects") - shutil.rmtree(remote_objdir) + indexshare = os.path.join(str(tmpdir), "indexshare") + storageshare = os.path.join(str(tmpdir), "storageshare") - # Now try bst build - result = cli.run(project=project, args=["build", "target.bst"]) - result.assert_success() + with create_split_share(indexshare, storageshare) as (index, storage): + cli.configure( + { + "artifacts": [ + {"url": index.repo, "push": True, "type": "index"}, + {"url": storage.repo, "push": True, "type": "storage"}, + ] + } + ) - # Assert that no artifacts were pulled - assert not result.get_pulled_elements() + _test_pull_missing_blob(cli, project, index, storage) @pytest.mark.datafiles(DATA_DIR) diff --git a/tests/frontend/push.py b/tests/frontend/push.py index e9dfa2c6a..c2f52c514 100644 --- a/tests/frontend/push.py +++ b/tests/frontend/push.py @@ -24,10 +24,11 @@ # pylint: disable=redefined-outer-name import os +import shutil import pytest from buildstream.exceptions import ErrorDomain -from buildstream.testing import cli # pylint: disable=unused-import +from buildstream.testing import cli, generate_project # pylint: disable=unused-import from tests.testutils import ( create_artifact_share, create_element_size, @@ -627,3 +628,38 @@ def test_push_no_strict(caplog, cli, tmpdir, datafiles, buildtrees): args += ["artifact", "push", "--deps", "all", "target.bst"] result = cli.run(project=project, args=args) result.assert_success() + + +# Test that push works after rebuilding an incomplete artifact +# of a non-reproducible element. +@pytest.mark.datafiles(DATA_DIR) +def test_push_after_rebuild(cli, tmpdir, datafiles): + project = os.path.join(datafiles.dirname, datafiles.basename) + + generate_project( + project, + config={ + "element-path": "elements", + "plugins": [{"origin": "local", "path": "plugins", "elements": {"randomelement": 0}}], + }, + ) + + # First build the element + result = cli.run(project=project, args=["build", "random.bst"]) + result.assert_success() + assert cli.get_element_state(project, "random.bst") == "cached" + + # Delete the artifact blobs but keep the artifact proto, + # i.e., now we have an incomplete artifact + casdir = os.path.join(cli.directory, "cas") + shutil.rmtree(casdir) + assert cli.get_element_state(project, "random.bst") != "cached" + + with create_artifact_share(os.path.join(str(tmpdir), "artifactshare")) as share: + cli.configure({"artifacts": {"url": share.repo, "push": True}}) + + # Now rebuild the element and push it + result = cli.run(project=project, args=["build", "random.bst"]) + result.assert_success() + assert result.get_pushed_elements() == ["random.bst"] + assert cli.get_element_state(project, "random.bst") == "cached" |