summaryrefslogtreecommitdiff
path: root/cts/cts.py
diff options
context:
space:
mode:
authorDaisuke Nojiri <dnojiri@chromium.org>2016-09-08 15:28:39 -0700
committerchrome-bot <chrome-bot@chromium.org>2016-09-13 22:22:18 -0700
commitd6b0a1cc8809aba441f03c2090c142cfd227ce7d (patch)
tree094bfc170c5d9e06ece905406be199eaccb86c9c /cts/cts.py
parent91bd09c856cda8393c45c25e3f8b04082b6f3b74 (diff)
downloadchrome-ec-d6b0a1cc8809aba441f03c2090c142cfd227ce7d.tar.gz
cts: Refactor cts.py
Noteworthy changes: - Move Board and its child classes in common/board.py - Separate flashing and resetting. Flashing used to imply running tests. - Move up constants up for better visibility - Change default suite to 'meta' - Removed redundant code - Lots of renames (all lower case names, shorter names, etc.) BUG=none BRANCH=none TEST=Ran meta test and verify the results match the expectations Change-Id: I158d96e2ee104767d25b2e721d5206e528600381 Reviewed-on: https://chromium-review.googlesource.com/383911 Commit-Ready: Daisuke Nojiri <dnojiri@chromium.org> Tested-by: Daisuke Nojiri <dnojiri@chromium.org> Reviewed-by: Randall Spangler <rspangler@chromium.org>
Diffstat (limited to 'cts/cts.py')
-rwxr-xr-xcts/cts.py545
1 files changed, 114 insertions, 431 deletions
diff --git a/cts/cts.py b/cts/cts.py
index 1880ce6018..983b9751d2 100755
--- a/cts/cts.py
+++ b/cts/cts.py
@@ -3,306 +3,41 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-# This file is a utility to quickly flash boards
+# A script which builds, flashes, and runs EC CTS
+#
+# Software prerequisites:
+# - openocd version 0.10 or above
+# - lsusb
+# - udevadm
+#
+# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables
+# to the host and execute the script:
+# $ ./cts.py --debug
+# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR.
+
import argparse
import collections
-import fcntl
import os
-import select
-import subprocess as sp
import time
+import common.board as board
from copy import deepcopy
-from abc import ABCMeta, abstractmethod
import xml.etree.ElementTree as et
+from twisted.python.syslog import DEFAULT_FACILITY
+
CTS_CORRUPTED_CODE = -2 # The test didn't execute correctly
CTS_CONFLICTING_CODE = -1 # Error codes should never conflict
CTS_SUCCESS_CODE = 0
CTS_COLOR_RED = '#fb7d7d'
CTS_COLOR_GREEN = '#7dfb9f'
-TH_BOARD = 'stm32l476g-eval'
-OCD_SCRIPT_DIR = '/usr/local/share/openocd/scripts'
+DEFAULT_TH = 'stm32l476g-eval'
+DEFAULT_DUT = 'nucleo-f072rb'
MAX_SUITE_TIME_SEC = 3
CTS_DEBUG_START = '[DEBUG]'
CTS_DEBUG_END = '[DEBUG_END]'
+CTS_TEST_RESULT_DIR = '/tmp/cts'
-class Board(object):
- """Class representing a single board connected to a host machine
-
- This class is abstract, subclasses must define the updateSerial()
- method
-
- Attributes:
- board: String containing actual type of board, i.e. nucleo-f072rb
- config: Directory of board config file relative to openocd's
- scripts directory
- hla_serial: String containing board's hla_serial number (if board
- is an stm32 board)
- tty_port: String that is the path to the tty port which board's
- UART outputs to
- _tty_descriptor: String of file descriptor for tty_port
- """
-
- configs = {
- 'stm32l476g-eval': 'board/stm32l4discovery.cfg',
- 'nucleo-f072rb': 'board/st_nucleo_f0.cfg'
- }
-
- __metaclass__ = ABCMeta # This is an Abstract Base Class (ABC)
- def __init__(self, board, hla_serial=None, flash_offset='0x08000000'):
- """Initializes a board object with given attributes
-
- Args:
- board: String containing board name
- hla_serial: Serial number if board's adaptor is an HLA
- """
- self.board = board
- self.hla_serial = hla_serial
- self.tty_port = None
- self._tty_descriptor = None
- self.flash_offset = flash_offset
-
- @abstractmethod
- def updateSerial(self):
- """Subclass should implement this"""
- pass
-
- def sendOpenOcdCommands(self, commands):
- """Send a command to the board via openocd
-
- Args:
- commands: A list of commands to send
- """
- args = ['openocd', '-s', OCD_SCRIPT_DIR,
- '-f', Board.configs[self.board], '-c', 'hla_serial ' + self.hla_serial]
-
- for cmd in commands:
- args += ['-c', cmd]
- args += ['-c', 'shutdown']
- sp.call(args)
-
- def make(self, module, ec_dir, debug=False):
- """Builds test suite module for board
-
- Args:
- module: String of the test module you are building,
- i.e. gpio, timer, etc.
- ec_dir: String of the ec directory path
- debug: True means compile in debug messages when building (may
- affect test results)
- """
- cmds = ['make',
- '--directory=' + ec_dir,
- 'BOARD=' + self.board,
- 'CTS_MODULE=' + module,
- '-j',
- '-B']
-
- if debug:
- cmds.append('CTS_DEBUG=TRUE')
-
- print 'EC directory is ' + ec_dir
- print (
- 'Building module \'' + module + '\' for ' + self.board +
- 'with debug = ' + str(debug))
- sp.call(cmds)
-
- def flash(self):
- """Flashes board with most recent build ec.bin"""
- flash_cmds = [
- 'reset_config connect_assert_srst',
- 'init',
- 'reset init',
- 'flash write_image erase build/' +
- self.board +
- '/ec.bin ' +
- self.flash_offset,
- 'reset']
-
- self.sendOpenOcdCommands(flash_cmds)
-
- def toString(self):
- s = ('Type: Board\n'
- 'board: ' + self.board + '\n'
- 'hla_serial: ' + self.hla_serial + '\n'
- 'config: ' + Board.configs[self.board] + '\n'
- 'tty_port ' + self.tty_port + '\n'
- '_tty_descriptor: ' + str(self._tty_descriptor) + '\n')
- return s
-
- def reset(self):
- """Reset board (used when can't connect to TTY)"""
- self.sendOpenOcdCommands(['init', 'reset init', 'resume'])
-
- def setupForOutput(self):
- """Call this before trying to call readOutput for the first time.
- This is not in the initialization because caller only should call
- this function after serial numbers are setup
- """
- self.updateSerial()
- self.reset()
- self._identifyTtyPort()
-
- # In testing 3 retries is enough to reset board (2 can fail)
- num_file_setup_retries = 3
- # In testing, 10 seconds is sufficient to allow board to reconnect
- reset_wait_time_seconds = 10
- try:
- self._getDevFileDescriptor()
- # If board was just connected, must be reset to be read from
- except (IOError, OSError):
- for i in range(num_file_setup_retries):
- self.reset()
- time.sleep(reset_wait_time_seconds)
- try:
- self._getDevFileDescriptor()
- break
- except (IOError, OSError):
- continue
- if self._tty_descriptor is None:
- raise ValueError('Unable to read ' + self.name + '\n'
- 'If you are running cat on a ttyACMx file,\n'
- 'please kill that process and try again')
-
- def readAvailableBytes(self):
- """Read info from a serial port described by a file descriptor
-
- Return:
- Bytes that UART has output
- """
- buf = []
- while True:
- if select.select([self._tty_descriptor], [], [], 1)[0]:
- buf.append(os.read(self._tty_descriptor, 1))
- else:
- break
- result = ''.join(buf)
- return result
-
- def _identifyTtyPort(self):
- """Saves this board's serial port"""
- dev_dir = '/dev/'
- id_prefix = 'ID_SERIAL_SHORT='
- num_reset_tries = 3
- reset_wait_time_s = 10
- com_devices = [f for f in os.listdir(
- dev_dir) if f.startswith('ttyACM')]
-
- for i in range(num_reset_tries):
- for device in com_devices:
- self.tty_port = os.path.join(dev_dir, device)
- properties = sp.check_output(['udevadm',
- 'info',
- '-a',
- '-n',
- self.tty_port,
- '--query=property'])
- for line in [l.strip() for l in properties.split('\n')]:
- if line.startswith(id_prefix):
- if self.hla_serial == line[len(id_prefix):]:
- return
- if i != num_reset_tries - 1: # No need to reset the obard the last time
- self.reset() # May need to reset to connect
- time.sleep(reset_wait_time_s)
-
- # If we get here without returning, something is wrong
- raise RuntimeError('The device dev path could not be found')
-
- def _getDevFileDescriptor(self):
- """Read available bytes from device dev path"""
- fd = os.open(self.tty_port, os.O_RDONLY)
- flag = fcntl.fcntl(fd, fcntl.F_GETFL)
- fcntl.fcntl(fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
- self._tty_descriptor = fd
-
-class TestHarness(Board):
- """Subclass of Board representing a Test Harness
-
- Attributes:
- serial_path: Path to file containing serial number
- """
-
- def __init__(self, serial_path=None):
- """Initializes a board object with given attributes
-
- Args:
- serial_path: Path to file containing serial number
- """
- Board.__init__(self, TH_BOARD)
- self.serial_path = serial_path
-
- def updateSerial(self):
- """Loads serial number from saved location"""
- if self.hla_serial:
- return # serial was already loaded
- try:
- with open(self.serial_path, mode='r') as ser_f:
- self.hla_serial = ser_f.read()
- return
- except IOError:
- msg = ('Your th hla_serial may not have been saved.\n'
- 'Connect only your th and run ./cts --setup, then try again.')
- raise RuntimeError(msg)
-
- def saveSerial(self):
- """Saves the th serial number to a file
-
- Return: the serial number saved
- """
- serial = Cts.getSerialNumbers()
- if len(serial) != 1:
- msg = ('TH could not be identified.\n'
- '\nConnect your TH and remove other st-link devices')
- raise RuntimeError(msg)
- else:
- ser = serial[0]
- if not ser:
- msg = ('Unable to save serial')
- raise RuntimeError(msg)
- if not os.path.exists(os.path.dirname(self.serial_path)):
- os.makedirs(os.path.dirname(self.serial_path))
- with open(self.serial_path, mode='w') as ser_f:
- ser_f.write(ser)
- self.hla_serial = ser
- return ser
-
-class DeviceUnderTest(Board):
- """Subclass of Board representing a DUT board
-
- Attributes:
- th: Reference to test harness board to which this DUT is attached
- """
-
- def __init__(self, board, th, hla_ser=None, f_offset='0x08000000'):
- """Initializes a Device Under Test object with given attributes
-
- Args:
- board: String containing board name
- th: Reference to test harness board to which this DUT is attached
- hla_serial: Serial number if board uses an HLA adaptor
- """
- Board.__init__(self, board, hla_serial=hla_ser, flash_offset=f_offset)
- self.th = th
-
- def updateSerial(self):
- """Stores the DUT's serial number.
-
- Precondition: The DUT and TH must both be connected, and th.hla_serial
- must hold the correct value (the th's serial #)
- """
- if self.hla_serial != None:
- return # serial was already set ('' is a valid serial)
- serials = Cts.getSerialNumbers()
- dut = [s for s in serials if self.th.hla_serial != s]
- if len(dut) == 1:
- self.hla_serial = dut[0]
- return # Found your other st-link device serial!
- else:
- raise RuntimeError('Your TH serial number is incorrect, or your have'
- ' too many st-link devices attached.')
- # If len(dut) is 0 then your dut doesn't use an st-link device, so we
- # don't have to worry about its serial number
class Cts(object):
"""Class that represents a CTS testing setup and provides
@@ -312,7 +47,6 @@ class Cts(object):
dut: DeviceUnderTest object representing dut
th: TestHarness object representing th
module: Name of module to build/run tests for
- ec_directory: String containing path to EC top level directory
test_names: List of strings of test names contained in given module
test_results: Dictionary of results of each test from module, with
keys being test name strings and values being test result integers
@@ -324,8 +58,7 @@ class Cts(object):
messages sent while it was running
"""
- def __init__(self, ec_dir,
- dut='nucleo-f072rb', module='gpio', debug=False):
+ def __init__(self, ec_dir, dut, module, debug=False):
"""Initializes cts class object with given arguments.
Args:
@@ -334,105 +67,66 @@ class Cts(object):
dut: Name of board to use for DUT
module: Name of module to build/run tests for
debug: Boolean that indicates whether or not on-board debug message
- printing should be enabled when building.
+ printing should be enabled.
"""
- self.results_dir = '/tmp/cts_results'
+ self.results_dir = CTS_TEST_RESULT_DIR
+ self.ec_dir = ec_dir
self.module = module
self.debug = debug
- self.ec_directory = ec_dir
- self.th = TestHarness()
- self.dut = DeviceUnderTest(dut, self.th) # DUT constructor needs TH
-
- th_ser_path = os.path.join(
- self.ec_directory,
- 'build',
- self.th.board,
- 'th_hla_serial')
-
- self.module = module
-
- testlist_path = os.path.join(
- self.ec_directory,
- 'cts',
- self.module,
- 'cts.testlist')
-
- self.test_names = Cts._getMacroArgs(testlist_path, 'CTS_TEST')
+ serial_path = os.path.join(self.ec_dir, 'build', 'cts_th_serial')
+ self.th = board.TestHarness(DEFAULT_TH, serial_path)
+ self.dut = board.DeviceUnderTest(dut, self.th)
+ cts_dir = os.path.join(self.ec_dir, 'cts')
+ testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist')
+ self.test_names = Cts.get_macro_args(testlist_path, 'CTS_TEST')
self.debug_output = {}
for test in self.test_names:
self.debug_output[test] = []
- self.th.serial_path = th_ser_path
-
-
- return_codes_path = os.path.join(self.ec_directory,
- 'cts',
- 'common',
- 'cts.rc')
-
- self.return_codes = dict(enumerate(Cts._getMacroArgs(
+ return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc')
+ self.return_codes = dict(enumerate(Cts.get_macro_args(
return_codes_path, 'CTS_RC_')))
self.return_codes[CTS_CONFLICTING_CODE] = 'RESULTS CONFLICT'
self.return_codes[CTS_CORRUPTED_CODE] = 'CORRUPTED'
-
self.test_results = collections.OrderedDict()
- def make(self):
- self.dut.make(self.module, self.ec_directory, self.debug)
- self.th.make(self.module, self.ec_directory, self.debug)
+ def build(self):
+ """Build images for DUT and TH"""
+ self.dut.build(self.module, self.ec_dir, self.debug)
+ self.th.build(self.module, self.ec_dir, self.debug)
- def flashBoards(self):
+ def flash_boards(self):
"""Flashes th and dut boards with their most recently build ec.bin"""
- self.updateSerials()
+ self.identify_boards()
self.th.flash()
self.dut.flash()
def setup(self):
- """Saves th serial number if th only is connected.
+ """Setup boards"""
+ self.th.save_serial()
- Return:
- Serial number that was saved
- """
- return self.th.saveSerial()
-
- def updateSerials(self):
+ def identify_boards(self):
"""Updates serials of both th and dut, in that order (order matters)"""
- self.th.updateSerial()
- self.dut.updateSerial()
+ self.th.get_serial()
+ self.dut.get_serial()
- def resetBoards(self):
+ def reset_boards(self):
"""Resets the boards and allows them to run tests
Due to current (7/27/16) version of sync function,
both boards must be rest and halted, with the th
resuming first, in order for the test suite to run
in sync
"""
- self.updateSerials()
- self.th.sendOpenOcdCommands(['init', 'reset halt'])
- self.dut.sendOpenOcdCommands(['init', 'reset halt'])
- self.th.sendOpenOcdCommands(['init', 'resume'])
- self.dut.sendOpenOcdCommands(['init', 'resume'])
-
- @staticmethod
- def getSerialNumbers():
- """Gets serial numbers of all st-link v2.1 board attached to host
-
- Returns:
- List of serials
- """
- usb_args = ['lsusb', '-v', '-d', '0x0483:0x374b']
- usb_process = sp.Popen(usb_args, stdout=sp.PIPE, shell=False)
- st_link_info = usb_process.communicate()[0]
- st_serials = []
- for line in st_link_info.split('\n'):
- if 'iSerial' in line:
- st_serials.append(line.split()[2].strip())
- return st_serials
+ self.identify_boards()
+ self.th.send_open_ocd_commands(['init', 'reset halt'])
+ self.dut.send_open_ocd_commands(['init', 'reset halt'])
+ self.th.send_open_ocd_commands(['init', 'resume'])
+ self.dut.send_open_ocd_commands(['init', 'resume'])
@staticmethod
- def _getMacroArgs(filepath, macro):
+ def get_macro_args(filepath, macro):
"""Get list of args of a certain macro in a file when macro is used
by itself on a line
@@ -441,14 +135,15 @@ class Cts(object):
macro: String containing text of macro to get args of
"""
args = []
- with open(filepath, 'r') as fl:
- for ln in [ln for ln in fl.readlines(
- ) if ln.strip().startswith(macro)]:
- ln = ln.strip()[len(macro):]
- args.append(ln.strip('()').replace(',', ''))
+ with open(filepath, 'r') as f:
+ for l in f.readlines():
+ if not l.strip().startswith(macro):
+ continue
+ l = l.strip()[len(macro):]
+ args.append(l.strip('()').replace(',', ''))
return args
- def extractDebugOutput(self, output):
+ def extract_debug_output(self, output):
"""Append the debug messages from output to self.debug_output
Args:
@@ -480,7 +175,7 @@ class Cts(object):
test_num += 1
i += 1
- def _parseOutput(self, r1, r2):
+ def parse_output(self, r1, r2):
"""Parse the outputs of the DUT and TH together
Args;
@@ -491,8 +186,8 @@ class Cts(object):
first_corrupted_test = len(self.test_names)
- self.extractDebugOutput(r1)
- self.extractDebugOutput(r2)
+ self.extract_debug_output(r1)
+ self.extract_debug_output(r2)
for output_str in [r1, r2]:
test_num = 0
@@ -527,7 +222,7 @@ class Cts(object):
for test in self.test_names[first_corrupted_test:]:
self.test_results[test] = CTS_CORRUPTED_CODE
- def _resultsAsString(self):
+ def _results_as_string(self):
"""Takes saved results and returns a duplicate of their dictionary
with the return codes replaces with their string representation
@@ -541,13 +236,13 @@ class Cts(object):
result[test] = self.return_codes.get(code, 'UNKNOWN %d' % code)
return result
- def prettyResults(self):
+ def prettify_results(self):
"""Takes saved results and returns a string representation of them
Return: Dictionary similar to self.test_results, but with strings
instead of error codes
"""
- res = self._resultsAsString()
+ res = self._results_as_string()
t_long = max(len(s) for s in res.keys())
e_max_len = max(len(s) for s in res.values())
@@ -560,8 +255,8 @@ class Cts(object):
return pretty_results
- def resultsAsHtml(self):
- res = self._resultsAsString()
+ def results_as_html(self):
+ res = self._results_as_string()
root = et.Element('html')
head = et.SubElement(root, 'head')
style = et.SubElement(head, 'style')
@@ -607,39 +302,33 @@ class Cts(object):
return et.tostring(root, method='html')
- def resetAndRecord(self):
+ def run(self):
"""Resets boards, records test results in results dir"""
- self.updateSerials()
- self.dut.setupForOutput()
- self.th.setupForOutput()
+ self.identify_boards()
+ self.dut.setup_tty()
+ self.th.setup_tty()
- self.dut.readAvailableBytes() # clear buffer
- self.th.readAvailableBytes()
- bad_cat_message = (
- 'Output missing from boards.\n'
- 'If you are running cat on a ttyACMx file,\n'
- 'please kill that process and try again'
- )
+ # clear buffers
+ self.dut.read_tty()
+ self.th.read_tty()
- self.resetBoards()
+ self.reset_boards()
time.sleep(MAX_SUITE_TIME_SEC)
- dut_results = self.dut.readAvailableBytes()
- th_results = self.th.readAvailableBytes()
+ dut_results = self.dut.read_tty()
+ th_results = self.th.read_tty()
if not dut_results or not th_results:
+ msg = ('Output missing from boards. If you have a process reading '
+ 'ttyACMx, please kill that process and try again.')
raise ValueError(bad_cat_message)
- self._parseOutput(dut_results, th_results)
- pretty_results = self.prettyResults()
- html_results = self.resultsAsHtml()
+ self.parse_output(dut_results, th_results)
+ pretty_results = self.prettify_results()
+ html_results = self.results_as_html()
- dest = os.path.join(
- self.results_dir,
- self.dut.board,
- self.module + '.html'
- )
+ dest = os.path.join(self.results_dir, self.dut.board, self.module + '.html')
if not os.path.exists(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest))
@@ -648,42 +337,43 @@ class Cts(object):
print pretty_results
+
def main():
- """Main entry point for cts script from command line"""
- ec_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')
+ """Main entry point for CTS script from command line"""
+ ec_dir = os.path.realpath(os.path.join(
+ os.path.dirname(os.path.abspath(__file__)), '..'))
os.chdir(ec_dir)
- dut_board = 'nucleo-f072rb' # nucleo by default
- module = 'gpio' # gpio by default
- debug = False
+ dut = DEFAULT_DUT
+ module = 'meta'
parser = argparse.ArgumentParser(description='Used to build/flash boards')
parser.add_argument('-d',
- '--dut',
- help='Specify DUT you want to build/flash')
+ '--dut',
+ help='Specify DUT you want to build/flash')
parser.add_argument('-m',
- '--module',
- help='Specify module you want to build/flash')
+ '--module',
+ help='Specify module you want to build/flash')
parser.add_argument('--debug',
- action='store_true',
- help='If building, build with debug printing enabled. This may'
- 'change test results')
+ action='store_true',
+ help=('If building, build with debug printing enabled. '
+ 'This may change test results'))
parser.add_argument('-s',
- '--setup',
- action='store_true',
- help='Connect only the th to save its serial')
+ '--setup',
+ action='store_true',
+ help='Connect only the TH to save its serial')
parser.add_argument('-b',
- '--build',
- action='store_true',
- help='Build test suite (no flashing)')
+ '--build',
+ action='store_true',
+ help='Build test suite (no flashing)')
parser.add_argument('-f',
- '--flash',
- action='store_true',
- help='Flash boards with most recent image and record results')
+ '--flash',
+ action='store_true',
+ help='Flash boards with most recent images')
parser.add_argument('-r',
- '--reset',
- action='store_true',
- help='Reset boards and save test results (no flashing)')
+ '--run',
+ action='store_true',
+ help='Run tests without flashing')
args = parser.parse_args()
@@ -691,29 +381,22 @@ def main():
module = args.module
if args.dut:
- dut_board = args.dut
-
- if args.debug:
- debug = args.debug
+ dut = args.dut
- cts_suite = Cts(ec_dir, module=module, dut=dut_board, debug=debug)
+ cts = Cts(ec_dir, dut=dut, module=module, debug=args.debug)
if args.setup:
- serial = cts_suite.setup()
- print 'Your th hla_serial # has been saved as: ' + serial
-
- elif args.reset:
- cts_suite.resetAndRecord()
-
+ cts.setup()
elif args.build:
- cts_suite.make()
-
+ cts.build()
elif args.flash:
- cts_suite.flashBoards()
-
+ cts.flash_boards()
+ elif args.run:
+ cts.run()
else:
- cts_suite.make()
- cts_suite.flashBoards()
+ cts.build()
+ cts.flash_boards()
+ cts.run()
if __name__ == "__main__":
main()