summaryrefslogtreecommitdiff
path: root/util/uart_stress_tester.py
diff options
context:
space:
mode:
Diffstat (limited to 'util/uart_stress_tester.py')
-rwxr-xr-xutil/uart_stress_tester.py562
1 files changed, 0 insertions, 562 deletions
diff --git a/util/uart_stress_tester.py b/util/uart_stress_tester.py
deleted file mode 100755
index b3db60060e..0000000000
--- a/util/uart_stress_tester.py
+++ /dev/null
@@ -1,562 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-# Copyright 2019 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""ChromeOS Uart Stress Test
-
-This tester runs the command 'chargen' on EC and/or AP, captures the
-output, and compares it against the expected output to check any characters
-lost.
-
-Prerequisite:
- (1) This test needs PySerial. Please check if it is available before test.
- Can be installed by 'pip install pyserial'
- (2) If servod is running, turn uart_timestamp off before running this test.
- e.g. dut-control cr50_uart_timestamp:off
-"""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import argparse
-import atexit
-import logging
-import os
-import stat
-import sys
-import threading
-import time
-
-import serial
-
-BAUDRATE = 115200 # Default baudrate setting for UART port
-CROS_USERNAME = 'root' # Account name to login to ChromeOS
-CROS_PASSWORD = 'test0000' # Password to login to ChromeOS
-CHARGEN_TXT = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
- # The result of 'chargen 62 62'
-CHARGEN_TXT_LEN = len(CHARGEN_TXT)
-CR = '\r' # Carriage Return
-LF = '\n' # Line Feed
-CRLF = CR + LF
-FLAG_FILENAME = '/tmp/chargen_testing'
-TPM_CMD = ('trunks_client --key_create --rsa=2048 --usage=sign'
- ' --key_blob=/tmp/blob &> /dev/null')
- # A ChromeOS TPM command for the cr50 stress
- # purpose.
-CR50_LOAD_GEN_CMD = ('while [[ -f %s ]]; do %s; done &'
- % (FLAG_FILENAME, TPM_CMD))
- # A command line to run TPM_CMD in background
- # infinitely.
-
-
-class ChargenTestError(Exception):
- """Exception for Uart Stress Test Error"""
- pass
-
-
-class UartSerial(object):
- """Test Object for a single UART serial device
-
- Attributes:
- UART_DEV_PROFILES
- char_loss_occurrences: Number that character loss happens
- cleanup_cli: Command list to perform before the test exits
- cr50_workload: True if cr50 should be stressed, or False otherwise
- usb_output: True if output should be generated to USB channel
- dev_prof: Dictionary of device profile
- duration: Time to keep chargen running
- eol: Characters to add at the end of input
- logger: object that store the log
- num_ch_exp: Expected number of characters in output
- num_ch_cap: Number of captured characters in output
- test_cli: Command list to run for chargen test
- test_thread: Thread object that captures the UART output
- serial: serial.Serial object
- """
- UART_DEV_PROFILES = (
- # Kernel
- {
- 'prompt':'localhost login:',
- 'device_type':'AP',
- 'prepare_cmd':[
- CROS_USERNAME, # Login
- CROS_PASSWORD, # Password
- 'dmesg -D', # Disable console message
- 'touch ' + FLAG_FILENAME, # Create a temp file
- ],
- 'cleanup_cmd':[
- 'rm -f ' + FLAG_FILENAME, # Remove the temp file
- 'dmesg -E', # Enable console message
- 'logout', # Logout
- ],
- 'end_of_input':LF,
- },
- # EC
- {
- 'prompt':'> ',
- 'device_type':'EC',
- 'prepare_cmd':[
- 'chan save',
- 'chan 0' # Disable console message
- ],
- 'cleanup_cmd':['', 'chan restore'],
- 'end_of_input':CRLF,
- },
- )
-
- def __init__(self, port, duration, timeout=1,
- baudrate=BAUDRATE, cr50_workload=False,
- usb_output=False):
- """Initialize UartSerial
-
- Args:
- port: UART device path. e.g. /dev/ttyUSB0
- duration: Time to test, in seconds
- timeout: Read timeout value.
- baudrate: Baud rate such as 9600 or 115200.
- cr50_workload: True if a workload should be generated on cr50
- usb_output: True if a workload should be generated to USB channel
- """
-
- # Initialize serial object
- self.serial = serial.Serial()
- self.serial.port = port
- self.serial.timeout = timeout
- self.serial.baudrate = baudrate
-
- self.duration = duration
- self.cr50_workload = cr50_workload
- self.usb_output = usb_output
-
- self.logger = logging.getLogger(type(self).__name__ + '| ' + port)
- self.test_thread = threading.Thread(target=self.stress_test_thread)
-
- self.dev_prof = {}
- self.cleanup_cli = []
- self.test_cli = []
- self.eol = CRLF
- self.num_ch_exp = 0
- self.num_ch_cap = 0
- self.char_loss_occurrences = 0
- atexit.register(self.cleanup)
-
- def run_command(self, command_lines, delay=0):
- """Run command(s) at UART prompt
-
- Args:
- command_lines: list of commands to run.
- delay: delay after a command in second
- """
- for cli in command_lines:
- self.logger.debug('run %r', cli)
-
- self.serial.write((cli + self.eol).encode())
- self.serial.flush()
- if delay:
- time.sleep(delay)
-
- def cleanup(self):
- """Before termination, clean up the UART device."""
- self.logger.debug('Closing...')
-
- self.serial.open()
- self.run_command(self.cleanup_cli) # Run cleanup commands
- self.serial.close()
-
- self.logger.debug('Cleanup done')
-
- def get_output(self):
- """Capture the UART output
-
- Args:
- stop_char: Read output buffer until it reads stop_char.
-
- Returns:
- text from UART output.
- """
- if self.serial.inWaiting() == 0:
- time.sleep(1)
-
- return self.serial.read(self.serial.inWaiting()).decode()
-
- def prepare(self):
- """Prepare the test:
-
- Identify the type of UART device (EC or Kernel?), then
- decide what kind of commands to use to generate stress loads.
-
- Raises:
- ChargenTestError if UART source can't be identified.
- """
- try:
- self.logger.info('Preparing...')
-
- self.serial.open()
-
- # Prepare the device for test
- self.serial.flushInput()
- self.serial.flushOutput()
-
- self.get_output() # drain data
-
- # Give a couple of line feeds, and capture the prompt text
- self.run_command(['', ''])
- prompt_txt = self.get_output()
-
- # Detect the device source: EC or AP?
- # Detect if the device is AP or EC console based on the captured.
- for dev_prof in self.UART_DEV_PROFILES:
- if dev_prof['prompt'] in prompt_txt:
- self.dev_prof = dev_prof
- break
- else:
- # No prompt patterns were found. UART seems not responding or in
- # an undesirable status.
- if prompt_txt:
- raise ChargenTestError('%s: Got an unknown prompt text: %s\n'
- 'Check manually whether %s is available.' %
- (self.serial.port, prompt_txt,
- self.serial.port))
- else:
- raise ChargenTestError('%s: Got no input. Close any other connections'
- ' to this port, and try it again.' %
- self.serial.port)
-
- self.logger.info('Detected as %s UART', self.dev_prof['device_type'])
- # Log displays the UART type (AP|EC) instead of device filename.
- self.logger = logging.getLogger(type(self).__name__ + '| ' +
- self.dev_prof['device_type'])
-
- # Either login to AP or run some commands to prepare the device
- # for test
- self.eol = self.dev_prof['end_of_input']
- self.run_command(self.dev_prof['prepare_cmd'], delay=2)
- self.cleanup_cli += self.dev_prof['cleanup_cmd']
-
- # 'chargen' of AP does not have option for USB output.
- # Force it work on UART.
- if self.dev_prof['device_type'] == 'AP':
- self.usb_output = False
-
- # Check whether the command 'chargen' is available in the device.
- # 'chargen 1 4' is supposed to print '0000'
- self.get_output() # drain data
-
- chargen_cmd = 'chargen 1 4'
- if self.usb_output:
- chargen_cmd += ' usb'
- self.run_command([chargen_cmd])
- tmp_txt = self.get_output()
-
- # Check whether chargen command is available.
- if '0000' not in tmp_txt:
- raise ChargenTestError('%s: Chargen got an unexpected result: %s' %
- (self.dev_prof['device_type'], tmp_txt))
-
- self.num_ch_exp = int(self.serial.baudrate * self.duration / 10)
- chargen_cmd = 'chargen ' + str(CHARGEN_TXT_LEN) + ' ' + \
- str(self.num_ch_exp)
- if self.usb_output:
- chargen_cmd += ' usb'
- self.test_cli = [chargen_cmd]
-
- self.logger.info('Ready to test')
- finally:
- self.serial.close()
-
- def stress_test_thread(self):
- """Test thread
-
- Raises:
- ChargenTestError: if broken character is found.
- """
- try:
- self.serial.open()
- self.serial.flushInput()
- self.serial.flushOutput()
-
- # Run TPM command in background to burden cr50.
- if self.dev_prof['device_type'] == 'AP' and self.cr50_workload:
- self.run_command([CR50_LOAD_GEN_CMD])
- self.logger.debug('run TPM job while %s exists', FLAG_FILENAME)
-
- # Run the command 'chargen', one time
- self.run_command(['']) # Give a line feed
- self.get_output() # Drain the output
- self.run_command(self.test_cli)
- self.serial.readline() # Drain the echoed command line.
-
- err_msg = '%s: Expected %r but got %s after %d char received'
-
- # Keep capturing the output until the test timer is expired.
- self.num_ch_cap = 0
- self.char_loss_occurrences = 0
- data_starve_count = 0
-
- total_num_ch = self.num_ch_exp # Expected number of characters in total
- ch_exp = CHARGEN_TXT[0]
- ch_cap = 'z' # any character value is ok for loop initial condition.
- while self.num_ch_cap < total_num_ch:
- captured = self.get_output()
-
- if captured:
- # There is some output data. Reset the data starvation count.
- data_starve_count = 0
- else:
- data_starve_count += 1
- if data_starve_count > 1:
- # If nothing was captured more than once, then terminate the test.
- self.logger.debug('No more output')
- break
-
- for ch_cap in captured:
- if ch_cap not in CHARGEN_TXT:
- # If it is not alpha-numeric, terminate the test.
- if ch_cap not in CRLF:
- # If it is neither a CR nor LF, then it is an error case.
- self.logger.error('Whole captured characters: %r', captured)
- raise ChargenTestError(err_msg % ('Broken char captured', ch_exp,
- hex(ord(ch_cap)),
- self.num_ch_cap))
-
- # Set the loop termination condition true.
- total_num_ch = self.num_ch_cap
-
- if self.num_ch_cap >= total_num_ch:
- break
-
- if ch_exp != ch_cap:
- # If it is alpha-numeric but not continuous, then some characters
- # are lost.
- self.logger.error(err_msg, 'Char loss detected',
- ch_exp, repr(ch_cap), self.num_ch_cap)
- self.char_loss_occurrences += 1
-
- # Recalculate the expected number of characters to adjust
- # termination condition. The loss might be bigger than this
- # adjustment, but it is okay since it will terminates by either
- # CR/LF detection or by data starvation.
- idx_ch_exp = CHARGEN_TXT.find(ch_exp)
- idx_ch_cap = CHARGEN_TXT.find(ch_cap)
- if idx_ch_cap < idx_ch_exp:
- idx_ch_cap += len(CHARGEN_TXT)
- total_num_ch -= (idx_ch_cap - idx_ch_exp)
-
- self.num_ch_cap += 1
-
- # Determine What character is expected next?
- ch_exp = CHARGEN_TXT[(CHARGEN_TXT.find(ch_cap) + 1) % CHARGEN_TXT_LEN]
-
- finally:
- self.serial.close()
-
- def start_test(self):
- """Start the test thread"""
- self.logger.info('Test thread starts')
- self.test_thread.start()
-
- def wait_test_done(self):
- """Wait until the test thread get done and join"""
- self.test_thread.join()
- self.logger.info('Test thread is done')
-
- def get_result(self):
- """Display the result
-
- Returns:
- Integer = the number of lost character
-
- Raises:
- ChargenTestError: if the capture is corrupted.
- """
- # If more characters than expected are captured, it means some messages
- # from other than chargen are mixed. Stop processing further.
- if self.num_ch_exp < self.num_ch_cap:
- raise ChargenTestError('%s: UART output is corrupted.' %
- self.dev_prof['device_type'])
-
- # Get the count difference between the expected to the captured
- # as the number of lost character.
- char_lost = self.num_ch_exp - self.num_ch_cap
- self.logger.info('%8d char lost / %10d (%.1f %%)',
- char_lost, self.num_ch_exp,
- char_lost * 100.0 / self.num_ch_exp)
-
- return char_lost, self.num_ch_exp, self.char_loss_occurrences
-
-
-class ChargenTest(object):
- """UART stress tester
-
- Attributes:
- logger: logging object
- serials: Dictionary where key is filename of UART device, and the value is
- UartSerial object
- """
-
- def __init__(self, ports, duration, cr50_workload=False,
- usb_output=False):
- """Initialize UART stress tester
-
- Args:
- ports: List of UART ports to test.
- duration: Time to keep testing in seconds.
- cr50_workload: True if a workload should be generated on cr50
- usb_output: True if a workload should be generated to USB channel
-
- Raises:
- ChargenTestError: if any of ports is not a valid character device.
- """
-
- # Save the arguments
- for port in ports:
- try:
- mode = os.stat(port).st_mode
- except OSError as e:
- raise ChargenTestError(e)
- if not stat.S_ISCHR(mode):
- raise ChargenTestError('%s is not a character device.' % port)
-
- if duration <= 0:
- raise ChargenTestError('Input error: duration is not positive.')
-
- # Initialize logging object
- self.logger = logging.getLogger(type(self).__name__)
-
- # Create an UartSerial object per UART port
- self.serials = {} # UartSerial objects
- for port in ports:
- self.serials[port] = UartSerial(port=port, duration=duration,
- cr50_workload=cr50_workload,
- usb_output=usb_output)
-
- def prepare(self):
- """Prepare the test for each UART port"""
- self.logger.info('Prepare ports for test')
- for _, ser in self.serials.items():
- ser.prepare()
- self.logger.info('Ports are ready to test')
-
- def print_result(self):
- """Display the test result for each UART port
-
- Returns:
- char_lost: Total number of characters lost
- """
- char_lost = 0
- for _, ser in self.serials.items():
- (tmp_lost, _, _) = ser.get_result()
- char_lost += tmp_lost
-
- # If any characters are lost, then test fails.
- msg = 'lost %d character(s) from the test' % char_lost
- if char_lost > 0:
- self.logger.error('FAIL: %s', msg)
- else:
- self.logger.info('PASS: %s', msg)
-
- return char_lost
-
- def run(self):
- """Run the stress test on UART port(s)
-
- Raises:
- ChargenTestError: If any characters are lost.
- """
-
- # Detect UART source type, and decide which command to test.
- self.prepare()
-
- # Run the test on each UART port in thread.
- self.logger.info('Test starts')
- for _, ser in self.serials.items():
- ser.start_test()
-
- # Wait all tests to finish.
- for _, ser in self.serials.items():
- ser.wait_test_done()
-
- # Print the result.
- char_lost = self.print_result()
- if char_lost:
- raise ChargenTestError('Test failed: lost %d character(s)' %
- char_lost)
-
- self.logger.info('Test is done')
-
-def parse_args(cmdline):
- """Parse command line arguments.
-
- Args:
- cmdline: list to be parsed
-
- Returns:
- tuple (options, args) where args is a list of cmdline arguments that the
- parser was unable to match i.e. they're servod controls, not options.
- """
- description = """%(prog)s repeats sending a uart console command
-to each UART device for a given time, and check if output
-has any missing characters.
-
-Examples:
- %(prog)s /dev/ttyUSB2 --time 3600
- %(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --debug
- %(prog)s /dev/ttyUSB1 /dev/ttyUSB2 --cr50
-"""
-
- parser = argparse.ArgumentParser(description=description,
- formatter_class=argparse.RawTextHelpFormatter
- )
- parser.add_argument('port', type=str, nargs='*',
- help='UART device path to test')
- parser.add_argument('-c', '--cr50', action='store_true', default=False,
- help='generate TPM workload on cr50')
- parser.add_argument('-d', '--debug', action='store_true', default=False,
- help='enable debug messages')
- parser.add_argument('-t', '--time', type=int,
- help='Test duration in second', default=300)
- parser.add_argument('-u', '--usb', action='store_true', default=False,
- help='Generate output to USB channel instead')
- return parser.parse_known_args(cmdline)
-
-
-def main():
- """Main function wrapper"""
- try:
- (options, _) = parse_args(sys.argv[1:])
-
- # Set Log format
- log_format = '%(asctime)s %(levelname)-6s | %(name)-25s'
- date_format = '%Y-%m-%d %H:%M:%S'
- if options.debug:
- log_format += ' | %(filename)s:%(lineno)4d:%(funcName)-18s'
- loglevel = logging.DEBUG
- else:
- loglevel = logging.INFO
- log_format += ' | %(message)s'
-
- logging.basicConfig(level=loglevel, format=log_format,
- datefmt=date_format)
-
- # Create a ChargenTest object
- utest = ChargenTest(options.port, options.time,
- cr50_workload=options.cr50,
- usb_output=options.usb)
- utest.run() # Run
-
- except KeyboardInterrupt:
- sys.exit(0)
-
- except ChargenTestError as e:
- logging.error(str(e))
- sys.exit(1)
-
-if __name__ == '__main__':
- main()