author     bst-marge-bot <marge-bot@buildstream.build>  2019-11-22 13:03:49 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2019-11-22 13:03:49 +0000
commit     229b9ba3f47d9fc966e82cf5dad55799313c14aa (patch)
tree       d2a955f92b9695702fdb951f43293d3c60abb218
parent     8ae2f7da6ac3356a7054531a587cd2bfecb62301 (diff)
parent     f3e1d232e1edb80dc5dabb5e25b6c011a53a8b43 (diff)
download   buildstream-229b9ba3f47d9fc966e82cf5dad55799313c14aa.tar.gz
Merge branch 'tlater/track-cleanup' into 'master'
Simplify the codebase, taking advantage of the simpler pipeline now that tracking has been removed

Closes #1199, #1195, #1194, and #1193

See merge request BuildStream/buildstream!1718
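For reference, the replacement workflow is the two-step invocation below. This is a minimal sketch: the element name is hypothetical, and the assumption that `bst source track` accepts a `--deps all` selection is not stated in this commit.

    # Previously (options removed by this merge):
    #   bst build --track-all target.bst
    #   bst source fetch --track target.bst
    #   bst workspace open --track target.bst
    # Now, resolve new source references first, then run the command:
    bst source track --deps all target.bst
    bst build target.bst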
-rw-r--r--  NEWS                                              23
-rw-r--r--  man/bst-source-fetch.1                             8
-rw-r--r--  man/bst-workspace-open.1                           5
-rw-r--r--  src/buildstream/_frontend/cli.py                  36
-rw-r--r--  src/buildstream/_loader/loader.py                  3
-rw-r--r--  src/buildstream/_scheduler/queues/queue.py        12
-rw-r--r--  src/buildstream/_scheduler/queues/trackqueue.py    5
-rw-r--r--  src/buildstream/_stream.py                       327
-rw-r--r--  src/buildstream/element.py                       106
-rw-r--r--  src/buildstream/source.py                         17
-rw-r--r--  tests/frontend/fetch.py                           10
11 files changed, 198 insertions, 354 deletions
diff --git a/NEWS b/NEWS
index 3f7f1732e..2639dabfb 100644
--- a/NEWS
+++ b/NEWS
@@ -5,16 +5,19 @@
CLI
---
- o BREAKING CHANGE: `bst build` no longer accepts any options related to
- tracking. Please use `bst source track` separately prior to running
- `bst build`, if you need similar functionality. The full list of removed
- options is as follows:
-
- * `--track`
- * `--track-all`
- * `--track-except`
- * `--track-cross-junctions` / `-J`
- * `--track-save`
+ o BREAKING CHANGE: Commands no longer accept any options related to
+ tracking. Please use `bst source track` separately prior to
+ running commands, if you need similar functionality. The full list
+ of removed options is as follows:
+
+ * `bst build --track`
+ * `bst build --track-all`
+ * `bst build --track-except`
+ * `bst build --track-cross-junctions` / `bst build -J`
+ * `bst build --track-save`
+ * `bst source fetch --track`
+ * `bst source fetch --track-cross-junctions` / `bst source fetch -J`
+ * `bst workspace open --track`
API
---
diff --git a/man/bst-source-fetch.1 b/man/bst-source-fetch.1
index e53e2be53..0b33859ed 100644
--- a/man/bst-source-fetch.1
+++ b/man/bst-source-fetch.1
@@ -1,4 +1,4 @@
-.TH "BST SOURCE FETCH" "1" "31-Oct-2019" "" "bst source fetch Manual"
+.TH "BST SOURCE FETCH" "1" "20-Nov-2019" "" "bst source fetch Manual"
.SH NAME
bst\-source\-fetch \- Fetch sources in a pipeline
.SH SYNOPSIS
@@ -33,11 +33,5 @@ Except certain dependencies from fetching
\fB\-d,\fP \-\-deps [none|plan|all]
The dependencies to fetch [default: plan]
.TP
-\fB\-\-track\fP
-Track new source references before fetching
-.TP
-\fB\-J,\fP \-\-track\-cross\-junctions
-Allow tracking to cross junction boundaries
-.TP
\fB\-r,\fP \-\-remote TEXT
The URL of the remote source cache (defaults to the first configured cache)
diff --git a/man/bst-workspace-open.1 b/man/bst-workspace-open.1
index 4ccf010af..c1b1d0bed 100644
--- a/man/bst-workspace-open.1
+++ b/man/bst-workspace-open.1
@@ -1,4 +1,4 @@
-.TH "BST WORKSPACE OPEN" "1" "31-Oct-2019" "" "bst workspace open Manual"
+.TH "BST WORKSPACE OPEN" "1" "20-Nov-2019" "" "bst workspace open Manual"
.SH NAME
bst\-workspace\-open \- Open a new workspace
.SH SYNOPSIS
@@ -14,8 +14,5 @@ Do not checkout the source, only link to the given directory
\fB\-f,\fP \-\-force
The workspace will be created even if the directory in which it will be created is not empty or if a workspace for that element already exists
.TP
-\fB\-\-track\fP
-Track and fetch new source references before checking out the workspace
-.TP
\fB\-\-directory\fP DIRECTORY
Only for use when a single Element is given: Set the directory to use to create the workspace
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index b6c126b34..10ad23bb7 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -761,14 +761,12 @@ def source():
type=click.Choice(["none", "plan", "all"]),
help="The dependencies to fetch",
)
-@click.option("--track", "track_", is_flag=True, help="Track new source references before fetching")
-@click.option("--track-cross-junctions", "-J", is_flag=True, help="Allow tracking to cross junction boundaries")
@click.option(
"--remote", "-r", default=None, help="The URL of the remote source cache (defaults to the first configured cache)"
)
@click.argument("elements", nargs=-1, type=click.Path(readable=False))
@click.pass_obj
-def source_fetch(app, elements, deps, track_, except_, track_cross_junctions, remote):
+def source_fetch(app, elements, deps, except_, remote):
"""Fetch sources required to build the pipeline
Specifying no elements will result in fetching the default targets
@@ -790,32 +788,11 @@ def source_fetch(app, elements, deps, track_, except_, track_cross_junctions, remote):
plan: Only dependencies required for the build plan
all: All dependencies
"""
- from .._pipeline import PipelineSelection
-
- if track_cross_junctions and not track_:
- click.echo("ERROR: The --track-cross-junctions option can only be used with --track", err=True)
- sys.exit(-1)
-
- if track_ and deps == PipelineSelection.PLAN:
- click.echo(
- "WARNING: --track specified for tracking of a build plan\n\n"
- "Since tracking modifies the build plan, all elements will be tracked.",
- err=True,
- )
- deps = PipelineSelection.ALL
-
with app.initialized(session_name="Fetch"):
if not elements:
elements = app.project.get_default_targets()
- app.stream.fetch(
- elements,
- selection=deps,
- except_targets=except_,
- track_targets=track_,
- track_cross_junctions=track_cross_junctions,
- remote=remote,
- )
+ app.stream.fetch(elements, selection=deps, except_targets=except_, remote=remote)
##################################################################
@@ -969,9 +946,6 @@ def workspace():
+ "or if a workspace for that element already exists",
)
@click.option(
- "--track", "track_", is_flag=True, help="Track and fetch new source references before checking out the workspace"
-)
-@click.option(
"--directory",
type=click.Path(file_okay=False),
default=None,
@@ -979,13 +953,11 @@ def workspace():
)
@click.argument("elements", nargs=-1, type=click.Path(readable=False), required=True)
@click.pass_obj
-def workspace_open(app, no_checkout, force, track_, directory, elements):
+def workspace_open(app, no_checkout, force, directory, elements):
"""Open a workspace for manual source modification"""
with app.initialized():
- app.stream.workspace_open(
- elements, no_checkout=no_checkout, track_first=track_, force=force, custom_dir=directory
- )
+ app.stream.workspace_open(elements, no_checkout=no_checkout, force=force, custom_dir=directory)
##################################################################
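In isolation, the simplified command wiring follows the sketch below. This is a hypothetical, self-contained reduction: the Stream class here is a stand-in for BuildStream's, kept only to show that fetch() no longer takes any track_* arguments and that the command body has no tracking validation left.

    import click


    class Stream:
        # Hypothetical stand-in: after this merge, fetch() takes no
        # track_* arguments at all.
        def fetch(self, elements, *, selection="plan", except_targets=(), remote=None):
            print("fetching", elements, "selection:", selection)


    @click.command(name="source-fetch")
    @click.option("--except", "except_", multiple=True,
                  help="Except certain dependencies from fetching")
    @click.option("--deps", "-d", default="plan",
                  type=click.Choice(["none", "plan", "all"]),
                  help="The dependencies to fetch")
    @click.option("--remote", "-r", default=None,
                  help="The URL of the remote source cache")
    @click.argument("elements", nargs=-1)
    def source_fetch(elements, except_, deps, remote):
        """Fetch sources required to build the pipeline"""
        # No --track/--track-cross-junctions validation or build-plan
        # rewriting remains; the command delegates straight to the stream.
        Stream().fetch(list(elements), selection=deps,
                       except_targets=list(except_), remote=remote)


    if __name__ == "__main__":
        source_fetch()  # pylint: disable=no-value-for-parameter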
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 3b721d6f2..b2e58b9e7 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -669,9 +669,6 @@ class Loader:
# Optimization for junctions with a single local source
basedir = sources[0]._get_local_path()
else:
- # Stage sources
- element._set_required()
-
# Note: We use _KeyStrength.WEAK here because junctions
# cannot have dependencies, therefore the keys are
# equivalent.
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 986ac6c0a..295161ed2 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -181,10 +181,7 @@ class Queue:
# Obtain immediate element status
for elt in elts:
- if self._required_element_check and not elt._is_required():
- elt._set_required_callback(self._enqueue_element)
- else:
- self._enqueue_element(elt)
+ self._enqueue_element(elt)
# dequeue()
#
@@ -241,13 +238,6 @@ class Queue:
for element in ready
]
- # set_required_element_check()
- #
- # This ensures that, for the first non-track queue, we must check
- # whether elements are required before enqueuing them
- def set_required_element_check(self):
- self._required_element_check = True
-
# any_failed_elements()
#
# Returns whether any elements in this queue have failed their jobs
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 4187c5c7b..d9c31ace1 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -39,8 +39,9 @@ class TrackQueue(Queue):
return TrackQueue._track_element
def status(self, element):
- # We can skip elements entirely if they have no sources.
- if not list(element.sources()):
+ # We can skip elements entirely if they have no trackable
+ # sources.
+ if not any(source._is_trackable() for source in element.sources()):
# But we still have to mark them as tracked
element._tracking_done()
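The new skip condition amounts to the pattern below. A minimal sketch with a hypothetical _Source stand-in: an element is now skipped only when none of its sources can be tracked, rather than when it has no sources at all.

    class _Source:
        # Hypothetical stand-in: a source advertises whether it can be tracked.
        def __init__(self, trackable):
            self._trackable = trackable

        def _is_trackable(self):
            return self._trackable


    def should_skip_tracking(sources):
        # Skip only if no source is trackable; the element is still
        # marked as tracked by the caller so state updates proceed.
        return not any(source._is_trackable() for source in sources)


    assert should_skip_tracking([])                                # no sources
    assert should_skip_tracking([_Source(False)])                  # only untrackable
    assert not should_skip_tracking([_Source(False), _Source(True)])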
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e0a8d92bb..0a3495ce5 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -29,6 +29,7 @@ import tempfile
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
from collections import deque
+from typing import List, Tuple
from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
@@ -46,6 +47,7 @@ from ._scheduler import (
Notification,
JobStatus,
)
+from .element import Element
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -158,9 +160,8 @@ class Stream:
load_refs=False
):
with PROFILER.profile(Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)):
- target_objects, _ = self._load(
+ target_objects = self._load(
targets,
- (),
selection=selection,
except_targets=except_targets,
use_artifact_config=use_artifact_config,
@@ -280,9 +281,8 @@ class Stream:
if remote:
use_config = False
- elements, _ = self._load(
+ elements = self._load(
targets,
- [],
selection=selection,
ignore_junction_targets=ignore_junction_targets,
use_artifact_config=use_config,
@@ -334,48 +334,24 @@ class Stream:
# targets (list of str): Targets to fetch
# selection (PipelineSelection): The selection mode for the specified targets
# except_targets (list of str): Specified targets to except from fetching
- # track_targets (bool): Whether to track selected targets in addition to fetching
- # track_cross_junctions (bool): Whether tracking should cross junction boundaries
# remote (str|None): The URL of a specific remote server to pull from.
#
- def fetch(
- self,
- targets,
- *,
- selection=PipelineSelection.PLAN,
- except_targets=None,
- track_targets=False,
- track_cross_junctions=False,
- remote=None
- ):
-
- if track_targets:
- track_targets = targets
- track_selection = selection
- track_except_targets = except_targets
- else:
- track_targets = ()
- track_selection = PipelineSelection.NONE
- track_except_targets = ()
+ def fetch(self, targets, *, selection=PipelineSelection.PLAN, except_targets=None, remote=None):
use_source_config = True
if remote:
use_source_config = False
- elements, track_elements = self._load(
+ elements = self._load(
targets,
- track_targets,
selection=selection,
- track_selection=track_selection,
except_targets=except_targets,
- track_except_targets=track_except_targets,
- track_cross_junctions=track_cross_junctions,
use_source_config=use_source_config,
source_remote_url=remote,
)
# Delegated to a shared fetch method
- self._fetch(elements, track_elements=track_elements)
+ self._fetch(elements)
# track()
#
@@ -392,27 +368,20 @@ class Stream:
#
def track(self, targets, *, selection=PipelineSelection.REDIRECT, except_targets=None, cross_junctions=False):
- # We pass no target to build. Only to track. Passing build targets
- # would fully load project configuration which might not be
- # possible before tracking is done.
- _, elements = self._load(
- [],
- targets,
- selection=selection,
- track_selection=selection,
- except_targets=except_targets,
- track_except_targets=except_targets,
- track_cross_junctions=cross_junctions,
+ elements = self._load_tracking(
+ targets, selection=selection, except_targets=except_targets, cross_junctions=cross_junctions
)
- # FIXME: this can be refactored after element._update_state is simplified/removed
- elements = [element for element in elements if element._schedule_tracking()]
- self._pipeline.resolve_elements(elements)
+ # Note: We do not currently need to initialize the state of an
+ # element before it is tracked, since tracking can be done
+ # irrespective of source/artifact condition. Once an element
+ # is tracked, its state must be fully updated in either case,
+ # and we anyway don't do anything else with it.
self._scheduler.clear_queues()
track_queue = TrackQueue(self._scheduler)
- self._add_queue(track_queue, track=True)
- self._enqueue_plan(elements, queue=track_queue)
+ self._add_queue(track_queue)
+ self._enqueue_plan(elements)
self._run()
# pull()
@@ -434,9 +403,8 @@ class Stream:
if remote:
use_config = False
- elements, _ = self._load(
+ elements = self._load(
targets,
- (),
selection=selection,
ignore_junction_targets=ignore_junction_targets,
use_artifact_config=use_config,
@@ -476,9 +444,8 @@ class Stream:
if remote:
use_config = False
- elements, _ = self._load(
+ elements = self._load(
targets,
- (),
selection=selection,
ignore_junction_targets=ignore_junction_targets,
use_artifact_config=use_config,
@@ -569,7 +536,7 @@ class Stream:
tar=False
):
- elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True, load_refs=True)
+ elements = self._load((target,), selection=selection, use_artifact_config=True, load_refs=True)
# self.targets contains a list of the loaded target objects
# if we specify --deps build, Stream._load() will return a list
@@ -778,7 +745,7 @@ class Stream:
self._check_location_writable(location, force=force, tar=tar)
- elements, _ = self._load((target,), (), selection=deps, except_targets=except_targets)
+ elements = self._load((target,), selection=deps, except_targets=except_targets)
# Assert all sources are cached in the source dir
self._fetch(elements)
@@ -805,28 +772,18 @@ class Stream:
# force (bool): Whether to ignore contents in an existing directory
# custom_dir (str): Custom location to create a workspace or false to use default location.
#
- def workspace_open(self, targets, *, no_checkout, track_first, force, custom_dir):
+ def workspace_open(self, targets, *, no_checkout, force, custom_dir):
# This function is a little funny but it is trying to be as atomic as possible.
- if track_first:
- track_targets = targets
- else:
- track_targets = ()
-
- elements, track_elements = self._load(
- targets, track_targets, selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT
- )
+ elements = self._load(targets, selection=PipelineSelection.REDIRECT)
workspaces = self._context.get_workspaces()
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
#
- if not no_checkout or track_first:
- track_elements = []
- if track_first:
- track_elements = elements
- self._fetch(elements, track_elements=track_elements, fetch_original=True)
+ if not no_checkout:
+ self._fetch(elements, fetch_original=True)
expanded_directories = []
# To try to be more atomic, loop through the elements and raise any errors we can early
@@ -966,9 +923,7 @@ class Stream:
#
def workspace_reset(self, targets, *, soft, track_first):
- elements, _ = self._load(
- targets, [], selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT
- )
+ elements = self._load(targets, selection=PipelineSelection.REDIRECT)
nonexisting = []
for element in elements:
@@ -991,13 +946,7 @@ class Stream:
self.workspace_close(element._get_full_name(), remove_dir=True)
workspaces.save_config()
- self.workspace_open(
- [element._get_full_name()],
- no_checkout=False,
- track_first=track_first,
- force=True,
- custom_dir=workspace_path,
- )
+ self.workspace_open([element._get_full_name()], no_checkout=False, force=True, custom_dir=workspace_path)
# workspace_exists
#
@@ -1076,9 +1025,7 @@ class Stream:
else:
output_elements.add(e)
if load_elements:
- loaded_elements, _ = self._load(
- load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT
- )
+ loaded_elements = self._load(load_elements, selection=PipelineSelection.REDIRECT)
for e in loaded_elements:
output_elements.add(e.name)
@@ -1180,6 +1127,116 @@ class Stream:
# Private Methods #
#############################################################
+ # __load_elements_from_targets()
+ #
+ # Given the usual set of target element names/artifact refs, load
+ # the `Element` objects required to describe the selection.
+ #
+ # The result is returned as a tuple - firstly the loaded normal
+ # elements, secondly the loaded "excepting" elements and lastly
+ # the loaded artifact elements.
+ #
+ # Args:
+ # targets - The target element names/artifact refs
+ # except_targets - The names of elements to except
+ # rewritable - Whether to load the elements in re-writable mode
+ #
+ # Returns:
+ # ([elements], [except_elements], [artifact_elements])
+ #
+ def __load_elements_from_targets(
+ self, targets: List[str], except_targets: List[str], *, rewritable: bool = False
+ ) -> Tuple[List[Element], List[Element], List[Element]]:
+ names, refs = self._classify_artifacts(targets)
+ loadable = [names, except_targets]
+
+ # Load and filter elements
+ if loadable:
+ elements, except_elements = self._pipeline.load(loadable, rewritable=rewritable)
+ else:
+ elements, except_elements = [], []
+
+ # Load artifacts
+ if refs:
+ artifacts = self._pipeline.load_artifacts(refs)
+ else:
+ artifacts = []
+
+ return elements, except_elements, artifacts
+
+ # __connect_remotes()
+ #
+ # Connect to the source and artifact remotes.
+ #
+ # Args:
+ # artifact_url - The url of the artifact server to connect to.
+ # source_url - The url of the source server to connect to.
+ # use_artifact_config - Whether to use the artifact config.
+ # use_source_config - Whether to use the source config.
+ #
+ def __connect_remotes(
+ self, artifact_url: str, source_url: str, use_artifact_config: bool, use_source_config: bool
+ ):
+ # ArtifactCache.setup_remotes expects all projects to be fully loaded
+ for project in self._context.get_projects():
+ project.ensure_fully_loaded()
+
+ # Connect to remote caches, this needs to be done before resolving element state
+ self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_url)
+ self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_url)
+
+ # _load_tracking()
+ #
+ # A variant of _load() to be used when the elements should be used
+ # for tracking
+ #
+ # If `targets` is not empty, the used project configuration
+ # will be fully loaded.
+ #
+ # Args:
+ # targets (list of str): Targets to load
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except
+ # cross_junctions (bool): Whether tracking should cross junction boundaries
+ #
+ # Returns:
+ # (list of Element): The tracking element selection
+ #
+ def _load_tracking(self, targets, *, selection=PipelineSelection.NONE, except_targets=(), cross_junctions=False):
+ # We never want to use a PLAN selection when tracking elements
+ assert selection != PipelineSelection.PLAN
+
+ elements, except_elements, artifacts = self.__load_elements_from_targets(
+ targets, except_targets, rewritable=True
+ )
+
+ # We can't track artifact refs, since they have no underlying
+ # elements or sources to interact with. Abort if the user asks
+ # us to do that.
+ if artifacts:
+ detail = "\n".join(artifact.get_artifact_name() for artifact in artifacts)
+ raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail)
+
+ # Hold on to the targets
+ self.targets = elements
+
+ track_projects = {}
+ for element in elements:
+ project = element._get_project()
+ if project not in track_projects:
+ track_projects[project] = [element]
+ else:
+ track_projects[project].append(element)
+
+ track_selected = []
+
+ for project, project_elements in track_projects.items():
+ selected = self._pipeline.get_selection(project_elements, selection)
+ selected = self._pipeline.track_cross_junction_filter(project, selected, cross_junctions)
+ track_selected.extend(selected)
+
+ return self._pipeline.except_elements(elements, track_selected, except_elements)
+
# _load()
#
# A convenience method for loading element lists
@@ -1192,12 +1249,8 @@ class Stream:
#
# Args:
# targets (list of str): Main targets to load
- # track_targets (list of str): Tracking targets
# selection (PipelineSelection): The selection mode for the specified targets
- # track_selection (PipelineSelection): The selection mode for the specified tracking targets
# except_targets (list of str): Specified targets to except from fetching
- # track_except_targets (list of str): Specified targets to except from fetching
- # track_cross_junctions (bool): Whether tracking should cross junction boundaries
# ignore_junction_targets (bool): Whether junction targets should be filtered out
# use_artifact_config (bool): Whether to initialize artifacts with the config
# use_source_config (bool): Whether to initialize remote source caches with the config
@@ -1211,13 +1264,9 @@ class Stream:
def _load(
self,
targets,
- track_targets,
*,
selection=PipelineSelection.NONE,
- track_selection=PipelineSelection.NONE,
except_targets=(),
- track_except_targets=(),
- track_cross_junctions=False,
ignore_junction_targets=False,
use_artifact_config=False,
use_source_config=False,
@@ -1226,76 +1275,25 @@ class Stream:
dynamic_plan=False,
load_refs=False
):
+ elements, except_elements, artifacts = self.__load_elements_from_targets(
+ targets, except_targets, rewritable=False
+ )
- # Classify element and artifact strings
- target_elements, target_artifacts = self._classify_artifacts(targets)
-
- if target_artifacts:
+ if artifacts:
if not load_refs:
- detail = "\n".join(target_artifacts)
+ detail = "\n".join(artifact.get_artifact_name() for artifact in artifacts)
raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail)
if selection in (PipelineSelection.ALL, PipelineSelection.RUN):
raise StreamError("Error: '--deps {}' is not supported for artifact refs".format(selection))
- # Load rewritable if we have any tracking selection to make
- rewritable = False
- if track_targets:
- rewritable = True
-
- # Load all target elements
- loadable = [target_elements, except_targets, track_targets, track_except_targets]
- if any(loadable):
- elements, except_elements, track_elements, track_except_elements = self._pipeline.load(
- loadable, rewritable=rewritable
- )
- else:
- elements, except_elements, track_elements, track_except_elements = [], [], [], []
-
- # Load all target artifacts
- artifacts = self._pipeline.load_artifacts(target_artifacts) if target_artifacts else []
-
- # Optionally filter out junction elements
if ignore_junction_targets:
elements = [e for e in elements if e.get_kind() != "junction"]
# Hold on to the targets
self.targets = elements + artifacts
- # First take care of marking tracking elements, this must be
- # done before resolving element states.
- #
- assert track_selection != PipelineSelection.PLAN
-
- # Tracked elements are split by owner projects in order to
- # filter cross junctions tracking dependencies on their
- # respective project.
- track_projects = {}
- for element in track_elements:
- project = element._get_project()
- if project not in track_projects:
- track_projects[project] = [element]
- else:
- track_projects[project].append(element)
-
- track_selected = []
-
- for project, project_elements in track_projects.items():
- selected = self._pipeline.get_selection(project_elements, track_selection)
- selected = self._pipeline.track_cross_junction_filter(project, selected, track_cross_junctions)
- track_selected.extend(selected)
-
- track_selected = self._pipeline.except_elements(track_elements, track_selected, track_except_elements)
-
- if not targets:
- return [], track_selected
-
- # ArtifactCache.setup_remotes expects all projects to be fully loaded
- for project in self._context.get_projects():
- project.ensure_fully_loaded()
-
# Connect to remote caches, this needs to be done before resolving element state
- self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)
- self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_remote_url)
+ self.__connect_remotes(artifact_remote_url, source_remote_url, use_artifact_config, use_source_config)
# Now move on to loading primary selection.
#
@@ -1308,12 +1306,12 @@ class Stream:
# others are requested dynamically as needed.
# This avoids pulling, fetching, or building unneeded build-only dependencies.
for element in elements:
- element._set_required()
+ element._schedule_assembly_when_necessary()
else:
for element in selected:
- element._set_required()
+ element._schedule_assembly_when_necessary()
- return selected, track_selected
+ return selected
# _message()
#
@@ -1331,11 +1329,8 @@ class Stream:
# queue (Queue): Queue to add to the pipeline
# track (bool): Whether this is the tracking queue
#
- def _add_queue(self, queue, *, track=False):
+ def _add_queue(self, queue):
self.queues.append(queue)
- if not (track or self._first_non_track_queue):
- self._first_non_track_queue = queue
- self._first_non_track_queue.set_required_element_check()
# _enqueue_plan()
#
@@ -1346,8 +1341,7 @@ class Stream:
# queue (Queue): The target queue, defaults to the first non-track queue
#
def _enqueue_plan(self, plan, *, queue=None):
- queue = queue or self._first_non_track_queue
-
+ queue = queue or self.queues[0]
queue.enqueue(plan)
self.session_elements += plan
@@ -1396,35 +1390,16 @@ class Stream:
- # track_elements (list of Element): Elements to track
# fetch_original (Bool): Whether to fetch original unstaged
#
- def _fetch(self, elements, *, track_elements=None, fetch_original=False):
-
- if track_elements is None:
- track_elements = []
-
- # Subtract the track elements from the fetch elements, they will be added separately
- fetch_plan = self._pipeline.subtract_elements(elements, track_elements)
+ def _fetch(self, elements, *, fetch_original=False):
# Assert consistency for the fetch elements
- self._pipeline.assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if not elt._should_fetch(fetch_original)]
- fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)
+ self._pipeline.assert_consistent(elements)
# Construct queues, enqueue and run
#
self._scheduler.clear_queues()
- track_queue = None
- if track_elements:
- track_queue = TrackQueue(self._scheduler)
- self._add_queue(track_queue, track=True)
self._add_queue(FetchQueue(self._scheduler, fetch_original=fetch_original))
-
- if track_elements:
- self._enqueue_plan(track_elements, queue=track_queue)
-
- self._enqueue_plan(fetch_plan)
+ self._enqueue_plan(elements)
self._run()
# _check_location_writable()
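The per-project bookkeeping in the new _load_tracking() boils down to the grouping below. A minimal sketch using defaultdict in place of the explicit dict checks, with _get_project() as the accessor used in the method above:

    from collections import defaultdict


    def group_by_project(elements):
        # Tracking candidates are split by owning project so that
        # selection and cross-junction filtering can be applied
        # per project.
        projects = defaultdict(list)
        for element in elements:
            projects[element._get_project()].append(element)
        return projects

Each group is then passed through get_selection() and track_cross_junction_filter() before the excepted elements are subtracted.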
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 74fb1a056..a8c6bfa8f 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -262,7 +262,6 @@ class Element(Plugin):
self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state
self.__assemble_scheduled = False # Element is scheduled to be assembled
self.__assemble_done = False # Element is assembled
- self.__tracking_scheduled = False # Sources are scheduled to be tracked
self.__pull_done = False # Whether pull was attempted
self.__cached_successfully = None # If the Element is known to be successfully cached
self.__source_cached = None # If the sources are known to be successfully cached
@@ -290,7 +289,6 @@ class Element(Plugin):
self.__batch_prepare_assemble_collect = None # type: Optional[str]
# Callbacks
- self.__required_callback = None # Callback to Queues
self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue
self.__buildable_callback = None # Callback to BuildQueue
@@ -1252,7 +1250,7 @@ class Element(Plugin):
# - _update_artifact_state()
# - Computes the state of the element's artifact using the
# cache key.
- # - __schedule_assembly_when_necessary()
+ # - _schedule_assembly_when_necessary()
# - Schedules assembly of an element, iff its current state
# allows/necessitates it
# - __update_cache_key_non_strict()
@@ -1330,43 +1328,11 @@ class Element(Plugin):
_, display_key, _ = self._get_display_key()
return display_key
- # _schedule_tracking():
- #
- # Force an element state to be inconsistent. Any sources appear to be
- # inconsistent.
- #
- # This is used across the pipeline in sessions where the
- # elements in question are going to be tracked, causing the
- # pipeline to rebuild safely by ensuring cache key recalculation
- # and reinterrogation of element state after tracking of elements
- # succeeds.
- #
- # This method should return the value of `__tracking_scheduled` to report
- # to callers that the element was marked for tracking.
- #
- # If `__tracking_scheduled` is not already determined then set it to `True`
- # if at least one source advertises that it can be tracked.
- #
- # Returns:
- # (bool): value of the `__tracking_scheduled` attribute
- #
- def _schedule_tracking(self) -> bool:
- # if the tracking schedule is already determined then this can be skipped
- if not self.__tracking_scheduled:
- # Tracking does not make sense in cases where no sources can be tracked.
- if any(source._is_trackable() for source in self.__sources):
- self.__tracking_scheduled = True
- return self.__tracking_scheduled
-
# _tracking_done():
#
# This is called in the main process after the element has been tracked
#
def _tracking_done(self):
- assert self.__tracking_scheduled
-
- self.__tracking_scheduled = False
-
# Tracking may change the sources' refs, and therefore the
# source state. We need to update source state.
self.__update_source_state()
@@ -1519,34 +1485,6 @@ class Element(Plugin):
# Ensure deterministic owners of sources at build time
vdirectory.set_deterministic_user()
- # _set_required():
- #
- # Mark this element and its runtime dependencies as required.
- # This unblocks pull/fetch/build.
- #
- def _set_required(self):
- if self.__required:
- # Already done
- return
-
- self.__required = True
-
- # Request artifacts of runtime dependencies
- for dep in self.dependencies(Scope.RUN, recurse=False):
- dep._set_required()
-
- # When an element becomes required, it must be assembled for
- # the current pipeline. `__schedule_assembly_when_necessary()`
- # will abort if some other state prevents it from being built,
- # and changes to such states will cause re-scheduling, so this
- # is safe.
- self.__schedule_assembly_when_necessary()
-
- # Callback to the Queue
- if self.__required_callback is not None:
- self.__required_callback(self)
- self.__required_callback = None
-
# _is_required():
#
# Returns whether this element has been marked as required.
@@ -1592,9 +1530,6 @@ class Element(Plugin):
# We're not processing
not processing
and
- # We're required for the current build
- self._is_required()
- and
# We have figured out the state of our artifact
self.__artifact
and
@@ -1602,12 +1537,12 @@ class Element(Plugin):
not self._cached()
)
- # __schedule_assembly_when_necessary():
+ # _schedule_assembly_when_necessary():
#
# This is called in the main process before the element is assembled
# in a subprocess.
#
- def __schedule_assembly_when_necessary(self):
+ def _schedule_assembly_when_necessary(self):
# FIXME: We could reduce the number of function calls a bit by
# factoring this out of this method (and checking whether we
# should schedule at the calling end).
@@ -1621,7 +1556,7 @@ class Element(Plugin):
# Requests artifacts of build dependencies
for dep in self.dependencies(Scope.BUILD, recurse=False):
- dep._set_required()
+ dep._schedule_assembly_when_necessary()
# Once we schedule an element for assembly, we know that our
# build dependencies have strong cache keys, so we can update
@@ -1885,7 +1820,7 @@ class Element(Plugin):
# We may not have actually pulled an artifact - the pull may
# have failed. We might therefore need to schedule assembly.
- self.__schedule_assembly_when_necessary()
+ self._schedule_assembly_when_necessary()
# If we've finished pulling, an artifact might now exist
# locally, so we might need to update a non-strict strong
# cache key.
@@ -2282,22 +2217,6 @@ class Element(Plugin):
else:
return True
- # _set_required_callback()
- #
- #
- # Notify the pull/fetch/build queue that the element is potentially
- # ready to be processed.
- #
- # _Set the _required_callback - the _required_callback is invoked when an
- # element is marked as required. This informs us that the element needs to
- # either be pulled or fetched + built.
- #
- # Args:
- # callback (callable) - The callback function
- #
- def _set_required_callback(self, callback):
- self.__required_callback = callback
-
# _set_can_query_cache_callback()
#
# Notify the pull/fetch queue that the element is potentially
@@ -2406,11 +2325,6 @@ class Element(Plugin):
assert "_Element__buildable_callback" in state
state["_Element__buildable_callback"] = None
- # This callback is not even read in the child process, so delete it.
- # If this assumption is invalidated, we will get an attribute error to
- # let us know, and we will need to update accordingly.
- del state["_Element__required_callback"]
-
return self.__meta_kind, state
def _walk_artifact_files(self):
@@ -2451,10 +2365,6 @@ class Element(Plugin):
#
def __update_source_state(self):
- # Cannot resolve source state until tracked
- if self.__tracking_scheduled:
- return
-
old_consistency = self.__consistency
self.__consistency = Consistency.CACHED
@@ -3225,7 +3135,7 @@ class Element(Plugin):
# to this element.
#
# If the state changes, this will subsequently call
- # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
+ # `self._schedule_assembly_when_necessary()` to schedule assembly if it becomes
# possible.
#
# Element.__update_cache_keys() must be called before this to have
@@ -3241,7 +3151,7 @@ class Element(Plugin):
if not context.get_strict() and not self.__artifact:
# We've calculated the weak_key, so instantiate artifact instance member
self.__artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
- self.__schedule_assembly_when_necessary()
+ self._schedule_assembly_when_necessary()
if not self.__strict_cache_key:
return
@@ -3253,7 +3163,7 @@ class Element(Plugin):
if context.get_strict():
self.__artifact = self.__strict_artifact
- self.__schedule_assembly_when_necessary()
+ self._schedule_assembly_when_necessary()
else:
self.__update_cache_key_non_strict()
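The net effect of dropping _set_required() can be sketched as follows. Elem is a hypothetical stand-in for a real element; the cached check mirrors the simplified __should_schedule() condition, and the recursion over direct build dependencies replaces the old walk over runtime dependencies.

    class Elem:
        # Minimal hypothetical element: a name, direct build
        # dependencies, and whether its artifact is already cached.
        def __init__(self, name, build_deps=(), cached=False):
            self.name, self.build_deps, self.cached = name, list(build_deps), cached


    def schedule_assembly_when_necessary(elem, scheduled):
        # Scheduling no longer waits for a "required" mark: an element
        # is scheduled as soon as its state allows, and it then
        # requests assembly of its direct build dependencies.
        if elem.name in scheduled or elem.cached:
            return
        scheduled.add(elem.name)
        for dep in elem.build_deps:
            schedule_assembly_when_necessary(dep, scheduled)


    base = Elem("base.bst", cached=True)
    app = Elem("app.bst", build_deps=[base])
    scheduled = set()
    schedule_assembly_when_necessary(app, scheduled)
    assert scheduled == {"app.bst"}  # cached base.bst needs no assembly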
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index 2e7460439..dbe113409 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -170,7 +170,7 @@ from . import _yaml, utils
from .node import MappingNode
from .plugin import Plugin
from .types import Consistency, SourceRef, Union, List
-from ._exceptions import BstError, ImplError, ErrorDomain
+from ._exceptions import BstError, ImplError, PluginError, ErrorDomain
from ._loader.metasource import MetaSource
from ._projectrefs import ProjectRefStorage
from ._cachekey import generate_key
@@ -763,7 +763,20 @@ class Source(Plugin):
# Source consistency interrogations are silent.
context = self._get_context()
with context.messenger.silence():
- self.__consistency = self.get_consistency() # pylint: disable=assignment-from-no-return
+ try:
+ self.__consistency = self.get_consistency() # pylint: disable=assignment-from-no-return
+ except SourceError:
+ # SourceErrors should be preserved so that the
+ # plugin can communicate real error cases.
+ raise
+ except Exception as err: # pylint: disable=broad-except
+ # Generic errors point to bugs in the plugin, so
+ # we need to catch them and make sure they do not
+ # cause stacktraces
+ raise PluginError(
+ "Source plugin '{}' failed to compute source consistency: {}".format(self.get_kind(), err),
+ reason="source-bug",
+ )
# Give the Source an opportunity to validate the cached
# sources as soon as the Source becomes Consistency.CACHED.
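The same defensive pattern in isolation: a minimal, self-contained sketch in which SourceError, PluginError and the plugin object are hypothetical stand-ins for the real BuildStream types.

    class SourceError(Exception):
        """Raised by a plugin to report a real, expected error case."""


    class PluginError(Exception):
        def __init__(self, message, reason=None):
            super().__init__(message)
            self.reason = reason


    def query_consistency(source):
        # Let the plugin's own error type propagate untouched; any
        # other exception is a bug in the plugin and is wrapped so it
        # surfaces as a reportable error rather than a stacktrace.
        try:
            return source.get_consistency()
        except SourceError:
            raise
        except Exception as err:
            raise PluginError(
                "Source plugin '{}' failed to compute source consistency: {}".format(
                    source.get_kind(), err
                ),
                reason="source-bug",
            ) from err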
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index ff3667707..9258fc7d9 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -60,16 +60,8 @@ def test_fetch_consistency_error(cli, datafiles):
def test_fetch_consistency_bug(cli, datafiles):
project = str(datafiles)
- # FIXME:
- #
- # When a plugin raises an unhandled exception at load
- # time, as is the case when running Source.get_consistency()
- # for a fetch command, we could report this to the user
- # more gracefully as a BUG message.
- #
result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
- assert result.exc is not None
- assert str(result.exc) == "Something went terribly wrong"
+ result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
@pytest.mark.datafiles(DATA_DIR)