diff options
Diffstat (limited to 'cts/cts.py')
-rwxr-xr-x | cts/cts.py | 443 |
1 files changed, 0 insertions, 443 deletions
diff --git a/cts/cts.py b/cts/cts.py deleted file mode 100755 index c3e0335cab..0000000000 --- a/cts/cts.py +++ /dev/null @@ -1,443 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2016 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes - -# A script which builds, flashes, and runs EC CTS -# -# Software prerequisites: -# - openocd version 0.10 or above -# - lsusb -# - udevadm -# -# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables -# to the host and execute the script: -# $ ./cts.py -# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR. - -# Note: This is a py2/3 compatible file. - -from __future__ import print_function - -import argparse -import os -import shutil -import time -import common.board as board - - -CTS_RC_PREFIX = 'CTS_RC_' -DEFAULT_TH = 'stm32l476g-eval' -DEFAULT_DUT = 'nucleo-f072rb' -MAX_SUITE_TIME_SEC = 5 -CTS_TEST_RESULT_DIR = '/tmp/ects' - -# Host only return codes. Make sure they match values in cts.rc -CTS_RC_DID_NOT_START = -1 # test did not run. -CTS_RC_DID_NOT_END = -2 # test did not run. -CTS_RC_DUPLICATE_RUN = -3 # test was run multiple times. -CTS_RC_INVALID_RETURN_CODE = -4 # failed to parse return code - - -class Cts(object): - """Class that represents a eCTS run. - - Attributes: - dut: DeviceUnderTest object representing DUT - th: TestHarness object representing a test harness - module: Name of module to build/run tests for - testlist: List of strings of test names contained in given module - return_codes: Dict of strings of return codes, with a code's integer - value being the index for the corresponding string representation - """ - - def __init__(self, ec_dir, th, dut, module): - """Initializes cts class object with given arguments. - - Args: - ec_dir: Path to ec directory - th: Name of the test harness board - dut: Name of the device under test board - module: Name of module to build/run tests for (e.g. gpio, interrupt) - """ - self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module) - if os.path.isdir(self.results_dir): - shutil.rmtree(self.results_dir) - else: - os.makedirs(self.results_dir) - self.ec_dir = ec_dir - self.module = module - serial_path = os.path.join(CTS_TEST_RESULT_DIR, 'th_serial') - self.th = board.TestHarness(th, module, self.results_dir, serial_path) - self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir) - cts_dir = os.path.join(self.ec_dir, 'cts') - testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist') - return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc') - self.get_return_codes(return_codes_path) - self.testlist = self.get_macro_args(testlist_path, 'CTS_TEST') - - def build(self): - """Build images for DUT and TH.""" - print('Building DUT image...') - if not self.dut.build(self.ec_dir): - raise RuntimeError('Building module %s for DUT failed' % (self.module)) - print('Building TH image...') - if not self.th.build(self.ec_dir): - raise RuntimeError('Building module %s for TH failed' % (self.module)) - - def flash_boards(self): - """Flashes TH and DUT with their most recently built ec.bin.""" - cts_module = 'cts_' + self.module - image_path = os.path.join('build', self.th.board, cts_module, 'ec.bin') - self.identify_boards() - print('Flashing TH with', image_path) - if not self.th.flash(image_path): - raise RuntimeError('Flashing TH failed') - image_path = os.path.join('build', self.dut.board, cts_module, 'ec.bin') - print('Flashing DUT with', image_path) - if not self.dut.flash(image_path): - raise RuntimeError('Flashing DUT failed') - - def setup(self): - """Setup boards.""" - self.th.save_serial() - - def identify_boards(self): - """Updates serials of TH and DUT in that order (order matters).""" - self.th.get_serial() - self.dut.get_serial() - - def get_macro_args(self, filepath, macro): - """Get list of args of a macro in a file when macro. - - Args: - filepath: String containing absolute path to the file - macro: String containing text of macro to get args of - - Returns: - List of dictionaries where each entry is: - 'name': Test name, - 'th_string': Expected string from TH, - 'dut_string': Expected string from DUT, - """ - tests = [] - with open(filepath, 'r') as f: - lines = f.readlines() - joined = ''.join(lines).replace('\\\n', '').splitlines() - for l in joined: - if not l.strip().startswith(macro): - continue - d = {} - l = l.strip()[len(macro):] - l = l.strip('()').split(',') - d['name'] = l[0].strip() - d['th_rc'] = self.get_return_code_value(l[1].strip().strip('"')) - d['th_string'] = l[2].strip().strip('"') - d['dut_rc'] = self.get_return_code_value(l[3].strip().strip('"')) - d['dut_string'] = l[4].strip().strip('"') - tests.append(d) - return tests - - def get_return_codes(self, filepath): - """Read return code names from the return code definition file.""" - self.return_codes = {} - val = 0 - with open(filepath, 'r') as f: - for line in f: - line = line.strip() - if not line.startswith(CTS_RC_PREFIX): - continue - line = line.split(',')[0] - if '=' in line: - tokens = line.split('=') - line = tokens[0].strip() - val = int(tokens[1].strip()) - self.return_codes[line] = val - val += 1 - - def parse_output(self, output): - """Parse console output from DUT or TH. - - Args: - output: String containing consoule output - - Returns: - List of dictionaries where each key and value are: - name = 'ects_test_x', - started = True/False, - ended = True/False, - rc = CTS_RC_*, - output = All text between 'ects_test_x start' and 'ects_test_x end' - """ - results = [] - i = 0 - for test in self.testlist: - results.append({}) - results[i]['name'] = test['name'] - results[i]['started'] = False - results[i]['rc'] = CTS_RC_DID_NOT_START - results[i]['string'] = False - results[i]['output'] = [] - i += 1 - - i = 0 - for ln in [ln.strip() for ln in output.split('\n')]: - if i + 1 > len(results): - break - tokens = ln.split() - if len(tokens) >= 2: - if tokens[0].strip() == results[i]['name']: - if tokens[1].strip() == 'start': - # start line found - if results[i]['started']: # Already started - results[i]['rc'] = CTS_RC_DUPLICATE_RUN - else: - results[i]['rc'] = CTS_RC_DID_NOT_END - results[i]['started'] = True - continue - elif results[i]['started'] and tokens[1].strip() == 'end': - # end line found - results[i]['rc'] = CTS_RC_INVALID_RETURN_CODE - if len(tokens) == 3: - try: - results[i]['rc'] = int(tokens[2].strip()) - except ValueError: - pass - # Since index is incremented when 'end' is encountered, we don't - # need to check duplicate 'end'. - i += 1 - continue - if results[i]['started']: - results[i]['output'].append(ln) - - return results - - def get_return_code_name(self, code, strip_prefix=False): - name = '' - for k, v in self.return_codes.items(): - if v == code: - if strip_prefix: - name = k[len(CTS_RC_PREFIX):] - else: - name = k - return name - - def get_return_code_value(self, name): - if name: - return self.return_codes[name] - return 0 - - def evaluate_run(self, dut_output, th_output): - """Parse outputs to derive test results. - - Args: - dut_output: String output of DUT - th_output: String output of TH - - Returns: - th_results: list of test results for TH - dut_results: list of test results for DUT - """ - th_results = self.parse_output(th_output) - dut_results = self.parse_output(dut_output) - - # Search for expected string in each output - for i, v in enumerate(self.testlist): - if v['th_string'] in th_results[i]['output'] or not v['th_string']: - th_results[i]['string'] = True - if v['dut_string'] in dut_results[i]['output'] or not v['dut_string']: - dut_results[i]['string'] = True - - return th_results, dut_results - - def print_result(self, th_results, dut_results): - """Print results to the screen. - - Args: - th_results: list of test results for TH - dut_results: list of test results for DUT - """ - len_test_name = max(len(s['name']) for s in self.testlist) - len_code_name = max(len(self.get_return_code_name(v, True)) - for v in self.return_codes.values()) - - head = '{:^' + str(len_test_name) + '} ' - head += '{:^' + str(len_code_name) + '} ' - head += '{:^' + str(len_code_name) + '}' - head += '{:^' + str(len(' TH_STR')) + '}' - head += '{:^' + str(len(' DUT_STR')) + '}' - head += '{:^' + str(len(' RESULT')) + '}\n' - fmt = '{:' + str(len_test_name) + '} ' - fmt += '{:>' + str(len_code_name) + '} ' - fmt += '{:>' + str(len_code_name) + '}' - fmt += '{:>' + str(len(' TH_STR')) + '}' - fmt += '{:>' + str(len(' DUT_STR')) + '}' - fmt += '{:>' + str(len(' RESULT')) + '}\n' - - self.formatted_results = head.format( - 'TEST NAME', 'TH_RC', 'DUT_RC', - ' TH_STR', ' DUT_STR', ' RESULT') - for i, d in enumerate(dut_results): - th_cn = self.get_return_code_name(th_results[i]['rc'], True) - dut_cn = self.get_return_code_name(dut_results[i]['rc'], True) - th_res = self.evaluate_result(th_results[i], - self.testlist[i]['th_rc'], - self.testlist[i]['th_string']) - dut_res = self.evaluate_result(dut_results[i], - self.testlist[i]['dut_rc'], - self.testlist[i]['dut_string']) - self.formatted_results += fmt.format( - d['name'], th_cn, dut_cn, - 'YES' if th_results[i]['string'] else 'NO', - 'YES' if dut_results[i]['string'] else 'NO', - 'PASS' if th_res and dut_res else 'FAIL') - - def evaluate_result(self, result, expected_rc, expected_string): - if result['rc'] != expected_rc: - return False - if expected_string and expected_string not in result['output']: - return False - return True - - def run(self): - """Resets boards, records test results in results dir.""" - print('Reading serials...') - self.identify_boards() - print('Opening DUT tty...') - self.dut.setup_tty() - print('Opening TH tty...') - self.th.setup_tty() - - # Boards might be still writing to tty. Wait a few seconds before flashing. - time.sleep(3) - - # clear buffers - print('Clearing DUT tty...') - self.dut.read_tty() - print('Clearing TH tty...') - self.th.read_tty() - - # Resets the boards and allows them to run tests - # Due to current (7/27/16) version of sync function, - # both boards must be rest and halted, with the th - # resuming first, in order for the test suite to run in sync - print('Halting TH...') - if not self.th.reset_halt(): - raise RuntimeError('Failed to halt TH') - print('Halting DUT...') - if not self.dut.reset_halt(): - raise RuntimeError('Failed to halt DUT') - print('Resuming TH...') - if not self.th.resume(): - raise RuntimeError('Failed to resume TH') - print('Resuming DUT...') - if not self.dut.resume(): - raise RuntimeError('Failed to resume DUT') - - time.sleep(MAX_SUITE_TIME_SEC) - - print('Reading DUT tty...') - dut_output, _ = self.dut.read_tty() - self.dut.close_tty() - print('Reading TH tty...') - th_output, _ = self.th.read_tty() - self.th.close_tty() - - print('Halting TH...') - if not self.th.reset_halt(): - raise RuntimeError('Failed to halt TH') - print('Halting DUT...') - if not self.dut.reset_halt(): - raise RuntimeError('Failed to halt DUT') - - if not dut_output or not th_output: - raise ValueError('Output missing from boards. If you have a process ' - 'reading ttyACMx, please kill that process and try ' - 'again.') - - print('Pursing results...') - th_results, dut_results = self.evaluate_run(dut_output, th_output) - - # Print out results - self.print_result(th_results, dut_results) - - # Write results - dest = os.path.join(self.results_dir, 'results.log') - with open(dest, 'w') as fl: - fl.write(self.formatted_results) - - # Write UART outputs - dest = os.path.join(self.results_dir, 'uart_th.log') - with open(dest, 'w') as fl: - fl.write(th_output) - dest = os.path.join(self.results_dir, 'uart_dut.log') - with open(dest, 'w') as fl: - fl.write(dut_output) - - print(self.formatted_results) - - # TODO(chromium:735652): Should set exit code for the shell - - -def main(): - ec_dir = os.path.realpath(os.path.join( - os.path.dirname(os.path.abspath(__file__)), '..')) - os.chdir(ec_dir) - - dut = DEFAULT_DUT - module = 'meta' - - parser = argparse.ArgumentParser(description='Used to build/flash boards') - parser.add_argument('-d', - '--dut', - help='Specify DUT you want to build/flash') - parser.add_argument('-m', - '--module', - help='Specify module you want to build/flash') - parser.add_argument('-s', - '--setup', - action='store_true', - help='Connect only the TH to save its serial') - parser.add_argument('-b', - '--build', - action='store_true', - help='Build test suite (no flashing)') - parser.add_argument('-f', - '--flash', - action='store_true', - help='Flash boards with most recent images') - parser.add_argument('-r', - '--run', - action='store_true', - help='Run tests without flashing') - - args = parser.parse_args() - - if args.module: - module = args.module - - if args.dut: - dut = args.dut - - cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module) - - if args.setup: - cts.setup() - elif args.build: - cts.build() - elif args.flash: - cts.flash_boards() - elif args.run: - cts.run() - else: - cts.build() - cts.flash_boards() - cts.run() - -if __name__ == '__main__': - main() |