summaryrefslogtreecommitdiff
path: root/cts
diff options
context:
space:
mode:
authorDaisuke Nojiri <dnojiri@chromium.org>2016-11-10 12:26:10 -0800
committerchrome-bot <chrome-bot@chromium.org>2017-06-16 21:10:54 -0700
commit3bb2d756e579be92a87ff6a17f26f78af3350e4b (patch)
treef2cf147017e47a1801f3304b6684eb289ab55031 /cts
parent3e4d3fd71249d9511ff8b61b44dbc358191cb90f (diff)
downloadchrome-ec-3bb2d756e579be92a87ff6a17f26f78af3350e4b.tar.gz
eCTS: Limit tty reads by boot counts
Currently, read_tty reads characters from tty as long as there is something to read. This causes read_tty to loop forever if the board is in reboot loop. This patch makes cts.py stop reading tty if boot count exceeds max_boot_count. Reboot is detected by detecting REBOOT_MARKER. This patch also does: - Remove debug option: This adds complexity for no real value. Developers should debug tests using regular tools (make, uart console, etc.). - Remove html output. Nobody use it. Should be redone when it's needed using proper libraries. BUG=chromium:664309 BRANCH=none TEST=cts.py -m task/gpio/interrupt Change-Id: I51d1dd51c4097e8115ef04ad46853720295141b4 Signed-off-by: Daisuke Nojiri <dnojiri@chromium.org> Reviewed-on: https://chromium-review.googlesource.com/410281 Reviewed-by: Randall Spangler <rspangler@chromium.org>
Diffstat (limited to 'cts')
-rw-r--r--cts/common/board.py85
-rwxr-xr-xcts/cts.py272
2 files changed, 115 insertions, 242 deletions
diff --git a/cts/common/board.py b/cts/common/board.py
index 7b618c8300..d478b1fca8 100644
--- a/cts/common/board.py
+++ b/cts/common/board.py
@@ -22,6 +22,7 @@ FLASH_OFFSETS = {
'nucleo-f072rb': '0x08000000',
'nucleo-f411re': '0x08000000',
}
+REBOOT_MARKER = 'UART initialized after reboot'
class Board(object):
@@ -119,13 +120,11 @@ class Board(object):
with open(self.openocd_log) as log:
print log.read()
- def build(self, module, ec_dir, debug=False):
+ def build(self, module, ec_dir):
"""Builds test suite module for board
Args:
ec_dir: String of the ec directory path
- debug: True means compile in debug messages when building (may
- affect test results)
"""
cmds = ['make',
'--directory=' + ec_dir,
@@ -133,9 +132,6 @@ class Board(object):
'CTS_MODULE=' + self.module,
'-j']
- if debug:
- cmds.append('CTS_DEBUG=TRUE')
-
rv = 1
with open(self.build_log, 'a') as output:
rv = sp.call(cmds, stdout=output, stderr=sp.STDOUT)
@@ -167,11 +163,11 @@ class Board(object):
return s
def reset(self):
- """Reset board (used when can't connect to TTY)"""
- return self.send_open_ocd_commands(['init', 'reset init', 'resume'])
+ """Reset then halt board """
+ return self.send_open_ocd_commands(['init', 'reset halt'])
def setup_tty(self):
- """Call this before trying to call readOutput for the first time.
+ """Call this before calling read_tty for the first time.
This is not in the initialization because caller only should call
this function after serial numbers are setup
"""
@@ -179,68 +175,67 @@ class Board(object):
self.reset()
self.identify_tty_port()
- # In testing 3 retries is enough to reset board (2 can fail)
- num_file_setup_retries = 3
- # In testing, 10 seconds is sufficient to allow board to reconnect
- reset_wait_time_seconds = 10
tty = None
try:
tty = self.open_tty()
- # If board was just connected, must be reset to be read from
except (IOError, OSError):
- for i in range(num_file_setup_retries):
- self.reset()
- time.sleep(reset_wait_time_seconds)
- try:
- tty = self.open_tty()
- break
- except (IOError, OSError):
- continue
- if not tty:
raise ValueError('Unable to read ' + self.board + '. If you are running '
'cat on a ttyACMx file, please kill that process and '
'try again')
self.tty = tty
- def read_tty(self):
+ def read_tty(self, max_boot_count=1):
"""Read info from a serial port described by a file descriptor
+ Args:
+ max_boot_count: Stop reading if boot count exceeds this number
+
Return:
- Bytes that UART has output
+ result: characters read from tty
+ boot: boot counts
"""
buf = []
+ line = []
+ boot = 0
while True:
if select.select([self.tty], [], [], 1)[0]:
- buf.append(os.read(self.tty, 1))
+ c = os.read(self.tty, 1)
else:
break
+ line.append(c)
+ if c == '\n':
+ l = ''.join(line)
+ buf.append(l)
+ if REBOOT_MARKER in l:
+ boot += 1
+ line = []
+ if boot > max_boot_count:
+ break
+
+ l = ''.join(line)
+ buf.append(l)
result = ''.join(buf)
- return result
+
+ return result, boot
def identify_tty_port(self):
"""Saves this board's serial port"""
dev_dir = '/dev'
id_prefix = 'ID_SERIAL_SHORT='
- num_reset_tries = 3
- reset_wait_time_s = 10
com_devices = [f for f in os.listdir(dev_dir) if f.startswith('ttyACM')]
- for i in range(num_reset_tries):
- for device in com_devices:
- self.tty_port = os.path.join(dev_dir, device)
- properties = sp.check_output(['udevadm',
- 'info',
- '-a',
- '-n',
- self.tty_port,
- '--query=property'])
- for line in [l.strip() for l in properties.split('\n')]:
- if line.startswith(id_prefix):
- if self.hla_serial == line[len(id_prefix):]:
- return
- if i != num_reset_tries - 1: # No need to reset the board the last time
- self.reset() # May need to reset to connect
- time.sleep(reset_wait_time_s)
+ for device in com_devices:
+ self.tty_port = os.path.join(dev_dir, device)
+ properties = sp.check_output(['udevadm',
+ 'info',
+ '-a',
+ '-n',
+ self.tty_port,
+ '--query=property'])
+ for line in [l.strip() for l in properties.split('\n')]:
+ if line.startswith(id_prefix):
+ if self.hla_serial == line[len(id_prefix):]:
+ return
# If we get here without returning, something is wrong
raise RuntimeError('The device dev path could not be found')
diff --git a/cts/cts.py b/cts/cts.py
index 1ae4fb13c2..0c6a23cd7c 100755
--- a/cts/cts.py
+++ b/cts/cts.py
@@ -12,19 +12,16 @@
#
# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables
# to the host and execute the script:
-# $ ./cts.py --debug
+# $ ./cts.py
# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR.
import argparse
-import collections
-import os
-import time
+from collections import defaultdict
import common.board as board
-from copy import deepcopy
-import xml.etree.ElementTree as et
-from twisted.python.syslog import DEFAULT_FACILITY
+import os
import shutil
+import time
CTS_CORRUPTED_CODE = -2 # The test didn't execute correctly
@@ -35,8 +32,6 @@ CTS_COLOR_GREEN = '#7dfb9f'
DEFAULT_TH = 'stm32l476g-eval'
DEFAULT_DUT = 'nucleo-f072rb'
MAX_SUITE_TIME_SEC = 5
-CTS_DEBUG_START = '[DEBUG]'
-CTS_DEBUG_END = '[DEBUG_END]'
CTS_TEST_RESULT_DIR = '/tmp/ects'
@@ -49,26 +44,18 @@ class Cts(object):
th: TestHarness object representing th
module: Name of module to build/run tests for
test_names: List of strings of test names contained in given module
- test_results: Dictionary of results of each test from module, with
- keys being test name strings and values being test result integers
return_codes: Dict of strings of return codes, with a code's integer
value being the index for the corresponding string representation
- debug: Boolean that indicates whether or not on-board debug message
- printing should be enabled when building.
- debug_output: Dictionary mapping test name to an array contain debug
- messages sent while it was running
"""
- def __init__(self, ec_dir, th, dut, module, debug=False):
+ def __init__(self, ec_dir, th, dut, module):
"""Initializes cts class object with given arguments.
Args:
- dut: Name of Device Under Test (DUT) board
- ec_dir: String path to ec directory
- dut: Name of board to use for DUT
- module: Name of module to build/run tests for
- debug: Boolean that indicates whether or not on-board debug message
- printing should be enabled.
+ ec_dir: Path to ec directory
+ th: Name of the test harness board
+ dut: Name of the device under test board
+ module: Name of module to build/run tests for (e.g. gpio, interrupt)
"""
self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module)
if os.path.isdir(self.results_dir):
@@ -77,7 +64,6 @@ class Cts(object):
os.makedirs(self.results_dir)
self.ec_dir = ec_dir
self.module = module
- self.debug = debug
serial_path = os.path.join(CTS_TEST_RESULT_DIR, 'th_serial')
self.th = board.TestHarness(th, module, self.results_dir, serial_path)
self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir)
@@ -85,25 +71,20 @@ class Cts(object):
testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist')
self.test_names = Cts.get_macro_args(testlist_path, 'CTS_TEST')
- self.debug_output = {}
- for test in self.test_names:
- self.debug_output[test] = []
-
return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc')
self.return_codes = dict(enumerate(Cts.get_macro_args(
return_codes_path, 'CTS_RC_')))
self.return_codes[CTS_CONFLICTING_CODE] = 'RESULTS CONFLICT'
self.return_codes[CTS_CORRUPTED_CODE] = 'CORRUPTED'
- self.test_results = collections.OrderedDict()
def build(self):
"""Build images for DUT and TH"""
print 'Building DUT image...'
- if not self.dut.build(self.module, self.ec_dir, self.debug):
+ if not self.dut.build(self.module, self.ec_dir):
raise RuntimeError('Building module %s for DUT failed' % (self.module))
print 'Building TH image...'
- if not self.th.build(self.module, self.ec_dir, self.debug):
+ if not self.th.build(self.module, self.ec_dir):
raise RuntimeError('Building module %s for TH failed' % (self.module))
def flash_boards(self):
@@ -146,176 +127,69 @@ class Cts(object):
args.append(l.strip('()').replace(',', ''))
return args
- def extract_debug_output(self, output):
- """Append the debug messages from output to self.debug_output
+ def parse_output(self, output):
+ results = defaultdict(lambda: CTS_CORRUPTED_CODE)
- Args:
- output: String containing output from which to extract debug
- messages
- """
- lines = [ln.strip() for ln in output.split('\n')]
- test_num = 0
- i = 0
- message_buf = []
- while i < len(lines):
- if test_num >= len(self.test_names):
- break
- if lines[i].strip() == CTS_DEBUG_START:
- i += 1
- msg = ''
- while i < len(lines):
- if lines[i] == CTS_DEBUG_END:
- break
- else:
- msg += lines[i] + '\n'
- i += 1
- message_buf.append(msg)
- else:
- current_test = self.test_names[test_num]
- if lines[i].strip().startswith(current_test):
- self.debug_output[current_test] += message_buf
- message_buf = []
- test_num += 1
- i += 1
-
- def parse_output(self, r1, r2):
- """Parse the outputs of the DUT and TH together
+ for ln in [ln.strip() for ln in output.split('\n')]:
+ tokens = ln.split()
+ if len(tokens) != 2:
+ continue
+ test_name = tokens[0].strip()
+ if test_name not in self.test_names:
+ continue
+ try:
+ return_code = int(tokens[1])
+ except ValueError: # Second token is not an int
+ continue
+ results[test_name] = return_code
- Args;
- r1: String output of one of the DUT or the TH (order does not matter)
- r2: String output of one of the DUT or the TH (order does not matter)
- """
- self.test_results.clear() # empty out any old results
+ return results
- first_corrupted_test = len(self.test_names)
+ def get_return_code_name(self, code):
+ return self.return_codes.get(code, '%d' % code)
- self.extract_debug_output(r1)
- self.extract_debug_output(r2)
+ def evaluate_run(self, dut_output, th_output):
+ """Parse outputs to derive test results
- for output_str in [r1, r2]:
- test_num = 0
- for ln in [ln.strip() for ln in output_str.split('\n')]:
- tokens = ln.split()
- if len(tokens) != 2:
- continue
- test = tokens[0].strip()
- if test not in self.test_names:
- continue
- try:
- return_code = int(tokens[1])
- except ValueError: # Second token is not an int
- continue
- if test != self.test_names[test_num]:
- first_corrupted_test = test_num
- break # Results after this test are corrupted
- elif self.test_results.get(
- test,
- CTS_SUCCESS_CODE) == CTS_SUCCESS_CODE:
- self.test_results[test] = return_code
- elif return_code == CTS_SUCCESS_CODE:
- pass
- elif return_code != self.test_results[test]:
- self.test_results[test] = CTS_CONFLICTING_CODE
- test_num += 1
-
- if test_num != len(self.test_names): # If a suite didn't finish
- first_corrupted_test = min(first_corrupted_test, test_num)
-
- if first_corrupted_test < len(self.test_names):
- for test in self.test_names[first_corrupted_test:]:
- self.test_results[test] = CTS_CORRUPTED_CODE
-
- def _results_as_string(self):
- """Takes saved results and returns a duplicate of their dictionary
- with the return codes replaces with their string representation
-
- Returns:
- dictionary with test name strings as keys and test result strings
- as values
+ Args;
+ dut_output: String output of DUT
+ th_output: String output of TH
"""
- result = deepcopy(self.test_results)
- # Convert codes to strings
- for test, code in result.items():
- result[test] = self.return_codes.get(code, 'UNKNOWN %d' % code)
- return result
+ dut_results = self.parse_output(dut_output)
+ th_results = self.parse_output(th_output)
- def prettify_results(self):
- """Takes saved results and returns a string representation of them
+ len_test_name = max(len(s) for s in self.test_names)
+ len_code_name = max(len(s) for s in self.return_codes.values())
- Return: Dictionary similar to self.test_results, but with strings
- instead of error codes
- """
- res = self._results_as_string()
- t_long = max(len(s) for s in res.keys())
- e_max_len = max(len(s) for s in res.values())
-
- pretty_results = 'CTS Test Results for ' + self.module + ' module:\n'
-
- for test, code in res.items():
- align_str = '\n{0:<' + str(t_long) + \
- '} {1:>' + str(e_max_len) + '}'
- pretty_results += align_str.format(test, code)
-
- return pretty_results
-
- def results_as_html(self):
- res = self._results_as_string()
- root = et.Element('html')
- head = et.SubElement(root, 'head')
- style = et.SubElement(head, 'style')
- style.text = ('table, td, th {border: 1px solid black;}'
- 'body {font-family: \"Lucida Console\", Monaco, monospace')
- body = et.SubElement(root, 'body')
- table = et.SubElement(body, 'table')
- table.set('style','width:100%')
- title_row = et.SubElement(table, 'tr')
- test_name_title = et.SubElement(title_row, 'th')
- test_name_title.text = 'Test Name'
- test_name_title.set('style', 'white-space : nowrap')
- test_results_title = et.SubElement(title_row, 'th')
- test_results_title.text = 'Test Result'
- test_results_title.set('style', 'white-space : nowrap')
- test_debug_title = et.SubElement(title_row, 'th')
- test_debug_title.text = 'Debug Output'
- test_debug_title.set('style', 'width:99%')
-
- for name, result in res.items():
- row = et.SubElement(table, 'tr')
- name_e = et.SubElement(row, 'td')
- name_e.text = name
- name_e.set('style', 'white-space : nowrap')
- result_e = et.SubElement(row, 'td')
- result_e.text = result
- result_e.set('style', 'white-space : nowrap')
- debug_e = et.SubElement(row, 'td')
- debug_e.set('style', 'width:99%')
- debug_e.set('style', 'white-space : pre-wrap')
- if len(self.debug_output[name]) == 0:
- debug_e.text = 'None'
- else:
- combined_message = ''
- for msg in self.debug_output[name]:
- combined_message += msg
- combined_message = combined_message
- debug_e.text = combined_message
- if result == self.return_codes[CTS_SUCCESS_CODE]:
- result_e.set('bgcolor', CTS_COLOR_GREEN)
- else:
- result_e.set('bgcolor', CTS_COLOR_RED)
-
- return et.tostring(root, method='html')
+ head = '{:^' + str(len_test_name) + '} '
+ head += '{:^' + str(len_code_name) + '} '
+ head += '{:^' + str(len_code_name) + '}\n'
+ fmt = '{:' + str(len_test_name) + '} '
+ fmt += '{:>' + str(len_code_name) + '} '
+ fmt += '{:>' + str(len_code_name) + '}\n'
+
+ self.formatted_results = head.format('test name', 'TH result', 'DUT result')
+ for test_name in self.test_names:
+ th_cn = self.get_return_code_name(th_results[test_name])
+ dut_cn = self.get_return_code_name(dut_results[test_name])
+ self.formatted_results += fmt.format(test_name, th_cn, dut_cn)
def run(self):
"""Resets boards, records test results in results dir"""
+ print 'Reading serials...'
self.identify_boards()
+ print 'Opening DUT tty...'
self.dut.setup_tty()
+ print 'Opening TH tty...'
self.th.setup_tty()
# Boards might be still writing to tty. Wait a few seconds before flashing.
time.sleep(3)
# clear buffers
+ print 'Clearing DUT tty...'
self.dut.read_tty()
+ print 'Clearing TH tty...'
self.th.read_tty()
# Resets the boards and allows them to run tests
@@ -337,32 +211,40 @@ class Cts(object):
time.sleep(MAX_SUITE_TIME_SEC)
- dut_results = self.dut.read_tty()
- th_results = self.th.read_tty()
+ print 'Reading DUT tty...'
+ dut_output, dut_boot = self.dut.read_tty()
+ print 'Reading TH tty...'
+ th_output, th_boot = self.th.read_tty()
- if not dut_results or not th_results:
+ print 'Halting TH...'
+ if not self.th.send_open_ocd_commands(['init', 'reset halt']):
+ raise RuntimeError('Failed to halt TH')
+ print 'Halting DUT...'
+ if not self.dut.send_open_ocd_commands(['init', 'reset halt']):
+ raise RuntimeError('Failed to halt DUT')
+
+ if not dut_output or not th_output:
raise ValueError('Output missing from boards. If you have a process '
'reading ttyACMx, please kill that process and try '
'again.')
- self.parse_output(dut_results, th_results)
- pretty_results = self.prettify_results()
- html_results = self.results_as_html()
+ print 'Pursing results...'
+ self.evaluate_run(dut_output, th_output)
- # Write results in html
- dest = os.path.join(self.results_dir, 'results.html')
+ # Write results
+ dest = os.path.join(self.results_dir, 'results.log')
with open(dest, 'w') as fl:
- fl.write(html_results)
+ fl.write(self.formatted_results)
# Write UART outputs
dest = os.path.join(self.results_dir, 'uart_th.log')
with open(dest, 'w') as fl:
- fl.write(th_results)
+ fl.write(th_output)
dest = os.path.join(self.results_dir, 'uart_dut.log')
with open(dest, 'w') as fl:
- fl.write(dut_results)
+ fl.write(dut_output)
- print pretty_results
+ print self.formatted_results
def main():
@@ -381,10 +263,6 @@ def main():
parser.add_argument('-m',
'--module',
help='Specify module you want to build/flash')
- parser.add_argument('--debug',
- action='store_true',
- help=('If building, build with debug printing enabled. '
- 'This may change test results'))
parser.add_argument('-s',
'--setup',
action='store_true',
@@ -410,7 +288,7 @@ def main():
if args.dut:
dut = args.dut
- cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module, debug=args.debug)
+ cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module)
if args.setup:
cts.setup()