# # Copyright (C) 2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # # Authors: # Raoul Hidalgo Charman # # Pylint doesn't play well with fixtures and dependency injection from pytest # pylint: disable=redefined-outer-name from contextlib import contextmanager import os import shutil import pytest from buildstream.exceptions import ErrorDomain from buildstream._project import Project from buildstream import _yaml from buildstream.testing import cli # pylint: disable=unused-import from buildstream.testing import create_repo from tests.testutils import create_artifact_share, dummy_context DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project") def create_test_element(tmpdir, project_dir): repo = create_repo("git", str(tmpdir)) ref = repo.create(os.path.join(project_dir, "files")) element_path = os.path.join(project_dir, "elements") element_name = "fetch.bst" element = {"kind": "import", "sources": [repo.source_config(ref=ref)]} _yaml.roundtrip_dump(element, os.path.join(element_path, element_name)) return element_name, repo, ref @contextmanager def context_with_source_cache(cli, cache, share, tmpdir): user_config_file = str(tmpdir.join("buildstream.conf")) user_config = { "scheduler": {"pushers": 1}, "source-caches": {"url": share.repo,}, "cachedir": cache, } _yaml.roundtrip_dump(user_config, file=user_config_file) cli.configure(user_config) with dummy_context(config=user_config_file) as context: yield context @pytest.mark.datafiles(DATA_DIR) def test_source_fetch(cli, tmpdir, datafiles): project_dir = str(datafiles) element_name, _repo, _ref = create_test_element(tmpdir, project_dir) cache_dir = os.path.join(str(tmpdir), "cache") # use artifact cache for sources for now, they should work the same with create_artifact_share(os.path.join(str(tmpdir), "sourceshare")) as share: with context_with_source_cache(cli, cache_dir, share, tmpdir) as context: project = Project(project_dir, context) project.ensure_fully_loaded() element = project.load_elements([element_name])[0] element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] assert not share.get_source_proto(source._get_source_name()) # Just check that we sensibly fetch and build the element res = cli.run(project=project_dir, args=["build", element_name]) res.assert_success() assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) != [] # get root digest of source sourcecache = context.sourcecache digest = sourcecache.export(source)._get_digest() # Push the source to the remote res = cli.run(project=project_dir, args=["source", "push", "--remote", share.repo, element_name]) res.assert_success() # check the share has the proto and the object assert share.get_source_proto(source._get_source_name()) assert share.has_object(digest) # Delete the source locally shutil.rmtree(os.path.join(str(cache_dir), "sources")) shutil.rmtree(os.path.join(str(cache_dir), "cas")) state = cli.get_element_state(project_dir, element_name) assert state == "fetch needed" # Now fetch the source and check res = cli.run(project=project_dir, args=["source", "fetch", element_name]) res.assert_success() assert "Pulled source" in res.stderr with context_with_source_cache(cli, cache_dir, share, tmpdir) as context: project = Project(project_dir, context) project.ensure_fully_loaded() element = project.load_elements([element_name])[0] # check that we have the source in the cas now and it's not fetched element._fetch(check_only=True) assert element._cached_sources() assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == [] @pytest.mark.datafiles(DATA_DIR) def test_fetch_fallback(cli, tmpdir, datafiles): project_dir = str(datafiles) element_name, repo, ref = create_test_element(tmpdir, project_dir) cache_dir = os.path.join(str(tmpdir), "cache") # use artifact cache for sources for now, they should work the same with create_artifact_share(os.path.join(str(tmpdir), "sourceshare")) as share: with context_with_source_cache(cli, cache_dir, share, tmpdir) as context: project = Project(project_dir, context) project.ensure_fully_loaded() element = project.load_elements([element_name])[0] element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] assert not share.get_source_proto(source._get_source_name()) assert not os.path.exists(os.path.join(cache_dir, "sources")) # Now check if it falls back to the source fetch method. res = cli.run(project=project_dir, args=["source", "fetch", element_name]) res.assert_success() brief_key = source._get_brief_display_key() assert ( "Remote source service ({}) does not have source {} cached".format(share.repo, brief_key) ) in res.stderr assert ("SUCCESS Fetching from {}".format(repo.source_config(ref=ref)["url"])) in res.stderr # Check that the source in both in the source dir and the local CAS element = project.load_elements([element_name])[0] element._fetch(check_only=True) assert element._cached_sources() @pytest.mark.datafiles(DATA_DIR) def test_pull_fail(cli, tmpdir, datafiles): project_dir = str(datafiles) element_name, repo, _ref = create_test_element(tmpdir, project_dir) cache_dir = os.path.join(str(tmpdir), "cache") with create_artifact_share(os.path.join(str(tmpdir), "sourceshare")) as share: with context_with_source_cache(cli, cache_dir, share, tmpdir) as context: project = Project(project_dir, context) project.ensure_fully_loaded() element = project.load_elements([element_name])[0] element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] # remove files and check that it doesn't build shutil.rmtree(repo.repo) # Should fail in stream, with a plugin task causing the error res = cli.run(project=project_dir, args=["build", element_name]) res.assert_main_error(ErrorDomain.STREAM, None) res.assert_task_error(ErrorDomain.PLUGIN, None) assert ( "Remote source service ({}) does not have source {} cached".format( share.repo, source._get_brief_display_key() ) in res.stderr ) @pytest.mark.datafiles(DATA_DIR) def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles): project_dir = str(datafiles) element_name, repo, ref = create_test_element(tmpdir, project_dir) cache_dir = os.path.join(str(tmpdir), "cache") # use artifact cache for sources for now, they should work the same with create_artifact_share(os.path.join(str(tmpdir), "sourceshare")) as share: with context_with_source_cache(cli, cache_dir, share, tmpdir) as context: project = Project(project_dir, context) project.ensure_fully_loaded() element = project.load_elements([element_name])[0] element._fetch(check_only=True) assert not element._cached_sources() source = list(element.sources())[0] assert not share.get_artifact_proto(source._get_source_name()) # Just check that we sensibly fetch and build the element res = cli.run(project=project_dir, args=["build", element_name]) res.assert_success() assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) != [] # get root digest of source sourcecache = context.sourcecache digest = sourcecache.export(source)._get_digest() # Push the source to the remote res = cli.run(project=project_dir, args=["source", "push", "--remote", share.repo, element_name]) res.assert_success() # Remove the cas content, only keep the proto and such around shutil.rmtree(os.path.join(str(tmpdir), "sourceshare", "repo", "cas", "objects")) # check the share doesn't have the object assert not share.has_object(digest) # Delete the source locally shutil.rmtree(os.path.join(str(cache_dir), "sources")) shutil.rmtree(os.path.join(str(cache_dir), "cas")) state = cli.get_element_state(project_dir, element_name) assert state == "fetch needed" # Now fetch the source and check res = cli.run(project=project_dir, args=["source", "fetch", element_name]) res.assert_success() assert ("SUCCESS Fetching from {}".format(repo.source_config(ref=ref)["url"])) in res.stderr