diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-11-22 13:03:49 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-11-22 13:03:49 +0000 |
commit | 229b9ba3f47d9fc966e82cf5dad55799313c14aa (patch) | |
tree | d2a955f92b9695702fdb951f43293d3c60abb218 | |
parent | 8ae2f7da6ac3356a7054531a587cd2bfecb62301 (diff) | |
parent | f3e1d232e1edb80dc5dabb5e25b6c011a53a8b43 (diff) | |
download | buildstream-229b9ba3f47d9fc966e82cf5dad55799313c14aa.tar.gz |
Merge branch 'tlater/track-cleanup' into 'master'
Simplify the codebase using post-tracking removal simplicity of the pipeline
Closes #1199, #1195, #1194, and #1193
See merge request BuildStream/buildstream!1718
-rw-r--r-- | NEWS | 23 | ||||
-rw-r--r-- | man/bst-source-fetch.1 | 8 | ||||
-rw-r--r-- | man/bst-workspace-open.1 | 5 | ||||
-rw-r--r-- | src/buildstream/_frontend/cli.py | 36 | ||||
-rw-r--r-- | src/buildstream/_loader/loader.py | 3 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/trackqueue.py | 5 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 327 | ||||
-rw-r--r-- | src/buildstream/element.py | 106 | ||||
-rw-r--r-- | src/buildstream/source.py | 17 | ||||
-rw-r--r-- | tests/frontend/fetch.py | 10 |
11 files changed, 198 insertions, 354 deletions
@@ -5,16 +5,19 @@ CLI --- - o BREAKING CHANGE: `bst build` no longer accepts any options related to - tracking. Please use `bst source track` separately prior to running - `bst build`, if you need similar functionality. The full list of removed - options is as follows: - - * `--track` - * `--track-all` - * `--track-except` - * `--track-cross-junctions` / `-J` - * `--track-save` + o BREAKING CHANGE: Commands no longer accept any options related to + tracking. Please use `bst source track` separately prior to + running commands, if you need similar functionality. The full list + of removed options is as follows: + + * `bst build --track` + * `bst build --track-all` + * `bst build --track-except` + * `bst build --track-cross-junctions` / `bst build -J` + * `bst build --track-save` + * `bst source fetch --track` + * `bst source fetch --track-cross-junctions` / `bst source fetch -J` + * `bst workspace open --track` API --- diff --git a/man/bst-source-fetch.1 b/man/bst-source-fetch.1 index e53e2be53..0b33859ed 100644 --- a/man/bst-source-fetch.1 +++ b/man/bst-source-fetch.1 @@ -1,4 +1,4 @@ -.TH "BST SOURCE FETCH" "1" "31-Oct-2019" "" "bst source fetch Manual" +.TH "BST SOURCE FETCH" "1" "20-Nov-2019" "" "bst source fetch Manual" .SH NAME bst\-source\-fetch \- Fetch sources in a pipeline .SH SYNOPSIS @@ -33,11 +33,5 @@ Except certain dependencies from fetching \fB\-d,\fP \-\-deps [none|plan|all] The dependencies to fetch [default: plan] .TP -\fB\-\-track\fP -Track new source references before fetching -.TP -\fB\-J,\fP \-\-track\-cross\-junctions -Allow tracking to cross junction boundaries -.TP \fB\-r,\fP \-\-remote TEXT The URL of the remote source cache (defaults to the first configured cache) diff --git a/man/bst-workspace-open.1 b/man/bst-workspace-open.1 index 4ccf010af..c1b1d0bed 100644 --- a/man/bst-workspace-open.1 +++ b/man/bst-workspace-open.1 @@ -1,4 +1,4 @@ -.TH "BST WORKSPACE OPEN" "1" "31-Oct-2019" "" "bst workspace open Manual" +.TH "BST WORKSPACE OPEN" "1" "20-Nov-2019" "" "bst workspace open Manual" .SH NAME bst\-workspace\-open \- Open a new workspace .SH SYNOPSIS @@ -14,8 +14,5 @@ Do not checkout the source, only link to the given directory \fB\-f,\fP \-\-force The workspace will be created even if the directory in which it will be created is not empty or if a workspace for that element already exists .TP -\fB\-\-track\fP -Track and fetch new source references before checking out the workspace -.TP \fB\-\-directory\fP DIRECTORY Only for use when a single Element is given: Set the directory to use to create the workspace diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py index b6c126b34..10ad23bb7 100644 --- a/src/buildstream/_frontend/cli.py +++ b/src/buildstream/_frontend/cli.py @@ -761,14 +761,12 @@ def source(): type=click.Choice(["none", "plan", "all"]), help="The dependencies to fetch", ) -@click.option("--track", "track_", is_flag=True, help="Track new source references before fetching") -@click.option("--track-cross-junctions", "-J", is_flag=True, help="Allow tracking to cross junction boundaries") @click.option( "--remote", "-r", default=None, help="The URL of the remote source cache (defaults to the first configured cache)" ) @click.argument("elements", nargs=-1, type=click.Path(readable=False)) @click.pass_obj -def source_fetch(app, elements, deps, track_, except_, track_cross_junctions, remote): +def source_fetch(app, elements, deps, except_, remote): """Fetch sources required to build the pipeline Specifying no elements will result in fetching the default targets @@ -790,32 +788,11 @@ def source_fetch(app, elements, deps, track_, except_, track_cross_junctions, re plan: Only dependencies required for the build plan all: All dependencies """ - from .._pipeline import PipelineSelection - - if track_cross_junctions and not track_: - click.echo("ERROR: The --track-cross-junctions option can only be used with --track", err=True) - sys.exit(-1) - - if track_ and deps == PipelineSelection.PLAN: - click.echo( - "WARNING: --track specified for tracking of a build plan\n\n" - "Since tracking modifies the build plan, all elements will be tracked.", - err=True, - ) - deps = PipelineSelection.ALL - with app.initialized(session_name="Fetch"): if not elements: elements = app.project.get_default_targets() - app.stream.fetch( - elements, - selection=deps, - except_targets=except_, - track_targets=track_, - track_cross_junctions=track_cross_junctions, - remote=remote, - ) + app.stream.fetch(elements, selection=deps, except_targets=except_, remote=remote) ################################################################## @@ -969,9 +946,6 @@ def workspace(): + "or if a workspace for that element already exists", ) @click.option( - "--track", "track_", is_flag=True, help="Track and fetch new source references before checking out the workspace" -) -@click.option( "--directory", type=click.Path(file_okay=False), default=None, @@ -979,13 +953,11 @@ def workspace(): ) @click.argument("elements", nargs=-1, type=click.Path(readable=False), required=True) @click.pass_obj -def workspace_open(app, no_checkout, force, track_, directory, elements): +def workspace_open(app, no_checkout, force, directory, elements): """Open a workspace for manual source modification""" with app.initialized(): - app.stream.workspace_open( - elements, no_checkout=no_checkout, track_first=track_, force=force, custom_dir=directory - ) + app.stream.workspace_open(elements, no_checkout=no_checkout, force=force, custom_dir=directory) ################################################################## diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py index 3b721d6f2..b2e58b9e7 100644 --- a/src/buildstream/_loader/loader.py +++ b/src/buildstream/_loader/loader.py @@ -669,9 +669,6 @@ class Loader: # Optimization for junctions with a single local source basedir = sources[0]._get_local_path() else: - # Stage sources - element._set_required() - # Note: We use _KeyStrength.WEAK here because junctions # cannot have dependencies, therefore the keys are # equivalent. diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 986ac6c0a..295161ed2 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -181,10 +181,7 @@ class Queue: # Obtain immediate element status for elt in elts: - if self._required_element_check and not elt._is_required(): - elt._set_required_callback(self._enqueue_element) - else: - self._enqueue_element(elt) + self._enqueue_element(elt) # dequeue() # @@ -241,13 +238,6 @@ class Queue: for element in ready ] - # set_required_element_check() - # - # This ensures that, for the first non-track queue, we must check - # whether elements are required before enqueuing them - def set_required_element_check(self): - self._required_element_check = True - # any_failed_elements() # # Returns whether any elements in this queue have failed their jobs diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index 4187c5c7b..d9c31ace1 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -39,8 +39,9 @@ class TrackQueue(Queue): return TrackQueue._track_element def status(self, element): - # We can skip elements entirely if they have no sources. - if not list(element.sources()): + # We can skip elements entirely if they have no trackable + # sources. + if not any(source._is_trackable() for source in element.sources()): # But we still have to mark them as tracked element._tracking_done() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index e0a8d92bb..0a3495ce5 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -29,6 +29,7 @@ import tempfile from contextlib import contextmanager, suppress from fnmatch import fnmatch from collections import deque +from typing import List, Tuple from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError @@ -46,6 +47,7 @@ from ._scheduler import ( Notification, JobStatus, ) +from .element import Element from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER from ._state import State @@ -158,9 +160,8 @@ class Stream: load_refs=False ): with PROFILER.profile(Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)): - target_objects, _ = self._load( + target_objects = self._load( targets, - (), selection=selection, except_targets=except_targets, use_artifact_config=use_artifact_config, @@ -280,9 +281,8 @@ class Stream: if remote: use_config = False - elements, _ = self._load( + elements = self._load( targets, - [], selection=selection, ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, @@ -334,48 +334,24 @@ class Stream: # targets (list of str): Targets to fetch # selection (PipelineSelection): The selection mode for the specified targets # except_targets (list of str): Specified targets to except from fetching - # track_targets (bool): Whether to track selected targets in addition to fetching - # track_cross_junctions (bool): Whether tracking should cross junction boundaries # remote (str|None): The URL of a specific remote server to pull from. # - def fetch( - self, - targets, - *, - selection=PipelineSelection.PLAN, - except_targets=None, - track_targets=False, - track_cross_junctions=False, - remote=None - ): - - if track_targets: - track_targets = targets - track_selection = selection - track_except_targets = except_targets - else: - track_targets = () - track_selection = PipelineSelection.NONE - track_except_targets = () + def fetch(self, targets, *, selection=PipelineSelection.PLAN, except_targets=None, remote=None): use_source_config = True if remote: use_source_config = False - elements, track_elements = self._load( + elements = self._load( targets, - track_targets, selection=selection, - track_selection=track_selection, except_targets=except_targets, - track_except_targets=track_except_targets, - track_cross_junctions=track_cross_junctions, use_source_config=use_source_config, source_remote_url=remote, ) # Delegated to a shared fetch method - self._fetch(elements, track_elements=track_elements) + self._fetch(elements) # track() # @@ -392,27 +368,20 @@ class Stream: # def track(self, targets, *, selection=PipelineSelection.REDIRECT, except_targets=None, cross_junctions=False): - # We pass no target to build. Only to track. Passing build targets - # would fully load project configuration which might not be - # possible before tracking is done. - _, elements = self._load( - [], - targets, - selection=selection, - track_selection=selection, - except_targets=except_targets, - track_except_targets=except_targets, - track_cross_junctions=cross_junctions, + elements = self._load_tracking( + targets, selection=selection, except_targets=except_targets, cross_junctions=cross_junctions ) - # FIXME: this can be refactored after element._update_state is simplified/removed - elements = [element for element in elements if element._schedule_tracking()] - self._pipeline.resolve_elements(elements) + # Note: We do not currently need to initialize the state of an + # element before it is tracked, since tracking can be done + # irrespective of source/artifact condition. Once an element + # is tracked, its state must be fully updated in either case, + # and we anyway don't do anything else with it. self._scheduler.clear_queues() track_queue = TrackQueue(self._scheduler) - self._add_queue(track_queue, track=True) - self._enqueue_plan(elements, queue=track_queue) + self._add_queue(track_queue) + self._enqueue_plan(elements) self._run() # pull() @@ -434,9 +403,8 @@ class Stream: if remote: use_config = False - elements, _ = self._load( + elements = self._load( targets, - (), selection=selection, ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, @@ -476,9 +444,8 @@ class Stream: if remote: use_config = False - elements, _ = self._load( + elements = self._load( targets, - (), selection=selection, ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, @@ -569,7 +536,7 @@ class Stream: tar=False ): - elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True, load_refs=True) + elements = self._load((target,), selection=selection, use_artifact_config=True, load_refs=True) # self.targets contains a list of the loaded target objects # if we specify --deps build, Stream._load() will return a list @@ -778,7 +745,7 @@ class Stream: self._check_location_writable(location, force=force, tar=tar) - elements, _ = self._load((target,), (), selection=deps, except_targets=except_targets) + elements = self._load((target,), selection=deps, except_targets=except_targets) # Assert all sources are cached in the source dir self._fetch(elements) @@ -805,28 +772,18 @@ class Stream: # force (bool): Whether to ignore contents in an existing directory # custom_dir (str): Custom location to create a workspace or false to use default location. # - def workspace_open(self, targets, *, no_checkout, track_first, force, custom_dir): + def workspace_open(self, targets, *, no_checkout, force, custom_dir): # This function is a little funny but it is trying to be as atomic as possible. - if track_first: - track_targets = targets - else: - track_targets = () - - elements, track_elements = self._load( - targets, track_targets, selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT - ) + elements = self._load(targets, selection=PipelineSelection.REDIRECT) workspaces = self._context.get_workspaces() # If we're going to checkout, we need at least a fetch, # if we were asked to track first, we're going to fetch anyway. # - if not no_checkout or track_first: - track_elements = [] - if track_first: - track_elements = elements - self._fetch(elements, track_elements=track_elements, fetch_original=True) + if not no_checkout: + self._fetch(elements, fetch_original=True) expanded_directories = [] # To try to be more atomic, loop through the elements and raise any errors we can early @@ -966,9 +923,7 @@ class Stream: # def workspace_reset(self, targets, *, soft, track_first): - elements, _ = self._load( - targets, [], selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT - ) + elements = self._load(targets, selection=PipelineSelection.REDIRECT) nonexisting = [] for element in elements: @@ -991,13 +946,7 @@ class Stream: self.workspace_close(element._get_full_name(), remove_dir=True) workspaces.save_config() - self.workspace_open( - [element._get_full_name()], - no_checkout=False, - track_first=track_first, - force=True, - custom_dir=workspace_path, - ) + self.workspace_open([element._get_full_name()], no_checkout=False, force=True, custom_dir=workspace_path) # workspace_exists # @@ -1076,9 +1025,7 @@ class Stream: else: output_elements.add(e) if load_elements: - loaded_elements, _ = self._load( - load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT - ) + loaded_elements = self._load(load_elements, selection=PipelineSelection.REDIRECT) for e in loaded_elements: output_elements.add(e.name) @@ -1180,6 +1127,116 @@ class Stream: # Private Methods # ############################################################# + # __load_elements_from_targets + # + # Given the usual set of target element names/artifact refs, load + # the `Element` objects required to describe the selection. + # + # The result is returned as a truple - firstly the loaded normal + # elements, secondly the loaded "excepting" elements and lastly + # the loaded artifact elements. + # + # Args: + # targets - The target element names/artifact refs + # except_targets - The names of elements to except + # rewritable - Whether to load the elements in re-writable mode + # + # Returns: + # ([elements], [except_elements], [artifact_elements]) + # + def __load_elements_from_targets( + self, targets: List[str], except_targets: List[str], *, rewritable: bool = False + ) -> Tuple[List[Element], List[Element], List[Element]]: + names, refs = self._classify_artifacts(targets) + loadable = [names, except_targets] + + # Load and filter elements + if loadable: + elements, except_elements = self._pipeline.load(loadable, rewritable=rewritable) + else: + elements, except_elements = [], [] + + # Load artifacts + if refs: + artifacts = self._pipeline.load_artifacts(refs) + else: + artifacts = [] + + return elements, except_elements, artifacts + + # __connect_remotes() + # + # Connect to the source and artifact remotes. + # + # Args: + # artifact_url - The url of the artifact server to connect to. + # source_url - The url of the source server to connect to. + # use_artifact_config - Whether to use the artifact config. + # use_source_config - Whether to use the source config. + # + def __connect_remotes( + self, artifact_url: str, source_url: str, use_artifact_config: bool, use_source_config: bool + ): + # ArtifactCache.setup_remotes expects all projects to be fully loaded + for project in self._context.get_projects(): + project.ensure_fully_loaded() + + # Connect to remote caches, this needs to be done before resolving element state + self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_url) + self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_url) + + # _load_tracking() + # + # A variant of _load() to be used when the elements should be used + # for tracking + # + # If `targets` is not empty used project configuration will be + # fully loaded. + # + # Args: + # targets (list of str): Targets to load + # selection (PipelineSelection): The selection mode for the specified targets + # except_targets (list of str): Specified targets to except + # cross_junctions (bool): Whether tracking should cross junction boundaries + # + # Returns: + # (list of Element): The tracking element selection + # + def _load_tracking(self, targets, *, selection=PipelineSelection.NONE, except_targets=(), cross_junctions=False): + # We never want to use a PLAN selection when tracking elements + assert selection != PipelineSelection.PLAN + + elements, except_elements, artifacts = self.__load_elements_from_targets( + targets, except_targets, rewritable=True + ) + + # We can't track artifact refs, since they have no underlying + # elements or sources to interact with. Abort if the user asks + # us to do that. + if artifacts: + detail = "\n".join(artifact.get_artifact_name() for artifact in artifacts) + raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail) + + # Hold on to the targets + self.targets = elements + + track_projects = {} + for element in elements: + project = element._get_project() + if project not in track_projects: + track_projects[project] = [element] + else: + track_projects[project].append(element) + + track_selected = [] + + for project, project_elements in track_projects.items(): + selected = self._pipeline.get_selection(project_elements, selection) + selected = self._pipeline.track_cross_junction_filter(project, selected, cross_junctions) + track_selected.extend(selected) + + return self._pipeline.except_elements(elements, track_selected, except_elements) + # _load() # # A convenience method for loading element lists @@ -1192,12 +1249,8 @@ class Stream: # # Args: # targets (list of str): Main targets to load - # track_targets (list of str): Tracking targets # selection (PipelineSelection): The selection mode for the specified targets - # track_selection (PipelineSelection): The selection mode for the specified tracking targets # except_targets (list of str): Specified targets to except from fetching - # track_except_targets (list of str): Specified targets to except from fetching - # track_cross_junctions (bool): Whether tracking should cross junction boundaries # ignore_junction_targets (bool): Whether junction targets should be filtered out # use_artifact_config (bool): Whether to initialize artifacts with the config # use_source_config (bool): Whether to initialize remote source caches with the config @@ -1211,13 +1264,9 @@ class Stream: def _load( self, targets, - track_targets, *, selection=PipelineSelection.NONE, - track_selection=PipelineSelection.NONE, except_targets=(), - track_except_targets=(), - track_cross_junctions=False, ignore_junction_targets=False, use_artifact_config=False, use_source_config=False, @@ -1226,76 +1275,25 @@ class Stream: dynamic_plan=False, load_refs=False ): + elements, except_elements, artifacts = self.__load_elements_from_targets( + targets, except_targets, rewritable=False + ) - # Classify element and artifact strings - target_elements, target_artifacts = self._classify_artifacts(targets) - - if target_artifacts: + if artifacts: if not load_refs: - detail = "\n".join(target_artifacts) + detail = "\n".join(artifact.get_artifact_name() for artifact in artifacts) raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail) if selection in (PipelineSelection.ALL, PipelineSelection.RUN): raise StreamError("Error: '--deps {}' is not supported for artifact refs".format(selection)) - # Load rewritable if we have any tracking selection to make - rewritable = False - if track_targets: - rewritable = True - - # Load all target elements - loadable = [target_elements, except_targets, track_targets, track_except_targets] - if any(loadable): - elements, except_elements, track_elements, track_except_elements = self._pipeline.load( - loadable, rewritable=rewritable - ) - else: - elements, except_elements, track_elements, track_except_elements = [], [], [], [] - - # Load all target artifacts - artifacts = self._pipeline.load_artifacts(target_artifacts) if target_artifacts else [] - - # Optionally filter out junction elements if ignore_junction_targets: elements = [e for e in elements if e.get_kind() != "junction"] # Hold on to the targets self.targets = elements + artifacts - # First take care of marking tracking elements, this must be - # done before resolving element states. - # - assert track_selection != PipelineSelection.PLAN - - # Tracked elements are split by owner projects in order to - # filter cross junctions tracking dependencies on their - # respective project. - track_projects = {} - for element in track_elements: - project = element._get_project() - if project not in track_projects: - track_projects[project] = [element] - else: - track_projects[project].append(element) - - track_selected = [] - - for project, project_elements in track_projects.items(): - selected = self._pipeline.get_selection(project_elements, track_selection) - selected = self._pipeline.track_cross_junction_filter(project, selected, track_cross_junctions) - track_selected.extend(selected) - - track_selected = self._pipeline.except_elements(track_elements, track_selected, track_except_elements) - - if not targets: - return [], track_selected - - # ArtifactCache.setup_remotes expects all projects to be fully loaded - for project in self._context.get_projects(): - project.ensure_fully_loaded() - # Connect to remote caches, this needs to be done before resolving element state - self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url) - self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_remote_url) + self.__connect_remotes(artifact_remote_url, source_remote_url, use_artifact_config, use_source_config) # Now move on to loading primary selection. # @@ -1308,12 +1306,12 @@ class Stream: # others are requested dynamically as needed. # This avoids pulling, fetching, or building unneeded build-only dependencies. for element in elements: - element._set_required() + element._schedule_assembly_when_necessary() else: for element in selected: - element._set_required() + element._schedule_assembly_when_necessary() - return selected, track_selected + return selected # _message() # @@ -1331,11 +1329,8 @@ class Stream: # queue (Queue): Queue to add to the pipeline # track (bool): Whether this is the tracking queue # - def _add_queue(self, queue, *, track=False): + def _add_queue(self, queue): self.queues.append(queue) - if not (track or self._first_non_track_queue): - self._first_non_track_queue = queue - self._first_non_track_queue.set_required_element_check() # _enqueue_plan() # @@ -1346,8 +1341,7 @@ class Stream: # queue (Queue): The target queue, defaults to the first non-track queue # def _enqueue_plan(self, plan, *, queue=None): - queue = queue or self._first_non_track_queue - + queue = queue or self.queues[0] queue.enqueue(plan) self.session_elements += plan @@ -1396,35 +1390,16 @@ class Stream: # track_elements (list of Element): Elements to track # fetch_original (Bool): Whether to fetch original unstaged # - def _fetch(self, elements, *, track_elements=None, fetch_original=False): - - if track_elements is None: - track_elements = [] - - # Subtract the track elements from the fetch elements, they will be added separately - fetch_plan = self._pipeline.subtract_elements(elements, track_elements) + def _fetch(self, elements, *, fetch_original=False): # Assert consistency for the fetch elements - self._pipeline.assert_consistent(fetch_plan) - - # Filter out elements with cached sources, only from the fetch plan - # let the track plan resolve new refs. - cached = [elt for elt in fetch_plan if not elt._should_fetch(fetch_original)] - fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached) + self._pipeline.assert_consistent(elements) # Construct queues, enqueue and run # self._scheduler.clear_queues() - track_queue = None - if track_elements: - track_queue = TrackQueue(self._scheduler) - self._add_queue(track_queue, track=True) self._add_queue(FetchQueue(self._scheduler, fetch_original=fetch_original)) - - if track_elements: - self._enqueue_plan(track_elements, queue=track_queue) - - self._enqueue_plan(fetch_plan) + self._enqueue_plan(elements) self._run() # _check_location_writable() diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 74fb1a056..a8c6bfa8f 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -262,7 +262,6 @@ class Element(Plugin): self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state self.__assemble_scheduled = False # Element is scheduled to be assembled self.__assemble_done = False # Element is assembled - self.__tracking_scheduled = False # Sources are scheduled to be tracked self.__pull_done = False # Whether pull was attempted self.__cached_successfully = None # If the Element is known to be successfully cached self.__source_cached = None # If the sources are known to be successfully cached @@ -290,7 +289,6 @@ class Element(Plugin): self.__batch_prepare_assemble_collect = None # type: Optional[str] # Callbacks - self.__required_callback = None # Callback to Queues self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue self.__buildable_callback = None # Callback to BuildQueue @@ -1252,7 +1250,7 @@ class Element(Plugin): # - _update_artifact_state() # - Computes the state of the element's artifact using the # cache key. - # - __schedule_assembly_when_necessary() + # - _schedule_assembly_when_necessary() # - Schedules assembly of an element, iff its current state # allows/necessitates it # - __update_cache_key_non_strict() @@ -1330,43 +1328,11 @@ class Element(Plugin): _, display_key, _ = self._get_display_key() return display_key - # _schedule_tracking(): - # - # Force an element state to be inconsistent. Any sources appear to be - # inconsistent. - # - # This is used across the pipeline in sessions where the - # elements in question are going to be tracked, causing the - # pipeline to rebuild safely by ensuring cache key recalculation - # and reinterrogation of element state after tracking of elements - # succeeds. - # - # This method should return the value of `__tracking_scheduled` to report - # to callers that the element was marked for tracking. - # - # If `__tracking_scheduled` is not already determined then set it to `True` - # if at least one source advertises that it can be tracked. - # - # Returns: - # (bool): value of the `__tracking_scheduled` attribute - # - def _schedule_tracking(self) -> bool: - # if the tracking schedule is already determined then this can be skipped - if not self.__tracking_scheduled: - # Tracking does not make sense in cases where no sources can be tracked. - if any(source._is_trackable() for source in self.__sources): - self.__tracking_scheduled = True - return self.__tracking_scheduled - # _tracking_done(): # # This is called in the main process after the element has been tracked # def _tracking_done(self): - assert self.__tracking_scheduled - - self.__tracking_scheduled = False - # Tracking may change the sources' refs, and therefore the # source state. We need to update source state. self.__update_source_state() @@ -1519,34 +1485,6 @@ class Element(Plugin): # Ensure deterministic owners of sources at build time vdirectory.set_deterministic_user() - # _set_required(): - # - # Mark this element and its runtime dependencies as required. - # This unblocks pull/fetch/build. - # - def _set_required(self): - if self.__required: - # Already done - return - - self.__required = True - - # Request artifacts of runtime dependencies - for dep in self.dependencies(Scope.RUN, recurse=False): - dep._set_required() - - # When an element becomes required, it must be assembled for - # the current pipeline. `__schedule_assembly_when_necessary()` - # will abort if some other state prevents it from being built, - # and changes to such states will cause re-scheduling, so this - # is safe. - self.__schedule_assembly_when_necessary() - - # Callback to the Queue - if self.__required_callback is not None: - self.__required_callback(self) - self.__required_callback = None - # _is_required(): # # Returns whether this element has been marked as required. @@ -1592,9 +1530,6 @@ class Element(Plugin): # We're not processing not processing and - # We're required for the current build - self._is_required() - and # We have figured out the state of our artifact self.__artifact and @@ -1602,12 +1537,12 @@ class Element(Plugin): not self._cached() ) - # __schedule_assembly_when_necessary(): + # _schedule_assembly_when_necessary(): # # This is called in the main process before the element is assembled # in a subprocess. # - def __schedule_assembly_when_necessary(self): + def _schedule_assembly_when_necessary(self): # FIXME: We could reduce the number of function calls a bit by # factoring this out of this method (and checking whether we # should schedule at the calling end). @@ -1621,7 +1556,7 @@ class Element(Plugin): # Requests artifacts of build dependencies for dep in self.dependencies(Scope.BUILD, recurse=False): - dep._set_required() + dep._schedule_assembly_when_necessary() # Once we schedule an element for assembly, we know that our # build dependencies have strong cache keys, so we can update @@ -1885,7 +1820,7 @@ class Element(Plugin): # We may not have actually pulled an artifact - the pull may # have failed. We might therefore need to schedule assembly. - self.__schedule_assembly_when_necessary() + self._schedule_assembly_when_necessary() # If we've finished pulling, an artifact might now exist # locally, so we might need to update a non-strict strong # cache key. @@ -2282,22 +2217,6 @@ class Element(Plugin): else: return True - # _set_required_callback() - # - # - # Notify the pull/fetch/build queue that the element is potentially - # ready to be processed. - # - # _Set the _required_callback - the _required_callback is invoked when an - # element is marked as required. This informs us that the element needs to - # either be pulled or fetched + built. - # - # Args: - # callback (callable) - The callback function - # - def _set_required_callback(self, callback): - self.__required_callback = callback - # _set_can_query_cache_callback() # # Notify the pull/fetch queue that the element is potentially @@ -2406,11 +2325,6 @@ class Element(Plugin): assert "_Element__buildable_callback" in state state["_Element__buildable_callback"] = None - # This callback is not even read in the child process, so delete it. - # If this assumption is invalidated, we will get an attribute error to - # let us know, and we will need to update accordingly. - del state["_Element__required_callback"] - return self.__meta_kind, state def _walk_artifact_files(self): @@ -2451,10 +2365,6 @@ class Element(Plugin): # def __update_source_state(self): - # Cannot resolve source state until tracked - if self.__tracking_scheduled: - return - old_consistency = self.__consistency self.__consistency = Consistency.CACHED @@ -3225,7 +3135,7 @@ class Element(Plugin): # to this element. # # If the state changes, this will subsequently call - # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes + # `self._schedule_assembly_when_necessary()` to schedule assembly if it becomes # possible. # # Element.__update_cache_keys() must be called before this to have @@ -3241,7 +3151,7 @@ class Element(Plugin): if not context.get_strict() and not self.__artifact: # We've calculated the weak_key, so instantiate artifact instance member self.__artifact = Artifact(self, context, weak_key=self.__weak_cache_key) - self.__schedule_assembly_when_necessary() + self._schedule_assembly_when_necessary() if not self.__strict_cache_key: return @@ -3253,7 +3163,7 @@ class Element(Plugin): if context.get_strict(): self.__artifact = self.__strict_artifact - self.__schedule_assembly_when_necessary() + self._schedule_assembly_when_necessary() else: self.__update_cache_key_non_strict() diff --git a/src/buildstream/source.py b/src/buildstream/source.py index 2e7460439..dbe113409 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -170,7 +170,7 @@ from . import _yaml, utils from .node import MappingNode from .plugin import Plugin from .types import Consistency, SourceRef, Union, List -from ._exceptions import BstError, ImplError, ErrorDomain +from ._exceptions import BstError, ImplError, PluginError, ErrorDomain from ._loader.metasource import MetaSource from ._projectrefs import ProjectRefStorage from ._cachekey import generate_key @@ -763,7 +763,20 @@ class Source(Plugin): # Source consistency interrogations are silent. context = self._get_context() with context.messenger.silence(): - self.__consistency = self.get_consistency() # pylint: disable=assignment-from-no-return + try: + self.__consistency = self.get_consistency() # pylint: disable=assignment-from-no-return + except SourceError: + # SourceErrors should be preserved so that the + # plugin can communicate real error cases. + raise + except Exception as err: # pylint: disable=broad-except + # Generic errors point to bugs in the plugin, so + # we need to catch them and make sure they do not + # cause stacktraces + raise PluginError( + "Source plugin '{}' failed to compute source consistency: {}".format(self.get_kind(), err), + reason="source-bug", + ) # Give the Source an opportunity to validate the cached # sources as soon as the Source becomes Consistency.CACHED. diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py index ff3667707..9258fc7d9 100644 --- a/tests/frontend/fetch.py +++ b/tests/frontend/fetch.py @@ -60,16 +60,8 @@ def test_fetch_consistency_error(cli, datafiles): def test_fetch_consistency_bug(cli, datafiles): project = str(datafiles) - # FIXME: - # - # When a plugin raises an unhandled exception at load - # time, as is the case when running Source.get_consistency() - # for a fetch command, we could report this to the user - # more gracefully as a BUG message. - # result = cli.run(project=project, args=["source", "fetch", "bug.bst"]) - assert result.exc is not None - assert str(result.exc) == "Something went terribly wrong" + result.assert_main_error(ErrorDomain.PLUGIN, "source-bug") @pytest.mark.datafiles(DATA_DIR) |