summaryrefslogtreecommitdiff
path: root/zephyr/zmake/zmake
diff options
context:
space:
mode:
Diffstat (limited to 'zephyr/zmake/zmake')
-rw-r--r--zephyr/zmake/zmake/__init__.py0
-rw-r--r--zephyr/zmake/zmake/__main__.py273
-rw-r--r--zephyr/zmake/zmake/build_config.py100
-rw-r--r--zephyr/zmake/zmake/jobserver.py144
-rw-r--r--zephyr/zmake/zmake/modules.py99
-rw-r--r--zephyr/zmake/zmake/multiproc.py322
-rw-r--r--zephyr/zmake/zmake/output_packers.py230
-rw-r--r--zephyr/zmake/zmake/project.py248
-rw-r--r--zephyr/zmake/zmake/toolchains.py154
-rw-r--r--zephyr/zmake/zmake/util.py255
-rw-r--r--zephyr/zmake/zmake/version.py166
-rw-r--r--zephyr/zmake/zmake/zmake.py757
12 files changed, 0 insertions, 2748 deletions
diff --git a/zephyr/zmake/zmake/__init__.py b/zephyr/zmake/zmake/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
--- a/zephyr/zmake/zmake/__init__.py
+++ /dev/null
diff --git a/zephyr/zmake/zmake/__main__.py b/zephyr/zmake/zmake/__main__.py
deleted file mode 100644
index 31f0436b5a..0000000000
--- a/zephyr/zmake/zmake/__main__.py
+++ /dev/null
@@ -1,273 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""The entry point into zmake."""
-import argparse
-import inspect
-import logging
-import os
-import pathlib
-import sys
-
-import zmake.multiproc as multiproc
-import zmake.zmake as zm
-
-
-def maybe_reexec(argv):
- """Re-exec zmake from the EC source tree, if possible and desired.
-
- Zmake installs into the users' chroot, which makes it convenient
- to execute, but can sometimes become tedious when zmake changes
- land and users haven't upgraded their chroots yet.
-
- We can partially subvert this problem by re-execing zmake from the
- source if it's available. This won't make it so developers never
- need to upgrade their chroots (e.g., a toolchain upgrade could
- require chroot upgrades), but at least makes it slightly more
- convenient for an average repo sync.
-
- Args:
- argv: The argument list passed to the main function, not
- including the executable path.
-
- Returns:
- None, if the re-exec did not happen, or never returns if the
- re-exec did happen.
- """
- # We only re-exec if we are inside of a chroot (since if installed
- # standalone using pip, there's already an "editable install"
- # feature for that in pip.)
- env = dict(os.environ)
- srcroot = env.get("CROS_WORKON_SRCROOT")
- if not srcroot:
- return
-
- # If for some reason we decide to move zmake in the future, then
- # we don't want to use the re-exec logic.
- zmake_path = (
- pathlib.Path(srcroot) / "src" / "platform" / "ec" / "zephyr" / "zmake"
- ).resolve()
- if not zmake_path.is_dir():
- return
-
- # If PYTHONPATH is set, it is either because we just did a
- # re-exec, or because the user wants to run a specific copy of
- # zmake. In either case, we don't want to re-exec.
- if "PYTHONPATH" in env:
- return
-
- # Set PYTHONPATH so that we run zmake from source.
- env["PYTHONPATH"] = str(zmake_path)
-
- os.execve(sys.executable, [sys.executable, "-m", "zmake", *argv], env)
-
-
-def call_with_namespace(func, namespace):
- """Call a function with arguments applied from a Namespace.
-
- Args:
- func: The callable to call.
- namespace: The namespace to apply to the callable.
-
- Returns:
- The result of calling the callable.
- """
- kwds = {}
- sig = inspect.signature(func)
- names = [p.name for p in sig.parameters.values()]
- for name, value in vars(namespace).items():
- pyname = name.replace("-", "_")
- if pyname in names:
- kwds[pyname] = value
- return func(**kwds)
-
-
-# Dictionary used to map log level strings to their corresponding int values.
-log_level_map = {
- "DEBUG": logging.DEBUG,
- "INFO": logging.INFO,
- "WARNING": logging.WARNING,
- "ERROR": logging.ERROR,
- "CRITICAL": logging.CRITICAL,
-}
-
-
-def main(argv=None):
- """The main function.
-
- Args:
- argv: Optionally, the command-line to parse, not including argv[0].
-
- Returns:
- Zero upon success, or non-zero upon failure.
- """
- if argv is None:
- argv = sys.argv[1:]
-
- maybe_reexec(argv)
-
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "--checkout", type=pathlib.Path, help="Path to ChromiumOS checkout"
- )
- parser.add_argument(
- "-D",
- "--debug",
- action="store_true",
- default=False,
- help=("Turn on debug features (e.g., stack trace, " "verbose logging)"),
- )
- parser.add_argument(
- "-j",
- "--jobs",
- # TODO(b/178196029): ninja doesn't know how to talk to a
- # jobserver properly and spams our CPU on all cores. Default
- # to -j1 to execute sequentially until we switch to GNU Make.
- default=1,
- type=int,
- help="Degree of multiprogramming to use",
- )
- parser.add_argument(
- "-l",
- "--log-level",
- choices=list(log_level_map.keys()),
- dest="log_level",
- help="Set the logging level (default=INFO)",
- )
- parser.add_argument(
- "-L",
- "--no-log-label",
- action="store_false",
- help="Turn off logging labels",
- dest="log_label",
- default=None,
- )
- parser.add_argument(
- "--log-label",
- action="store_true",
- help="Turn on logging labels",
- dest="log_label",
- default=None,
- )
- parser.add_argument(
- "--modules-dir",
- type=pathlib.Path,
- help="The path to a directory containing all modules "
- "needed. If unspecified, zmake will assume you have "
- "a Chrome OS checkout and try locating them in the "
- "checkout.",
- )
- parser.add_argument(
- "--zephyr-base", type=pathlib.Path, help="Path to Zephyr OS repository"
- )
-
- sub = parser.add_subparsers(dest="subcommand", help="Subcommand")
- sub.required = True
-
- configure = sub.add_parser("configure")
- configure.add_argument(
- "--ignore-unsupported-zephyr-version",
- action="store_true",
- help="Don't warn about using an unsupported Zephyr version",
- )
- configure.add_argument("-t", "--toolchain", help="Name of toolchain to use")
- configure.add_argument(
- "--bringup",
- action="store_true",
- dest="bringup",
- help="Enable bringup debugging features",
- )
- configure.add_argument(
- "-B", "--build-dir", type=pathlib.Path, help="Build directory"
- )
- configure.add_argument(
- "-b",
- "--build",
- action="store_true",
- dest="build_after_configure",
- help="Run the build after configuration",
- )
- configure.add_argument(
- "--test",
- action="store_true",
- dest="test_after_configure",
- help="Test the .elf file after configuration",
- )
- configure.add_argument(
- "project_dir", type=pathlib.Path, help="Path to the project to build"
- )
- configure.add_argument(
- "-c",
- "--coverage",
- action="store_true",
- dest="coverage",
- help="Enable CONFIG_COVERAGE Kconfig.",
- )
-
- build = sub.add_parser("build")
- build.add_argument(
- "build_dir",
- type=pathlib.Path,
- help="The build directory used during configuration",
- )
- build.add_argument(
- "-w",
- "--fail-on-warnings",
- action="store_true",
- help="Exit with code 2 if warnings are detected",
- )
-
- test = sub.add_parser("test")
- test.add_argument(
- "build_dir",
- type=pathlib.Path,
- help="The build directory used during configuration",
- )
-
- sub.add_parser("testall")
-
- coverage = sub.add_parser("coverage")
- coverage.add_argument(
- "build_dir",
- type=pathlib.Path,
- help="The build directory used during configuration",
- )
-
- opts = parser.parse_args(argv)
-
- # Default logging
- log_level = logging.INFO
- log_label = False
-
- if opts.log_level:
- log_level = log_level_map[opts.log_level]
- log_label = True
- elif opts.debug:
- log_level = logging.DEBUG
- log_label = True
-
- if opts.log_label is not None:
- log_label = opts.log_label
- if log_label:
- log_format = "%(levelname)s: %(message)s"
- else:
- log_format = "%(message)s"
- multiproc.log_job_names = False
-
- logging.basicConfig(format=log_format, level=log_level)
-
- if not opts.debug:
- sys.tracebacklimit = 0
-
- try:
- zmake = call_with_namespace(zm.Zmake, opts)
- subcommand_method = getattr(zmake, opts.subcommand.replace("-", "_"))
- result = call_with_namespace(subcommand_method, opts)
- return result
- finally:
- multiproc.wait_for_log_end()
-
-
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/zephyr/zmake/zmake/build_config.py b/zephyr/zmake/zmake/build_config.py
deleted file mode 100644
index 9a9c7f36a2..0000000000
--- a/zephyr/zmake/zmake/build_config.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Encapsulation of a build configuration."""
-
-
-import zmake.util as util
-
-
-class BuildConfig:
- """A container for build configurations.
-
- A build config is a tuple of environment variables, cmake
- variables, kconfig definitons, and kconfig files.
- """
-
- def __init__(
- self, environ_defs={}, cmake_defs={}, kconfig_defs={}, kconfig_files=[]
- ):
- self.environ_defs = dict(environ_defs)
- self.cmake_defs = dict(cmake_defs)
- self.kconfig_defs = dict(kconfig_defs)
- self.kconfig_files = kconfig_files
-
- def popen_cmake(
- self, jobclient, project_dir, build_dir, kconfig_path=None, **kwargs
- ):
- """Run Cmake with this config using a jobclient.
-
- Args:
- jobclient: A JobClient instance.
- project_dir: The project directory.
- build_dir: Directory to use for Cmake build.
- kconfig_path: The path to write out Kconfig definitions.
- kwargs: forwarded to popen.
- """
- kconfig_files = list(self.kconfig_files)
- if kconfig_path:
- util.write_kconfig_file(kconfig_path, self.kconfig_defs)
- kconfig_files.append(kconfig_path)
- elif self.kconfig_defs:
- raise ValueError(
- "Cannot start Cmake on a config with Kconfig items without a "
- "kconfig_path"
- )
-
- if kconfig_files:
- base_config = BuildConfig(
- environ_defs=self.environ_defs, cmake_defs=self.cmake_defs
- )
- conf_file_config = BuildConfig(
- cmake_defs={
- "CONF_FILE": ";".join(str(p.resolve()) for p in kconfig_files)
- }
- )
- return (base_config | conf_file_config).popen_cmake(
- jobclient, project_dir, build_dir, **kwargs
- )
-
- kwargs["env"] = dict(**kwargs.get("env", {}), **self.environ_defs)
- return jobclient.popen(
- [
- "/usr/bin/cmake",
- "-S",
- project_dir,
- "-B",
- build_dir,
- "-GNinja",
- *("-D{}={}".format(*pair) for pair in self.cmake_defs.items()),
- ],
- **kwargs
- )
-
- def __or__(self, other):
- """Combine two BuildConfig instances."""
- if not isinstance(other, BuildConfig):
- raise TypeError(
- "Unsupported operation | for {} and {}".format(type(self), type(other))
- )
-
- return BuildConfig(
- environ_defs=dict(**self.environ_defs, **other.environ_defs),
- cmake_defs=dict(**self.cmake_defs, **other.cmake_defs),
- kconfig_defs=dict(**self.kconfig_defs, **other.kconfig_defs),
- kconfig_files=list({*self.kconfig_files, *other.kconfig_files}),
- )
-
- def __repr__(self):
- return "BuildConfig({})".format(
- ", ".join(
- "{}={!r}".format(name, getattr(self, name))
- for name in [
- "environ_defs",
- "cmake_defs",
- "kconfig_defs",
- "kconfig_files",
- ]
- if getattr(self, name)
- )
- )
diff --git a/zephyr/zmake/zmake/jobserver.py b/zephyr/zmake/zmake/jobserver.py
deleted file mode 100644
index 69199a2dc8..0000000000
--- a/zephyr/zmake/zmake/jobserver.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Module for job counters, limiting the amount of concurrent executions."""
-
-import logging
-import multiprocessing
-import os
-import re
-import select
-import subprocess
-
-import zmake
-
-
-class JobHandle:
- """Small object to handle claim of a job."""
-
- def __init__(self, release_func, *args, **kwargs):
- self.release_func = release_func
- self.args = args
- self.kwargs = kwargs
-
- def __enter__(self):
- pass
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.release_func(*self.args, **self.kwargs)
-
-
-class JobClient:
- """Abstract base class for all job clients."""
-
- def get_job(self):
- """Claim a job."""
- raise NotImplementedError("Abstract method not implemented")
-
- def env(self):
- """Get the environment variables necessary to share the job server."""
- return {}
-
- def popen(self, *args, **kwargs):
- """Start a process using subprocess.Popen
-
- All other arguments are passed to subprocess.Popen.
-
- Returns:
- A Popen object.
- """
- kwargs.setdefault("env", os.environ)
- kwargs["env"].update(self.env())
-
- logger = logging.getLogger(self.__class__.__name__)
- logger.debug("Running %s", zmake.util.repr_command(*args))
- return subprocess.Popen(*args, **kwargs)
-
- def run(self, *args, claim_job=True, **kwargs):
- """Run a process using subprocess.run, optionally claiming a job.
-
- Args:
- claim_job: True if a job should be claimed.
-
- All other arguments are passed to subprocess.run.
-
- Returns:
- A CompletedProcess object.
- """
- if claim_job:
- with self.get_job():
- return self.run(*args, claim_job=False, **kwargs)
-
- kwargs.setdefault("env", os.environ)
- kwargs["env"].update(self.env())
-
- return subprocess.run(*args, **kwargs)
-
-
-class JobServer(JobClient):
- """Abstract Job Server."""
-
- def __init__(self, jobs=0):
- raise NotImplementedError("Abstract method not implemented")
-
-
-class GNUMakeJobClient(JobClient):
- def __init__(self, read_fd, write_fd):
- self._pipe = [read_fd, write_fd]
-
- @classmethod
- def from_environ(cls, env=None):
- """Create a job client from an environment with the MAKEFLAGS variable.
-
- If we are started under a GNU Make Job Server, we can search
- the environment for a string "--jobserver-auth=R,W", where R
- and W will be the read and write file descriptors to the pipe
- respectively. If we don't find this environment variable (or
- the string inside of it), this will raise an OSError.
-
- Args:
- env: Optionally, the environment to search.
-
- Returns:
- A GNUMakeJobClient configured appropriately.
- """
- if env is None:
- env = os.environ
- makeflags = env.get("MAKEFLAGS")
- if not makeflags:
- raise OSError("MAKEFLAGS is not set in the environment")
- match = re.search(r"--jobserver-auth=(\d+),(\d+)", makeflags)
- if not match:
- raise OSError("MAKEFLAGS did not contain jobserver flags")
- read_fd, write_fd = map(int, match.groups())
- return cls(read_fd, write_fd)
-
- def get_job(self):
- """Claim a job.
-
- Returns:
- A JobHandle object.
- """
- byte = os.read(self._pipe[0], 1)
- return JobHandle(lambda: os.write(self._pipe[1], byte))
-
- def env(self):
- """Get the environment variables necessary to share the job server."""
- return {"MAKEFLAGS": "--jobserver-auth={},{}".format(*self._pipe)}
-
-
-class GNUMakeJobServer(JobServer, GNUMakeJobClient):
- """Implements a GNU Make POSIX Job Server.
-
- See https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
- for specification.
- """
-
- def __init__(self, jobs=0):
- if not jobs:
- jobs = multiprocessing.cpu_count()
- elif jobs > select.PIPE_BUF:
- jobs = select.PIPE_BUF
-
- self._pipe = os.pipe()
- os.write(self._pipe[1], b"+" * jobs)
diff --git a/zephyr/zmake/zmake/modules.py b/zephyr/zmake/zmake/modules.py
deleted file mode 100644
index 5ba0ef73f8..0000000000
--- a/zephyr/zmake/zmake/modules.py
+++ /dev/null
@@ -1,99 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Registry of known Zephyr modules."""
-
-import zmake.build_config as build_config
-import zmake.util as util
-
-
-def third_party_module(name, checkout):
- """Common callback in registry for all third_party/zephyr modules.
-
- Args:
- name: The name of the module.
- checkout: The path to the chromiumos source.
-
- Return:
- The path to the module module.
- """
- return checkout / "src" / "third_party" / "zephyr" / name
-
-
-known_modules = {
- "hal_stm32": third_party_module,
- "cmsis": third_party_module,
- "ec": lambda name, checkout: (checkout / "src" / "platform" / "ec"),
- "nanopb": third_party_module,
-}
-
-
-def locate_from_checkout(checkout_dir):
- """Find modules from a Chrome OS checkout.
-
- Important: this function should only conditionally be called if a
- checkout exists. Zmake *can* be used without a Chrome OS source
- tree. You should call locate_from_directory if outside of a
- Chrome OS source tree.
-
- Args:
- checkout_dir: The path to the chromiumos source.
-
- Returns:
- A dictionary mapping module names to paths.
- """
- result = {}
- for name, locator in known_modules.items():
- result[name] = locator(name, checkout_dir)
- return result
-
-
-def locate_from_directory(directory):
- """Create a modules dictionary from a directory.
-
- This takes a directory, and searches for the known module names
- located in it.
-
- Args:
- directory: the directory to search in.
-
- Returns:
- A dictionary mapping module names to paths.
- """
- result = {}
-
- for name in known_modules:
- modpath = (directory / name).resolve()
- if (modpath / "zephyr" / "module.yml").is_file():
- result[name] = modpath
-
- return result
-
-
-def setup_module_symlinks(output_dir, modules):
- """Setup a directory with symlinks to modules.
-
- Args:
- output_dir: The directory to place the symlinks in.
- modules: A dictionary of module names mapping to paths.
-
- Returns:
- The resultant BuildConfig that should be applied to use each
- of these modules.
- """
- if not output_dir.exists():
- output_dir.mkdir(parents=True)
-
- module_links = []
-
- for name, path in modules.items():
- link_path = output_dir.resolve() / name
- util.update_symlink(path, link_path)
- module_links.append(link_path)
-
- if module_links:
- return build_config.BuildConfig(
- cmake_defs={"ZEPHYR_MODULES": ";".join(map(str, module_links))}
- )
- else:
- return build_config.BuildConfig()
diff --git a/zephyr/zmake/zmake/multiproc.py b/zephyr/zmake/zmake/multiproc.py
deleted file mode 100644
index 5e98374c8c..0000000000
--- a/zephyr/zmake/zmake/multiproc.py
+++ /dev/null
@@ -1,322 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-import collections
-import logging
-import os
-import select
-import threading
-
-"""Zmake multiprocessing utility module.
-
-This module is used to aid in zmake's multiprocessing. It contains tools
-available to log output from multiple processes on the fly. This means that a
-process does not need to finish before the output is available to the developer
-on the screen.
-"""
-
-# A local pipe use to signal the look that a new file descriptor was added and
-# should be included in the select statement.
-_logging_interrupt_pipe = os.pipe()
-# A condition variable used to synchronize logging operations.
-_logging_cv = threading.Condition()
-# A map of file descriptors to their LogWriter
-_logging_map = {}
-# Should we log job names or not
-log_job_names = True
-
-
-def reset():
- """Reset this module to its starting state (useful for tests)"""
- global _logging_map
-
- _logging_map = {}
-
-
-class LogWriter:
- """Contains information about a file descriptor that is producing output
-
- There is typically one of these for each file descriptor that a process is
- writing to while running (stdout and stderr).
-
- Properties:
- _logger: The logger object to use.
- _log_level: The logging level to use.
- _override_func: A function used to override the log level. The
- function will be called once per line prior to logging and will be
- passed the arguments of the line and the default log level.
- _written_at_level: dict:
- key: log_level
- value: True if output was written at that level
- _job_id: The name to prepend to logged lines
- _file_descriptor: The file descriptor being logged.
- """
-
- def __init__(
- self, logger, log_level, log_level_override_func, job_id, file_descriptor
- ):
- self._logger = logger
- self._log_level = log_level
- self._override_func = log_level_override_func
- # A map whether output was printed at each logging level
- self._written_at_level = collections.defaultdict(lambda: False)
- self._job_id = job_id
- self._file_descriptor = file_descriptor
-
- def log_line(self, line):
- """Log a line of output
-
- If the log-level override function requests a change in log level, that
- causes self._log_level to be updated accordingly.
-
- Args:
- line: Text line to log
- """
- if self._override_func:
- # Get the new log level and update the default. The reason we
- # want to update the default is that if we hit an error, all
- # future logging should be moved to the new logging level. This
- # greatly simplifies the logic that is needed to update the log
- # level.
- self._log_level = self._override_func(line, self._log_level)
- if self._job_id and log_job_names:
- self._logger.log(self._log_level, "[%s]%s", self._job_id, line)
- else:
- self._logger.log(self._log_level, line)
- self._written_at_level[self._log_level] = True
-
- def has_written(self, log_level):
- """Check if output was written at a certain log level
-
- Args:
- log_level: log level to check
-
- Returns:
- True if any output was written at that log level, False if not
- """
- return self._written_at_level[log_level]
-
- def wait(self):
- """Wait for this LogWriter to finish.
-
- This method will block execution until all the logs have been flushed out.
- """
- with _logging_cv:
- _logging_cv.wait_for(lambda: self._file_descriptor not in _logging_map)
-
-
-def _log_fd(fd):
- """Log information from a single file descriptor.
-
- This function is BLOCKING. It will read from the given file descriptor until
- either the end of line is read or EOF. Once EOF is read it will remove the
- file descriptor from _logging_map so it will no longer be used.
- Additionally, in some cases, the file descriptor will be closed (caused by
- a call to Popen.wait()). In these cases, the file descriptor will also be
- removed from the map as it is no longer valid.
- """
- with _logging_cv:
- writer = _logging_map[fd]
- if fd.closed:
- del _logging_map[fd]
- _logging_cv.notify_all()
- return
- line = fd.readline()
- if not line:
- # EOF
- del _logging_map[fd]
- _logging_cv.notify_all()
- return
- line = line.rstrip("\n")
- if line:
- writer.log_line(line)
-
-
-def _prune_logging_fds():
- """Prune the current file descriptors under _logging_map.
-
- This function will iterate over the logging map and check for closed file
- descriptors. Every closed file descriptor will be removed.
- """
- with _logging_cv:
- remove = [fd for fd in _logging_map.keys() if fd.closed]
- for fd in remove:
- del _logging_map[fd]
- if remove:
- _logging_cv.notify_all()
-
-
-def _logging_loop():
- """The primary logging thread loop.
-
- This is the entry point of the logging thread. It will listen for (1) any
- new data on the output file descriptors that were added via log_output() and
- (2) any new file descriptors being added by log_output(). Once a file
- descriptor is ready to be read, this function will call _log_fd to perform
- the actual read and logging.
- """
- while True:
- with _logging_cv:
- _logging_cv.wait_for(lambda: _logging_map)
- keys = list(_logging_map.keys()) + [_logging_interrupt_pipe[0]]
- try:
- fds, _, _ = select.select(keys, [], [])
- except ValueError:
- # One of the file descriptors must be closed, prune them and try
- # again.
- _prune_logging_fds()
- continue
- if _logging_interrupt_pipe[0] in fds:
- # We got a dummy byte sent by log_output(), this is a signal used to
- # break out of the blocking select.select call to tell us that the
- # file descriptor set has changed. We just need to read the byte and
- # remove this descriptor from the list. If we actually have data
- # that should be read it will be read in the for loop below.
- os.read(_logging_interrupt_pipe[0], 1)
- fds.remove(_logging_interrupt_pipe[0])
- for fd in fds:
- _log_fd(fd)
-
-
-_logging_thread = None
-
-
-def log_output(
- logger, log_level, file_descriptor, log_level_override_func=None, job_id=None
-):
- """Log the output from the given file descriptor.
-
- Args:
- logger: The logger object to use.
- log_level: The logging level to use.
- file_descriptor: The file descriptor to read from.
- log_level_override_func: A function used to override the log level. The
- function will be called once per line prior to logging and will be
- passed the arguments of the line and the default log level.
-
- Returns:
- LogWriter object for the resulting output
- """
- with _logging_cv:
- global _logging_thread
- if _logging_thread is None or not _logging_thread.is_alive():
- # First pass or thread must have died, create a new one.
- _logging_thread = threading.Thread(target=_logging_loop, daemon=True)
- _logging_thread.start()
-
- writer = LogWriter(
- logger, log_level, log_level_override_func, job_id, file_descriptor
- )
- _logging_map[file_descriptor] = writer
- # Write a dummy byte to the pipe to break the select so we can add the
- # new fd.
- os.write(_logging_interrupt_pipe[1], b"x")
- # Notify the condition so we can run the select on the current fds.
- _logging_cv.notify_all()
- return writer
-
-
-def wait_for_log_end():
- """Wait for all the logs to be printed.
-
- This method will block execution until all the logs have been flushed out.
- """
- with _logging_cv:
- _logging_cv.wait_for(lambda: not _logging_map)
-
-
-class Executor:
- """Parallel executor helper class.
-
- This class is used to run multiple functions in parallel. The functions MUST
- return an integer result code (or throw an exception). This class will start
- a thread per operation and wait() for all the threads to resolve.
-
- Attributes:
- lock: The condition variable used to synchronize across threads.
- threads: A list of threading.Thread objects currently under this
- Executor.
- results: A list of result codes returned by each of the functions called
- by this Executor.
- """
-
- def __init__(self):
- self.lock = threading.Condition()
- self.threads = []
- self.results = []
- self.logger = logging.getLogger(self.__class__.__name__)
-
- def append(self, func):
- """Append the given function to the wait list.
-
- Once added, the function's return value will be used to determine the
- Executor's final result value. The function must return an int result
- code or throw an exception. For example: If two functions were added
- to the Executor, they will both be run in parallel and their results
- will determine whether or not the Executor succeeded. If both functions
- returned 0, then the Executor's wait function will also return 0.
-
- Args:
- func: A function which returns an int result code or throws an
- exception.
- """
- with self.lock:
- thread = threading.Thread(target=lambda: self._run_fn(func), daemon=True)
- thread.start()
- self.threads.append(thread)
-
- def wait(self):
- """Wait for a result to be available.
-
- This function waits for the executor to resolve (i.e., all
- threads have finished).
-
- Returns:
- An integer result code of either the first failed function or 0 if
- they all succeeded.
- """
- with self.lock:
- self.lock.wait_for(predicate=lambda: self._is_finished)
- return self._result
-
- def _run_fn(self, func):
- """Entry point to each running thread.
-
- This function will run the function provided in the append() function.
- The result value of the function will be used to determine the
- Executor's result value. If the function throws any exception it will be
- caught and -1 will be used as the assumed result value.
-
- Args:
- func: The function to run.
- """
- try:
- result = func()
- except Exception as ex:
- self.logger.exception(ex)
- result = -1
- with self.lock:
- self.results.append(result)
- self.lock.notify_all()
-
- @property
- def _is_finished(self):
- """Whether or not the Executor is considered to be done.
-
- Returns:
- True if the Executor is considered done.
- """
- if len(self.threads) == len(self.results):
- return True
- return False
-
- @property
- def _result(self):
- """The result code of the Executor.
-
- Note that _is_finished must be True for this to have any meaning.
-
- Returns:
- An int representing the result value of the underlying functions.
- """
- return next((result for result in self.results if result), 0)
diff --git a/zephyr/zmake/zmake/output_packers.py b/zephyr/zmake/zmake/output_packers.py
deleted file mode 100644
index 1ba38cf96c..0000000000
--- a/zephyr/zmake/zmake/output_packers.py
+++ /dev/null
@@ -1,230 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Types which provide many builds and composite them into a single binary."""
-import logging
-import shutil
-import subprocess
-
-import zmake.build_config as build_config
-import zmake.multiproc
-import zmake.util as util
-
-
-class BasePacker:
- """Abstract base for all packers."""
-
- def __init__(self, project):
- self.project = project
-
- def configs(self):
- """Get all of the build configurations necessary.
-
- Yields:
- 2-tuples of config name and a BuildConfig.
- """
- yield "singleimage", build_config.BuildConfig()
-
- def pack_firmware(self, work_dir, jobclient, version_string=""):
- """Pack a firmware image.
-
- Config names from the configs generator are passed as keyword
- arguments, with each argument being set to the path of the
- build directory.
-
- Args:
- work_dir: A directory to write outputs and temporary files
- into.
- jobclient: A JobClient object to use.
- version_string: The version string, which may end up in
- certain parts of the outputs.
-
- Yields:
- 2-tuples of the path of each file in the work_dir (or any
- other directory) which should be copied into the output
- directory, and the output filename.
- """
- raise NotImplementedError("Abstract method not implemented")
-
- def _get_max_image_bytes(self):
- """Get the maximum allowed image size (in bytes).
-
- This value will generally be found in CONFIG_FLASH_SIZE but may vary
- depending on the specific way things are being packed.
-
- Returns:
- The maximum allowed size of the image in bytes.
- """
- raise NotImplementedError("Abstract method not implemented")
-
- def _is_size_bound(self, path):
- """Check whether the given path should be constrained by size.
-
- Generally, .elf files will be unconstrained while .bin files will be
- constrained.
-
- Args:
- path: A file's path to test.
-
- Returns:
- True if the file size should be checked. False otherwise.
- """
- return path.suffix == ".bin"
-
- def _check_packed_file_size(self, file, dirs):
- """Check that a packed file passes size constraints.
-
- Args:
- file: A file to test.
- dirs: A map of the arguments to pass to _get_max_image_bytes
-
- Returns:
- The file if it passes the test.
- """
- if not self._is_size_bound(
- file
- ) or file.stat().st_size <= self._get_max_image_bytes(**dirs):
- return file
- raise RuntimeError("Output file ({}) too large".format(file))
-
-
-class ElfPacker(BasePacker):
- """Raw proxy for ELF output of a single build."""
-
- def pack_firmware(self, work_dir, jobclient, singleimage, version_string=""):
- yield singleimage / "zephyr" / "zephyr.elf", "zephyr.elf"
-
-
-class RawBinPacker(BasePacker):
- """Raw proxy for zephyr.bin output of a single build."""
-
- def pack_firmware(self, work_dir, jobclient, singleimage, version_string=""):
- yield singleimage / "zephyr" / "zephyr.bin", "zephyr.bin"
-
-
-class BinmanPacker(BasePacker):
- """Packer for RO/RW image to generate a .bin build using FMAP."""
-
- ro_file = "zephyr.bin"
- rw_file = "zephyr.bin"
-
- def __init__(self, project):
- self.logger = logging.getLogger(self.__class__.__name__)
- super().__init__(project)
-
- def configs(self):
- yield "ro", build_config.BuildConfig(kconfig_defs={"CONFIG_CROS_EC_RO": "y"})
- yield "rw", build_config.BuildConfig(kconfig_defs={"CONFIG_CROS_EC_RW": "y"})
-
- def pack_firmware(self, work_dir, jobclient, ro, rw, version_string=""):
- """Pack RO and RW sections using Binman.
-
- Binman configuration is expected to be found in the RO build
- device-tree configuration.
-
- Args:
- work_dir: The directory used for packing.
- jobclient: The client used to run subprocesses.
- ro: Directory containing the RO image build.
- rw: Directory containing the RW image build.
- version_string: The version string to use in FRID/FWID.
-
- Yields:
- 2-tuples of the path of each file in the work_dir that
- should be copied into the output directory, and the output
- filename.
- """
- dts_file_path = ro / "zephyr" / "zephyr.dts"
-
- # Copy the inputs into the work directory so that Binman can
- # find them under a hard-coded name.
- shutil.copy2(ro / "zephyr" / self.ro_file, work_dir / "zephyr_ro.bin")
- shutil.copy2(rw / "zephyr" / self.rw_file, work_dir / "zephyr_rw.bin")
-
- # Version in FRID/FWID can be at most 31 bytes long (32, minus
- # one for null character).
- if len(version_string) > 31:
- version_string = version_string[:31]
-
- proc = jobclient.popen(
- [
- "binman",
- "-v",
- "5",
- "build",
- "-a",
- "version={}".format(version_string),
- "-d",
- dts_file_path,
- "-m",
- "-O",
- work_dir,
- ],
- cwd=work_dir,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- )
-
- zmake.multiproc.log_output(self.logger, logging.DEBUG, proc.stdout)
- zmake.multiproc.log_output(self.logger, logging.ERROR, proc.stderr)
- if proc.wait(timeout=60):
- raise OSError("Failed to run binman")
-
- yield work_dir / "zephyr.bin", "zephyr.bin"
- yield ro / "zephyr" / "zephyr.elf", "zephyr.ro.elf"
- yield rw / "zephyr" / "zephyr.elf", "zephyr.rw.elf"
-
-
-class NpcxPacker(BinmanPacker):
- """Packer for RO/RW image to generate a .bin build using FMAP.
-
- This expects that the build is setup to generate a
- zephyr.npcx.bin for the RO image, which should be packed using
- Nuvoton's loader format.
- """
-
- ro_file = "zephyr.npcx.bin"
- npcx_monitor = "npcx_monitor.bin"
-
- # TODO(b/192401039): CONFIG_FLASH_SIZE is nuvoton-only. Since
- # binman already checks sizes, perhaps we can just remove this
- # code?
- def _get_max_image_bytes(self, ro, rw):
- ro_size = util.read_kconfig_autoconf_value(
- ro / "zephyr" / "include" / "generated", "CONFIG_FLASH_SIZE"
- )
- rw_size = util.read_kconfig_autoconf_value(
- ro / "zephyr" / "include" / "generated", "CONFIG_FLASH_SIZE"
- )
- return max(int(ro_size, 0), int(rw_size, 0)) * 1024
-
- # This can probably be removed too and just rely on binman to
- # check the sizes... see the comment above.
- def pack_firmware(self, work_dir, jobclient, ro, rw, version_string=""):
- for path, output_file in super().pack_firmware(
- work_dir,
- jobclient,
- ro,
- rw,
- version_string=version_string,
- ):
- if output_file == "zephyr.bin":
- yield (
- self._check_packed_file_size(path, {"ro": ro, "rw": rw}),
- "zephyr.bin",
- )
- else:
- yield path, output_file
-
- # Include the NPCX monitor file as an output artifact.
- yield ro / self.npcx_monitor, self.npcx_monitor
-
-
-# A dictionary mapping packer config names to classes.
-packer_registry = {
- "binman": BinmanPacker,
- "elf": ElfPacker,
- "npcx": NpcxPacker,
- "raw": RawBinPacker,
-}
diff --git a/zephyr/zmake/zmake/project.py b/zephyr/zmake/zmake/project.py
deleted file mode 100644
index 84151a90b3..0000000000
--- a/zephyr/zmake/zmake/project.py
+++ /dev/null
@@ -1,248 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Module for project config wrapper object."""
-
-import logging
-import pathlib
-import warnings
-
-import yaml
-
-import zmake.build_config as build_config
-import zmake.modules as modules
-import zmake.output_packers as packers
-import zmake.toolchains as toolchains
-import zmake.util as util
-
-# The version of jsonschema in the chroot has a bunch of
-# DeprecationWarnings that fire when we import it. Suppress these
-# during the import to keep the noise down.
-with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- import jsonschema
-
-
-def module_dts_overlay_name(modpath, board_name):
- """Given a board name, return the expected DTS overlay path.
-
- Args:
- modpath: the module path as a pathlib.Path object
- board_name: the name of the board
-
- Returns:
- A pathlib.Path object to the expected overlay path.
- """
- return modpath / "zephyr" / "dts" / "board-overlays" / "{}.dts".format(board_name)
-
-
-def find_projects(root_dir):
- """Finds all zmake projects in root_dir.
-
- Args:
- root_dir: the root dir as a pathlib.Path object
-
- Yields:
- Project: The next project found.
- """
- logging.info("Finding zmake targets under '%s'.", root_dir)
- for path in pathlib.Path(root_dir).rglob("zmake.yaml"):
- yield Project(path.parent)
-
-
-class ProjectConfig:
- """An object wrapping zmake.yaml."""
-
- validator = jsonschema.Draft7Validator
- schema = {
- "type": "object",
- "required": [
- "board",
- "output-type",
- "supported-toolchains",
- "supported-zephyr-versions",
- ],
- "properties": {
- "supported-zephyr-versions": {
- "type": "array",
- "items": {
- "type": "string",
- "enum": ["v2.6"],
- },
- "minItems": 1,
- "uniqueItems": True,
- },
- "board": {
- "type": "string",
- },
- "modules": {
- "type": "array",
- "items": {
- "type": "string",
- "enum": list(modules.known_modules),
- },
- },
- "output-type": {
- "type": "string",
- "enum": list(packers.packer_registry),
- },
- "supported-toolchains": {
- "type": "array",
- "items": {
- "type": "string",
- "enum": list(toolchains.support_classes),
- },
- },
- "is-test": {
- "type": "boolean",
- },
- "dts-overlays": {
- "type": "array",
- "items": {
- "type": "string",
- },
- },
- },
- }
-
- def __init__(self, config_dict):
- self.validator.check_schema(self.schema)
- jsonschema.validate(config_dict, self.schema, cls=self.validator)
- self.config_dict = config_dict
-
- @property
- def supported_zephyr_versions(self):
- return [
- util.parse_zephyr_version(x)
- for x in self.config_dict["supported-zephyr-versions"]
- ]
-
- @property
- def board(self):
- return self.config_dict["board"]
-
- @property
- def modules(self):
- return self.config_dict.get("modules", list(modules.known_modules))
-
- @property
- def output_packer(self):
- return packers.packer_registry[self.config_dict["output-type"]]
-
- @property
- def supported_toolchains(self):
- return self.config_dict["supported-toolchains"]
-
- @property
- def is_test(self):
- return self.config_dict.get("is-test", False)
-
- @property
- def dts_overlays(self):
- return self.config_dict.get("dts-overlays", [])
-
-
-class Project:
- """An object encapsulating a project directory."""
-
- def __init__(self, project_dir, config_dict=None):
- self.project_dir = project_dir.resolve()
- if not config_dict:
- with open(self.project_dir / "zmake.yaml") as f:
- config_dict = yaml.safe_load(f)
- self.config = ProjectConfig(config_dict)
- self.packer = self.config.output_packer(self)
-
- def iter_builds(self):
- """Iterate thru the build combinations provided by the project's packer.
-
- Yields:
- 2-tuples of a build configuration name and a BuildConfig.
- """
- conf = build_config.BuildConfig(cmake_defs={"BOARD": self.config.board})
- prj_conf = self.project_dir / "prj.conf"
- if prj_conf.is_file():
- conf |= build_config.BuildConfig(kconfig_files=[prj_conf])
- for build_name, packer_config in self.packer.configs():
- yield build_name, conf | packer_config
-
- def find_dts_overlays(self, modules):
- """Find appropriate dts overlays from registered modules.
-
- Args:
- modules: A dictionary of module names mapping to paths.
-
- Returns:
- A BuildConfig with relevant configurations to enable the
- found DTS overlay files.
- """
- overlays = []
- for module_path in modules.values():
- dts_path = module_dts_overlay_name(module_path, self.config.board)
- if dts_path.is_file():
- overlays.append(dts_path.resolve())
-
- overlays.extend(self.project_dir / f for f in self.config.dts_overlays)
-
- if overlays:
- return build_config.BuildConfig(
- cmake_defs={"DTC_OVERLAY_FILE": ";".join(map(str, overlays))}
- )
- else:
- return build_config.BuildConfig()
-
- def prune_modules(self, module_paths):
- """Reduce a modules dict to the ones required by this project.
-
- If this project does not define a modules list in the
- configuration, it is assumed that all known modules to Zmake
- are required. This is typically inconsequential as Zephyr
- module design conventions require a Kconfig option to actually
- enable most modules.
-
- Args:
- module_paths: A dictionary mapping module names to their
- paths. This dictionary is not modified.
-
- Returns:
- A new module_paths dictionary with only the modules
- required by this project.
-
- Raises:
- A KeyError, if a required module is unavailable.
- """
- result = {}
- for module in self.config.modules:
- try:
- result[module] = module_paths[module]
- except KeyError as e:
- raise KeyError(
- "The {!r} module is required by the {} project, but is not "
- "available.".format(module, self.project_dir)
- ) from e
- return result
-
- def get_toolchain(self, module_paths, override=None):
- if override:
- if override not in self.config.supported_toolchains:
- logging.warning(
- "Toolchain %r isn't supported by this project. You're on your own.",
- override,
- )
- support_class = toolchains.support_classes.get(
- override, toolchains.GenericToolchain
- )
- return support_class(name=override, modules=module_paths)
- else:
- for name in self.config.supported_toolchains:
- support_class = toolchains.support_classes[name]
- toolchain = support_class(name=name, modules=module_paths)
- if toolchain.probe():
- logging.info("Toolchain %r selected by probe function.", toolchain)
- return toolchain
- raise OSError(
- "No supported toolchains could be found on your system. If you see "
- "this message in the chroot, it indicates a bug. Otherwise, you'll "
- "either want to setup your system with a supported toolchain, or "
- "manually select an unsupported toolchain with the -t flag."
- )
diff --git a/zephyr/zmake/zmake/toolchains.py b/zephyr/zmake/zmake/toolchains.py
deleted file mode 100644
index 924448aec5..0000000000
--- a/zephyr/zmake/zmake/toolchains.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Definitions of toolchain variables."""
-
-import os
-import pathlib
-
-import zmake.build_config as build_config
-
-
-class GenericToolchain:
- """Default toolchain if not known to zmake.
-
- Simply pass ZEPHYR_TOOLCHAIN_VARIANT=name to the build, with
- nothing extra.
- """
-
- def __init__(self, name, modules=None):
- self.name = name
- self.modules = modules or {}
-
- def probe(self):
- """Probe if the toolchain is available on the system."""
- # Since the toolchain is not known to zmake, we have no way to
- # know if it's installed. Simply return False to indicate not
- # installed. An unknown toolchain would only be used if -t
- # was manually passed to zmake, and is not valid to put in a
- # zmake.yaml file.
- return False
-
- def get_build_config(self):
- """Get the build configuration for the toolchain.
-
- Returns:
- A build_config.BuildConfig to be applied to the build.
- """
- return build_config.BuildConfig(
- cmake_defs={
- "ZEPHYR_TOOLCHAIN_VARIANT": self.name,
- },
- )
-
-
-class CorebootSdkToolchain(GenericToolchain):
- def probe(self):
- # For now, we always assume it's at /opt/coreboot-sdk, since
- # that's where it's installed in the chroot. We may want to
- # consider adding support for a coreboot-sdk built in the
- # user's home directory, for example, which happens if a
- # "make crossgcc" is done from the coreboot repository.
- return pathlib.Path("/opt/coreboot-sdk").is_dir()
-
- def get_build_config(self):
- return (
- build_config.BuildConfig(
- cmake_defs={
- "TOOLCHAIN_ROOT": str(self.modules["ec"] / "zephyr"),
- },
- )
- | super().get_build_config()
- )
-
-
-class ZephyrToolchain(GenericToolchain):
- def __init__(self, *args, **kwargs):
- self.zephyr_sdk_install_dir = self._find_zephyr_sdk()
- super().__init__(*args, **kwargs)
-
- @staticmethod
- def _find_zephyr_sdk():
- """Find the Zephyr SDK, if it's installed.
-
- Returns:
- The path to the Zephyr SDK, using the search rules defined by
- https://docs.zephyrproject.org/latest/getting_started/installation_linux.html,
- or None, if one cannot be found on the system.
- """
- from_env = os.getenv("ZEPHYR_SDK_INSTALL_DIR")
- if from_env:
- return pathlib.Path(from_env)
-
- def _gen_sdk_paths():
- for prefix in (
- "~",
- "~/.local",
- "~/.local/opt",
- "~/bin",
- "/opt",
- "/usr",
- "/usr/local",
- ):
- prefix = pathlib.Path(os.path.expanduser(prefix))
- yield prefix / "zephyr-sdk"
- yield from prefix.glob("zephyr-sdk-*")
-
- for path in _gen_sdk_paths():
- if (path / "sdk_version").is_file():
- return path
-
- return None
-
- def probe(self):
- return bool(self.zephyr_sdk_install_dir)
-
- def get_build_config(self):
- assert self.zephyr_sdk_install_dir
- tc_vars = {
- "ZEPHYR_SDK_INSTALL_DIR": str(self.zephyr_sdk_install_dir),
- }
- return (
- build_config.BuildConfig(
- environ_defs=tc_vars,
- cmake_defs=tc_vars,
- )
- | super().get_build_config()
- )
-
-
-class LlvmToolchain(GenericToolchain):
- def probe(self):
- # TODO: differentiate chroot llvm path vs. something more
- # generic?
- return pathlib.Path("/usr/bin/x86_64-pc-linux-gnu-clang").exists()
-
- def get_build_config(self):
- # TODO: this contains custom settings for the chroot. Plumb a
- # toolchain for "generic-llvm" for external uses?
- return (
- build_config.BuildConfig(
- cmake_defs={
- "TOOLCHAIN_ROOT": str(self.modules["ec"] / "zephyr"),
- },
- )
- | super().get_build_config()
- )
-
-
-class HostToolchain(GenericToolchain):
- def probe(self):
- # "host" toolchain for Zephyr means GCC.
- for search_path in os.getenv("PATH", "/usr/bin").split(":"):
- if (pathlib.Path(search_path) / "gcc").exists():
- return True
- return False
-
-
-# Mapping of toolchain names -> support class
-support_classes = {
- "coreboot-sdk": CorebootSdkToolchain,
- "host": HostToolchain,
- "llvm": LlvmToolchain,
- "zephyr": ZephyrToolchain,
-}
diff --git a/zephyr/zmake/zmake/util.py b/zephyr/zmake/zmake/util.py
deleted file mode 100644
index 455cb7c9d6..0000000000
--- a/zephyr/zmake/zmake/util.py
+++ /dev/null
@@ -1,255 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-"""Common miscellaneous utility functions for zmake."""
-
-import os
-import pathlib
-import re
-import shlex
-
-
-def c_str(input_str):
- """Make a string that can be included as a literal in C source code.
-
- Args:
- input_str: The string to process.
-
- Returns:
- A string which can be included in C source code.
- """
-
- def c_chr(char):
- # Convert a char in a string to the C representation. Per the
- # C standard, we can use all characters but quote, newline,
- # and backslash directly with no replacements.
- return {
- '"': r"\"",
- "\n": r"\n",
- "\\": "\\\\",
- }.get(char, char)
-
- return '"{}"'.format("".join(map(c_chr, input_str)))
-
-
-def locate_cros_checkout():
- """Find the path to the ChromiumOS checkout.
-
- Returns:
- The first directory found with a .repo directory in it,
- starting by checking the CROS_WORKON_SRCROOT environment
- variable, then scanning upwards from the current directory,
- and finally from a known set of common paths.
- """
-
- def propose_checkouts():
- yield os.getenv("CROS_WORKON_SRCROOT")
-
- path = pathlib.Path.cwd()
- while path.resolve() != pathlib.Path("/"):
- yield path
- path = path / ".."
-
- yield "/mnt/host/source"
- yield pathlib.Path.home() / "trunk"
- yield pathlib.Path.home() / "chromiumos"
-
- for path in propose_checkouts():
- if not path:
- continue
- path = pathlib.Path(path)
- if (path / ".repo").is_dir():
- return path.resolve()
-
- raise FileNotFoundError("Unable to locate a ChromiumOS checkout")
-
-
-def locate_zephyr_base(checkout, version):
- """Locate the path to the Zephyr RTOS in a ChromiumOS checkout.
-
- Args:
- checkout: The path to the ChromiumOS checkout.
- version: The requested zephyr version, as a tuple of integers.
-
- Returns:
- The path to the Zephyr source.
- """
- return (
- checkout
- / "src"
- / "third_party"
- / "zephyr"
- / "main"
- / "v{}.{}".format(*version[:2])
- )
-
-
-def read_kconfig_file(path):
- """Parse a Kconfig file.
-
- Args:
- path: The path to open.
-
- Returns:
- A dictionary of kconfig items to their values.
- """
- result = {}
- with open(path) as f:
- for line in f:
- line, _, _ = line.partition("#")
- line = line.strip()
- if line:
- name, _, value = line.partition("=")
- result[name.strip()] = value.strip()
- return result
-
-
-def read_kconfig_autoconf_value(path, key):
- """Parse an autoconf.h file for a resolved kconfig value
-
- Args:
- path: The path to the autoconf.h file.
- key: The define key to lookup.
-
- Returns:
- The value associated with the key or nothing if the key wasn't found.
- """
- prog = re.compile(r"^#define\s{}\s(\S+)$".format(key))
- with open(path / "autoconf.h") as f:
- for line in f:
- m = prog.match(line)
- if m:
- return m.group(1)
-
-
-def write_kconfig_file(path, config, only_if_changed=True):
- """Write out a dictionary to Kconfig format.
-
- Args:
- path: The path to write to.
- config: The dictionary to write.
- only_if_changed: Set to True if the file should not be written
- unless it has changed.
- """
- if only_if_changed:
- if path.exists() and read_kconfig_file(path) == config:
- return
- with open(path, "w") as f:
- for name, value in config.items():
- f.write("{}={}\n".format(name, value))
-
-
-def parse_zephyr_version(version_string):
- """Parse a human-readable version string (e.g., "v2.4") as a tuple.
-
- Args:
- version_string: The human-readable version string.
-
- Returns:
- A 2-tuple or 3-tuple of integers representing the version.
- """
- match = re.fullmatch(r"v?(\d+)[._](\d+)(?:[._](\d+))?", version_string)
- if not match:
- raise ValueError(
- "{} does not look like a Zephyr version.".format(version_string)
- )
- return tuple(int(x) for x in match.groups() if x is not None)
-
-
-def read_zephyr_version(zephyr_base):
- """Read the Zephyr version from a Zephyr OS checkout.
-
- Args:
- zephyr_base: path to the Zephyr OS repository.
-
- Returns:
- A 3-tuple of the version number (major, minor, patchset).
- """
- version_file = pathlib.Path(zephyr_base) / "VERSION"
-
- file_vars = {}
- with open(version_file) as f:
- for line in f:
- key, sep, value = line.partition("=")
- file_vars[key.strip()] = value.strip()
-
- return (
- int(file_vars["VERSION_MAJOR"]),
- int(file_vars["VERSION_MINOR"]),
- int(file_vars["PATCHLEVEL"]),
- )
-
-
-def repr_command(argv):
- """Represent an argument array as a string.
-
- Args:
- argv: The arguments of the command.
-
- Returns:
- A string which could be pasted into a shell for execution.
- """
- return " ".join(shlex.quote(str(arg)) for arg in argv)
-
-
-def update_symlink(target_path, link_path):
- """Create a symlink if it does not exist, or links to a different path.
-
- Args:
- target_path: A Path-like object of the desired symlink path.
- link_path: A Path-like object of the symlink.
- """
- target = target_path.resolve()
- if (
- not link_path.is_symlink()
- or pathlib.Path(os.readlink(link_path)).resolve() != target
- ):
- if link_path.exists():
- link_path.unlink()
- link_path.symlink_to(target)
-
-
-def log_multi_line(logger, level, message):
- """Log a potentially multi-line message to the logger.
-
- Args:
- logger: The Logger object to log to.
- level: The logging level to use when logging.
- message: The (potentially) multi-line message to log.
- """
- for line in message.splitlines():
- if line:
- logger.log(level, line)
-
-
-def resolve_build_dir(platform_ec_dir, project_dir, build_dir):
- """Resolve the build directory using platform/ec/build/... as default.
-
- Args:
- platform_ec_dir: The path to the chromiumos source's platform/ec
- directory.
- project_dir: The directory of the project.
- build_dir: The directory to build in (may be None).
- Returns:
- The resolved build directory (using build_dir if not None).
- """
- if build_dir:
- return build_dir
-
- if not pathlib.Path.exists(project_dir / "zmake.yaml"):
- raise OSError("Invalid configuration")
-
- # Resolve project_dir to absolute path.
- project_dir = project_dir.resolve()
-
- # Compute the path of project_dir relative to platform_ec_dir.
- project_relative_path = pathlib.Path.relative_to(project_dir, platform_ec_dir)
-
- # Make sure that the project_dir is a subdirectory of platform_ec_dir.
- if platform_ec_dir / project_relative_path != project_dir:
- raise OSError(
- "Can't resolve project directory {} which is not a subdirectory"
- " of the platform/ec directory {}".format(project_dir, platform_ec_dir)
- )
-
- return platform_ec_dir / "build" / project_relative_path
diff --git a/zephyr/zmake/zmake/version.py b/zephyr/zmake/zmake/version.py
deleted file mode 100644
index 47aba6d804..0000000000
--- a/zephyr/zmake/zmake/version.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# Copyright 2021 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import datetime
-import getpass
-import io
-import os
-import platform
-import subprocess
-
-import zmake.util as util
-
-
-def _get_num_commits(repo):
- """Get the number of commits that have been made.
-
- If a Git repository is available, return the number of commits that have
- been made. Otherwise return a fixed count.
-
- Args:
- repo: The path to the git repo.
-
- Returns:
- An integer, the number of commits that have been made.
- """
- try:
- result = subprocess.run(
- ["git", "-C", repo, "rev-list", "HEAD", "--count"],
- check=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL,
- encoding="utf-8",
- )
- except subprocess.CalledProcessError:
- commits = "9999"
- else:
- commits = result.stdout
-
- return int(commits)
-
-
-def _get_revision(repo):
- """Get the current revision hash.
-
- If a Git repository is available, return the hash of the current index.
- Otherwise return the hash of the VCSID environment variable provided by
- the packaging system.
-
- Args:
- repo: The path to the git repo.
-
- Returns:
- A string, of the current revision.
- """
- try:
- result = subprocess.run(
- ["git", "-C", repo, "log", "-n1", "--format=%H"],
- check=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL,
- encoding="utf-8",
- )
- except subprocess.CalledProcessError:
- # Fall back to the VCSID provided by the packaging system.
- # Format is 0.0.1-r425-032666c418782c14fe912ba6d9f98ffdf0b941e9 for
- # releases and 9999-032666c418782c14fe912ba6d9f98ffdf0b941e9 for
- # 9999 ebuilds.
- vcsid = os.environ.get("VCSID", "9999-unknown")
- revision = vcsid.rsplit("-", 1)[1]
- else:
- revision = result.stdout
-
- return revision
-
-
-def get_version_string(project, zephyr_base, modules, static=False):
- """Get the version string associated with a build.
-
- Args:
- project: a zmake.project.Project object
- zephyr_base: the path to the zephyr directory
- modules: a dictionary mapping module names to module paths
- static: if set, create a version string not dependent on git
- commits, thus allowing binaries to be compared between two
- commits.
-
- Returns:
- A version string which can be placed in FRID, FWID, or used in
- the build for the OS.
- """
- major_version, minor_version, *_ = util.read_zephyr_version(zephyr_base)
- project_id = project.project_dir.parts[-1]
- num_commits = 0
-
- if static:
- vcs_hashes = "STATIC"
- else:
- repos = {
- "os": zephyr_base,
- **modules,
- }
-
- for repo in repos.values():
- num_commits += _get_num_commits(repo)
-
- vcs_hashes = ",".join(
- "{}:{}".format(name, _get_revision(repo)[:6])
- for name, repo in sorted(
- repos.items(),
- # Put the EC module first, then Zephyr OS kernel, as
- # these are probably the most important hashes to
- # developers.
- key=lambda p: (p[0] != "ec", p[0] != "os", p),
- )
- )
-
- return "{}_v{}.{}.{}-{}".format(
- project_id, major_version, minor_version, num_commits, vcs_hashes
- )
-
-
-def write_version_header(version_str, output_path, static=False):
- """Generate a version header and write it to the specified path.
-
- Generate a version header in the format expected by the EC build
- system, and write it out only if the version header does not exist
- or changes. We don't write in the case that the version header
- does exist and was unchanged, which allows "zmake build" commands
- on an unchanged tree to be an effective no-op.
-
- Args:
- version_str: The version string to be used in the header, such
- as one generated by get_version_string.
- output_path: The file path to write at (a pathlib.Path
- object).
- static: If true, generate a header which does not include
- information like the username, hostname, or date, allowing
- the build to be reproducible.
- """
- output = io.StringIO()
- output.write("/* This file is automatically generated by zmake */\n")
-
- def add_def(name, value):
- output.write("#define {} {}\n".format(name, util.c_str(value)))
-
- def add_def_unquoted(name, value):
- output.write("#define {} {}\n".format(name, value))
-
- add_def("VERSION", version_str)
- add_def("CROS_EC_VERSION32", version_str[:31])
-
- if static:
- add_def("BUILDER", "reproducible@build")
- add_def("DATE", "STATIC_VERSION_DATE")
- else:
- add_def("BUILDER", "{}@{}".format(getpass.getuser(), platform.node()))
- add_def("DATE", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
-
- add_def("CROS_FWID_MISSING_STR", "CROS_FWID_MISSING")
- # TODO(b/198475757): Add zmake support for getting CROS_FWID32
- add_def_unquoted("CROS_FWID32", "CROS_FWID_MISSING_STR")
-
- contents = output.getvalue()
- if not output_path.exists() or output_path.read_text() != contents:
- output_path.write_text(contents)
diff --git a/zephyr/zmake/zmake/zmake.py b/zephyr/zmake/zmake/zmake.py
deleted file mode 100644
index 1b60e66f66..0000000000
--- a/zephyr/zmake/zmake/zmake.py
+++ /dev/null
@@ -1,757 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Module encapsulating Zmake wrapper object."""
-import logging
-import os
-import pathlib
-import re
-import shutil
-import subprocess
-import tempfile
-
-import zmake.build_config
-import zmake.jobserver
-import zmake.modules
-import zmake.multiproc
-import zmake.project
-import zmake.util as util
-import zmake.version
-
-ninja_warnings = re.compile(r"^(\S*: )?warning:.*")
-ninja_errors = re.compile(r"error:.*")
-
-
-def ninja_stdout_log_level_override(line, current_log_level):
- """Update the log level for ninja builds if we hit an error.
-
- Ninja builds prints everything to stdout, but really we want to start
- logging things to CRITICAL
-
- Args:
- line: The line that is about to be logged.
- current_log_level: The active logging level that would be used for the
- line.
- """
- # Output lines from Zephyr that are not normally useful
- # Send any lines that start with these strings to INFO
- cmake_suppress = [
- "-- ", # device tree messages
- "Loaded configuration",
- "Including boilerplate",
- "Parsing ",
- "No change to configuration",
- "No change to Kconfig header",
- ]
-
- # Herewith a long list of things which are really for debugging, not
- # development. Return logging.DEBUG for each of these.
-
- # ninja puts progress information on stdout
- if line.startswith("["):
- return logging.DEBUG
- # we don't care about entering directories since it happens every time
- if line.startswith("ninja: Entering directory"):
- return logging.DEBUG
- # we know the build stops from the compiler messages and ninja return code
- if line.startswith("ninja: build stopped"):
- return logging.DEBUG
- # someone prints a *** SUCCESS *** message which we don't need
- if line.startswith("***"):
- return logging.DEBUG
- # dopey ninja puts errors on stdout, so fix that. It does not look
- # likely that it will be fixed upstream:
- # https://github.com/ninja-build/ninja/issues/1537
- # Try to drop output about the device tree
- if any(line.startswith(x) for x in cmake_suppress):
- return logging.INFO
- # this message is a bit like make failing. We already got the error output.
- if line.startswith("FAILED: CMakeFiles"):
- return logging.INFO
- # if a particular file fails it shows the build line used, but that is not
- # useful except for debugging.
- if line.startswith("ccache"):
- return logging.DEBUG
- if ninja_warnings.match(line):
- return logging.WARNING
- if ninja_errors.match(line):
- return logging.ERROR
- # When we see "Memory region" go into INFO, and stay there as long as the
- # line starts with \S+:
- if line.startswith("Memory region"):
- return logging.INFO
- if current_log_level == logging.INFO and line.split()[0].endswith(":"):
- return current_log_level
- if current_log_level == logging.WARNING:
- return current_log_level
- return logging.ERROR
-
-
-def cmake_log_level_override(line, default_log_level):
- """Update the log level for cmake output if we hit an error.
-
- Cmake prints some messages that are less than useful during
- development.
-
- Args:
- line: The line that is about to be logged.
- default_log_level: The default logging level that will be used for the
- line.
- """
- # Strange output from Zephyr that we normally ignore
- if line.startswith("Including boilerplate"):
- return logging.DEBUG
- elif line.startswith("devicetree error:"):
- return logging.ERROR
- if ninja_warnings.match(line):
- return logging.WARNING
- if ninja_errors.match(line):
- return logging.ERROR
- return default_log_level
-
-
-def get_process_failure_msg(proc):
- """Creates a suitable failure message if something exits badly
-
- Args:
- proc: subprocess.Popen object containing the thing that failed
-
- Returns:
- Failure message as a string:
- """
- return "Execution failed (return code={}): {}\n".format(
- proc.returncode, util.repr_command(proc.args)
- )
-
-
-class Zmake:
- """Wrapper class encapsulating zmake's supported operations.
-
- The invocations of the constructor and the methods actually comes
- from the main function. The command line arguments are translated
- such that dashes are replaced with underscores and applied as
- keyword arguments to the constructor and the method, and the
- subcommand invoked becomes the method run.
-
- As such, you won't find documentation for each method's parameters
- here, as it would be duplicate of the help strings from the
- command line. Run "zmake --help" for full documentation of each
- parameter.
-
- Properties:
- executor: a zmake.multiproc.Executor object for submitting
- tasks to.
- _sequential: True to check the results of each build job sequentially,
- before launching more, False to just do this after all jobs complete
- """
-
- def __init__(
- self, checkout=None, jobserver=None, jobs=0, modules_dir=None, zephyr_base=None
- ):
- zmake.multiproc.reset()
- self._checkout = checkout
- self._zephyr_base = zephyr_base
-
- if modules_dir:
- self.module_paths = zmake.modules.locate_from_directory(modules_dir)
- else:
- self.module_paths = zmake.modules.locate_from_checkout(self.checkout)
-
- if jobserver:
- self.jobserver = jobserver
- else:
- try:
- self.jobserver = zmake.jobserver.GNUMakeJobClient.from_environ()
- except OSError:
- self.jobserver = zmake.jobserver.GNUMakeJobServer(jobs=jobs)
-
- self.logger = logging.getLogger(self.__class__.__name__)
- self.executor = zmake.multiproc.Executor()
- self._sequential = jobs == 1
-
- @property
- def checkout(self):
- if not self._checkout:
- self._checkout = util.locate_cros_checkout()
- return self._checkout.resolve()
-
- def locate_zephyr_base(self, version):
- """Locate the Zephyr OS repository.
-
- Args:
- version: If a Zephyr OS base was not supplied to Zmake,
- which version to search for as a tuple of integers.
- This argument is ignored if a Zephyr base was supplied
- to Zmake.
- Returns:
- A pathlib.Path to the found Zephyr OS repository.
- """
- if self._zephyr_base:
- return self._zephyr_base
-
- return util.locate_zephyr_base(self.checkout, version)
-
- def configure(
- self,
- project_dir,
- build_dir=None,
- toolchain=None,
- ignore_unsupported_zephyr_version=False,
- build_after_configure=False,
- test_after_configure=False,
- bringup=False,
- coverage=False,
- ):
- """Set up a build directory to later be built by "zmake build"."""
- project = zmake.project.Project(project_dir)
- supported_versions = project.config.supported_zephyr_versions
-
- zephyr_base = self.locate_zephyr_base(max(supported_versions)).resolve()
-
- # Ignore the patchset from the Zephyr version.
- zephyr_version = util.read_zephyr_version(zephyr_base)[:2]
-
- if (
- not ignore_unsupported_zephyr_version
- and zephyr_version not in supported_versions
- ):
- raise ValueError(
- "The Zephyr OS version (v{}.{}) is not supported by the "
- "project. You may wish to either configure zmake.yaml to "
- "support this version, or pass "
- "--ignore-unsupported-zephyr-version.".format(*zephyr_version)
- )
-
- # Resolve build_dir if needed.
- build_dir = util.resolve_build_dir(
- platform_ec_dir=self.module_paths["ec"],
- project_dir=project_dir,
- build_dir=build_dir,
- )
- # Make sure the build directory is clean.
- if os.path.exists(build_dir):
- self.logger.info("Clearing old build directory %s", build_dir)
- shutil.rmtree(build_dir)
-
- generated_include_dir = (build_dir / "include").resolve()
- base_config = zmake.build_config.BuildConfig(
- environ_defs={"ZEPHYR_BASE": str(zephyr_base), "PATH": "/usr/bin"},
- cmake_defs={
- "DTS_ROOT": str(self.module_paths["ec"] / "zephyr"),
- "SYSCALL_INCLUDE_DIRS": str(
- self.module_paths["ec"] / "zephyr" / "include" / "drivers"
- ),
- "ZMAKE_INCLUDE_DIR": str(generated_include_dir),
- },
- )
-
- # Prune the module paths to just those required by the project.
- module_paths = project.prune_modules(self.module_paths)
-
- module_config = zmake.modules.setup_module_symlinks(
- build_dir / "modules", module_paths
- )
-
- # Symlink the Zephyr base into the build directory so it can
- # be used in the build phase.
- util.update_symlink(zephyr_base, build_dir / "zephyr_base")
-
- dts_overlay_config = project.find_dts_overlays(module_paths)
-
- toolchain_support = project.get_toolchain(module_paths, override=toolchain)
- toolchain_config = toolchain_support.get_build_config()
-
- if bringup:
- base_config |= zmake.build_config.BuildConfig(
- kconfig_defs={"CONFIG_PLATFORM_EC_BRINGUP": "y"}
- )
- if coverage:
- base_config |= zmake.build_config.BuildConfig(
- kconfig_defs={"CONFIG_COVERAGE": "y"}
- )
-
- if not build_dir.exists():
- build_dir = build_dir.mkdir()
- if not generated_include_dir.exists():
- generated_include_dir.mkdir()
- processes = []
- self.logger.info("Building %s in %s.", project_dir, build_dir)
- for build_name, build_config in project.iter_builds():
- self.logger.info("Configuring %s:%s.", project_dir, build_name)
- config = (
- base_config
- | toolchain_config
- | module_config
- | dts_overlay_config
- | build_config
- )
- output_dir = build_dir / "build-{}".format(build_name)
- kconfig_file = build_dir / "kconfig-{}.conf".format(build_name)
- proc = config.popen_cmake(
- self.jobserver,
- project_dir,
- output_dir,
- kconfig_file,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- errors="replace",
- )
- job_id = "{}:{}".format(project_dir, build_name)
- zmake.multiproc.log_output(
- self.logger,
- logging.DEBUG,
- proc.stdout,
- log_level_override_func=cmake_log_level_override,
- job_id=job_id,
- )
- zmake.multiproc.log_output(
- self.logger,
- logging.ERROR,
- proc.stderr,
- log_level_override_func=cmake_log_level_override,
- job_id=job_id,
- )
- if self._sequential:
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
- else:
- processes.append(proc)
- for proc in processes:
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
-
- # Create symlink to project
- util.update_symlink(project_dir, build_dir / "project")
-
- if test_after_configure:
- return self.test(build_dir=build_dir)
- elif build_after_configure:
- return self.build(build_dir=build_dir)
-
- def build(self, build_dir, output_files_out=None, fail_on_warnings=False):
- """Build a pre-configured build directory."""
-
- def wait_and_check_success(procs, writers):
- """Wait for processes to complete and check for errors
-
- Args:
- procs: List of subprocess.Popen objects to check
- writers: List of LogWriter objects to check
-
- Returns:
- True if all if OK
- False if an error was found (so that zmake should exit)
- """
- bad = None
- for proc in procs:
- if proc.wait() and not bad:
- bad = proc
- if bad:
- # Just show the first bad process for now. Both builds likely
- # produce the same error anyway. If they don't, the user can
- # still take action on the errors/warnings provided. Showing
- # multiple 'Execution failed' messages is not very friendly
- # since it exposes the fragmented nature of the build.
- raise OSError(get_process_failure_msg(bad))
-
- # Let all output be produced before exiting
- for writer in writers:
- writer.wait()
- if fail_on_warnings and any(
- w.has_written(logging.WARNING) or w.has_written(logging.ERROR)
- for w in writers
- ):
- self.logger.warning("zmake: Warnings detected in build: aborting")
- return False
- return True
-
- procs = []
- log_writers = []
- dirs = {}
-
- build_dir = build_dir.resolve()
- project = zmake.project.Project(build_dir / "project")
-
- # Compute the version string.
- version_string = zmake.version.get_version_string(
- project,
- build_dir / "zephyr_base",
- zmake.modules.locate_from_directory(build_dir / "modules"),
- )
-
- # The version header needs to generated during the build phase
- # instead of configure, as the tree may have changed since
- # configure was run.
- zmake.version.write_version_header(
- version_string,
- build_dir / "include" / "ec_version.h",
- )
-
- for build_name, build_config in project.iter_builds():
- with self.jobserver.get_job():
- dirs[build_name] = build_dir / "build-{}".format(build_name)
- cmd = ["/usr/bin/ninja", "-C", dirs[build_name].as_posix()]
- self.logger.info(
- "Building %s:%s: %s",
- build_dir,
- build_name,
- zmake.util.repr_command(cmd),
- )
- proc = self.jobserver.popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- errors="replace",
- )
- job_id = "{}:{}".format(build_dir, build_name)
- out = zmake.multiproc.log_output(
- logger=self.logger,
- log_level=logging.INFO,
- file_descriptor=proc.stdout,
- log_level_override_func=ninja_stdout_log_level_override,
- job_id=job_id,
- )
- err = zmake.multiproc.log_output(
- self.logger,
- logging.ERROR,
- proc.stderr,
- job_id=job_id,
- )
-
- if self._sequential:
- if not wait_and_check_success([proc], [out, err]):
- return 2
- else:
- procs.append(proc)
- log_writers += [out, err]
-
- if not wait_and_check_success(procs, log_writers):
- return 2
-
- # Run the packer.
- packer_work_dir = build_dir / "packer"
- output_dir = build_dir / "output"
- for d in output_dir, packer_work_dir:
- if not d.exists():
- d.mkdir()
-
- if output_files_out is None:
- output_files_out = []
- for output_file, output_name in project.packer.pack_firmware(
- packer_work_dir, self.jobserver, version_string=version_string, **dirs
- ):
- shutil.copy2(output_file, output_dir / output_name)
- self.logger.debug("Output file '%s' created.", output_file)
- output_files_out.append(output_file)
-
- return 0
-
- def test(self, build_dir):
- """Test a build directory."""
- procs = []
- output_files = []
- self.build(build_dir, output_files_out=output_files)
-
- # If the project built but isn't a test, just bail.
- project = zmake.project.Project(build_dir / "project")
- if not project.config.is_test:
- return 0
-
- for output_file in output_files:
- self.logger.info("Running tests in %s.", output_file)
- with self.jobserver.get_job():
- proc = self.jobserver.popen(
- [output_file],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- errors="replace",
- )
- job_id = "test {}".format(output_file)
- zmake.multiproc.log_output(
- self.logger,
- logging.DEBUG,
- proc.stdout,
- job_id=job_id,
- )
- zmake.multiproc.log_output(
- self.logger,
- logging.ERROR,
- proc.stderr,
- job_id=job_id,
- )
- procs.append(proc)
-
- for idx, proc in enumerate(procs):
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
- return 0
-
- def testall(self):
- """Test all the valid test targets"""
- tmp_dirs = []
- for project in zmake.project.find_projects(self.module_paths["ec"] / "zephyr"):
- is_test = project.config.is_test
- temp_build_dir = tempfile.mkdtemp(
- suffix="-{}".format(os.path.basename(project.project_dir.as_posix())),
- prefix="zbuild-",
- )
- tmp_dirs.append(temp_build_dir)
- # Configure and run the test.
- self.executor.append(
- func=lambda: self.configure(
- project_dir=project.project_dir,
- build_dir=pathlib.Path(temp_build_dir),
- build_after_configure=True,
- test_after_configure=is_test,
- )
- )
-
- rv = self.executor.wait()
- for tmpdir in tmp_dirs:
- shutil.rmtree(tmpdir)
- return rv
-
- def _run_lcov(self, build_dir, lcov_file, initial=False, gcov=""):
- gcov = os.path.abspath(gcov)
- with self.jobserver.get_job():
- if initial:
- self.logger.info("Running (initial) lcov on %s.", build_dir)
- else:
- self.logger.info("Running lcov on %s.", build_dir)
- cmd = [
- "/usr/bin/lcov",
- "--gcov-tool",
- gcov,
- "-q",
- "-o",
- "-",
- "-c",
- "-d",
- build_dir,
- "-t",
- lcov_file.stem,
- "--exclude",
- "*/build-*/zephyr/*/generated/*",
- "--exclude",
- "*/ec/test/*",
- "--exclude",
- "*/ec/zephyr/shim/chip/npcx/npcx_monitor/*",
- "--exclude",
- "*/ec/zephyr/emul/*",
- "--exclude",
- "*/ec/zephyr/test/*",
- "--exclude",
- "*/testsuite/*",
- "--exclude",
- "*/subsys/emul/*",
- ]
- if initial:
- cmd += ["-i"]
- proc = self.jobserver.popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- errors="replace",
- )
- zmake.multiproc.log_output(
- self.logger,
- logging.WARNING,
- proc.stderr,
- job_id="{}-lcov".format(build_dir),
- )
-
- with open(lcov_file, "w") as outfile:
- for line in proc.stdout:
- if line.startswith("SF:"):
- path = line[3:].rstrip()
- outfile.write("SF:%s\n" % os.path.realpath(path))
- else:
- outfile.write(line)
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
-
- return 0
-
- def _coverage_compile_only(self, project, build_dir, lcov_file):
- self.logger.info("Building %s in %s", project.project_dir, build_dir)
- rv = self.configure(
- project_dir=project.project_dir,
- build_dir=build_dir,
- build_after_configure=False,
- test_after_configure=False,
- coverage=True,
- )
- if rv:
- return rv
-
- # Compute the version string.
- version_string = zmake.version.get_version_string(
- project,
- build_dir / "zephyr_base",
- zmake.modules.locate_from_directory(build_dir / "modules"),
- )
-
- # The version header needs to generated during the build phase
- # instead of configure, as the tree may have changed since
- # configure was run.
- zmake.version.write_version_header(
- version_string,
- build_dir / "include" / "ec_version.h",
- )
-
- # Use ninja to compile the all.libraries target.
- build_project = zmake.project.Project(build_dir / "project")
-
- procs = []
- dirs = {}
- gcov = "gcov.sh-not-found"
- for build_name, build_config in build_project.iter_builds():
- self.logger.info("Building %s:%s all.libraries.", build_dir, build_name)
- dirs[build_name] = build_dir / "build-{}".format(build_name)
- gcov = dirs[build_name] / "gcov.sh"
- proc = self.jobserver.popen(
- ["/usr/bin/ninja", "-C", dirs[build_name], "all.libraries"],
- # Ninja will connect as a job client instead and claim
- # many jobs.
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- errors="replace",
- )
- job_id = "{}:{}".format(build_dir, build_name)
- zmake.multiproc.log_output(
- logger=self.logger,
- log_level=logging.DEBUG,
- file_descriptor=proc.stdout,
- log_level_override_func=ninja_stdout_log_level_override,
- job_id=job_id,
- )
- zmake.multiproc.log_output(
- self.logger,
- logging.ERROR,
- proc.stderr,
- job_id=job_id,
- )
- if self._sequential:
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
- else:
- procs.append(proc)
-
- for proc in procs:
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
-
- return self._run_lcov(build_dir, lcov_file, initial=True, gcov=gcov)
-
- def _coverage_run_test(self, project, build_dir, lcov_file):
- self.logger.info("Running test %s in %s", project.project_dir, build_dir)
- rv = self.configure(
- project_dir=project.project_dir,
- build_dir=build_dir,
- build_after_configure=True,
- test_after_configure=True,
- coverage=True,
- )
- if rv:
- return rv
- gcov = "gcov.sh-not-found"
- for build_name, build_config in project.iter_builds():
- gcov = build_dir / "build-{}".format(build_name) / "gcov.sh"
- return self._run_lcov(build_dir, lcov_file, initial=False, gcov=gcov)
-
- def coverage(self, build_dir):
- """Builds all targets with coverage enabled, and then runs the tests."""
- all_lcov_files = []
- root_dir = self.module_paths["ec"] / "zephyr"
- for project in zmake.project.find_projects(root_dir):
- is_test = project.config.is_test
- rel_path = project.project_dir.relative_to(root_dir)
- project_build_dir = pathlib.Path(build_dir).joinpath(rel_path)
- lcov_file = pathlib.Path(build_dir).joinpath(
- str(rel_path).replace("/", "_") + ".info"
- )
- all_lcov_files.append(lcov_file)
- if is_test:
- # Configure and run the test.
- self.executor.append(
- func=lambda: self._coverage_run_test(
- project, project_build_dir, lcov_file
- )
- )
- else:
- # Configure and compile the non-test project.
- self.executor.append(
- func=lambda: self._coverage_compile_only(
- project, project_build_dir, lcov_file
- )
- )
- if self._sequential:
- rv = self.executor.wait()
- if rv:
- return rv
-
- rv = self.executor.wait()
- if rv:
- return rv
-
- with self.jobserver.get_job():
- # Merge info files into a single lcov.info
- self.logger.info("Merging coverage data into %s.", build_dir / "lcov.info")
- cmd = ["/usr/bin/lcov", "-o", build_dir / "lcov.info"]
- for info in all_lcov_files:
- cmd += ["-a", info]
- proc = self.jobserver.popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- errors="replace",
- )
- zmake.multiproc.log_output(
- self.logger, logging.ERROR, proc.stderr, job_id="lcov"
- )
- zmake.multiproc.log_output(
- self.logger, logging.DEBUG, proc.stdout, job_id="lcov"
- )
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
-
- # Find the common root dir
- prefixdir = os.path.commonprefix(list(self.module_paths.values()))
-
- # Merge into a nice html report
- self.logger.info("Creating coverage report %s.", build_dir / "coverage_rpt")
- proc = self.jobserver.popen(
- [
- "/usr/bin/genhtml",
- "-q",
- "-o",
- build_dir / "coverage_rpt",
- "-t",
- "Zephyr EC Unittest",
- "-p",
- prefixdir,
- "-s",
- ]
- + all_lcov_files,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- encoding="utf-8",
- errors="replace",
- )
- zmake.multiproc.log_output(
- self.logger, logging.ERROR, proc.stderr, job_id="genhtml"
- )
- zmake.multiproc.log_output(
- self.logger, logging.DEBUG, proc.stdout, job_id="genhtml"
- )
- if proc.wait():
- raise OSError(get_process_failure_msg(proc))
- return 0