diff options
Diffstat (limited to 'zephyr/zmake/zmake')
-rw-r--r-- | zephyr/zmake/zmake/__init__.py | 0 | ||||
-rw-r--r-- | zephyr/zmake/zmake/__main__.py | 273 | ||||
-rw-r--r-- | zephyr/zmake/zmake/build_config.py | 100 | ||||
-rw-r--r-- | zephyr/zmake/zmake/jobserver.py | 144 | ||||
-rw-r--r-- | zephyr/zmake/zmake/modules.py | 99 | ||||
-rw-r--r-- | zephyr/zmake/zmake/multiproc.py | 322 | ||||
-rw-r--r-- | zephyr/zmake/zmake/output_packers.py | 230 | ||||
-rw-r--r-- | zephyr/zmake/zmake/project.py | 248 | ||||
-rw-r--r-- | zephyr/zmake/zmake/toolchains.py | 154 | ||||
-rw-r--r-- | zephyr/zmake/zmake/util.py | 255 | ||||
-rw-r--r-- | zephyr/zmake/zmake/version.py | 166 | ||||
-rw-r--r-- | zephyr/zmake/zmake/zmake.py | 757 |
12 files changed, 0 insertions, 2748 deletions
diff --git a/zephyr/zmake/zmake/__init__.py b/zephyr/zmake/zmake/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 --- a/zephyr/zmake/zmake/__init__.py +++ /dev/null diff --git a/zephyr/zmake/zmake/__main__.py b/zephyr/zmake/zmake/__main__.py deleted file mode 100644 index 31f0436b5a..0000000000 --- a/zephyr/zmake/zmake/__main__.py +++ /dev/null @@ -1,273 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""The entry point into zmake.""" -import argparse -import inspect -import logging -import os -import pathlib -import sys - -import zmake.multiproc as multiproc -import zmake.zmake as zm - - -def maybe_reexec(argv): - """Re-exec zmake from the EC source tree, if possible and desired. - - Zmake installs into the users' chroot, which makes it convenient - to execute, but can sometimes become tedious when zmake changes - land and users haven't upgraded their chroots yet. - - We can partially subvert this problem by re-execing zmake from the - source if it's available. This won't make it so developers never - need to upgrade their chroots (e.g., a toolchain upgrade could - require chroot upgrades), but at least makes it slightly more - convenient for an average repo sync. - - Args: - argv: The argument list passed to the main function, not - including the executable path. - - Returns: - None, if the re-exec did not happen, or never returns if the - re-exec did happen. - """ - # We only re-exec if we are inside of a chroot (since if installed - # standalone using pip, there's already an "editable install" - # feature for that in pip.) - env = dict(os.environ) - srcroot = env.get("CROS_WORKON_SRCROOT") - if not srcroot: - return - - # If for some reason we decide to move zmake in the future, then - # we don't want to use the re-exec logic. - zmake_path = ( - pathlib.Path(srcroot) / "src" / "platform" / "ec" / "zephyr" / "zmake" - ).resolve() - if not zmake_path.is_dir(): - return - - # If PYTHONPATH is set, it is either because we just did a - # re-exec, or because the user wants to run a specific copy of - # zmake. In either case, we don't want to re-exec. - if "PYTHONPATH" in env: - return - - # Set PYTHONPATH so that we run zmake from source. - env["PYTHONPATH"] = str(zmake_path) - - os.execve(sys.executable, [sys.executable, "-m", "zmake", *argv], env) - - -def call_with_namespace(func, namespace): - """Call a function with arguments applied from a Namespace. - - Args: - func: The callable to call. - namespace: The namespace to apply to the callable. - - Returns: - The result of calling the callable. - """ - kwds = {} - sig = inspect.signature(func) - names = [p.name for p in sig.parameters.values()] - for name, value in vars(namespace).items(): - pyname = name.replace("-", "_") - if pyname in names: - kwds[pyname] = value - return func(**kwds) - - -# Dictionary used to map log level strings to their corresponding int values. -log_level_map = { - "DEBUG": logging.DEBUG, - "INFO": logging.INFO, - "WARNING": logging.WARNING, - "ERROR": logging.ERROR, - "CRITICAL": logging.CRITICAL, -} - - -def main(argv=None): - """The main function. - - Args: - argv: Optionally, the command-line to parse, not including argv[0]. - - Returns: - Zero upon success, or non-zero upon failure. - """ - if argv is None: - argv = sys.argv[1:] - - maybe_reexec(argv) - - parser = argparse.ArgumentParser() - parser.add_argument( - "--checkout", type=pathlib.Path, help="Path to ChromiumOS checkout" - ) - parser.add_argument( - "-D", - "--debug", - action="store_true", - default=False, - help=("Turn on debug features (e.g., stack trace, " "verbose logging)"), - ) - parser.add_argument( - "-j", - "--jobs", - # TODO(b/178196029): ninja doesn't know how to talk to a - # jobserver properly and spams our CPU on all cores. Default - # to -j1 to execute sequentially until we switch to GNU Make. - default=1, - type=int, - help="Degree of multiprogramming to use", - ) - parser.add_argument( - "-l", - "--log-level", - choices=list(log_level_map.keys()), - dest="log_level", - help="Set the logging level (default=INFO)", - ) - parser.add_argument( - "-L", - "--no-log-label", - action="store_false", - help="Turn off logging labels", - dest="log_label", - default=None, - ) - parser.add_argument( - "--log-label", - action="store_true", - help="Turn on logging labels", - dest="log_label", - default=None, - ) - parser.add_argument( - "--modules-dir", - type=pathlib.Path, - help="The path to a directory containing all modules " - "needed. If unspecified, zmake will assume you have " - "a Chrome OS checkout and try locating them in the " - "checkout.", - ) - parser.add_argument( - "--zephyr-base", type=pathlib.Path, help="Path to Zephyr OS repository" - ) - - sub = parser.add_subparsers(dest="subcommand", help="Subcommand") - sub.required = True - - configure = sub.add_parser("configure") - configure.add_argument( - "--ignore-unsupported-zephyr-version", - action="store_true", - help="Don't warn about using an unsupported Zephyr version", - ) - configure.add_argument("-t", "--toolchain", help="Name of toolchain to use") - configure.add_argument( - "--bringup", - action="store_true", - dest="bringup", - help="Enable bringup debugging features", - ) - configure.add_argument( - "-B", "--build-dir", type=pathlib.Path, help="Build directory" - ) - configure.add_argument( - "-b", - "--build", - action="store_true", - dest="build_after_configure", - help="Run the build after configuration", - ) - configure.add_argument( - "--test", - action="store_true", - dest="test_after_configure", - help="Test the .elf file after configuration", - ) - configure.add_argument( - "project_dir", type=pathlib.Path, help="Path to the project to build" - ) - configure.add_argument( - "-c", - "--coverage", - action="store_true", - dest="coverage", - help="Enable CONFIG_COVERAGE Kconfig.", - ) - - build = sub.add_parser("build") - build.add_argument( - "build_dir", - type=pathlib.Path, - help="The build directory used during configuration", - ) - build.add_argument( - "-w", - "--fail-on-warnings", - action="store_true", - help="Exit with code 2 if warnings are detected", - ) - - test = sub.add_parser("test") - test.add_argument( - "build_dir", - type=pathlib.Path, - help="The build directory used during configuration", - ) - - sub.add_parser("testall") - - coverage = sub.add_parser("coverage") - coverage.add_argument( - "build_dir", - type=pathlib.Path, - help="The build directory used during configuration", - ) - - opts = parser.parse_args(argv) - - # Default logging - log_level = logging.INFO - log_label = False - - if opts.log_level: - log_level = log_level_map[opts.log_level] - log_label = True - elif opts.debug: - log_level = logging.DEBUG - log_label = True - - if opts.log_label is not None: - log_label = opts.log_label - if log_label: - log_format = "%(levelname)s: %(message)s" - else: - log_format = "%(message)s" - multiproc.log_job_names = False - - logging.basicConfig(format=log_format, level=log_level) - - if not opts.debug: - sys.tracebacklimit = 0 - - try: - zmake = call_with_namespace(zm.Zmake, opts) - subcommand_method = getattr(zmake, opts.subcommand.replace("-", "_")) - result = call_with_namespace(subcommand_method, opts) - return result - finally: - multiproc.wait_for_log_end() - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/zephyr/zmake/zmake/build_config.py b/zephyr/zmake/zmake/build_config.py deleted file mode 100644 index 9a9c7f36a2..0000000000 --- a/zephyr/zmake/zmake/build_config.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Encapsulation of a build configuration.""" - - -import zmake.util as util - - -class BuildConfig: - """A container for build configurations. - - A build config is a tuple of environment variables, cmake - variables, kconfig definitons, and kconfig files. - """ - - def __init__( - self, environ_defs={}, cmake_defs={}, kconfig_defs={}, kconfig_files=[] - ): - self.environ_defs = dict(environ_defs) - self.cmake_defs = dict(cmake_defs) - self.kconfig_defs = dict(kconfig_defs) - self.kconfig_files = kconfig_files - - def popen_cmake( - self, jobclient, project_dir, build_dir, kconfig_path=None, **kwargs - ): - """Run Cmake with this config using a jobclient. - - Args: - jobclient: A JobClient instance. - project_dir: The project directory. - build_dir: Directory to use for Cmake build. - kconfig_path: The path to write out Kconfig definitions. - kwargs: forwarded to popen. - """ - kconfig_files = list(self.kconfig_files) - if kconfig_path: - util.write_kconfig_file(kconfig_path, self.kconfig_defs) - kconfig_files.append(kconfig_path) - elif self.kconfig_defs: - raise ValueError( - "Cannot start Cmake on a config with Kconfig items without a " - "kconfig_path" - ) - - if kconfig_files: - base_config = BuildConfig( - environ_defs=self.environ_defs, cmake_defs=self.cmake_defs - ) - conf_file_config = BuildConfig( - cmake_defs={ - "CONF_FILE": ";".join(str(p.resolve()) for p in kconfig_files) - } - ) - return (base_config | conf_file_config).popen_cmake( - jobclient, project_dir, build_dir, **kwargs - ) - - kwargs["env"] = dict(**kwargs.get("env", {}), **self.environ_defs) - return jobclient.popen( - [ - "/usr/bin/cmake", - "-S", - project_dir, - "-B", - build_dir, - "-GNinja", - *("-D{}={}".format(*pair) for pair in self.cmake_defs.items()), - ], - **kwargs - ) - - def __or__(self, other): - """Combine two BuildConfig instances.""" - if not isinstance(other, BuildConfig): - raise TypeError( - "Unsupported operation | for {} and {}".format(type(self), type(other)) - ) - - return BuildConfig( - environ_defs=dict(**self.environ_defs, **other.environ_defs), - cmake_defs=dict(**self.cmake_defs, **other.cmake_defs), - kconfig_defs=dict(**self.kconfig_defs, **other.kconfig_defs), - kconfig_files=list({*self.kconfig_files, *other.kconfig_files}), - ) - - def __repr__(self): - return "BuildConfig({})".format( - ", ".join( - "{}={!r}".format(name, getattr(self, name)) - for name in [ - "environ_defs", - "cmake_defs", - "kconfig_defs", - "kconfig_files", - ] - if getattr(self, name) - ) - ) diff --git a/zephyr/zmake/zmake/jobserver.py b/zephyr/zmake/zmake/jobserver.py deleted file mode 100644 index 69199a2dc8..0000000000 --- a/zephyr/zmake/zmake/jobserver.py +++ /dev/null @@ -1,144 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Module for job counters, limiting the amount of concurrent executions.""" - -import logging -import multiprocessing -import os -import re -import select -import subprocess - -import zmake - - -class JobHandle: - """Small object to handle claim of a job.""" - - def __init__(self, release_func, *args, **kwargs): - self.release_func = release_func - self.args = args - self.kwargs = kwargs - - def __enter__(self): - pass - - def __exit__(self, exc_type, exc_value, traceback): - self.release_func(*self.args, **self.kwargs) - - -class JobClient: - """Abstract base class for all job clients.""" - - def get_job(self): - """Claim a job.""" - raise NotImplementedError("Abstract method not implemented") - - def env(self): - """Get the environment variables necessary to share the job server.""" - return {} - - def popen(self, *args, **kwargs): - """Start a process using subprocess.Popen - - All other arguments are passed to subprocess.Popen. - - Returns: - A Popen object. - """ - kwargs.setdefault("env", os.environ) - kwargs["env"].update(self.env()) - - logger = logging.getLogger(self.__class__.__name__) - logger.debug("Running %s", zmake.util.repr_command(*args)) - return subprocess.Popen(*args, **kwargs) - - def run(self, *args, claim_job=True, **kwargs): - """Run a process using subprocess.run, optionally claiming a job. - - Args: - claim_job: True if a job should be claimed. - - All other arguments are passed to subprocess.run. - - Returns: - A CompletedProcess object. - """ - if claim_job: - with self.get_job(): - return self.run(*args, claim_job=False, **kwargs) - - kwargs.setdefault("env", os.environ) - kwargs["env"].update(self.env()) - - return subprocess.run(*args, **kwargs) - - -class JobServer(JobClient): - """Abstract Job Server.""" - - def __init__(self, jobs=0): - raise NotImplementedError("Abstract method not implemented") - - -class GNUMakeJobClient(JobClient): - def __init__(self, read_fd, write_fd): - self._pipe = [read_fd, write_fd] - - @classmethod - def from_environ(cls, env=None): - """Create a job client from an environment with the MAKEFLAGS variable. - - If we are started under a GNU Make Job Server, we can search - the environment for a string "--jobserver-auth=R,W", where R - and W will be the read and write file descriptors to the pipe - respectively. If we don't find this environment variable (or - the string inside of it), this will raise an OSError. - - Args: - env: Optionally, the environment to search. - - Returns: - A GNUMakeJobClient configured appropriately. - """ - if env is None: - env = os.environ - makeflags = env.get("MAKEFLAGS") - if not makeflags: - raise OSError("MAKEFLAGS is not set in the environment") - match = re.search(r"--jobserver-auth=(\d+),(\d+)", makeflags) - if not match: - raise OSError("MAKEFLAGS did not contain jobserver flags") - read_fd, write_fd = map(int, match.groups()) - return cls(read_fd, write_fd) - - def get_job(self): - """Claim a job. - - Returns: - A JobHandle object. - """ - byte = os.read(self._pipe[0], 1) - return JobHandle(lambda: os.write(self._pipe[1], byte)) - - def env(self): - """Get the environment variables necessary to share the job server.""" - return {"MAKEFLAGS": "--jobserver-auth={},{}".format(*self._pipe)} - - -class GNUMakeJobServer(JobServer, GNUMakeJobClient): - """Implements a GNU Make POSIX Job Server. - - See https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html - for specification. - """ - - def __init__(self, jobs=0): - if not jobs: - jobs = multiprocessing.cpu_count() - elif jobs > select.PIPE_BUF: - jobs = select.PIPE_BUF - - self._pipe = os.pipe() - os.write(self._pipe[1], b"+" * jobs) diff --git a/zephyr/zmake/zmake/modules.py b/zephyr/zmake/zmake/modules.py deleted file mode 100644 index 5ba0ef73f8..0000000000 --- a/zephyr/zmake/zmake/modules.py +++ /dev/null @@ -1,99 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Registry of known Zephyr modules.""" - -import zmake.build_config as build_config -import zmake.util as util - - -def third_party_module(name, checkout): - """Common callback in registry for all third_party/zephyr modules. - - Args: - name: The name of the module. - checkout: The path to the chromiumos source. - - Return: - The path to the module module. - """ - return checkout / "src" / "third_party" / "zephyr" / name - - -known_modules = { - "hal_stm32": third_party_module, - "cmsis": third_party_module, - "ec": lambda name, checkout: (checkout / "src" / "platform" / "ec"), - "nanopb": third_party_module, -} - - -def locate_from_checkout(checkout_dir): - """Find modules from a Chrome OS checkout. - - Important: this function should only conditionally be called if a - checkout exists. Zmake *can* be used without a Chrome OS source - tree. You should call locate_from_directory if outside of a - Chrome OS source tree. - - Args: - checkout_dir: The path to the chromiumos source. - - Returns: - A dictionary mapping module names to paths. - """ - result = {} - for name, locator in known_modules.items(): - result[name] = locator(name, checkout_dir) - return result - - -def locate_from_directory(directory): - """Create a modules dictionary from a directory. - - This takes a directory, and searches for the known module names - located in it. - - Args: - directory: the directory to search in. - - Returns: - A dictionary mapping module names to paths. - """ - result = {} - - for name in known_modules: - modpath = (directory / name).resolve() - if (modpath / "zephyr" / "module.yml").is_file(): - result[name] = modpath - - return result - - -def setup_module_symlinks(output_dir, modules): - """Setup a directory with symlinks to modules. - - Args: - output_dir: The directory to place the symlinks in. - modules: A dictionary of module names mapping to paths. - - Returns: - The resultant BuildConfig that should be applied to use each - of these modules. - """ - if not output_dir.exists(): - output_dir.mkdir(parents=True) - - module_links = [] - - for name, path in modules.items(): - link_path = output_dir.resolve() / name - util.update_symlink(path, link_path) - module_links.append(link_path) - - if module_links: - return build_config.BuildConfig( - cmake_defs={"ZEPHYR_MODULES": ";".join(map(str, module_links))} - ) - else: - return build_config.BuildConfig() diff --git a/zephyr/zmake/zmake/multiproc.py b/zephyr/zmake/zmake/multiproc.py deleted file mode 100644 index 5e98374c8c..0000000000 --- a/zephyr/zmake/zmake/multiproc.py +++ /dev/null @@ -1,322 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -import collections -import logging -import os -import select -import threading - -"""Zmake multiprocessing utility module. - -This module is used to aid in zmake's multiprocessing. It contains tools -available to log output from multiple processes on the fly. This means that a -process does not need to finish before the output is available to the developer -on the screen. -""" - -# A local pipe use to signal the look that a new file descriptor was added and -# should be included in the select statement. -_logging_interrupt_pipe = os.pipe() -# A condition variable used to synchronize logging operations. -_logging_cv = threading.Condition() -# A map of file descriptors to their LogWriter -_logging_map = {} -# Should we log job names or not -log_job_names = True - - -def reset(): - """Reset this module to its starting state (useful for tests)""" - global _logging_map - - _logging_map = {} - - -class LogWriter: - """Contains information about a file descriptor that is producing output - - There is typically one of these for each file descriptor that a process is - writing to while running (stdout and stderr). - - Properties: - _logger: The logger object to use. - _log_level: The logging level to use. - _override_func: A function used to override the log level. The - function will be called once per line prior to logging and will be - passed the arguments of the line and the default log level. - _written_at_level: dict: - key: log_level - value: True if output was written at that level - _job_id: The name to prepend to logged lines - _file_descriptor: The file descriptor being logged. - """ - - def __init__( - self, logger, log_level, log_level_override_func, job_id, file_descriptor - ): - self._logger = logger - self._log_level = log_level - self._override_func = log_level_override_func - # A map whether output was printed at each logging level - self._written_at_level = collections.defaultdict(lambda: False) - self._job_id = job_id - self._file_descriptor = file_descriptor - - def log_line(self, line): - """Log a line of output - - If the log-level override function requests a change in log level, that - causes self._log_level to be updated accordingly. - - Args: - line: Text line to log - """ - if self._override_func: - # Get the new log level and update the default. The reason we - # want to update the default is that if we hit an error, all - # future logging should be moved to the new logging level. This - # greatly simplifies the logic that is needed to update the log - # level. - self._log_level = self._override_func(line, self._log_level) - if self._job_id and log_job_names: - self._logger.log(self._log_level, "[%s]%s", self._job_id, line) - else: - self._logger.log(self._log_level, line) - self._written_at_level[self._log_level] = True - - def has_written(self, log_level): - """Check if output was written at a certain log level - - Args: - log_level: log level to check - - Returns: - True if any output was written at that log level, False if not - """ - return self._written_at_level[log_level] - - def wait(self): - """Wait for this LogWriter to finish. - - This method will block execution until all the logs have been flushed out. - """ - with _logging_cv: - _logging_cv.wait_for(lambda: self._file_descriptor not in _logging_map) - - -def _log_fd(fd): - """Log information from a single file descriptor. - - This function is BLOCKING. It will read from the given file descriptor until - either the end of line is read or EOF. Once EOF is read it will remove the - file descriptor from _logging_map so it will no longer be used. - Additionally, in some cases, the file descriptor will be closed (caused by - a call to Popen.wait()). In these cases, the file descriptor will also be - removed from the map as it is no longer valid. - """ - with _logging_cv: - writer = _logging_map[fd] - if fd.closed: - del _logging_map[fd] - _logging_cv.notify_all() - return - line = fd.readline() - if not line: - # EOF - del _logging_map[fd] - _logging_cv.notify_all() - return - line = line.rstrip("\n") - if line: - writer.log_line(line) - - -def _prune_logging_fds(): - """Prune the current file descriptors under _logging_map. - - This function will iterate over the logging map and check for closed file - descriptors. Every closed file descriptor will be removed. - """ - with _logging_cv: - remove = [fd for fd in _logging_map.keys() if fd.closed] - for fd in remove: - del _logging_map[fd] - if remove: - _logging_cv.notify_all() - - -def _logging_loop(): - """The primary logging thread loop. - - This is the entry point of the logging thread. It will listen for (1) any - new data on the output file descriptors that were added via log_output() and - (2) any new file descriptors being added by log_output(). Once a file - descriptor is ready to be read, this function will call _log_fd to perform - the actual read and logging. - """ - while True: - with _logging_cv: - _logging_cv.wait_for(lambda: _logging_map) - keys = list(_logging_map.keys()) + [_logging_interrupt_pipe[0]] - try: - fds, _, _ = select.select(keys, [], []) - except ValueError: - # One of the file descriptors must be closed, prune them and try - # again. - _prune_logging_fds() - continue - if _logging_interrupt_pipe[0] in fds: - # We got a dummy byte sent by log_output(), this is a signal used to - # break out of the blocking select.select call to tell us that the - # file descriptor set has changed. We just need to read the byte and - # remove this descriptor from the list. If we actually have data - # that should be read it will be read in the for loop below. - os.read(_logging_interrupt_pipe[0], 1) - fds.remove(_logging_interrupt_pipe[0]) - for fd in fds: - _log_fd(fd) - - -_logging_thread = None - - -def log_output( - logger, log_level, file_descriptor, log_level_override_func=None, job_id=None -): - """Log the output from the given file descriptor. - - Args: - logger: The logger object to use. - log_level: The logging level to use. - file_descriptor: The file descriptor to read from. - log_level_override_func: A function used to override the log level. The - function will be called once per line prior to logging and will be - passed the arguments of the line and the default log level. - - Returns: - LogWriter object for the resulting output - """ - with _logging_cv: - global _logging_thread - if _logging_thread is None or not _logging_thread.is_alive(): - # First pass or thread must have died, create a new one. - _logging_thread = threading.Thread(target=_logging_loop, daemon=True) - _logging_thread.start() - - writer = LogWriter( - logger, log_level, log_level_override_func, job_id, file_descriptor - ) - _logging_map[file_descriptor] = writer - # Write a dummy byte to the pipe to break the select so we can add the - # new fd. - os.write(_logging_interrupt_pipe[1], b"x") - # Notify the condition so we can run the select on the current fds. - _logging_cv.notify_all() - return writer - - -def wait_for_log_end(): - """Wait for all the logs to be printed. - - This method will block execution until all the logs have been flushed out. - """ - with _logging_cv: - _logging_cv.wait_for(lambda: not _logging_map) - - -class Executor: - """Parallel executor helper class. - - This class is used to run multiple functions in parallel. The functions MUST - return an integer result code (or throw an exception). This class will start - a thread per operation and wait() for all the threads to resolve. - - Attributes: - lock: The condition variable used to synchronize across threads. - threads: A list of threading.Thread objects currently under this - Executor. - results: A list of result codes returned by each of the functions called - by this Executor. - """ - - def __init__(self): - self.lock = threading.Condition() - self.threads = [] - self.results = [] - self.logger = logging.getLogger(self.__class__.__name__) - - def append(self, func): - """Append the given function to the wait list. - - Once added, the function's return value will be used to determine the - Executor's final result value. The function must return an int result - code or throw an exception. For example: If two functions were added - to the Executor, they will both be run in parallel and their results - will determine whether or not the Executor succeeded. If both functions - returned 0, then the Executor's wait function will also return 0. - - Args: - func: A function which returns an int result code or throws an - exception. - """ - with self.lock: - thread = threading.Thread(target=lambda: self._run_fn(func), daemon=True) - thread.start() - self.threads.append(thread) - - def wait(self): - """Wait for a result to be available. - - This function waits for the executor to resolve (i.e., all - threads have finished). - - Returns: - An integer result code of either the first failed function or 0 if - they all succeeded. - """ - with self.lock: - self.lock.wait_for(predicate=lambda: self._is_finished) - return self._result - - def _run_fn(self, func): - """Entry point to each running thread. - - This function will run the function provided in the append() function. - The result value of the function will be used to determine the - Executor's result value. If the function throws any exception it will be - caught and -1 will be used as the assumed result value. - - Args: - func: The function to run. - """ - try: - result = func() - except Exception as ex: - self.logger.exception(ex) - result = -1 - with self.lock: - self.results.append(result) - self.lock.notify_all() - - @property - def _is_finished(self): - """Whether or not the Executor is considered to be done. - - Returns: - True if the Executor is considered done. - """ - if len(self.threads) == len(self.results): - return True - return False - - @property - def _result(self): - """The result code of the Executor. - - Note that _is_finished must be True for this to have any meaning. - - Returns: - An int representing the result value of the underlying functions. - """ - return next((result for result in self.results if result), 0) diff --git a/zephyr/zmake/zmake/output_packers.py b/zephyr/zmake/zmake/output_packers.py deleted file mode 100644 index 1ba38cf96c..0000000000 --- a/zephyr/zmake/zmake/output_packers.py +++ /dev/null @@ -1,230 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Types which provide many builds and composite them into a single binary.""" -import logging -import shutil -import subprocess - -import zmake.build_config as build_config -import zmake.multiproc -import zmake.util as util - - -class BasePacker: - """Abstract base for all packers.""" - - def __init__(self, project): - self.project = project - - def configs(self): - """Get all of the build configurations necessary. - - Yields: - 2-tuples of config name and a BuildConfig. - """ - yield "singleimage", build_config.BuildConfig() - - def pack_firmware(self, work_dir, jobclient, version_string=""): - """Pack a firmware image. - - Config names from the configs generator are passed as keyword - arguments, with each argument being set to the path of the - build directory. - - Args: - work_dir: A directory to write outputs and temporary files - into. - jobclient: A JobClient object to use. - version_string: The version string, which may end up in - certain parts of the outputs. - - Yields: - 2-tuples of the path of each file in the work_dir (or any - other directory) which should be copied into the output - directory, and the output filename. - """ - raise NotImplementedError("Abstract method not implemented") - - def _get_max_image_bytes(self): - """Get the maximum allowed image size (in bytes). - - This value will generally be found in CONFIG_FLASH_SIZE but may vary - depending on the specific way things are being packed. - - Returns: - The maximum allowed size of the image in bytes. - """ - raise NotImplementedError("Abstract method not implemented") - - def _is_size_bound(self, path): - """Check whether the given path should be constrained by size. - - Generally, .elf files will be unconstrained while .bin files will be - constrained. - - Args: - path: A file's path to test. - - Returns: - True if the file size should be checked. False otherwise. - """ - return path.suffix == ".bin" - - def _check_packed_file_size(self, file, dirs): - """Check that a packed file passes size constraints. - - Args: - file: A file to test. - dirs: A map of the arguments to pass to _get_max_image_bytes - - Returns: - The file if it passes the test. - """ - if not self._is_size_bound( - file - ) or file.stat().st_size <= self._get_max_image_bytes(**dirs): - return file - raise RuntimeError("Output file ({}) too large".format(file)) - - -class ElfPacker(BasePacker): - """Raw proxy for ELF output of a single build.""" - - def pack_firmware(self, work_dir, jobclient, singleimage, version_string=""): - yield singleimage / "zephyr" / "zephyr.elf", "zephyr.elf" - - -class RawBinPacker(BasePacker): - """Raw proxy for zephyr.bin output of a single build.""" - - def pack_firmware(self, work_dir, jobclient, singleimage, version_string=""): - yield singleimage / "zephyr" / "zephyr.bin", "zephyr.bin" - - -class BinmanPacker(BasePacker): - """Packer for RO/RW image to generate a .bin build using FMAP.""" - - ro_file = "zephyr.bin" - rw_file = "zephyr.bin" - - def __init__(self, project): - self.logger = logging.getLogger(self.__class__.__name__) - super().__init__(project) - - def configs(self): - yield "ro", build_config.BuildConfig(kconfig_defs={"CONFIG_CROS_EC_RO": "y"}) - yield "rw", build_config.BuildConfig(kconfig_defs={"CONFIG_CROS_EC_RW": "y"}) - - def pack_firmware(self, work_dir, jobclient, ro, rw, version_string=""): - """Pack RO and RW sections using Binman. - - Binman configuration is expected to be found in the RO build - device-tree configuration. - - Args: - work_dir: The directory used for packing. - jobclient: The client used to run subprocesses. - ro: Directory containing the RO image build. - rw: Directory containing the RW image build. - version_string: The version string to use in FRID/FWID. - - Yields: - 2-tuples of the path of each file in the work_dir that - should be copied into the output directory, and the output - filename. - """ - dts_file_path = ro / "zephyr" / "zephyr.dts" - - # Copy the inputs into the work directory so that Binman can - # find them under a hard-coded name. - shutil.copy2(ro / "zephyr" / self.ro_file, work_dir / "zephyr_ro.bin") - shutil.copy2(rw / "zephyr" / self.rw_file, work_dir / "zephyr_rw.bin") - - # Version in FRID/FWID can be at most 31 bytes long (32, minus - # one for null character). - if len(version_string) > 31: - version_string = version_string[:31] - - proc = jobclient.popen( - [ - "binman", - "-v", - "5", - "build", - "-a", - "version={}".format(version_string), - "-d", - dts_file_path, - "-m", - "-O", - work_dir, - ], - cwd=work_dir, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - ) - - zmake.multiproc.log_output(self.logger, logging.DEBUG, proc.stdout) - zmake.multiproc.log_output(self.logger, logging.ERROR, proc.stderr) - if proc.wait(timeout=60): - raise OSError("Failed to run binman") - - yield work_dir / "zephyr.bin", "zephyr.bin" - yield ro / "zephyr" / "zephyr.elf", "zephyr.ro.elf" - yield rw / "zephyr" / "zephyr.elf", "zephyr.rw.elf" - - -class NpcxPacker(BinmanPacker): - """Packer for RO/RW image to generate a .bin build using FMAP. - - This expects that the build is setup to generate a - zephyr.npcx.bin for the RO image, which should be packed using - Nuvoton's loader format. - """ - - ro_file = "zephyr.npcx.bin" - npcx_monitor = "npcx_monitor.bin" - - # TODO(b/192401039): CONFIG_FLASH_SIZE is nuvoton-only. Since - # binman already checks sizes, perhaps we can just remove this - # code? - def _get_max_image_bytes(self, ro, rw): - ro_size = util.read_kconfig_autoconf_value( - ro / "zephyr" / "include" / "generated", "CONFIG_FLASH_SIZE" - ) - rw_size = util.read_kconfig_autoconf_value( - ro / "zephyr" / "include" / "generated", "CONFIG_FLASH_SIZE" - ) - return max(int(ro_size, 0), int(rw_size, 0)) * 1024 - - # This can probably be removed too and just rely on binman to - # check the sizes... see the comment above. - def pack_firmware(self, work_dir, jobclient, ro, rw, version_string=""): - for path, output_file in super().pack_firmware( - work_dir, - jobclient, - ro, - rw, - version_string=version_string, - ): - if output_file == "zephyr.bin": - yield ( - self._check_packed_file_size(path, {"ro": ro, "rw": rw}), - "zephyr.bin", - ) - else: - yield path, output_file - - # Include the NPCX monitor file as an output artifact. - yield ro / self.npcx_monitor, self.npcx_monitor - - -# A dictionary mapping packer config names to classes. -packer_registry = { - "binman": BinmanPacker, - "elf": ElfPacker, - "npcx": NpcxPacker, - "raw": RawBinPacker, -} diff --git a/zephyr/zmake/zmake/project.py b/zephyr/zmake/zmake/project.py deleted file mode 100644 index 84151a90b3..0000000000 --- a/zephyr/zmake/zmake/project.py +++ /dev/null @@ -1,248 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Module for project config wrapper object.""" - -import logging -import pathlib -import warnings - -import yaml - -import zmake.build_config as build_config -import zmake.modules as modules -import zmake.output_packers as packers -import zmake.toolchains as toolchains -import zmake.util as util - -# The version of jsonschema in the chroot has a bunch of -# DeprecationWarnings that fire when we import it. Suppress these -# during the import to keep the noise down. -with warnings.catch_warnings(): - warnings.simplefilter("ignore") - import jsonschema - - -def module_dts_overlay_name(modpath, board_name): - """Given a board name, return the expected DTS overlay path. - - Args: - modpath: the module path as a pathlib.Path object - board_name: the name of the board - - Returns: - A pathlib.Path object to the expected overlay path. - """ - return modpath / "zephyr" / "dts" / "board-overlays" / "{}.dts".format(board_name) - - -def find_projects(root_dir): - """Finds all zmake projects in root_dir. - - Args: - root_dir: the root dir as a pathlib.Path object - - Yields: - Project: The next project found. - """ - logging.info("Finding zmake targets under '%s'.", root_dir) - for path in pathlib.Path(root_dir).rglob("zmake.yaml"): - yield Project(path.parent) - - -class ProjectConfig: - """An object wrapping zmake.yaml.""" - - validator = jsonschema.Draft7Validator - schema = { - "type": "object", - "required": [ - "board", - "output-type", - "supported-toolchains", - "supported-zephyr-versions", - ], - "properties": { - "supported-zephyr-versions": { - "type": "array", - "items": { - "type": "string", - "enum": ["v2.6"], - }, - "minItems": 1, - "uniqueItems": True, - }, - "board": { - "type": "string", - }, - "modules": { - "type": "array", - "items": { - "type": "string", - "enum": list(modules.known_modules), - }, - }, - "output-type": { - "type": "string", - "enum": list(packers.packer_registry), - }, - "supported-toolchains": { - "type": "array", - "items": { - "type": "string", - "enum": list(toolchains.support_classes), - }, - }, - "is-test": { - "type": "boolean", - }, - "dts-overlays": { - "type": "array", - "items": { - "type": "string", - }, - }, - }, - } - - def __init__(self, config_dict): - self.validator.check_schema(self.schema) - jsonschema.validate(config_dict, self.schema, cls=self.validator) - self.config_dict = config_dict - - @property - def supported_zephyr_versions(self): - return [ - util.parse_zephyr_version(x) - for x in self.config_dict["supported-zephyr-versions"] - ] - - @property - def board(self): - return self.config_dict["board"] - - @property - def modules(self): - return self.config_dict.get("modules", list(modules.known_modules)) - - @property - def output_packer(self): - return packers.packer_registry[self.config_dict["output-type"]] - - @property - def supported_toolchains(self): - return self.config_dict["supported-toolchains"] - - @property - def is_test(self): - return self.config_dict.get("is-test", False) - - @property - def dts_overlays(self): - return self.config_dict.get("dts-overlays", []) - - -class Project: - """An object encapsulating a project directory.""" - - def __init__(self, project_dir, config_dict=None): - self.project_dir = project_dir.resolve() - if not config_dict: - with open(self.project_dir / "zmake.yaml") as f: - config_dict = yaml.safe_load(f) - self.config = ProjectConfig(config_dict) - self.packer = self.config.output_packer(self) - - def iter_builds(self): - """Iterate thru the build combinations provided by the project's packer. - - Yields: - 2-tuples of a build configuration name and a BuildConfig. - """ - conf = build_config.BuildConfig(cmake_defs={"BOARD": self.config.board}) - prj_conf = self.project_dir / "prj.conf" - if prj_conf.is_file(): - conf |= build_config.BuildConfig(kconfig_files=[prj_conf]) - for build_name, packer_config in self.packer.configs(): - yield build_name, conf | packer_config - - def find_dts_overlays(self, modules): - """Find appropriate dts overlays from registered modules. - - Args: - modules: A dictionary of module names mapping to paths. - - Returns: - A BuildConfig with relevant configurations to enable the - found DTS overlay files. - """ - overlays = [] - for module_path in modules.values(): - dts_path = module_dts_overlay_name(module_path, self.config.board) - if dts_path.is_file(): - overlays.append(dts_path.resolve()) - - overlays.extend(self.project_dir / f for f in self.config.dts_overlays) - - if overlays: - return build_config.BuildConfig( - cmake_defs={"DTC_OVERLAY_FILE": ";".join(map(str, overlays))} - ) - else: - return build_config.BuildConfig() - - def prune_modules(self, module_paths): - """Reduce a modules dict to the ones required by this project. - - If this project does not define a modules list in the - configuration, it is assumed that all known modules to Zmake - are required. This is typically inconsequential as Zephyr - module design conventions require a Kconfig option to actually - enable most modules. - - Args: - module_paths: A dictionary mapping module names to their - paths. This dictionary is not modified. - - Returns: - A new module_paths dictionary with only the modules - required by this project. - - Raises: - A KeyError, if a required module is unavailable. - """ - result = {} - for module in self.config.modules: - try: - result[module] = module_paths[module] - except KeyError as e: - raise KeyError( - "The {!r} module is required by the {} project, but is not " - "available.".format(module, self.project_dir) - ) from e - return result - - def get_toolchain(self, module_paths, override=None): - if override: - if override not in self.config.supported_toolchains: - logging.warning( - "Toolchain %r isn't supported by this project. You're on your own.", - override, - ) - support_class = toolchains.support_classes.get( - override, toolchains.GenericToolchain - ) - return support_class(name=override, modules=module_paths) - else: - for name in self.config.supported_toolchains: - support_class = toolchains.support_classes[name] - toolchain = support_class(name=name, modules=module_paths) - if toolchain.probe(): - logging.info("Toolchain %r selected by probe function.", toolchain) - return toolchain - raise OSError( - "No supported toolchains could be found on your system. If you see " - "this message in the chroot, it indicates a bug. Otherwise, you'll " - "either want to setup your system with a supported toolchain, or " - "manually select an unsupported toolchain with the -t flag." - ) diff --git a/zephyr/zmake/zmake/toolchains.py b/zephyr/zmake/zmake/toolchains.py deleted file mode 100644 index 924448aec5..0000000000 --- a/zephyr/zmake/zmake/toolchains.py +++ /dev/null @@ -1,154 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Definitions of toolchain variables.""" - -import os -import pathlib - -import zmake.build_config as build_config - - -class GenericToolchain: - """Default toolchain if not known to zmake. - - Simply pass ZEPHYR_TOOLCHAIN_VARIANT=name to the build, with - nothing extra. - """ - - def __init__(self, name, modules=None): - self.name = name - self.modules = modules or {} - - def probe(self): - """Probe if the toolchain is available on the system.""" - # Since the toolchain is not known to zmake, we have no way to - # know if it's installed. Simply return False to indicate not - # installed. An unknown toolchain would only be used if -t - # was manually passed to zmake, and is not valid to put in a - # zmake.yaml file. - return False - - def get_build_config(self): - """Get the build configuration for the toolchain. - - Returns: - A build_config.BuildConfig to be applied to the build. - """ - return build_config.BuildConfig( - cmake_defs={ - "ZEPHYR_TOOLCHAIN_VARIANT": self.name, - }, - ) - - -class CorebootSdkToolchain(GenericToolchain): - def probe(self): - # For now, we always assume it's at /opt/coreboot-sdk, since - # that's where it's installed in the chroot. We may want to - # consider adding support for a coreboot-sdk built in the - # user's home directory, for example, which happens if a - # "make crossgcc" is done from the coreboot repository. - return pathlib.Path("/opt/coreboot-sdk").is_dir() - - def get_build_config(self): - return ( - build_config.BuildConfig( - cmake_defs={ - "TOOLCHAIN_ROOT": str(self.modules["ec"] / "zephyr"), - }, - ) - | super().get_build_config() - ) - - -class ZephyrToolchain(GenericToolchain): - def __init__(self, *args, **kwargs): - self.zephyr_sdk_install_dir = self._find_zephyr_sdk() - super().__init__(*args, **kwargs) - - @staticmethod - def _find_zephyr_sdk(): - """Find the Zephyr SDK, if it's installed. - - Returns: - The path to the Zephyr SDK, using the search rules defined by - https://docs.zephyrproject.org/latest/getting_started/installation_linux.html, - or None, if one cannot be found on the system. - """ - from_env = os.getenv("ZEPHYR_SDK_INSTALL_DIR") - if from_env: - return pathlib.Path(from_env) - - def _gen_sdk_paths(): - for prefix in ( - "~", - "~/.local", - "~/.local/opt", - "~/bin", - "/opt", - "/usr", - "/usr/local", - ): - prefix = pathlib.Path(os.path.expanduser(prefix)) - yield prefix / "zephyr-sdk" - yield from prefix.glob("zephyr-sdk-*") - - for path in _gen_sdk_paths(): - if (path / "sdk_version").is_file(): - return path - - return None - - def probe(self): - return bool(self.zephyr_sdk_install_dir) - - def get_build_config(self): - assert self.zephyr_sdk_install_dir - tc_vars = { - "ZEPHYR_SDK_INSTALL_DIR": str(self.zephyr_sdk_install_dir), - } - return ( - build_config.BuildConfig( - environ_defs=tc_vars, - cmake_defs=tc_vars, - ) - | super().get_build_config() - ) - - -class LlvmToolchain(GenericToolchain): - def probe(self): - # TODO: differentiate chroot llvm path vs. something more - # generic? - return pathlib.Path("/usr/bin/x86_64-pc-linux-gnu-clang").exists() - - def get_build_config(self): - # TODO: this contains custom settings for the chroot. Plumb a - # toolchain for "generic-llvm" for external uses? - return ( - build_config.BuildConfig( - cmake_defs={ - "TOOLCHAIN_ROOT": str(self.modules["ec"] / "zephyr"), - }, - ) - | super().get_build_config() - ) - - -class HostToolchain(GenericToolchain): - def probe(self): - # "host" toolchain for Zephyr means GCC. - for search_path in os.getenv("PATH", "/usr/bin").split(":"): - if (pathlib.Path(search_path) / "gcc").exists(): - return True - return False - - -# Mapping of toolchain names -> support class -support_classes = { - "coreboot-sdk": CorebootSdkToolchain, - "host": HostToolchain, - "llvm": LlvmToolchain, - "zephyr": ZephyrToolchain, -} diff --git a/zephyr/zmake/zmake/util.py b/zephyr/zmake/zmake/util.py deleted file mode 100644 index 455cb7c9d6..0000000000 --- a/zephyr/zmake/zmake/util.py +++ /dev/null @@ -1,255 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -"""Common miscellaneous utility functions for zmake.""" - -import os -import pathlib -import re -import shlex - - -def c_str(input_str): - """Make a string that can be included as a literal in C source code. - - Args: - input_str: The string to process. - - Returns: - A string which can be included in C source code. - """ - - def c_chr(char): - # Convert a char in a string to the C representation. Per the - # C standard, we can use all characters but quote, newline, - # and backslash directly with no replacements. - return { - '"': r"\"", - "\n": r"\n", - "\\": "\\\\", - }.get(char, char) - - return '"{}"'.format("".join(map(c_chr, input_str))) - - -def locate_cros_checkout(): - """Find the path to the ChromiumOS checkout. - - Returns: - The first directory found with a .repo directory in it, - starting by checking the CROS_WORKON_SRCROOT environment - variable, then scanning upwards from the current directory, - and finally from a known set of common paths. - """ - - def propose_checkouts(): - yield os.getenv("CROS_WORKON_SRCROOT") - - path = pathlib.Path.cwd() - while path.resolve() != pathlib.Path("/"): - yield path - path = path / ".." - - yield "/mnt/host/source" - yield pathlib.Path.home() / "trunk" - yield pathlib.Path.home() / "chromiumos" - - for path in propose_checkouts(): - if not path: - continue - path = pathlib.Path(path) - if (path / ".repo").is_dir(): - return path.resolve() - - raise FileNotFoundError("Unable to locate a ChromiumOS checkout") - - -def locate_zephyr_base(checkout, version): - """Locate the path to the Zephyr RTOS in a ChromiumOS checkout. - - Args: - checkout: The path to the ChromiumOS checkout. - version: The requested zephyr version, as a tuple of integers. - - Returns: - The path to the Zephyr source. - """ - return ( - checkout - / "src" - / "third_party" - / "zephyr" - / "main" - / "v{}.{}".format(*version[:2]) - ) - - -def read_kconfig_file(path): - """Parse a Kconfig file. - - Args: - path: The path to open. - - Returns: - A dictionary of kconfig items to their values. - """ - result = {} - with open(path) as f: - for line in f: - line, _, _ = line.partition("#") - line = line.strip() - if line: - name, _, value = line.partition("=") - result[name.strip()] = value.strip() - return result - - -def read_kconfig_autoconf_value(path, key): - """Parse an autoconf.h file for a resolved kconfig value - - Args: - path: The path to the autoconf.h file. - key: The define key to lookup. - - Returns: - The value associated with the key or nothing if the key wasn't found. - """ - prog = re.compile(r"^#define\s{}\s(\S+)$".format(key)) - with open(path / "autoconf.h") as f: - for line in f: - m = prog.match(line) - if m: - return m.group(1) - - -def write_kconfig_file(path, config, only_if_changed=True): - """Write out a dictionary to Kconfig format. - - Args: - path: The path to write to. - config: The dictionary to write. - only_if_changed: Set to True if the file should not be written - unless it has changed. - """ - if only_if_changed: - if path.exists() and read_kconfig_file(path) == config: - return - with open(path, "w") as f: - for name, value in config.items(): - f.write("{}={}\n".format(name, value)) - - -def parse_zephyr_version(version_string): - """Parse a human-readable version string (e.g., "v2.4") as a tuple. - - Args: - version_string: The human-readable version string. - - Returns: - A 2-tuple or 3-tuple of integers representing the version. - """ - match = re.fullmatch(r"v?(\d+)[._](\d+)(?:[._](\d+))?", version_string) - if not match: - raise ValueError( - "{} does not look like a Zephyr version.".format(version_string) - ) - return tuple(int(x) for x in match.groups() if x is not None) - - -def read_zephyr_version(zephyr_base): - """Read the Zephyr version from a Zephyr OS checkout. - - Args: - zephyr_base: path to the Zephyr OS repository. - - Returns: - A 3-tuple of the version number (major, minor, patchset). - """ - version_file = pathlib.Path(zephyr_base) / "VERSION" - - file_vars = {} - with open(version_file) as f: - for line in f: - key, sep, value = line.partition("=") - file_vars[key.strip()] = value.strip() - - return ( - int(file_vars["VERSION_MAJOR"]), - int(file_vars["VERSION_MINOR"]), - int(file_vars["PATCHLEVEL"]), - ) - - -def repr_command(argv): - """Represent an argument array as a string. - - Args: - argv: The arguments of the command. - - Returns: - A string which could be pasted into a shell for execution. - """ - return " ".join(shlex.quote(str(arg)) for arg in argv) - - -def update_symlink(target_path, link_path): - """Create a symlink if it does not exist, or links to a different path. - - Args: - target_path: A Path-like object of the desired symlink path. - link_path: A Path-like object of the symlink. - """ - target = target_path.resolve() - if ( - not link_path.is_symlink() - or pathlib.Path(os.readlink(link_path)).resolve() != target - ): - if link_path.exists(): - link_path.unlink() - link_path.symlink_to(target) - - -def log_multi_line(logger, level, message): - """Log a potentially multi-line message to the logger. - - Args: - logger: The Logger object to log to. - level: The logging level to use when logging. - message: The (potentially) multi-line message to log. - """ - for line in message.splitlines(): - if line: - logger.log(level, line) - - -def resolve_build_dir(platform_ec_dir, project_dir, build_dir): - """Resolve the build directory using platform/ec/build/... as default. - - Args: - platform_ec_dir: The path to the chromiumos source's platform/ec - directory. - project_dir: The directory of the project. - build_dir: The directory to build in (may be None). - Returns: - The resolved build directory (using build_dir if not None). - """ - if build_dir: - return build_dir - - if not pathlib.Path.exists(project_dir / "zmake.yaml"): - raise OSError("Invalid configuration") - - # Resolve project_dir to absolute path. - project_dir = project_dir.resolve() - - # Compute the path of project_dir relative to platform_ec_dir. - project_relative_path = pathlib.Path.relative_to(project_dir, platform_ec_dir) - - # Make sure that the project_dir is a subdirectory of platform_ec_dir. - if platform_ec_dir / project_relative_path != project_dir: - raise OSError( - "Can't resolve project directory {} which is not a subdirectory" - " of the platform/ec directory {}".format(project_dir, platform_ec_dir) - ) - - return platform_ec_dir / "build" / project_relative_path diff --git a/zephyr/zmake/zmake/version.py b/zephyr/zmake/zmake/version.py deleted file mode 100644 index 47aba6d804..0000000000 --- a/zephyr/zmake/zmake/version.py +++ /dev/null @@ -1,166 +0,0 @@ -# Copyright 2021 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -import datetime -import getpass -import io -import os -import platform -import subprocess - -import zmake.util as util - - -def _get_num_commits(repo): - """Get the number of commits that have been made. - - If a Git repository is available, return the number of commits that have - been made. Otherwise return a fixed count. - - Args: - repo: The path to the git repo. - - Returns: - An integer, the number of commits that have been made. - """ - try: - result = subprocess.run( - ["git", "-C", repo, "rev-list", "HEAD", "--count"], - check=True, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - encoding="utf-8", - ) - except subprocess.CalledProcessError: - commits = "9999" - else: - commits = result.stdout - - return int(commits) - - -def _get_revision(repo): - """Get the current revision hash. - - If a Git repository is available, return the hash of the current index. - Otherwise return the hash of the VCSID environment variable provided by - the packaging system. - - Args: - repo: The path to the git repo. - - Returns: - A string, of the current revision. - """ - try: - result = subprocess.run( - ["git", "-C", repo, "log", "-n1", "--format=%H"], - check=True, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - encoding="utf-8", - ) - except subprocess.CalledProcessError: - # Fall back to the VCSID provided by the packaging system. - # Format is 0.0.1-r425-032666c418782c14fe912ba6d9f98ffdf0b941e9 for - # releases and 9999-032666c418782c14fe912ba6d9f98ffdf0b941e9 for - # 9999 ebuilds. - vcsid = os.environ.get("VCSID", "9999-unknown") - revision = vcsid.rsplit("-", 1)[1] - else: - revision = result.stdout - - return revision - - -def get_version_string(project, zephyr_base, modules, static=False): - """Get the version string associated with a build. - - Args: - project: a zmake.project.Project object - zephyr_base: the path to the zephyr directory - modules: a dictionary mapping module names to module paths - static: if set, create a version string not dependent on git - commits, thus allowing binaries to be compared between two - commits. - - Returns: - A version string which can be placed in FRID, FWID, or used in - the build for the OS. - """ - major_version, minor_version, *_ = util.read_zephyr_version(zephyr_base) - project_id = project.project_dir.parts[-1] - num_commits = 0 - - if static: - vcs_hashes = "STATIC" - else: - repos = { - "os": zephyr_base, - **modules, - } - - for repo in repos.values(): - num_commits += _get_num_commits(repo) - - vcs_hashes = ",".join( - "{}:{}".format(name, _get_revision(repo)[:6]) - for name, repo in sorted( - repos.items(), - # Put the EC module first, then Zephyr OS kernel, as - # these are probably the most important hashes to - # developers. - key=lambda p: (p[0] != "ec", p[0] != "os", p), - ) - ) - - return "{}_v{}.{}.{}-{}".format( - project_id, major_version, minor_version, num_commits, vcs_hashes - ) - - -def write_version_header(version_str, output_path, static=False): - """Generate a version header and write it to the specified path. - - Generate a version header in the format expected by the EC build - system, and write it out only if the version header does not exist - or changes. We don't write in the case that the version header - does exist and was unchanged, which allows "zmake build" commands - on an unchanged tree to be an effective no-op. - - Args: - version_str: The version string to be used in the header, such - as one generated by get_version_string. - output_path: The file path to write at (a pathlib.Path - object). - static: If true, generate a header which does not include - information like the username, hostname, or date, allowing - the build to be reproducible. - """ - output = io.StringIO() - output.write("/* This file is automatically generated by zmake */\n") - - def add_def(name, value): - output.write("#define {} {}\n".format(name, util.c_str(value))) - - def add_def_unquoted(name, value): - output.write("#define {} {}\n".format(name, value)) - - add_def("VERSION", version_str) - add_def("CROS_EC_VERSION32", version_str[:31]) - - if static: - add_def("BUILDER", "reproducible@build") - add_def("DATE", "STATIC_VERSION_DATE") - else: - add_def("BUILDER", "{}@{}".format(getpass.getuser(), platform.node())) - add_def("DATE", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - - add_def("CROS_FWID_MISSING_STR", "CROS_FWID_MISSING") - # TODO(b/198475757): Add zmake support for getting CROS_FWID32 - add_def_unquoted("CROS_FWID32", "CROS_FWID_MISSING_STR") - - contents = output.getvalue() - if not output_path.exists() or output_path.read_text() != contents: - output_path.write_text(contents) diff --git a/zephyr/zmake/zmake/zmake.py b/zephyr/zmake/zmake/zmake.py deleted file mode 100644 index 1b60e66f66..0000000000 --- a/zephyr/zmake/zmake/zmake.py +++ /dev/null @@ -1,757 +0,0 @@ -# Copyright 2020 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Module encapsulating Zmake wrapper object.""" -import logging -import os -import pathlib -import re -import shutil -import subprocess -import tempfile - -import zmake.build_config -import zmake.jobserver -import zmake.modules -import zmake.multiproc -import zmake.project -import zmake.util as util -import zmake.version - -ninja_warnings = re.compile(r"^(\S*: )?warning:.*") -ninja_errors = re.compile(r"error:.*") - - -def ninja_stdout_log_level_override(line, current_log_level): - """Update the log level for ninja builds if we hit an error. - - Ninja builds prints everything to stdout, but really we want to start - logging things to CRITICAL - - Args: - line: The line that is about to be logged. - current_log_level: The active logging level that would be used for the - line. - """ - # Output lines from Zephyr that are not normally useful - # Send any lines that start with these strings to INFO - cmake_suppress = [ - "-- ", # device tree messages - "Loaded configuration", - "Including boilerplate", - "Parsing ", - "No change to configuration", - "No change to Kconfig header", - ] - - # Herewith a long list of things which are really for debugging, not - # development. Return logging.DEBUG for each of these. - - # ninja puts progress information on stdout - if line.startswith("["): - return logging.DEBUG - # we don't care about entering directories since it happens every time - if line.startswith("ninja: Entering directory"): - return logging.DEBUG - # we know the build stops from the compiler messages and ninja return code - if line.startswith("ninja: build stopped"): - return logging.DEBUG - # someone prints a *** SUCCESS *** message which we don't need - if line.startswith("***"): - return logging.DEBUG - # dopey ninja puts errors on stdout, so fix that. It does not look - # likely that it will be fixed upstream: - # https://github.com/ninja-build/ninja/issues/1537 - # Try to drop output about the device tree - if any(line.startswith(x) for x in cmake_suppress): - return logging.INFO - # this message is a bit like make failing. We already got the error output. - if line.startswith("FAILED: CMakeFiles"): - return logging.INFO - # if a particular file fails it shows the build line used, but that is not - # useful except for debugging. - if line.startswith("ccache"): - return logging.DEBUG - if ninja_warnings.match(line): - return logging.WARNING - if ninja_errors.match(line): - return logging.ERROR - # When we see "Memory region" go into INFO, and stay there as long as the - # line starts with \S+: - if line.startswith("Memory region"): - return logging.INFO - if current_log_level == logging.INFO and line.split()[0].endswith(":"): - return current_log_level - if current_log_level == logging.WARNING: - return current_log_level - return logging.ERROR - - -def cmake_log_level_override(line, default_log_level): - """Update the log level for cmake output if we hit an error. - - Cmake prints some messages that are less than useful during - development. - - Args: - line: The line that is about to be logged. - default_log_level: The default logging level that will be used for the - line. - """ - # Strange output from Zephyr that we normally ignore - if line.startswith("Including boilerplate"): - return logging.DEBUG - elif line.startswith("devicetree error:"): - return logging.ERROR - if ninja_warnings.match(line): - return logging.WARNING - if ninja_errors.match(line): - return logging.ERROR - return default_log_level - - -def get_process_failure_msg(proc): - """Creates a suitable failure message if something exits badly - - Args: - proc: subprocess.Popen object containing the thing that failed - - Returns: - Failure message as a string: - """ - return "Execution failed (return code={}): {}\n".format( - proc.returncode, util.repr_command(proc.args) - ) - - -class Zmake: - """Wrapper class encapsulating zmake's supported operations. - - The invocations of the constructor and the methods actually comes - from the main function. The command line arguments are translated - such that dashes are replaced with underscores and applied as - keyword arguments to the constructor and the method, and the - subcommand invoked becomes the method run. - - As such, you won't find documentation for each method's parameters - here, as it would be duplicate of the help strings from the - command line. Run "zmake --help" for full documentation of each - parameter. - - Properties: - executor: a zmake.multiproc.Executor object for submitting - tasks to. - _sequential: True to check the results of each build job sequentially, - before launching more, False to just do this after all jobs complete - """ - - def __init__( - self, checkout=None, jobserver=None, jobs=0, modules_dir=None, zephyr_base=None - ): - zmake.multiproc.reset() - self._checkout = checkout - self._zephyr_base = zephyr_base - - if modules_dir: - self.module_paths = zmake.modules.locate_from_directory(modules_dir) - else: - self.module_paths = zmake.modules.locate_from_checkout(self.checkout) - - if jobserver: - self.jobserver = jobserver - else: - try: - self.jobserver = zmake.jobserver.GNUMakeJobClient.from_environ() - except OSError: - self.jobserver = zmake.jobserver.GNUMakeJobServer(jobs=jobs) - - self.logger = logging.getLogger(self.__class__.__name__) - self.executor = zmake.multiproc.Executor() - self._sequential = jobs == 1 - - @property - def checkout(self): - if not self._checkout: - self._checkout = util.locate_cros_checkout() - return self._checkout.resolve() - - def locate_zephyr_base(self, version): - """Locate the Zephyr OS repository. - - Args: - version: If a Zephyr OS base was not supplied to Zmake, - which version to search for as a tuple of integers. - This argument is ignored if a Zephyr base was supplied - to Zmake. - Returns: - A pathlib.Path to the found Zephyr OS repository. - """ - if self._zephyr_base: - return self._zephyr_base - - return util.locate_zephyr_base(self.checkout, version) - - def configure( - self, - project_dir, - build_dir=None, - toolchain=None, - ignore_unsupported_zephyr_version=False, - build_after_configure=False, - test_after_configure=False, - bringup=False, - coverage=False, - ): - """Set up a build directory to later be built by "zmake build".""" - project = zmake.project.Project(project_dir) - supported_versions = project.config.supported_zephyr_versions - - zephyr_base = self.locate_zephyr_base(max(supported_versions)).resolve() - - # Ignore the patchset from the Zephyr version. - zephyr_version = util.read_zephyr_version(zephyr_base)[:2] - - if ( - not ignore_unsupported_zephyr_version - and zephyr_version not in supported_versions - ): - raise ValueError( - "The Zephyr OS version (v{}.{}) is not supported by the " - "project. You may wish to either configure zmake.yaml to " - "support this version, or pass " - "--ignore-unsupported-zephyr-version.".format(*zephyr_version) - ) - - # Resolve build_dir if needed. - build_dir = util.resolve_build_dir( - platform_ec_dir=self.module_paths["ec"], - project_dir=project_dir, - build_dir=build_dir, - ) - # Make sure the build directory is clean. - if os.path.exists(build_dir): - self.logger.info("Clearing old build directory %s", build_dir) - shutil.rmtree(build_dir) - - generated_include_dir = (build_dir / "include").resolve() - base_config = zmake.build_config.BuildConfig( - environ_defs={"ZEPHYR_BASE": str(zephyr_base), "PATH": "/usr/bin"}, - cmake_defs={ - "DTS_ROOT": str(self.module_paths["ec"] / "zephyr"), - "SYSCALL_INCLUDE_DIRS": str( - self.module_paths["ec"] / "zephyr" / "include" / "drivers" - ), - "ZMAKE_INCLUDE_DIR": str(generated_include_dir), - }, - ) - - # Prune the module paths to just those required by the project. - module_paths = project.prune_modules(self.module_paths) - - module_config = zmake.modules.setup_module_symlinks( - build_dir / "modules", module_paths - ) - - # Symlink the Zephyr base into the build directory so it can - # be used in the build phase. - util.update_symlink(zephyr_base, build_dir / "zephyr_base") - - dts_overlay_config = project.find_dts_overlays(module_paths) - - toolchain_support = project.get_toolchain(module_paths, override=toolchain) - toolchain_config = toolchain_support.get_build_config() - - if bringup: - base_config |= zmake.build_config.BuildConfig( - kconfig_defs={"CONFIG_PLATFORM_EC_BRINGUP": "y"} - ) - if coverage: - base_config |= zmake.build_config.BuildConfig( - kconfig_defs={"CONFIG_COVERAGE": "y"} - ) - - if not build_dir.exists(): - build_dir = build_dir.mkdir() - if not generated_include_dir.exists(): - generated_include_dir.mkdir() - processes = [] - self.logger.info("Building %s in %s.", project_dir, build_dir) - for build_name, build_config in project.iter_builds(): - self.logger.info("Configuring %s:%s.", project_dir, build_name) - config = ( - base_config - | toolchain_config - | module_config - | dts_overlay_config - | build_config - ) - output_dir = build_dir / "build-{}".format(build_name) - kconfig_file = build_dir / "kconfig-{}.conf".format(build_name) - proc = config.popen_cmake( - self.jobserver, - project_dir, - output_dir, - kconfig_file, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - job_id = "{}:{}".format(project_dir, build_name) - zmake.multiproc.log_output( - self.logger, - logging.DEBUG, - proc.stdout, - log_level_override_func=cmake_log_level_override, - job_id=job_id, - ) - zmake.multiproc.log_output( - self.logger, - logging.ERROR, - proc.stderr, - log_level_override_func=cmake_log_level_override, - job_id=job_id, - ) - if self._sequential: - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - else: - processes.append(proc) - for proc in processes: - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - - # Create symlink to project - util.update_symlink(project_dir, build_dir / "project") - - if test_after_configure: - return self.test(build_dir=build_dir) - elif build_after_configure: - return self.build(build_dir=build_dir) - - def build(self, build_dir, output_files_out=None, fail_on_warnings=False): - """Build a pre-configured build directory.""" - - def wait_and_check_success(procs, writers): - """Wait for processes to complete and check for errors - - Args: - procs: List of subprocess.Popen objects to check - writers: List of LogWriter objects to check - - Returns: - True if all if OK - False if an error was found (so that zmake should exit) - """ - bad = None - for proc in procs: - if proc.wait() and not bad: - bad = proc - if bad: - # Just show the first bad process for now. Both builds likely - # produce the same error anyway. If they don't, the user can - # still take action on the errors/warnings provided. Showing - # multiple 'Execution failed' messages is not very friendly - # since it exposes the fragmented nature of the build. - raise OSError(get_process_failure_msg(bad)) - - # Let all output be produced before exiting - for writer in writers: - writer.wait() - if fail_on_warnings and any( - w.has_written(logging.WARNING) or w.has_written(logging.ERROR) - for w in writers - ): - self.logger.warning("zmake: Warnings detected in build: aborting") - return False - return True - - procs = [] - log_writers = [] - dirs = {} - - build_dir = build_dir.resolve() - project = zmake.project.Project(build_dir / "project") - - # Compute the version string. - version_string = zmake.version.get_version_string( - project, - build_dir / "zephyr_base", - zmake.modules.locate_from_directory(build_dir / "modules"), - ) - - # The version header needs to generated during the build phase - # instead of configure, as the tree may have changed since - # configure was run. - zmake.version.write_version_header( - version_string, - build_dir / "include" / "ec_version.h", - ) - - for build_name, build_config in project.iter_builds(): - with self.jobserver.get_job(): - dirs[build_name] = build_dir / "build-{}".format(build_name) - cmd = ["/usr/bin/ninja", "-C", dirs[build_name].as_posix()] - self.logger.info( - "Building %s:%s: %s", - build_dir, - build_name, - zmake.util.repr_command(cmd), - ) - proc = self.jobserver.popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - job_id = "{}:{}".format(build_dir, build_name) - out = zmake.multiproc.log_output( - logger=self.logger, - log_level=logging.INFO, - file_descriptor=proc.stdout, - log_level_override_func=ninja_stdout_log_level_override, - job_id=job_id, - ) - err = zmake.multiproc.log_output( - self.logger, - logging.ERROR, - proc.stderr, - job_id=job_id, - ) - - if self._sequential: - if not wait_and_check_success([proc], [out, err]): - return 2 - else: - procs.append(proc) - log_writers += [out, err] - - if not wait_and_check_success(procs, log_writers): - return 2 - - # Run the packer. - packer_work_dir = build_dir / "packer" - output_dir = build_dir / "output" - for d in output_dir, packer_work_dir: - if not d.exists(): - d.mkdir() - - if output_files_out is None: - output_files_out = [] - for output_file, output_name in project.packer.pack_firmware( - packer_work_dir, self.jobserver, version_string=version_string, **dirs - ): - shutil.copy2(output_file, output_dir / output_name) - self.logger.debug("Output file '%s' created.", output_file) - output_files_out.append(output_file) - - return 0 - - def test(self, build_dir): - """Test a build directory.""" - procs = [] - output_files = [] - self.build(build_dir, output_files_out=output_files) - - # If the project built but isn't a test, just bail. - project = zmake.project.Project(build_dir / "project") - if not project.config.is_test: - return 0 - - for output_file in output_files: - self.logger.info("Running tests in %s.", output_file) - with self.jobserver.get_job(): - proc = self.jobserver.popen( - [output_file], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - job_id = "test {}".format(output_file) - zmake.multiproc.log_output( - self.logger, - logging.DEBUG, - proc.stdout, - job_id=job_id, - ) - zmake.multiproc.log_output( - self.logger, - logging.ERROR, - proc.stderr, - job_id=job_id, - ) - procs.append(proc) - - for idx, proc in enumerate(procs): - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - return 0 - - def testall(self): - """Test all the valid test targets""" - tmp_dirs = [] - for project in zmake.project.find_projects(self.module_paths["ec"] / "zephyr"): - is_test = project.config.is_test - temp_build_dir = tempfile.mkdtemp( - suffix="-{}".format(os.path.basename(project.project_dir.as_posix())), - prefix="zbuild-", - ) - tmp_dirs.append(temp_build_dir) - # Configure and run the test. - self.executor.append( - func=lambda: self.configure( - project_dir=project.project_dir, - build_dir=pathlib.Path(temp_build_dir), - build_after_configure=True, - test_after_configure=is_test, - ) - ) - - rv = self.executor.wait() - for tmpdir in tmp_dirs: - shutil.rmtree(tmpdir) - return rv - - def _run_lcov(self, build_dir, lcov_file, initial=False, gcov=""): - gcov = os.path.abspath(gcov) - with self.jobserver.get_job(): - if initial: - self.logger.info("Running (initial) lcov on %s.", build_dir) - else: - self.logger.info("Running lcov on %s.", build_dir) - cmd = [ - "/usr/bin/lcov", - "--gcov-tool", - gcov, - "-q", - "-o", - "-", - "-c", - "-d", - build_dir, - "-t", - lcov_file.stem, - "--exclude", - "*/build-*/zephyr/*/generated/*", - "--exclude", - "*/ec/test/*", - "--exclude", - "*/ec/zephyr/shim/chip/npcx/npcx_monitor/*", - "--exclude", - "*/ec/zephyr/emul/*", - "--exclude", - "*/ec/zephyr/test/*", - "--exclude", - "*/testsuite/*", - "--exclude", - "*/subsys/emul/*", - ] - if initial: - cmd += ["-i"] - proc = self.jobserver.popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - zmake.multiproc.log_output( - self.logger, - logging.WARNING, - proc.stderr, - job_id="{}-lcov".format(build_dir), - ) - - with open(lcov_file, "w") as outfile: - for line in proc.stdout: - if line.startswith("SF:"): - path = line[3:].rstrip() - outfile.write("SF:%s\n" % os.path.realpath(path)) - else: - outfile.write(line) - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - - return 0 - - def _coverage_compile_only(self, project, build_dir, lcov_file): - self.logger.info("Building %s in %s", project.project_dir, build_dir) - rv = self.configure( - project_dir=project.project_dir, - build_dir=build_dir, - build_after_configure=False, - test_after_configure=False, - coverage=True, - ) - if rv: - return rv - - # Compute the version string. - version_string = zmake.version.get_version_string( - project, - build_dir / "zephyr_base", - zmake.modules.locate_from_directory(build_dir / "modules"), - ) - - # The version header needs to generated during the build phase - # instead of configure, as the tree may have changed since - # configure was run. - zmake.version.write_version_header( - version_string, - build_dir / "include" / "ec_version.h", - ) - - # Use ninja to compile the all.libraries target. - build_project = zmake.project.Project(build_dir / "project") - - procs = [] - dirs = {} - gcov = "gcov.sh-not-found" - for build_name, build_config in build_project.iter_builds(): - self.logger.info("Building %s:%s all.libraries.", build_dir, build_name) - dirs[build_name] = build_dir / "build-{}".format(build_name) - gcov = dirs[build_name] / "gcov.sh" - proc = self.jobserver.popen( - ["/usr/bin/ninja", "-C", dirs[build_name], "all.libraries"], - # Ninja will connect as a job client instead and claim - # many jobs. - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - job_id = "{}:{}".format(build_dir, build_name) - zmake.multiproc.log_output( - logger=self.logger, - log_level=logging.DEBUG, - file_descriptor=proc.stdout, - log_level_override_func=ninja_stdout_log_level_override, - job_id=job_id, - ) - zmake.multiproc.log_output( - self.logger, - logging.ERROR, - proc.stderr, - job_id=job_id, - ) - if self._sequential: - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - else: - procs.append(proc) - - for proc in procs: - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - - return self._run_lcov(build_dir, lcov_file, initial=True, gcov=gcov) - - def _coverage_run_test(self, project, build_dir, lcov_file): - self.logger.info("Running test %s in %s", project.project_dir, build_dir) - rv = self.configure( - project_dir=project.project_dir, - build_dir=build_dir, - build_after_configure=True, - test_after_configure=True, - coverage=True, - ) - if rv: - return rv - gcov = "gcov.sh-not-found" - for build_name, build_config in project.iter_builds(): - gcov = build_dir / "build-{}".format(build_name) / "gcov.sh" - return self._run_lcov(build_dir, lcov_file, initial=False, gcov=gcov) - - def coverage(self, build_dir): - """Builds all targets with coverage enabled, and then runs the tests.""" - all_lcov_files = [] - root_dir = self.module_paths["ec"] / "zephyr" - for project in zmake.project.find_projects(root_dir): - is_test = project.config.is_test - rel_path = project.project_dir.relative_to(root_dir) - project_build_dir = pathlib.Path(build_dir).joinpath(rel_path) - lcov_file = pathlib.Path(build_dir).joinpath( - str(rel_path).replace("/", "_") + ".info" - ) - all_lcov_files.append(lcov_file) - if is_test: - # Configure and run the test. - self.executor.append( - func=lambda: self._coverage_run_test( - project, project_build_dir, lcov_file - ) - ) - else: - # Configure and compile the non-test project. - self.executor.append( - func=lambda: self._coverage_compile_only( - project, project_build_dir, lcov_file - ) - ) - if self._sequential: - rv = self.executor.wait() - if rv: - return rv - - rv = self.executor.wait() - if rv: - return rv - - with self.jobserver.get_job(): - # Merge info files into a single lcov.info - self.logger.info("Merging coverage data into %s.", build_dir / "lcov.info") - cmd = ["/usr/bin/lcov", "-o", build_dir / "lcov.info"] - for info in all_lcov_files: - cmd += ["-a", info] - proc = self.jobserver.popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - zmake.multiproc.log_output( - self.logger, logging.ERROR, proc.stderr, job_id="lcov" - ) - zmake.multiproc.log_output( - self.logger, logging.DEBUG, proc.stdout, job_id="lcov" - ) - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - - # Find the common root dir - prefixdir = os.path.commonprefix(list(self.module_paths.values())) - - # Merge into a nice html report - self.logger.info("Creating coverage report %s.", build_dir / "coverage_rpt") - proc = self.jobserver.popen( - [ - "/usr/bin/genhtml", - "-q", - "-o", - build_dir / "coverage_rpt", - "-t", - "Zephyr EC Unittest", - "-p", - prefixdir, - "-s", - ] - + all_lcov_files, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - zmake.multiproc.log_output( - self.logger, logging.ERROR, proc.stderr, job_id="genhtml" - ) - zmake.multiproc.log_output( - self.logger, logging.DEBUG, proc.stdout, job_id="genhtml" - ) - if proc.wait(): - raise OSError(get_process_failure_msg(proc)) - return 0 |