author     Sam Thursfield <sam.thursfield@codethink.co.uk>   2018-02-15 17:53:30 +0000
committer  Jürg Billeter <j@bitron.ch>                        2018-02-26 15:04:16 +0000
commit     c36a1825bcb16004b5fc45c1b98f764a808a90a6 (patch)
tree       7e782d8a5e1e82968a57cb9a4e86e63dad3ce738
parent     2c0b69fb2c497be7d2b5492a56b200d952a89557 (diff)
download   buildstream-c36a1825bcb16004b5fc45c1b98f764a808a90a6.tar.gz
_artifactcache/ostreecache.py: Rework so that all remotes for a ref are tracked
This commit adds a couple of simple classes in order to reduce the number of different dictionaries tracking the state of the remotes within the OSTreeCache object. It also extends the internal ref map to remember all the remotes that store a given ref, not just the highest-priority remote, and modifies the remote_contains_key() method to expose that data.
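As a rough illustration (not part of the commit), this is the behaviour the new _OSTreeRemote and _OSTreeArtifactMap classes added in the diff below provide; the ref, spec placeholders and URLs here are made up:

    # Illustration only: spec_a/spec_b stand in for configured remote spec objects,
    # and the ref and pull URLs are invented.
    spec_a = spec_b = None
    remote_a = _OSTreeRemote(spec_a, 'https://cache-a.example.com/repo', None)  # higher priority
    remote_b = _OSTreeRemote(spec_b, 'https://cache-b.example.com/repo', None)  # lower priority

    artifact_map = _OSTreeArtifactMap()
    artifact_map.append('project/element/abc123', remote_a)
    artifact_map.append('project/element/abc123', remote_b)

    artifact_map.lookup('project/element/abc123')        # [remote_a, remote_b] -- every remote holding the ref
    artifact_map.lookup_first('project/element/abc123')  # remote_a -- highest priority, used by pull()
    artifact_map.contains('project/element/abc123')      # True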
-rw-r--r--   buildstream/_artifactcache/ostreecache.py   139
1 file changed, 81 insertions, 58 deletions
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
index 2c5d9c392..de0d33d58 100644
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -52,6 +52,37 @@ def buildref(element, key):
return '{0}/{1}/{2}'.format(project.name, element_name, key)
+# Represents a single remote OSTree cache.
+#
+class _OSTreeRemote():
+ def __init__(self, spec, pull_url, push_url):
+ self.spec = spec
+ self.pull_url = pull_url
+ self.push_url = push_url
+
+
+# Maps artifacts to the remotes that contain them.
+#
+class _OSTreeArtifactMap():
+ def __init__(self):
+ self._ref_to_remotes = {}
+
+ def append(self, ref, remote):
+ if ref in self._ref_to_remotes:
+ self._ref_to_remotes[ref].append(remote)
+ else:
+ self._ref_to_remotes[ref] = [remote]
+
+ def lookup(self, ref):
+ return self._ref_to_remotes.get(ref, [])
+
+ def lookup_first(self, ref):
+ return self._ref_to_remotes.get(ref, [])[0]
+
+ def contains(self, ref):
+ return ref in self._ref_to_remotes
+
+
# An OSTreeCache manages artifacts in an OSTree repository
#
# Args:
@@ -73,9 +104,10 @@ class OSTreeCache(ArtifactCache):
ostreedir = os.path.join(context.artifactdir, 'ostree')
self.repo = _ostree.ensure(ostreedir, False)
- self.push_urls = {}
- self.pull_urls = {}
- self._remote_refs = {}
+ # Per-project list of OSTreeRemote and OSTreeArtifactMap instances.
+ self._remotes = {}
+ self._artifact_maps = {}
+
self._has_fetch_remotes = False
self._has_push_remotes = False
@@ -91,7 +123,8 @@ class OSTreeCache(ArtifactCache):
return True
else:
# Check whether the specified element's project has push remotes
- return len(self.push_urls[element._get_project()]) > 0
+ remotes_for_project = self._remotes[element._get_project()]
+ return any(remote.spec.push for remote in remotes_for_project)
# contains():
#
@@ -118,27 +151,24 @@ class OSTreeCache(ArtifactCache):
ref = buildref(element, key)
return _ostree.exists(self.repo, ref)
- # remote_contains_key():
+ # remotes_containing_key():
#
- # Check whether the artifact for the specified Element is already available
- # in the remote artifact cache.
+ # Return every remote cache that contains the key. The result will be an
+ # ordered list of remotes.
#
# Args:
# element (Element): The Element to check
# key (str): The key to use
#
- # Returns: True if the artifact is in the cache, False otherwise
+ # Returns (list): A list of _OSTreeRemote instances.
#
- def remote_contains_key(self, element, key):
+ def remotes_containing_key(self, element, key):
if not self._has_fetch_remotes:
- return False
-
- remote_refs = self._remote_refs[element._get_project()]
- if len(remote_refs) == 0:
- return False
+ return []
+ artifact_map = self._artifact_maps[element._get_project()]
ref = buildref(element, key)
- return ref in remote_refs
+ return artifact_map.lookup(ref)
# remote_contains():
#
@@ -162,7 +192,8 @@ class OSTreeCache(ArtifactCache):
if not key:
return False
- return self.remote_contains_key(element, key)
+ remotes = self.remotes_containing_key(element, key)
+ return len(remotes) > 0
# extract():
#
@@ -253,26 +284,28 @@ class OSTreeCache(ArtifactCache):
def pull(self, element, progress=None):
project = element._get_project()
- remote_refs = self._remote_refs[project]
+ artifact_map = self._artifact_maps[project]
ref = buildref(element, element._get_strict_cache_key())
weak_ref = buildref(element, element._get_cache_key(strength=_KeyStrength.WEAK))
try:
- if ref in remote_refs:
- # fetch the artifact using the strong cache key
- _ostree.fetch(self.repo, remote=remote_refs[ref],
- ref=ref, progress=progress)
+ if artifact_map.contains(ref):
+ # fetch the artifact from highest priority remote using the strong cache key
+ remote = artifact_map.lookup_first(ref)
+ remote_name = self._ensure_remote(self.repo, remote.pull_url)
+ _ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
# resolve ref to checksum
rev = _ostree.checksum(self.repo, ref)
# update weak ref by pointing it to this newly fetched artifact
_ostree.set_ref(self.repo, weak_ref, rev)
- elif weak_ref in remote_refs:
- # fetch the artifact using the weak cache key
- _ostree.fetch(self.repo, remote=remote_refs[weak_ref],
- ref=weak_ref, progress=progress)
+ elif artifact_map.contains(weak_ref):
+ # fetch the artifact from the highest priority cache using the weak cache key
+ remote = artifact_map.lookup_first(weak_ref)
+ remote_name = self._ensure_remote(self.repo, remote.pull_url)
+ _ostree.fetch(self.repo, remote=remote_name, ref=weak_ref, progress=progress)
# resolve weak_ref to checksum
rev = _ostree.checksum(self.repo, weak_ref)
@@ -308,16 +341,16 @@ class OSTreeCache(ArtifactCache):
project = element._get_project()
- push_urls = self.push_urls[project]
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
- if len(push_urls) == 0:
+ if len(push_remotes) == 0:
raise ArtifactError("Push is not enabled for any of the configured remote artifact caches.")
ref = buildref(element, element._get_cache_key())
weak_ref = buildref(element, element._get_cache_key(strength=_KeyStrength.WEAK))
- for push_url in push_urls:
- any_pushed |= self._push_to_remote(push_url, element, ref, weak_ref)
+ for remote in push_remotes:
+ any_pushed |= self._push_to_remote(remote, element, ref, weak_ref)
return any_pushed
@@ -409,8 +442,8 @@ class OSTreeCache(ArtifactCache):
# possible to pickle local functions such as child_action().
#
q = multiprocessing.Queue()
- for remote in remote_specs:
- p = multiprocessing.Process(target=child_action, args=(remote.url, q))
+ for remote_spec in remote_specs:
+ p = multiprocessing.Process(target=child_action, args=(remote_spec.url, q))
try:
@@ -425,16 +458,16 @@ class OSTreeCache(ArtifactCache):
raise
if error and on_failure:
- on_failure(remote.url, error)
+ on_failure(remote_spec.url, error)
elif error:
raise ArtifactError(error)
else:
- if remote.push and push_url:
+ if remote_spec.push and push_url:
self._has_push_remotes = True
if pull_url:
self._has_fetch_remotes = True
- remote_results[remote.url] = (push_url, pull_url, remote_refs)
+ remote_results[remote_spec.url] = (push_url, pull_url, remote_refs)
# Prepare push_urls, pull_urls, and remote_refs for each project
for project in self.context._get_projects():
@@ -442,43 +475,33 @@ class OSTreeCache(ArtifactCache):
if project in self.project_remote_specs:
remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
- push_urls = []
- pull_urls = []
- _remote_refs = {}
+ remotes = []
+ artifact_map = _OSTreeArtifactMap()
- for remote in remote_specs:
+ for remote_spec in remote_specs:
# Errors are already handled in the loop above,
# skip unreachable remotes here.
- if remote.url not in remote_results:
+ if remote_spec.url not in remote_results:
continue
- push_url, pull_url, remote_refs = remote_results[remote.url]
+ push_url, pull_url, remote_refs = remote_results[remote_spec.url]
- if remote.push:
- if push_url:
- push_urls.append(push_url)
- else:
- raise ArtifactError("Push enabled but not supported by repo at: {}".format(remote.url))
+ if remote_spec.push and not push_url:
+ raise ArtifactError("Push enabled but not supported by repo at: {}".format(remote_spec.url))
- # The specs are deduplicated when reading the config, but since
- # each push URL can supply an arbitrary pull URL we must dedup
- # those again here.
- if pull_url and pull_url not in pull_urls:
- pull_urls.append(pull_url)
+ remote = _OSTreeRemote(remote_spec, pull_url, push_url)
+ remotes.append(remote)
# Update our overall map of remote refs with any refs that are
# present in the new remote and were not already found in
# higher priority ones.
- remote = self._ensure_remote(self.repo, pull_url)
for ref in remote_refs:
- if ref not in _remote_refs:
- _remote_refs[ref] = remote
+ artifact_map.append(ref, remote)
- self.push_urls[project] = push_urls
- self.pull_urls[project] = pull_urls
- self._remote_refs[project] = _remote_refs
+ self._artifact_maps[project] = artifact_map
+ self._remotes[project] = remotes
- def _push_to_remote(self, push_url, element, ref, weak_ref):
+ def _push_to_remote(self, remote, element, ref, weak_ref):
with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir:
with element.timed_activity("Preparing compressed archive"):
@@ -494,7 +517,7 @@ class OSTreeCache(ArtifactCache):
element._output_file() as output_file:
try:
pushed = push_artifact(temp_repo.get_path().get_path(),
- push_url,
+ remote.push_url,
[ref, weak_ref], output_file)
except PushException as e:
raise ArtifactError("Failed to push artifact {}: {}".format(ref, e)) from e
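For reference, a condensed sketch (not from the commit) of the lookup order that pull() follows after this change: try the strong cache key first, fall back to the weak key, and in either case take the highest-priority remote recorded for that ref. _pick_remote() is a hypothetical helper used only for this illustration:

    # Hypothetical helper showing the lookup order only.
    def _pick_remote(artifact_map, strong_ref, weak_ref):
        for ref in (strong_ref, weak_ref):
            if artifact_map.contains(ref):
                # lookup_first() returns the highest-priority remote that has the ref.
                return ref, artifact_map.lookup_first(ref)
        return None, None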