path: root/buildstream/_artifactcache/
diff options
Diffstat (limited to 'buildstream/_artifactcache/')
1 files changed, 0 insertions, 377 deletions
diff --git a/buildstream/_artifactcache/ b/buildstream/_artifactcache/
deleted file mode 100644
index 707a974eb..000000000
--- a/buildstream/_artifactcache/
+++ /dev/null
@@ -1,377 +0,0 @@
-# Copyright (C) 2017-2018 Codethink Limited
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# Lesser General Public License for more details.
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <>.
-# Authors:
-# Jürg Billeter <>
-import multiprocessing
-import os
-import signal
-import tempfile
-from .. import _ostree, _signals, utils
-from .._exceptions import ArtifactError
-from .._ostree import OSTreeError
-from . import ArtifactCache
-from .pushreceive import initialize_push_connection
-from .pushreceive import push as push_artifact
-from .pushreceive import PushException
-# An OSTreeCache manages artifacts in an OSTree repository
-# Args:
-# context (Context): The BuildStream context
-# project (Project): The BuildStream project
-# enable_push (bool): Whether pushing is allowed by the platform
-# Pushing is explicitly disabled by the platform in some cases,
-# like when we are falling back to functioning without using
-# user namespaces.
-class OSTreeCache(ArtifactCache):
- def __init__(self, context, *, enable_push):
- super().__init__(context)
- self.enable_push = enable_push
- ostreedir = os.path.join(context.artifactdir, 'ostree')
- self.repo = _ostree.ensure(ostreedir, False)
- # Per-project list of OSTreeRemote instances.
- self._remotes = {}
- self._has_fetch_remotes = False
- self._has_push_remotes = False
- ################################################
- # Implementation of abstract methods #
- ################################################
- def has_fetch_remotes(self, *, element=None):
- if not self._has_fetch_remotes:
- # No project has push remotes
- return False
- elif element is None:
- # At least one (sub)project has fetch remotes
- return True
- else:
- # Check whether the specified element's project has fetch remotes
- remotes_for_project = self._remotes[element._get_project()]
- return bool(remotes_for_project)
- def has_push_remotes(self, *, element=None):
- if not self._has_push_remotes:
- # No project has push remotes
- return False
- elif element is None:
- # At least one (sub)project has push remotes
- return True
- else:
- # Check whether the specified element's project has push remotes
- remotes_for_project = self._remotes[element._get_project()]
- return any(remote.spec.push for remote in remotes_for_project)
- def contains(self, element, key):
- ref = self.get_artifact_fullname(element, key)
- return _ostree.exists(self.repo, ref)
- def extract(self, element, key):
- ref = self.get_artifact_fullname(element, key)
- # resolve ref to checksum
- rev = _ostree.checksum(self.repo, ref)
- # Extracting a nonexistent artifact is a bug
- assert rev, "Artifact missing for {}".format(ref)
- dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev)
- if os.path.isdir(dest):
- # artifact has already been extracted
- return dest
- os.makedirs(self.extractdir, exist_ok=True)
- with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
- checkoutdir = os.path.join(tmpdir, ref)
- _ostree.checkout(self.repo, checkoutdir, rev, user=True)
- os.makedirs(os.path.dirname(dest), exist_ok=True)
- try:
- os.rename(checkoutdir, dest)
- except OSError as e:
- # With rename, it's possible to get either ENOTEMPTY or EEXIST
- # in the case that the destination path is a not empty directory.
- #
- # If rename fails with these errors, another process beat
- # us to it so just ignore.
- if e.errno not in [os.errno.ENOTEMPTY, os.errno.EEXIST]:
- raise ArtifactError("Failed to extract artifact for ref '{}': {}"
- .format(ref, e)) from e
- return dest
- def commit(self, element, content, keys):
- refs = [self.get_artifact_fullname(element, key) for key in keys]
- try:
- _ostree.commit(self.repo, content, refs)
- except OSTreeError as e:
- raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
- def can_diff(self):
- return True
- def diff(self, element, key_a, key_b, *, subdir=None):
- _, a, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_a))
- _, b, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_b))
- if subdir:
- a = a.get_child(subdir)
- b = b.get_child(subdir)
- subpath = a.get_path()
- else:
- subpath = '/'
- modified, removed, added = _ostree.diff_dirs(a, b)
- modified = [os.path.relpath(, subpath) for item in modified]
- removed = [os.path.relpath(item.get_path(), subpath) for item in removed]
- added = [os.path.relpath(item.get_path(), subpath) for item in added]
- return modified, removed, added
- def pull(self, element, key, *, progress=None):
- project = element._get_project()
- ref = self.get_artifact_fullname(element, key)
- for remote in self._remotes[project]:
- try:
- # fetch the artifact from highest priority remote using the specified cache key
- remote_name = self._ensure_remote(self.repo, remote.pull_url)
- _ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
- return True
- except OSTreeError:
- # Try next remote
- continue
- return False
- def link_key(self, element, oldkey, newkey):
- oldref = self.get_artifact_fullname(element, oldkey)
- newref = self.get_artifact_fullname(element, newkey)
- # resolve ref to checksum
- rev = _ostree.checksum(self.repo, oldref)
- # create additional ref for the same checksum
- _ostree.set_ref(self.repo, newref, rev)
- def push(self, element, keys):
- any_pushed = False
- project = element._get_project()
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
- if not push_remotes:
- raise ArtifactError("Push is not enabled for any of the configured remote artifact caches.")
- refs = [self.get_artifact_fullname(element, key) for key in keys]
- for remote in push_remotes:
- any_pushed |= self._push_to_remote(remote, element, refs)
- return any_pushed
- def initialize_remotes(self, *, on_failure=None):
- remote_specs = self.global_remote_specs.copy()
- for project in self.project_remote_specs:
- remote_specs.extend(self.project_remote_specs[project])
- remote_specs = list(utils._deduplicate(remote_specs))
- remote_results = {}
- # Callback to initialize one remote in a 'multiprocessing' subprocess.
- #
- # We cannot do this in the main process because of the way the tasks
- # run by the main scheduler calls into libostree using
- # fork()-without-exec() subprocesses. OSTree fetch operations in
- # subprocesses hang if fetch operations were previously done in the
- # main process.
- #
- def child_action(url, q):
- try:
- push_url, pull_url = self._initialize_remote(url)
- q.put((None, push_url, pull_url))
- except Exception as e: # pylint: disable=broad-except
- # Whatever happens, we need to return it to the calling process
- #
- q.put((str(e), None, None))
- # Kick off all the initialization jobs one by one.
- #
- # Note that we cannot use multiprocessing.Pool here because it's not
- # possible to pickle local functions such as child_action().
- #
- q = multiprocessing.Queue()
- for remote_spec in remote_specs:
- p = multiprocessing.Process(target=child_action, args=(remote_spec.url, q))
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- p.start()
- error, push_url, pull_url = q.get()
- p.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(
- raise
- if error and on_failure:
- on_failure(remote_spec.url, error)
- elif error:
- raise ArtifactError(error)
- else:
- if remote_spec.push and push_url:
- self._has_push_remotes = True
- if pull_url:
- self._has_fetch_remotes = True
- remote_results[remote_spec.url] = (push_url, pull_url)
- # Prepare push_urls and pull_urls for each project
- for project in self.context.get_projects():
- remote_specs = self.global_remote_specs
- if project in self.project_remote_specs:
- remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
- remotes = []
- for remote_spec in remote_specs:
- # Errors are already handled in the loop above,
- # skip unreachable remotes here.
- if remote_spec.url not in remote_results:
- continue
- push_url, pull_url = remote_results[remote_spec.url]
- if remote_spec.push and not push_url:
- raise ArtifactError("Push enabled but not supported by repo at: {}".format(remote_spec.url))
- remote = _OSTreeRemote(remote_spec, pull_url, push_url)
- remotes.append(remote)
- self._remotes[project] = remotes
- ################################################
- # Local Private Methods #
- ################################################
- # _initialize_remote():
- #
- # Do protocol-specific initialization necessary to use a given OSTree
- # remote.
- #
- # The SSH protocol that we use only supports pushing so initializing these
- # involves contacting the remote to find out the corresponding pull URL.
- #
- # Args:
- # url (str): URL of the remote
- #
- # Returns:
- # (str, str): the pull URL and push URL for the remote
- #
- # Raises:
- # ArtifactError: if there was an error
- def _initialize_remote(self, url):
- if url.startswith('ssh://'):
- try:
- push_url = url
- pull_url = initialize_push_connection(url)
- except PushException as e:
- raise ArtifactError(e) from e
- elif url.startswith('/'):
- push_url = pull_url = 'file://' + url
- elif url.startswith('file://'):
- push_url = pull_url = url
- elif url.startswith('http://') or url.startswith('https://'):
- push_url = None
- pull_url = url
- else:
- raise ArtifactError("Unsupported URL: {}".format(url))
- return push_url, pull_url
- # _ensure_remote():
- #
- # Ensure that our OSTree repo has a remote configured for the given URL.
- # Note that SSH access to remotes is not handled by libostree itself.
- #
- # Args:
- # repo (OSTree.Repo): an OSTree repository
- # pull_url (str): the URL where libostree can pull from the remote
- #
- # Returns:
- # (str): the name of the remote, which can be passed to various other
- # operations implemented by the _ostree module.
- #
- # Raises:
- # OSTreeError: if there was a problem reported by libostree
- def _ensure_remote(self, repo, pull_url):
- remote_name = utils.url_directory_name(pull_url)
- _ostree.configure_remote(repo, remote_name, pull_url)
- return remote_name
- def _push_to_remote(self, remote, element, refs):
- with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir:
- with element.timed_activity("Preparing compressed archive"):
- # First create a temporary archive-z2 repository, we can
- # only use ostree-push with archive-z2 local repo.
- temp_repo = _ostree.ensure(temp_repo_dir, True)
- # Now push the ref we want to push into our temporary archive-z2 repo
- for ref in refs:
- _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref)
- with element.timed_activity("Sending artifact"), \
- element._output_file() as output_file:
- try:
- pushed = push_artifact(temp_repo.get_path().get_path(),
- remote.push_url,
- refs, output_file)
- except PushException as e:
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e)) from e
- return pushed
-# Represents a single remote OSTree cache.
-class _OSTreeRemote():
- def __init__(self, spec, pull_url, push_url):
- self.spec = spec
- self.pull_url = pull_url
- self.push_url = push_url