diff options
Diffstat (limited to 'util/uart_stress_tester.py')
-rwxr-xr-x | util/uart_stress_tester.py | 1042 |
1 files changed, 551 insertions, 491 deletions
diff --git a/util/uart_stress_tester.py b/util/uart_stress_tester.py index b3db60060e..d7b2341e93 100755 --- a/util/uart_stress_tester.py +++ b/util/uart_stress_tester.py @@ -1,12 +1,8 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -# Copyright 2019 The Chromium OS Authors. All rights reserved. +# Copyright 2019 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """ChromeOS Uart Stress Test @@ -21,9 +17,7 @@ Prerequisite: e.g. dut-control cr50_uart_timestamp:off """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function +from __future__ import absolute_import, division, print_function import argparse import atexit @@ -34,474 +28,518 @@ import sys import threading import time -import serial +import serial # pylint:disable=import-error -BAUDRATE = 115200 # Default baudrate setting for UART port -CROS_USERNAME = 'root' # Account name to login to ChromeOS -CROS_PASSWORD = 'test0000' # Password to login to ChromeOS -CHARGEN_TXT = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' - # The result of 'chargen 62 62' +BAUDRATE = 115200 # Default baudrate setting for UART port +CROS_USERNAME = "root" # Account name to login to ChromeOS +CROS_PASSWORD = "test0000" # Password to login to ChromeOS +CHARGEN_TXT = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" +# The result of 'chargen 62 62' CHARGEN_TXT_LEN = len(CHARGEN_TXT) -CR = '\r' # Carriage Return -LF = '\n' # Line Feed +CR = "\r" # Carriage Return +LF = "\n" # Line Feed CRLF = CR + LF -FLAG_FILENAME = '/tmp/chargen_testing' -TPM_CMD = ('trunks_client --key_create --rsa=2048 --usage=sign' - ' --key_blob=/tmp/blob &> /dev/null') - # A ChromeOS TPM command for the cr50 stress - # purpose. -CR50_LOAD_GEN_CMD = ('while [[ -f %s ]]; do %s; done &' - % (FLAG_FILENAME, TPM_CMD)) - # A command line to run TPM_CMD in background - # infinitely. +FLAG_FILENAME = "/tmp/chargen_testing" +TPM_CMD = ( + "trunks_client --key_create --rsa=2048 --usage=sign" + " --key_blob=/tmp/blob &> /dev/null" +) +# A ChromeOS TPM command for the cr50 stress +# purpose. +CR50_LOAD_GEN_CMD = "while [[ -f %s ]]; do %s; done &" % ( + FLAG_FILENAME, + TPM_CMD, +) +# A command line to run TPM_CMD in background +# infinitely. class ChargenTestError(Exception): - """Exception for Uart Stress Test Error""" - pass + """Exception for Uart Stress Test Error""" + pass -class UartSerial(object): - """Test Object for a single UART serial device - - Attributes: - UART_DEV_PROFILES - char_loss_occurrences: Number that character loss happens - cleanup_cli: Command list to perform before the test exits - cr50_workload: True if cr50 should be stressed, or False otherwise - usb_output: True if output should be generated to USB channel - dev_prof: Dictionary of device profile - duration: Time to keep chargen running - eol: Characters to add at the end of input - logger: object that store the log - num_ch_exp: Expected number of characters in output - num_ch_cap: Number of captured characters in output - test_cli: Command list to run for chargen test - test_thread: Thread object that captures the UART output - serial: serial.Serial object - """ - UART_DEV_PROFILES = ( - # Kernel - { - 'prompt':'localhost login:', - 'device_type':'AP', - 'prepare_cmd':[ - CROS_USERNAME, # Login - CROS_PASSWORD, # Password - 'dmesg -D', # Disable console message - 'touch ' + FLAG_FILENAME, # Create a temp file - ], - 'cleanup_cmd':[ - 'rm -f ' + FLAG_FILENAME, # Remove the temp file - 'dmesg -E', # Enable console message - 'logout', # Logout - ], - 'end_of_input':LF, - }, - # EC - { - 'prompt':'> ', - 'device_type':'EC', - 'prepare_cmd':[ - 'chan save', - 'chan 0' # Disable console message - ], - 'cleanup_cmd':['', 'chan restore'], - 'end_of_input':CRLF, - }, - ) - - def __init__(self, port, duration, timeout=1, - baudrate=BAUDRATE, cr50_workload=False, - usb_output=False): - """Initialize UartSerial - - Args: - port: UART device path. e.g. /dev/ttyUSB0 - duration: Time to test, in seconds - timeout: Read timeout value. - baudrate: Baud rate such as 9600 or 115200. - cr50_workload: True if a workload should be generated on cr50 - usb_output: True if a workload should be generated to USB channel - """ - - # Initialize serial object - self.serial = serial.Serial() - self.serial.port = port - self.serial.timeout = timeout - self.serial.baudrate = baudrate - - self.duration = duration - self.cr50_workload = cr50_workload - self.usb_output = usb_output - - self.logger = logging.getLogger(type(self).__name__ + '| ' + port) - self.test_thread = threading.Thread(target=self.stress_test_thread) - self.dev_prof = {} - self.cleanup_cli = [] - self.test_cli = [] - self.eol = CRLF - self.num_ch_exp = 0 - self.num_ch_cap = 0 - self.char_loss_occurrences = 0 - atexit.register(self.cleanup) - - def run_command(self, command_lines, delay=0): - """Run command(s) at UART prompt - - Args: - command_lines: list of commands to run. - delay: delay after a command in second +class UartSerial(object): + """Test Object for a single UART serial device + + Attributes: + UART_DEV_PROFILES + char_loss_occurrences: Number that character loss happens + cleanup_cli: Command list to perform before the test exits + cr50_workload: True if cr50 should be stressed, or False otherwise + usb_output: True if output should be generated to USB channel + dev_prof: Dictionary of device profile + duration: Time to keep chargen running + eol: Characters to add at the end of input + logger: object that store the log + num_ch_exp: Expected number of characters in output + num_ch_cap: Number of captured characters in output + test_cli: Command list to run for chargen test + test_thread: Thread object that captures the UART output + serial: serial.Serial object """ - for cli in command_lines: - self.logger.debug('run %r', cli) - - self.serial.write((cli + self.eol).encode()) - self.serial.flush() - if delay: - time.sleep(delay) - - def cleanup(self): - """Before termination, clean up the UART device.""" - self.logger.debug('Closing...') - - self.serial.open() - self.run_command(self.cleanup_cli) # Run cleanup commands - self.serial.close() - self.logger.debug('Cleanup done') + UART_DEV_PROFILES = ( + # Kernel + { + "prompt": "localhost login:", + "device_type": "AP", + "prepare_cmd": [ + CROS_USERNAME, # Login + CROS_PASSWORD, # Password + "dmesg -D", # Disable console message + "touch " + FLAG_FILENAME, # Create a temp file + ], + "cleanup_cmd": [ + "rm -f " + FLAG_FILENAME, # Remove the temp file + "dmesg -E", # Enable console message + "logout", # Logout + ], + "end_of_input": LF, + }, + # EC + { + "prompt": "> ", + "device_type": "EC", + "prepare_cmd": ["chan save", "chan 0"], # Disable console message + "cleanup_cmd": ["", "chan restore"], + "end_of_input": CRLF, + }, + ) + + def __init__( + self, + port, + duration, + timeout=1, + baudrate=BAUDRATE, + cr50_workload=False, + usb_output=False, + ): + """Initialize UartSerial + + Args: + port: UART device path. e.g. /dev/ttyUSB0 + duration: Time to test, in seconds + timeout: Read timeout value. + baudrate: Baud rate such as 9600 or 115200. + cr50_workload: True if a workload should be generated on cr50 + usb_output: True if a workload should be generated to USB channel + """ + + # Initialize serial object + self.serial = serial.Serial() + self.serial.port = port + self.serial.timeout = timeout + self.serial.baudrate = baudrate + + self.duration = duration + self.cr50_workload = cr50_workload + self.usb_output = usb_output + + self.logger = logging.getLogger(type(self).__name__ + "| " + port) + self.test_thread = threading.Thread(target=self.stress_test_thread) + + self.dev_prof = {} + self.cleanup_cli = [] + self.test_cli = [] + self.eol = CRLF + self.num_ch_exp = 0 + self.num_ch_cap = 0 + self.char_loss_occurrences = 0 + atexit.register(self.cleanup) + + def run_command(self, command_lines, delay=0): + """Run command(s) at UART prompt + + Args: + command_lines: list of commands to run. + delay: delay after a command in second + """ + for cli in command_lines: + self.logger.debug("run %r", cli) + + self.serial.write((cli + self.eol).encode()) + self.serial.flush() + if delay: + time.sleep(delay) + + def cleanup(self): + """Before termination, clean up the UART device.""" + self.logger.debug("Closing...") + + self.serial.open() + self.run_command(self.cleanup_cli) # Run cleanup commands + self.serial.close() + + self.logger.debug("Cleanup done") + + def get_output(self): + """Capture the UART output + + Args: + stop_char: Read output buffer until it reads stop_char. + + Returns: + text from UART output. + """ + if self.serial.inWaiting() == 0: + time.sleep(1) + + return self.serial.read(self.serial.inWaiting()).decode() + + def prepare(self): + """Prepare the test: + + Identify the type of UART device (EC or Kernel?), then + decide what kind of commands to use to generate stress loads. + + Raises: + ChargenTestError if UART source can't be identified. + """ + try: + self.logger.info("Preparing...") + + self.serial.open() + + # Prepare the device for test + self.serial.flushInput() + self.serial.flushOutput() + + self.get_output() # drain data + + # Give a couple of line feeds, and capture the prompt text + self.run_command(["", ""]) + prompt_txt = self.get_output() + + # Detect the device source: EC or AP? + # Detect if the device is AP or EC console based on the captured. + for dev_prof in self.UART_DEV_PROFILES: + if dev_prof["prompt"] in prompt_txt: + self.dev_prof = dev_prof + break + else: + # No prompt patterns were found. UART seems not responding or in + # an undesirable status. + if prompt_txt: + raise ChargenTestError( + "%s: Got an unknown prompt text: %s\n" + "Check manually whether %s is available." + % (self.serial.port, prompt_txt, self.serial.port) + ) + else: + raise ChargenTestError( + "%s: Got no input. Close any other connections" + " to this port, and try it again." % self.serial.port + ) + + self.logger.info( + "Detected as %s UART", self.dev_prof["device_type"] + ) + # Log displays the UART type (AP|EC) instead of device filename. + self.logger = logging.getLogger( + type(self).__name__ + "| " + self.dev_prof["device_type"] + ) + + # Either login to AP or run some commands to prepare the device + # for test + self.eol = self.dev_prof["end_of_input"] + self.run_command(self.dev_prof["prepare_cmd"], delay=2) + self.cleanup_cli += self.dev_prof["cleanup_cmd"] + + # 'chargen' of AP does not have option for USB output. + # Force it work on UART. + if self.dev_prof["device_type"] == "AP": + self.usb_output = False + + # Check whether the command 'chargen' is available in the device. + # 'chargen 1 4' is supposed to print '0000' + self.get_output() # drain data + + chargen_cmd = "chargen 1 4" + if self.usb_output: + chargen_cmd += " usb" + self.run_command([chargen_cmd]) + tmp_txt = self.get_output() + + # Check whether chargen command is available. + if "0000" not in tmp_txt: + raise ChargenTestError( + "%s: Chargen got an unexpected result: %s" + % (self.dev_prof["device_type"], tmp_txt) + ) + + self.num_ch_exp = int(self.serial.baudrate * self.duration / 10) + chargen_cmd = ( + "chargen " + str(CHARGEN_TXT_LEN) + " " + str(self.num_ch_exp) + ) + if self.usb_output: + chargen_cmd += " usb" + self.test_cli = [chargen_cmd] + + self.logger.info("Ready to test") + finally: + self.serial.close() + + def stress_test_thread(self): + """Test thread + + Raises: + ChargenTestError: if broken character is found. + """ + try: + self.serial.open() + self.serial.flushInput() + self.serial.flushOutput() + + # Run TPM command in background to burden cr50. + if self.dev_prof["device_type"] == "AP" and self.cr50_workload: + self.run_command([CR50_LOAD_GEN_CMD]) + self.logger.debug("run TPM job while %s exists", FLAG_FILENAME) + + # Run the command 'chargen', one time + self.run_command([""]) # Give a line feed + self.get_output() # Drain the output + self.run_command(self.test_cli) + self.serial.readline() # Drain the echoed command line. + + err_msg = "%s: Expected %r but got %s after %d char received" + + # Keep capturing the output until the test timer is expired. + self.num_ch_cap = 0 + self.char_loss_occurrences = 0 + data_starve_count = 0 + + total_num_ch = ( + self.num_ch_exp + ) # Expected number of characters in total + ch_exp = CHARGEN_TXT[0] + ch_cap = ( + "z" # any character value is ok for loop initial condition. + ) + while self.num_ch_cap < total_num_ch: + captured = self.get_output() + + if captured: + # There is some output data. Reset the data starvation count. + data_starve_count = 0 + else: + data_starve_count += 1 + if data_starve_count > 1: + # If nothing was captured more than once, then terminate the test. + self.logger.debug("No more output") + break + + for ch_cap in captured: + if ch_cap not in CHARGEN_TXT: + # If it is not alpha-numeric, terminate the test. + if ch_cap not in CRLF: + # If it is neither a CR nor LF, then it is an error case. + self.logger.error( + "Whole captured characters: %r", captured + ) + raise ChargenTestError( + err_msg + % ( + "Broken char captured", + ch_exp, + hex(ord(ch_cap)), + self.num_ch_cap, + ) + ) + + # Set the loop termination condition true. + total_num_ch = self.num_ch_cap + + if self.num_ch_cap >= total_num_ch: + break + + if ch_exp != ch_cap: + # If it is alpha-numeric but not continuous, then some characters + # are lost. + self.logger.error( + err_msg, + "Char loss detected", + ch_exp, + repr(ch_cap), + self.num_ch_cap, + ) + self.char_loss_occurrences += 1 + + # Recalculate the expected number of characters to adjust + # termination condition. The loss might be bigger than this + # adjustment, but it is okay since it will terminates by either + # CR/LF detection or by data starvation. + idx_ch_exp = CHARGEN_TXT.find(ch_exp) + idx_ch_cap = CHARGEN_TXT.find(ch_cap) + if idx_ch_cap < idx_ch_exp: + idx_ch_cap += len(CHARGEN_TXT) + total_num_ch -= idx_ch_cap - idx_ch_exp + + self.num_ch_cap += 1 + + # Determine What character is expected next? + ch_exp = CHARGEN_TXT[ + (CHARGEN_TXT.find(ch_cap) + 1) % CHARGEN_TXT_LEN + ] + + finally: + self.serial.close() + + def start_test(self): + """Start the test thread""" + self.logger.info("Test thread starts") + self.test_thread.start() + + def wait_test_done(self): + """Wait until the test thread get done and join""" + self.test_thread.join() + self.logger.info("Test thread is done") + + def get_result(self): + """Display the result + + Returns: + Integer = the number of lost character + + Raises: + ChargenTestError: if the capture is corrupted. + """ + # If more characters than expected are captured, it means some messages + # from other than chargen are mixed. Stop processing further. + if self.num_ch_exp < self.num_ch_cap: + raise ChargenTestError( + "%s: UART output is corrupted." % self.dev_prof["device_type"] + ) + + # Get the count difference between the expected to the captured + # as the number of lost character. + char_lost = self.num_ch_exp - self.num_ch_cap + self.logger.info( + "%8d char lost / %10d (%.1f %%)", + char_lost, + self.num_ch_exp, + char_lost * 100.0 / self.num_ch_exp, + ) + + return char_lost, self.num_ch_exp, self.char_loss_occurrences - def get_output(self): - """Capture the UART output - Args: - stop_char: Read output buffer until it reads stop_char. +class ChargenTest(object): + """UART stress tester - Returns: - text from UART output. + Attributes: + logger: logging object + serials: Dictionary where key is filename of UART device, and the value is + UartSerial object """ - if self.serial.inWaiting() == 0: - time.sleep(1) - - return self.serial.read(self.serial.inWaiting()).decode() - def prepare(self): - """Prepare the test: - - Identify the type of UART device (EC or Kernel?), then - decide what kind of commands to use to generate stress loads. - - Raises: - ChargenTestError if UART source can't be identified. - """ - try: - self.logger.info('Preparing...') - - self.serial.open() - - # Prepare the device for test - self.serial.flushInput() - self.serial.flushOutput() - - self.get_output() # drain data - - # Give a couple of line feeds, and capture the prompt text - self.run_command(['', '']) - prompt_txt = self.get_output() - - # Detect the device source: EC or AP? - # Detect if the device is AP or EC console based on the captured. - for dev_prof in self.UART_DEV_PROFILES: - if dev_prof['prompt'] in prompt_txt: - self.dev_prof = dev_prof - break - else: - # No prompt patterns were found. UART seems not responding or in - # an undesirable status. - if prompt_txt: - raise ChargenTestError('%s: Got an unknown prompt text: %s\n' - 'Check manually whether %s is available.' % - (self.serial.port, prompt_txt, - self.serial.port)) - else: - raise ChargenTestError('%s: Got no input. Close any other connections' - ' to this port, and try it again.' % - self.serial.port) - - self.logger.info('Detected as %s UART', self.dev_prof['device_type']) - # Log displays the UART type (AP|EC) instead of device filename. - self.logger = logging.getLogger(type(self).__name__ + '| ' + - self.dev_prof['device_type']) - - # Either login to AP or run some commands to prepare the device - # for test - self.eol = self.dev_prof['end_of_input'] - self.run_command(self.dev_prof['prepare_cmd'], delay=2) - self.cleanup_cli += self.dev_prof['cleanup_cmd'] - - # 'chargen' of AP does not have option for USB output. - # Force it work on UART. - if self.dev_prof['device_type'] == 'AP': - self.usb_output = False - - # Check whether the command 'chargen' is available in the device. - # 'chargen 1 4' is supposed to print '0000' - self.get_output() # drain data - - chargen_cmd = 'chargen 1 4' - if self.usb_output: - chargen_cmd += ' usb' - self.run_command([chargen_cmd]) - tmp_txt = self.get_output() - - # Check whether chargen command is available. - if '0000' not in tmp_txt: - raise ChargenTestError('%s: Chargen got an unexpected result: %s' % - (self.dev_prof['device_type'], tmp_txt)) - - self.num_ch_exp = int(self.serial.baudrate * self.duration / 10) - chargen_cmd = 'chargen ' + str(CHARGEN_TXT_LEN) + ' ' + \ - str(self.num_ch_exp) - if self.usb_output: - chargen_cmd += ' usb' - self.test_cli = [chargen_cmd] - - self.logger.info('Ready to test') - finally: - self.serial.close() - - def stress_test_thread(self): - """Test thread - - Raises: - ChargenTestError: if broken character is found. - """ - try: - self.serial.open() - self.serial.flushInput() - self.serial.flushOutput() - - # Run TPM command in background to burden cr50. - if self.dev_prof['device_type'] == 'AP' and self.cr50_workload: - self.run_command([CR50_LOAD_GEN_CMD]) - self.logger.debug('run TPM job while %s exists', FLAG_FILENAME) - - # Run the command 'chargen', one time - self.run_command(['']) # Give a line feed - self.get_output() # Drain the output - self.run_command(self.test_cli) - self.serial.readline() # Drain the echoed command line. - - err_msg = '%s: Expected %r but got %s after %d char received' - - # Keep capturing the output until the test timer is expired. - self.num_ch_cap = 0 - self.char_loss_occurrences = 0 - data_starve_count = 0 - - total_num_ch = self.num_ch_exp # Expected number of characters in total - ch_exp = CHARGEN_TXT[0] - ch_cap = 'z' # any character value is ok for loop initial condition. - while self.num_ch_cap < total_num_ch: - captured = self.get_output() - - if captured: - # There is some output data. Reset the data starvation count. - data_starve_count = 0 + def __init__(self, ports, duration, cr50_workload=False, usb_output=False): + """Initialize UART stress tester + + Args: + ports: List of UART ports to test. + duration: Time to keep testing in seconds. + cr50_workload: True if a workload should be generated on cr50 + usb_output: True if a workload should be generated to USB channel + + Raises: + ChargenTestError: if any of ports is not a valid character device. + """ + + # Save the arguments + for port in ports: + try: + mode = os.stat(port).st_mode + except OSError as e: + raise ChargenTestError(e) + if not stat.S_ISCHR(mode): + raise ChargenTestError("%s is not a character device." % port) + + if duration <= 0: + raise ChargenTestError("Input error: duration is not positive.") + + # Initialize logging object + self.logger = logging.getLogger(type(self).__name__) + + # Create an UartSerial object per UART port + self.serials = {} # UartSerial objects + for port in ports: + self.serials[port] = UartSerial( + port=port, + duration=duration, + cr50_workload=cr50_workload, + usb_output=usb_output, + ) + + def prepare(self): + """Prepare the test for each UART port""" + self.logger.info("Prepare ports for test") + for _, ser in self.serials.items(): + ser.prepare() + self.logger.info("Ports are ready to test") + + def print_result(self): + """Display the test result for each UART port + + Returns: + char_lost: Total number of characters lost + """ + char_lost = 0 + for _, ser in self.serials.items(): + (tmp_lost, _, _) = ser.get_result() + char_lost += tmp_lost + + # If any characters are lost, then test fails. + msg = "lost %d character(s) from the test" % char_lost + if char_lost > 0: + self.logger.error("FAIL: %s", msg) else: - data_starve_count += 1 - if data_starve_count > 1: - # If nothing was captured more than once, then terminate the test. - self.logger.debug('No more output') - break - - for ch_cap in captured: - if ch_cap not in CHARGEN_TXT: - # If it is not alpha-numeric, terminate the test. - if ch_cap not in CRLF: - # If it is neither a CR nor LF, then it is an error case. - self.logger.error('Whole captured characters: %r', captured) - raise ChargenTestError(err_msg % ('Broken char captured', ch_exp, - hex(ord(ch_cap)), - self.num_ch_cap)) - - # Set the loop termination condition true. - total_num_ch = self.num_ch_cap - - if self.num_ch_cap >= total_num_ch: - break - - if ch_exp != ch_cap: - # If it is alpha-numeric but not continuous, then some characters - # are lost. - self.logger.error(err_msg, 'Char loss detected', - ch_exp, repr(ch_cap), self.num_ch_cap) - self.char_loss_occurrences += 1 - - # Recalculate the expected number of characters to adjust - # termination condition. The loss might be bigger than this - # adjustment, but it is okay since it will terminates by either - # CR/LF detection or by data starvation. - idx_ch_exp = CHARGEN_TXT.find(ch_exp) - idx_ch_cap = CHARGEN_TXT.find(ch_cap) - if idx_ch_cap < idx_ch_exp: - idx_ch_cap += len(CHARGEN_TXT) - total_num_ch -= (idx_ch_cap - idx_ch_exp) - - self.num_ch_cap += 1 - - # Determine What character is expected next? - ch_exp = CHARGEN_TXT[(CHARGEN_TXT.find(ch_cap) + 1) % CHARGEN_TXT_LEN] - - finally: - self.serial.close() - - def start_test(self): - """Start the test thread""" - self.logger.info('Test thread starts') - self.test_thread.start() - - def wait_test_done(self): - """Wait until the test thread get done and join""" - self.test_thread.join() - self.logger.info('Test thread is done') - - def get_result(self): - """Display the result + self.logger.info("PASS: %s", msg) - Returns: - Integer = the number of lost character + return char_lost - Raises: - ChargenTestError: if the capture is corrupted. - """ - # If more characters than expected are captured, it means some messages - # from other than chargen are mixed. Stop processing further. - if self.num_ch_exp < self.num_ch_cap: - raise ChargenTestError('%s: UART output is corrupted.' % - self.dev_prof['device_type']) + def run(self): + """Run the stress test on UART port(s) - # Get the count difference between the expected to the captured - # as the number of lost character. - char_lost = self.num_ch_exp - self.num_ch_cap - self.logger.info('%8d char lost / %10d (%.1f %%)', - char_lost, self.num_ch_exp, - char_lost * 100.0 / self.num_ch_exp) + Raises: + ChargenTestError: If any characters are lost. + """ - return char_lost, self.num_ch_exp, self.char_loss_occurrences + # Detect UART source type, and decide which command to test. + self.prepare() + # Run the test on each UART port in thread. + self.logger.info("Test starts") + for _, ser in self.serials.items(): + ser.start_test() -class ChargenTest(object): - """UART stress tester + # Wait all tests to finish. + for _, ser in self.serials.items(): + ser.wait_test_done() - Attributes: - logger: logging object - serials: Dictionary where key is filename of UART device, and the value is - UartSerial object - """ + # Print the result. + char_lost = self.print_result() + if char_lost: + raise ChargenTestError( + "Test failed: lost %d character(s)" % char_lost + ) - def __init__(self, ports, duration, cr50_workload=False, - usb_output=False): - """Initialize UART stress tester + self.logger.info("Test is done") - Args: - ports: List of UART ports to test. - duration: Time to keep testing in seconds. - cr50_workload: True if a workload should be generated on cr50 - usb_output: True if a workload should be generated to USB channel - Raises: - ChargenTestError: if any of ports is not a valid character device. - """ +def parse_args(cmdline): + """Parse command line arguments. - # Save the arguments - for port in ports: - try: - mode = os.stat(port).st_mode - except OSError as e: - raise ChargenTestError(e) - if not stat.S_ISCHR(mode): - raise ChargenTestError('%s is not a character device.' % port) - - if duration <= 0: - raise ChargenTestError('Input error: duration is not positive.') - - # Initialize logging object - self.logger = logging.getLogger(type(self).__name__) - - # Create an UartSerial object per UART port - self.serials = {} # UartSerial objects - for port in ports: - self.serials[port] = UartSerial(port=port, duration=duration, - cr50_workload=cr50_workload, - usb_output=usb_output) - - def prepare(self): - """Prepare the test for each UART port""" - self.logger.info('Prepare ports for test') - for _, ser in self.serials.items(): - ser.prepare() - self.logger.info('Ports are ready to test') - - def print_result(self): - """Display the test result for each UART port + Args: + cmdline: list to be parsed Returns: - char_lost: Total number of characters lost - """ - char_lost = 0 - for _, ser in self.serials.items(): - (tmp_lost, _, _) = ser.get_result() - char_lost += tmp_lost - - # If any characters are lost, then test fails. - msg = 'lost %d character(s) from the test' % char_lost - if char_lost > 0: - self.logger.error('FAIL: %s', msg) - else: - self.logger.info('PASS: %s', msg) - - return char_lost - - def run(self): - """Run the stress test on UART port(s) - - Raises: - ChargenTestError: If any characters are lost. + tuple (options, args) where args is a list of cmdline arguments that the + parser was unable to match i.e. they're servod controls, not options. """ - - # Detect UART source type, and decide which command to test. - self.prepare() - - # Run the test on each UART port in thread. - self.logger.info('Test starts') - for _, ser in self.serials.items(): - ser.start_test() - - # Wait all tests to finish. - for _, ser in self.serials.items(): - ser.wait_test_done() - - # Print the result. - char_lost = self.print_result() - if char_lost: - raise ChargenTestError('Test failed: lost %d character(s)' % - char_lost) - - self.logger.info('Test is done') - -def parse_args(cmdline): - """Parse command line arguments. - - Args: - cmdline: list to be parsed - - Returns: - tuple (options, args) where args is a list of cmdline arguments that the - parser was unable to match i.e. they're servod controls, not options. - """ - description = """%(prog)s repeats sending a uart console command + description = """%(prog)s repeats sending a uart console command to each UART device for a given time, and check if output has any missing characters. @@ -511,52 +549,74 @@ Examples: %(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --cr50 """ - parser = argparse.ArgumentParser(description=description, - formatter_class=argparse.RawTextHelpFormatter - ) - parser.add_argument('port', type=str, nargs='*', - help='UART device path to test') - parser.add_argument('-c', '--cr50', action='store_true', default=False, - help='generate TPM workload on cr50') - parser.add_argument('-d', '--debug', action='store_true', default=False, - help='enable debug messages') - parser.add_argument('-t', '--time', type=int, - help='Test duration in second', default=300) - parser.add_argument('-u', '--usb', action='store_true', default=False, - help='Generate output to USB channel instead') - return parser.parse_known_args(cmdline) + parser = argparse.ArgumentParser( + description=description, formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument( + "port", type=str, nargs="*", help="UART device path to test" + ) + parser.add_argument( + "-c", + "--cr50", + action="store_true", + default=False, + help="generate TPM workload on cr50", + ) + parser.add_argument( + "-d", + "--debug", + action="store_true", + default=False, + help="enable debug messages", + ) + parser.add_argument( + "-t", "--time", type=int, help="Test duration in second", default=300 + ) + parser.add_argument( + "-u", + "--usb", + action="store_true", + default=False, + help="Generate output to USB channel instead", + ) + return parser.parse_known_args(cmdline) def main(): - """Main function wrapper""" - try: - (options, _) = parse_args(sys.argv[1:]) - - # Set Log format - log_format = '%(asctime)s %(levelname)-6s | %(name)-25s' - date_format = '%Y-%m-%d %H:%M:%S' - if options.debug: - log_format += ' | %(filename)s:%(lineno)4d:%(funcName)-18s' - loglevel = logging.DEBUG - else: - loglevel = logging.INFO - log_format += ' | %(message)s' - - logging.basicConfig(level=loglevel, format=log_format, - datefmt=date_format) - - # Create a ChargenTest object - utest = ChargenTest(options.port, options.time, - cr50_workload=options.cr50, - usb_output=options.usb) - utest.run() # Run - - except KeyboardInterrupt: - sys.exit(0) - - except ChargenTestError as e: - logging.error(str(e)) - sys.exit(1) - -if __name__ == '__main__': - main() + """Main function wrapper""" + try: + (options, _) = parse_args(sys.argv[1:]) + + # Set Log format + log_format = "%(asctime)s %(levelname)-6s | %(name)-25s" + date_format = "%Y-%m-%d %H:%M:%S" + if options.debug: + log_format += " | %(filename)s:%(lineno)4d:%(funcName)-18s" + loglevel = logging.DEBUG + else: + loglevel = logging.INFO + log_format += " | %(message)s" + + logging.basicConfig( + level=loglevel, format=log_format, datefmt=date_format + ) + + # Create a ChargenTest object + utest = ChargenTest( + options.port, + options.time, + cr50_workload=options.cr50, + usb_output=options.usb, + ) + utest.run() # Run + + except KeyboardInterrupt: + sys.exit(0) + + except ChargenTestError as e: + logging.error(str(e)) + sys.exit(1) + + +if __name__ == "__main__": + main() |