# # Copyright (C) 2018 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # # Authors: # Tristan Van Berkom # Jürg Billeter # Tristan Maat import os import sys import stat import shlex import shutil import tarfile import tempfile from contextlib import contextmanager, suppress from ._exceptions import StreamError, ImplError, BstError, set_last_task_error from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue from ._pipeline import Pipeline, PipelineSelection from . import utils, _yaml, _site from . import Scope, Consistency # Stream() # # This is the main, toplevel calling interface in BuildStream core. 
# # Args: # context (Context): The Context object # project (Project): The Project object # session_start (datetime): The time when the session started # session_start_callback (callable): A callback to invoke when the session starts # interrupt_callback (callable): A callback to invoke when we get interrupted # ticker_callback (callable): Invoked every second while running the scheduler # job_start_callback (callable): Called when a job starts # job_complete_callback (callable): Called when a job completes # class Stream(): def __init__(self, context, project, session_start, *, session_start_callback=None, interrupt_callback=None, ticker_callback=None, job_start_callback=None, job_complete_callback=None): # # Public members # self.targets = [] # Resolved target elements self.session_elements = [] # List of elements being processed this session self.total_elements = [] # Total list of elements based on targets self.queues = [] # Queue objects # # Private members # self._artifacts = context.artifactcache self._context = context self._project = project self._pipeline = Pipeline(context, project, self._artifacts) self._scheduler = Scheduler(context, session_start, interrupt_callback=interrupt_callback, ticker_callback=ticker_callback, job_start_callback=job_start_callback, job_complete_callback=job_complete_callback) self._first_non_track_queue = None self._session_start_callback = session_start_callback # cleanup() # # Cleans up application state # def cleanup(self): if self._project: self._project.cleanup() # load_selection() # # An all purpose method for loading a selection of elements, this # is primarily useful for the frontend to implement `bst show` # and `bst shell`. 
# # Args: # targets (list of str): Targets to pull # selection (PipelineSelection): The selection mode for the specified targets # except_targets (list of str): Specified targets to except from fetching # # Returns: # (list of Element): The selected elements def load_selection(self, targets, *, selection=PipelineSelection.NONE, except_targets=()): elements, _ = self._load(targets, (), selection=selection, except_targets=except_targets, fetch_subprojects=False) return elements # shell() # # Run a shell # # Args: # element (Element): An Element object to run the shell for # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN) # prompt (str): The prompt to display in the shell # directory (str): A directory where an existing prestaged sysroot is expected, or None # mounts (list of HostMount): Additional directories to mount into the sandbox # isolate (bool): Whether to isolate the environment like we do in builds # command (list): An argv to launch in the sandbox, or None # usebuildtree (bool): Wheather to use a buildtree as the source. # # Returns: # (int): The exit code of the launched shell # def shell(self, element, scope, prompt, *, directory=None, mounts=None, isolate=False, command=None, usebuildtree=False): # Assert we have everything we need built, unless the directory is specified # in which case we just blindly trust the directory, using the element # definitions to control the execution environment only. if directory is None: missing_deps = [ dep._get_full_name() for dep in self._pipeline.dependencies([element], scope) if not dep._cached() ] if missing_deps: raise StreamError("Elements need to be built or downloaded before staging a shell environment", detail="\n".join(missing_deps)) return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=usebuildtree) # build() # # Builds (assembles) elements in the pipeline. 
# # Args: # targets (list of str): Targets to build # track_targets (list of str): Specified targets for tracking # track_except (list of str): Specified targets to except from tracking # track_cross_junctions (bool): Whether tracking should cross junction boundaries # ignore_junction_targets (bool): Whether junction targets should be filtered out # build_all (bool): Whether to build all elements, or only those # which are required to build the target. # def build(self, targets, *, track_targets=None, track_except=None, track_cross_junctions=False, ignore_junction_targets=False, build_all=False): if build_all: selection = PipelineSelection.ALL else: selection = PipelineSelection.PLAN elements, track_elements = \ self._load(targets, track_targets, selection=selection, track_selection=PipelineSelection.ALL, track_except_targets=track_except, track_cross_junctions=track_cross_junctions, ignore_junction_targets=ignore_junction_targets, use_artifact_config=True, fetch_subprojects=True, dynamic_plan=True) # Remove the tracking elements from the main targets elements = self._pipeline.subtract_elements(elements, track_elements) # Assert that the elements we're not going to track are consistent self._pipeline.assert_consistent(elements) # Now construct the queues # track_queue = None if track_elements: track_queue = TrackQueue(self._scheduler) self._add_queue(track_queue, track=True) if self._artifacts.has_fetch_remotes(): self._add_queue(PullQueue(self._scheduler)) self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) self._add_queue(BuildQueue(self._scheduler)) if self._artifacts.has_push_remotes(): self._add_queue(PushQueue(self._scheduler)) # Enqueue elements # if track_elements: self._enqueue_plan(track_elements, queue=track_queue) self._enqueue_plan(elements) self._run() # fetch() # # Fetches sources on the pipeline. 
#
# Args:
#    targets (list of str): Targets to fetch
#    selection (PipelineSelection): The selection mode for the specified targets
#    except_targets (list of str): Specified targets to except from fetching
#    track_targets (bool): Whether to track selected targets in addition to fetching
#    track_cross_junctions (bool): Whether tracking should cross junction boundaries
#
def fetch(self, targets, *,
          selection=PipelineSelection.PLAN,
          except_targets=None,
          track_targets=False,
          track_cross_junctions=False):

    # When tracking was requested, track the same selection that we fetch
    if track_targets:
        track_targets = targets
        track_selection = selection
        track_except_targets = except_targets
    else:
        track_targets = ()
        track_selection = PipelineSelection.NONE
        track_except_targets = ()

    elements, track_elements = \
        self._load(targets, track_targets,
                   selection=selection, track_selection=track_selection,
                   except_targets=except_targets,
                   track_except_targets=track_except_targets,
                   track_cross_junctions=track_cross_junctions,
                   fetch_subprojects=True)

    # Delegated to a shared fetch method
    self._fetch(elements, track_elements=track_elements)


# track()
#
# Tracks all the sources of the selected elements.
#
# Args:
#    targets (list of str): Targets to track
#    selection (PipelineSelection): The selection mode for the specified targets
#    except_targets (list of str): Specified targets to except from tracking
#    cross_junctions (bool): Whether tracking should cross junction boundaries
#
# If no error is encountered while tracking, then the project files
# are rewritten inline.
#
def track(self, targets, *,
          selection=PipelineSelection.REDIRECT,
          except_targets=None,
          cross_junctions=False):

    # We pass no target to build. Only to track. Passing build targets
    # would fully load project configuration which might not be
    # possible before tracking is done.
    _, elements = \
        self._load([], targets,
                   selection=selection, track_selection=selection,
                   except_targets=except_targets,
                   track_except_targets=except_targets,
                   track_cross_junctions=cross_junctions,
                   fetch_subprojects=True)

    track_queue = TrackQueue(self._scheduler)
    self._add_queue(track_queue, track=True)
    self._enqueue_plan(elements, queue=track_queue)
    self._run()


# pull()
#
# Pulls artifacts from remote artifact server(s)
#
# Args:
#    targets (list of str): Targets to pull
#    selection (PipelineSelection): The selection mode for the specified targets
#    ignore_junction_targets (bool): Whether junction targets should be filtered out
#    remote (str): The URL of a specific remote server to pull from, or None
#
# If `remote` specified as None, then regular configuration will be used
# to determine where to pull artifacts from.
#
def pull(self, targets, *,
         selection=PipelineSelection.NONE,
         ignore_junction_targets=False,
         remote=None):

    # An explicitly specified remote URL overrides the configured remotes
    use_config = True
    if remote:
        use_config = False

    elements, _ = self._load(targets, (),
                             selection=selection,
                             ignore_junction_targets=ignore_junction_targets,
                             use_artifact_config=use_config,
                             artifact_remote_url=remote,
                             fetch_subprojects=True)

    if not self._artifacts.has_fetch_remotes():
        raise StreamError("No artifact caches available for pulling artifacts")

    self._pipeline.assert_consistent(elements)
    self._add_queue(PullQueue(self._scheduler))
    self._enqueue_plan(elements)
    self._run()


# push()
#
# Pushes artifacts to remote artifact server(s)
#
# Args:
#    targets (list of str): Targets to push
#    selection (PipelineSelection): The selection mode for the specified targets
#    ignore_junction_targets (bool): Whether junction targets should be filtered out
#    remote (str): The URL of a specific remote server to push to, or None
#
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
# # If any of the given targets are missing their expected buildtree artifact, # a pull queue will be created if user context and available remotes allow for # attempting to fetch them. # def push(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None): use_config = True if remote: use_config = False elements, _ = self._load(targets, (), selection=selection, ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, artifact_remote_url=remote, fetch_subprojects=True) if not self._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") self._pipeline.assert_consistent(elements) # Check if we require a pull queue, with given artifact state and context require_buildtrees = self._buildtree_pull_required(elements) if require_buildtrees: self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees") self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(require_buildtrees) push_queue = PushQueue(self._scheduler) self._add_queue(push_queue) self._enqueue_plan(elements, queue=push_queue) self._run() # checkout() # # Checkout target artifact to the specified location # # Args: # target (str): Target to checkout # location (str): Location to checkout the artifact to # force (bool): Whether files can be overwritten if necessary # scope (str): The scope of dependencies to checkout # integrate (bool): Whether to run integration commands # hardlinks (bool): Whether checking out files hardlinked to # their artifacts is acceptable # tar (bool): If true, a tarball from the artifact contents will # be created, otherwise the file tree of the artifact # will be placed at the given location. If true and # location is '-', the tarball will be dumped on the # standard output. 
#
def checkout(self, target, *,
             location=None,
             force=False,
             scope=Scope.RUN,
             integrate=True,
             hardlinks=False,
             tar=False):

    # We only have one target in a checkout command
    elements, _ = self._load((target,), (), fetch_subprojects=True)
    target = elements[0]

    self._check_location_writable(location, force=force, tar=tar)

    # Stage deps into a temporary sandbox first
    try:
        with target._prepare_sandbox(scope=scope, directory=None,
                                     integrate=integrate) as sandbox:

            # Copy or move the sandbox to the target directory
            sandbox_vroot = sandbox.get_virtual_directory()

            if not tar:
                with target.timed_activity("Checking out files in '{}'"
                                           .format(location)):
                    try:
                        if hardlinks:
                            self._checkout_hardlinks(sandbox_vroot, location)
                        else:
                            sandbox_vroot.export_files(location)
                    except OSError as e:
                        raise StreamError("Failed to checkout files: '{}'"
                                          .format(e)) from e
            else:
                if location == '-':
                    with target.timed_activity("Creating tarball"):
                        # Save the stdout FD to restore later
                        saved_fd = os.dup(sys.stdout.fileno())
                        try:
                            with os.fdopen(sys.stdout.fileno(), 'wb') as fo:
                                with tarfile.open(fileobj=fo, mode="w|") as tf:
                                    sandbox_vroot.export_to_tar(tf, '.')
                        finally:
                            # No matter what, restore stdout for further use
                            os.dup2(saved_fd, sys.stdout.fileno())
                            os.close(saved_fd)
                else:
                    with target.timed_activity("Creating tarball '{}'"
                                               .format(location)):
                        with tarfile.open(location, "w:") as tf:
                            sandbox_vroot.export_to_tar(tf, '.')

    except BstError as e:
        raise StreamError("Error while staging dependencies into a sandbox"
                          ": '{}'".format(e),
                          detail=e.detail, reason=e.reason) from e


# source_checkout()
#
# Checkout sources of the target element to the specified location
#
# Args:
#    target (str): The target element whose sources to checkout
#    location (str): Location to checkout the sources to
#    deps (str): The dependencies to checkout
#    fetch (bool): Whether to fetch missing sources
#    except_targets (list): List of targets to except from staging
#
def source_checkout(self, target, *,
                    location=None,
                    force=False,
                    deps='none',
                    fetch=False,
                    except_targets=(),
                    tar=False,
                    include_build_scripts=False):
    self._check_location_writable(location, force=force, tar=tar)

    elements, _ = self._load((target,), (),
                             selection=deps,
                             except_targets=except_targets,
                             fetch_subprojects=True)

    # Assert all sources are cached
    if fetch:
        self._fetch(elements)
    self._pipeline.assert_sources_cached(elements)

    # Stage all sources determined by scope
    try:
        self._source_checkout(elements, location, force, deps, fetch, tar,
                              include_build_scripts)
    except BstError as e:
        raise StreamError("Error while writing sources"
                          ": '{}'".format(e),
                          detail=e.detail, reason=e.reason) from e


# workspace_open
#
# Open a project workspace
#
# Args:
#    targets (list): List of target elements to open workspaces for
#    no_checkout (bool): Whether to skip checking out the source
#    track_first (bool): Whether to track and fetch first
#    force (bool): Whether to ignore contents in an existing directory
#    custom_dir (str): Custom location to create a workspace or false to use default location.
#
def workspace_open(self, targets, *,
                   no_checkout,
                   track_first,
                   force,
                   custom_dir):
    # This function is a little funny but it is trying to be as atomic as possible.

    if track_first:
        track_targets = targets
    else:
        track_targets = ()

    elements, track_elements = self._load(targets, track_targets,
                                          selection=PipelineSelection.REDIRECT,
                                          track_selection=PipelineSelection.REDIRECT)

    workspaces = self._context.get_workspaces()

    # If we're going to checkout, we need at least a fetch,
    # if we were asked to track first, we're going to fetch anyway.
    #
    if not no_checkout or track_first:
        track_elements = []
        if track_first:
            track_elements = elements
        self._fetch(elements, track_elements=track_elements)

    expanded_directories = []
    # To try to be more atomic, loop through the elements and raise any errors we can early
    for target in elements:

        if not list(target.sources()):
            build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)]
            if not build_depends:
                raise StreamError("The element {} has no sources".format(target.name))
            detail = "Try opening a workspace on one of its dependencies instead:\n"
            detail += " \n".join(build_depends)
            raise StreamError("The element {} has no sources".format(target.name), detail=detail)

        # Check for workspace config
        workspace = workspaces.get_workspace(target._get_full_name())
        if workspace and not force:
            raise StreamError("Element '{}' already has workspace defined at: {}"
                              .format(target.name, workspace.get_absolute_path()))

        if not no_checkout and target._get_consistency() != Consistency.CACHED:
            raise StreamError("Could not stage uncached source. For {} ".format(target.name) +
                              "Use `--track` to track and " +
                              "fetch the latest version of the " +
                              "source.")

        if not custom_dir:
            # Default workspace path: <workspacedir>/<element name without .bst>
            directory = os.path.abspath(os.path.join(self._context.workspacedir, target.name))
            if directory[-4:] == '.bst':
                directory = directory[:-4]
            expanded_directories.append(directory)

    if custom_dir:
        if len(elements) != 1:
            raise StreamError("Exactly one element can be given if --directory is used",
                              reason='directory-with-multiple-elements')
        directory = os.path.abspath(custom_dir)
        expanded_directories = [directory, ]
    else:
        # If this fails it is a bug in whatever calls this, usually cli.py and so can not be tested for via the
        # run bst test mechanism.
        assert len(elements) == len(expanded_directories)

    for target, directory in zip(elements, expanded_directories):
        if os.path.exists(directory):
            if not os.path.isdir(directory):
                raise StreamError("For element '{}', Directory path is not a directory: {}"
                                  .format(target.name, directory), reason='bad-directory')
            if not (no_checkout or force) and os.listdir(directory):
                raise StreamError("For element '{}', Directory path is not empty: {}"
                                  .format(target.name, directory), reason='bad-directory')

    # So far this function has tried to catch as many issues as possible without making any changes.
    # Now it does the bits that can not be made atomic.
    targetGenerator = zip(elements, expanded_directories)
    for target, directory in targetGenerator:
        self._message(MessageType.INFO, "Creating workspace for element {}"
                      .format(target.name))

        workspace = workspaces.get_workspace(target._get_full_name())
        if workspace:
            workspaces.delete_workspace(target._get_full_name())
            workspaces.save_config()
            shutil.rmtree(directory)
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            # NOTE(review): this consumes the remainder of targetGenerator to report
            # the elements that were not processed; the loop therefore ends here.
            todo_elements = " ".join([str(target.name) for target, directory_dict in targetGenerator])
            if todo_elements:
                # This output should make creating the remaining workspaces as easy as possible.
                todo_elements = "\nDid not try to create workspaces for " + todo_elements
            raise StreamError("Failed to create workspace directory: {}".format(e) + todo_elements) from e

        workspaces.create_workspace(target, directory, checkout=not no_checkout)
        self._message(MessageType.INFO, "Created a workspace for element: {}"
                      .format(target._get_full_name()))


# workspace_close
#
# Close a project workspace
#
# Args:
#    element_name (str): The element name to close the workspace for
#    remove_dir (bool): Whether to remove the associated directory
#
def workspace_close(self, element_name, *, remove_dir):
    workspaces = self._context.get_workspaces()
    workspace = workspaces.get_workspace(element_name)

    # Remove workspace directory if prompted
    if remove_dir:
        with self._context.timed_activity("Removing workspace directory {}"
                                          .format(workspace.get_absolute_path())):
            try:
                shutil.rmtree(workspace.get_absolute_path())
            except OSError as e:
                raise StreamError("Could not remove '{}': {}"
                                  .format(workspace.get_absolute_path(), e)) from e

    # Delete the workspace and save the configuration
    workspaces.delete_workspace(element_name)
    workspaces.save_config()
    self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))


# workspace_reset
#
# Reset a workspace to its original state, discarding any user
# changes.
# # Args: # targets (list of str): The target elements to reset the workspace for # soft (bool): Only reset workspace state # track_first (bool): Whether to also track the sources first # def workspace_reset(self, targets, *, soft, track_first): if track_first: track_targets = targets else: track_targets = () elements, track_elements = self._load(targets, track_targets, selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT) nonexisting = [] for element in elements: if not self.workspace_exists(element.name): nonexisting.append(element.name) if nonexisting: raise StreamError("Workspace does not exist", detail="\n".join(nonexisting)) # Do the tracking first if track_first: self._fetch(elements, track_elements=track_elements) workspaces = self._context.get_workspaces() for element in elements: workspace = workspaces.get_workspace(element._get_full_name()) workspace_path = workspace.get_absolute_path() if soft: workspace.prepared = False self._message(MessageType.INFO, "Reset workspace state for {} at: {}" .format(element.name, workspace_path)) continue with element.timed_activity("Removing workspace directory {}" .format(workspace_path)): try: shutil.rmtree(workspace_path) except OSError as e: raise StreamError("Could not remove '{}': {}" .format(workspace_path, e)) from e workspaces.delete_workspace(element._get_full_name()) workspaces.create_workspace(element, workspace_path, checkout=True) self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace_path)) workspaces.save_config() # workspace_exists # # Check if a workspace exists # # Args: # element_name (str): The element name to close the workspace for, or None # # Returns: # (bool): True if the workspace exists # # If None is specified for `element_name`, then this will return # True if there are any existing workspaces. 
#
def workspace_exists(self, element_name=None):
    known_workspaces = self._context.get_workspaces()

    # A specific element was named: only its own workspace counts
    if element_name:
        return bool(known_workspaces.get_workspace(element_name))

    # No element was named: report whether any workspace is configured
    return any(known_workspaces.list())


# workspace_is_required()
#
# Checks whether the workspace belonging to element_name is required to
# load the project
#
# Args:
#    element_name (str): The element whose workspace may be required
#
# Returns:
#    (bool): True if the workspace is required
def workspace_is_required(self, element_name):
    # The project is loaded through a workspace only when it was invoked
    # from that workspace's element
    return self._project.invoked_from_workspace_element() == element_name


# workspace_list
#
# Serializes the workspaces and dumps them in YAML to stdout.
#
def workspace_list(self):
    # One {element, directory} record per configured workspace
    details = [
        {
            'element': name,
            'directory': workspace.get_absolute_path(),
        }
        for name, workspace in self._context.get_workspaces().list()
    ]

    _yaml.dump({
        'workspaces': details
    })


# redirect_element_names()
#
# Takes a list of element names and returns a list where elements have been
# redirected to their source elements if the element file exists, and just
# the name, if not.
# # Args: # elements (list of str): The element names to redirect # # Returns: # (list of str): The element names after redirecting # def redirect_element_names(self, elements): element_dir = self._project.element_path load_elements = [] output_elements = set() for e in elements: element_path = os.path.join(element_dir, e) if os.path.exists(element_path): load_elements.append(e) else: output_elements.add(e) if load_elements: loaded_elements, _ = self._load(load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT) for e in loaded_elements: output_elements.add(e.name) return list(output_elements) ############################################################# # Scheduler API forwarding # ############################################################# # running # # Whether the scheduler is running # @property def running(self): return self._scheduler.loop is not None # suspended # # Whether the scheduler is currently suspended # @property def suspended(self): return self._scheduler.suspended # terminated # # Whether the scheduler is currently terminated # @property def terminated(self): return self._scheduler.terminated # elapsed_time # # Elapsed time since the session start # @property def elapsed_time(self): return self._scheduler.elapsed_time() # terminate() # # Terminate jobs # def terminate(self): self._scheduler.terminate_jobs() # quit() # # Quit the session, this will continue with any ongoing # jobs, use Stream.terminate() instead for cancellation # of ongoing jobs # def quit(self): self._scheduler.stop_queueing() # suspend() # # Context manager to suspend ongoing jobs # @contextmanager def suspend(self): with self._scheduler.jobs_suspended(): yield ############################################################# # Private Methods # ############################################################# # _load() # # A convenience method for loading element lists # # If `targets` is not empty used project configuration will be # fully 
# loaded. If `targets` is empty, tracking will still be
# resolved for elements in `track_targets`, but no build pipeline
# will be resolved. This behavior is important for track() to
# not trigger full loading of project configuration.
#
# Args:
#    targets (list of str): Main targets to load
#    track_targets (list of str): Tracking targets
#    selection (PipelineSelection): The selection mode for the specified targets
#    track_selection (PipelineSelection): The selection mode for the specified tracking targets
#    except_targets (list of str): Specified targets to except from fetching
#    track_except_targets (list of str): Specified targets to except from fetching
#    track_cross_junctions (bool): Whether tracking should cross junction boundaries
#    ignore_junction_targets (bool): Whether junction targets should be filtered out
#    use_artifact_config (bool): Whether to initialize artifacts with the config
#    artifact_remote_url (str): A remote url for initializing the artifacts
#    fetch_subprojects (bool): Whether to fetch subprojects while loading
#    dynamic_plan (bool): Whether artifacts should be requested dynamically as needed
#
# Returns:
#    (list of Element): The primary element selection
#    (list of Element): The tracking element selection
#
def _load(self, targets, track_targets, *,
          selection=PipelineSelection.NONE,
          track_selection=PipelineSelection.NONE,
          except_targets=(),
          track_except_targets=(),
          track_cross_junctions=False,
          ignore_junction_targets=False,
          use_artifact_config=False,
          artifact_remote_url=None,
          fetch_subprojects=False,
          dynamic_plan=False):

    # Load rewritable if we have any tracking selection to make
    rewritable = False
    if track_targets:
        rewritable = True

    # Load all targets
    elements, except_elements, track_elements, track_except_elements = \
        self._pipeline.load([targets, except_targets, track_targets, track_except_targets],
                            rewritable=rewritable,
                            fetch_subprojects=fetch_subprojects)

    # Optionally filter out junction elements
    if ignore_junction_targets:
        elements = [e for e in elements if e.get_kind() != 'junction']

    # Hold on to the targets
    self.targets = elements

    # Here we should raise an error if the track_elements targets
    # are not dependencies of the primary targets, this is not
    # supported.
    #
    # This can happen with `bst build --track`
    #
    if targets and not self._pipeline.targets_include(elements, track_elements):
        raise StreamError("Specified tracking targets that are not "
                          "within the scope of primary targets")

    # First take care of marking tracking elements, this must be
    # done before resolving element states.
    #
    assert track_selection != PipelineSelection.PLAN

    # Tracked elements are split by owner projects in order to
    # filter cross junctions tracking dependencies on their
    # respective project.
    track_projects = {}
    for element in track_elements:
        project = element._get_project()
        if project not in track_projects:
            track_projects[project] = [element]
        else:
            track_projects[project].append(element)

    track_selected = []

    for project, project_elements in track_projects.items():
        selected = self._pipeline.get_selection(project_elements, track_selection)
        selected = self._pipeline.track_cross_junction_filter(project,
                                                              selected,
                                                              track_cross_junctions)
        track_selected.extend(selected)

    track_selected = self._pipeline.except_elements(track_elements,
                                                    track_selected,
                                                    track_except_elements)

    for element in track_selected:
        element._schedule_tracking()

    # A track-only session resolves only the tracked elements
    if not targets:
        self._pipeline.resolve_elements(track_selected)
        return [], track_selected

    # ArtifactCache.setup_remotes expects all projects to be fully loaded
    for project in self._context.get_projects():
        project.ensure_fully_loaded()

    # Connect to remote caches, this needs to be done before resolving element state
    self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)

    # Now move on to loading primary selection.
    #
    self._pipeline.resolve_elements(elements)
    selected = self._pipeline.get_selection(elements, selection, silent=False)
    selected = self._pipeline.except_elements(elements,
                                              selected,
                                              except_elements)

    # Set the "required" artifacts that should not be removed
    # while this pipeline is active
    #
    # It must include all the artifacts which are required by the
    # final product. Note that this is a superset of the build plan.
    #
    self._artifacts.mark_required_elements(self._pipeline.dependencies(elements, Scope.ALL))

    if selection == PipelineSelection.PLAN and dynamic_plan:
        # We use a dynamic build plan, only request artifacts of top-level targets,
        # others are requested dynamically as needed.
        # This avoids pulling, fetching, or building unneeded build-only dependencies.
        for element in elements:
            element._set_required()
    else:
        for element in selected:
            element._set_required()

    return selected, track_selected


# _message()
#
# Local message propagator
#
def _message(self, message_type, message, **kwargs):
    args = dict(kwargs)
    self._context.message(
        Message(None, message_type, message, **args))


# _add_queue()
#
# Adds a queue to the stream
#
# Args:
#    queue (Queue): Queue to add to the pipeline
#    track (bool): Whether this is the tracking queue
#
def _add_queue(self, queue, *, track=False):
    self.queues.append(queue)

    # Remember the first non-tracking queue; it is the default
    # target for _enqueue_plan()
    if not (track or self._first_non_track_queue):
        self._first_non_track_queue = queue


# _enqueue_plan()
#
# Enqueues planned elements to the specified queue.
    #
    # Args:
    #    plan (list of Element): The list of elements to be enqueued
    #    queue (Queue): The target queue, defaults to the first non-track queue
    #
    def _enqueue_plan(self, plan, *, queue=None):
        queue = queue or self._first_non_track_queue

        queue.enqueue(plan)
        # Track everything processed this session for frontend reporting
        self.session_elements += plan

    # _run()
    #
    # Common function for running the scheduler
    #
    # Raises:
    #    (StreamError): If the scheduler finished with an error, or was
    #                   terminated (terminated=True in that case)
    #
    def _run(self):

        # Inform the frontend of the full list of elements
        # and the list of elements which will be processed in this run
        #
        self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))

        if self._session_start_callback is not None:
            self._session_start_callback()

        _, status = self._scheduler.run(self.queues)

        # Force update element states after a run, such that the summary
        # is more coherent
        try:
            for element in self.total_elements:
                element._update_state()
        except BstError as e:
            # A BstError here is an expected failure mode; record it so the
            # frontend can report the last task error.
            self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
            set_last_task_error(e.domain, e.reason)
        except Exception as e:  # pylint: disable=broad-except
            # Anything else is a bug: report it but do not mask the
            # scheduler status handling below.
            self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))

        if status == SchedStatus.ERROR:
            raise StreamError()
        elif status == SchedStatus.TERMINATED:
            raise StreamError(terminated=True)

    # _fetch()
    #
    # Performs the fetch job, the body of this function is here because
    # it is shared between a few internals.
    #
    # Args:
    #    elements (list of Element): Elements to fetch
    #    track_elements (list of Element): Elements to track
    #
    def _fetch(self, elements, *, track_elements=None):

        if track_elements is None:
            track_elements = []

        # Subtract the track elements from the fetch elements, they will be added separately
        fetch_plan = self._pipeline.subtract_elements(elements, track_elements)

        # Assert consistency for the fetch elements
        self._pipeline.assert_consistent(fetch_plan)

        # Filter out elements with cached sources, only from the fetch plan
        # let the track plan resolve new refs.
        cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
        fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)

        # Construct queues, enqueue and run
        #
        # The track queue (if any) must be added first so that newly
        # resolved refs are available before fetching.
        track_queue = None
        if track_elements:
            track_queue = TrackQueue(self._scheduler)
            self._add_queue(track_queue, track=True)
        self._add_queue(FetchQueue(self._scheduler))

        if track_elements:
            self._enqueue_plan(track_elements, queue=track_queue)
        self._enqueue_plan(fetch_plan)
        self._run()

    # _check_location_writable()
    #
    # Check if given location is writable.
    #
    # Args:
    #    location (str): Destination path
    #    force (bool): Allow files to be overwritten
    #    tar (bool): Whether destination is a tarball
    #
    # Raises:
    #    (StreamError): If the destination is not writable
    #
    def _check_location_writable(self, location, force=False, tar=False):
        if not tar:
            # Directory destination: create it if needed, then require it
            # to be writable and (unless --force) empty.
            try:
                os.makedirs(location, exist_ok=True)
            except OSError as e:
                raise StreamError("Failed to create destination directory: '{}'"
                                  .format(e)) from e
            if not os.access(location, os.W_OK):
                raise StreamError("Destination directory '{}' not writable"
                                  .format(location))
            if not force and os.listdir(location):
                raise StreamError("Destination directory '{}' not empty"
                                  .format(location))
        elif os.path.exists(location) and location != '-':
            # Tarball destination: '-' means stdout, which needs no checks.
            if not os.access(location, os.W_OK):
                raise StreamError("Output file '{}' not writable"
                                  .format(location))
            if not force and os.path.exists(location):
                raise StreamError("Output file '{}' already exists"
                                  .format(location))

    # Helper function for checkout()
    #
    # Exports the sandbox virtual root to 'directory' using hardlinks,
    # removing any previous checkout first.
    #
    def _checkout_hardlinks(self, sandbox_vroot, directory):
        try:
            utils.safe_remove(directory)
        except OSError as e:
            raise StreamError("Failed to remove checkout directory: {}".format(e)) from e

        sandbox_vroot.export_files(directory, can_link=True, can_destroy=True)

    # Helper function for source_checkout()
    #
    # NOTE(review): 'deps' and 'fetch' are accepted but unused in this body —
    # presumably consumed by the public source_checkout() caller; confirm.
    #
    def _source_checkout(self, elements,
                         location=None,
                         force=False,
                         deps='none',
                         fetch=False,
                         tar=False,
                         include_build_scripts=False):
        location = os.path.abspath(location)
        # Stage into a sibling of the destination so a final rename can
        # stay on the same filesystem.
        location_parent = os.path.abspath(os.path.join(location, ".."))

        # Stage all our sources in a temporary directory. This
        # directory can then be used either to construct a tarball or be
        # moved to the final desired location.
        temp_source_dir = tempfile.TemporaryDirectory(dir=location_parent)
        try:
            self._write_element_sources(temp_source_dir.name, elements)
            if include_build_scripts:
                self._write_build_scripts(temp_source_dir.name, elements)
            if tar:
                self._create_tarball(temp_source_dir.name, location)
            else:
                self._move_directory(temp_source_dir.name, location, force)
        except OSError as e:
            raise StreamError("Failed to checkout sources to {}: {}"
                              .format(location, e)) from e
        finally:
            # The staging directory may already have been renamed away by
            # _move_directory(), hence the FileNotFoundError suppression.
            with suppress(FileNotFoundError):
                temp_source_dir.cleanup()

    # Move a directory src to dest. This will work across devices and
    # may optionally overwrite existing files.
    #
    # NOTE(review): if the rename fails and dest is non-empty without
    # force, this silently does nothing — looks like deliberate
    # best-effort behavior; confirm against callers.
    #
    def _move_directory(self, src, dest, force=False):
        # NOTE(review): ignores its 'path' parameter and closes over
        # 'dest' instead — harmless today since it is only called with
        # 'dest', but worth fixing.
        def is_empty_dir(path):
            return os.path.isdir(dest) and not os.listdir(dest)

        # Fast path: same-filesystem rename
        try:
            os.rename(src, dest)
            return
        except OSError:
            pass

        # Cross-device (or otherwise failed) move: fall back to linking
        # files over individually.
        if force or is_empty_dir(dest):
            try:
                utils.link_files(src, dest)
            except utils.UtilError as e:
                raise StreamError("Failed to move directory: {}".format(e)) from e

    # Write the element build script to the given directory
    #
    # Returns:
    #    (bool): False if the element kind does not implement a script,
    #            True otherwise
    #
    def _write_element_script(self, directory, element):
        try:
            element._write_script(directory)
        except ImplError:
            return False
        return True

    # Write all source elements to the given directory
    #
    # Elements without sources get no directory at all.
    #
    def _write_element_sources(self, directory, elements):
        for element in elements:
            element_source_dir = self._get_element_dirname(directory, element)
            if list(element.sources()):
                os.makedirs(element_source_dir)
                element._stage_sources_at(element_source_dir, mount_workspaces=False)

    # Create a tarball from the content of directory
    def _create_tarball(self, directory, tar_name):
        try:
            with utils.save_file_atomic(tar_name, mode='wb') as f:
                # This TarFile does not need to be explicitly closed
                # as the underlying file object will be closed by the
                # save_file_atomic context manager
                tarball = tarfile.open(fileobj=f, mode='w')
                for item in os.listdir(str(directory)):
                    file_to_add = os.path.join(directory, item)
                    tarball.add(file_to_add, arcname=item)
        except OSError as e:
            raise StreamError("Failed to create tar archive: {}".format(e)) from e

    # Write all the build_scripts for elements in the directory location
    def _write_build_scripts(self, location, elements):
        for element in elements:
            self._write_element_script(location, element)
        self._write_master_build_script(location, elements)

    # Write a master build script to the sandbox
    def _write_master_build_script(self, directory, elements):

        # Shell-quote each module name into a space separated list for
        # substitution into the build-all template.
        module_string = ""
        for element in elements:
            module_string += shlex.quote(element.normal_name) + " "

        script_path = os.path.join(directory, "build.sh")

        with open(_site.build_all_template, "r") as f:
            script_template = f.read()

        with utils.save_file_atomic(script_path, "w") as script:
            script.write(script_template.format(modules=module_string))

        # Make the script readable and executable by its owner
        os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)

    # Collect the sources in the given sandbox into a tarfile
    #
    # Args:
    #    directory (str): The directory whose content to archive
    #    tar_name (str): Path of the tarfile to create
    #    element_name (str): Top-level archive name for the content
    #    compression (str): "none", or a tarfile compression suffix
    #                       (e.g. "gz", "bz2", "xz")
    #
    def _collect_sources(self, directory, tar_name, element_name, compression):
        with self._context.timed_activity("Creating tarball {}".format(tar_name)):
            if compression == "none":
                permissions = "w:"
            else:
                permissions = "w:" + compression

            with tarfile.open(tar_name, permissions) as tar:
                tar.add(directory, arcname=element_name)

    # _get_element_dirname()
    #
    # Get path to directory for an element based on its normal name.
    #
    # For cross-junction elements, the path will be prefixed with the name
    # of the junction element.
# # Args: # directory (str): path to base directory # element (Element): the element # # Returns: # (str): Path to directory for this element # def _get_element_dirname(self, directory, element): parts = [element.normal_name] while element._get_project() != self._project: element = element._get_project().junction parts.append(element.normal_name) return os.path.join(directory, *reversed(parts)) # _buildtree_pull_required() # # Check if current task, given config, requires element buildtree artifact # # Args: # elements (list): elements to check if buildtrees are required # # Returns: # (list): elements requiring buildtrees # def _buildtree_pull_required(self, elements): required_list = [] # If context is set to not pull buildtrees, or no fetch remotes, return empty list if not self._context.pull_buildtrees or not self._artifacts.has_fetch_remotes(): return required_list for element in elements: # Check if element is partially cached without its buildtree, as the element # artifact may not be cached at all if element._cached() and not element._cached_buildtree(): required_list.append(element) return required_list