diff options
author | Namyoon Woo <namyoon@chromium.org> | 2019-06-20 08:57:59 -0700 |
---|---|---|
committer | Commit Bot <commit-bot@chromium.org> | 2019-07-10 02:36:23 +0000 |
commit | eccb31cfd4c17785f401e40b1ab2e27106dcec4d (patch) | |
tree | 0c729f9c8c9ba578910895bbdc06aaee39076e29 /util/uart_stress_tester.py | |
parent | 20ab5ee3c8a97e6bea27a6db550f9ab3c7df89a7 (diff) | |
download | chrome-ec-eccb31cfd4c17785f401e40b1ab2e27106dcec4d.tar.gz |
util: uart stress tester using 'chargen' command
Uart stress tester runs a 'chargen' UART command on EC and/or AP, and
checks if any characters are lost from UART output.
BUG=b:131340067
BRANCH=None
TEST=ran on Bob and Octopus (Fleex)
$ ./util/uart_stress_tester.py -h
usage: uart_stress_tester.py [-h] [-c] [-d] [-t TIME] [port [port ...]]
uart_stress_tester.py repeats sending a uart console command
to each UART device for a given time, and check if output
has any missing characters.
Examples:
uart_stress_tester.py /dev/ttyUSB2 --time 3600
uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 --debug
uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 --cr50
positional arguments:
port UART device path to test
optional arguments:
-h, --help show this help message and exit
-c, --cr50 generate TPM workload on cr50
-d, --debug enable debug messages
-t TIME, --time TIME Test duration in second
$ ./util/uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 -t 120
INFO | UartSerial| EC | 0 char lost / 1382400 (0.0 %)
INFO | UartSerial| AP | 0 char lost / 1382400 (0.0 %)
INFO | ChargenTest | PASS: lost 0 character(s) from the test
$ ./util/uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 -t 120 --cr50
INFO | UartSerial| EC | 0 char lost / 1382400 (0.0 %)
INFO | UartSerial| AP | 0 char lost / 1382400 (0.0 %)
INFO | ChargenTest | PASS: lost 0 character(s) from the test
Change-Id: I713fb0180db3ca5904bd7aae0dd26a4633733d2e
Signed-off-by: Namyoon Woo <namyoon@chromium.org>
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/1683011
Reviewed-by: Mary Ruthven <mruthven@chromium.org>
Diffstat (limited to 'util/uart_stress_tester.py')
-rwxr-xr-x | util/uart_stress_tester.py | 509 |
1 files changed, 509 insertions, 0 deletions
diff --git a/util/uart_stress_tester.py b/util/uart_stress_tester.py new file mode 100755 index 0000000000..2dd5e12515 --- /dev/null +++ b/util/uart_stress_tester.py @@ -0,0 +1,509 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +# Copyright 2019 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""ChromeOS Uart Stress Test + +This tester runs the command 'chargen' on EC and/or AP, captures the +output, and compares it against the expected output to check any characters +lost. + +Prerequisite: + (1) This test needs PySerial. Please check if it is available before test. + Can be installed by 'pip install pyserial' + (2) If servod is running, turn uart_timestamp off before running this test. + e.g. dut-control cr50_uart_timestamp:off +""" + +from __future__ import print_function +from chromite.lib import cros_logging as logging + +import argparse +import atexit +import serial +import sys +import threading +import time + + +BAUDRATE = 115200 # Default baudrate setting for UART port +CROS_USERNAME = 'root' # Account name to login to ChromeOS +CROS_PASSWORD = 'test0000' # Password to login to ChromeOS +CHARGEN_TXT = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' + # The result of 'chargen 62 62' +CHARGEN_TXT_LEN = len(CHARGEN_TXT) +CR = '\r' # Carriage Return +LF = '\n' # Line Feed +CRLF = CR + LF +FLAG_FILENAME = '/tmp/chargen_testing' +TPM_CMD = ('trunks_client --key_create --rsa=2048 --usage=sign' + ' --key_blob=/tmp/blob &> /dev/null') + # A ChromeOS TPM command for the cr50 stress + # purpose. +CR50_LOAD_GEN_CMD = ('while [[ -f %s ]]; do %s; done &' + % (FLAG_FILENAME, TPM_CMD)) + # A command line to run TPM_CMD in background + # infinitely. + + +class ChargenTestError(Exception): + """Exception for Uart Stress Test Error""" + pass + + +class UartSerial(object): + """Test Object for a single UART serial device + + Attributes + UART_DEV_PROFILES + """ + UART_DEV_PROFILES = ( + # Kernel + { + 'prompt':'localhost login:', + 'device_type':'AP', + 'prepare_cmd':[ + CROS_USERNAME, # Login + CROS_PASSWORD, # Password + 'dmesg -D', # Disable console message + 'touch ' + FLAG_FILENAME, # Create a temp file + ], + 'cleanup_cmd':[ + 'rm -f ' + FLAG_FILENAME, # Remove the temp file + 'dmesg -E', # Enable console message + 'logout', # Logout + ], + 'end_of_input':LF, + }, + # EC + { + 'prompt':'> ', + 'device_type':'EC', + 'prepare_cmd':[ + 'chan save', + 'chan 0' # Disable console message + ], + 'cleanup_cmd':['', 'chan restore'], + 'end_of_input':CRLF, + }, + ) + + def __init__(self, port, duration, timeout=1, + baudrate=BAUDRATE, cr50_workload=False): + """Initialize UartSerial + + Args: + port: UART device path. e.g. /dev/ttyUSB0 + duration: Time to test, in seconds + timeout: Read timeout value. + baudrate: Baud rate such as 9600 or 115200. + cr50_workload: True if a workload should be generated on cr50 + + Attributes: + char_loss_occurrences: Number that character loss happens + cleanup_cli: Command list to perform before the test exits + cr50_workload: True if cr50 should be stressed, or False otherwise + dev_prof: Dictionary of device profile + duration: Time to keep chargen running + eol: Characters to add at the end of input + logger: object that store the log + num_ch_exp: Expected number of characters in output + num_ch_cap: Number of captured characters in output + test_cli: Command list to run for chargen test + test_thread: Thread object that captures the UART output + serial: serial.Serial object + """ + + # Initialize serial object + self.serial = serial.Serial() + self.serial.port = port + self.serial.timeout = timeout + self.serial.baudrate = baudrate + + self.duration = duration + self.cr50_workload = cr50_workload + + self.logger = logging.getLogger(type(self).__name__ + '| ' + port) + self.test_thread = threading.Thread(target=self.stress_test_thread) + + self.dev_prof = {} + self.cleanup_cli = [] + self.test_cli = [] + self.eol = CRLF + self.num_ch_exp = 0 + self.num_ch_cap = 0 + self.char_loss_occurrences = 0 + atexit.register(self.cleanup) + + def run_command(self, command_lines, delay=0): + """Run command(s) at UART prompt + + Args: + command_lines: list of commands to run. + delay: delay after a command in second + """ + for cli in command_lines: + self.logger.debug('run %r', cli) + + self.serial.write(cli + self.eol) + self.serial.flush() + if delay: + time.sleep(delay) + + def cleanup(self): + """Before termination, clean up the UART device.""" + self.logger.debug('Closing...') + + self.serial.open() + self.run_command(self.cleanup_cli) # Run cleanup commands + self.serial.close() + + self.logger.debug('Cleanup done') + + def get_output(self): + """Capture the UART output + + Args: + stop_char: Read output buffer until it reads stop_char. + + Returns: + text from UART output. + """ + if self.serial.inWaiting() == 0: + time.sleep(1) + + return self.serial.read(self.serial.inWaiting()) + + def prepare(self): + """Prepare the test: + + Identify the type of UART device (EC or Kernel?), then + decide what kind of commands to use to generate stress loads. + + Raises: + ChargenTestError if UART source can't be identified. + """ + try: + self.logger.info('Preparing...') + + self.serial.open() + + # Prepare the device for test + self.serial.flushInput() + self.serial.flushOutput() + + self.get_output() # drain data + + # Give a couple of line feeds, and capture the prompt text + self.run_command(['', '']) + prompt_txt = self.get_output() + + # Detect the device source: EC or AP? + # Detect if the device is AP or EC console based on the captured. + for dev_prof in self.UART_DEV_PROFILES: + if dev_prof['prompt'] in prompt_txt: + self.dev_prof = dev_prof + break + else: + # No prompt patterns were found. UART seems not responding or in + # an undesirable status. + if prompt_txt: + raise ChargenTestError('%s: Got an unknown prompt text: %s\n' + 'Check manually whether %s is available.' % + (self.serial.port, prompt_txt, + self.serial.port)) + else: + raise ChargenTestError('%s: Got no input. Close any other connections' + ' to this port, and try it again.' % + self.serial.port) + + self.logger.info('Detected as %s UART', self.dev_prof['device_type']) + # Log displays the UART type (AP|EC) instead of device filename. + self.logger = logging.getLogger(type(self).__name__ + '| ' + + self.dev_prof['device_type']) + + # Either login to AP or run some commands to prepare the device + # for test + self.eol = self.dev_prof['end_of_input'] + self.run_command(self.dev_prof['prepare_cmd'], delay=2) + self.cleanup_cli += self.dev_prof['cleanup_cmd'] + + # Check whether the command 'chargen' is available in the device. + # 'chargen 1 4' is supposed to print '0000' + self.get_output() # drain data + self.run_command(['chargen 1 4']) + tmp_txt = self.get_output() + + # Check whether chargen command is available. + if '0000' not in tmp_txt: + raise ChargenTestError('%s: Chargen got an unexpected result: %s' % + (self.dev_prof['device_type'], tmp_txt)) + + self.num_ch_exp = int(self.serial.baudrate * self.duration / 10) + self.test_cli = ['chargen %d %d' % (CHARGEN_TXT_LEN, self.num_ch_exp)] + + self.logger.info('Ready to test') + finally: + self.serial.close() + + def stress_test_thread(self): + """Test thread""" + try: + self.serial.open() + self.serial.flushInput() + self.serial.flushOutput() + + # Run TPM command in background to burden cr50. + if self.dev_prof['device_type'] == 'AP' and self.cr50_workload: + self.run_command([CR50_LOAD_GEN_CMD]) + self.logger.debug('run TPM job while %s exists', FLAG_FILENAME) + + # Run the command 'chargen', one time + self.get_output() # Drain the output + self.run_command(self.test_cli) + self.serial.readline() # Drain the echoed command line. + + err_msg = '%s: Expected %r but got %s after %d char received' + + # Keep capturing the output until the test timer is expired. + self.num_ch_cap = 0 + self.char_loss_occurrences = 0 + data_starve_count = 0 + + total_num_ch = self.num_ch_exp # Expected number of characters in total + ch_exp = CHARGEN_TXT[0] + ch_cap = 'z' # any character value is ok for loop initial condition. + while self.num_ch_cap < total_num_ch: + captured = self.get_output() + + if captured: + # There is some output data. Reset the data starvation count. + data_starve_count = 0 + else: + data_starve_count += 1 + if data_starve_count > 1: + # If nothing was captured more than once, then terminate the test. + self.logger.debug('No more output') + break + + for ch_cap in captured: + if ch_cap not in CHARGEN_TXT: + # If it is not alpha-numeric, terminate the test. + if ch_cap not in CRLF: + # If it is neither a CR nor LF, then it is an error case. + self.logger.error(err_msg, 'Broken char captured', + ch_exp, hex(ord(ch_cap)), self.num_ch_cap) + self.logger.error('Whole captured characters: %r', captured) + # Set the loop termination condition true. + total_num_ch = self.num_ch_cap + + if self.num_ch_cap >= total_num_ch: + break + + if ch_exp != ch_cap: + # If it is alpha-numeric but not continuous, then some characters + # are lost. + self.logger.error(err_msg, 'Char loss detected', + ch_exp, repr(ch_cap), self.num_ch_cap) + self.char_loss_occurrences += 1 + + # Recalculate the expected number of characters to adjust + # termination condition. The loss might be bigger than this + # adjustment, but it is okay since it will terminates by either + # CR/LF detection or by data starvation. + idx_ch_exp = CHARGEN_TXT.find(ch_exp) + idx_ch_cap = CHARGEN_TXT.find(ch_cap) + if idx_ch_cap < idx_ch_exp: + idx_ch_cap += len(CHARGEN_TXT) + total_num_ch -= (idx_ch_cap - idx_ch_exp) + + self.num_ch_cap += 1 + + # Determine What character is expected next? + ch_exp = CHARGEN_TXT[(CHARGEN_TXT.find(ch_cap) + 1) % CHARGEN_TXT_LEN] + + finally: + self.serial.close() + + def start_test(self): + """Start the test thread""" + self.logger.info('Test thread starts') + self.test_thread.start() + + def wait_test_done(self): + """Wait until the test thread get done and join""" + self.test_thread.join() + self.logger.info('Test thread is done') + + def get_result(self): + """Display the result + + Returns: + Integer = the number of lost character + + Raises: + ChargenTestError: if the capture is corrupted. + """ + # If more characters than expected are captured, it means some messages + # from other than chargen are mixed. Stop processing further. + if self.num_ch_exp < self.num_ch_cap: + raise ChargenTestError('%s: UART output is corrupted.' % + self.dev_prof['device_type']) + + # Get the count difference between the expected to the captured + # as the number of lost character. + char_lost = self.num_ch_exp - self.num_ch_cap + self.logger.info('%8d char lost / %10d (%.1f %%)', + char_lost, self.num_ch_exp, + char_lost * 100.0 / self.num_ch_exp) + + return char_lost, self.num_ch_exp, self.char_loss_occurrences + + +class ChargenTest(object): + """UART stress tester + + Attributes: + cr50_workload: True if cr50 should be stressed, or False otherwise + duration: Time to keep testing in seconds + logger: logging object + ports: List of Uart device filename + serials: Dictionary where key is filename of UART device, and the value is + UartSerial object + """ + + def __init__(self, ports, duration, cr50_workload=False): + """Initialize UART stress tester + + Args: + ports: List of UART ports to test. + duration: Time to keep testing in seconds. + cr50_workload: True if a workload should be generated on cr50 + """ + + # Save the arguments + self.ports = ports + + if duration <= 0: + raise ChargenTestError('Input error: duration is not positive.') + self.duration = duration + + self.cr50_workload = cr50_workload + + # Initialize logging object + self.logger = logging.getLogger(type(self).__name__) + + # Create an UartSerial object per UART port + self.serials = {} # UartSerial objects + for port in self.ports: + self.serials[port] = UartSerial(port=port, duration=self.duration, + cr50_workload=self.cr50_workload) + + def prepare(self): + """Prepare the test for each UART port""" + self.logger.info('Prepare ports for test') + for _, ser in self.serials.items(): + ser.prepare() + self.logger.info('Ports are ready to test') + + def print_result(self): + """Display the test result for each UART port""" + char_lost = 0 + for _, ser in self.serials.items(): + (tmp_lost, _, _) = ser.get_result() + char_lost += tmp_lost + + # If any characters are lost, then test fails. + msg = 'lost %d character(s) from the test' % char_lost + if char_lost > 0: + self.logger.error('FAIL: %s', msg) + else: + self.logger.info('PASS: %s', msg) + + def run(self): + """Run the stress test on UART port(s)""" + + # Detect UART source type, and decide which command to test. + self.prepare() + + # Run the test on each UART port in thread. + self.logger.info('Test starts') + for _, ser in self.serials.items(): + ser.start_test() + + # Wait all tests to finish. + for _, ser in self.serials.items(): + ser.wait_test_done() + + # Print the result. + self.print_result() + self.logger.info('Test is done') + + +def parse_args(cmdline): + """Parse command line arguments. + + Args: + cmdline: list to be parsed + + Returns: + tuple (options, args) where args is a list of cmdline arguments that the + parser was unable to match i.e. they're servod controls, not options. + """ + description = """%(prog)s repeats sending a uart console command +to each UART device for a given time, and check if output +has any missing characters. + +Examples: + %(prog)s /dev/ttyUSB2 --time 3600 + %(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --debug + %(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --cr50 +""" + + parser = argparse.ArgumentParser(description=description, + formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument('port', type=str, nargs="*", + help='UART device path to test') + parser.add_argument('-c', '--cr50', action='store_true', default=False, + help='generate TPM workload on cr50') + parser.add_argument('-d', '--debug', action='store_true', default=False, + help='enable debug messages') + parser.add_argument('-t', '--time', type=int, + help='Test duration in second', default=300) + return parser.parse_known_args(cmdline) + + +def main(): + """Main function wrapper""" + try: + (options, _) = parse_args(sys.argv[1:]) + + # Set Log format + log_format = '%(asctime)s %(levelname)-6s | %(name)-25s' + date_format = '%Y-%m-%d %H:%M:%S' + if options.debug: + log_format += ' | %(filename)s:%(lineno)4d:%(funcName)-18s' + loglevel = logging.DEBUG + else: + loglevel = logging.INFO + log_format += ' | %(message)s' + + logging.basicConfig(level=loglevel, format=log_format, + datefmt=date_format) + + # Create a ChargenTest object + utest = ChargenTest(options.port, options.time, options.cr50) + utest.run() # Run + + except KeyboardInterrupt: + sys.exit(0) + + except ChargenTestError as e: + print('Error: ', str(e)) + sys.exit(1) + +if __name__ == '__main__': + main() |