summaryrefslogtreecommitdiff
path: root/util/uart_stress_tester.py
diff options
context:
space:
mode:
authorNamyoon Woo <namyoon@chromium.org>2019-06-20 08:57:59 -0700
committerCommit Bot <commit-bot@chromium.org>2019-07-10 02:36:23 +0000
commiteccb31cfd4c17785f401e40b1ab2e27106dcec4d (patch)
tree0c729f9c8c9ba578910895bbdc06aaee39076e29 /util/uart_stress_tester.py
parent20ab5ee3c8a97e6bea27a6db550f9ab3c7df89a7 (diff)
downloadchrome-ec-eccb31cfd4c17785f401e40b1ab2e27106dcec4d.tar.gz
util: uart stress tester using 'chargen' command
Uart stress tester runs a 'chargen' UART command on EC and/or AP, and checks if any characters are lost from UART output. BUG=b:131340067 BRANCH=None TEST=ran on Bob and Octopus (Fleex) $ ./util/uart_stress_tester.py -h usage: uart_stress_tester.py [-h] [-c] [-d] [-t TIME] [port [port ...]] uart_stress_tester.py repeats sending a uart console command to each UART device for a given time, and check if output has any missing characters. Examples: uart_stress_tester.py /dev/ttyUSB2 --time 3600 uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 --debug uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 --cr50 positional arguments: port UART device path to test optional arguments: -h, --help show this help message and exit -c, --cr50 generate TPM workload on cr50 -d, --debug enable debug messages -t TIME, --time TIME Test duration in second $ ./util/uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 -t 120 INFO | UartSerial| EC | 0 char lost / 1382400 (0.0 %) INFO | UartSerial| AP | 0 char lost / 1382400 (0.0 %) INFO | ChargenTest | PASS: lost 0 character(s) from the test $ ./util/uart_stress_tester.py /dev/ttyUSB1 /dev/ttyUSB2 -t 120 --cr50 INFO | UartSerial| EC | 0 char lost / 1382400 (0.0 %) INFO | UartSerial| AP | 0 char lost / 1382400 (0.0 %) INFO | ChargenTest | PASS: lost 0 character(s) from the test Change-Id: I713fb0180db3ca5904bd7aae0dd26a4633733d2e Signed-off-by: Namyoon Woo <namyoon@chromium.org> Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/1683011 Reviewed-by: Mary Ruthven <mruthven@chromium.org>
Diffstat (limited to 'util/uart_stress_tester.py')
-rwxr-xr-xutil/uart_stress_tester.py509
1 files changed, 509 insertions, 0 deletions
diff --git a/util/uart_stress_tester.py b/util/uart_stress_tester.py
new file mode 100755
index 0000000000..2dd5e12515
--- /dev/null
+++ b/util/uart_stress_tester.py
@@ -0,0 +1,509 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""ChromeOS Uart Stress Test
+
+This tester runs the command 'chargen' on EC and/or AP, captures the
+output, and compares it against the expected output to check any characters
+lost.
+
+Prerequisite:
+ (1) This test needs PySerial. Please check if it is available before test.
+ Can be installed by 'pip install pyserial'
+ (2) If servod is running, turn uart_timestamp off before running this test.
+ e.g. dut-control cr50_uart_timestamp:off
+"""
+
+from __future__ import print_function
+from chromite.lib import cros_logging as logging
+
+import argparse
+import atexit
+import serial
+import sys
+import threading
+import time
+
+
+BAUDRATE = 115200 # Default baudrate setting for UART port
+CROS_USERNAME = 'root' # Account name to login to ChromeOS
+CROS_PASSWORD = 'test0000' # Password to login to ChromeOS
+CHARGEN_TXT = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
+ # The result of 'chargen 62 62'
+CHARGEN_TXT_LEN = len(CHARGEN_TXT)
+CR = '\r' # Carriage Return
+LF = '\n' # Line Feed
+CRLF = CR + LF
+FLAG_FILENAME = '/tmp/chargen_testing'
+TPM_CMD = ('trunks_client --key_create --rsa=2048 --usage=sign'
+ ' --key_blob=/tmp/blob &> /dev/null')
+ # A ChromeOS TPM command for the cr50 stress
+ # purpose.
+CR50_LOAD_GEN_CMD = ('while [[ -f %s ]]; do %s; done &'
+ % (FLAG_FILENAME, TPM_CMD))
+ # A command line to run TPM_CMD in background
+ # infinitely.
+
+
+class ChargenTestError(Exception):
+ """Exception for Uart Stress Test Error"""
+ pass
+
+
+class UartSerial(object):
+ """Test Object for a single UART serial device
+
+ Attributes
+ UART_DEV_PROFILES
+ """
+ UART_DEV_PROFILES = (
+ # Kernel
+ {
+ 'prompt':'localhost login:',
+ 'device_type':'AP',
+ 'prepare_cmd':[
+ CROS_USERNAME, # Login
+ CROS_PASSWORD, # Password
+ 'dmesg -D', # Disable console message
+ 'touch ' + FLAG_FILENAME, # Create a temp file
+ ],
+ 'cleanup_cmd':[
+ 'rm -f ' + FLAG_FILENAME, # Remove the temp file
+ 'dmesg -E', # Enable console message
+ 'logout', # Logout
+ ],
+ 'end_of_input':LF,
+ },
+ # EC
+ {
+ 'prompt':'> ',
+ 'device_type':'EC',
+ 'prepare_cmd':[
+ 'chan save',
+ 'chan 0' # Disable console message
+ ],
+ 'cleanup_cmd':['', 'chan restore'],
+ 'end_of_input':CRLF,
+ },
+ )
+
+ def __init__(self, port, duration, timeout=1,
+ baudrate=BAUDRATE, cr50_workload=False):
+ """Initialize UartSerial
+
+ Args:
+ port: UART device path. e.g. /dev/ttyUSB0
+ duration: Time to test, in seconds
+ timeout: Read timeout value.
+ baudrate: Baud rate such as 9600 or 115200.
+ cr50_workload: True if a workload should be generated on cr50
+
+ Attributes:
+ char_loss_occurrences: Number that character loss happens
+ cleanup_cli: Command list to perform before the test exits
+ cr50_workload: True if cr50 should be stressed, or False otherwise
+ dev_prof: Dictionary of device profile
+ duration: Time to keep chargen running
+ eol: Characters to add at the end of input
+ logger: object that store the log
+ num_ch_exp: Expected number of characters in output
+ num_ch_cap: Number of captured characters in output
+ test_cli: Command list to run for chargen test
+ test_thread: Thread object that captures the UART output
+ serial: serial.Serial object
+ """
+
+ # Initialize serial object
+ self.serial = serial.Serial()
+ self.serial.port = port
+ self.serial.timeout = timeout
+ self.serial.baudrate = baudrate
+
+ self.duration = duration
+ self.cr50_workload = cr50_workload
+
+ self.logger = logging.getLogger(type(self).__name__ + '| ' + port)
+ self.test_thread = threading.Thread(target=self.stress_test_thread)
+
+ self.dev_prof = {}
+ self.cleanup_cli = []
+ self.test_cli = []
+ self.eol = CRLF
+ self.num_ch_exp = 0
+ self.num_ch_cap = 0
+ self.char_loss_occurrences = 0
+ atexit.register(self.cleanup)
+
+ def run_command(self, command_lines, delay=0):
+ """Run command(s) at UART prompt
+
+ Args:
+ command_lines: list of commands to run.
+ delay: delay after a command in second
+ """
+ for cli in command_lines:
+ self.logger.debug('run %r', cli)
+
+ self.serial.write(cli + self.eol)
+ self.serial.flush()
+ if delay:
+ time.sleep(delay)
+
+ def cleanup(self):
+ """Before termination, clean up the UART device."""
+ self.logger.debug('Closing...')
+
+ self.serial.open()
+ self.run_command(self.cleanup_cli) # Run cleanup commands
+ self.serial.close()
+
+ self.logger.debug('Cleanup done')
+
+ def get_output(self):
+ """Capture the UART output
+
+ Args:
+ stop_char: Read output buffer until it reads stop_char.
+
+ Returns:
+ text from UART output.
+ """
+ if self.serial.inWaiting() == 0:
+ time.sleep(1)
+
+ return self.serial.read(self.serial.inWaiting())
+
+ def prepare(self):
+ """Prepare the test:
+
+ Identify the type of UART device (EC or Kernel?), then
+ decide what kind of commands to use to generate stress loads.
+
+ Raises:
+ ChargenTestError if UART source can't be identified.
+ """
+ try:
+ self.logger.info('Preparing...')
+
+ self.serial.open()
+
+ # Prepare the device for test
+ self.serial.flushInput()
+ self.serial.flushOutput()
+
+ self.get_output() # drain data
+
+ # Give a couple of line feeds, and capture the prompt text
+ self.run_command(['', ''])
+ prompt_txt = self.get_output()
+
+ # Detect the device source: EC or AP?
+ # Detect if the device is AP or EC console based on the captured.
+ for dev_prof in self.UART_DEV_PROFILES:
+ if dev_prof['prompt'] in prompt_txt:
+ self.dev_prof = dev_prof
+ break
+ else:
+ # No prompt patterns were found. UART seems not responding or in
+ # an undesirable status.
+ if prompt_txt:
+ raise ChargenTestError('%s: Got an unknown prompt text: %s\n'
+ 'Check manually whether %s is available.' %
+ (self.serial.port, prompt_txt,
+ self.serial.port))
+ else:
+ raise ChargenTestError('%s: Got no input. Close any other connections'
+ ' to this port, and try it again.' %
+ self.serial.port)
+
+ self.logger.info('Detected as %s UART', self.dev_prof['device_type'])
+ # Log displays the UART type (AP|EC) instead of device filename.
+ self.logger = logging.getLogger(type(self).__name__ + '| ' +
+ self.dev_prof['device_type'])
+
+ # Either login to AP or run some commands to prepare the device
+ # for test
+ self.eol = self.dev_prof['end_of_input']
+ self.run_command(self.dev_prof['prepare_cmd'], delay=2)
+ self.cleanup_cli += self.dev_prof['cleanup_cmd']
+
+ # Check whether the command 'chargen' is available in the device.
+ # 'chargen 1 4' is supposed to print '0000'
+ self.get_output() # drain data
+ self.run_command(['chargen 1 4'])
+ tmp_txt = self.get_output()
+
+ # Check whether chargen command is available.
+ if '0000' not in tmp_txt:
+ raise ChargenTestError('%s: Chargen got an unexpected result: %s' %
+ (self.dev_prof['device_type'], tmp_txt))
+
+ self.num_ch_exp = int(self.serial.baudrate * self.duration / 10)
+ self.test_cli = ['chargen %d %d' % (CHARGEN_TXT_LEN, self.num_ch_exp)]
+
+ self.logger.info('Ready to test')
+ finally:
+ self.serial.close()
+
+ def stress_test_thread(self):
+ """Test thread"""
+ try:
+ self.serial.open()
+ self.serial.flushInput()
+ self.serial.flushOutput()
+
+ # Run TPM command in background to burden cr50.
+ if self.dev_prof['device_type'] == 'AP' and self.cr50_workload:
+ self.run_command([CR50_LOAD_GEN_CMD])
+ self.logger.debug('run TPM job while %s exists', FLAG_FILENAME)
+
+ # Run the command 'chargen', one time
+ self.get_output() # Drain the output
+ self.run_command(self.test_cli)
+ self.serial.readline() # Drain the echoed command line.
+
+ err_msg = '%s: Expected %r but got %s after %d char received'
+
+ # Keep capturing the output until the test timer is expired.
+ self.num_ch_cap = 0
+ self.char_loss_occurrences = 0
+ data_starve_count = 0
+
+ total_num_ch = self.num_ch_exp # Expected number of characters in total
+ ch_exp = CHARGEN_TXT[0]
+ ch_cap = 'z' # any character value is ok for loop initial condition.
+ while self.num_ch_cap < total_num_ch:
+ captured = self.get_output()
+
+ if captured:
+ # There is some output data. Reset the data starvation count.
+ data_starve_count = 0
+ else:
+ data_starve_count += 1
+ if data_starve_count > 1:
+ # If nothing was captured more than once, then terminate the test.
+ self.logger.debug('No more output')
+ break
+
+ for ch_cap in captured:
+ if ch_cap not in CHARGEN_TXT:
+ # If it is not alpha-numeric, terminate the test.
+ if ch_cap not in CRLF:
+ # If it is neither a CR nor LF, then it is an error case.
+ self.logger.error(err_msg, 'Broken char captured',
+ ch_exp, hex(ord(ch_cap)), self.num_ch_cap)
+ self.logger.error('Whole captured characters: %r', captured)
+ # Set the loop termination condition true.
+ total_num_ch = self.num_ch_cap
+
+ if self.num_ch_cap >= total_num_ch:
+ break
+
+ if ch_exp != ch_cap:
+ # If it is alpha-numeric but not continuous, then some characters
+ # are lost.
+ self.logger.error(err_msg, 'Char loss detected',
+ ch_exp, repr(ch_cap), self.num_ch_cap)
+ self.char_loss_occurrences += 1
+
+ # Recalculate the expected number of characters to adjust
+ # termination condition. The loss might be bigger than this
+ # adjustment, but it is okay since it will terminates by either
+ # CR/LF detection or by data starvation.
+ idx_ch_exp = CHARGEN_TXT.find(ch_exp)
+ idx_ch_cap = CHARGEN_TXT.find(ch_cap)
+ if idx_ch_cap < idx_ch_exp:
+ idx_ch_cap += len(CHARGEN_TXT)
+ total_num_ch -= (idx_ch_cap - idx_ch_exp)
+
+ self.num_ch_cap += 1
+
+ # Determine What character is expected next?
+ ch_exp = CHARGEN_TXT[(CHARGEN_TXT.find(ch_cap) + 1) % CHARGEN_TXT_LEN]
+
+ finally:
+ self.serial.close()
+
+ def start_test(self):
+ """Start the test thread"""
+ self.logger.info('Test thread starts')
+ self.test_thread.start()
+
+ def wait_test_done(self):
+ """Wait until the test thread get done and join"""
+ self.test_thread.join()
+ self.logger.info('Test thread is done')
+
+ def get_result(self):
+ """Display the result
+
+ Returns:
+ Integer = the number of lost character
+
+ Raises:
+ ChargenTestError: if the capture is corrupted.
+ """
+ # If more characters than expected are captured, it means some messages
+ # from other than chargen are mixed. Stop processing further.
+ if self.num_ch_exp < self.num_ch_cap:
+ raise ChargenTestError('%s: UART output is corrupted.' %
+ self.dev_prof['device_type'])
+
+ # Get the count difference between the expected to the captured
+ # as the number of lost character.
+ char_lost = self.num_ch_exp - self.num_ch_cap
+ self.logger.info('%8d char lost / %10d (%.1f %%)',
+ char_lost, self.num_ch_exp,
+ char_lost * 100.0 / self.num_ch_exp)
+
+ return char_lost, self.num_ch_exp, self.char_loss_occurrences
+
+
+class ChargenTest(object):
+ """UART stress tester
+
+ Attributes:
+ cr50_workload: True if cr50 should be stressed, or False otherwise
+ duration: Time to keep testing in seconds
+ logger: logging object
+ ports: List of Uart device filename
+ serials: Dictionary where key is filename of UART device, and the value is
+ UartSerial object
+ """
+
+ def __init__(self, ports, duration, cr50_workload=False):
+ """Initialize UART stress tester
+
+ Args:
+ ports: List of UART ports to test.
+ duration: Time to keep testing in seconds.
+ cr50_workload: True if a workload should be generated on cr50
+ """
+
+ # Save the arguments
+ self.ports = ports
+
+ if duration <= 0:
+ raise ChargenTestError('Input error: duration is not positive.')
+ self.duration = duration
+
+ self.cr50_workload = cr50_workload
+
+ # Initialize logging object
+ self.logger = logging.getLogger(type(self).__name__)
+
+ # Create an UartSerial object per UART port
+ self.serials = {} # UartSerial objects
+ for port in self.ports:
+ self.serials[port] = UartSerial(port=port, duration=self.duration,
+ cr50_workload=self.cr50_workload)
+
+ def prepare(self):
+ """Prepare the test for each UART port"""
+ self.logger.info('Prepare ports for test')
+ for _, ser in self.serials.items():
+ ser.prepare()
+ self.logger.info('Ports are ready to test')
+
+ def print_result(self):
+ """Display the test result for each UART port"""
+ char_lost = 0
+ for _, ser in self.serials.items():
+ (tmp_lost, _, _) = ser.get_result()
+ char_lost += tmp_lost
+
+ # If any characters are lost, then test fails.
+ msg = 'lost %d character(s) from the test' % char_lost
+ if char_lost > 0:
+ self.logger.error('FAIL: %s', msg)
+ else:
+ self.logger.info('PASS: %s', msg)
+
+ def run(self):
+ """Run the stress test on UART port(s)"""
+
+ # Detect UART source type, and decide which command to test.
+ self.prepare()
+
+ # Run the test on each UART port in thread.
+ self.logger.info('Test starts')
+ for _, ser in self.serials.items():
+ ser.start_test()
+
+ # Wait all tests to finish.
+ for _, ser in self.serials.items():
+ ser.wait_test_done()
+
+ # Print the result.
+ self.print_result()
+ self.logger.info('Test is done')
+
+
+def parse_args(cmdline):
+ """Parse command line arguments.
+
+ Args:
+ cmdline: list to be parsed
+
+ Returns:
+ tuple (options, args) where args is a list of cmdline arguments that the
+ parser was unable to match i.e. they're servod controls, not options.
+ """
+ description = """%(prog)s repeats sending a uart console command
+to each UART device for a given time, and check if output
+has any missing characters.
+
+Examples:
+ %(prog)s /dev/ttyUSB2 --time 3600
+ %(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --debug
+ %(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --cr50
+"""
+
+ parser = argparse.ArgumentParser(description=description,
+ formatter_class=argparse.RawTextHelpFormatter
+ )
+ parser.add_argument('port', type=str, nargs="*",
+ help='UART device path to test')
+ parser.add_argument('-c', '--cr50', action='store_true', default=False,
+ help='generate TPM workload on cr50')
+ parser.add_argument('-d', '--debug', action='store_true', default=False,
+ help='enable debug messages')
+ parser.add_argument('-t', '--time', type=int,
+ help='Test duration in second', default=300)
+ return parser.parse_known_args(cmdline)
+
+
+def main():
+ """Main function wrapper"""
+ try:
+ (options, _) = parse_args(sys.argv[1:])
+
+ # Set Log format
+ log_format = '%(asctime)s %(levelname)-6s | %(name)-25s'
+ date_format = '%Y-%m-%d %H:%M:%S'
+ if options.debug:
+ log_format += ' | %(filename)s:%(lineno)4d:%(funcName)-18s'
+ loglevel = logging.DEBUG
+ else:
+ loglevel = logging.INFO
+ log_format += ' | %(message)s'
+
+ logging.basicConfig(level=loglevel, format=log_format,
+ datefmt=date_format)
+
+ # Create a ChargenTest object
+ utest = ChargenTest(options.port, options.time, options.cr50)
+ utest.run() # Run
+
+ except KeyboardInterrupt:
+ sys.exit(0)
+
+ except ChargenTestError as e:
+ print('Error: ', str(e))
+ sys.exit(1)
+
+if __name__ == '__main__':
+ main()