summaryrefslogtreecommitdiff
path: root/test/run_device_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/run_device_tests.py')
-rwxr-xr-xtest/run_device_tests.py801
1 files changed, 459 insertions, 342 deletions
diff --git a/test/run_device_tests.py b/test/run_device_tests.py
index 337e4e6693..9b762fc1d6 100755
--- a/test/run_device_tests.py
+++ b/test/run_device_tests.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
+# Copyright 2020 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -49,118 +49,132 @@ import subprocess
import sys
import time
from concurrent.futures.thread import ThreadPoolExecutor
+from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
-from typing import Optional, BinaryIO, List
+from typing import BinaryIO, Dict, List, Optional
# pylint: disable=import-error
import colorama # type: ignore[import]
-from contextlib2 import ExitStack
import fmap
+from contextlib2 import ExitStack
+
# pylint: enable=import-error
EC_DIR = Path(os.path.dirname(os.path.realpath(__file__))).parent
-JTRACE_FLASH_SCRIPT = os.path.join(EC_DIR, 'util/flash_jlink.py')
-SERVO_MICRO_FLASH_SCRIPT = os.path.join(EC_DIR, 'util/flash_ec')
+JTRACE_FLASH_SCRIPT = os.path.join(EC_DIR, "util/flash_jlink.py")
+SERVO_MICRO_FLASH_SCRIPT = os.path.join(EC_DIR, "util/flash_ec")
-ALL_TESTS_PASSED_REGEX = re.compile(r'Pass!\r\n')
-ALL_TESTS_FAILED_REGEX = re.compile(r'Fail! \(\d+ tests\)\r\n')
+ALL_TESTS_PASSED_REGEX = re.compile(r"Pass!\r\n")
+ALL_TESTS_FAILED_REGEX = re.compile(r"Fail! \(\d+ tests\)\r\n")
-SINGLE_CHECK_PASSED_REGEX = re.compile(r'Pass: .*')
-SINGLE_CHECK_FAILED_REGEX = re.compile(r'.*failed:.*')
+SINGLE_CHECK_PASSED_REGEX = re.compile(r"Pass: .*")
+SINGLE_CHECK_FAILED_REGEX = re.compile(r".*failed:.*")
-ASSERTION_FAILURE_REGEX = re.compile(r'ASSERTION FAILURE.*')
+ASSERTION_FAILURE_REGEX = re.compile(r"ASSERTION FAILURE.*")
DATA_ACCESS_VIOLATION_8020000_REGEX = re.compile(
- r'Data access violation, mfar = 8020000\r\n')
+ r"Data access violation, mfar = 8020000\r\n"
+)
DATA_ACCESS_VIOLATION_8040000_REGEX = re.compile(
- r'Data access violation, mfar = 8040000\r\n')
+ r"Data access violation, mfar = 8040000\r\n"
+)
DATA_ACCESS_VIOLATION_80C0000_REGEX = re.compile(
- r'Data access violation, mfar = 80c0000\r\n')
+ r"Data access violation, mfar = 80c0000\r\n"
+)
DATA_ACCESS_VIOLATION_80E0000_REGEX = re.compile(
- r'Data access violation, mfar = 80e0000\r\n')
+ r"Data access violation, mfar = 80e0000\r\n"
+)
DATA_ACCESS_VIOLATION_20000000_REGEX = re.compile(
- r'Data access violation, mfar = 20000000\r\n')
+ r"Data access violation, mfar = 20000000\r\n"
+)
DATA_ACCESS_VIOLATION_24000000_REGEX = re.compile(
- r'Data access violation, mfar = 24000000\r\n')
+ r"Data access violation, mfar = 24000000\r\n"
+)
-BLOONCHIPPER = 'bloonchipper'
-DARTMONKEY = 'dartmonkey'
+BLOONCHIPPER = "bloonchipper"
+DARTMONKEY = "dartmonkey"
-JTRACE = 'jtrace'
-SERVO_MICRO = 'servo_micro'
+JTRACE = "jtrace"
+SERVO_MICRO = "servo_micro"
-GCC = 'gcc'
-CLANG = 'clang'
+GCC = "gcc"
+CLANG = "clang"
-TEST_ASSETS_BUCKET = 'gs://chromiumos-test-assets-public/fpmcu/RO'
+TEST_ASSETS_BUCKET = "gs://chromiumos-test-assets-public/fpmcu/RO"
DARTMONKEY_IMAGE_PATH = os.path.join(
- TEST_ASSETS_BUCKET, 'dartmonkey_v2.0.2887-311310808.bin')
+ TEST_ASSETS_BUCKET, "dartmonkey_v2.0.2887-311310808.bin"
+)
NOCTURNE_FP_IMAGE_PATH = os.path.join(
- TEST_ASSETS_BUCKET, 'nocturne_fp_v2.2.64-58cf5974e.bin')
+ TEST_ASSETS_BUCKET, "nocturne_fp_v2.2.64-58cf5974e.bin"
+)
NAMI_FP_IMAGE_PATH = os.path.join(
- TEST_ASSETS_BUCKET, 'nami_fp_v2.2.144-7a08e07eb.bin')
+ TEST_ASSETS_BUCKET, "nami_fp_v2.2.144-7a08e07eb.bin"
+)
BLOONCHIPPER_V4277_IMAGE_PATH = os.path.join(
- TEST_ASSETS_BUCKET, 'bloonchipper_v2.0.4277-9f652bb3.bin')
+ TEST_ASSETS_BUCKET, "bloonchipper_v2.0.4277-9f652bb3.bin"
+)
BLOONCHIPPER_V5938_IMAGE_PATH = os.path.join(
- TEST_ASSETS_BUCKET, 'bloonchipper_v2.0.5938-197506c1.bin')
+ TEST_ASSETS_BUCKET, "bloonchipper_v2.0.5938-197506c1.bin"
+)
class ImageType(Enum):
"""EC Image type to use for the test."""
+
RO = 1
RW = 2
+@dataclass
class BoardConfig:
"""Board-specific configuration."""
- def __init__(self, name, servo_uart_name, servo_power_enable,
- rollback_region0_regex, rollback_region1_regex, mpu_regex,
- variants):
- self.name = name
- self.servo_uart_name = servo_uart_name
- self.servo_power_enable = servo_power_enable
- self.rollback_region0_regex = rollback_region0_regex
- self.rollback_region1_regex = rollback_region1_regex
- self.mpu_regex = mpu_regex
- self.variants = variants
+ name: str
+ servo_uart_name: str
+ servo_power_enable: str
+ rollback_region0_regex: object
+ rollback_region1_regex: object
+ mpu_regex: object
+ variants: Dict
+@dataclass
class TestConfig:
"""Configuration for a given test."""
- def __init__(self, test_name, image_to_use=ImageType.RW,
- finish_regexes=None, fail_regexes=None, toggle_power=False,
- test_args=None, num_flash_attempts=2, timeout_secs=10,
- enable_hw_write_protect=False, ro_image=None, build_board=None,
- config_name=None):
- if test_args is None:
- test_args = []
- if finish_regexes is None:
- finish_regexes = [ALL_TESTS_PASSED_REGEX, ALL_TESTS_FAILED_REGEX]
- if fail_regexes is None:
- fail_regexes = [SINGLE_CHECK_FAILED_REGEX, ALL_TESTS_FAILED_REGEX,
- ASSERTION_FAILURE_REGEX]
- if config_name is None:
- config_name = test_name
-
- self.test_name = test_name
- self.config_name = config_name
- self.image_to_use = image_to_use
- self.finish_regexes = finish_regexes
- self.fail_regexes = fail_regexes
- self.test_args = test_args
- self.toggle_power = toggle_power
- self.num_flash_attempts = num_flash_attempts
- self.timeout_secs = timeout_secs
- self.enable_hw_write_protect = enable_hw_write_protect
- self.logs = []
- self.passed = False
- self.num_fails = 0
- self.num_passes = 0
- self.ro_image = ro_image
- self.build_board = build_board
+ # pylint: disable=too-many-instance-attributes
+ test_name: str
+ image_to_use: ImageType = ImageType.RW
+ finish_regexes: List = None
+ fail_regexes: List = None
+ toggle_power: bool = False
+ test_args: List[str] = field(default_factory=list)
+ num_flash_attempts: int = 2
+ timeout_secs: int = 10
+ enable_hw_write_protect: bool = False
+ ro_image: str = None
+ build_board: str = None
+ config_name: str = None
+ logs: List = field(init=False, default_factory=list)
+ passed: bool = field(init=False, default=False)
+ num_passes: int = field(init=False, default=0)
+ num_fails: int = field(init=False, default=0)
+
+ def __post_init__(self):
+ if self.finish_regexes is None:
+ self.finish_regexes = [
+ ALL_TESTS_PASSED_REGEX,
+ ALL_TESTS_FAILED_REGEX,
+ ]
+ if self.fail_regexes is None:
+ self.fail_regexes = [
+ SINGLE_CHECK_FAILED_REGEX,
+ ALL_TESTS_FAILED_REGEX,
+ ASSERTION_FAILURE_REGEX,
+ ]
+ if self.config_name is None:
+ self.config_name = self.test_name
# All possible tests.
@@ -169,6 +183,7 @@ class AllTests:
@staticmethod
def get(board_config: BoardConfig) -> List[TestConfig]:
+ """Return public and private test configs for the specified board."""
public_tests = AllTests.get_public_tests(board_config)
private_tests = AllTests.get_private_tests()
@@ -176,146 +191,192 @@ class AllTests:
@staticmethod
def get_public_tests(board_config: BoardConfig) -> List[TestConfig]:
+ """Return public test configs for the specified board."""
tests = [
- TestConfig(test_name='aes'),
- TestConfig(test_name='cec'),
- TestConfig(test_name='cortexm_fpu'),
- TestConfig(test_name='crc'),
- TestConfig(test_name='flash_physical', image_to_use=ImageType.RO,
- toggle_power=True),
- TestConfig(test_name='flash_write_protect',
- image_to_use=ImageType.RO,
- toggle_power=True, enable_hw_write_protect=True),
- TestConfig(test_name='fpsensor_hw'),
- TestConfig(config_name='fpsensor_spi_ro', test_name='fpsensor',
- image_to_use=ImageType.RO, test_args=['spi']),
- TestConfig(config_name='fpsensor_spi_rw', test_name='fpsensor',
- test_args=['spi']),
- TestConfig(config_name='fpsensor_uart_ro', test_name='fpsensor',
- image_to_use=ImageType.RO, test_args=['uart']),
- TestConfig(config_name='fpsensor_uart_rw', test_name='fpsensor',
- test_args=['uart']),
- TestConfig(config_name='mpu_ro', test_name='mpu',
- image_to_use=ImageType.RO,
- finish_regexes=[board_config.mpu_regex]),
- TestConfig(config_name='mpu_rw', test_name='mpu',
- finish_regexes=[board_config.mpu_regex]),
- TestConfig(test_name='mutex'),
- TestConfig(test_name='pingpong'),
- TestConfig(test_name='printf'),
- TestConfig(test_name='queue'),
- TestConfig(config_name='rollback_region0', test_name='rollback',
- finish_regexes=[board_config.rollback_region0_regex],
- test_args=['region0']),
- TestConfig(config_name='rollback_region1', test_name='rollback',
- finish_regexes=[board_config.rollback_region1_regex],
- test_args=['region1']),
- TestConfig(test_name='rollback_entropy', image_to_use=ImageType.RO),
- TestConfig(test_name='rtc'),
- TestConfig(test_name='sha256'),
- TestConfig(test_name='sha256_unrolled'),
- TestConfig(test_name='static_if'),
- TestConfig(config_name='system_is_locked_wp_on',
- test_name='system_is_locked', test_args=['wp_on'],
- toggle_power=True, enable_hw_write_protect=True),
- TestConfig(config_name='system_is_locked_wp_off',
- test_name='system_is_locked', test_args=['wp_off'],
- toggle_power=True, enable_hw_write_protect=False),
- TestConfig(test_name='timer_dos'),
- TestConfig(test_name='utils', timeout_secs=20),
- TestConfig(test_name='utils_str'),
+ TestConfig(test_name="aes"),
+ TestConfig(test_name="cec"),
+ TestConfig(test_name="cortexm_fpu"),
+ TestConfig(test_name="crc"),
+ TestConfig(
+ test_name="flash_physical",
+ image_to_use=ImageType.RO,
+ toggle_power=True,
+ ),
+ TestConfig(
+ test_name="flash_write_protect",
+ image_to_use=ImageType.RO,
+ toggle_power=True,
+ enable_hw_write_protect=True,
+ ),
+ TestConfig(test_name="fpsensor_hw"),
+ TestConfig(
+ config_name="fpsensor_spi_ro",
+ test_name="fpsensor",
+ image_to_use=ImageType.RO,
+ test_args=["spi"],
+ ),
+ TestConfig(
+ config_name="fpsensor_spi_rw",
+ test_name="fpsensor",
+ test_args=["spi"],
+ ),
+ TestConfig(
+ config_name="fpsensor_uart_ro",
+ test_name="fpsensor",
+ image_to_use=ImageType.RO,
+ test_args=["uart"],
+ ),
+ TestConfig(
+ config_name="fpsensor_uart_rw",
+ test_name="fpsensor",
+ test_args=["uart"],
+ ),
+ TestConfig(
+ config_name="mpu_ro",
+ test_name="mpu",
+ image_to_use=ImageType.RO,
+ finish_regexes=[board_config.mpu_regex],
+ ),
+ TestConfig(
+ config_name="mpu_rw",
+ test_name="mpu",
+ finish_regexes=[board_config.mpu_regex],
+ ),
+ TestConfig(test_name="mutex"),
+ TestConfig(test_name="pingpong"),
+ TestConfig(test_name="printf"),
+ TestConfig(test_name="queue"),
+ TestConfig(
+ config_name="rollback_region0",
+ test_name="rollback",
+ finish_regexes=[board_config.rollback_region0_regex],
+ test_args=["region0"],
+ ),
+ TestConfig(
+ config_name="rollback_region1",
+ test_name="rollback",
+ finish_regexes=[board_config.rollback_region1_regex],
+ test_args=["region1"],
+ ),
+ TestConfig(test_name="rollback_entropy", image_to_use=ImageType.RO),
+ TestConfig(test_name="rtc"),
+ TestConfig(test_name="sha256"),
+ TestConfig(test_name="sha256_unrolled"),
+ TestConfig(test_name="static_if"),
+ TestConfig(test_name="stdlib"),
+ TestConfig(
+ config_name="system_is_locked_wp_on",
+ test_name="system_is_locked",
+ test_args=["wp_on"],
+ toggle_power=True,
+ enable_hw_write_protect=True,
+ ),
+ TestConfig(
+ config_name="system_is_locked_wp_off",
+ test_name="system_is_locked",
+ test_args=["wp_off"],
+ toggle_power=True,
+ enable_hw_write_protect=False,
+ ),
+ TestConfig(test_name="timer_dos"),
+ TestConfig(test_name="utils", timeout_secs=20),
+ TestConfig(test_name="utils_str"),
]
if board_config.name == BLOONCHIPPER:
- tests.append(TestConfig(test_name='stm32f_rtc'))
+ tests.append(TestConfig(test_name="stm32f_rtc"))
# Run panic data tests for all boards and RO versions.
for variant_name, variant_info in board_config.variants.items():
tests.append(
- TestConfig(config_name='panic_data_' + variant_name,
- test_name='panic_data',
- fail_regexes=[SINGLE_CHECK_FAILED_REGEX,
- ALL_TESTS_FAILED_REGEX],
- ro_image=variant_info.get('ro_image_path'),
- build_board=variant_info.get('build_board')))
+ TestConfig(
+ config_name="panic_data_" + variant_name,
+ test_name="panic_data",
+ fail_regexes=[
+ SINGLE_CHECK_FAILED_REGEX,
+ ALL_TESTS_FAILED_REGEX,
+ ],
+ ro_image=variant_info.get("ro_image_path"),
+ build_board=variant_info.get("build_board"),
+ )
+ )
return tests
@staticmethod
def get_private_tests() -> List[TestConfig]:
- # Return all private tests, if the folder exists
+ """Return private test configs for the specified board, if available."""
tests = []
try:
current_dir = os.path.dirname(__file__)
- private_dir = os.path.join(current_dir, os.pardir, 'private/test')
+ private_dir = os.path.join(current_dir, os.pardir, "private/test")
have_private = os.path.isdir(private_dir)
if not have_private:
return []
sys.path.append(private_dir)
- import private_tests # pylint: disable=import-error
+ import private_tests # pylint: disable=import-error,import-outside-toplevel
+
for test_args in private_tests.tests:
tests.append(TestConfig(**test_args))
# Catch all exceptions to avoid disruptions in public repo
- except BaseException as e:
- logging.debug('Failed to get list of private tests: %s', str(e))
- logging.debug('Ignore error and continue.')
+ except BaseException as e: # pylint: disable=broad-except
+ logging.debug("Failed to get list of private tests: %s", str(e))
+ logging.debug("Ignore error and continue.")
return []
return tests
BLOONCHIPPER_CONFIG = BoardConfig(
name=BLOONCHIPPER,
- servo_uart_name='raw_fpmcu_console_uart_pty',
- servo_power_enable='fpmcu_pp3300',
+ servo_uart_name="raw_fpmcu_console_uart_pty",
+ servo_power_enable="fpmcu_pp3300",
rollback_region0_regex=DATA_ACCESS_VIOLATION_8020000_REGEX,
rollback_region1_regex=DATA_ACCESS_VIOLATION_8040000_REGEX,
mpu_regex=DATA_ACCESS_VIOLATION_20000000_REGEX,
variants={
- 'bloonchipper_v2.0.4277': {
- 'ro_image_path': BLOONCHIPPER_V4277_IMAGE_PATH
+ "bloonchipper_v2.0.4277": {
+ "ro_image_path": BLOONCHIPPER_V4277_IMAGE_PATH
+ },
+ "bloonchipper_v2.0.5938": {
+ "ro_image_path": BLOONCHIPPER_V5938_IMAGE_PATH
},
- 'bloonchipper_v2.0.5938': {
- 'ro_image_path': BLOONCHIPPER_V5938_IMAGE_PATH
- }
- }
+ },
)
DARTMONKEY_CONFIG = BoardConfig(
name=DARTMONKEY,
- servo_uart_name='raw_fpmcu_console_uart_pty',
- servo_power_enable='fpmcu_pp3300',
+ servo_uart_name="raw_fpmcu_console_uart_pty",
+ servo_power_enable="fpmcu_pp3300",
rollback_region0_regex=DATA_ACCESS_VIOLATION_80C0000_REGEX,
rollback_region1_regex=DATA_ACCESS_VIOLATION_80E0000_REGEX,
mpu_regex=DATA_ACCESS_VIOLATION_24000000_REGEX,
# For dartmonkey board, run panic data test also on nocturne_fp and
# nami_fp boards with appropriate RO image.
variants={
- 'dartmonkey_v2.0.2887': {
- 'ro_image_path': DARTMONKEY_IMAGE_PATH
+ "dartmonkey_v2.0.2887": {"ro_image_path": DARTMONKEY_IMAGE_PATH},
+ "nocturne_fp_v2.2.64": {
+ "ro_image_path": NOCTURNE_FP_IMAGE_PATH,
+ "build_board": "nocturne_fp",
},
- 'nocturne_fp_v2.2.64': {
- 'ro_image_path': NOCTURNE_FP_IMAGE_PATH,
- 'build_board': 'nocturne_fp'
+ "nami_fp_v2.2.144": {
+ "ro_image_path": NAMI_FP_IMAGE_PATH,
+ "build_board": "nami_fp",
},
- 'nami_fp_v2.2.144': {
- 'ro_image_path': NAMI_FP_IMAGE_PATH,
- 'build_board': 'nami_fp'
- }
- }
+ },
)
BOARD_CONFIGS = {
- 'bloonchipper': BLOONCHIPPER_CONFIG,
- 'dartmonkey': DARTMONKEY_CONFIG,
+ "bloonchipper": BLOONCHIPPER_CONFIG,
+ "dartmonkey": DARTMONKEY_CONFIG,
}
def read_file_gsutil(path: str) -> bytes:
"""Get data from bucket, using gsutil tool"""
- cmd = ['gsutil', 'cat', path]
+ cmd = ["gsutil", "cat", path]
- logging.debug('Running command: "%s"', ' '.join(cmd))
- gsutil = subprocess.run(cmd, stdout=subprocess.PIPE) # pylint: disable=subprocess-run-check
+ logging.debug('Running command: "%s"', " ".join(cmd))
+ gsutil = subprocess.run(cmd, stdout=subprocess.PIPE, check=False)
gsutil.check_returncode()
return gsutil.stdout
@@ -323,9 +384,9 @@ def read_file_gsutil(path: str) -> bytes:
def find_section_offset_size(section: str, image: bytes) -> (int, int):
"""Get offset and size of the section in image"""
- areas = fmap.fmap_decode(image)['areas']
- area = next(area for area in areas if area['name'] == section)
- return area['offset'], area['size']
+ areas = fmap.fmap_decode(image)["areas"]
+ area = next(area for area in areas if area["name"] == section)
+ return area["offset"], area["size"]
def read_section(src: bytes, section: str) -> bytes:
@@ -340,10 +401,10 @@ def write_section(data: bytes, image: bytearray, section: str):
(section_start, section_size) = find_section_offset_size(section, image)
if section_size < len(data):
- raise ValueError(section + ' section size is not enough to store data')
+ raise ValueError(section + " section size is not enough to store data")
section_end = section_start + section_size
- filling = bytes([0xff for _ in range(section_size - len(data))])
+ filling = bytes([0xFF for _ in range(section_size - len(data))])
image[section_start:section_end] = data + filling
@@ -354,12 +415,14 @@ def copy_section(src: bytes, dst: bytearray, section: str):
(dst_start, dst_size) = find_section_offset_size(section, dst)
if dst_size < src_size:
- raise ValueError('Section ' + section + ' from source image has '
- 'greater size than the section in destination image')
+ raise ValueError(
+ "Section " + section + " from source image has "
+ "greater size than the section in destination image"
+ )
src_end = src_start + src_size
dst_end = dst_start + dst_size
- filling = bytes([0xff for _ in range(dst_size - src_size)])
+ filling = bytes([0xFF for _ in range(dst_size - src_size)])
dst[dst_start:dst_end] = src[src_start:src_end] + filling
@@ -367,28 +430,28 @@ def copy_section(src: bytes, dst: bytearray, section: str):
def replace_ro(image: bytearray, ro: bytes):
"""Replace RO in image with provided one"""
# Backup RO public key since its private part was used to sign RW.
- ro_pubkey = read_section(image, 'KEY_RO')
+ ro_pubkey = read_section(image, "KEY_RO")
# Copy RO part of the firmware to the image. Please note that RO public key
# is copied too since EC_RO area includes KEY_RO area.
- copy_section(ro, image, 'EC_RO')
+ copy_section(ro, image, "EC_RO")
# Restore RO public key.
- write_section(ro_pubkey, image, 'KEY_RO')
+ write_section(ro_pubkey, image, "KEY_RO")
def get_console(board_config: BoardConfig) -> Optional[str]:
"""Get the name of the console for a given board."""
cmd = [
- 'dut-control',
+ "dut-control",
board_config.servo_uart_name,
]
- logging.debug('Running command: "%s"', ' '.join(cmd))
+ logging.debug('Running command: "%s"', " ".join(cmd))
with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
for line in io.TextIOWrapper(proc.stdout): # type: ignore[arg-type]
logging.debug(line)
- pty = line.split(':')
+ pty = line.split(":")
if len(pty) == 2 and pty[0] == board_config.servo_uart_name:
return pty[1].strip()
@@ -398,77 +461,90 @@ def get_console(board_config: BoardConfig) -> Optional[str]:
def power(board_config: BoardConfig, on: bool) -> None:
"""Turn power to board on/off."""
if on:
- state = 'pp3300'
+ state = "pp3300"
else:
- state = 'off'
+ state = "off"
cmd = [
- 'dut-control',
- board_config.servo_power_enable + ':' + state,
+ "dut-control",
+ board_config.servo_power_enable + ":" + state,
]
- logging.debug('Running command: "%s"', ' '.join(cmd))
- subprocess.run(cmd).check_returncode() # pylint: disable=subprocess-run-check
+ logging.debug('Running command: "%s"', " ".join(cmd))
+ subprocess.run(
+ cmd
+ ).check_returncode() # pylint: disable=subprocess-run-check
def hw_write_protect(enable: bool) -> None:
"""Enable/disable hardware write protect."""
if enable:
- state = 'force_on'
+ state = "force_on"
else:
- state = 'force_off'
+ state = "force_off"
cmd = [
- 'dut-control',
- 'fw_wp_state:' + state,
- ]
- logging.debug('Running command: "%s"', ' '.join(cmd))
- subprocess.run(cmd).check_returncode() # pylint: disable=subprocess-run-check
+ "dut-control",
+ "fw_wp_state:" + state,
+ ]
+ logging.debug('Running command: "%s"', " ".join(cmd))
+ subprocess.run(
+ cmd
+ ).check_returncode() # pylint: disable=subprocess-run-check
def build(test_name: str, board_name: str, compiler: str) -> None:
"""Build specified test for specified board."""
- cmd = ['make']
+ cmd = ["make"]
if compiler == CLANG:
- cmd = cmd + ['CC=arm-none-eabi-clang']
+ cmd = cmd + ["CC=arm-none-eabi-clang"]
cmd = cmd + [
- 'BOARD=' + board_name,
- 'test-' + test_name,
- '-j',
+ "BOARD=" + board_name,
+ "test-" + test_name,
+ "-j",
]
- logging.debug('Running command: "%s"', ' '.join(cmd))
- subprocess.run(cmd).check_returncode() # pylint: disable=subprocess-run-check
+ logging.debug('Running command: "%s"', " ".join(cmd))
+ subprocess.run(
+ cmd
+ ).check_returncode() # pylint: disable=subprocess-run-check
-def flash(image_path: str, board: str, flasher: str, remote_ip: str,
- remote_port: int) -> bool:
+def flash(
+ image_path: str, board: str, flasher: str, remote_ip: str, remote_port: int
+) -> bool:
"""Flash specified test to specified board."""
- logging.info('Flashing test')
+ logging.info("Flashing test")
cmd = []
if flasher == JTRACE:
cmd.append(JTRACE_FLASH_SCRIPT)
if remote_ip:
- cmd.extend(['--remote', remote_ip + ':' + str(remote_port)])
+ cmd.extend(["--remote", remote_ip + ":" + str(remote_port)])
elif flasher == SERVO_MICRO:
cmd.append(SERVO_MICRO_FLASH_SCRIPT)
else:
logging.error('Unknown flasher: "%s"', flasher)
return False
- cmd.extend([
- '--board', board,
- '--image', image_path,
- ])
- logging.debug('Running command: "%s"', ' '.join(cmd))
- completed_process = subprocess.run(cmd) # pylint: disable=subprocess-run-check
+ cmd.extend(
+ [
+ "--board",
+ board,
+ "--image",
+ image_path,
+ ]
+ )
+ logging.debug('Running command: "%s"', " ".join(cmd))
+ completed_process = subprocess.run(
+ cmd
+ ) # pylint: disable=subprocess-run-check
return completed_process.returncode == 0
def patch_image(test: TestConfig, image_path: str):
"""Replace RO part of the firmware with provided one."""
- with open(image_path, 'rb+') as f:
+ with open(image_path, "rb+") as f:
image = bytearray(f.read())
ro = read_file_gsutil(test.ro_image)
replace_ro(image, ro)
@@ -477,8 +553,9 @@ def patch_image(test: TestConfig, image_path: str):
f.truncate()
-def readline(executor: ThreadPoolExecutor, f: BinaryIO, timeout_secs: int) -> \
- Optional[bytes]:
+def readline(
+ executor: ThreadPoolExecutor, f: BinaryIO, timeout_secs: int
+) -> Optional[bytes]:
"""Read a line with timeout."""
a = executor.submit(f.readline)
try:
@@ -487,8 +564,9 @@ def readline(executor: ThreadPoolExecutor, f: BinaryIO, timeout_secs: int) -> \
return None
-def readlines_until_timeout(executor, f: BinaryIO, timeout_secs: int) -> \
- List[bytes]:
+def readlines_until_timeout(
+ executor, f: BinaryIO, timeout_secs: int
+) -> List[bytes]:
"""Continuously read lines for timeout_secs."""
lines: List[bytes] = []
while True:
@@ -499,6 +577,7 @@ def readlines_until_timeout(executor, f: BinaryIO, timeout_secs: int) -> \
def process_console_output_line(line: bytes, test: TestConfig):
+ """Parse console output line and update test pass/fail counters."""
try:
line_str = line.decode()
@@ -518,19 +597,20 @@ def process_console_output_line(line: bytes, test: TestConfig):
return None
-def run_test(test: TestConfig, console: io.FileIO,
- executor: ThreadPoolExecutor) -> bool:
+def run_test(
+ test: TestConfig, console: io.FileIO, executor: ThreadPoolExecutor
+) -> bool:
"""Run specified test."""
start = time.time()
# Wait for boot to finish
time.sleep(1)
- console.write('\n'.encode())
+ console.write("\n".encode())
if test.image_to_use == ImageType.RO:
- console.write('reboot ro\n'.encode())
+ console.write("reboot ro\n".encode())
time.sleep(1)
- test_cmd = 'runtest ' + ' '.join(test.test_args) + '\n'
+ test_cmd = "runtest " + " ".join(test.test_args) + "\n"
console.write(test_cmd.encode())
while True:
@@ -539,7 +619,7 @@ def run_test(test: TestConfig, console: io.FileIO,
if not line:
now = time.time()
if now - start > test.timeout_secs:
- logging.debug('Test timed out')
+ logging.debug("Test timed out")
return False
continue
@@ -568,15 +648,18 @@ def run_test(test: TestConfig, console: io.FileIO,
def get_test_list(config: BoardConfig, test_args) -> List[TestConfig]:
"""Get a list of tests to run."""
- if test_args == 'all':
+ if test_args == "all":
return AllTests.get(config)
test_list = []
for t in test_args:
- logging.debug('test: %s', t)
+ logging.debug("test: %s", t)
test_regex = re.compile(t)
- tests = [test for test in AllTests.get(config)
- if test_regex.fullmatch(test.config_name)]
+ tests = [
+ test
+ for test in AllTests.get(config)
+ if test_regex.fullmatch(test.config_name)
+ ]
if not tests:
logging.error('Unable to find test config for "%s"', t)
sys.exit(1)
@@ -585,9 +668,81 @@ def get_test_list(config: BoardConfig, test_args) -> List[TestConfig]:
return test_list
+def flash_and_run_test(
+ test: TestConfig,
+ board_config: BoardConfig,
+ args: argparse.Namespace,
+ executor,
+) -> bool:
+ """Run a single test using the test and board configuration specified"""
+ build_board = args.board
+ # If test provides this information, build image for board specified
+ # by test.
+ if test.build_board is not None:
+ build_board = test.build_board
+
+ # build test binary
+ build(test.test_name, build_board, args.compiler)
+
+ image_path = os.path.join(
+ EC_DIR, "build", build_board, test.test_name, test.test_name + ".bin"
+ )
+
+ if test.ro_image is not None:
+ try:
+ patch_image(test, image_path)
+ except Exception as exception: # pylint: disable=broad-except
+ logging.warning(
+ "An exception occurred while patching " "image: %s", exception
+ )
+ return False
+
+ # flash test binary
+ # TODO(b/158327221): First attempt to flash fails after
+ # flash_write_protect test is run; works after second attempt.
+ flash_succeeded = False
+ for i in range(0, test.num_flash_attempts):
+ logging.debug("Flash attempt %d", i + 1)
+ if flash(
+ image_path, args.board, args.flasher, args.remote, args.jlink_port
+ ):
+ flash_succeeded = True
+ break
+ time.sleep(1)
+
+ if not flash_succeeded:
+ logging.debug(
+ "Flashing failed after max attempts: %d", test.num_flash_attempts
+ )
+ return False
+
+ if test.toggle_power:
+ power(board_config, on=False)
+ time.sleep(1)
+ power(board_config, on=True)
+
+ hw_write_protect(test.enable_hw_write_protect)
+
+ # run the test
+ logging.info('Running test: "%s"', test.config_name)
+
+ with ExitStack() as stack:
+ if args.remote and args.console_port:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((args.remote, args.console_port))
+ console = stack.enter_context(s.makefile(mode="rwb", buffering=0))
+ else:
+ console = stack.enter_context(
+ open(get_console(board_config), "wb+", buffering=0)
+ )
+
+ return run_test(test, console, executor=executor)
+
+
def parse_remote_arg(remote: str) -> str:
+ """Convert the 'remote' input argument to IP address, if available."""
if not remote:
- return ''
+ return ""
try:
ip = socket.gethostbyname(remote)
@@ -597,165 +752,127 @@ def parse_remote_arg(remote: str) -> str:
sys.exit(1)
+def validate_args_combination(args: argparse.Namespace):
+ """Check that the current combination of arguments is supported.
+
+ Not all combinations of command line arguments are valid or currently
+ supported. If tests can't be executed, print and error message and exit.
+ """
+ if args.jlink_port and not args.flasher == JTRACE:
+ logging.error("jlink_port specified, but flasher is not set to J-Link.")
+ sys.exit(1)
+
+ if args.remote and not (args.jlink_port or args.console_port):
+ logging.error(
+ "jlink_port or console_port must be specified when using "
+ "the remote option."
+ )
+ sys.exit(1)
+
+ if (args.jlink_port or args.console_port) and not args.remote:
+ logging.error(
+ "The remote option must be specified when using the "
+ "jlink_port or console_port options."
+ )
+ sys.exit(1)
+
+ if args.remote and args.flasher == SERVO_MICRO:
+ logging.error(
+ "The remote option is not supported when flashing with servo "
+ "micro. Use J-Link instead or flash with a local servo micro."
+ )
+ sys.exit(1)
+
+ if args.board not in BOARD_CONFIGS:
+ logging.error('Unable to find a config for board: "%s"', args.board)
+ sys.exit(1)
+
+
def main():
+ """Run unit tests on device and displays the results."""
parser = argparse.ArgumentParser()
- default_board = 'bloonchipper'
+ default_board = "bloonchipper"
parser.add_argument(
- '--board', '-b',
- help='Board (default: ' + default_board + ')',
- default=default_board)
+ "--board",
+ "-b",
+ help="Board (default: " + default_board + ")",
+ default=default_board,
+ )
- default_tests = 'all'
+ default_tests = "all"
parser.add_argument(
- '--tests', '-t',
- nargs='+',
- help='Tests (default: ' + default_tests + ')',
- default=default_tests)
+ "--tests",
+ "-t",
+ nargs="+",
+ help="Tests (default: " + default_tests + ")",
+ default=default_tests,
+ )
- log_level_choices = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
+ log_level_choices = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
parser.add_argument(
- '--log_level', '-l',
- choices=log_level_choices,
- default='DEBUG'
+ "--log_level", "-l", choices=log_level_choices, default="DEBUG"
)
flasher_choices = [SERVO_MICRO, JTRACE]
parser.add_argument(
- '--flasher', '-f',
- choices=flasher_choices,
- default=JTRACE
+ "--flasher", "-f", choices=flasher_choices, default=JTRACE
)
compiler_options = [GCC, CLANG]
- parser.add_argument('--compiler', '-c',
- choices=compiler_options,
- default=GCC)
+ parser.add_argument(
+ "--compiler", "-c", choices=compiler_options, default=GCC
+ )
# This might be expanded to serve as a "remote" for flash_ec also, so
# we will leave it generic.
parser.add_argument(
- '--remote', '-n',
- help='The remote host connected to one or both of: J-Link and Servo.',
+ "--remote",
+ "-n",
+ help="The remote host connected to one or both of: J-Link and Servo.",
+ type=parse_remote_arg,
)
- parser.add_argument('--jlink_port', '-j',
- type=int,
- help='The port to use when connecting to JLink.')
- parser.add_argument('--console_port', '-p',
- type=int,
- help='The port connected to the FPMCU console.')
+ parser.add_argument(
+ "--jlink_port",
+ "-j",
+ type=int,
+ help="The port to use when connecting to JLink.",
+ )
+ parser.add_argument(
+ "--console_port",
+ "-p",
+ type=int,
+ help="The port connected to the FPMCU console.",
+ )
args = parser.parse_args()
logging.basicConfig(level=args.log_level)
-
- if args.jlink_port and not args.flasher == JTRACE:
- logging.error('jlink_port specified, but flasher is not set to J-Link.')
- sys.exit(1)
-
- if args.remote and not (args.jlink_port or args.console_port):
- logging.error('jlink_port or console_port must be specified when using '
- 'the remote option.')
- sys.exit(1)
-
- if (args.jlink_port or args.console_port) and not args.remote:
- logging.error('The remote option must be specified when using the '
- 'jlink_port or console_port options.')
- sys.exit(1)
-
- if args.board not in BOARD_CONFIGS:
- logging.error('Unable to find a config for board: "%s"', args.board)
- sys.exit(1)
+ validate_args_combination(args)
board_config = BOARD_CONFIGS[args.board]
-
- remote_ip = parse_remote_arg(args.remote)
-
- e = ThreadPoolExecutor(max_workers=1)
-
test_list = get_test_list(board_config, args.tests)
- logging.debug(
- 'Running tests: %s', [
- test.config_name for test in test_list])
-
- for test in test_list:
- build_board = args.board
- # If test provides this information, build image for board specified
- # by test.
- if test.build_board is not None:
- build_board = test.build_board
-
- # build test binary
- build(test.test_name, build_board, args.compiler)
-
- image_path = os.path.join(EC_DIR, 'build', build_board, test.test_name,
- test.test_name + '.bin')
-
- if test.ro_image is not None:
- try:
- patch_image(test, image_path)
- except Exception as exception:
- logging.warning('An exception occurred while patching '
- 'image: %s', exception)
- test.passed = False
- continue
-
- # flash test binary
- # TODO(b/158327221): First attempt to flash fails after
- # flash_write_protect test is run; works after second attempt.
- flash_succeeded = False
- for i in range(0, test.num_flash_attempts):
- logging.debug('Flash attempt %d', i + 1)
- if flash(image_path, args.board, args.flasher, remote_ip,
- args.jlink_port):
- flash_succeeded = True
- break
- time.sleep(1)
-
- if not flash_succeeded:
- logging.debug('Flashing failed after max attempts: %d',
- test.num_flash_attempts)
- test.passed = False
- continue
-
- if test.toggle_power:
- power(board_config, on=False)
- time.sleep(1)
- power(board_config, on=True)
-
- hw_write_protect(test.enable_hw_write_protect)
-
- # run the test
- logging.info('Running test: "%s"', test.config_name)
-
- with ExitStack() as stack:
- if remote_ip and args.console_port:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((remote_ip, args.console_port))
- console = stack.enter_context(
- s.makefile(mode='rwb', buffering=0))
+ logging.debug("Running tests: %s", [test.config_name for test in test_list])
+
+ with ThreadPoolExecutor(max_workers=1) as e:
+ for test in test_list:
+ test.passed = flash_and_run_test(test, board_config, args, e)
+
+ colorama.init()
+ exit_code = 0
+ for test in test_list:
+ # print results
+ print('Test "' + test.config_name + '": ', end="")
+ if test.passed:
+ print(colorama.Fore.GREEN + "PASSED")
else:
- console = stack.enter_context(
- open(get_console(board_config), 'wb+', buffering=0))
-
- test.passed = run_test(test, console, executor=e)
-
- colorama.init()
- exit_code = 0
- for test in test_list:
- # print results
- print('Test "' + test.config_name + '": ', end='')
- if test.passed:
- print(colorama.Fore.GREEN + 'PASSED')
- else:
- print(colorama.Fore.RED + 'FAILED')
- exit_code = 1
+ print(colorama.Fore.RED + "FAILED")
+ exit_code = 1
- print(colorama.Style.RESET_ALL)
+ print(colorama.Style.RESET_ALL)
- e.shutdown(wait=False)
sys.exit(exit_code)
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())