author     bst-marge-bot <marge-bot@buildstream.build>  2020-12-23 09:45:40 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2020-12-23 09:45:40 +0000
commit     74833a73e3b8c5f473d85965e0a31ddacde6b782 (patch)
tree       904a140c21f073aca49acaccf6aa971efe5fdcb2
parent     6904c1328cec54b3858e57bbd83ee59e0fd7e16e (diff)
parent     5ccacfc8cea692dc0296c2f335c8129656a3d450 (diff)
download   buildstream-74833a73e3b8c5f473d85965e0a31ddacde6b782.tar.gz
Merge branch 'tristan/dissolve-pipeline' into 'master'
Pipeline refactor

See merge request BuildStream/buildstream!2121
-rw-r--r--  src/buildstream/_pipeline.py                                             620
-rw-r--r--  src/buildstream/_stream.py                                               182
-rw-r--r--  src/buildstream/testing/_sourcetests/track.py                              2
-rw-r--r--  tests/frontend/track.py                                                    2
-rw-r--r--  tests/internals/pluginloading.py                                          38
-rw-r--r--  tests/internals/pluginloading/customelement/elements/simple.bst            4
-rw-r--r--  tests/internals/pluginloading/customelement/pluginelements/__init__.py     0
-rw-r--r--  tests/internals/pluginloading/customelement/pluginelements/foo.py         19
-rw-r--r--  tests/internals/pluginloading/customelement/project.conf                   8
-rw-r--r--  tests/internals/pluginloading/customsource/elements/simple.bst             6
-rw-r--r--  tests/internals/pluginloading/customsource/pluginsources/__init__.py       0
-rw-r--r--  tests/internals/pluginloading/customsource/pluginsources/foo.py           19
-rw-r--r--  tests/internals/pluginloading/customsource/project.conf                    8
13 files changed, 401 insertions, 507 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index d53fc9d01..e1e6dcf39 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2016-2018 Codethink Limited
+# Copyright (C) 2016-2020 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -19,403 +19,275 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# Tristan Maat <tristan.maat@codethink.co.uk>
-import os
import itertools
-from operator import itemgetter
-from collections import OrderedDict
+from collections import OrderedDict
+from operator import itemgetter
+from typing import List, Iterator
from pyroaring import BitMap # pylint: disable=no-name-in-module
-from ._exceptions import PipelineError
-from ._profile import Topics, PROFILER
-from ._project import ProjectRefStorage
+from .element import Element
from .types import _PipelineSelection, _Scope
+from ._context import Context
+from ._exceptions import PipelineError
+
-# Pipeline()
+# dependencies()
+#
+# Generator function to iterate over the dependencies of multiple
+# targets in the specified scope, while guaranteeing that a given
+# element is never yielded more than once.
#
# Args:
-# project (Project): The Project object
-# context (Context): The Context object
-# artifacts (Context): The ArtifactCache object
+# targets: The target Elements to loop over
+# scope: An integer value from the _Scope enum, the scope to iterate over
+# recurse: Whether to recurse into dependencies
#
-class Pipeline:
- def __init__(self, context, project, artifacts):
- self._context = context # The Context
- self._project = project # The toplevel project
- self._artifacts = artifacts # The artifact cache
-
- # load()
- #
- # Loads elements from target names.
- #
- # This function is called with a list of lists, such that multiple
- # target groups may be specified. Element names specified in `targets`
- # are allowed to be redundant.
- #
- # Args:
- # target_groups (list of lists): Groups of toplevel targets to load
- #
- # Returns:
- # (tuple of lists): A tuple of grouped Element objects corresponding to target_groups
- #
- def load(self, target_groups):
-
- # First concatenate all the lists for the loader's sake
- targets = list(itertools.chain(*target_groups))
-
- with PROFILER.profile(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, "-") for t in targets)):
- elements = self._project.load_elements(targets)
-
- # Now create element groups to match the input target groups
- elt_iter = iter(elements)
- element_groups = [[next(elt_iter) for i in range(len(group))] for group in target_groups]
-
- return tuple(element_groups)
-
- # resolve_elements()
- #
- # Resolve element state and cache keys.
- #
- # Args:
- # targets (list of Element): The list of toplevel element targets
- #
- def resolve_elements(self, targets):
- with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
- # We need to go through the project to access the loader
- if task:
- task.set_maximum_progress(self._project.loader.loaded)
-
- # XXX: Now that Element._update_state() can trigger recursive update_state calls
- # it is possible that we could get a RecursionError. However, this is unlikely
- # to happen, even for large projects (tested with the Debian stack). Although,
- # if it does become a problem we may have to set the recursion limit to a
- # greater value.
- for element in self.dependencies(targets, _Scope.ALL):
- # Determine initial element state.
- element._initialize_state()
-
- # We may already have Elements which are cached and have their runtimes
- # cached, if this is the case, we should immediately notify their reverse
- # dependencies.
- element._update_ready_for_runtime_and_cached()
-
- if task:
- task.add_current_progress()
-
- # check_remotes()
- #
- # Check if the target artifact is cached in any of the available remotes
- #
- # Args:
- # targets (list [Element]): The list of element targets
- #
- def check_remotes(self, targets):
- with self._context.messenger.simple_task("Querying remotes for cached status", silent_nested=True) as task:
- task.set_maximum_progress(len(targets))
+# Yields:
+# Elements in the scope of the specified target elements
+#
+def dependencies(targets: List[Element], scope: int, *, recurse: bool = True) -> Iterator[Element]:
+ # Keep track of 'visited' in this scope, so that all targets
+ # share the same context.
+ visited = (BitMap(), BitMap())
- for element in targets:
- element._cached_remotely()
+ for target in targets:
+ for element in target._dependencies(scope, recurse=recurse, visited=visited):
+ yield element
- task.add_current_progress()
- # dependencies()
- #
- # Generator function to iterate over elements and optionally
- # also iterate over sources.
- #
- # Args:
- # targets (list of Element): The target Elements to loop over
- # scope (_Scope): The scope to iterate over
- # recurse (bool): Whether to recurse into dependencies
- #
- def dependencies(self, targets, scope, *, recurse=True):
- # Keep track of 'visited' in this scope, so that all targets
- # share the same context.
- visited = (BitMap(), BitMap())
-
- for target in targets:
- for element in target._dependencies(scope, recurse=recurse, visited=visited):
- yield element
+# get_selection()
+#
+# Gets a full list of elements based on a toplevel
+# list of element targets
+#
+# Various commands define a --deps option to specify what elements to
+# use in the result; this function reports a list that is appropriate for
+# the selected option.
+#
+# Args:
+# context: The invocation context
+# targets: The target Elements
+# mode: A value from PipelineSelection enumeration
+# silent: Whether to silence messages
+#
+# Returns:
+# A list of Elements appropriate for the specified selection mode
+#
+def get_selection(context: Context, targets: List[Element], mode: str, *, silent: bool = True) -> List[Element]:
+ def redirect_and_log() -> List[Element]:
+ # Redirect and log if permitted
+ elements: List[Element] = []
+ for t in targets:
+ new_elm = t._get_source_element()
+ if new_elm != t and not silent:
+ context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name))
+ if new_elm not in elements:
+ elements.append(new_elm)
+ return elements
- # plan()
- #
- # Generator function to iterate over only the elements
- # which are required to build the pipeline target, omitting
- # cached elements. The elements are yielded in a depth sorted
- # ordering for optimal build plans
- #
- # Args:
- # elements (list of Element): List of target elements to plan
- #
- # Returns:
- # (list of Element): A depth sorted list of the build plan
- #
- def plan(self, elements):
+ def plan() -> List[Element]:
# Keep locally cached elements in the plan if remote artifact cache is used
# to allow pulling artifact with strict cache key, if available.
- plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes()
+ plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes()
+ return _Planner().plan(targets, plan_cached)
- return _Planner().plan(elements, plan_cached)
-
- # get_selection()
- #
- # Gets a full list of elements based on a toplevel
- # list of element targets
- #
- # Args:
- # targets (list of Element): The target Elements
- # mode (_PipelineSelection): The PipelineSelection mode
- #
- # Various commands define a --deps option to specify what elements to
- # use in the result, this function reports a list that is appropriate for
- # the selected option.
- #
- def get_selection(self, targets, mode, *, silent=True):
- def redirect_and_log():
- # Redirect and log if permitted
- elements = []
- for t in targets:
- new_elm = t._get_source_element()
- if new_elm != t and not silent:
- self._context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name))
- if new_elm not in elements:
- elements.append(new_elm)
- return elements
-
- # Work around python not having a switch statement; this is
- # much clearer than the if/elif/else block we used to have.
- #
- # Note that the lambda is necessary so that we don't evaluate
- # all possible values at run time; that would be slow.
- return {
- _PipelineSelection.NONE: lambda: targets,
- _PipelineSelection.REDIRECT: redirect_and_log,
- _PipelineSelection.PLAN: lambda: self.plan(targets),
- _PipelineSelection.ALL: lambda: list(self.dependencies(targets, _Scope.ALL)),
- _PipelineSelection.BUILD: lambda: list(self.dependencies(targets, _Scope.BUILD)),
- _PipelineSelection.RUN: lambda: list(self.dependencies(targets, _Scope.RUN)),
- }[mode]()
-
- # except_elements():
- #
- # Return what we are left with after the intersection between
- # excepted and target elements and their unique dependencies is
- # gone.
- #
- # Args:
- # targets (list of Element): List of toplevel targetted elements
- # elements (list of Element): The list to remove elements from
- # except_targets (list of Element): List of toplevel except targets
- #
- # Returns:
- # (list of Element): The elements list with the intersected
- # exceptions removed
- #
- def except_elements(self, targets, elements, except_targets):
- if not except_targets:
- return elements
-
- targeted = list(self.dependencies(targets, _Scope.ALL))
- visited = []
-
- def find_intersection(element):
- if element in visited:
- return
- visited.append(element)
-
- # Intersection elements are those that are also in
- # 'targeted', as long as we don't recurse into them.
- if element in targeted:
- yield element
- else:
- for dep in element._dependencies(_Scope.ALL, recurse=False):
- yield from find_intersection(dep)
-
- # Build a list of 'intersection' elements, i.e. the set of
- # elements that lie on the border closest to excepted elements
- # between excepted and target elements.
- intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets))
-
- # Now use this set of elements to traverse the targeted
- # elements, except 'intersection' elements and their unique
- # dependencies.
- queue = []
- visited = []
-
- queue.extend(targets)
- while queue:
- element = queue.pop()
- if element in visited or element in intersection:
- continue
- visited.append(element)
-
- queue.extend(element._dependencies(_Scope.ALL, recurse=False))
-
- # That looks like a lot, but overall we only traverse (part
- # of) the graph twice. This could be reduced to once if we
- # kept track of parent elements, but is probably not
- # significant.
-
- # Ensure that we return elements in the same order they were
- # in before.
- return [element for element in elements if element in visited]
-
- # add_elements()
+ # Work around python not having a switch statement; this is
+ # much clearer than the if/elif/else block we used to have.
#
- # Add to a list of elements all elements that are not already in it
- #
- # Args:
- # elements (list of Element): The element list
- # add (list of Element): List of elements to add
- #
- # Returns:
- # (list): The original elements list, with elements in add that weren't
- # already in it added.
- def add_elements(self, elements, add):
- ret = elements[:]
- ret.extend(e for e in add if e not in ret)
- return ret
-
- # track_cross_junction_filter()
- #
- # Filters out elements which are across junction boundaries,
- # otherwise asserts that there are no such elements.
- #
- # This is currently assumed to be only relevant for element
- # lists targetted at tracking.
- #
- # Args:
- # project (Project): Project used for cross_junction filtering.
- # All elements are expected to belong to that project.
- # elements (list of Element): The list of elements to filter
- # cross_junction_requested (bool): Whether the user requested
- # cross junction tracking
- #
- # Returns:
- # (list of Element): The filtered or asserted result
- #
- def track_cross_junction_filter(self, project, elements, cross_junction_requested):
- # Filter out cross junctioned elements
- if not cross_junction_requested:
- elements = self._filter_cross_junctions(project, elements)
- self._assert_junction_tracking(elements)
+ # Note that the lambda is necessary so that we don't evaluate
+ # all possible values at run time; that would be slow.
+ return {
+ _PipelineSelection.NONE: lambda: targets,
+ _PipelineSelection.REDIRECT: redirect_and_log,
+ _PipelineSelection.PLAN: plan,
+ _PipelineSelection.ALL: lambda: list(dependencies(targets, _Scope.ALL)),
+ _PipelineSelection.BUILD: lambda: list(dependencies(targets, _Scope.BUILD)),
+ _PipelineSelection.RUN: lambda: list(dependencies(targets, _Scope.RUN)),
+ }[mode]()
- return elements
- # assert_consistent()
- #
- # Asserts that the given list of elements are in a consistent state, that
- # is to say that all sources are consistent and can at least be fetched.
- #
- # Consequently it also means that cache keys can be resolved.
- #
- def assert_consistent(self, elements):
- inconsistent = []
- inconsistent_workspaced = []
- with self._context.messenger.timed_activity("Checking sources"):
- for element in elements:
- if not element._has_all_sources_resolved():
- if element._get_workspace():
- inconsistent_workspaced.append(element)
- else:
- inconsistent.append(element)
-
- if inconsistent:
- detail = "Exact versions are missing for the following elements:\n\n"
- for element in inconsistent:
- detail += " Element: {} is inconsistent\n".format(element._get_full_name())
- for source in element.sources():
- if not source.is_resolved():
- detail += " {} is missing ref\n".format(source)
- detail += "\n"
- detail += "Try tracking these elements first with `bst source track`\n"
-
- raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
-
- if inconsistent_workspaced:
- detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n"
- for element in inconsistent_workspaced:
- detail += " " + element._get_full_name() + "\n"
- raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")
-
- # assert_sources_cached()
- #
- # Asserts that sources for the given list of elements are cached.
- #
- # Args:
- # elements (list): The list of elements
- #
- def assert_sources_cached(self, elements):
- uncached = []
- with self._context.messenger.timed_activity("Checking sources"):
- for element in elements:
- if element._fetch_needed():
- uncached.append(element)
-
- if uncached:
- detail = "Sources are not cached for the following elements:\n\n"
- for element in uncached:
- detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name())
- for source in element.sources():
- if not source._is_cached():
- detail += " {}\n".format(source)
- detail += "\n"
- detail += (
- "Try fetching these elements first with `bst source fetch`,\n"
- + "or run this command with `--fetch` option\n"
- )
-
- raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
-
- #############################################################
- # Private Methods #
- #############################################################
-
- # _filter_cross_junction()
- #
- # Filters out cross junction elements from the elements
- #
- # Args:
- # project (Project): The project on which elements are allowed
- # elements (list of Element): The list of elements to be tracked
- #
- # Returns:
- # (list): A filtered list of `elements` which does
- # not contain any cross junction elements.
- #
- def _filter_cross_junctions(self, project, elements):
- return [element for element in elements if element._get_project() is project]
+# except_elements():
+#
+# This function calculates the intersection of the `except_targets`
+# element dependencies and the `targets` dependencies, and removes
+# that intersection from the `elements` list, returning the result.
+#
+# Args:
+# targets: List of toplevel targeted elements
+# elements: The list to remove elements from
+# except_targets: List of toplevel except targets
+#
+# Returns:
+# The elements list with the intersected exceptions removed
+#
+# Important notes on the behavior
+# ===============================
+#
+# * Except elements can be completely outside of the scope
+# of targets.
+#
+# * When the dependencies of except elements intersect with
+# dependencies of targets, those dependencies are removed
+# from the result.
+#
+# * If a target is found within the intersection of excepted
+# elements, that target and its dependencies are considered
+# exempt from the exception intersection.
+#
+# Example:
+#
+#            (t1)        (e1)
+#           /    \       /
+#         (o)    (o)   ( )
+#         /  \   /  \
+#       (o)  (x)    ( )
+#         \  /  \
+#         (o)   (x)   ( )
+#           \   /
+#           (x)
+#          /   \
+#        (x)   (t2)
+#        / \   /  \
+#      (x)  (x)   (o)
+#                 /  \
+#               (o)  (o)
+#
+# Here we have a mockup graph with 2 target elements (t1) and (t2),
+# and one except element (e1) which lies outside of the graph.
+#
+# - ( ) elements are ignored, they were never in the element list
+# - (o) elements will be included in the result
+# - (x) elements are removed from the graph
+#
+# Note how (t2) reintroduces portions of the graph which were otherwise
+# tainted by being depended on indirectly by the (e1) except element.
+#
+def except_elements(targets: List[Element], elements: List[Element], except_targets: List[Element]) -> List[Element]:
+ if not except_targets:
+ return elements
- # _assert_junction_tracking()
- #
- # Raises an error if tracking is attempted on junctioned elements and
- # a project.refs file is not enabled for the toplevel project.
- #
- # Args:
- # elements (list of Element): The list of elements to be tracked
- #
- def _assert_junction_tracking(self, elements):
+ targeted: List[Element] = list(dependencies(targets, _Scope.ALL))
+ visited: List[Element] = []
- # We can track anything if the toplevel project uses project.refs
- #
- if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS:
+ def find_intersection(element: Element) -> Iterator[Element]:
+ if element in visited:
return
-
- # Ideally, we would want to report every cross junction element but not
- # their dependencies, unless those cross junction elements dependencies
- # were also explicitly requested on the command line.
- #
- # But this is too hard, lets shoot for a simple error.
+ visited.append(element)
+
+ # Intersection elements are those that are also in
+ # 'targeted', as long as we don't recurse into them.
+ if element in targeted:
+ yield element
+ else:
+ for dep in element._dependencies(_Scope.ALL, recurse=False):
+ yield from find_intersection(dep)
+
+ # Build a list of 'intersection' elements, i.e. the set of
+ # elements that lie on the border closest to excepted elements
+ # between excepted and target elements.
+ intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets))
+
+ # Now use this set of elements to traverse the targeted
+ # elements, except 'intersection' elements and their unique
+ # dependencies.
+ queue = []
+ visited = []
+
+ queue.extend(targets)
+ while queue:
+ element = queue.pop()
+ if element in visited or element in intersection:
+ continue
+ visited.append(element)
+
+ queue.extend(element._dependencies(_Scope.ALL, recurse=False))
+
+ # That looks like a lot, but overall we only traverse (part
+ # of) the graph twice. This could be reduced to once if we
+ # kept track of parent elements, but is probably not
+ # significant.
+
+ # Ensure that we return elements in the same order they were
+ # in before.
+ return [element for element in elements if element in visited]
+
+
+# assert_consistent()
+#
+# Asserts that the given list of elements is in a consistent state, that
+# is to say that all sources are consistent and can at least be fetched.
+#
+# Consequently it also means that cache keys can be resolved.
+#
+# Args:
+# context: The invocation context
+# elements: The elements to assert consistency on
+#
+# Raises:
+# PipelineError: If the elements are inconsistent.
+#
+def assert_consistent(context: Context, elements: List[Element]) -> None:
+ inconsistent = []
+ inconsistent_workspaced = []
+ with context.messenger.timed_activity("Checking sources"):
+ for element in elements:
+ if not element._has_all_sources_resolved():
+ if element._get_workspace():
+ inconsistent_workspaced.append(element)
+ else:
+ inconsistent.append(element)
+
+ if inconsistent:
+ detail = "Exact versions are missing for the following elements:\n\n"
+ for element in inconsistent:
+ detail += " Element: {} is inconsistent\n".format(element._get_full_name())
+ for source in element.sources():
+ if not source.is_resolved():
+ detail += " {} is missing ref\n".format(source)
+ detail += "\n"
+ detail += "Try tracking these elements first with `bst source track`\n"
+ raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
+
+ if inconsistent_workspaced:
+ detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n"
+ for element in inconsistent_workspaced:
+ detail += " " + element._get_full_name() + "\n"
+ raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")
+
+
+# assert_sources_cached()
+#
+# Asserts that sources for the given list of elements are cached.
+#
+# Args:
+# context: The invocation context
+# elements: The elements to assert cached source state for
+#
+# Raises:
+# PipelineError: If the elements have uncached sources
+#
+def assert_sources_cached(context: Context, elements: List[Element]):
+ uncached = []
+ with context.messenger.timed_activity("Checking sources"):
for element in elements:
- element_project = element._get_project()
- if element_project is not self._project:
- detail = (
- "Requested to track sources across junction boundaries\n"
- + "in a project which does not use project.refs ref-storage."
- )
-
- raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
+ if element._fetch_needed():
+ uncached.append(element)
+
+ if uncached:
+ detail = "Sources are not cached for the following elements:\n\n"
+ for element in uncached:
+ detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name())
+ for source in element.sources():
+ if not source._is_cached():
+ detail += " {}\n".format(source)
+ detail += "\n"
+ detail += (
+ "Try fetching these elements first with `bst source fetch`,\n"
+ + "or run this command with `--fetch` option\n"
+ )
+ raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
# _Planner()
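The new module-level except_elements() keeps the two-pass traversal described in the comments above: a first pass locates the "intersection" border where the except targets' dependencies meet the target graph, and a second pass re-walks the targets, pruning at that border. Below is a self-contained sketch of the same algorithm on a toy graph; Node and walk() are illustrative stand-ins for Element and the _Scope.ALL dependency iteration, not BuildStream API.

import itertools

class Node:
    # Toy stand-in for Element: a name plus direct dependencies.
    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)

    def __repr__(self):
        return self.name

def walk(nodes):
    # Every node reachable from 'nodes' (the nodes themselves included),
    # collected at most once -- the role dependencies() plays above.
    seen, order, stack = set(), [], list(nodes)
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        order.append(node)
        stack.extend(node.deps)
    return order

def except_elements(targets, elements, except_targets):
    if not except_targets:
        return elements

    targeted = walk(targets)
    visited = []

    def find_intersection(node):
        # Border nodes are reachable from an except target and also in
        # the target graph; recursion stops once the border is reached.
        if node in visited:
            return
        visited.append(node)
        if node in targeted:
            yield node
        else:
            for dep in node.deps:
                yield from find_intersection(dep)

    intersection = list(
        itertools.chain.from_iterable(find_intersection(e) for e in except_targets)
    )

    # Second pass: re-walk the targets, pruning the intersection and
    # everything reachable only through it.
    kept, queue = [], list(targets)
    while queue:
        node = queue.pop()
        if node in kept or node in intersection:
            continue
        kept.append(node)
        queue.extend(node.deps)

    # Preserve the caller's element ordering.
    return [n for n in elements if n in kept]

# (x) is depended on by both the target and the except target, so it is
# pruned; (a) is reachable only through the target and survives.
x = Node("x")
a = Node("a")
t = Node("t", deps=[a, x])
e = Node("e", deps=[x])

print(except_elements([t], walk([t]), [e]))  # -> [t, a]

This mirrors the (o)/(x) legend in the diagram above: nodes shared with the except graph are dropped, while anything reachable only through a target is kept.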
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e05100f24..35cb230ed 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2020 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -19,6 +19,7 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# Tristan Maat <tristan.maat@codethink.co.uk>
+import itertools
import os
import sys
import stat
@@ -44,12 +45,12 @@ from ._scheduler import (
ArtifactPushQueue,
)
from .element import Element
-from ._pipeline import Pipeline
from ._profile import Topics, PROFILER
+from ._project import ProjectRefStorage
from ._state import State
from .types import _KeyStrength, _PipelineSelection, _Scope
from .plugin import Plugin
-from . import utils, _yaml, _site
+from . import utils, _yaml, _site, _pipeline
# Stream()
@@ -84,7 +85,6 @@ class Stream:
self._elementsourcescache = None
self._sourcecache = None
self._project = None
- self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
self._notification_queue = deque()
@@ -125,7 +125,6 @@ class Stream:
assert self._project is None
self._project = project
self._project.load_context.set_fetch_subprojects(self._fetch_subprojects)
- self._pipeline = Pipeline(self._context, project, self._artifacts)
# load_selection()
#
@@ -211,11 +210,14 @@ class Stream:
if pull_:
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
- plan = self._pipeline.add_elements([element], elements)
+
+ # Pull the toplevel element regardless of whether it is in scope
+ plan = elements if element in elements else [element] + elements
+
self._enqueue_plan(plan)
self._run()
- missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
+ missing_deps = [dep for dep in _pipeline.dependencies([element], scope) if not dep._cached()]
if missing_deps:
raise StreamError(
"Elements need to be built or downloaded before staging a shell environment",
@@ -245,7 +247,7 @@ class Stream:
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
self._fetch([element])
- self._pipeline.assert_sources_cached([element])
+ _pipeline.assert_sources_cached(self._context, [element])
return element._shell(
scope, mounts=mounts, isolate=isolate, prompt=prompt(element), command=command, usebuildtree=usebuildtree
@@ -281,7 +283,7 @@ class Stream:
)
# Assert that the elements are consistent
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
if all(project.remote_execution_specs for project in self._context.get_projects()):
# Remote execution is configured for all projects.
@@ -406,7 +408,7 @@ class Stream:
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._add_queue(FetchQueue(self._scheduler))
@@ -446,7 +448,7 @@ class Stream:
if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -487,7 +489,7 @@ class Stream:
if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
@@ -618,7 +620,7 @@ class Stream:
)
if self._artifacts.has_fetch_remotes():
- self._pipeline.check_remotes(target_objects)
+ self._resolve_cached_remotely(target_objects)
return target_objects
@@ -743,7 +745,7 @@ class Stream:
# Assert all sources are cached in the source dir
self._fetch(elements)
- self._pipeline.assert_sources_cached(elements)
+ _pipeline.assert_sources_cached(self._context, elements)
# Stage all sources determined by scope
try:
@@ -1140,6 +1142,34 @@ class Stream:
ArtifactProject.clear_project_cache()
return list(artifacts)
+ # _load_elements()
+ #
+ # Loads elements from target names.
+ #
+ # This function is called with a list of lists, such that multiple
+ # target groups may be specified. Element names specified in `targets`
+ # are allowed to be redundant.
+ #
+ # Args:
+ # target_groups (list of lists): Groups of toplevel targets to load
+ #
+ # Returns:
+ # (tuple of lists): A tuple of Element object lists, grouped corresponding to target_groups
+ #
+ def _load_elements(self, target_groups):
+
+ # First concatenate all the lists for the loader's sake
+ targets = list(itertools.chain(*target_groups))
+
+ with PROFILER.profile(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, "-") for t in targets)):
+ elements = self._project.load_elements(targets)
+
+ # Now create element groups to match the input target groups
+ elt_iter = iter(elements)
+ element_groups = [[next(elt_iter) for i in range(len(group))] for group in target_groups]
+
+ return tuple(element_groups)
+
# _load_elements_from_targets
#
# Given the usual set of target element names/artifact refs, load
@@ -1166,20 +1196,24 @@ class Stream:
rewritable: bool = False,
valid_artifact_names: bool = False
) -> Tuple[List[Element], List[Element], List[Element]]:
- names, refs = self._expand_and_classify_targets(targets, valid_artifact_names=valid_artifact_names)
- loadable = [names, except_targets]
+
+ # First determine which of the user specified targets are artifact
+ # names and which are element names.
+ element_names, artifact_names = self._expand_and_classify_targets(
+ targets, valid_artifact_names=valid_artifact_names
+ )
self._project.load_context.set_rewritable(rewritable)
- # Load and filter elements
- if loadable:
- elements, except_elements = self._pipeline.load(loadable)
+ # Load elements and except elements
+ if element_names:
+ elements, except_elements = self._load_elements([element_names, except_targets])
else:
elements, except_elements = [], []
# Load artifacts
- if refs:
- artifacts = self._load_artifacts(refs)
+ if artifact_names:
+ artifacts = self._load_artifacts(artifact_names)
else:
artifacts = []
@@ -1205,6 +1239,21 @@ class Stream:
self._elementsourcescache.setup_remotes(use_config=use_source_config, remote_url=source_url)
self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_url)
+ # _resolve_cached_remotely()
+ #
+ # Checks whether the listed elements are currently cached in
+ # any of their respectively configured remotes.
+ #
+ # Args:
+ # targets (list [Element]): The list of element targets
+ #
+ def _resolve_cached_remotely(self, targets):
+ with self._context.messenger.simple_task("Querying remotes for cached status", silent_nested=True) as task:
+ task.set_maximum_progress(len(targets))
+ for element in targets:
+ element._cached_remotely()
+ task.add_current_progress()
+
# _load_tracking()
#
# A variant of _load() to be used when the elements should be used
@@ -1251,11 +1300,56 @@ class Stream:
track_selected = []
for project, project_elements in track_projects.items():
- selected = self._pipeline.get_selection(project_elements, selection)
- selected = self._pipeline.track_cross_junction_filter(project, selected, cross_junctions)
+ selected = _pipeline.get_selection(self._context, project_elements, selection)
+ selected = self._track_cross_junction_filter(project, selected, cross_junctions)
track_selected.extend(selected)
- return self._pipeline.except_elements(elements, track_selected, except_elements)
+ return _pipeline.except_elements(elements, track_selected, except_elements)
+
+ # _track_cross_junction_filter()
+ #
+ # Filters out elements which are across junction boundaries,
+ # otherwise asserts that there are no such elements.
+ #
+ # This is currently assumed to be only relevant for element
+ # lists targeted at tracking.
+ #
+ # Args:
+ # project (Project): Project used for cross_junction filtering.
+ # All elements are expected to belong to that project.
+ # elements (list of Element): The list of elements to filter
+ # cross_junction_requested (bool): Whether the user requested
+ # cross junction tracking
+ #
+ # Returns:
+ # (list of Element): The filtered or asserted result
+ #
+ def _track_cross_junction_filter(self, project, elements, cross_junction_requested):
+
+ # First filter out cross junctioned elements
+ if not cross_junction_requested:
+ elements = [element for element in elements if element._get_project() is project]
+
+ # We can track anything if the toplevel project uses project.refs
+ #
+ if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS:
+ return elements
+
+ # Ideally, we would want to report every cross junction element but not
+ # their dependencies, unless those cross junction elements dependencies
+ # were also explicitly requested on the command line.
+ #
+ # But this is too hard, let's shoot for a simple error.
+ for element in elements:
+ element_project = element._get_project()
+ if element_project is not self._project:
+ detail = (
+ "Requested to track sources across junction boundaries\n"
+ + "in a project which does not use project.refs ref-storage."
+ )
+ raise StreamError("Untrackable sources", detail=detail, reason="untrackable-sources")
+
+ return elements
# _load()
#
@@ -1315,9 +1409,9 @@ class Stream:
# Now move on to loading primary selection.
#
- self._pipeline.resolve_elements(self.targets)
- selected = self._pipeline.get_selection(self.targets, selection, silent=False)
- selected = self._pipeline.except_elements(self.targets, selected, except_elements)
+ self._resolve_elements(self.targets)
+ selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False)
+ selected = _pipeline.except_elements(self.targets, selected, except_elements)
if selection == _PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
@@ -1331,6 +1425,36 @@ class Stream:
return selected
+ # _resolve_elements()
+ #
+ # Resolve element state and cache keys.
+ #
+ # Args:
+ # targets (list of Element): The list of toplevel element targets
+ #
+ def _resolve_elements(self, targets):
+ with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
+ # We need to go through the project to access the loader
+ if task:
+ task.set_maximum_progress(self._project.loader.loaded)
+
+ # XXX: Now that Element._update_state() can trigger recursive update_state calls
+ # it is possible that we could get a RecursionError. However, this is unlikely
+ # to happen, even for large projects (tested with the Debian stack). Although,
+ # if it does become a problem we may have to set the recursion limit to a
+ # greater value.
+ for element in _pipeline.dependencies(targets, _Scope.ALL):
+ # Determine initial element state.
+ element._initialize_state()
+
+ # We may already have Elements which are cached and have their runtimes
+ # cached, if this is the case, we should immediately notify their reverse
+ # dependencies.
+ element._update_ready_for_runtime_and_cached()
+
+ if task:
+ task.add_current_progress()
+
# _add_queue()
#
# Adds a queue to the stream
@@ -1374,7 +1498,7 @@ class Stream:
# Inform the frontend of the full list of elements
# and the list of elements which will be processed in this run
#
- self.total_elements = list(self._pipeline.dependencies(self.targets, _Scope.ALL))
+ self.total_elements = list(_pipeline.dependencies(self.targets, _Scope.ALL))
if announce_session and self._session_start_callback is not None:
self._session_start_callback()
@@ -1401,7 +1525,7 @@ class Stream:
def _fetch(self, elements: List[Element], *, fetch_original: bool = False, announce_session: bool = False):
# Assert consistency for the fetch elements
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
# Construct queues, enqueue and run
#
diff --git a/src/buildstream/testing/_sourcetests/track.py b/src/buildstream/testing/_sourcetests/track.py
index 38ef217f0..638cbb9b1 100644
--- a/src/buildstream/testing/_sourcetests/track.py
+++ b/src/buildstream/testing/_sourcetests/track.py
@@ -228,7 +228,7 @@ def test_cross_junction(cli, tmpdir, datafiles, ref_storage, kind):
if ref_storage == "inline":
# This is not allowed to track cross junction without project.refs.
- result.assert_main_error(ErrorDomain.PIPELINE, "untrackable-sources")
+ result.assert_main_error(ErrorDomain.STREAM, "untrackable-sources")
else:
result.assert_success()
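The expected error domain changes because the cross-junction tracking check has moved out of Pipeline: Stream._track_cross_junction_filter() raises StreamError (see the _stream.py hunk above) where Pipeline._assert_junction_tracking() previously raised PipelineError, so the "untrackable-sources" failure now surfaces under ErrorDomain.STREAM. The same adjustment applies in tests/frontend/track.py below.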
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index bd8444973..3dd686de0 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -222,7 +222,7 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag
# Cross junction tracking is not allowed when the toplevel project
# is using inline ref storage.
#
- result.assert_main_error(ErrorDomain.PIPELINE, "untrackable-sources")
+ result.assert_main_error(ErrorDomain.STREAM, "untrackable-sources")
else:
#
# No cross junction tracking was requested
diff --git a/tests/internals/pluginloading.py b/tests/internals/pluginloading.py
deleted file mode 100644
index 1f4446541..000000000
--- a/tests/internals/pluginloading.py
+++ /dev/null
@@ -1,38 +0,0 @@
-from contextlib import contextmanager
-import os
-import pytest
-
-from buildstream._project import Project
-from buildstream._pipeline import Pipeline
-
-from tests.testutils import dummy_context
-
-DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "pluginloading",)
-
-
-@contextmanager
-def create_pipeline(tmpdir, basedir, target):
- with dummy_context() as context:
- context.deploydir = os.path.join(str(tmpdir), "deploy")
- context.casdir = os.path.join(str(tmpdir), "cas")
- project = Project(basedir, context)
-
- pipeline = Pipeline(context, project, None)
- (targets,) = pipeline.load([(target,)])
- yield targets
-
-
-@pytest.mark.datafiles(os.path.join(DATA_DIR, "customsource"))
-def test_customsource(datafiles, tmpdir):
-
- basedir = str(datafiles)
- with create_pipeline(tmpdir, basedir, "simple.bst") as targets:
- assert targets[0].get_kind() == "autotools"
-
-
-@pytest.mark.datafiles(os.path.join(DATA_DIR, "customelement"))
-def test_customelement(datafiles, tmpdir):
-
- basedir = str(datafiles)
- with create_pipeline(tmpdir, basedir, "simple.bst") as targets:
- assert targets[0].get_kind() == "foo"
diff --git a/tests/internals/pluginloading/customelement/elements/simple.bst b/tests/internals/pluginloading/customelement/elements/simple.bst
deleted file mode 100644
index fc48e3ba9..000000000
--- a/tests/internals/pluginloading/customelement/elements/simple.bst
+++ /dev/null
@@ -1,4 +0,0 @@
-kind: foo
-description: Custom foo source
-config:
- pony-color: pink
diff --git a/tests/internals/pluginloading/customelement/pluginelements/__init__.py b/tests/internals/pluginloading/customelement/pluginelements/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/tests/internals/pluginloading/customelement/pluginelements/__init__.py
+++ /dev/null
diff --git a/tests/internals/pluginloading/customelement/pluginelements/foo.py b/tests/internals/pluginloading/customelement/pluginelements/foo.py
deleted file mode 100644
index bdb6c8982..000000000
--- a/tests/internals/pluginloading/customelement/pluginelements/foo.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from buildstream import Element
-
-
-class FooElement(Element):
-
- BST_MIN_VERSION = "2.0"
-
- def preflight(self):
- pass
-
- def configure(self, node):
- pass
-
- def get_unique_key(self):
- return {}
-
-
-def setup():
- return FooElement
diff --git a/tests/internals/pluginloading/customelement/project.conf b/tests/internals/pluginloading/customelement/project.conf
deleted file mode 100644
index 2619bdf82..000000000
--- a/tests/internals/pluginloading/customelement/project.conf
+++ /dev/null
@@ -1,8 +0,0 @@
-name: pony
-min-version: 2.0
-element-path: elements
-plugins:
-- origin: local
- path: pluginelements
- elements:
- - foo
diff --git a/tests/internals/pluginloading/customsource/elements/simple.bst b/tests/internals/pluginloading/customsource/elements/simple.bst
deleted file mode 100644
index 7e0cc43b7..000000000
--- a/tests/internals/pluginloading/customsource/elements/simple.bst
+++ /dev/null
@@ -1,6 +0,0 @@
-kind: autotools
-description: Custom foo source
-sources:
-- kind: foo
- ref: 1.2.3
- uri: http://ponyland.com
diff --git a/tests/internals/pluginloading/customsource/pluginsources/__init__.py b/tests/internals/pluginloading/customsource/pluginsources/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/tests/internals/pluginloading/customsource/pluginsources/__init__.py
+++ /dev/null
diff --git a/tests/internals/pluginloading/customsource/pluginsources/foo.py b/tests/internals/pluginloading/customsource/pluginsources/foo.py
deleted file mode 100644
index c5229f3e2..000000000
--- a/tests/internals/pluginloading/customsource/pluginsources/foo.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from buildstream import Source
-
-
-class FooSource(Source):
-
- BST_MIN_VERSION = "2.0"
-
- def preflight(self):
- pass
-
- def configure(self, node):
- pass
-
- def get_unique_key(self):
- pass
-
-
-def setup():
- return FooSource
diff --git a/tests/internals/pluginloading/customsource/project.conf b/tests/internals/pluginloading/customsource/project.conf
deleted file mode 100644
index 5cb6da537..000000000
--- a/tests/internals/pluginloading/customsource/project.conf
+++ /dev/null
@@ -1,8 +0,0 @@
-name: pony
-min-version: 2.0
-element-path: elements
-plugins:
-- origin: local
- path: pluginsources
- sources:
- - foo