summaryrefslogtreecommitdiff
path: root/cts/cts.py
diff options
context:
space:
mode:
Diffstat (limited to 'cts/cts.py')
-rwxr-xr-xcts/cts.py443
1 files changed, 0 insertions, 443 deletions
diff --git a/cts/cts.py b/cts/cts.py
deleted file mode 100755
index c3e0335cab..0000000000
--- a/cts/cts.py
+++ /dev/null
@@ -1,443 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2016 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-# A script which builds, flashes, and runs EC CTS
-#
-# Software prerequisites:
-# - openocd version 0.10 or above
-# - lsusb
-# - udevadm
-#
-# To try it out, hook two boards (DEFAULT_TH and DEFAULT_DUT) with USB cables
-# to the host and execute the script:
-# $ ./cts.py
-# It'll run mock tests. The result will be stored in CTS_TEST_RESULT_DIR.
-
-# Note: This is a py2/3 compatible file.
-
-from __future__ import print_function
-
-import argparse
-import os
-import shutil
-import time
-import common.board as board
-
-
-CTS_RC_PREFIX = 'CTS_RC_'
-DEFAULT_TH = 'stm32l476g-eval'
-DEFAULT_DUT = 'nucleo-f072rb'
-MAX_SUITE_TIME_SEC = 5
-CTS_TEST_RESULT_DIR = '/tmp/ects'
-
-# Host only return codes. Make sure they match values in cts.rc
-CTS_RC_DID_NOT_START = -1 # test did not run.
-CTS_RC_DID_NOT_END = -2 # test did not run.
-CTS_RC_DUPLICATE_RUN = -3 # test was run multiple times.
-CTS_RC_INVALID_RETURN_CODE = -4 # failed to parse return code
-
-
-class Cts(object):
- """Class that represents a eCTS run.
-
- Attributes:
- dut: DeviceUnderTest object representing DUT
- th: TestHarness object representing a test harness
- module: Name of module to build/run tests for
- testlist: List of strings of test names contained in given module
- return_codes: Dict of strings of return codes, with a code's integer
- value being the index for the corresponding string representation
- """
-
- def __init__(self, ec_dir, th, dut, module):
- """Initializes cts class object with given arguments.
-
- Args:
- ec_dir: Path to ec directory
- th: Name of the test harness board
- dut: Name of the device under test board
- module: Name of module to build/run tests for (e.g. gpio, interrupt)
- """
- self.results_dir = os.path.join(CTS_TEST_RESULT_DIR, dut, module)
- if os.path.isdir(self.results_dir):
- shutil.rmtree(self.results_dir)
- else:
- os.makedirs(self.results_dir)
- self.ec_dir = ec_dir
- self.module = module
- serial_path = os.path.join(CTS_TEST_RESULT_DIR, 'th_serial')
- self.th = board.TestHarness(th, module, self.results_dir, serial_path)
- self.dut = board.DeviceUnderTest(dut, self.th, module, self.results_dir)
- cts_dir = os.path.join(self.ec_dir, 'cts')
- testlist_path = os.path.join(cts_dir, self.module, 'cts.testlist')
- return_codes_path = os.path.join(cts_dir, 'common', 'cts.rc')
- self.get_return_codes(return_codes_path)
- self.testlist = self.get_macro_args(testlist_path, 'CTS_TEST')
-
- def build(self):
- """Build images for DUT and TH."""
- print('Building DUT image...')
- if not self.dut.build(self.ec_dir):
- raise RuntimeError('Building module %s for DUT failed' % (self.module))
- print('Building TH image...')
- if not self.th.build(self.ec_dir):
- raise RuntimeError('Building module %s for TH failed' % (self.module))
-
- def flash_boards(self):
- """Flashes TH and DUT with their most recently built ec.bin."""
- cts_module = 'cts_' + self.module
- image_path = os.path.join('build', self.th.board, cts_module, 'ec.bin')
- self.identify_boards()
- print('Flashing TH with', image_path)
- if not self.th.flash(image_path):
- raise RuntimeError('Flashing TH failed')
- image_path = os.path.join('build', self.dut.board, cts_module, 'ec.bin')
- print('Flashing DUT with', image_path)
- if not self.dut.flash(image_path):
- raise RuntimeError('Flashing DUT failed')
-
- def setup(self):
- """Setup boards."""
- self.th.save_serial()
-
- def identify_boards(self):
- """Updates serials of TH and DUT in that order (order matters)."""
- self.th.get_serial()
- self.dut.get_serial()
-
- def get_macro_args(self, filepath, macro):
- """Get list of args of a macro in a file when macro.
-
- Args:
- filepath: String containing absolute path to the file
- macro: String containing text of macro to get args of
-
- Returns:
- List of dictionaries where each entry is:
- 'name': Test name,
- 'th_string': Expected string from TH,
- 'dut_string': Expected string from DUT,
- """
- tests = []
- with open(filepath, 'r') as f:
- lines = f.readlines()
- joined = ''.join(lines).replace('\\\n', '').splitlines()
- for l in joined:
- if not l.strip().startswith(macro):
- continue
- d = {}
- l = l.strip()[len(macro):]
- l = l.strip('()').split(',')
- d['name'] = l[0].strip()
- d['th_rc'] = self.get_return_code_value(l[1].strip().strip('"'))
- d['th_string'] = l[2].strip().strip('"')
- d['dut_rc'] = self.get_return_code_value(l[3].strip().strip('"'))
- d['dut_string'] = l[4].strip().strip('"')
- tests.append(d)
- return tests
-
- def get_return_codes(self, filepath):
- """Read return code names from the return code definition file."""
- self.return_codes = {}
- val = 0
- with open(filepath, 'r') as f:
- for line in f:
- line = line.strip()
- if not line.startswith(CTS_RC_PREFIX):
- continue
- line = line.split(',')[0]
- if '=' in line:
- tokens = line.split('=')
- line = tokens[0].strip()
- val = int(tokens[1].strip())
- self.return_codes[line] = val
- val += 1
-
- def parse_output(self, output):
- """Parse console output from DUT or TH.
-
- Args:
- output: String containing consoule output
-
- Returns:
- List of dictionaries where each key and value are:
- name = 'ects_test_x',
- started = True/False,
- ended = True/False,
- rc = CTS_RC_*,
- output = All text between 'ects_test_x start' and 'ects_test_x end'
- """
- results = []
- i = 0
- for test in self.testlist:
- results.append({})
- results[i]['name'] = test['name']
- results[i]['started'] = False
- results[i]['rc'] = CTS_RC_DID_NOT_START
- results[i]['string'] = False
- results[i]['output'] = []
- i += 1
-
- i = 0
- for ln in [ln.strip() for ln in output.split('\n')]:
- if i + 1 > len(results):
- break
- tokens = ln.split()
- if len(tokens) >= 2:
- if tokens[0].strip() == results[i]['name']:
- if tokens[1].strip() == 'start':
- # start line found
- if results[i]['started']: # Already started
- results[i]['rc'] = CTS_RC_DUPLICATE_RUN
- else:
- results[i]['rc'] = CTS_RC_DID_NOT_END
- results[i]['started'] = True
- continue
- elif results[i]['started'] and tokens[1].strip() == 'end':
- # end line found
- results[i]['rc'] = CTS_RC_INVALID_RETURN_CODE
- if len(tokens) == 3:
- try:
- results[i]['rc'] = int(tokens[2].strip())
- except ValueError:
- pass
- # Since index is incremented when 'end' is encountered, we don't
- # need to check duplicate 'end'.
- i += 1
- continue
- if results[i]['started']:
- results[i]['output'].append(ln)
-
- return results
-
- def get_return_code_name(self, code, strip_prefix=False):
- name = ''
- for k, v in self.return_codes.items():
- if v == code:
- if strip_prefix:
- name = k[len(CTS_RC_PREFIX):]
- else:
- name = k
- return name
-
- def get_return_code_value(self, name):
- if name:
- return self.return_codes[name]
- return 0
-
- def evaluate_run(self, dut_output, th_output):
- """Parse outputs to derive test results.
-
- Args:
- dut_output: String output of DUT
- th_output: String output of TH
-
- Returns:
- th_results: list of test results for TH
- dut_results: list of test results for DUT
- """
- th_results = self.parse_output(th_output)
- dut_results = self.parse_output(dut_output)
-
- # Search for expected string in each output
- for i, v in enumerate(self.testlist):
- if v['th_string'] in th_results[i]['output'] or not v['th_string']:
- th_results[i]['string'] = True
- if v['dut_string'] in dut_results[i]['output'] or not v['dut_string']:
- dut_results[i]['string'] = True
-
- return th_results, dut_results
-
- def print_result(self, th_results, dut_results):
- """Print results to the screen.
-
- Args:
- th_results: list of test results for TH
- dut_results: list of test results for DUT
- """
- len_test_name = max(len(s['name']) for s in self.testlist)
- len_code_name = max(len(self.get_return_code_name(v, True))
- for v in self.return_codes.values())
-
- head = '{:^' + str(len_test_name) + '} '
- head += '{:^' + str(len_code_name) + '} '
- head += '{:^' + str(len_code_name) + '}'
- head += '{:^' + str(len(' TH_STR')) + '}'
- head += '{:^' + str(len(' DUT_STR')) + '}'
- head += '{:^' + str(len(' RESULT')) + '}\n'
- fmt = '{:' + str(len_test_name) + '} '
- fmt += '{:>' + str(len_code_name) + '} '
- fmt += '{:>' + str(len_code_name) + '}'
- fmt += '{:>' + str(len(' TH_STR')) + '}'
- fmt += '{:>' + str(len(' DUT_STR')) + '}'
- fmt += '{:>' + str(len(' RESULT')) + '}\n'
-
- self.formatted_results = head.format(
- 'TEST NAME', 'TH_RC', 'DUT_RC',
- ' TH_STR', ' DUT_STR', ' RESULT')
- for i, d in enumerate(dut_results):
- th_cn = self.get_return_code_name(th_results[i]['rc'], True)
- dut_cn = self.get_return_code_name(dut_results[i]['rc'], True)
- th_res = self.evaluate_result(th_results[i],
- self.testlist[i]['th_rc'],
- self.testlist[i]['th_string'])
- dut_res = self.evaluate_result(dut_results[i],
- self.testlist[i]['dut_rc'],
- self.testlist[i]['dut_string'])
- self.formatted_results += fmt.format(
- d['name'], th_cn, dut_cn,
- 'YES' if th_results[i]['string'] else 'NO',
- 'YES' if dut_results[i]['string'] else 'NO',
- 'PASS' if th_res and dut_res else 'FAIL')
-
- def evaluate_result(self, result, expected_rc, expected_string):
- if result['rc'] != expected_rc:
- return False
- if expected_string and expected_string not in result['output']:
- return False
- return True
-
- def run(self):
- """Resets boards, records test results in results dir."""
- print('Reading serials...')
- self.identify_boards()
- print('Opening DUT tty...')
- self.dut.setup_tty()
- print('Opening TH tty...')
- self.th.setup_tty()
-
- # Boards might be still writing to tty. Wait a few seconds before flashing.
- time.sleep(3)
-
- # clear buffers
- print('Clearing DUT tty...')
- self.dut.read_tty()
- print('Clearing TH tty...')
- self.th.read_tty()
-
- # Resets the boards and allows them to run tests
- # Due to current (7/27/16) version of sync function,
- # both boards must be rest and halted, with the th
- # resuming first, in order for the test suite to run in sync
- print('Halting TH...')
- if not self.th.reset_halt():
- raise RuntimeError('Failed to halt TH')
- print('Halting DUT...')
- if not self.dut.reset_halt():
- raise RuntimeError('Failed to halt DUT')
- print('Resuming TH...')
- if not self.th.resume():
- raise RuntimeError('Failed to resume TH')
- print('Resuming DUT...')
- if not self.dut.resume():
- raise RuntimeError('Failed to resume DUT')
-
- time.sleep(MAX_SUITE_TIME_SEC)
-
- print('Reading DUT tty...')
- dut_output, _ = self.dut.read_tty()
- self.dut.close_tty()
- print('Reading TH tty...')
- th_output, _ = self.th.read_tty()
- self.th.close_tty()
-
- print('Halting TH...')
- if not self.th.reset_halt():
- raise RuntimeError('Failed to halt TH')
- print('Halting DUT...')
- if not self.dut.reset_halt():
- raise RuntimeError('Failed to halt DUT')
-
- if not dut_output or not th_output:
- raise ValueError('Output missing from boards. If you have a process '
- 'reading ttyACMx, please kill that process and try '
- 'again.')
-
- print('Pursing results...')
- th_results, dut_results = self.evaluate_run(dut_output, th_output)
-
- # Print out results
- self.print_result(th_results, dut_results)
-
- # Write results
- dest = os.path.join(self.results_dir, 'results.log')
- with open(dest, 'w') as fl:
- fl.write(self.formatted_results)
-
- # Write UART outputs
- dest = os.path.join(self.results_dir, 'uart_th.log')
- with open(dest, 'w') as fl:
- fl.write(th_output)
- dest = os.path.join(self.results_dir, 'uart_dut.log')
- with open(dest, 'w') as fl:
- fl.write(dut_output)
-
- print(self.formatted_results)
-
- # TODO(chromium:735652): Should set exit code for the shell
-
-
-def main():
- ec_dir = os.path.realpath(os.path.join(
- os.path.dirname(os.path.abspath(__file__)), '..'))
- os.chdir(ec_dir)
-
- dut = DEFAULT_DUT
- module = 'meta'
-
- parser = argparse.ArgumentParser(description='Used to build/flash boards')
- parser.add_argument('-d',
- '--dut',
- help='Specify DUT you want to build/flash')
- parser.add_argument('-m',
- '--module',
- help='Specify module you want to build/flash')
- parser.add_argument('-s',
- '--setup',
- action='store_true',
- help='Connect only the TH to save its serial')
- parser.add_argument('-b',
- '--build',
- action='store_true',
- help='Build test suite (no flashing)')
- parser.add_argument('-f',
- '--flash',
- action='store_true',
- help='Flash boards with most recent images')
- parser.add_argument('-r',
- '--run',
- action='store_true',
- help='Run tests without flashing')
-
- args = parser.parse_args()
-
- if args.module:
- module = args.module
-
- if args.dut:
- dut = args.dut
-
- cts = Cts(ec_dir, DEFAULT_TH, dut=dut, module=module)
-
- if args.setup:
- cts.setup()
- elif args.build:
- cts.build()
- elif args.flash:
- cts.flash_boards()
- elif args.run:
- cts.run()
- else:
- cts.build()
- cts.flash_boards()
- cts.run()
-
-if __name__ == '__main__':
- main()