diff options
Diffstat (limited to 'cts/common/board.py')
-rw-r--r-- | cts/common/board.py | 727 |
1 files changed, 380 insertions, 347 deletions
diff --git a/cts/common/board.py b/cts/common/board.py index d2c8e02b04..3f92ff50ad 100644 --- a/cts/common/board.py +++ b/cts/common/board.py @@ -1,388 +1,421 @@ -# Copyright 2016 The Chromium OS Authors. All rights reserved. +# Copyright 2016 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes # Note: This is a py2/3 compatible file. from __future__ import print_function -from abc import ABCMeta -from abc import abstractmethod import os import shutil import subprocess as sp -import serial +from abc import ABCMeta, abstractmethod +import serial # pylint:disable=import-error import six - -OCD_SCRIPT_DIR = '/usr/share/openocd/scripts' +OCD_SCRIPT_DIR = "/usr/share/openocd/scripts" OPENOCD_CONFIGS = { - 'stm32l476g-eval': 'board/stm32l4discovery.cfg', - 'nucleo-f072rb': 'board/st_nucleo_f0.cfg', - 'nucleo-f411re': 'board/st_nucleo_f4.cfg', + "stm32l476g-eval": "board/stm32l4discovery.cfg", + "nucleo-f072rb": "board/st_nucleo_f0.cfg", + "nucleo-f411re": "board/st_nucleo_f4.cfg", } FLASH_OFFSETS = { - 'stm32l476g-eval': '0x08000000', - 'nucleo-f072rb': '0x08000000', - 'nucleo-f411re': '0x08000000', + "stm32l476g-eval": "0x08000000", + "nucleo-f072rb": "0x08000000", + "nucleo-f411re": "0x08000000", } -REBOOT_MARKER = 'UART initialized after reboot' +REBOOT_MARKER = "UART initialized after reboot" def get_subprocess_args(): - if six.PY3: - return {'encoding': 'utf-8'} - return {} + if six.PY3: + return {"encoding": "utf-8"} + return {} class Board(six.with_metaclass(ABCMeta, object)): - """Class representing a single board connected to a host machine. - - Attributes: - board: String containing actual type of board, i.e. nucleo-f072rb - config: Directory of board config file relative to openocd's - scripts directory - hla_serial: String containing board's hla_serial number (if board - is an stm32 board) - tty_port: String that is the path to the tty port which board's - UART outputs to - tty: String of file descriptor for tty_port - """ - - def __init__(self, board, module, hla_serial=None): - """Initializes a board object with given attributes. - - Args: - board: String containing board name - module: String of the test module you are building, - i.e. gpio, timer, etc. - hla_serial: Serial number if board's adaptor is an HLA - - Raises: - RuntimeError: Board is not supported - """ - if board not in OPENOCD_CONFIGS: - msg = 'OpenOcd configuration not found for ' + board - raise RuntimeError(msg) - if board not in FLASH_OFFSETS: - msg = 'Flash offset not found for ' + board - raise RuntimeError(msg) - self.board = board - self.flash_offset = FLASH_OFFSETS[self.board] - self.openocd_config = OPENOCD_CONFIGS[self.board] - self.module = module - self.hla_serial = hla_serial - self.tty_port = None - self.tty = None - - def reset_log_dir(self): - """Reset log directory.""" - if os.path.isdir(self.log_dir): - shutil.rmtree(self.log_dir) - os.makedirs(self.log_dir) - - @staticmethod - def get_stlink_serials(): - """Gets serial numbers of all st-link v2.1 board attached to host. - - Returns: - List of serials + """Class representing a single board connected to a host machine. + + Attributes: + board: String containing actual type of board, i.e. nucleo-f072rb + config: Directory of board config file relative to openocd's + scripts directory + hla_serial: String containing board's hla_serial number (if board + is an stm32 board) + tty_port: String that is the path to the tty port which board's + UART outputs to + tty: String of file descriptor for tty_port """ - usb_args = ['sudo', 'lsusb', '-v', '-d', '0x0483:0x374b'] - st_link_info = sp.check_output(usb_args, **get_subprocess_args()) - st_serials = [] - for line in st_link_info.split('\n'): - if 'iSerial' not in line: - continue - words = line.split() - if len(words) <= 2: - continue - st_serials.append(words[2].strip()) - return st_serials - - @abstractmethod - def get_serial(self): - """Subclass should implement this.""" - pass - - def send_openocd_commands(self, commands): - """Send a command to the board via openocd. - - Args: - commands: A list of commands to send - - Returns: - True if execution is successful or False otherwise. - """ - args = ['sudo', 'openocd', '-s', OCD_SCRIPT_DIR, - '-f', self.openocd_config, '-c', 'hla_serial ' + self.hla_serial] - - for cmd in commands: - args += ['-c', cmd] - args += ['-c', 'shutdown'] - - rv = 1 - with open(self.openocd_log, 'a') as output: - rv = sp.call(args, stdout=output, stderr=sp.STDOUT) - - if rv != 0: - self.dump_openocd_log() - - return rv == 0 - def dump_openocd_log(self): - with open(self.openocd_log) as log: - print(log.read()) - - def build(self, ec_dir): - """Builds test suite module for board. - - Args: - ec_dir: String of the ec directory path - - Returns: - True if build is successful or False otherwise. - """ - cmds = ['make', - '--directory=' + ec_dir, - 'BOARD=' + self.board, - 'CTS_MODULE=' + self.module, - '-j'] - - rv = 1 - with open(self.build_log, 'a') as output: - rv = sp.call(cmds, stdout=output, stderr=sp.STDOUT) - - if rv != 0: - self.dump_build_log() - - return rv == 0 - - def dump_build_log(self): - with open(self.build_log) as log: - print(log.read()) - - def flash(self, image_path): - """Flashes board with most recent build ec.bin.""" - cmd = ['reset_config connect_assert_srst', - 'init', - 'reset init', - 'flash write_image erase %s %s' % (image_path, self.flash_offset)] - return self.send_openocd_commands(cmd) - - def to_string(self): - s = ('Type: Board\n' - 'board: ' + self.board + '\n' - 'hla_serial: ' + self.hla_serial + '\n' - 'openocd_config: ' + self.openocd_config + '\n' - 'tty_port: ' + self.tty_port + '\n' - 'tty: ' + str(self.tty) + '\n') - return s - - def reset_halt(self): - """Reset then halt board.""" - return self.send_openocd_commands(['init', 'reset halt']) - - def resume(self): - """Resume halting board.""" - return self.send_openocd_commands(['init', 'resume']) - - def setup_tty(self): - """Call this before calling read_tty for the first time. - - This is not in the initialization because caller only should call - this function after serial numbers are setup - """ - self.get_serial() - self.reset_halt() - self.identify_tty_port() - - tty = None - try: - tty = serial.Serial(self.tty_port, 115200, timeout=1) - except serial.SerialException: - raise ValueError('Failed to open ' + self.tty_port + ' of ' + self.board + - '. Please make sure the port is available and you have' + - ' permission to read it. Create dialout group and run:' + - ' sudo usermod -a -G dialout <username>.') - self.tty = tty - - def read_tty(self, max_boot_count=1): - """Read info from a serial port described by a file descriptor. - - Args: - max_boot_count: Stop reading if boot count exceeds this number - - Returns: - result: characters read from tty - boot: boot counts - """ - buf = [] - line = [] - boot = 0 - while True: - c = self.tty.read().decode('utf-8') - if not c: - break - line.append(c) - if c == '\n': - l = ''.join(line) - buf.append(l) - if REBOOT_MARKER in l: - boot += 1 + def __init__(self, board, module, hla_serial=None): + """Initializes a board object with given attributes. + + Args: + board: String containing board name + module: String of the test module you are building, + i.e. gpio, timer, etc. + hla_serial: Serial number if board's adaptor is an HLA + + Raises: + RuntimeError: Board is not supported + """ + if board not in OPENOCD_CONFIGS: + msg = "OpenOcd configuration not found for " + board + raise RuntimeError(msg) + if board not in FLASH_OFFSETS: + msg = "Flash offset not found for " + board + raise RuntimeError(msg) + self.board = board + self.flash_offset = FLASH_OFFSETS[self.board] + self.openocd_config = OPENOCD_CONFIGS[self.board] + self.module = module + self.hla_serial = hla_serial + self.tty_port = None + self.tty = None + self.log_dir = None + self.openocd_log = os.devnull + self.build_log = os.devnull + + def reset_log_dir(self): + """Reset log directory.""" + if self.log_dir: + if os.path.isdir(self.log_dir): + shutil.rmtree(self.log_dir) + os.makedirs(self.log_dir) + + @staticmethod + def get_stlink_serials(): + """Gets serial numbers of all st-link v2.1 board attached to host. + + Returns: + List of serials + """ + usb_args = ["sudo", "lsusb", "-v", "-d", "0x0483:0x374b"] + st_link_info = sp.check_output(usb_args, **get_subprocess_args()) + st_serials = [] + for line in st_link_info.split("\n"): + if "iSerial" not in line: + continue + words = line.split() + if len(words) <= 2: + continue + st_serials.append(words[2].strip()) + return st_serials + + @abstractmethod + def get_serial(self): + """Subclass should implement this.""" + pass + + def send_openocd_commands(self, commands): + """Send a command to the board via openocd. + + Args: + commands: A list of commands to send + + Returns: + True if execution is successful or False otherwise. + """ + args = [ + "sudo", + "openocd", + "-s", + OCD_SCRIPT_DIR, + "-f", + self.openocd_config, + "-c", + "hla_serial " + self.hla_serial, + ] + + for cmd in commands: + args += ["-c", cmd] + args += ["-c", "shutdown"] + + rv = 1 + with open(self.openocd_log, "a") as output: + rv = sp.call(args, stdout=output, stderr=sp.STDOUT) + + if rv != 0: + self.dump_openocd_log() + + return rv == 0 + + def dump_openocd_log(self): + with open(self.openocd_log) as log: + print(log.read()) + + def build(self, ec_dir): + """Builds test suite module for board. + + Args: + ec_dir: String of the ec directory path + + Returns: + True if build is successful or False otherwise. + """ + cmds = [ + "make", + "--directory=" + ec_dir, + "BOARD=" + self.board, + "CTS_MODULE=" + self.module, + "-j", + ] + + rv = 1 + with open(self.build_log, "a") as output: + rv = sp.call(cmds, stdout=output, stderr=sp.STDOUT) + + if rv != 0: + self.dump_build_log() + + return rv == 0 + + def dump_build_log(self): + with open(self.build_log) as log: + print(log.read()) + + def flash(self, image_path): + """Flashes board with most recent build ec.bin.""" + cmd = [ + "reset_config connect_assert_srst", + "init", + "reset init", + "flash write_image erase %s %s" % (image_path, self.flash_offset), + ] + return self.send_openocd_commands(cmd) + + def to_string(self): + s = ( + "Type: Board\n" + "board: " + self.board + "\n" + "hla_serial: " + self.hla_serial + "\n" + "openocd_config: " + self.openocd_config + "\n" + "tty_port: " + self.tty_port + "\n" + "tty: " + str(self.tty) + "\n" + ) + return s + + def reset_halt(self): + """Reset then halt board.""" + return self.send_openocd_commands(["init", "reset halt"]) + + def resume(self): + """Resume halting board.""" + return self.send_openocd_commands(["init", "resume"]) + + def setup_tty(self): + """Call this before calling read_tty for the first time. + + This is not in the initialization because caller only should call + this function after serial numbers are setup + """ + self.get_serial() + self.reset_halt() + self.identify_tty_port() + + tty = None + try: + tty = serial.Serial(self.tty_port, 115200, timeout=1) + except serial.SerialException: + raise ValueError( + "Failed to open " + + self.tty_port + + " of " + + self.board + + ". Please make sure the port is available and you have" + + " permission to read it. Create dialout group and run:" + + " sudo usermod -a -G dialout <username>." + ) + self.tty = tty + + def read_tty(self, max_boot_count=1): + """Read info from a serial port described by a file descriptor. + + Args: + max_boot_count: Stop reading if boot count exceeds this number + + Returns: + result: characters read from tty + boot: boot counts + """ + buf = [] line = [] - if boot > max_boot_count: - break - - l = ''.join(line) - buf.append(l) - result = ''.join(buf) - - return result, boot - - def identify_tty_port(self): - """Saves this board's serial port.""" - dev_dir = '/dev' - id_prefix = 'ID_SERIAL_SHORT=' - com_devices = [f for f in os.listdir(dev_dir) if f.startswith('ttyACM')] - - for device in com_devices: - self.tty_port = os.path.join(dev_dir, device) - properties = sp.check_output( - ['udevadm', 'info', '-a', '-n', self.tty_port, '--query=property'], - **get_subprocess_args()) - for line in [l.strip() for l in properties.split('\n')]: - if line.startswith(id_prefix): - if self.hla_serial == line[len(id_prefix):]: - return - - # If we get here without returning, something is wrong - raise RuntimeError('The device dev path could not be found') - - def close_tty(self): - """Close tty.""" - self.tty.close() + boot = 0 + while True: + c = self.tty.read().decode("utf-8") + if not c: + break + line.append(c) + if c == "\n": + l = "".join(line) + buf.append(l) + if REBOOT_MARKER in l: + boot += 1 + line = [] + if boot > max_boot_count: + break + + l = "".join(line) + buf.append(l) + result = "".join(buf) + + return result, boot + + def identify_tty_port(self): + """Saves this board's serial port.""" + dev_dir = "/dev" + id_prefix = "ID_SERIAL_SHORT=" + com_devices = [f for f in os.listdir(dev_dir) if f.startswith("ttyACM")] + + for device in com_devices: + self.tty_port = os.path.join(dev_dir, device) + properties = sp.check_output( + [ + "udevadm", + "info", + "-a", + "-n", + self.tty_port, + "--query=property", + ], + **get_subprocess_args() + ) + for line in [l.strip() for l in properties.split("\n")]: + if line.startswith(id_prefix): + if self.hla_serial == line[len(id_prefix) :]: + return + + # If we get here without returning, something is wrong + raise RuntimeError("The device dev path could not be found") + + def close_tty(self): + """Close tty.""" + self.tty.close() class TestHarness(Board): - """Subclass of Board representing a Test Harness. + """Subclass of Board representing a Test Harness. - Attributes: - serial_path: Path to file containing serial number - """ - - def __init__(self, board, module, log_dir, serial_path): - """Initializes a board object with given attributes. - - Args: - board: board name - module: module name - log_dir: Directory where log file is stored + Attributes: serial_path: Path to file containing serial number """ - Board.__init__(self, board, module) - self.log_dir = log_dir - self.openocd_log = os.path.join(log_dir, 'openocd_th.log') - self.build_log = os.path.join(log_dir, 'build_th.log') - self.serial_path = serial_path - self.reset_log_dir() - - def get_serial(self): - """Loads serial number from saved location.""" - if self.hla_serial: - return # serial was already loaded - try: - with open(self.serial_path, mode='r') as f: - s = f.read() - self.hla_serial = s.strip() + + def __init__(self, board, module, log_dir, serial_path): + """Initializes a board object with given attributes. + + Args: + board: board name + module: module name + log_dir: Directory where log file is stored + serial_path: Path to file containing serial number + """ + Board.__init__(self, board, module) + self.log_dir = log_dir + self.openocd_log = os.path.join(log_dir, "openocd_th.log") + self.build_log = os.path.join(log_dir, "build_th.log") + self.serial_path = serial_path + self.reset_log_dir() + + def get_serial(self): + """Loads serial number from saved location.""" + if self.hla_serial: + return # serial was already loaded + try: + with open(self.serial_path, mode="r") as f: + s = f.read() + self.hla_serial = s.strip() + return + except IOError: + msg = ( + "Your TH board has not been identified.\n" + "Connect only TH and run the script --setup, then try again." + ) + raise RuntimeError(msg) + + def save_serial(self): + """Saves the TH serial number to a file.""" + serials = Board.get_stlink_serials() + if len(serials) > 1: + msg = ( + "There are more than one test board connected to the host." + "\nConnect only the test harness and remove other boards." + ) + raise RuntimeError(msg) + if len(serials) < 1: + msg = "No test boards were found.\n" "Check boards are connected." + raise RuntimeError(msg) + + s = serials[0] + serial_dir = os.path.dirname(self.serial_path) + if not os.path.exists(serial_dir): + os.makedirs(serial_dir) + with open(self.serial_path, mode="w") as f: + f.write(s) + self.hla_serial = s + + print("Your TH serial", s, "has been saved as", self.serial_path) return - except IOError: - msg = ('Your TH board has not been identified.\n' - 'Connect only TH and run the script --setup, then try again.') - raise RuntimeError(msg) - - def save_serial(self): - """Saves the TH serial number to a file.""" - serials = Board.get_stlink_serials() - if len(serials) > 1: - msg = ('There are more than one test board connected to the host.' - '\nConnect only the test harness and remove other boards.') - raise RuntimeError(msg) - if len(serials) < 1: - msg = ('No test boards were found.\n' - 'Check boards are connected.') - raise RuntimeError(msg) - - s = serials[0] - serial_dir = os.path.dirname(self.serial_path) - if not os.path.exists(serial_dir): - os.makedirs(serial_dir) - with open(self.serial_path, mode='w') as f: - f.write(s) - self.hla_serial = s - - print('Your TH serial', s, 'has been saved as', self.serial_path) - return class DeviceUnderTest(Board): - """Subclass of Board representing a DUT board. + """Subclass of Board representing a DUT board. - Attributes: - th: Reference to test harness board to which this DUT is attached - """ - - def __init__(self, board, th, module, log_dir, hla_ser=None): - """Initializes a DUT object. - - Args: - board: String containing board name + Attributes: th: Reference to test harness board to which this DUT is attached - module: module name - log_dir: Directory where log file is stored - hla_ser: Serial number if board uses an HLA adaptor """ - Board.__init__(self, board, module, hla_serial=hla_ser) - self.th = th - self.log_dir = log_dir - self.openocd_log = os.path.join(log_dir, 'openocd_dut.log') - self.build_log = os.path.join(log_dir, 'build_dut.log') - self.reset_log_dir() - - def get_serial(self): - """Get serial number. - Precondition: The DUT and TH must both be connected, and th.hla_serial - must hold the correct value (the th's serial #) + def __init__(self, board, th, module, log_dir, hla_ser=None): + """Initializes a DUT object. + + Args: + board: String containing board name + th: Reference to test harness board to which this DUT is attached + module: module name + log_dir: Directory where log file is stored + hla_ser: Serial number if board uses an HLA adaptor + """ + Board.__init__(self, board, module, hla_serial=hla_ser) + self.th = th + self.log_dir = log_dir + self.openocd_log = os.path.join(log_dir, "openocd_dut.log") + self.build_log = os.path.join(log_dir, "build_dut.log") + self.reset_log_dir() + + def get_serial(self): + """Get serial number. + + Precondition: The DUT and TH must both be connected, and th.hla_serial + must hold the correct value (the th's serial #) + + Raises: + RuntimeError: DUT isn't found or multiple DUTs are found. + """ + if self.hla_serial is not None: + # serial was already set ('' is a valid serial) + return - Raises: - RuntimeError: DUT isn't found or multiple DUTs are found. - """ - if self.hla_serial is not None: - # serial was already set ('' is a valid serial) - return - - serials = Board.get_stlink_serials() - dut = [s for s in serials if self.th.hla_serial != s] - - # If len(dut) is 0 then your dut doesn't use an st-link device, so we - # don't have to worry about its serial number - if not dut: - msg = ('Failed to find serial for DUT.\n' - 'Is ' + self.board + ' connected?') - raise RuntimeError(msg) - if len(dut) > 1: - msg = ('Found multiple DUTs.\n' - 'You can connect only one DUT at a time. This may be caused by\n' - 'an incorrect TH serial. Check if ' + self.th.serial_path + '\n' - 'contains a correct serial.') - raise RuntimeError(msg) - - # Found your other st-link device serial! - self.hla_serial = dut[0] - return + serials = Board.get_stlink_serials() + dut = [s for s in serials if self.th.hla_serial != s] + + # If len(dut) is 0 then your dut doesn't use an st-link device, so we + # don't have to worry about its serial number + if not dut: + msg = ( + "Failed to find serial for DUT.\nIs " + + self.board + + " connected?" + ) + raise RuntimeError(msg) + if len(dut) > 1: + msg = ( + "Found multiple DUTs.\n" + "You can connect only one DUT at a time. This may be caused by\n" + "an incorrect TH serial. Check if " + self.th.serial_path + "\n" + "contains a correct serial." + ) + raise RuntimeError(msg) + + # Found your other st-link device serial! + self.hla_serial = dut[0] + return |