diff options
author | Daisuke Nojiri <dnojiri@chromium.org> | 2016-09-08 15:28:39 -0700 |
---|---|---|
committer | chrome-bot <chrome-bot@chromium.org> | 2016-09-13 22:22:18 -0700 |
commit | d6b0a1cc8809aba441f03c2090c142cfd227ce7d (patch) | |
tree | 094bfc170c5d9e06ece905406be199eaccb86c9c /cts/cts.py | |
parent | 91bd09c856cda8393c45c25e3f8b04082b6f3b74 (diff) | |
download | chrome-ec-d6b0a1cc8809aba441f03c2090c142cfd227ce7d.tar.gz |
cts: Refactor cts.py
Noteworthy changes:
- Move Board and its child classes in common/board.py
- Separate flashing and resetting. Flashing used to imply running tests.
- Move up constants up for better visibility
- Change default suite to 'meta'
- Removed redundant code
- Lots of renames (all lower case names, shorter names, etc.)
BUG=none
BRANCH=none
TEST=Ran meta test and verify the results match the expectations
Change-Id: I158d96e2ee104767d25b2e721d5206e528600381
Reviewed-on: https://chromium-review.googlesource.com/383911
Commit-Ready: Daisuke Nojiri <dnojiri@chromium.org>
Tested-by: Daisuke Nojiri <dnojiri@chromium.org>
Reviewed-by: Randall Spangler <rspangler@chromium.org>
Diffstat (limited to 'cts/cts.py')
-rwxr-xr-x | cts/cts.py | 545 |
1 files changed, 114 insertions, 431 deletions
diff --git a/cts/cts.py b/cts/cts.py index 1880ce6018..983b9751d2 100755 --- a/cts/cts.py +++ b/cts/cts.py @@ -3,306 +3,41 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# This file is a utility to quickly flash boards +# A script which builds, flashes, and runs EC CTS +# +# Software prerequisites: +# - openocd version 0.10 or above +# - lsusb +# - udevadm +# +# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables +# to the host and execute the script: +# $ ./cts.py --debug +# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR. + import argparse import collections -import fcntl import os -import select -import subprocess as sp import time +import common.board as board from copy import deepcopy -from abc import ABCMeta, abstractmethod import xml.etree.ElementTree as et +from twisted.python.syslog import DEFAULT_FACILITY + CTS_CORRUPTED_CODE = -2 # The test didn't execute correctly CTS_CONFLICTING_CODE = -1 # Error codes should never conflict CTS_SUCCESS_CODE = 0 CTS_COLOR_RED = '#fb7d7d' CTS_COLOR_GREEN = '#7dfb9f' -TH_BOARD = 'stm32l476g-eval' -OCD_SCRIPT_DIR = '/usr/local/share/openocd/scripts' +DEFAULT_TH = 'stm32l476g-eval' +DEFAULT_DUT = 'nucleo-f072rb' MAX_SUITE_TIME_SEC = 3 CTS_DEBUG_START = '[DEBUG]' CTS_DEBUG_END = '[DEBUG_END]' +CTS_TEST_RESULT_DIR = '/tmp/cts' -class Board(object): - """Class representing a single board connected to a host machine - - This class is abstract, subclasses must define the updateSerial() - method - - Attributes: - board: String containing actual type of board, i.e. nucleo-f072rb - config: Directory of board config file relative to openocd's - scripts directory - hla_serial: String containing board's hla_serial number (if board - is an stm32 board) - tty_port: String that is the path to the tty port which board's - UART outputs to - _tty_descriptor: String of file descriptor for tty_port - """ - - configs = { - 'stm32l476g-eval': 'board/stm32l4discovery.cfg', - 'nucleo-f072rb': 'board/st_nucleo_f0.cfg' - } - - __metaclass__ = ABCMeta # This is an Abstract Base Class (ABC) - def __init__(self, board, hla_serial=None, flash_offset='0x08000000'): - """Initializes a board object with given attributes - - Args: - board: String containing board name - hla_serial: Serial number if board's adaptor is an HLA - """ - self.board = board - self.hla_serial = hla_serial - self.tty_port = None - self._tty_descriptor = None - self.flash_offset = flash_offset - - @abstractmethod - def updateSerial(self): - """Subclass should implement this""" - pass - - def sendOpenOcdCommands(self, commands): - """Send a command to the board via openocd - - Args: - commands: A list of commands to send - """ - args = ['openocd', '-s', OCD_SCRIPT_DIR, - '-f', Board.configs[self.board], '-c', 'hla_serial ' + self.hla_serial] - - for cmd in commands: - args += ['-c', cmd] - args += ['-c', 'shutdown'] - sp.call(args) - - def make(self, module, ec_dir, debug=False): - """Builds test suite module for board - - Args: - module: String of the test module you are building, - i.e. gpio, timer, etc. - ec_dir: String of the ec directory path - debug: True means compile in debug messages when building (may - affect test results) - """ - cmds = ['make', - '--directory=' + ec_dir, - 'BOARD=' + self.board, - 'CTS_MODULE=' + module, - '-j', - '-B'] - - if debug: - cmds.append('CTS_DEBUG=TRUE') - - print 'EC directory is ' + ec_dir - print ( - 'Building module \'' + module + '\' for ' + self.board + - 'with debug = ' + str(debug)) - sp.call(cmds) - - def flash(self): - """Flashes board with most recent build ec.bin""" - flash_cmds = [ - 'reset_config connect_assert_srst', - 'init', - 'reset init', - 'flash write_image erase build/' + - self.board + - '/ec.bin ' + - self.flash_offset, - 'reset'] - - self.sendOpenOcdCommands(flash_cmds) - - def toString(self): - s = ('Type: Board\n' - 'board: ' + self.board + '\n' - 'hla_serial: ' + self.hla_serial + '\n' - 'config: ' + Board.configs[self.board] + '\n' - 'tty_port ' + self.tty_port + '\n' - '_tty_descriptor: ' + str(self._tty_descriptor) + '\n') - return s - - def reset(self): - """Reset board (used when can't connect to TTY)""" - self.sendOpenOcdCommands(['init', 'reset init', 'resume']) - - def setupForOutput(self): - """Call this before trying to call readOutput for the first time. - This is not in the initialization because caller only should call - this function after serial numbers are setup - """ - self.updateSerial() - self.reset() - self._identifyTtyPort() - - # In testing 3 retries is enough to reset board (2 can fail) - num_file_setup_retries = 3 - # In testing, 10 seconds is sufficient to allow board to reconnect - reset_wait_time_seconds = 10 - try: - self._getDevFileDescriptor() - # If board was just connected, must be reset to be read from - except (IOError, OSError): - for i in range(num_file_setup_retries): - self.reset() - time.sleep(reset_wait_time_seconds) - try: - self._getDevFileDescriptor() - break - except (IOError, OSError): - continue - if self._tty_descriptor is None: - raise ValueError('Unable to read ' + self.name + '\n' - 'If you are running cat on a ttyACMx file,\n' - 'please kill that process and try again') - - def readAvailableBytes(self): - """Read info from a serial port described by a file descriptor - - Return: - Bytes that UART has output - """ - buf = [] - while True: - if select.select([self._tty_descriptor], [], [], 1)[0]: - buf.append(os.read(self._tty_descriptor, 1)) - else: - break - result = ''.join(buf) - return result - - def _identifyTtyPort(self): - """Saves this board's serial port""" - dev_dir = '/dev/' - id_prefix = 'ID_SERIAL_SHORT=' - num_reset_tries = 3 - reset_wait_time_s = 10 - com_devices = [f for f in os.listdir( - dev_dir) if f.startswith('ttyACM')] - - for i in range(num_reset_tries): - for device in com_devices: - self.tty_port = os.path.join(dev_dir, device) - properties = sp.check_output(['udevadm', - 'info', - '-a', - '-n', - self.tty_port, - '--query=property']) - for line in [l.strip() for l in properties.split('\n')]: - if line.startswith(id_prefix): - if self.hla_serial == line[len(id_prefix):]: - return - if i != num_reset_tries - 1: # No need to reset the obard the last time - self.reset() # May need to reset to connect - time.sleep(reset_wait_time_s) - - # If we get here without returning, something is wrong - raise RuntimeError('The device dev path could not be found') - - def _getDevFileDescriptor(self): - """Read available bytes from device dev path""" - fd = os.open(self.tty_port, os.O_RDONLY) - flag = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) - self._tty_descriptor = fd - -class TestHarness(Board): - """Subclass of Board representing a Test Harness - - Attributes: - serial_path: Path to file containing serial number - """ - - def __init__(self, serial_path=None): - """Initializes a board object with given attributes - - Args: - serial_path: Path to file containing serial number - """ - Board.__init__(self, TH_BOARD) - self.serial_path = serial_path - - def updateSerial(self): - """Loads serial number from saved location""" - if self.hla_serial: - return # serial was already loaded - try: - with open(self.serial_path, mode='r') as ser_f: - self.hla_serial = ser_f.read() - return - except IOError: - msg = ('Your th hla_serial may not have been saved.\n' - 'Connect only your th and run ./cts --setup, then try again.') - raise RuntimeError(msg) - - def saveSerial(self): - """Saves the th serial number to a file - - Return: the serial number saved - """ - serial = Cts.getSerialNumbers() - if len(serial) != 1: - msg = ('TH could not be identified.\n' - '\nConnect your TH and remove other st-link devices') - raise RuntimeError(msg) - else: - ser = serial[0] - if not ser: - msg = ('Unable to save serial') - raise RuntimeError(msg) - if not os.path.exists(os.path.dirname(self.serial_path)): - os.makedirs(os.path.dirname(self.serial_path)) - with open(self.serial_path, mode='w') as ser_f: - ser_f.write(ser) - self.hla_serial = ser - return ser - -class DeviceUnderTest(Board): - """Subclass of Board representing a DUT board - - Attributes: - th: Reference to test harness board to which this DUT is attached - """ - - def __init__(self, board, th, hla_ser=None, f_offset='0x08000000'): - """Initializes a Device Under Test object with given attributes - - Args: - board: String containing board name - th: Reference to test harness board to which this DUT is attached - hla_serial: Serial number if board uses an HLA adaptor - """ - Board.__init__(self, board, hla_serial=hla_ser, flash_offset=f_offset) - self.th = th - - def updateSerial(self): - """Stores the DUT's serial number. - - Precondition: The DUT and TH must both be connected, and th.hla_serial - must hold the correct value (the th's serial #) - """ - if self.hla_serial != None: - return # serial was already set ('' is a valid serial) - serials = Cts.getSerialNumbers() - dut = [s for s in serials if self.th.hla_serial != s] - if len(dut) == 1: - self.hla_serial = dut[0] - return # Found your other st-link device serial! - else: - raise RuntimeError('Your TH serial number is incorrect, or your have' - ' too many st-link devices attached.') - # If len(dut) is 0 then your dut doesn't use an st-link device, so we - # don't have to worry about its serial number class Cts(object): """Class that represents a CTS testing setup and provides @@ -312,7 +47,6 @@ class Cts(object): dut: DeviceUnderTest object representing dut th: TestHarness object representing th module: Name of module to build/run tests for - ec_directory: String containing path to EC top level directory test_names: List of strings of test names contained in given module test_results: Dictionary of results of each test from module, with keys being test name strings and values being test result integers @@ -324,8 +58,7 @@ class Cts(object): messages sent while it was running """ - def __init__(self, ec_dir, - dut='nucleo-f072rb', module='gpio', debug=False): + def __init__(self, ec_dir, dut, module, debug=False): """Initializes cts class object with given arguments. Args: @@ -334,105 +67,66 @@ class Cts(object): dut: Name of board to use for DUT module: Name of module to build/run tests for debug: Boolean that indicates whether or not on-board debug message - printing should be enabled when building. + printing should be enabled. """ - self.results_dir = '/tmp/cts_results' + self.results_dir = CTS_TEST_RESULT_DIR + self.ec_dir = ec_dir self.module = module self.debug = debug - self.ec_directory = ec_dir - self.th = TestHarness() - self.dut = DeviceUnderTest(dut, self.th) # DUT constructor needs TH - - th_ser_path = os.path.join( - self.ec_directory, - 'build', - self.th.board, - 'th_hla_serial') - - self.module = module - - testlist_path = os.path.join( - self.ec_directory, - 'cts', - self.module, - 'cts.testlist') - - self.test_names = Cts._getMacroArgs(testlist_path, 'CTS_TEST') + serial_path = os.path.join(self.ec_dir, 'build', 'cts_th_serial') + self.th = board.TestHarness(DEFAULT_TH, serial_path) + self.dut = board.DeviceUnderTest(dut, self.th) + cts_dir = os.path.join(self.ec_dir, 'cts') + testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist') + self.test_names = Cts.get_macro_args(testlist_path, 'CTS_TEST') self.debug_output = {} for test in self.test_names: self.debug_output[test] = [] - self.th.serial_path = th_ser_path - - - return_codes_path = os.path.join(self.ec_directory, - 'cts', - 'common', - 'cts.rc') - - self.return_codes = dict(enumerate(Cts._getMacroArgs( + return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc') + self.return_codes = dict(enumerate(Cts.get_macro_args( return_codes_path, 'CTS_RC_'))) self.return_codes[CTS_CONFLICTING_CODE] = 'RESULTS CONFLICT' self.return_codes[CTS_CORRUPTED_CODE] = 'CORRUPTED' - self.test_results = collections.OrderedDict() - def make(self): - self.dut.make(self.module, self.ec_directory, self.debug) - self.th.make(self.module, self.ec_directory, self.debug) + def build(self): + """Build images for DUT and TH""" + self.dut.build(self.module, self.ec_dir, self.debug) + self.th.build(self.module, self.ec_dir, self.debug) - def flashBoards(self): + def flash_boards(self): """Flashes th and dut boards with their most recently build ec.bin""" - self.updateSerials() + self.identify_boards() self.th.flash() self.dut.flash() def setup(self): - """Saves th serial number if th only is connected. + """Setup boards""" + self.th.save_serial() - Return: - Serial number that was saved - """ - return self.th.saveSerial() - - def updateSerials(self): + def identify_boards(self): """Updates serials of both th and dut, in that order (order matters)""" - self.th.updateSerial() - self.dut.updateSerial() + self.th.get_serial() + self.dut.get_serial() - def resetBoards(self): + def reset_boards(self): """Resets the boards and allows them to run tests Due to current (7/27/16) version of sync function, both boards must be rest and halted, with the th resuming first, in order for the test suite to run in sync """ - self.updateSerials() - self.th.sendOpenOcdCommands(['init', 'reset halt']) - self.dut.sendOpenOcdCommands(['init', 'reset halt']) - self.th.sendOpenOcdCommands(['init', 'resume']) - self.dut.sendOpenOcdCommands(['init', 'resume']) - - @staticmethod - def getSerialNumbers(): - """Gets serial numbers of all st-link v2.1 board attached to host - - Returns: - List of serials - """ - usb_args = ['lsusb', '-v', '-d', '0x0483:0x374b'] - usb_process = sp.Popen(usb_args, stdout=sp.PIPE, shell=False) - st_link_info = usb_process.communicate()[0] - st_serials = [] - for line in st_link_info.split('\n'): - if 'iSerial' in line: - st_serials.append(line.split()[2].strip()) - return st_serials + self.identify_boards() + self.th.send_open_ocd_commands(['init', 'reset halt']) + self.dut.send_open_ocd_commands(['init', 'reset halt']) + self.th.send_open_ocd_commands(['init', 'resume']) + self.dut.send_open_ocd_commands(['init', 'resume']) @staticmethod - def _getMacroArgs(filepath, macro): + def get_macro_args(filepath, macro): """Get list of args of a certain macro in a file when macro is used by itself on a line @@ -441,14 +135,15 @@ class Cts(object): macro: String containing text of macro to get args of """ args = [] - with open(filepath, 'r') as fl: - for ln in [ln for ln in fl.readlines( - ) if ln.strip().startswith(macro)]: - ln = ln.strip()[len(macro):] - args.append(ln.strip('()').replace(',', '')) + with open(filepath, 'r') as f: + for l in f.readlines(): + if not l.strip().startswith(macro): + continue + l = l.strip()[len(macro):] + args.append(l.strip('()').replace(',', '')) return args - def extractDebugOutput(self, output): + def extract_debug_output(self, output): """Append the debug messages from output to self.debug_output Args: @@ -480,7 +175,7 @@ class Cts(object): test_num += 1 i += 1 - def _parseOutput(self, r1, r2): + def parse_output(self, r1, r2): """Parse the outputs of the DUT and TH together Args; @@ -491,8 +186,8 @@ class Cts(object): first_corrupted_test = len(self.test_names) - self.extractDebugOutput(r1) - self.extractDebugOutput(r2) + self.extract_debug_output(r1) + self.extract_debug_output(r2) for output_str in [r1, r2]: test_num = 0 @@ -527,7 +222,7 @@ class Cts(object): for test in self.test_names[first_corrupted_test:]: self.test_results[test] = CTS_CORRUPTED_CODE - def _resultsAsString(self): + def _results_as_string(self): """Takes saved results and returns a duplicate of their dictionary with the return codes replaces with their string representation @@ -541,13 +236,13 @@ class Cts(object): result[test] = self.return_codes.get(code, 'UNKNOWN %d' % code) return result - def prettyResults(self): + def prettify_results(self): """Takes saved results and returns a string representation of them Return: Dictionary similar to self.test_results, but with strings instead of error codes """ - res = self._resultsAsString() + res = self._results_as_string() t_long = max(len(s) for s in res.keys()) e_max_len = max(len(s) for s in res.values()) @@ -560,8 +255,8 @@ class Cts(object): return pretty_results - def resultsAsHtml(self): - res = self._resultsAsString() + def results_as_html(self): + res = self._results_as_string() root = et.Element('html') head = et.SubElement(root, 'head') style = et.SubElement(head, 'style') @@ -607,39 +302,33 @@ class Cts(object): return et.tostring(root, method='html') - def resetAndRecord(self): + def run(self): """Resets boards, records test results in results dir""" - self.updateSerials() - self.dut.setupForOutput() - self.th.setupForOutput() + self.identify_boards() + self.dut.setup_tty() + self.th.setup_tty() - self.dut.readAvailableBytes() # clear buffer - self.th.readAvailableBytes() - bad_cat_message = ( - 'Output missing from boards.\n' - 'If you are running cat on a ttyACMx file,\n' - 'please kill that process and try again' - ) + # clear buffers + self.dut.read_tty() + self.th.read_tty() - self.resetBoards() + self.reset_boards() time.sleep(MAX_SUITE_TIME_SEC) - dut_results = self.dut.readAvailableBytes() - th_results = self.th.readAvailableBytes() + dut_results = self.dut.read_tty() + th_results = self.th.read_tty() if not dut_results or not th_results: + msg = ('Output missing from boards. If you have a process reading ' + 'ttyACMx, please kill that process and try again.') raise ValueError(bad_cat_message) - self._parseOutput(dut_results, th_results) - pretty_results = self.prettyResults() - html_results = self.resultsAsHtml() + self.parse_output(dut_results, th_results) + pretty_results = self.prettify_results() + html_results = self.results_as_html() - dest = os.path.join( - self.results_dir, - self.dut.board, - self.module + '.html' - ) + dest = os.path.join(self.results_dir, self.dut.board, self.module + '.html') if not os.path.exists(os.path.dirname(dest)): os.makedirs(os.path.dirname(dest)) @@ -648,42 +337,43 @@ class Cts(object): print pretty_results + def main(): - """Main entry point for cts script from command line""" - ec_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..') + """Main entry point for CTS script from command line""" + ec_dir = os.path.realpath(os.path.join( + os.path.dirname(os.path.abspath(__file__)), '..')) os.chdir(ec_dir) - dut_board = 'nucleo-f072rb' # nucleo by default - module = 'gpio' # gpio by default - debug = False + dut = DEFAULT_DUT + module = 'meta' parser = argparse.ArgumentParser(description='Used to build/flash boards') parser.add_argument('-d', - '--dut', - help='Specify DUT you want to build/flash') + '--dut', + help='Specify DUT you want to build/flash') parser.add_argument('-m', - '--module', - help='Specify module you want to build/flash') + '--module', + help='Specify module you want to build/flash') parser.add_argument('--debug', - action='store_true', - help='If building, build with debug printing enabled. This may' - 'change test results') + action='store_true', + help=('If building, build with debug printing enabled. ' + 'This may change test results')) parser.add_argument('-s', - '--setup', - action='store_true', - help='Connect only the th to save its serial') + '--setup', + action='store_true', + help='Connect only the TH to save its serial') parser.add_argument('-b', - '--build', - action='store_true', - help='Build test suite (no flashing)') + '--build', + action='store_true', + help='Build test suite (no flashing)') parser.add_argument('-f', - '--flash', - action='store_true', - help='Flash boards with most recent image and record results') + '--flash', + action='store_true', + help='Flash boards with most recent images') parser.add_argument('-r', - '--reset', - action='store_true', - help='Reset boards and save test results (no flashing)') + '--run', + action='store_true', + help='Run tests without flashing') args = parser.parse_args() @@ -691,29 +381,22 @@ def main(): module = args.module if args.dut: - dut_board = args.dut - - if args.debug: - debug = args.debug + dut = args.dut - cts_suite = Cts(ec_dir, module=module, dut=dut_board, debug=debug) + cts = Cts(ec_dir, dut=dut, module=module, debug=args.debug) if args.setup: - serial = cts_suite.setup() - print 'Your th hla_serial # has been saved as: ' + serial - - elif args.reset: - cts_suite.resetAndRecord() - + cts.setup() elif args.build: - cts_suite.make() - + cts.build() elif args.flash: - cts_suite.flashBoards() - + cts.flash_boards() + elif args.run: + cts.run() else: - cts_suite.make() - cts_suite.flashBoards() + cts.build() + cts.flash_boards() + cts.run() if __name__ == "__main__": main() |