From 7540e7b47b55447475bb8191fb3520dd67cf7998 Mon Sep 17 00:00:00 2001 From: Jeremy Bettis Date: Fri, 8 Jul 2022 10:58:19 -0600 Subject: ec: Format all python files with black and isort find . \( -path ./private -prune \) -o -name '*.py' -print | xargs black find . \( -path ./private -prune \) -o -name '*.py' -print | xargs ~/chromiumos/chromite/scripts/isort --settings-file=.isort.cfg BRANCH=None BUG=b:238434058 TEST=None Signed-off-by: Jeremy Bettis Change-Id: I63462d6f15d1eaf3db84eb20d1404ee976be8382 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3749242 Commit-Queue: Jeremy Bettis Reviewed-by: Tom Hughes Tested-by: Jeremy Bettis Commit-Queue: Jack Rosenthal Auto-Submit: Jeremy Bettis Reviewed-by: Jack Rosenthal --- cts/common/board.py | 698 +++++++++++++++++++++++---------------------- cts/cts.py | 804 ++++++++++++++++++++++++++-------------------------- 2 files changed, 766 insertions(+), 736 deletions(-) (limited to 'cts') diff --git a/cts/common/board.py b/cts/common/board.py index d2c8e02b04..f62d7bdfc5 100644 --- a/cts/common/board.py +++ b/cts/common/board.py @@ -10,379 +10,401 @@ from __future__ import print_function -from abc import ABCMeta -from abc import abstractmethod import os import shutil import subprocess as sp -import serial +from abc import ABCMeta, abstractmethod +import serial import six - -OCD_SCRIPT_DIR = '/usr/share/openocd/scripts' +OCD_SCRIPT_DIR = "/usr/share/openocd/scripts" OPENOCD_CONFIGS = { - 'stm32l476g-eval': 'board/stm32l4discovery.cfg', - 'nucleo-f072rb': 'board/st_nucleo_f0.cfg', - 'nucleo-f411re': 'board/st_nucleo_f4.cfg', + "stm32l476g-eval": "board/stm32l4discovery.cfg", + "nucleo-f072rb": "board/st_nucleo_f0.cfg", + "nucleo-f411re": "board/st_nucleo_f4.cfg", } FLASH_OFFSETS = { - 'stm32l476g-eval': '0x08000000', - 'nucleo-f072rb': '0x08000000', - 'nucleo-f411re': '0x08000000', + "stm32l476g-eval": "0x08000000", + "nucleo-f072rb": "0x08000000", + "nucleo-f411re": "0x08000000", } -REBOOT_MARKER = 'UART initialized after reboot' +REBOOT_MARKER = "UART initialized after reboot" def get_subprocess_args(): - if six.PY3: - return {'encoding': 'utf-8'} - return {} + if six.PY3: + return {"encoding": "utf-8"} + return {} class Board(six.with_metaclass(ABCMeta, object)): - """Class representing a single board connected to a host machine. - - Attributes: - board: String containing actual type of board, i.e. nucleo-f072rb - config: Directory of board config file relative to openocd's - scripts directory - hla_serial: String containing board's hla_serial number (if board - is an stm32 board) - tty_port: String that is the path to the tty port which board's - UART outputs to - tty: String of file descriptor for tty_port - """ - - def __init__(self, board, module, hla_serial=None): - """Initializes a board object with given attributes. - - Args: - board: String containing board name - module: String of the test module you are building, - i.e. gpio, timer, etc. - hla_serial: Serial number if board's adaptor is an HLA - - Raises: - RuntimeError: Board is not supported - """ - if board not in OPENOCD_CONFIGS: - msg = 'OpenOcd configuration not found for ' + board - raise RuntimeError(msg) - if board not in FLASH_OFFSETS: - msg = 'Flash offset not found for ' + board - raise RuntimeError(msg) - self.board = board - self.flash_offset = FLASH_OFFSETS[self.board] - self.openocd_config = OPENOCD_CONFIGS[self.board] - self.module = module - self.hla_serial = hla_serial - self.tty_port = None - self.tty = None - - def reset_log_dir(self): - """Reset log directory.""" - if os.path.isdir(self.log_dir): - shutil.rmtree(self.log_dir) - os.makedirs(self.log_dir) - - @staticmethod - def get_stlink_serials(): - """Gets serial numbers of all st-link v2.1 board attached to host. - - Returns: - List of serials + """Class representing a single board connected to a host machine. + + Attributes: + board: String containing actual type of board, i.e. nucleo-f072rb + config: Directory of board config file relative to openocd's + scripts directory + hla_serial: String containing board's hla_serial number (if board + is an stm32 board) + tty_port: String that is the path to the tty port which board's + UART outputs to + tty: String of file descriptor for tty_port """ - usb_args = ['sudo', 'lsusb', '-v', '-d', '0x0483:0x374b'] - st_link_info = sp.check_output(usb_args, **get_subprocess_args()) - st_serials = [] - for line in st_link_info.split('\n'): - if 'iSerial' not in line: - continue - words = line.split() - if len(words) <= 2: - continue - st_serials.append(words[2].strip()) - return st_serials - - @abstractmethod - def get_serial(self): - """Subclass should implement this.""" - pass - - def send_openocd_commands(self, commands): - """Send a command to the board via openocd. - - Args: - commands: A list of commands to send - - Returns: - True if execution is successful or False otherwise. - """ - args = ['sudo', 'openocd', '-s', OCD_SCRIPT_DIR, - '-f', self.openocd_config, '-c', 'hla_serial ' + self.hla_serial] - - for cmd in commands: - args += ['-c', cmd] - args += ['-c', 'shutdown'] - - rv = 1 - with open(self.openocd_log, 'a') as output: - rv = sp.call(args, stdout=output, stderr=sp.STDOUT) - if rv != 0: - self.dump_openocd_log() - - return rv == 0 - - def dump_openocd_log(self): - with open(self.openocd_log) as log: - print(log.read()) + def __init__(self, board, module, hla_serial=None): + """Initializes a board object with given attributes. + + Args: + board: String containing board name + module: String of the test module you are building, + i.e. gpio, timer, etc. + hla_serial: Serial number if board's adaptor is an HLA + + Raises: + RuntimeError: Board is not supported + """ + if board not in OPENOCD_CONFIGS: + msg = "OpenOcd configuration not found for " + board + raise RuntimeError(msg) + if board not in FLASH_OFFSETS: + msg = "Flash offset not found for " + board + raise RuntimeError(msg) + self.board = board + self.flash_offset = FLASH_OFFSETS[self.board] + self.openocd_config = OPENOCD_CONFIGS[self.board] + self.module = module + self.hla_serial = hla_serial + self.tty_port = None + self.tty = None + + def reset_log_dir(self): + """Reset log directory.""" + if os.path.isdir(self.log_dir): + shutil.rmtree(self.log_dir) + os.makedirs(self.log_dir) + + @staticmethod + def get_stlink_serials(): + """Gets serial numbers of all st-link v2.1 board attached to host. + + Returns: + List of serials + """ + usb_args = ["sudo", "lsusb", "-v", "-d", "0x0483:0x374b"] + st_link_info = sp.check_output(usb_args, **get_subprocess_args()) + st_serials = [] + for line in st_link_info.split("\n"): + if "iSerial" not in line: + continue + words = line.split() + if len(words) <= 2: + continue + st_serials.append(words[2].strip()) + return st_serials + + @abstractmethod + def get_serial(self): + """Subclass should implement this.""" + pass + + def send_openocd_commands(self, commands): + """Send a command to the board via openocd. + + Args: + commands: A list of commands to send + + Returns: + True if execution is successful or False otherwise. + """ + args = [ + "sudo", + "openocd", + "-s", + OCD_SCRIPT_DIR, + "-f", + self.openocd_config, + "-c", + "hla_serial " + self.hla_serial, + ] + + for cmd in commands: + args += ["-c", cmd] + args += ["-c", "shutdown"] + + rv = 1 + with open(self.openocd_log, "a") as output: + rv = sp.call(args, stdout=output, stderr=sp.STDOUT) + + if rv != 0: + self.dump_openocd_log() + + return rv == 0 + + def dump_openocd_log(self): + with open(self.openocd_log) as log: + print(log.read()) + + def build(self, ec_dir): + """Builds test suite module for board. + + Args: + ec_dir: String of the ec directory path + + Returns: + True if build is successful or False otherwise. + """ + cmds = [ + "make", + "--directory=" + ec_dir, + "BOARD=" + self.board, + "CTS_MODULE=" + self.module, + "-j", + ] + + rv = 1 + with open(self.build_log, "a") as output: + rv = sp.call(cmds, stdout=output, stderr=sp.STDOUT) + + if rv != 0: + self.dump_build_log() + + return rv == 0 + + def dump_build_log(self): + with open(self.build_log) as log: + print(log.read()) + + def flash(self, image_path): + """Flashes board with most recent build ec.bin.""" + cmd = [ + "reset_config connect_assert_srst", + "init", + "reset init", + "flash write_image erase %s %s" % (image_path, self.flash_offset), + ] + return self.send_openocd_commands(cmd) + + def to_string(self): + s = ( + "Type: Board\n" + "board: " + self.board + "\n" + "hla_serial: " + self.hla_serial + "\n" + "openocd_config: " + self.openocd_config + "\n" + "tty_port: " + self.tty_port + "\n" + "tty: " + str(self.tty) + "\n" + ) + return s + + def reset_halt(self): + """Reset then halt board.""" + return self.send_openocd_commands(["init", "reset halt"]) + + def resume(self): + """Resume halting board.""" + return self.send_openocd_commands(["init", "resume"]) + + def setup_tty(self): + """Call this before calling read_tty for the first time. + + This is not in the initialization because caller only should call + this function after serial numbers are setup + """ + self.get_serial() + self.reset_halt() + self.identify_tty_port() + + tty = None + try: + tty = serial.Serial(self.tty_port, 115200, timeout=1) + except serial.SerialException: + raise ValueError( + "Failed to open " + + self.tty_port + + " of " + + self.board + + ". Please make sure the port is available and you have" + + " permission to read it. Create dialout group and run:" + + " sudo usermod -a -G dialout ." + ) + self.tty = tty + + def read_tty(self, max_boot_count=1): + """Read info from a serial port described by a file descriptor. + + Args: + max_boot_count: Stop reading if boot count exceeds this number + + Returns: + result: characters read from tty + boot: boot counts + """ + buf = [] + line = [] + boot = 0 + while True: + c = self.tty.read().decode("utf-8") + if not c: + break + line.append(c) + if c == "\n": + l = "".join(line) + buf.append(l) + if REBOOT_MARKER in l: + boot += 1 + line = [] + if boot > max_boot_count: + break + + l = "".join(line) + buf.append(l) + result = "".join(buf) - def build(self, ec_dir): - """Builds test suite module for board. + return result, boot - Args: - ec_dir: String of the ec directory path + def identify_tty_port(self): + """Saves this board's serial port.""" + dev_dir = "/dev" + id_prefix = "ID_SERIAL_SHORT=" + com_devices = [f for f in os.listdir(dev_dir) if f.startswith("ttyACM")] - Returns: - True if build is successful or False otherwise. - """ - cmds = ['make', - '--directory=' + ec_dir, - 'BOARD=' + self.board, - 'CTS_MODULE=' + self.module, - '-j'] - - rv = 1 - with open(self.build_log, 'a') as output: - rv = sp.call(cmds, stdout=output, stderr=sp.STDOUT) - - if rv != 0: - self.dump_build_log() - - return rv == 0 - - def dump_build_log(self): - with open(self.build_log) as log: - print(log.read()) - - def flash(self, image_path): - """Flashes board with most recent build ec.bin.""" - cmd = ['reset_config connect_assert_srst', - 'init', - 'reset init', - 'flash write_image erase %s %s' % (image_path, self.flash_offset)] - return self.send_openocd_commands(cmd) - - def to_string(self): - s = ('Type: Board\n' - 'board: ' + self.board + '\n' - 'hla_serial: ' + self.hla_serial + '\n' - 'openocd_config: ' + self.openocd_config + '\n' - 'tty_port: ' + self.tty_port + '\n' - 'tty: ' + str(self.tty) + '\n') - return s - - def reset_halt(self): - """Reset then halt board.""" - return self.send_openocd_commands(['init', 'reset halt']) - - def resume(self): - """Resume halting board.""" - return self.send_openocd_commands(['init', 'resume']) - - def setup_tty(self): - """Call this before calling read_tty for the first time. - - This is not in the initialization because caller only should call - this function after serial numbers are setup - """ - self.get_serial() - self.reset_halt() - self.identify_tty_port() - - tty = None - try: - tty = serial.Serial(self.tty_port, 115200, timeout=1) - except serial.SerialException: - raise ValueError('Failed to open ' + self.tty_port + ' of ' + self.board + - '. Please make sure the port is available and you have' + - ' permission to read it. Create dialout group and run:' + - ' sudo usermod -a -G dialout .') - self.tty = tty - - def read_tty(self, max_boot_count=1): - """Read info from a serial port described by a file descriptor. - - Args: - max_boot_count: Stop reading if boot count exceeds this number - - Returns: - result: characters read from tty - boot: boot counts - """ - buf = [] - line = [] - boot = 0 - while True: - c = self.tty.read().decode('utf-8') - if not c: - break - line.append(c) - if c == '\n': - l = ''.join(line) - buf.append(l) - if REBOOT_MARKER in l: - boot += 1 - line = [] - if boot > max_boot_count: - break - - l = ''.join(line) - buf.append(l) - result = ''.join(buf) - - return result, boot - - def identify_tty_port(self): - """Saves this board's serial port.""" - dev_dir = '/dev' - id_prefix = 'ID_SERIAL_SHORT=' - com_devices = [f for f in os.listdir(dev_dir) if f.startswith('ttyACM')] - - for device in com_devices: - self.tty_port = os.path.join(dev_dir, device) - properties = sp.check_output( - ['udevadm', 'info', '-a', '-n', self.tty_port, '--query=property'], - **get_subprocess_args()) - for line in [l.strip() for l in properties.split('\n')]: - if line.startswith(id_prefix): - if self.hla_serial == line[len(id_prefix):]: - return + for device in com_devices: + self.tty_port = os.path.join(dev_dir, device) + properties = sp.check_output( + ["udevadm", "info", "-a", "-n", self.tty_port, "--query=property"], + **get_subprocess_args() + ) + for line in [l.strip() for l in properties.split("\n")]: + if line.startswith(id_prefix): + if self.hla_serial == line[len(id_prefix) :]: + return - # If we get here without returning, something is wrong - raise RuntimeError('The device dev path could not be found') + # If we get here without returning, something is wrong + raise RuntimeError("The device dev path could not be found") - def close_tty(self): - """Close tty.""" - self.tty.close() + def close_tty(self): + """Close tty.""" + self.tty.close() class TestHarness(Board): - """Subclass of Board representing a Test Harness. + """Subclass of Board representing a Test Harness. - Attributes: - serial_path: Path to file containing serial number - """ - - def __init__(self, board, module, log_dir, serial_path): - """Initializes a board object with given attributes. - - Args: - board: board name - module: module name - log_dir: Directory where log file is stored + Attributes: serial_path: Path to file containing serial number """ - Board.__init__(self, board, module) - self.log_dir = log_dir - self.openocd_log = os.path.join(log_dir, 'openocd_th.log') - self.build_log = os.path.join(log_dir, 'build_th.log') - self.serial_path = serial_path - self.reset_log_dir() - - def get_serial(self): - """Loads serial number from saved location.""" - if self.hla_serial: - return # serial was already loaded - try: - with open(self.serial_path, mode='r') as f: - s = f.read() - self.hla_serial = s.strip() + + def __init__(self, board, module, log_dir, serial_path): + """Initializes a board object with given attributes. + + Args: + board: board name + module: module name + log_dir: Directory where log file is stored + serial_path: Path to file containing serial number + """ + Board.__init__(self, board, module) + self.log_dir = log_dir + self.openocd_log = os.path.join(log_dir, "openocd_th.log") + self.build_log = os.path.join(log_dir, "build_th.log") + self.serial_path = serial_path + self.reset_log_dir() + + def get_serial(self): + """Loads serial number from saved location.""" + if self.hla_serial: + return # serial was already loaded + try: + with open(self.serial_path, mode="r") as f: + s = f.read() + self.hla_serial = s.strip() + return + except IOError: + msg = ( + "Your TH board has not been identified.\n" + "Connect only TH and run the script --setup, then try again." + ) + raise RuntimeError(msg) + + def save_serial(self): + """Saves the TH serial number to a file.""" + serials = Board.get_stlink_serials() + if len(serials) > 1: + msg = ( + "There are more than one test board connected to the host." + "\nConnect only the test harness and remove other boards." + ) + raise RuntimeError(msg) + if len(serials) < 1: + msg = "No test boards were found.\n" "Check boards are connected." + raise RuntimeError(msg) + + s = serials[0] + serial_dir = os.path.dirname(self.serial_path) + if not os.path.exists(serial_dir): + os.makedirs(serial_dir) + with open(self.serial_path, mode="w") as f: + f.write(s) + self.hla_serial = s + + print("Your TH serial", s, "has been saved as", self.serial_path) return - except IOError: - msg = ('Your TH board has not been identified.\n' - 'Connect only TH and run the script --setup, then try again.') - raise RuntimeError(msg) - - def save_serial(self): - """Saves the TH serial number to a file.""" - serials = Board.get_stlink_serials() - if len(serials) > 1: - msg = ('There are more than one test board connected to the host.' - '\nConnect only the test harness and remove other boards.') - raise RuntimeError(msg) - if len(serials) < 1: - msg = ('No test boards were found.\n' - 'Check boards are connected.') - raise RuntimeError(msg) - - s = serials[0] - serial_dir = os.path.dirname(self.serial_path) - if not os.path.exists(serial_dir): - os.makedirs(serial_dir) - with open(self.serial_path, mode='w') as f: - f.write(s) - self.hla_serial = s - - print('Your TH serial', s, 'has been saved as', self.serial_path) - return class DeviceUnderTest(Board): - """Subclass of Board representing a DUT board. + """Subclass of Board representing a DUT board. - Attributes: - th: Reference to test harness board to which this DUT is attached - """ - - def __init__(self, board, th, module, log_dir, hla_ser=None): - """Initializes a DUT object. - - Args: - board: String containing board name + Attributes: th: Reference to test harness board to which this DUT is attached - module: module name - log_dir: Directory where log file is stored - hla_ser: Serial number if board uses an HLA adaptor """ - Board.__init__(self, board, module, hla_serial=hla_ser) - self.th = th - self.log_dir = log_dir - self.openocd_log = os.path.join(log_dir, 'openocd_dut.log') - self.build_log = os.path.join(log_dir, 'build_dut.log') - self.reset_log_dir() - - def get_serial(self): - """Get serial number. - Precondition: The DUT and TH must both be connected, and th.hla_serial - must hold the correct value (the th's serial #) + def __init__(self, board, th, module, log_dir, hla_ser=None): + """Initializes a DUT object. + + Args: + board: String containing board name + th: Reference to test harness board to which this DUT is attached + module: module name + log_dir: Directory where log file is stored + hla_ser: Serial number if board uses an HLA adaptor + """ + Board.__init__(self, board, module, hla_serial=hla_ser) + self.th = th + self.log_dir = log_dir + self.openocd_log = os.path.join(log_dir, "openocd_dut.log") + self.build_log = os.path.join(log_dir, "build_dut.log") + self.reset_log_dir() + + def get_serial(self): + """Get serial number. + + Precondition: The DUT and TH must both be connected, and th.hla_serial + must hold the correct value (the th's serial #) + + Raises: + RuntimeError: DUT isn't found or multiple DUTs are found. + """ + if self.hla_serial is not None: + # serial was already set ('' is a valid serial) + return - Raises: - RuntimeError: DUT isn't found or multiple DUTs are found. - """ - if self.hla_serial is not None: - # serial was already set ('' is a valid serial) - return - - serials = Board.get_stlink_serials() - dut = [s for s in serials if self.th.hla_serial != s] - - # If len(dut) is 0 then your dut doesn't use an st-link device, so we - # don't have to worry about its serial number - if not dut: - msg = ('Failed to find serial for DUT.\n' - 'Is ' + self.board + ' connected?') - raise RuntimeError(msg) - if len(dut) > 1: - msg = ('Found multiple DUTs.\n' - 'You can connect only one DUT at a time. This may be caused by\n' - 'an incorrect TH serial. Check if ' + self.th.serial_path + '\n' - 'contains a correct serial.') - raise RuntimeError(msg) - - # Found your other st-link device serial! - self.hla_serial = dut[0] - return + serials = Board.get_stlink_serials() + dut = [s for s in serials if self.th.hla_serial != s] + + # If len(dut) is 0 then your dut doesn't use an st-link device, so we + # don't have to worry about its serial number + if not dut: + msg = "Failed to find serial for DUT.\n" "Is " + self.board + " connected?" + raise RuntimeError(msg) + if len(dut) > 1: + msg = ( + "Found multiple DUTs.\n" + "You can connect only one DUT at a time. This may be caused by\n" + "an incorrect TH serial. Check if " + self.th.serial_path + "\n" + "contains a correct serial." + ) + raise RuntimeError(msg) + + # Found your other st-link device serial! + self.hla_serial = dut[0] + return diff --git a/cts/cts.py b/cts/cts.py index c3e0335cab..ebc526c701 100755 --- a/cts/cts.py +++ b/cts/cts.py @@ -28,416 +28,424 @@ import argparse import os import shutil import time -import common.board as board +import common.board as board -CTS_RC_PREFIX = 'CTS_RC_' -DEFAULT_TH = 'stm32l476g-eval' -DEFAULT_DUT = 'nucleo-f072rb' +CTS_RC_PREFIX = "CTS_RC_" +DEFAULT_TH = "stm32l476g-eval" +DEFAULT_DUT = "nucleo-f072rb" MAX_SUITE_TIME_SEC = 5 -CTS_TEST_RESULT_DIR = '/tmp/ects' +CTS_TEST_RESULT_DIR = "/tmp/ects" # Host only return codes. Make sure they match values in cts.rc -CTS_RC_DID_NOT_START = -1 # test did not run. -CTS_RC_DID_NOT_END = -2 # test did not run. -CTS_RC_DUPLICATE_RUN = -3 # test was run multiple times. -CTS_RC_INVALID_RETURN_CODE = -4 # failed to parse return code +CTS_RC_DID_NOT_START = -1 # test did not run. +CTS_RC_DID_NOT_END = -2 # test did not run. +CTS_RC_DUPLICATE_RUN = -3 # test was run multiple times. +CTS_RC_INVALID_RETURN_CODE = -4 # failed to parse return code class Cts(object): - """Class that represents a eCTS run. - - Attributes: - dut: DeviceUnderTest object representing DUT - th: TestHarness object representing a test harness - module: Name of module to build/run tests for - testlist: List of strings of test names contained in given module - return_codes: Dict of strings of return codes, with a code's integer - value being the index for the corresponding string representation - """ - - def __init__(self, ec_dir, th, dut, module): - """Initializes cts class object with given arguments. - - Args: - ec_dir: Path to ec directory - th: Name of the test harness board - dut: Name of the device under test board - module: Name of module to build/run tests for (e.g. gpio, interrupt) - """ - self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module) - if os.path.isdir(self.results_dir): - shutil.rmtree(self.results_dir) - else: - os.makedirs(self.results_dir) - self.ec_dir = ec_dir - self.module = module - serial_path = os.path.join(CTS_TEST_RESULT_DIR, 'th_serial') - self.th = board.TestHarness(th, module, self.results_dir, serial_path) - self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir) - cts_dir = os.path.join(self.ec_dir, 'cts') - testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist') - return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc') - self.get_return_codes(return_codes_path) - self.testlist = self.get_macro_args(testlist_path, 'CTS_TEST') - - def build(self): - """Build images for DUT and TH.""" - print('Building DUT image...') - if not self.dut.build(self.ec_dir): - raise RuntimeError('Building module %s for DUT failed' % (self.module)) - print('Building TH image...') - if not self.th.build(self.ec_dir): - raise RuntimeError('Building module %s for TH failed' % (self.module)) - - def flash_boards(self): - """Flashes TH and DUT with their most recently built ec.bin.""" - cts_module = 'cts_' + self.module - image_path = os.path.join('build', self.th.board, cts_module, 'ec.bin') - self.identify_boards() - print('Flashing TH with', image_path) - if not self.th.flash(image_path): - raise RuntimeError('Flashing TH failed') - image_path = os.path.join('build', self.dut.board, cts_module, 'ec.bin') - print('Flashing DUT with', image_path) - if not self.dut.flash(image_path): - raise RuntimeError('Flashing DUT failed') - - def setup(self): - """Setup boards.""" - self.th.save_serial() - - def identify_boards(self): - """Updates serials of TH and DUT in that order (order matters).""" - self.th.get_serial() - self.dut.get_serial() - - def get_macro_args(self, filepath, macro): - """Get list of args of a macro in a file when macro. - - Args: - filepath: String containing absolute path to the file - macro: String containing text of macro to get args of - - Returns: - List of dictionaries where each entry is: - 'name': Test name, - 'th_string': Expected string from TH, - 'dut_string': Expected string from DUT, - """ - tests = [] - with open(filepath, 'r') as f: - lines = f.readlines() - joined = ''.join(lines).replace('\\\n', '').splitlines() - for l in joined: - if not l.strip().startswith(macro): - continue - d = {} - l = l.strip()[len(macro):] - l = l.strip('()').split(',') - d['name'] = l[0].strip() - d['th_rc'] = self.get_return_code_value(l[1].strip().strip('"')) - d['th_string'] = l[2].strip().strip('"') - d['dut_rc'] = self.get_return_code_value(l[3].strip().strip('"')) - d['dut_string'] = l[4].strip().strip('"') - tests.append(d) - return tests - - def get_return_codes(self, filepath): - """Read return code names from the return code definition file.""" - self.return_codes = {} - val = 0 - with open(filepath, 'r') as f: - for line in f: - line = line.strip() - if not line.startswith(CTS_RC_PREFIX): - continue - line = line.split(',')[0] - if '=' in line: - tokens = line.split('=') - line = tokens[0].strip() - val = int(tokens[1].strip()) - self.return_codes[line] = val - val += 1 - - def parse_output(self, output): - """Parse console output from DUT or TH. - - Args: - output: String containing consoule output - - Returns: - List of dictionaries where each key and value are: - name = 'ects_test_x', - started = True/False, - ended = True/False, - rc = CTS_RC_*, - output = All text between 'ects_test_x start' and 'ects_test_x end' - """ - results = [] - i = 0 - for test in self.testlist: - results.append({}) - results[i]['name'] = test['name'] - results[i]['started'] = False - results[i]['rc'] = CTS_RC_DID_NOT_START - results[i]['string'] = False - results[i]['output'] = [] - i += 1 - - i = 0 - for ln in [ln.strip() for ln in output.split('\n')]: - if i + 1 > len(results): - break - tokens = ln.split() - if len(tokens) >= 2: - if tokens[0].strip() == results[i]['name']: - if tokens[1].strip() == 'start': - # start line found - if results[i]['started']: # Already started - results[i]['rc'] = CTS_RC_DUPLICATE_RUN - else: - results[i]['rc'] = CTS_RC_DID_NOT_END - results[i]['started'] = True - continue - elif results[i]['started'] and tokens[1].strip() == 'end': - # end line found - results[i]['rc'] = CTS_RC_INVALID_RETURN_CODE - if len(tokens) == 3: - try: - results[i]['rc'] = int(tokens[2].strip()) - except ValueError: - pass - # Since index is incremented when 'end' is encountered, we don't - # need to check duplicate 'end'. - i += 1 - continue - if results[i]['started']: - results[i]['output'].append(ln) - - return results - - def get_return_code_name(self, code, strip_prefix=False): - name = '' - for k, v in self.return_codes.items(): - if v == code: - if strip_prefix: - name = k[len(CTS_RC_PREFIX):] - else: - name = k - return name - - def get_return_code_value(self, name): - if name: - return self.return_codes[name] - return 0 - - def evaluate_run(self, dut_output, th_output): - """Parse outputs to derive test results. - - Args: - dut_output: String output of DUT - th_output: String output of TH - - Returns: - th_results: list of test results for TH - dut_results: list of test results for DUT + """Class that represents a eCTS run. + + Attributes: + dut: DeviceUnderTest object representing DUT + th: TestHarness object representing a test harness + module: Name of module to build/run tests for + testlist: List of strings of test names contained in given module + return_codes: Dict of strings of return codes, with a code's integer + value being the index for the corresponding string representation """ - th_results = self.parse_output(th_output) - dut_results = self.parse_output(dut_output) - # Search for expected string in each output - for i, v in enumerate(self.testlist): - if v['th_string'] in th_results[i]['output'] or not v['th_string']: - th_results[i]['string'] = True - if v['dut_string'] in dut_results[i]['output'] or not v['dut_string']: - dut_results[i]['string'] = True + def __init__(self, ec_dir, th, dut, module): + """Initializes cts class object with given arguments. + + Args: + ec_dir: Path to ec directory + th: Name of the test harness board + dut: Name of the device under test board + module: Name of module to build/run tests for (e.g. gpio, interrupt) + """ + self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module) + if os.path.isdir(self.results_dir): + shutil.rmtree(self.results_dir) + else: + os.makedirs(self.results_dir) + self.ec_dir = ec_dir + self.module = module + serial_path = os.path.join(CTS_TEST_RESULT_DIR, "th_serial") + self.th = board.TestHarness(th, module, self.results_dir, serial_path) + self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir) + cts_dir = os.path.join(self.ec_dir, "cts") + testlist_path = os.path.join(cts_dir, self.module, "cts.testlist") + return_codes_path = os.path.join(cts_dir, "common", "cts.rc") + self.get_return_codes(return_codes_path) + self.testlist = self.get_macro_args(testlist_path, "CTS_TEST") + + def build(self): + """Build images for DUT and TH.""" + print("Building DUT image...") + if not self.dut.build(self.ec_dir): + raise RuntimeError("Building module %s for DUT failed" % (self.module)) + print("Building TH image...") + if not self.th.build(self.ec_dir): + raise RuntimeError("Building module %s for TH failed" % (self.module)) + + def flash_boards(self): + """Flashes TH and DUT with their most recently built ec.bin.""" + cts_module = "cts_" + self.module + image_path = os.path.join("build", self.th.board, cts_module, "ec.bin") + self.identify_boards() + print("Flashing TH with", image_path) + if not self.th.flash(image_path): + raise RuntimeError("Flashing TH failed") + image_path = os.path.join("build", self.dut.board, cts_module, "ec.bin") + print("Flashing DUT with", image_path) + if not self.dut.flash(image_path): + raise RuntimeError("Flashing DUT failed") + + def setup(self): + """Setup boards.""" + self.th.save_serial() + + def identify_boards(self): + """Updates serials of TH and DUT in that order (order matters).""" + self.th.get_serial() + self.dut.get_serial() + + def get_macro_args(self, filepath, macro): + """Get list of args of a macro in a file when macro. + + Args: + filepath: String containing absolute path to the file + macro: String containing text of macro to get args of + + Returns: + List of dictionaries where each entry is: + 'name': Test name, + 'th_string': Expected string from TH, + 'dut_string': Expected string from DUT, + """ + tests = [] + with open(filepath, "r") as f: + lines = f.readlines() + joined = "".join(lines).replace("\\\n", "").splitlines() + for l in joined: + if not l.strip().startswith(macro): + continue + d = {} + l = l.strip()[len(macro) :] + l = l.strip("()").split(",") + d["name"] = l[0].strip() + d["th_rc"] = self.get_return_code_value(l[1].strip().strip('"')) + d["th_string"] = l[2].strip().strip('"') + d["dut_rc"] = self.get_return_code_value(l[3].strip().strip('"')) + d["dut_string"] = l[4].strip().strip('"') + tests.append(d) + return tests + + def get_return_codes(self, filepath): + """Read return code names from the return code definition file.""" + self.return_codes = {} + val = 0 + with open(filepath, "r") as f: + for line in f: + line = line.strip() + if not line.startswith(CTS_RC_PREFIX): + continue + line = line.split(",")[0] + if "=" in line: + tokens = line.split("=") + line = tokens[0].strip() + val = int(tokens[1].strip()) + self.return_codes[line] = val + val += 1 + + def parse_output(self, output): + """Parse console output from DUT or TH. + + Args: + output: String containing consoule output + + Returns: + List of dictionaries where each key and value are: + name = 'ects_test_x', + started = True/False, + ended = True/False, + rc = CTS_RC_*, + output = All text between 'ects_test_x start' and 'ects_test_x end' + """ + results = [] + i = 0 + for test in self.testlist: + results.append({}) + results[i]["name"] = test["name"] + results[i]["started"] = False + results[i]["rc"] = CTS_RC_DID_NOT_START + results[i]["string"] = False + results[i]["output"] = [] + i += 1 - return th_results, dut_results + i = 0 + for ln in [ln.strip() for ln in output.split("\n")]: + if i + 1 > len(results): + break + tokens = ln.split() + if len(tokens) >= 2: + if tokens[0].strip() == results[i]["name"]: + if tokens[1].strip() == "start": + # start line found + if results[i]["started"]: # Already started + results[i]["rc"] = CTS_RC_DUPLICATE_RUN + else: + results[i]["rc"] = CTS_RC_DID_NOT_END + results[i]["started"] = True + continue + elif results[i]["started"] and tokens[1].strip() == "end": + # end line found + results[i]["rc"] = CTS_RC_INVALID_RETURN_CODE + if len(tokens) == 3: + try: + results[i]["rc"] = int(tokens[2].strip()) + except ValueError: + pass + # Since index is incremented when 'end' is encountered, we don't + # need to check duplicate 'end'. + i += 1 + continue + if results[i]["started"]: + results[i]["output"].append(ln) + + return results + + def get_return_code_name(self, code, strip_prefix=False): + name = "" + for k, v in self.return_codes.items(): + if v == code: + if strip_prefix: + name = k[len(CTS_RC_PREFIX) :] + else: + name = k + return name + + def get_return_code_value(self, name): + if name: + return self.return_codes[name] + return 0 + + def evaluate_run(self, dut_output, th_output): + """Parse outputs to derive test results. + + Args: + dut_output: String output of DUT + th_output: String output of TH + + Returns: + th_results: list of test results for TH + dut_results: list of test results for DUT + """ + th_results = self.parse_output(th_output) + dut_results = self.parse_output(dut_output) + + # Search for expected string in each output + for i, v in enumerate(self.testlist): + if v["th_string"] in th_results[i]["output"] or not v["th_string"]: + th_results[i]["string"] = True + if v["dut_string"] in dut_results[i]["output"] or not v["dut_string"]: + dut_results[i]["string"] = True + + return th_results, dut_results + + def print_result(self, th_results, dut_results): + """Print results to the screen. + + Args: + th_results: list of test results for TH + dut_results: list of test results for DUT + """ + len_test_name = max(len(s["name"]) for s in self.testlist) + len_code_name = max( + len(self.get_return_code_name(v, True)) for v in self.return_codes.values() + ) + + head = "{:^" + str(len_test_name) + "} " + head += "{:^" + str(len_code_name) + "} " + head += "{:^" + str(len_code_name) + "}" + head += "{:^" + str(len(" TH_STR")) + "}" + head += "{:^" + str(len(" DUT_STR")) + "}" + head += "{:^" + str(len(" RESULT")) + "}\n" + fmt = "{:" + str(len_test_name) + "} " + fmt += "{:>" + str(len_code_name) + "} " + fmt += "{:>" + str(len_code_name) + "}" + fmt += "{:>" + str(len(" TH_STR")) + "}" + fmt += "{:>" + str(len(" DUT_STR")) + "}" + fmt += "{:>" + str(len(" RESULT")) + "}\n" + + self.formatted_results = head.format( + "TEST NAME", "TH_RC", "DUT_RC", " TH_STR", " DUT_STR", " RESULT" + ) + for i, d in enumerate(dut_results): + th_cn = self.get_return_code_name(th_results[i]["rc"], True) + dut_cn = self.get_return_code_name(dut_results[i]["rc"], True) + th_res = self.evaluate_result( + th_results[i], self.testlist[i]["th_rc"], self.testlist[i]["th_string"] + ) + dut_res = self.evaluate_result( + dut_results[i], + self.testlist[i]["dut_rc"], + self.testlist[i]["dut_string"], + ) + self.formatted_results += fmt.format( + d["name"], + th_cn, + dut_cn, + "YES" if th_results[i]["string"] else "NO", + "YES" if dut_results[i]["string"] else "NO", + "PASS" if th_res and dut_res else "FAIL", + ) + + def evaluate_result(self, result, expected_rc, expected_string): + if result["rc"] != expected_rc: + return False + if expected_string and expected_string not in result["output"]: + return False + return True + + def run(self): + """Resets boards, records test results in results dir.""" + print("Reading serials...") + self.identify_boards() + print("Opening DUT tty...") + self.dut.setup_tty() + print("Opening TH tty...") + self.th.setup_tty() + + # Boards might be still writing to tty. Wait a few seconds before flashing. + time.sleep(3) + + # clear buffers + print("Clearing DUT tty...") + self.dut.read_tty() + print("Clearing TH tty...") + self.th.read_tty() + + # Resets the boards and allows them to run tests + # Due to current (7/27/16) version of sync function, + # both boards must be rest and halted, with the th + # resuming first, in order for the test suite to run in sync + print("Halting TH...") + if not self.th.reset_halt(): + raise RuntimeError("Failed to halt TH") + print("Halting DUT...") + if not self.dut.reset_halt(): + raise RuntimeError("Failed to halt DUT") + print("Resuming TH...") + if not self.th.resume(): + raise RuntimeError("Failed to resume TH") + print("Resuming DUT...") + if not self.dut.resume(): + raise RuntimeError("Failed to resume DUT") + + time.sleep(MAX_SUITE_TIME_SEC) + + print("Reading DUT tty...") + dut_output, _ = self.dut.read_tty() + self.dut.close_tty() + print("Reading TH tty...") + th_output, _ = self.th.read_tty() + self.th.close_tty() + + print("Halting TH...") + if not self.th.reset_halt(): + raise RuntimeError("Failed to halt TH") + print("Halting DUT...") + if not self.dut.reset_halt(): + raise RuntimeError("Failed to halt DUT") + + if not dut_output or not th_output: + raise ValueError( + "Output missing from boards. If you have a process " + "reading ttyACMx, please kill that process and try " + "again." + ) + + print("Pursing results...") + th_results, dut_results = self.evaluate_run(dut_output, th_output) + + # Print out results + self.print_result(th_results, dut_results) + + # Write results + dest = os.path.join(self.results_dir, "results.log") + with open(dest, "w") as fl: + fl.write(self.formatted_results) + + # Write UART outputs + dest = os.path.join(self.results_dir, "uart_th.log") + with open(dest, "w") as fl: + fl.write(th_output) + dest = os.path.join(self.results_dir, "uart_dut.log") + with open(dest, "w") as fl: + fl.write(dut_output) + + print(self.formatted_results) + + # TODO(chromium:735652): Should set exit code for the shell - def print_result(self, th_results, dut_results): - """Print results to the screen. - Args: - th_results: list of test results for TH - dut_results: list of test results for DUT - """ - len_test_name = max(len(s['name']) for s in self.testlist) - len_code_name = max(len(self.get_return_code_name(v, True)) - for v in self.return_codes.values()) - - head = '{:^' + str(len_test_name) + '} ' - head += '{:^' + str(len_code_name) + '} ' - head += '{:^' + str(len_code_name) + '}' - head += '{:^' + str(len(' TH_STR')) + '}' - head += '{:^' + str(len(' DUT_STR')) + '}' - head += '{:^' + str(len(' RESULT')) + '}\n' - fmt = '{:' + str(len_test_name) + '} ' - fmt += '{:>' + str(len_code_name) + '} ' - fmt += '{:>' + str(len_code_name) + '}' - fmt += '{:>' + str(len(' TH_STR')) + '}' - fmt += '{:>' + str(len(' DUT_STR')) + '}' - fmt += '{:>' + str(len(' RESULT')) + '}\n' - - self.formatted_results = head.format( - 'TEST NAME', 'TH_RC', 'DUT_RC', - ' TH_STR', ' DUT_STR', ' RESULT') - for i, d in enumerate(dut_results): - th_cn = self.get_return_code_name(th_results[i]['rc'], True) - dut_cn = self.get_return_code_name(dut_results[i]['rc'], True) - th_res = self.evaluate_result(th_results[i], - self.testlist[i]['th_rc'], - self.testlist[i]['th_string']) - dut_res = self.evaluate_result(dut_results[i], - self.testlist[i]['dut_rc'], - self.testlist[i]['dut_string']) - self.formatted_results += fmt.format( - d['name'], th_cn, dut_cn, - 'YES' if th_results[i]['string'] else 'NO', - 'YES' if dut_results[i]['string'] else 'NO', - 'PASS' if th_res and dut_res else 'FAIL') - - def evaluate_result(self, result, expected_rc, expected_string): - if result['rc'] != expected_rc: - return False - if expected_string and expected_string not in result['output']: - return False - return True - - def run(self): - """Resets boards, records test results in results dir.""" - print('Reading serials...') - self.identify_boards() - print('Opening DUT tty...') - self.dut.setup_tty() - print('Opening TH tty...') - self.th.setup_tty() - - # Boards might be still writing to tty. Wait a few seconds before flashing. - time.sleep(3) - - # clear buffers - print('Clearing DUT tty...') - self.dut.read_tty() - print('Clearing TH tty...') - self.th.read_tty() - - # Resets the boards and allows them to run tests - # Due to current (7/27/16) version of sync function, - # both boards must be rest and halted, with the th - # resuming first, in order for the test suite to run in sync - print('Halting TH...') - if not self.th.reset_halt(): - raise RuntimeError('Failed to halt TH') - print('Halting DUT...') - if not self.dut.reset_halt(): - raise RuntimeError('Failed to halt DUT') - print('Resuming TH...') - if not self.th.resume(): - raise RuntimeError('Failed to resume TH') - print('Resuming DUT...') - if not self.dut.resume(): - raise RuntimeError('Failed to resume DUT') - - time.sleep(MAX_SUITE_TIME_SEC) - - print('Reading DUT tty...') - dut_output, _ = self.dut.read_tty() - self.dut.close_tty() - print('Reading TH tty...') - th_output, _ = self.th.read_tty() - self.th.close_tty() - - print('Halting TH...') - if not self.th.reset_halt(): - raise RuntimeError('Failed to halt TH') - print('Halting DUT...') - if not self.dut.reset_halt(): - raise RuntimeError('Failed to halt DUT') - - if not dut_output or not th_output: - raise ValueError('Output missing from boards. If you have a process ' - 'reading ttyACMx, please kill that process and try ' - 'again.') - - print('Pursing results...') - th_results, dut_results = self.evaluate_run(dut_output, th_output) - - # Print out results - self.print_result(th_results, dut_results) - - # Write results - dest = os.path.join(self.results_dir, 'results.log') - with open(dest, 'w') as fl: - fl.write(self.formatted_results) - - # Write UART outputs - dest = os.path.join(self.results_dir, 'uart_th.log') - with open(dest, 'w') as fl: - fl.write(th_output) - dest = os.path.join(self.results_dir, 'uart_dut.log') - with open(dest, 'w') as fl: - fl.write(dut_output) - - print(self.formatted_results) - - # TODO(chromium:735652): Should set exit code for the shell +def main(): + ec_dir = os.path.realpath( + os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") + ) + os.chdir(ec_dir) + + dut = DEFAULT_DUT + module = "meta" + + parser = argparse.ArgumentParser(description="Used to build/flash boards") + parser.add_argument("-d", "--dut", help="Specify DUT you want to build/flash") + parser.add_argument("-m", "--module", help="Specify module you want to build/flash") + parser.add_argument( + "-s", + "--setup", + action="store_true", + help="Connect only the TH to save its serial", + ) + parser.add_argument( + "-b", "--build", action="store_true", help="Build test suite (no flashing)" + ) + parser.add_argument( + "-f", + "--flash", + action="store_true", + help="Flash boards with most recent images", + ) + parser.add_argument( + "-r", "--run", action="store_true", help="Run tests without flashing" + ) + + args = parser.parse_args() + + if args.module: + module = args.module + + if args.dut: + dut = args.dut + + cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module) + + if args.setup: + cts.setup() + elif args.build: + cts.build() + elif args.flash: + cts.flash_boards() + elif args.run: + cts.run() + else: + cts.build() + cts.flash_boards() + cts.run() -def main(): - ec_dir = os.path.realpath(os.path.join( - os.path.dirname(os.path.abspath(__file__)), '..')) - os.chdir(ec_dir) - - dut = DEFAULT_DUT - module = 'meta' - - parser = argparse.ArgumentParser(description='Used to build/flash boards') - parser.add_argument('-d', - '--dut', - help='Specify DUT you want to build/flash') - parser.add_argument('-m', - '--module', - help='Specify module you want to build/flash') - parser.add_argument('-s', - '--setup', - action='store_true', - help='Connect only the TH to save its serial') - parser.add_argument('-b', - '--build', - action='store_true', - help='Build test suite (no flashing)') - parser.add_argument('-f', - '--flash', - action='store_true', - help='Flash boards with most recent images') - parser.add_argument('-r', - '--run', - action='store_true', - help='Run tests without flashing') - - args = parser.parse_args() - - if args.module: - module = args.module - - if args.dut: - dut = args.dut - - cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module) - - if args.setup: - cts.setup() - elif args.build: - cts.build() - elif args.flash: - cts.flash_boards() - elif args.run: - cts.run() - else: - cts.build() - cts.flash_boards() - cts.run() - -if __name__ == '__main__': - main() +if __name__ == "__main__": + main() -- cgit v1.2.1