diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2020-12-23 09:45:40 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-12-23 09:45:40 +0000 |
commit | 74833a73e3b8c5f473d85965e0a31ddacde6b782 (patch) | |
tree | 904a140c21f073aca49acaccf6aa971efe5fdcb2 | |
parent | 6904c1328cec54b3858e57bbd83ee59e0fd7e16e (diff) | |
parent | 5ccacfc8cea692dc0296c2f335c8129656a3d450 (diff) | |
download | buildstream-74833a73e3b8c5f473d85965e0a31ddacde6b782.tar.gz |
Merge branch 'tristan/dissolve-pipeline' into 'master'
Pipeline refactor
See merge request BuildStream/buildstream!2121
13 files changed, 401 insertions, 507 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index d53fc9d01..e1e6dcf39 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2016-2018 Codethink Limited +# Copyright (C) 2016-2020 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -19,403 +19,275 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # Tristan Maat <tristan.maat@codethink.co.uk> -import os import itertools -from operator import itemgetter -from collections import OrderedDict +from collections import OrderedDict +from operator import itemgetter +from typing import List, Iterator from pyroaring import BitMap # pylint: disable=no-name-in-module -from ._exceptions import PipelineError -from ._profile import Topics, PROFILER -from ._project import ProjectRefStorage +from .element import Element from .types import _PipelineSelection, _Scope +from ._context import Context +from ._exceptions import PipelineError + -# Pipeline() +# dependencies() +# +# Generator function to iterate over the dependencies of multiple +# targets in the specified scope, while guaranteeing that a given +# element is never yielded more than once. # # Args: -# project (Project): The Project object -# context (Context): The Context object -# artifacts (Context): The ArtifactCache object +# targets: The target Elements to loop over +# scope: An integer value from the _Scope enum, the scope to iterate over +# recurse: Whether to recurse into dependencies # -class Pipeline: - def __init__(self, context, project, artifacts): - self._context = context # The Context - self._project = project # The toplevel project - self._artifacts = artifacts # The artifact cache - - # load() - # - # Loads elements from target names. - # - # This function is called with a list of lists, such that multiple - # target groups may be specified. Element names specified in `targets` - # are allowed to be redundant. - # - # Args: - # target_groups (list of lists): Groups of toplevel targets to load - # - # Returns: - # (tuple of lists): A tuple of grouped Element objects corresponding to target_groups - # - def load(self, target_groups): - - # First concatenate all the lists for the loader's sake - targets = list(itertools.chain(*target_groups)) - - with PROFILER.profile(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, "-") for t in targets)): - elements = self._project.load_elements(targets) - - # Now create element groups to match the input target groups - elt_iter = iter(elements) - element_groups = [[next(elt_iter) for i in range(len(group))] for group in target_groups] - - return tuple(element_groups) - - # resolve_elements() - # - # Resolve element state and cache keys. - # - # Args: - # targets (list of Element): The list of toplevel element targets - # - def resolve_elements(self, targets): - with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task: - # We need to go through the project to access the loader - if task: - task.set_maximum_progress(self._project.loader.loaded) - - # XXX: Now that Element._update_state() can trigger recursive update_state calls - # it is possible that we could get a RecursionError. However, this is unlikely - # to happen, even for large projects (tested with the Debian stack). Although, - # if it does become a problem we may have to set the recursion limit to a - # greater value. - for element in self.dependencies(targets, _Scope.ALL): - # Determine initial element state. - element._initialize_state() - - # We may already have Elements which are cached and have their runtimes - # cached, if this is the case, we should immediately notify their reverse - # dependencies. - element._update_ready_for_runtime_and_cached() - - if task: - task.add_current_progress() - - # check_remotes() - # - # Check if the target artifact is cached in any of the available remotes - # - # Args: - # targets (list [Element]): The list of element targets - # - def check_remotes(self, targets): - with self._context.messenger.simple_task("Querying remotes for cached status", silent_nested=True) as task: - task.set_maximum_progress(len(targets)) +# Yields: +# Elements in the scope of the specified target elements +# +def dependencies(targets: List[Element], scope: int, *, recurse: bool = True) -> Iterator[Element]: + # Keep track of 'visited' in this scope, so that all targets + # share the same context. + visited = (BitMap(), BitMap()) - for element in targets: - element._cached_remotely() + for target in targets: + for element in target._dependencies(scope, recurse=recurse, visited=visited): + yield element - task.add_current_progress() - # dependencies() - # - # Generator function to iterate over elements and optionally - # also iterate over sources. - # - # Args: - # targets (list of Element): The target Elements to loop over - # scope (_Scope): The scope to iterate over - # recurse (bool): Whether to recurse into dependencies - # - def dependencies(self, targets, scope, *, recurse=True): - # Keep track of 'visited' in this scope, so that all targets - # share the same context. - visited = (BitMap(), BitMap()) - - for target in targets: - for element in target._dependencies(scope, recurse=recurse, visited=visited): - yield element +# get_selection() +# +# Gets a full list of elements based on a toplevel +# list of element targets +# +# Various commands define a --deps option to specify what elements to +# use in the result, this function reports a list that is appropriate for +# the selected option. +# +# Args: +# context: The invocation context +# targets: The target Elements +# mode: A value from PipelineSelection enumeration +# silent: Whether to silence messages +# +# Returns: +# A list of Elements appropriate for the specified selection mode +# +def get_selection(context: Context, targets: List[Element], mode: str, *, silent: bool = True) -> List[Element]: + def redirect_and_log() -> List[Element]: + # Redirect and log if permitted + elements: List[Element] = [] + for t in targets: + new_elm = t._get_source_element() + if new_elm != t and not silent: + context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name)) + if new_elm not in elements: + elements.append(new_elm) + return elements - # plan() - # - # Generator function to iterate over only the elements - # which are required to build the pipeline target, omitting - # cached elements. The elements are yielded in a depth sorted - # ordering for optimal build plans - # - # Args: - # elements (list of Element): List of target elements to plan - # - # Returns: - # (list of Element): A depth sorted list of the build plan - # - def plan(self, elements): + def plan() -> List[Element]: # Keep locally cached elements in the plan if remote artifact cache is used # to allow pulling artifact with strict cache key, if available. - plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes() + plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes() + return _Planner().plan(targets, plan_cached) - return _Planner().plan(elements, plan_cached) - - # get_selection() - # - # Gets a full list of elements based on a toplevel - # list of element targets - # - # Args: - # targets (list of Element): The target Elements - # mode (_PipelineSelection): The PipelineSelection mode - # - # Various commands define a --deps option to specify what elements to - # use in the result, this function reports a list that is appropriate for - # the selected option. - # - def get_selection(self, targets, mode, *, silent=True): - def redirect_and_log(): - # Redirect and log if permitted - elements = [] - for t in targets: - new_elm = t._get_source_element() - if new_elm != t and not silent: - self._context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name)) - if new_elm not in elements: - elements.append(new_elm) - return elements - - # Work around python not having a switch statement; this is - # much clearer than the if/elif/else block we used to have. - # - # Note that the lambda is necessary so that we don't evaluate - # all possible values at run time; that would be slow. - return { - _PipelineSelection.NONE: lambda: targets, - _PipelineSelection.REDIRECT: redirect_and_log, - _PipelineSelection.PLAN: lambda: self.plan(targets), - _PipelineSelection.ALL: lambda: list(self.dependencies(targets, _Scope.ALL)), - _PipelineSelection.BUILD: lambda: list(self.dependencies(targets, _Scope.BUILD)), - _PipelineSelection.RUN: lambda: list(self.dependencies(targets, _Scope.RUN)), - }[mode]() - - # except_elements(): - # - # Return what we are left with after the intersection between - # excepted and target elements and their unique dependencies is - # gone. - # - # Args: - # targets (list of Element): List of toplevel targetted elements - # elements (list of Element): The list to remove elements from - # except_targets (list of Element): List of toplevel except targets - # - # Returns: - # (list of Element): The elements list with the intersected - # exceptions removed - # - def except_elements(self, targets, elements, except_targets): - if not except_targets: - return elements - - targeted = list(self.dependencies(targets, _Scope.ALL)) - visited = [] - - def find_intersection(element): - if element in visited: - return - visited.append(element) - - # Intersection elements are those that are also in - # 'targeted', as long as we don't recurse into them. - if element in targeted: - yield element - else: - for dep in element._dependencies(_Scope.ALL, recurse=False): - yield from find_intersection(dep) - - # Build a list of 'intersection' elements, i.e. the set of - # elements that lie on the border closest to excepted elements - # between excepted and target elements. - intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets)) - - # Now use this set of elements to traverse the targeted - # elements, except 'intersection' elements and their unique - # dependencies. - queue = [] - visited = [] - - queue.extend(targets) - while queue: - element = queue.pop() - if element in visited or element in intersection: - continue - visited.append(element) - - queue.extend(element._dependencies(_Scope.ALL, recurse=False)) - - # That looks like a lot, but overall we only traverse (part - # of) the graph twice. This could be reduced to once if we - # kept track of parent elements, but is probably not - # significant. - - # Ensure that we return elements in the same order they were - # in before. - return [element for element in elements if element in visited] - - # add_elements() + # Work around python not having a switch statement; this is + # much clearer than the if/elif/else block we used to have. # - # Add to a list of elements all elements that are not already in it - # - # Args: - # elements (list of Element): The element list - # add (list of Element): List of elements to add - # - # Returns: - # (list): The original elements list, with elements in add that weren't - # already in it added. - def add_elements(self, elements, add): - ret = elements[:] - ret.extend(e for e in add if e not in ret) - return ret - - # track_cross_junction_filter() - # - # Filters out elements which are across junction boundaries, - # otherwise asserts that there are no such elements. - # - # This is currently assumed to be only relevant for element - # lists targetted at tracking. - # - # Args: - # project (Project): Project used for cross_junction filtering. - # All elements are expected to belong to that project. - # elements (list of Element): The list of elements to filter - # cross_junction_requested (bool): Whether the user requested - # cross junction tracking - # - # Returns: - # (list of Element): The filtered or asserted result - # - def track_cross_junction_filter(self, project, elements, cross_junction_requested): - # Filter out cross junctioned elements - if not cross_junction_requested: - elements = self._filter_cross_junctions(project, elements) - self._assert_junction_tracking(elements) + # Note that the lambda is necessary so that we don't evaluate + # all possible values at run time; that would be slow. + return { + _PipelineSelection.NONE: lambda: targets, + _PipelineSelection.REDIRECT: redirect_and_log, + _PipelineSelection.PLAN: plan, + _PipelineSelection.ALL: lambda: list(dependencies(targets, _Scope.ALL)), + _PipelineSelection.BUILD: lambda: list(dependencies(targets, _Scope.BUILD)), + _PipelineSelection.RUN: lambda: list(dependencies(targets, _Scope.RUN)), + }[mode]() - return elements - # assert_consistent() - # - # Asserts that the given list of elements are in a consistent state, that - # is to say that all sources are consistent and can at least be fetched. - # - # Consequently it also means that cache keys can be resolved. - # - def assert_consistent(self, elements): - inconsistent = [] - inconsistent_workspaced = [] - with self._context.messenger.timed_activity("Checking sources"): - for element in elements: - if not element._has_all_sources_resolved(): - if element._get_workspace(): - inconsistent_workspaced.append(element) - else: - inconsistent.append(element) - - if inconsistent: - detail = "Exact versions are missing for the following elements:\n\n" - for element in inconsistent: - detail += " Element: {} is inconsistent\n".format(element._get_full_name()) - for source in element.sources(): - if not source.is_resolved(): - detail += " {} is missing ref\n".format(source) - detail += "\n" - detail += "Try tracking these elements first with `bst source track`\n" - - raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline") - - if inconsistent_workspaced: - detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n" - for element in inconsistent_workspaced: - detail += " " + element._get_full_name() + "\n" - raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced") - - # assert_sources_cached() - # - # Asserts that sources for the given list of elements are cached. - # - # Args: - # elements (list): The list of elements - # - def assert_sources_cached(self, elements): - uncached = [] - with self._context.messenger.timed_activity("Checking sources"): - for element in elements: - if element._fetch_needed(): - uncached.append(element) - - if uncached: - detail = "Sources are not cached for the following elements:\n\n" - for element in uncached: - detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name()) - for source in element.sources(): - if not source._is_cached(): - detail += " {}\n".format(source) - detail += "\n" - detail += ( - "Try fetching these elements first with `bst source fetch`,\n" - + "or run this command with `--fetch` option\n" - ) - - raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources") - - ############################################################# - # Private Methods # - ############################################################# - - # _filter_cross_junction() - # - # Filters out cross junction elements from the elements - # - # Args: - # project (Project): The project on which elements are allowed - # elements (list of Element): The list of elements to be tracked - # - # Returns: - # (list): A filtered list of `elements` which does - # not contain any cross junction elements. - # - def _filter_cross_junctions(self, project, elements): - return [element for element in elements if element._get_project() is project] +# except_elements(): +# +# This function calculates the intersection of the `except_targets` +# element dependencies and the `targets` dependencies, and removes +# that intersection from the `elements` list, returning the result. +# +# Args: +# targets: List of toplevel targetted elements +# elements: The list to remove elements from +# except_targets: List of toplevel except targets +# +# Returns: +# The elements list with the intersected exceptions removed +# +# Important notes on the behavior +# =============================== +# +# * Except elements can be completely outside of the scope +# of targets. +# +# * When the dependencies of except elements intersect with +# dependencies of targets, those dependencies are removed +# from the result. +# +# * If a target is found within the intersection of excepted +# elements, that target and it's dependencies are considered +# exempt from the exception intersection. +# +# Example: +# +# (t1) (e1) +# / \ / +# (o) (o) ( ) +# / \ / \ +# (o) (x) ( ) +# \ / \ +# (o) (x) ( ) +# \ / +# (x) +# / \ +# (x) (t2) +# / \ / \ +# (x) (x) (o) +# / \ +# (o) (o) +# +# Here we have a mockup graph with 2 target elements (t1) and (t2), +# and one except element (e1) which lies outside of the graph. +# +# - ( ) elements are ignored, they were never in the element list +# - (o) elements will be included in the result +# - (x) elements are removed from the graph +# +# Note how (t2) reintroduces portions of the graph which were otherwise +# tainted by being depended on indirectly by the (e1) except element. +# +def except_elements(targets: List[Element], elements: List[Element], except_targets: List[Element]) -> List[Element]: + if not except_targets: + return elements - # _assert_junction_tracking() - # - # Raises an error if tracking is attempted on junctioned elements and - # a project.refs file is not enabled for the toplevel project. - # - # Args: - # elements (list of Element): The list of elements to be tracked - # - def _assert_junction_tracking(self, elements): + targeted: List[Element] = list(dependencies(targets, _Scope.ALL)) + visited: List[Element] = [] - # We can track anything if the toplevel project uses project.refs - # - if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS: + def find_intersection(element: Element) -> Iterator[Element]: + if element in visited: return - - # Ideally, we would want to report every cross junction element but not - # their dependencies, unless those cross junction elements dependencies - # were also explicitly requested on the command line. - # - # But this is too hard, lets shoot for a simple error. + visited.append(element) + + # Intersection elements are those that are also in + # 'targeted', as long as we don't recurse into them. + if element in targeted: + yield element + else: + for dep in element._dependencies(_Scope.ALL, recurse=False): + yield from find_intersection(dep) + + # Build a list of 'intersection' elements, i.e. the set of + # elements that lie on the border closest to excepted elements + # between excepted and target elements. + intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets)) + + # Now use this set of elements to traverse the targeted + # elements, except 'intersection' elements and their unique + # dependencies. + queue = [] + visited = [] + + queue.extend(targets) + while queue: + element = queue.pop() + if element in visited or element in intersection: + continue + visited.append(element) + + queue.extend(element._dependencies(_Scope.ALL, recurse=False)) + + # That looks like a lot, but overall we only traverse (part + # of) the graph twice. This could be reduced to once if we + # kept track of parent elements, but is probably not + # significant. + + # Ensure that we return elements in the same order they were + # in before. + return [element for element in elements if element in visited] + + +# assert_consistent() +# +# Asserts that the given list of elements are in a consistent state, that +# is to say that all sources are consistent and can at least be fetched. +# +# Consequently it also means that cache keys can be resolved. +# +# Args: +# context: The invocation context +# elements: The elements to assert consistency on +# +# Raises: +# PipelineError: If the elements are inconsistent. +# +def assert_consistent(context: Context, elements: List[Element]) -> None: + inconsistent = [] + inconsistent_workspaced = [] + with context.messenger.timed_activity("Checking sources"): + for element in elements: + if not element._has_all_sources_resolved(): + if element._get_workspace(): + inconsistent_workspaced.append(element) + else: + inconsistent.append(element) + + if inconsistent: + detail = "Exact versions are missing for the following elements:\n\n" + for element in inconsistent: + detail += " Element: {} is inconsistent\n".format(element._get_full_name()) + for source in element.sources(): + if not source.is_resolved(): + detail += " {} is missing ref\n".format(source) + detail += "\n" + detail += "Try tracking these elements first with `bst source track`\n" + raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline") + + if inconsistent_workspaced: + detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n" + for element in inconsistent_workspaced: + detail += " " + element._get_full_name() + "\n" + raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced") + + +# assert_sources_cached() +# +# Asserts that sources for the given list of elements are cached. +# +# Args: +# context: The invocation context +# elements: The elements to assert cached source state for +# +# Raises: +# PipelineError: If the elements have uncached sources +# +def assert_sources_cached(context: Context, elements: List[Element]): + uncached = [] + with context.messenger.timed_activity("Checking sources"): for element in elements: - element_project = element._get_project() - if element_project is not self._project: - detail = ( - "Requested to track sources across junction boundaries\n" - + "in a project which does not use project.refs ref-storage." - ) - - raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources") + if element._fetch_needed(): + uncached.append(element) + + if uncached: + detail = "Sources are not cached for the following elements:\n\n" + for element in uncached: + detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name()) + for source in element.sources(): + if not source._is_cached(): + detail += " {}\n".format(source) + detail += "\n" + detail += ( + "Try fetching these elements first with `bst source fetch`,\n" + + "or run this command with `--fetch` option\n" + ) + raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources") # _Planner() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index e05100f24..35cb230ed 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2018 Codethink Limited +# Copyright (C) 2020 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -19,6 +19,7 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # Tristan Maat <tristan.maat@codethink.co.uk> +import itertools import os import sys import stat @@ -44,12 +45,12 @@ from ._scheduler import ( ArtifactPushQueue, ) from .element import Element -from ._pipeline import Pipeline from ._profile import Topics, PROFILER +from ._project import ProjectRefStorage from ._state import State from .types import _KeyStrength, _PipelineSelection, _Scope from .plugin import Plugin -from . import utils, _yaml, _site +from . import utils, _yaml, _site, _pipeline # Stream() @@ -84,7 +85,6 @@ class Stream: self._elementsourcescache = None self._sourcecache = None self._project = None - self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state self._notification_queue = deque() @@ -125,7 +125,6 @@ class Stream: assert self._project is None self._project = project self._project.load_context.set_fetch_subprojects(self._fetch_subprojects) - self._pipeline = Pipeline(self._context, project, self._artifacts) # load_selection() # @@ -211,11 +210,14 @@ class Stream: if pull_: self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) - plan = self._pipeline.add_elements([element], elements) + + # Pull the toplevel element regardless of whether it is in scope + plan = elements if element in elements else [element] + elements + self._enqueue_plan(plan) self._run() - missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] + missing_deps = [dep for dep in _pipeline.dependencies([element], scope) if not dep._cached()] if missing_deps: raise StreamError( "Elements need to be built or downloaded before staging a shell environment", @@ -245,7 +247,7 @@ class Stream: # Ensure we have our sources if we are launching a build shell if scope == _Scope.BUILD and not usebuildtree: self._fetch([element]) - self._pipeline.assert_sources_cached([element]) + _pipeline.assert_sources_cached(self._context, [element]) return element._shell( scope, mounts=mounts, isolate=isolate, prompt=prompt(element), command=command, usebuildtree=usebuildtree @@ -281,7 +283,7 @@ class Stream: ) # Assert that the elements are consistent - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) if all(project.remote_execution_specs for project in self._context.get_projects()): # Remote execution is configured for all projects. @@ -406,7 +408,7 @@ class Stream: if not self._sourcecache.has_push_remotes(): raise StreamError("No source caches available for pushing sources") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._add_queue(FetchQueue(self._scheduler)) @@ -446,7 +448,7 @@ class Stream: if not self._artifacts.has_fetch_remotes(): raise StreamError("No artifact caches available for pulling artifacts") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(elements) @@ -487,7 +489,7 @@ class Stream: if not self._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) @@ -618,7 +620,7 @@ class Stream: ) if self._artifacts.has_fetch_remotes(): - self._pipeline.check_remotes(target_objects) + self._resolve_cached_remotely(target_objects) return target_objects @@ -743,7 +745,7 @@ class Stream: # Assert all sources are cached in the source dir self._fetch(elements) - self._pipeline.assert_sources_cached(elements) + _pipeline.assert_sources_cached(self._context, elements) # Stage all sources determined by scope try: @@ -1140,6 +1142,34 @@ class Stream: ArtifactProject.clear_project_cache() return list(artifacts) + # _load_elements() + # + # Loads elements from target names. + # + # This function is called with a list of lists, such that multiple + # target groups may be specified. Element names specified in `targets` + # are allowed to be redundant. + # + # Args: + # target_groups (list of lists): Groups of toplevel targets to load + # + # Returns: + # (tuple of lists): A tuple of Element object lists, grouped corresponding to target_groups + # + def _load_elements(self, target_groups): + + # First concatenate all the lists for the loader's sake + targets = list(itertools.chain(*target_groups)) + + with PROFILER.profile(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, "-") for t in targets)): + elements = self._project.load_elements(targets) + + # Now create element groups to match the input target groups + elt_iter = iter(elements) + element_groups = [[next(elt_iter) for i in range(len(group))] for group in target_groups] + + return tuple(element_groups) + # _load_elements_from_targets # # Given the usual set of target element names/artifact refs, load @@ -1166,20 +1196,24 @@ class Stream: rewritable: bool = False, valid_artifact_names: bool = False ) -> Tuple[List[Element], List[Element], List[Element]]: - names, refs = self._expand_and_classify_targets(targets, valid_artifact_names=valid_artifact_names) - loadable = [names, except_targets] + + # First determine which of the user specified targets are artifact + # names and which are element names. + element_names, artifact_names = self._expand_and_classify_targets( + targets, valid_artifact_names=valid_artifact_names + ) self._project.load_context.set_rewritable(rewritable) - # Load and filter elements - if loadable: - elements, except_elements = self._pipeline.load(loadable) + # Load elements and except elements + if element_names: + elements, except_elements = self._load_elements([element_names, except_targets]) else: elements, except_elements = [], [] # Load artifacts - if refs: - artifacts = self._load_artifacts(refs) + if artifact_names: + artifacts = self._load_artifacts(artifact_names) else: artifacts = [] @@ -1205,6 +1239,21 @@ class Stream: self._elementsourcescache.setup_remotes(use_config=use_source_config, remote_url=source_url) self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_url) + # _resolve_cached_remotely() + # + # Checks whether the listed elements are currently cached in + # any of their respectively configured remotes. + # + # Args: + # targets (list [Element]): The list of element targets + # + def _resolve_cached_remotely(self, targets): + with self._context.messenger.simple_task("Querying remotes for cached status", silent_nested=True) as task: + task.set_maximum_progress(len(targets)) + for element in targets: + element._cached_remotely() + task.add_current_progress() + # _load_tracking() # # A variant of _load() to be used when the elements should be used @@ -1251,11 +1300,56 @@ class Stream: track_selected = [] for project, project_elements in track_projects.items(): - selected = self._pipeline.get_selection(project_elements, selection) - selected = self._pipeline.track_cross_junction_filter(project, selected, cross_junctions) + selected = _pipeline.get_selection(self._context, project_elements, selection) + selected = self._track_cross_junction_filter(project, selected, cross_junctions) track_selected.extend(selected) - return self._pipeline.except_elements(elements, track_selected, except_elements) + return _pipeline.except_elements(elements, track_selected, except_elements) + + # _track_cross_junction_filter() + # + # Filters out elements which are across junction boundaries, + # otherwise asserts that there are no such elements. + # + # This is currently assumed to be only relevant for element + # lists targetted at tracking. + # + # Args: + # project (Project): Project used for cross_junction filtering. + # All elements are expected to belong to that project. + # elements (list of Element): The list of elements to filter + # cross_junction_requested (bool): Whether the user requested + # cross junction tracking + # + # Returns: + # (list of Element): The filtered or asserted result + # + def _track_cross_junction_filter(self, project, elements, cross_junction_requested): + + # First filter out cross junctioned elements + if not cross_junction_requested: + elements = [element for element in elements if element._get_project() is project] + + # We can track anything if the toplevel project uses project.refs + # + if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS: + return elements + + # Ideally, we would want to report every cross junction element but not + # their dependencies, unless those cross junction elements dependencies + # were also explicitly requested on the command line. + # + # But this is too hard, lets shoot for a simple error. + for element in elements: + element_project = element._get_project() + if element_project is not self._project: + detail = ( + "Requested to track sources across junction boundaries\n" + + "in a project which does not use project.refs ref-storage." + ) + raise StreamError("Untrackable sources", detail=detail, reason="untrackable-sources") + + return elements # _load() # @@ -1315,9 +1409,9 @@ class Stream: # Now move on to loading primary selection. # - self._pipeline.resolve_elements(self.targets) - selected = self._pipeline.get_selection(self.targets, selection, silent=False) - selected = self._pipeline.except_elements(self.targets, selected, except_elements) + self._resolve_elements(self.targets) + selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False) + selected = _pipeline.except_elements(self.targets, selected, except_elements) if selection == _PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, @@ -1331,6 +1425,36 @@ class Stream: return selected + # _resolve_elements() + # + # Resolve element state and cache keys. + # + # Args: + # targets (list of Element): The list of toplevel element targets + # + def _resolve_elements(self, targets): + with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task: + # We need to go through the project to access the loader + if task: + task.set_maximum_progress(self._project.loader.loaded) + + # XXX: Now that Element._update_state() can trigger recursive update_state calls + # it is possible that we could get a RecursionError. However, this is unlikely + # to happen, even for large projects (tested with the Debian stack). Although, + # if it does become a problem we may have to set the recursion limit to a + # greater value. + for element in _pipeline.dependencies(targets, _Scope.ALL): + # Determine initial element state. + element._initialize_state() + + # We may already have Elements which are cached and have their runtimes + # cached, if this is the case, we should immediately notify their reverse + # dependencies. + element._update_ready_for_runtime_and_cached() + + if task: + task.add_current_progress() + # _add_queue() # # Adds a queue to the stream @@ -1374,7 +1498,7 @@ class Stream: # Inform the frontend of the full list of elements # and the list of elements which will be processed in this run # - self.total_elements = list(self._pipeline.dependencies(self.targets, _Scope.ALL)) + self.total_elements = list(_pipeline.dependencies(self.targets, _Scope.ALL)) if announce_session and self._session_start_callback is not None: self._session_start_callback() @@ -1401,7 +1525,7 @@ class Stream: def _fetch(self, elements: List[Element], *, fetch_original: bool = False, announce_session: bool = False): # Assert consistency for the fetch elements - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) # Construct queues, enqueue and run # diff --git a/src/buildstream/testing/_sourcetests/track.py b/src/buildstream/testing/_sourcetests/track.py index 38ef217f0..638cbb9b1 100644 --- a/src/buildstream/testing/_sourcetests/track.py +++ b/src/buildstream/testing/_sourcetests/track.py @@ -228,7 +228,7 @@ def test_cross_junction(cli, tmpdir, datafiles, ref_storage, kind): if ref_storage == "inline": # This is not allowed to track cross junction without project.refs. - result.assert_main_error(ErrorDomain.PIPELINE, "untrackable-sources") + result.assert_main_error(ErrorDomain.STREAM, "untrackable-sources") else: result.assert_success() diff --git a/tests/frontend/track.py b/tests/frontend/track.py index bd8444973..3dd686de0 100644 --- a/tests/frontend/track.py +++ b/tests/frontend/track.py @@ -222,7 +222,7 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag # Cross junction tracking is not allowed when the toplevel project # is using inline ref storage. # - result.assert_main_error(ErrorDomain.PIPELINE, "untrackable-sources") + result.assert_main_error(ErrorDomain.STREAM, "untrackable-sources") else: # # No cross juction tracking was requested diff --git a/tests/internals/pluginloading.py b/tests/internals/pluginloading.py deleted file mode 100644 index 1f4446541..000000000 --- a/tests/internals/pluginloading.py +++ /dev/null @@ -1,38 +0,0 @@ -from contextlib import contextmanager -import os -import pytest - -from buildstream._project import Project -from buildstream._pipeline import Pipeline - -from tests.testutils import dummy_context - -DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "pluginloading",) - - -@contextmanager -def create_pipeline(tmpdir, basedir, target): - with dummy_context() as context: - context.deploydir = os.path.join(str(tmpdir), "deploy") - context.casdir = os.path.join(str(tmpdir), "cas") - project = Project(basedir, context) - - pipeline = Pipeline(context, project, None) - (targets,) = pipeline.load([(target,)]) - yield targets - - -@pytest.mark.datafiles(os.path.join(DATA_DIR, "customsource")) -def test_customsource(datafiles, tmpdir): - - basedir = str(datafiles) - with create_pipeline(tmpdir, basedir, "simple.bst") as targets: - assert targets[0].get_kind() == "autotools" - - -@pytest.mark.datafiles(os.path.join(DATA_DIR, "customelement")) -def test_customelement(datafiles, tmpdir): - - basedir = str(datafiles) - with create_pipeline(tmpdir, basedir, "simple.bst") as targets: - assert targets[0].get_kind() == "foo" diff --git a/tests/internals/pluginloading/customelement/elements/simple.bst b/tests/internals/pluginloading/customelement/elements/simple.bst deleted file mode 100644 index fc48e3ba9..000000000 --- a/tests/internals/pluginloading/customelement/elements/simple.bst +++ /dev/null @@ -1,4 +0,0 @@ -kind: foo -description: Custom foo source -config: - pony-color: pink diff --git a/tests/internals/pluginloading/customelement/pluginelements/__init__.py b/tests/internals/pluginloading/customelement/pluginelements/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/tests/internals/pluginloading/customelement/pluginelements/__init__.py +++ /dev/null diff --git a/tests/internals/pluginloading/customelement/pluginelements/foo.py b/tests/internals/pluginloading/customelement/pluginelements/foo.py deleted file mode 100644 index bdb6c8982..000000000 --- a/tests/internals/pluginloading/customelement/pluginelements/foo.py +++ /dev/null @@ -1,19 +0,0 @@ -from buildstream import Element - - -class FooElement(Element): - - BST_MIN_VERSION = "2.0" - - def preflight(self): - pass - - def configure(self, node): - pass - - def get_unique_key(self): - return {} - - -def setup(): - return FooElement diff --git a/tests/internals/pluginloading/customelement/project.conf b/tests/internals/pluginloading/customelement/project.conf deleted file mode 100644 index 2619bdf82..000000000 --- a/tests/internals/pluginloading/customelement/project.conf +++ /dev/null @@ -1,8 +0,0 @@ -name: pony -min-version: 2.0 -element-path: elements -plugins: -- origin: local - path: pluginelements - elements: - - foo diff --git a/tests/internals/pluginloading/customsource/elements/simple.bst b/tests/internals/pluginloading/customsource/elements/simple.bst deleted file mode 100644 index 7e0cc43b7..000000000 --- a/tests/internals/pluginloading/customsource/elements/simple.bst +++ /dev/null @@ -1,6 +0,0 @@ -kind: autotools -description: Custom foo source -sources: -- kind: foo - ref: 1.2.3 - uri: http://ponyland.com diff --git a/tests/internals/pluginloading/customsource/pluginsources/__init__.py b/tests/internals/pluginloading/customsource/pluginsources/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/tests/internals/pluginloading/customsource/pluginsources/__init__.py +++ /dev/null diff --git a/tests/internals/pluginloading/customsource/pluginsources/foo.py b/tests/internals/pluginloading/customsource/pluginsources/foo.py deleted file mode 100644 index c5229f3e2..000000000 --- a/tests/internals/pluginloading/customsource/pluginsources/foo.py +++ /dev/null @@ -1,19 +0,0 @@ -from buildstream import Source - - -class FooSource(Source): - - BST_MIN_VERSION = "2.0" - - def preflight(self): - pass - - def configure(self, node): - pass - - def get_unique_key(self): - pass - - -def setup(): - return FooSource diff --git a/tests/internals/pluginloading/customsource/project.conf b/tests/internals/pluginloading/customsource/project.conf deleted file mode 100644 index 5cb6da537..000000000 --- a/tests/internals/pluginloading/customsource/project.conf +++ /dev/null @@ -1,8 +0,0 @@ -name: pony -min-version: 2.0 -element-path: elements -plugins: -- origin: local - path: pluginsources - sources: - - foo |