summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChandan Singh <csingh43@bloomberg.net>2018-07-25 15:01:32 +0100
committerChandan Singh <csingh43@bloomberg.net>2018-08-15 18:29:07 +0100
commit9d2442b2794d7a531f50b3f1f9a0c6e4236bd9be (patch)
treeeebccbef85a0e5f82fe01567a66b7a2c424781c4
parent1292004636c659769ffa76009539957783aebbc3 (diff)
downloadbuildstream-9d2442b2794d7a531f50b3f1f9a0c6e4236bd9be.tar.gz
Allow source plugins to access previous sources
Source plugin implementations can now specify that they need access to previously staged sources by specifying `BST_REQUIRES_PREVIOUS_SOURCES_TRACK` and/or `BST_REQUIRES_PREVIOUS_SOURCES_FETCH`, corresponding to access at `track` and `fetch` times respectively. Fixes #381. Replaces !505. For the relevant discussion, see: https://gitlab.com/BuildStream/buildstream/merge_requests/505#note_83780747
-rw-r--r--buildstream/_loader/loader.py5
-rw-r--r--buildstream/_scheduler/queues/fetchqueue.py4
-rw-r--r--buildstream/element.py10
-rw-r--r--buildstream/source.py223
-rw-r--r--tests/sources/previous_source_access.py47
-rw-r--r--tests/sources/previous_source_access/elements/target.bst6
-rw-r--r--tests/sources/previous_source_access/files/file1
-rw-r--r--tests/sources/previous_source_access/plugins/sources/foo_transform.py87
-rw-r--r--tests/sources/previous_source_access/project.conf10
9 files changed, 337 insertions, 56 deletions
diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py
index 6e46197ab..275bc20cf 100644
--- a/buildstream/_loader/loader.py
+++ b/buildstream/_loader/loader.py
@@ -522,14 +522,15 @@ class Loader():
element = Element._new_from_meta(meta_element, platform.artifactcache)
element._preflight()
- for source in element.sources():
+ sources = list(element.sources())
+ for idx, source in enumerate(sources):
# Handle the case where a subproject needs to be fetched
#
if source.get_consistency() == Consistency.RESOLVED:
if fetch_subprojects:
if ticker:
ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind()))
- source._fetch()
+ source._fetch(sources[0:idx])
else:
detail = "Try fetching the project with `bst fetch {}`".format(filename)
raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED,
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 265890b7a..bd90a13b6 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -40,8 +40,10 @@ class FetchQueue(Queue):
self._skip_cached = skip_cached
def process(self, element):
+ previous_sources = []
for source in element.sources():
- source._fetch()
+ source._fetch(previous_sources)
+ previous_sources.append(source)
def status(self, element):
# state of dependencies may have changed, recalculate element state
diff --git a/buildstream/element.py b/buildstream/element.py
index 40cac47cd..a34b1ca36 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1229,6 +1229,12 @@ class Element(Plugin):
# Prepend provenance to the error
raise ElementError("{}: {}".format(self, e), reason=e.reason) from e
+ # Ensure that the first source does not need access to previous sources
+ if self.__sources and self.__sources[0]._requires_previous_sources():
+ raise ElementError("{}: {} cannot be the first source of an element "
+ "as it requires access to previous sources"
+ .format(self, self.__sources[0]))
+
# Preflight the sources
for source in self.sources():
source._preflight()
@@ -1272,9 +1278,9 @@ class Element(Plugin):
#
def _track(self):
refs = []
- for source in self.__sources:
+ for index, source in enumerate(self.__sources):
old_ref = source.get_ref()
- new_ref = source._track()
+ new_ref = source._track(self.__sources[0:index])
refs.append((source._get_unique_id(), new_ref))
# Complimentary warning that the new ref will be unused.
diff --git a/buildstream/source.py b/buildstream/source.py
index d58bfe2a3..7a0a0ec88 100644
--- a/buildstream/source.py
+++ b/buildstream/source.py
@@ -76,6 +76,39 @@ these methods are mandatory to implement.
:ref:`SourceFetcher <core_source_fetcher>`.
+Accessing previous sources
+--------------------------
+*Since: 1.4*
+
+In the general case, all sources are fetched and tracked independently of one
+another. In situations where a source needs to access previous source(s) in
+order to perform its own track and/or fetch, the following attributes can be
+set to request access to previous sources:
+
+* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK`
+
+ Indicate that access to previous sources is required during track
+
+* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH`
+
+ Indicate that access to previous sources is required during fetch
+
+The intended use of such plugins is to fetch external dependencies of other
+sources, typically using some kind of package manager, such that all the
+dependencies of the original source(s) are available at build time.
+
+When implementing such a plugin, implementors should adhere to the following
+guidelines:
+
+* Implementations must be able to store the obtained artifacts in a
+ subdirectory.
+
+* Implementations must be able to deterministically generate a unique ref, such
+ that two refs are different if and only if they produce different outputs.
+
+* Implementations must not introduce host contamination.
+
+
.. _core_source_fetcher:
SourceFetcher - Object for fetching individual URLs
@@ -92,6 +125,8 @@ mentioned, these methods are mandatory to implement.
Fetches the URL associated with this SourceFetcher, optionally taking an
alias override.
+Class Reference
+---------------
"""
import os
@@ -156,7 +191,7 @@ class SourceFetcher():
#############################################################
# Abstract Methods #
#############################################################
- def fetch(self, alias_override=None):
+ def fetch(self, alias_override=None, **kwargs):
"""Fetch remote sources and mirror them locally, ensuring at least
that the specific reference is cached locally.
@@ -209,6 +244,32 @@ class Source(Plugin):
__defaults = {} # The defaults from the project
__defaults_set = False # Flag, in case there are not defaults at all
+ BST_REQUIRES_PREVIOUS_SOURCES_TRACK = False
+ """Whether access to previous sources is required during track
+
+ When set to True:
+ * all sources listed before this source in the given element will be
+ fetched before this source is tracked
+ * Source.track() will be called with an additional keyword argument
+ `previous_sources_dir` where previous sources will be staged
+ * this source can not be the first source for an element
+
+ *Since: 1.4*
+ """
+
+ BST_REQUIRES_PREVIOUS_SOURCES_FETCH = False
+ """Whether access to previous sources is required during fetch
+
+ When set to True:
+ * all sources listed before this source in the given element will be
+ fetched before this source is fetched
+ * Source.fetch() will be called with an additional keyword argument
+ `previous_sources_dir` where previous sources will be staged
+ * this source can not be the first source for an element
+
+ *Since: 1.4*
+ """
+
def __init__(self, context, project, meta, *, alias_override=None):
provenance = _yaml.node_get_provenance(meta.config)
super().__init__("{}-{}".format(meta.element_name, meta.element_index),
@@ -305,9 +366,15 @@ class Source(Plugin):
"""
raise ImplError("Source plugin '{}' does not implement set_ref()".format(self.get_kind()))
- def track(self):
+ def track(self, **kwargs):
"""Resolve a new ref from the plugin's track option
+ Args:
+ previous_sources_dir (str): directory where previous sources are staged.
+ Note that this keyword argument is available only when
+ :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK`
+ is set to True.
+
Returns:
(simple object): A new internal source reference, or None
@@ -326,10 +393,16 @@ class Source(Plugin):
# Allow a non implementation
return None
- def fetch(self):
+ def fetch(self, **kwargs):
"""Fetch remote sources and mirror them locally, ensuring at least
that the specific reference is cached locally.
+ Args:
+ previous_sources_dir (str): directory where previous sources are staged.
+ Note that this keyword argument is available only when
+ :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH`
+ is set to True.
+
Raises:
:class:`.SourceError`
@@ -519,50 +592,19 @@ class Source(Plugin):
# Wrapper function around plugin provided fetch method
#
- def _fetch(self):
- project = self._get_project()
- source_fetchers = self.get_source_fetchers()
- if source_fetchers:
- for fetcher in source_fetchers:
- alias = fetcher._get_alias()
- success = False
- for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
- try:
- fetcher.fetch(uri)
- # FIXME: Need to consider temporary vs. permanent failures,
- # and how this works with retries.
- except BstError as e:
- last_error = e
- continue
- success = True
- break
- if not success:
- raise last_error
+ # Args:
+ # previous_sources (list): List of Sources listed prior to this source
+ #
+ def _fetch(self, previous_sources):
+
+ if self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH:
+ self.__ensure_previous_sources(previous_sources)
+ with self.tempdir() as staging_directory:
+ for src in previous_sources:
+ src._stage(staging_directory)
+ self.__do_fetch(previous_sources_dir=self.__ensure_directory(staging_directory))
else:
- alias = self._get_alias()
- if self.__first_pass:
- mirrors = project.first_pass_config.mirrors
- else:
- mirrors = project.config.mirrors
- if not mirrors or not alias:
- self.fetch()
- return
-
- context = self._get_context()
- source_kind = type(self)
- for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
- new_source = source_kind(context, project, self.__meta,
- alias_override=(alias, uri))
- new_source._preflight()
- try:
- new_source.fetch()
- # FIXME: Need to consider temporary vs. permanent failures,
- # and how this works with retries.
- except BstError as e:
- last_error = e
- continue
- return
- raise last_error
+ self.__do_fetch()
# Wrapper for stage() api which gives the source
# plugin a fully constructed path considering the
@@ -773,8 +815,19 @@ class Source(Plugin):
# Wrapper for track()
#
- def _track(self):
- new_ref = self.__do_track()
+ # Args:
+ # previous_sources (list): List of Sources listed prior to this source
+ #
+ def _track(self, previous_sources):
+ if self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK:
+ self.__ensure_previous_sources(previous_sources)
+ with self.tempdir() as staging_directory:
+ for src in previous_sources:
+ src._stage(staging_directory)
+ new_ref = self.__do_track(previous_sources_dir=self.__ensure_directory(staging_directory))
+ else:
+ new_ref = self.__do_track()
+
current_ref = self.get_ref()
if new_ref is None:
@@ -786,6 +839,17 @@ class Source(Plugin):
return new_ref
+ # _requires_previous_sources()
+ #
+ # If a plugin requires access to previous sources at track or fetch time,
+ # then it cannot be the first source of an element.
+ #
+ # Returns:
+ # (bool): Whether this source requires access to previous sources
+ #
+ def _requires_previous_sources(self):
+ return self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK or self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH
+
# Returns the alias if it's defined in the project
def _get_alias(self):
alias = self.__expected_alias
@@ -801,8 +865,54 @@ class Source(Plugin):
# Local Private Methods #
#############################################################
+ # Tries to call fetch for every mirror, stopping once it succeeds
+ def __do_fetch(self, **kwargs):
+ project = self._get_project()
+ source_fetchers = self.get_source_fetchers()
+ if source_fetchers:
+ for fetcher in source_fetchers:
+ alias = fetcher._get_alias()
+ success = False
+ for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
+ try:
+ fetcher.fetch(uri)
+ # FIXME: Need to consider temporary vs. permanent failures,
+ # and how this works with retries.
+ except BstError as e:
+ last_error = e
+ continue
+ success = True
+ break
+ if not success:
+ raise last_error
+ else:
+ alias = self._get_alias()
+ if self.__first_pass:
+ mirrors = project.first_pass_config.mirrors
+ else:
+ mirrors = project.config.mirrors
+ if not mirrors or not alias:
+ self.fetch(**kwargs)
+ return
+
+ context = self._get_context()
+ source_kind = type(self)
+ for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
+ new_source = source_kind(context, project, self.__meta,
+ alias_override=(alias, uri))
+ new_source._preflight()
+ try:
+ new_source.fetch(**kwargs)
+ # FIXME: Need to consider temporary vs. permanent failures,
+ # and how this works with retries.
+ except BstError as e:
+ last_error = e
+ continue
+ return
+ raise last_error
+
# Tries to call track for every mirror, stopping once it succeeds
- def __do_track(self):
+ def __do_track(self, **kwargs):
project = self._get_project()
# If there are no mirrors, or no aliases to replace, there's nothing to do here.
alias = self._get_alias()
@@ -811,7 +921,7 @@ class Source(Plugin):
else:
mirrors = project.config.mirrors
if not mirrors or not alias:
- return self.track()
+ return self.track(**kwargs)
context = self._get_context()
source_kind = type(self)
@@ -823,7 +933,7 @@ class Source(Plugin):
alias_override=(alias, uri))
new_source._preflight()
try:
- ref = new_source.track()
+ ref = new_source.track(**kwargs)
# FIXME: Need to consider temporary vs. permanent failures,
# and how this works with retries.
except BstError as e:
@@ -867,3 +977,14 @@ class Source(Plugin):
_yaml.node_final_assertions(config)
return config
+
+ # Ensures that previous sources have been tracked and fetched.
+ #
+ def __ensure_previous_sources(self, previous_sources):
+ for index, src in enumerate(previous_sources):
+ if src.get_consistency() == Consistency.RESOLVED:
+ src._fetch(previous_sources[0:index])
+ elif src.get_consistency() == Consistency.INCONSISTENT:
+ new_ref = src._track(previous_sources[0:index])
+ src._save_ref(new_ref)
+ src._fetch(previous_sources[0:index])
diff --git a/tests/sources/previous_source_access.py b/tests/sources/previous_source_access.py
new file mode 100644
index 000000000..f7045383c
--- /dev/null
+++ b/tests/sources/previous_source_access.py
@@ -0,0 +1,47 @@
+import os
+import pytest
+
+from tests.testutils import cli
+
+DATA_DIR = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ 'previous_source_access'
+)
+
+
+##################################################################
+# Tests #
+##################################################################
+# Test that plugins can access data from previous sources
+@pytest.mark.datafiles(DATA_DIR)
+def test_custom_transform_source(cli, tmpdir, datafiles):
+ project = os.path.join(datafiles.dirname, datafiles.basename)
+
+ # Ensure we can track
+ result = cli.run(project=project, args=[
+ 'track', 'target.bst'
+ ])
+ result.assert_success()
+
+ # Ensure we can fetch
+ result = cli.run(project=project, args=[
+ 'fetch', 'target.bst'
+ ])
+ result.assert_success()
+
+ # Ensure we get correct output from foo_transform
+ result = cli.run(project=project, args=[
+ 'build', 'target.bst'
+ ])
+ destpath = os.path.join(cli.directory, 'checkout')
+ result = cli.run(project=project, args=[
+ 'checkout', 'target.bst', destpath
+ ])
+ result.assert_success()
+ # Assert that files from both sources exist, and that they have
+ # the same content
+ assert os.path.exists(os.path.join(destpath, 'file'))
+ assert os.path.exists(os.path.join(destpath, 'filetransform'))
+ with open(os.path.join(destpath, 'file')) as file1:
+ with open(os.path.join(destpath, 'filetransform')) as file2:
+ assert file1.read() == file2.read()
diff --git a/tests/sources/previous_source_access/elements/target.bst b/tests/sources/previous_source_access/elements/target.bst
new file mode 100644
index 000000000..c9d3b9bb6
--- /dev/null
+++ b/tests/sources/previous_source_access/elements/target.bst
@@ -0,0 +1,6 @@
+kind: import
+
+sources:
+- kind: local
+ path: files/file
+- kind: foo_transform
diff --git a/tests/sources/previous_source_access/files/file b/tests/sources/previous_source_access/files/file
new file mode 100644
index 000000000..980a0d5f1
--- /dev/null
+++ b/tests/sources/previous_source_access/files/file
@@ -0,0 +1 @@
+Hello World!
diff --git a/tests/sources/previous_source_access/plugins/sources/foo_transform.py b/tests/sources/previous_source_access/plugins/sources/foo_transform.py
new file mode 100644
index 000000000..7101bfd24
--- /dev/null
+++ b/tests/sources/previous_source_access/plugins/sources/foo_transform.py
@@ -0,0 +1,87 @@
+"""
+foo_transform - transform "file" from previous sources into "filetransform"
+===========================================================================
+
+This is a test source plugin that looks for a file named "file" staged by
+previous sources, and copies its contents to a file called "filetransform".
+
+"""
+
+import os
+import hashlib
+
+from buildstream import Consistency, Source, SourceError, utils
+
+
+class FooTransformSource(Source):
+
+ # We need access to previous sources both at track time and fetch time
+ BST_REQUIRES_PREVIOUS_SOURCES_TRACK = True
+ BST_REQUIRES_PREVIOUS_SOURCES_FETCH = True
+
+ @property
+ def mirror(self):
+ """Directory where this source should stage its files
+
+ """
+ path = os.path.join(self.get_mirror_directory(), self.name,
+ self.ref.strip())
+ os.makedirs(path, exist_ok=True)
+ return path
+
+ def configure(self, node):
+ self.node_validate(node, ['ref'] + Source.COMMON_CONFIG_KEYS)
+ self.ref = self.node_get_member(node, str, 'ref', None)
+
+ def preflight(self):
+ pass
+
+ def get_unique_key(self):
+ return (self.ref,)
+
+ def get_consistency(self):
+ if self.ref is None:
+ return Consistency.INCONSISTENT
+ # If we have a file called "filetransform", verify that its checksum
+ # matches our ref. Otherwise, it is resolved but not cached.
+ fpath = os.path.join(self.mirror, 'filetransform')
+ try:
+ with open(fpath, 'rb') as f:
+ if hashlib.sha256(f.read()).hexdigest() == self.ref.strip():
+ return Consistency.CACHED
+ except Exception:
+ pass
+ return Consistency.RESOLVED
+
+ def get_ref(self):
+ return self.ref
+
+ def set_ref(self, ref, node):
+ self.ref = node['ref'] = ref
+
+ def track(self, previous_sources_dir):
+ # Store the checksum of the file from previous source as our ref
+ fpath = os.path.join(previous_sources_dir, 'file')
+ with open(fpath, 'rb') as f:
+ return hashlib.sha256(f.read()).hexdigest()
+
+ def fetch(self, previous_sources_dir):
+ fpath = os.path.join(previous_sources_dir, 'file')
+ # Verify that the checksum of the file from previous source matches
+ # our ref
+ with open(fpath, 'rb') as f:
+ if hashlib.sha256(f.read()).hexdigest() != self.ref.strip():
+ raise SourceError("Element references do not match")
+
+ # Copy "file" as "filetransform"
+ newfpath = os.path.join(self.mirror, 'filetransform')
+ utils.safe_copy(fpath, newfpath)
+
+ def stage(self, directory):
+ # Simply stage the "filetransform" file
+ utils.safe_copy(os.path.join(self.mirror, 'filetransform'),
+ os.path.join(directory, 'filetransform'))
+
+
+def setup():
+ return FooTransformSource
diff --git a/tests/sources/previous_source_access/project.conf b/tests/sources/previous_source_access/project.conf
new file mode 100644
index 000000000..1749b3dba
--- /dev/null
+++ b/tests/sources/previous_source_access/project.conf
@@ -0,0 +1,10 @@
+# Project with local source plugins
+name: foo
+
+element-path: elements
+
+plugins:
+- origin: local
+ path: plugins/sources
+ sources:
+ foo_transform: 0