author      bst-marge-bot <marge-bot@buildstream.build>    2020-08-06 07:12:10 +0000
committer   bst-marge-bot <marge-bot@buildstream.build>    2020-08-06 07:12:10 +0000
commit      16a5a20e4e832fc4c8bc225b92251efbada5e454 (patch)
tree        cfb498e84a5650c8fb9fe127ecc6df61d235d5f0
parent      981ad3697a9197da38f2ec4a867096d7a1ad144c (diff)
parent      ff180169260423e5da1742b89d4156186a0e4224 (diff)
download    buildstream-16a5a20e4e832fc4c8bc225b92251efbada5e454.tar.gz
Merge branch 'juerg/element-sources' into 'master'

Extract ElementSources class

See merge request BuildStream/buildstream!2007
-rw-r--r--  src/buildstream/_elementsources.py   | 319
-rw-r--r--  src/buildstream/_frontend/app.py     |   6
-rw-r--r--  src/buildstream/_frontend/widget.py  |  35
-rw-r--r--  src/buildstream/_loader/loader.py    |   2
-rw-r--r--  src/buildstream/_pipeline.py         |   2
-rw-r--r--  src/buildstream/element.py           | 297
-rw-r--r--  src/buildstream/source.py            |  15
-rw-r--r--  tests/frontend/track.py              |   5
-rw-r--r--  tests/sources/git.py                 |  48
-rw-r--r--  tests/sources/local.py               |   2

10 files changed, 447 insertions(+), 284 deletions(-)
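This merge extracts the per-source logic that previously lived directly in Element into a dedicated ElementSources class. A minimal sketch of the resulting delegation pattern, assuming an already-configured Context (the collect_sources helper is illustrative, not part of this patch):

    # Illustrative sketch: an element-like owner delegating to ElementSources.
    from buildstream._elementsources import ElementSources

    def collect_sources(context, plugin_sources):
        sources = ElementSources(context)
        for source in plugin_sources:
            sources.add_source(source)  # append in declaration order
        return sources

    # The owner then defers combined-source operations to the aggregate:
    #   sources.fetch()         # pull from the source cache, or fetch originals
    #   vdir = sources.stage()  # stage combined sources into a CAS directory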
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
new file mode 100644
index 000000000..fdb404b67
--- /dev/null
+++ b/src/buildstream/_elementsources.py
@@ -0,0 +1,319 @@
+#
+# Copyright (C) 2016-2018 Codethink Limited
+# Copyright (C) 2017-2020 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from typing import TYPE_CHECKING, Iterator
+
+from ._context import Context
+
+from .storage._casbaseddirectory import CasBasedDirectory
+
+if TYPE_CHECKING:
+ from typing import List
+
+ from .source import Source
+
+# An ElementSources object represents the combined sources of an element.
+class ElementSources:
+ def __init__(self, context: Context):
+
+ self._context = context
+ self._sources = [] # type: List[Source]
+ self.vdir = None # Directory with staged sources
+ self._sourcecache = context.sourcecache # Source cache
+ self._is_resolved = False # Whether the sources are fully resolved or not
+ self._cached = None # If the sources are known to be successfully cached in CAS
+
+ # the index of the last source in this element that requires previous
+ # sources for staging
+ self._last_source_requires_previous_idx = None
+
+ # add_source():
+ #
+ # Append source to this list of element sources.
+ #
+ # Args:
+ # source (Source): The source to add
+ #
+ def add_source(self, source):
+ self._sources.append(source)
+
+ # sources():
+ #
+ # A generator function to enumerate the element sources
+ #
+ # Yields:
+ # Source: The individual sources
+ #
+ def sources(self) -> Iterator["Source"]:
+ for source in self._sources:
+ yield source
+
+ # track():
+ #
+ # Calls track() on the Element sources
+ #
+ # Raises:
+ # SourceError: If one of the element sources has an error
+ #
+ # Returns:
+ # (list): A list of Source object ids and their new references
+ #
+ def track(self, workspace):
+ refs = []
+ for index, source in enumerate(self._sources):
+ old_ref = source.get_ref()
+ new_ref = source._track(self._sources[0:index])
+ refs.append((source._unique_id, new_ref))
+
+ # Complementary warning that the new ref will be unused.
+ if old_ref != new_ref and workspace:
+ detail = (
+ "This source has an open workspace.\n"
+ + "To start using the new reference, please close the existing workspace."
+ )
+ source.warn("Updated reference will be ignored as source has open workspace", detail=detail)
+
+ return refs
+
+ # stage():
+ #
+ # Stage the element sources to a directory
+ #
+ # Returns:
+ # (:class:`.storage.Directory`): A virtual directory containing the staged sources.
+ #
+ def stage(self):
+ # Assert sources are cached
+ assert self.cached()
+
+ self.vdir = CasBasedDirectory(self._context.get_cascache())
+
+ if self._sources:
+ # find last required source
+ last_required_previous_idx = self._last_source_requires_previous()
+
+ for source in self._sources[last_required_previous_idx:]:
+ source_dir = self._sourcecache.export(source)
+ self.vdir.import_files(source_dir)
+
+ return self.vdir
+
+ # fetch_done()
+ #
+ # Indicates that fetching the sources for this element has been done.
+ #
+ # Args:
+ # fetched_original (bool): Whether the original sources were requested (and fetched)
+ #
+ def fetch_done(self, fetched_original):
+ self._cached = True
+
+ for source in self._sources:
+ source._fetch_done(fetched_original)
+
+ # push()
+ #
+ # Push the element's sources.
+ #
+ # Returns:
+ # (bool): True if the remote was updated, False if it already existed
+ # and no update was required
+ #
+ def push(self):
+ pushed = False
+
+ for source in self.sources():
+ if self._sourcecache.push(source):
+ pushed = True
+
+ return pushed
+
+ # init_workspace():
+ #
+ # Initialises a new workspace from the element sources.
+ #
+ # Args:
+ # directory (str): Path of the workspace to init
+ #
+ def init_workspace(self, directory: str):
+ for source in self.sources():
+ source._init_workspace(directory)
+
+ # fetch():
+ #
+ # Fetch the element sources.
+ #
+ # Raises:
+ # SourceError: If one of the element sources has an error
+ #
+ def fetch(self, fetch_original=False):
+ previous_sources = []
+ fetch_needed = False
+
+ if self._sources and not fetch_original:
+ for source in self._sources:
+ if self._sourcecache.contains(source):
+ continue
+
+ # try and fetch from source cache
+ if not source._is_cached() and self._sourcecache.has_fetch_remotes():
+ if self._sourcecache.pull(source):
+ continue
+
+ fetch_needed = True
+
+ # We need to fetch original sources
+ if fetch_needed or fetch_original:
+ for source in self.sources():
+ if not source._is_cached():
+ source._fetch(previous_sources)
+ previous_sources.append(source)
+
+ self._cache_sources()
+
+ # get_unique_key():
+ #
+ # Return something which uniquely identifies the combined sources of the
+ # element.
+ #
+ # Returns:
+ # (str, list, dict): A string, list or dictionary as unique identifier
+ #
+ def get_unique_key(self):
+ result = []
+
+ for source in self._sources:
+ result.append({"key": source._get_unique_key(), "name": source._get_source_name()})
+
+ return result
+
+ # cached():
+ #
+ # Check if the element sources are cached in CAS, generating the source
+ # cache keys if needed.
+ #
+ # Returns:
+ # (bool): True if the element sources are cached
+ #
+ def cached(self):
+ if self._cached is not None:
+ return self._cached
+
+ sourcecache = self._sourcecache
+
+ # Go through sources we'll cache generating keys
+ for ix, source in enumerate(self._sources):
+ if not source._key:
+ if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
+ source._generate_key(self._sources[:ix])
+ else:
+ source._generate_key([])
+
+ # Check all sources are in source cache
+ for source in self._sources:
+ if not sourcecache.contains(source):
+ return False
+
+ self._cached = True
+ return True
+
+ # is_resolved():
+ #
+ # Get whether all sources of the element are resolved
+ #
+ # Returns:
+ # (bool): True if all element sources are resolved
+ #
+ def is_resolved(self):
+ return self._is_resolved
+
+ # cached_original():
+ #
+ # Get whether all the sources of the element have their own cached
+ # copy of their sources.
+ #
+ # Returns:
+ # (bool): True if all element sources have the original sources cached
+ #
+ def cached_original(self):
+ return all(source._is_cached() for source in self._sources)
+
+ # update_resolved_state():
+ #
+ # Updates the sources' resolved state
+ #
+ # An element's source state must be resolved before it may compute
+ # cache keys, because the source's ref, whether defined in yaml or
+ # from the workspace, is a component of the element's cache keys.
+ #
+ def update_resolved_state(self):
+ for source in self._sources:
+ if not source.is_resolved():
+ break
+ else:
+ self._is_resolved = True
+
+ # preflight():
+ #
+ # An internal wrapper for calling the abstract preflight() method on
+ # the element sources.
+ #
+ def preflight(self):
+ # Ensure that the first source does not need access to previous sources
+ if self._sources and self._sources[0]._requires_previous_sources():
+ from .element import ElementError # pylint: disable=cyclic-import
+
+ raise ElementError(
+ "{}: {} cannot be the first source of an element "
+ "as it requires access to previous sources".format(self, self._sources[0])
+ )
+
+ # Preflight the sources
+ for source in self.sources():
+ source._preflight()
+
+ # _cache_sources():
+ #
+ # Caches the sources into the local CAS
+ #
+ def _cache_sources(self):
+ if self._sources and not self.cached():
+ last_requires_previous = 0
+ # commit all other sources by themselves
+ for idx, source in enumerate(self._sources):
+ if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
+ self._sourcecache.commit(source, self._sources[last_requires_previous:idx])
+ last_requires_previous = idx
+ else:
+ self._sourcecache.commit(source, [])
+
+ # _last_source_requires_previous
+ #
+ # This is the last source that requires previous sources to be cached.
+ # Sources listed after this will be cached separately.
+ #
+ # Returns:
+ # (int): index of last source that requires previous sources
+ #
+ def _last_source_requires_previous(self):
+ if self._last_source_requires_previous_idx is None:
+ last_requires_previous = 0
+ for idx, source in enumerate(self._sources):
+ if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
+ last_requires_previous = idx
+ self._last_source_requires_previous_idx = last_requires_previous
+ return self._last_source_requires_previous_idx
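To make the staging/caching split above concrete: _last_source_requires_previous() scans for the highest index whose source sets BST_REQUIRES_PREVIOUS_SOURCES_STAGE, and stage() only exports sources from that index onward, because that source's cached form already incorporates its predecessors. A self-contained sketch of the index computation (Src is a stand-in for the real Source class):

    # Stand-in illustration of _last_source_requires_previous() above.
    class Src:
        def __init__(self, name, requires_previous=False):
            self.name = name
            self.BST_REQUIRES_PREVIOUS_SOURCES_STAGE = requires_previous

    sources = [Src("A"), Src("B", requires_previous=True), Src("C"), Src("D")]

    last = 0
    for idx, source in enumerate(sources):
        if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
            last = idx

    # Staging exports sources[1:] (B, C, D); A is already folded into B's
    # cached output because B was committed together with its predecessors.
    assert last == 1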
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 3160e8b1e..5d49e9612 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -747,7 +747,11 @@ class App:
# Ensure all status & messages have been processed
self._render(message_text=self._message_text)
click.echo("", err=True)
- self.logger.print_summary(self.stream, self._main_options["log_file"])
+
+ try:
+ self.logger.print_summary(self.stream, self._main_options["log_file"])
+ except BstError as e:
+ self._error_exit(e)
# _error_exit()
#
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index f07e3dba0..5038e9d6b 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -29,7 +29,7 @@ import click
from .profile import Profile
from .. import Scope
from .. import __version__ as bst_version
-from .._exceptions import ImplError
+from .._exceptions import BstError, ImplError
from .._message import MessageType
from ..storage.directory import _FileType
@@ -346,21 +346,26 @@ class LogLine(Widget):
line = p.fmt_subst(line, "key", cache_key, fg="yellow", dim=dim_keys)
line = p.fmt_subst(line, "full-key", full_key, fg="yellow", dim=dim_keys)
- if not element._has_all_sources_resolved():
- line = p.fmt_subst(line, "state", "no reference", fg="red")
- else:
- if element.get_kind() == "junction":
- line = p.fmt_subst(line, "state", "junction", fg="magenta")
- elif element._cached_failure():
- line = p.fmt_subst(line, "state", "failed", fg="red")
- elif element._cached_success():
- line = p.fmt_subst(line, "state", "cached", fg="magenta")
- elif not element._has_all_sources_in_source_cache() and not element._has_all_sources_cached():
- line = p.fmt_subst(line, "state", "fetch needed", fg="red")
- elif element._buildable():
- line = p.fmt_subst(line, "state", "buildable", fg="green")
+ try:
+ if not element._has_all_sources_resolved():
+ line = p.fmt_subst(line, "state", "no reference", fg="red")
else:
- line = p.fmt_subst(line, "state", "waiting", fg="blue")
+ if element.get_kind() == "junction":
+ line = p.fmt_subst(line, "state", "junction", fg="magenta")
+ elif element._cached_failure():
+ line = p.fmt_subst(line, "state", "failed", fg="red")
+ elif element._cached_success():
+ line = p.fmt_subst(line, "state", "cached", fg="magenta")
+ elif element._fetch_needed():
+ line = p.fmt_subst(line, "state", "fetch needed", fg="red")
+ elif element._buildable():
+ line = p.fmt_subst(line, "state", "buildable", fg="green")
+ else:
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
+ except BstError as e:
+ # Provide context to plugin error
+ e.args = ("Failed to determine state for {}: {}".format(element._get_full_name(), str(e)),)
+ raise e
# Element configuration
if "%{config" in format_:
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 9f881743d..ab299f6f1 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -762,7 +762,7 @@ class Loader:
# Handle the case where a subproject needs to be fetched
#
- if not element._has_all_sources_in_source_cache():
+ if element._should_fetch():
self.load_context.fetch_subprojects([element])
sources = list(element.sources())
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index 9edc6f51b..42579af85 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -373,7 +373,7 @@ class Pipeline:
uncached = []
with self._context.messenger.timed_activity("Checking sources"):
for element in elements:
- if not element._has_all_sources_in_source_cache() and not element._has_all_sources_cached():
+ if element._fetch_needed():
uncached.append(element)
if uncached:
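Both this call site and the loader.py one above now lean on a single predicate, Element._fetch_needed(), defined later in this patch: a fetch is needed exactly when the sources are neither in the source cache nor cached in their original form. Expressed on its own (the parameter is any object exposing the two queries from _elementsources.py):

    # Sketch of the consolidated predicate introduced by this patch.
    def fetch_needed(sources) -> bool:
        # True when the sources are in neither the source cache (CAS)
        # nor cached as original, unstaged sources.
        return not sources.cached() and not sources.cached_original()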
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 4c3f239a1..dbbc7bf93 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1,5 +1,6 @@
#
# Copyright (C) 2016-2018 Codethink Limited
+# Copyright (C) 2017-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -104,10 +105,10 @@ from .sandbox._config import SandboxConfig
from .sandbox._sandboxremote import SandboxRemote
from .types import CoreWarnings, Scope, _CacheBuildTrees, _KeyStrength
from ._artifact import Artifact
+from ._elementsources import ElementSources
from .storage.directory import Directory
from .storage._filebaseddirectory import FileBasedDirectory
-from .storage._casbaseddirectory import CasBasedDirectory
from .storage.directory import VirtualDirectoryError
if TYPE_CHECKING:
@@ -236,20 +237,15 @@ class Element(Plugin):
self.__ready_for_runtime = False # Whether the element and its runtime dependencies have cache keys
self.__ready_for_runtime_and_cached = False # Whether all runtime deps are cached, as well as the element
self.__cached_remotely = None # Whether the element is cached remotely
- # List of Sources
- self.__sources = [] # type: List[Source]
- self.__sources_vdir = None # Directory with staged sources
+ self.__sources = ElementSources(context) # The element sources
self.__weak_cache_key = None # Our cached weak cache key
self.__strict_cache_key = None # Our cached cache key for strict builds
self.__artifacts = context.artifactcache # Artifact cache
self.__sourcecache = context.sourcecache # Source cache
- self.__is_resolved = False # Whether the source is fully resolved or not
self.__assemble_scheduled = False # Element is scheduled to be assembled
self.__assemble_done = False # Element is assembled
self.__pull_done = False # Whether pull was attempted
self.__cached_successfully = None # If the Element is known to be successfully cached
- self.__has_all_sources_in_source_cache = None # If the sources are known to be successfully cached
- self.__has_all_sources_cached = False # Whether all sources have a local copy of their respective sources
self.__splits = None # Resolved regex objects for computing split domains
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
self.__tainted = None # Whether the artifact is tainted and should not be shared
@@ -261,10 +257,6 @@ class Element(Plugin):
self.__strict_artifact = None # Artifact for strict cache key
self.__meta_kind = meta.kind # The kind of this source, required for unpickling
- # the index of the last source in this element that requires previous
- # sources for staging
- self.__last_source_requires_previous_ix = None
-
self.__batch_prepare_assemble = False # Whether batching across prepare()/assemble() is configured
self.__batch_prepare_assemble_flags = 0 # Sandbox flags for batching across prepare()/assemble()
# Collect dir for batching across prepare()/assemble()
@@ -421,8 +413,7 @@ class Element(Plugin):
Yields:
The sources of this element
"""
- for source in self.__sources:
- yield source
+ return self.__sources.sources()
def dependencies(self, scope: Scope, *, recurse: bool = True, visited=None) -> Iterator["Element"]:
"""dependencies(scope, *, recurse=True)
@@ -915,7 +906,7 @@ class Element(Plugin):
redundant_ref = source._load_ref()
- element.__sources.append(source)
+ element.__sources.add_source(source)
# Collect redundant refs which occurred at load time
if redundant_ref is not None:
@@ -1077,7 +1068,7 @@ class Element(Plugin):
# (bool): Whether this element can currently be built
#
def _buildable(self):
- if not (self._has_all_sources_in_source_cache() or self._has_all_sources_cached()):
+ if self._fetch_needed():
return False
if not self.__assemble_scheduled:
@@ -1119,9 +1110,9 @@ class Element(Plugin):
# Compute the element's initial state. Element state contains
# the following mutable sub-states:
#
- # - Source state
+ # - Source state in `ElementSources`
# - Artifact cache key
- # - Source key
+ # - Source key in `ElementSources`
# - Integral component of the cache key
# - Computed as part of the source state
# - Artifact state
@@ -1138,8 +1129,6 @@ class Element(Plugin):
# notably jobs executed in sub-processes. Changes are performed by
# invocations of the following methods:
#
- # - __update_resolved_state()
- # - Computes the state of all sources of the element.
# - __update_cache_keys()
# - Computes the strong and weak cache keys.
# - _update_artifact_state()
@@ -1159,8 +1148,9 @@ class Element(Plugin):
# them, causing the state change to bubble through all potential
# side effects.
#
- # *This* method starts the process by invoking
- # `__update_resolved_state()`, which will cause all necessary state
+ # After initializing the source state via `ElementSources`,
+ # *this* method starts the process by invoking
+ # `__update_cache_keys()`, which will cause all necessary state
# changes. Other functions should use the appropriate methods and
# only update what they expect to change - this will ensure that
# the minimum amount of work is done.
@@ -1169,9 +1159,12 @@ class Element(Plugin):
assert not self._resolved_initial_state, "_initialize_state() should only be called once"
self._resolved_initial_state = True
- # This will update source state, and for un-initialized
+ # This will initialize source state.
+ self.__sources.update_resolved_state()
+
+ # This will calculate the cache keys, and for un-initialized
# elements recursively initialize anything else (because it
- # will become considered outdated after source state is
+ # will become considered outdated after cache keys are
# updated).
#
# FIXME: Currently this method may cause recursion through
@@ -1181,7 +1174,7 @@ class Element(Plugin):
# pull/build, however should not occur during initialization
# (since we will eventually visit reverse dependencies during
# our initialization anyway).
- self.__update_resolved_state()
+ self.__update_cache_keys()
# _get_display_key():
#
@@ -1230,14 +1223,8 @@ class Element(Plugin):
def _tracking_done(self):
# Tracking may change the sources' refs, and therefore the
# source state. We need to update source state.
- self.__update_resolved_state()
-
- # Check whether sources are now cached.
- # This is done here so that we don't throw an exception trying to show the pipeline at the end
- # This has for side-effect to cache this fact too, which will change the object's state.
- # This is done here rather than later so we can validate that the sources are valid locally
- self._has_all_sources_in_source_cache()
- self._has_all_sources_cached()
+ self.__sources.update_resolved_state()
+ self.__update_cache_keys()
# _track():
#
@@ -1250,21 +1237,7 @@ class Element(Plugin):
# (list): A list of Source object ids and their new references
#
def _track(self):
- refs = []
- for index, source in enumerate(self.__sources):
- old_ref = source.get_ref()
- new_ref = source._track(self.__sources[0:index])
- refs.append((source._unique_id, new_ref))
-
- # Complimentary warning that the new ref will be unused.
- if old_ref != new_ref and self._get_workspace():
- detail = (
- "This source has an open workspace.\n"
- + "To start using the new reference, please close the existing workspace."
- )
- source.warn("Updated reference will be ignored as source has open workspace", detail=detail)
-
- return refs
+ return self.__sources.track(self._get_workspace())
# _prepare_sandbox():
#
@@ -1346,39 +1319,22 @@ class Element(Plugin):
# No cached buildtree, stage source from source cache
else:
-
- # Assert sources are cached
- assert self._has_all_sources_in_source_cache()
-
- if self.__sources:
-
- sourcecache = context.sourcecache
- # find last required source
- last_required_previous_ix = self.__last_source_requires_previous()
- import_dir = CasBasedDirectory(context.get_cascache())
-
- try:
- for source in self.__sources[last_required_previous_ix:]:
- source_dir = sourcecache.export(source)
- import_dir.import_files(source_dir)
-
- except SourceCacheError as e:
- raise ElementError("Error trying to export source for {}: {}".format(self.name, e))
- except VirtualDirectoryError as e:
- raise ElementError(
- "Error trying to import sources together for {}: {}".format(self.name, e),
- reason="import-source-files-fail",
- )
-
- self.__sources_vdir = import_dir
-
- # incremental builds should merge the source into the last artifact before staging
- last_build_artifact = self.__get_last_build_artifact()
- if last_build_artifact:
- self.info("Incremental build")
- last_sources = last_build_artifact.get_sources()
- import_dir = last_build_artifact.get_buildtree()
- import_dir._apply_changes(last_sources, self.__sources_vdir)
+ try:
+ staged_sources = self.__sources.stage()
+ except (SourceCacheError, VirtualDirectoryError) as e:
+ raise ElementError(
+ "Error trying to stage sources for {}: {}".format(self.name, e), reason="stage-sources-fail"
+ )
+
+ # incremental builds should merge the source into the last artifact before staging
+ last_build_artifact = self.__get_last_build_artifact()
+ if last_build_artifact:
+ self.info("Incremental build")
+ last_sources = last_build_artifact.get_sources()
+ import_dir = last_build_artifact.get_buildtree()
+ import_dir._apply_changes(last_sources, staged_sources)
+ else:
+ import_dir = staged_sources
# Set update_mtime to ensure deterministic mtime of sources at build time
with utils._deterministic_umask():
@@ -1674,7 +1630,7 @@ class Element(Plugin):
# if the directory could not be found.
pass
- sourcesvdir = self.__sources_vdir
+ sourcesvdir = self.__sources.vdir
if collect is not None:
try:
@@ -1705,12 +1661,7 @@ class Element(Plugin):
# fetched_original (bool): Whether the original sources had been asked (and fetched) or not
#
def _fetch_done(self, fetched_original):
- self.__has_all_sources_in_source_cache = True
- if fetched_original:
- self.__has_all_sources_cached = True
-
- for source in self.__sources:
- source._fetch_done(fetched_original)
+ self.__sources.fetch_done(fetched_original)
# _pull_pending()
#
@@ -1796,19 +1747,12 @@ class Element(Plugin):
return True
def _skip_source_push(self):
- if not self.__sources or self._get_workspace():
+ if not self.sources() or self._get_workspace():
return True
return not (self.__sourcecache.has_push_remotes(plugin=self) and self._has_all_sources_in_source_cache())
def _source_push(self):
- # try and push sources if we've got them
- if self.__sourcecache.has_push_remotes(plugin=self) and self._has_all_sources_in_source_cache():
- for source in self.sources():
- if not self.__sourcecache.push(source):
- return False
-
- # Notify successful upload
- return True
+ return self.__sources.push()
# _skip_push():
#
@@ -1941,8 +1885,7 @@ class Element(Plugin):
#
os.makedirs(context.builddir, exist_ok=True)
with utils._tempdir(dir=context.builddir, prefix="workspace-{}".format(self.normal_name)) as temp:
- for source in self.sources():
- source._init_workspace(temp)
+ self.__sources.init_workspace(temp)
# Now hardlink the files into the workspace target.
utils.link_files(temp, workspace.get_absolute_path())
@@ -2048,29 +1991,7 @@ class Element(Plugin):
# SourceError: If one of the element sources has an error
#
def _fetch(self, fetch_original=False):
- previous_sources = []
- fetch_needed = False
-
- if self.__sources and not fetch_original:
- for source in self.__sources:
- if self.__sourcecache.contains(source):
- continue
-
- # try and fetch from source cache
- if not source._is_cached() and self.__sourcecache.has_fetch_remotes():
- if self.__sourcecache.pull(source):
- continue
-
- fetch_needed = True
-
- # We need to fetch original sources
- if fetch_needed or fetch_original:
- for source in self.sources():
- if not source._is_cached():
- source._fetch(previous_sources)
- previous_sources.append(source)
-
- self.__cache_sources()
+ self.__sources.fetch(fetch_original=fetch_original)
# _calculate_cache_key():
#
@@ -2103,12 +2024,7 @@ class Element(Plugin):
"public": self.__public.strip_node_info(),
}
- self.__cache_key_dict["sources"] = []
-
- for source in self.__sources:
- self.__cache_key_dict["sources"].append(
- {"key": source._get_unique_key(), "name": source._get_source_name()}
- )
+ self.__cache_key_dict["sources"] = self.__sources.get_unique_key()
self.__cache_key_dict["fatal-warnings"] = sorted(project._fatal_warnings)
@@ -2117,56 +2033,50 @@ class Element(Plugin):
return _cachekey.generate_key(cache_key_dict)
- # Check if sources are cached, generating the source key if it hasn't been
+ # _has_all_sources_in_source_cache()
+ #
+ # Get whether all sources of the element are cached in CAS
+ #
+ # Returns:
+ # (bool): True if the element sources are in CAS
+ #
def _has_all_sources_in_source_cache(self):
- if self.__has_all_sources_in_source_cache is not None:
- return self.__has_all_sources_in_source_cache
-
- if self.__sources:
- sourcecache = self._get_context().sourcecache
-
- # Go through sources we'll cache generating keys
- for ix, source in enumerate(self.__sources):
- if not source._key:
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- source._generate_key(self.__sources[:ix])
- else:
- source._generate_key([])
-
- # Check all sources are in source cache
- for source in self.__sources:
- if not sourcecache.contains(source):
- return False
-
- self.__has_all_sources_in_source_cache = True
- return True
+ return self.__sources.cached()
# _has_all_sources_resolved()
#
# Get whether all sources of the element are resolved
#
+ # Returns:
+ # (bool): True if all element sources are resolved
+ #
def _has_all_sources_resolved(self):
- return self.__is_resolved
+ return self.__sources.is_resolved()
- # _has_all_sources_cached()
+ # _fetch_needed():
+ #
+ # Return whether sources need to be fetched from a remote
#
- # Get whether all the sources of the element have their own cached
- # copy of their sources.
+ # Returns:
+ # (bool): True if one or more element sources need to be fetched
#
- def _has_all_sources_cached(self):
- if not self.__has_all_sources_cached:
- self.__has_all_sources_cached = all(source._is_cached() for source in self.__sources)
- return self.__has_all_sources_cached
+ def _fetch_needed(self):
+ return not self.__sources.cached() and not self.__sources.cached_original()
+ # _should_fetch():
+ #
+ # Return whether we need to run the fetch stage for this element
+ #
+ # Args:
+ # fetch_original (bool): whether we need the original unstaged source
+ #
+ # Returns:
+ # (bool): True if a fetch job is required
+ #
def _should_fetch(self, fetch_original=False):
- """ return bool of if we need to run the fetch stage for this element
-
- Args:
- fetch_original (bool): whether we need to original unstaged source
- """
if fetch_original:
- return not self._has_all_sources_cached()
- return not self._has_all_sources_in_source_cache()
+ return not self.__sources.cached_original()
+ return not self.__sources.cached()
# _set_required_callback()
#
@@ -2293,22 +2203,6 @@ class Element(Plugin):
# Private Local Methods #
#############################################################
- # __update_resolved_state()
- #
- # Updates source's resolved state
- #
- # An element's source state must be resolved before it may compute
- # cache keys, because the source's ref, whether defined in yaml or
- # from the workspace, is a component of the element's cache keys.
- #
- def __update_resolved_state(self):
- for source in self.__sources:
- if not source.is_resolved():
- break
- else:
- self.__is_resolved = True
- self.__update_cache_keys()
-
# __get_dependency_refs()
#
# Retrieve the artifact refs of the element's dependencies
@@ -2419,16 +2313,7 @@ class Element(Plugin):
# Prepend provenance to the error
raise ElementError("{}: {}".format(self, e), reason=e.reason, detail=e.detail) from e
- # Ensure that the first source does not need access to previous soruces
- if self.__sources and self.__sources[0]._requires_previous_sources():
- raise ElementError(
- "{}: {} cannot be the first source of an element "
- "as it requires access to previous sources".format(self, self.__sources[0])
- )
-
- # Preflight the sources
- for source in self.sources():
- source._preflight()
+ self.__sources.preflight()
# __assert_cached()
#
@@ -2903,38 +2788,6 @@ class Element(Plugin):
return True
- # __cache_sources():
- #
- # Caches the sources into the local CAS
- #
- def __cache_sources(self):
- if self.__sources and not self._has_all_sources_in_source_cache():
- last_requires_previous = 0
- # commit all other sources by themselves
- for ix, source in enumerate(self.__sources):
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- self.__sourcecache.commit(source, self.__sources[last_requires_previous:ix])
- last_requires_previous = ix
- else:
- self.__sourcecache.commit(source, [])
-
- # __last_source_requires_previous
- #
- # This is the last source that requires previous sources to be cached.
- # Sources listed after this will be cached separately.
- #
- # Returns:
- # (int): index of last source that requires previous sources
- #
- def __last_source_requires_previous(self):
- if self.__last_source_requires_previous_ix is None:
- last_requires_previous = 0
- for ix, source in enumerate(self.__sources):
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- last_requires_previous = ix
- self.__last_source_requires_previous_ix = last_requires_previous
- return self.__last_source_requires_previous_ix
-
# __update_cache_keys()
#
# Updates weak and strict cache keys
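The net effect of the element.py hunks is a two-step initialization: source state is resolved first via ElementSources, and cache keys are computed afterwards, since a source's ref is a component of the element's cache keys. A toy illustration of that ordering constraint (Stub stands in for the real Element/ElementSources pair; not runnable against a real Element):

    # Toy illustration of the new initialization order in _initialize_state().
    class Stub:
        def __init__(self):
            self.resolved = False
            self.keys = None

        def update_resolved_state(self):
            self.resolved = True  # refs resolved from yaml or the workspace

        def update_cache_keys(self):
            # Keys may only be computed once the refs are resolved
            assert self.resolved
            self.keys = "weak/strict keys derived from the refs"

    stub = Stub()
    stub.update_resolved_state()
    stub.update_cache_keys()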
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index f15d5a628..4c284a57c 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -539,10 +539,10 @@ class Source(Plugin):
"""Implement any validations once we know the sources are cached
This is guaranteed to be called only once for a given session
- once the sources are known to be cached.
- If source tracking is enabled in the session for this source,
- then this will only be called if the sources become cached after
- tracking completes.
+ once the sources are known to be cached, before
+ :func:`Source.stage() <buildstream.source.Source.stage>` or
+ :func:`Source.init_workspace() <buildstream.source.Source.init_workspace>`
+ is called.
"""
def is_cached(self) -> bool:
@@ -780,9 +780,6 @@ class Source(Plugin):
reason="source-bug",
)
- if self.__is_cached:
- self.validate_cache()
-
return self.__is_cached
# Wrapper function around plugin provided fetch method
@@ -800,8 +797,6 @@ class Source(Plugin):
else:
self.__do_fetch()
- self.validate_cache()
-
# _fetch_done()
#
# Indicates that fetching the source has been done.
@@ -831,6 +826,7 @@ class Source(Plugin):
cas_dir = CasBasedDirectory(self._get_context().get_cascache(), digest=self.__digest)
directory.import_files(cas_dir)
else:
+ self.validate_cache()
self.stage(directory)
# Wrapper for init_workspace()
@@ -840,6 +836,7 @@ class Source(Plugin):
directory = self.__ensure_directory(directory)
+ self.validate_cache()
self.init_workspace(directory)
# _get_unique_key():
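With the source.py change, validate_cache() no longer runs eagerly when a source is found cached or freshly fetched; it runs once, lazily, immediately before stage() or init_workspace(). A hypothetical plugin honouring the new contract (the helper name and body are illustrative, and the other required plugin methods are elided):

    from buildstream import Source, SourceError

    class ExampleSource(Source):
        # Hypothetical helper; a real plugin would inspect its cached data.
        def _cache_is_sane(self):
            return True

        def validate_cache(self):
            # Called once per session, right before stage()/init_workspace()
            if not self._cache_is_sane():
                raise SourceError("cached source failed validation")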
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index 2ebaae81f..5fe9bbde6 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -209,8 +209,7 @@ def test_track_consistency_error(cli, datafiles):
# Track the element causing a consistency error
result = cli.run(project=project, args=["source", "track", "error.bst"])
- result.assert_main_error(ErrorDomain.STREAM, None)
- result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
+ result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -221,7 +220,7 @@ def test_track_consistency_bug(cli, datafiles):
result = cli.run(project=project, args=["source", "track", "bug.bst"])
# We expect BuildStream to fail gracefully, with no recorded exception.
- result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 033db1bf9..30657d825 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -498,9 +498,8 @@ def test_unlisted_submodule(cli, tmpdir, datafiles, fail):
element = {"kind": "import", "sources": [gitsource]}
generate_element(project, "target.bst", element)
- # We will not see the warning or error before the first fetch, because
- # we don't have the repository yet and so we have no knowledge of
- # the unlisted submodule.
+ # The warning or error is reported during fetch. There should be no
+ # error with `bst show`.
result = cli.run(project=project, args=["show", "target.bst"])
result.assert_success()
assert "git:unlisted-submodule" not in result.stderr
@@ -517,17 +516,10 @@ def test_unlisted_submodule(cli, tmpdir, datafiles, fail):
result.assert_success()
assert "git:unlisted-submodule" in result.stderr
- # Now that we've fetched it, `bst show` will discover the unlisted submodule too
+ # Verify that `bst show` will still not error out after fetching.
result = cli.run(project=project, args=["show", "target.bst"])
-
- # Assert a warning or an error depending on what we're checking
- if fail == "error":
- result.assert_main_error(ErrorDomain.PLUGIN, "git:unlisted-submodule")
- else:
- result.assert_success()
- # We have cached things internally and successfully. Therefore, the plugin
- # is not involved in checking whether the cache is correct or not.
- assert "git:unlisted-submodule" not in result.stderr
+ result.assert_success()
+ assert "git:unlisted-submodule" not in result.stderr
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
@@ -612,9 +604,8 @@ def test_invalid_submodule(cli, tmpdir, datafiles, fail):
element = {"kind": "import", "sources": [gitsource]}
generate_element(project, "target.bst", element)
- # We will not see the warning or error before the first fetch, because
- # we don't have the repository yet and so we have no knowledge of
- # the unlisted submodule.
+ # The warning or error is reported during fetch. There should be no
+ # error with `bst show`.
result = cli.run(project=project, args=["show", "target.bst"])
result.assert_success()
assert "git:invalid-submodule" not in result.stderr
@@ -631,17 +622,10 @@ def test_invalid_submodule(cli, tmpdir, datafiles, fail):
result.assert_success()
assert "git:invalid-submodule" in result.stderr
- # Now that we've fetched it, `bst show` will discover the unlisted submodule too
+ # Verify that `bst show` will still not error out after fetching.
result = cli.run(project=project, args=["show", "target.bst"])
-
- # Assert a warning or an error depending on what we're checking
- if fail == "error":
- result.assert_main_error(ErrorDomain.PLUGIN, "git:invalid-submodule")
- else:
- result.assert_success()
- # We have cached things internally and successfully. Therefore, the plugin
- # is not involved in checking whether the cache is correct or not.
- assert "git:invalid-submodule" not in result.stderr
+ result.assert_success()
+ assert "git:invalid-submodule" not in result.stderr
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
@@ -683,12 +667,14 @@ def test_track_invalid_submodule(cli, tmpdir, datafiles, fail):
result.assert_success()
assert "git:invalid-submodule" not in result.stderr
- # In this case, we will get the error directly after tracking,
- # since the new HEAD does not require any submodules which are
- # not locally cached, the Source will be CACHED directly after
- # tracking and the validations will occur as a result.
- #
+ # After tracking we're pointing to a ref, which would trigger an invalid
+ # submodule warning. However, cache validation is only performed as part
+ # of fetch.
result = cli.run(project=project, args=["source", "track", "target.bst"])
+ result.assert_success()
+
+ # Fetch to trigger cache validation
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
if fail == "error":
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.PLUGIN, "git:invalid-submodule")
diff --git a/tests/sources/local.py b/tests/sources/local.py
index 2b0155107..f68a5b3b1 100644
--- a/tests/sources/local.py
+++ b/tests/sources/local.py
@@ -129,7 +129,7 @@ def test_stage_file_exists(cli, datafiles):
# Build, checkout
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
- result.assert_task_error(ErrorDomain.ELEMENT, "import-source-files-fail")
+ result.assert_task_error(ErrorDomain.ELEMENT, "stage-sources-fail")
@pytest.mark.datafiles(os.path.join(DATA_DIR, "directory"))