#!/usr/bin/env python2 # # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. # A script which builds, flashes, and runs EC CTS # # Software prerequisites: # - openocd version 0.10 or above # - lsusb # - udevadm # # To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables # to the host and execute the script: # $ ./cts.py --debug # It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR. import argparse import collections import os import time import common.board as board from copy import deepcopy import xml.etree.ElementTree as et from twisted.python.syslog import DEFAULT_FACILITY CTS_CORRUPTED_CODE = -2 # The test didn't execute correctly CTS_CONFLICTING_CODE = -1 # Error codes should never conflict CTS_SUCCESS_CODE = 0 CTS_COLOR_RED = '#fb7d7d' CTS_COLOR_GREEN = '#7dfb9f' DEFAULT_TH = 'stm32l476g-eval' DEFAULT_DUT = 'nucleo-f072rb' MAX_SUITE_TIME_SEC = 5 CTS_DEBUG_START = '[DEBUG]' CTS_DEBUG_END = '[DEBUG_END]' CTS_TEST_RESULT_DIR = '/tmp/cts' class Cts(object): """Class that represents a CTS testing setup and provides interface to boards (building, flashing, etc.) Attributes: dut: DeviceUnderTest object representing dut th: TestHarness object representing th module: Name of module to build/run tests for test_names: List of strings of test names contained in given module test_results: Dictionary of results of each test from module, with keys being test name strings and values being test result integers return_codes: Dict of strings of return codes, with a code's integer value being the index for the corresponding string representation debug: Boolean that indicates whether or not on-board debug message printing should be enabled when building. debug_output: Dictionary mapping test name to an array contain debug messages sent while it was running """ def __init__(self, ec_dir, dut, module, debug=False): """Initializes cts class object with given arguments. Args: dut: Name of Device Under Test (DUT) board ec_dir: String path to ec directory dut: Name of board to use for DUT module: Name of module to build/run tests for debug: Boolean that indicates whether or not on-board debug message printing should be enabled. """ self.results_dir = CTS_TEST_RESULT_DIR self.ec_dir = ec_dir self.module = module self.debug = debug serial_path = os.path.join(self.ec_dir, 'build', 'cts_th_serial') self.th = board.TestHarness(DEFAULT_TH, serial_path) self.dut = board.DeviceUnderTest(dut, self.th) cts_dir = os.path.join(self.ec_dir, 'cts') testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist') self.test_names = Cts.get_macro_args(testlist_path, 'CTS_TEST') self.debug_output = {} for test in self.test_names: self.debug_output[test] = [] return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc') self.return_codes = dict(enumerate(Cts.get_macro_args( return_codes_path, 'CTS_RC_'))) self.return_codes[CTS_CONFLICTING_CODE] = 'RESULTS CONFLICT' self.return_codes[CTS_CORRUPTED_CODE] = 'CORRUPTED' self.test_results = collections.OrderedDict() def build(self): """Build images for DUT and TH""" if self.dut.build(self.module, self.ec_dir, self.debug): raise RuntimeError('Building module %s for DUT failed' % (self.module)) if self.th.build(self.module, self.ec_dir, self.debug): raise RuntimeError('Building module %s for TH failed' % (self.module)) def flash_boards(self): """Flashes th and dut boards with their most recently build ec.bin""" cts_module = 'cts_' + self.module image_path = os.path.join('build', self.th.board, cts_module, 'ec.bin') self.identify_boards() print 'Flashing TH with', image_path if self.th.flash(image_path): raise RuntimeError('Flashing TH failed') image_path = os.path.join('build', self.dut.board, cts_module, 'ec.bin') print 'Flashing DUT with', image_path if self.dut.flash(image_path): raise RuntimeError('Flashing DUT failed') def setup(self): """Setup boards""" self.th.save_serial() def identify_boards(self): """Updates serials of both th and dut, in that order (order matters)""" self.th.get_serial() self.dut.get_serial() @staticmethod def get_macro_args(filepath, macro): """Get list of args of a certain macro in a file when macro is used by itself on a line Args: filepath: String containing absolute path to the file macro: String containing text of macro to get args of """ args = [] with open(filepath, 'r') as f: for l in f.readlines(): if not l.strip().startswith(macro): continue l = l.strip()[len(macro):] args.append(l.strip('()').replace(',', '')) return args def extract_debug_output(self, output): """Append the debug messages from output to self.debug_output Args: output: String containing output from which to extract debug messages """ lines = [ln.strip() for ln in output.split('\n')] test_num = 0 i = 0 message_buf = [] while i < len(lines): if test_num >= len(self.test_names): break if lines[i].strip() == CTS_DEBUG_START: i += 1 msg = '' while i < len(lines): if lines[i] == CTS_DEBUG_END: break else: msg += lines[i] + '\n' i += 1 message_buf.append(msg) else: current_test = self.test_names[test_num] if lines[i].strip().startswith(current_test): self.debug_output[current_test] += message_buf message_buf = [] test_num += 1 i += 1 def parse_output(self, r1, r2): """Parse the outputs of the DUT and TH together Args; r1: String output of one of the DUT or the TH (order does not matter) r2: String output of one of the DUT or the TH (order does not matter) """ self.test_results.clear() # empty out any old results first_corrupted_test = len(self.test_names) self.extract_debug_output(r1) self.extract_debug_output(r2) for output_str in [r1, r2]: test_num = 0 for ln in [ln.strip() for ln in output_str.split('\n')]: tokens = ln.split() if len(tokens) != 2: continue test = tokens[0].strip() if test not in self.test_names: continue try: return_code = int(tokens[1]) except ValueError: # Second token is not an int continue if test != self.test_names[test_num]: first_corrupted_test = test_num break # Results after this test are corrupted elif self.test_results.get( test, CTS_SUCCESS_CODE) == CTS_SUCCESS_CODE: self.test_results[test] = return_code elif return_code == CTS_SUCCESS_CODE: pass elif return_code != self.test_results[test]: self.test_results[test] = CTS_CONFLICTING_CODE test_num += 1 if test_num != len(self.test_names): # If a suite didn't finish first_corrupted_test = min(first_corrupted_test, test_num) if first_corrupted_test < len(self.test_names): for test in self.test_names[first_corrupted_test:]: self.test_results[test] = CTS_CORRUPTED_CODE def _results_as_string(self): """Takes saved results and returns a duplicate of their dictionary with the return codes replaces with their string representation Returns: dictionary with test name strings as keys and test result strings as values """ result = deepcopy(self.test_results) # Convert codes to strings for test, code in result.items(): result[test] = self.return_codes.get(code, 'UNKNOWN %d' % code) return result def prettify_results(self): """Takes saved results and returns a string representation of them Return: Dictionary similar to self.test_results, but with strings instead of error codes """ res = self._results_as_string() t_long = max(len(s) for s in res.keys()) e_max_len = max(len(s) for s in res.values()) pretty_results = 'CTS Test Results for ' + self.module + ' module:\n' for test, code in res.items(): align_str = '\n{0:<' + str(t_long) + \ '} {1:>' + str(e_max_len) + '}' pretty_results += align_str.format(test, code) return pretty_results def results_as_html(self): res = self._results_as_string() root = et.Element('html') head = et.SubElement(root, 'head') style = et.SubElement(head, 'style') style.text = ('table, td, th {border: 1px solid black;}' 'body {font-family: \"Lucida Console\", Monaco, monospace') body = et.SubElement(root, 'body') table = et.SubElement(body, 'table') table.set('style','width:100%') title_row = et.SubElement(table, 'tr') test_name_title = et.SubElement(title_row, 'th') test_name_title.text = 'Test Name' test_name_title.set('style', 'white-space : nowrap') test_results_title = et.SubElement(title_row, 'th') test_results_title.text = 'Test Result' test_results_title.set('style', 'white-space : nowrap') test_debug_title = et.SubElement(title_row, 'th') test_debug_title.text = 'Debug Output' test_debug_title.set('style', 'width:99%') for name, result in res.items(): row = et.SubElement(table, 'tr') name_e = et.SubElement(row, 'td') name_e.text = name name_e.set('style', 'white-space : nowrap') result_e = et.SubElement(row, 'td') result_e.text = result result_e.set('style', 'white-space : nowrap') debug_e = et.SubElement(row, 'td') debug_e.set('style', 'width:99%') debug_e.set('style', 'white-space : pre-wrap') if len(self.debug_output[name]) == 0: debug_e.text = 'None' else: combined_message = '' for msg in self.debug_output[name]: combined_message += msg combined_message = combined_message debug_e.text = combined_message if result == self.return_codes[CTS_SUCCESS_CODE]: result_e.set('bgcolor', CTS_COLOR_GREEN) else: result_e.set('bgcolor', CTS_COLOR_RED) return et.tostring(root, method='html') def run(self): """Resets boards, records test results in results dir""" self.identify_boards() self.dut.setup_tty() self.th.setup_tty() # Boards might be still writing to tty. Wait a few seconds before flashing. time.sleep(3) # clear buffers self.dut.read_tty() self.th.read_tty() # Resets the boards and allows them to run tests # Due to current (7/27/16) version of sync function, # both boards must be rest and halted, with the th # resuming first, in order for the test suite to run in sync print 'Halting TH...' self.th.send_open_ocd_commands(['init', 'reset halt']) print 'Resetting DUT...' self.dut.send_open_ocd_commands(['init', 'reset halt']) print 'Resuming TH...' self.th.send_open_ocd_commands(['init', 'resume']) print 'Resuming DUT...' self.dut.send_open_ocd_commands(['init', 'resume']) time.sleep(MAX_SUITE_TIME_SEC) dut_results = self.dut.read_tty() th_results = self.th.read_tty() if not dut_results or not th_results: raise ValueError('Output missing from boards. If you have a process ' 'reading ttyACMx, please kill that process and try ' 'again.') self.parse_output(dut_results, th_results) pretty_results = self.prettify_results() html_results = self.results_as_html() dest = os.path.join(self.results_dir, self.dut.board, self.module + '.html') if not os.path.exists(os.path.dirname(dest)): os.makedirs(os.path.dirname(dest)) with open(dest, 'w') as fl: fl.write(html_results) print pretty_results def main(): """Main entry point for CTS script from command line""" ec_dir = os.path.realpath(os.path.join( os.path.dirname(os.path.abspath(__file__)), '..')) os.chdir(ec_dir) dut = DEFAULT_DUT module = 'meta' parser = argparse.ArgumentParser(description='Used to build/flash boards') parser.add_argument('-d', '--dut', help='Specify DUT you want to build/flash') parser.add_argument('-m', '--module', help='Specify module you want to build/flash') parser.add_argument('--debug', action='store_true', help=('If building, build with debug printing enabled. ' 'This may change test results')) parser.add_argument('-s', '--setup', action='store_true', help='Connect only the TH to save its serial') parser.add_argument('-b', '--build', action='store_true', help='Build test suite (no flashing)') parser.add_argument('-f', '--flash', action='store_true', help='Flash boards with most recent images') parser.add_argument('-r', '--run', action='store_true', help='Run tests without flashing') args = parser.parse_args() if args.module: module = args.module if args.dut: dut = args.dut cts = Cts(ec_dir, dut=dut, module=module, debug=args.debug) if args.setup: cts.setup() elif args.build: cts.build() elif args.flash: cts.flash_boards() elif args.run: cts.run() else: cts.build() cts.flash_boards() cts.run() if __name__ == "__main__": main()