diff options
author | Tom Hughes <tomhughes@chromium.org> | 2022-09-21 14:08:36 -0700 |
---|---|---|
committer | Tom Hughes <tomhughes@chromium.org> | 2022-09-22 12:59:38 -0700 |
commit | c453fd704268ef72de871b0c5ac7a989de662334 (patch) | |
tree | fcf6ce5810f9ff9e3c8cce434812dd75492269ed /extra/tigertool/ecusb/pty_driver.py | |
parent | 6c1587ca70f558b4f96b3f0b18ad8b027d3ba99d (diff) | |
parent | 28712dae9d7ed1e694f7622cc083afa71090d4d5 (diff) | |
download | chrome-ec-c453fd704268ef72de871b0c5ac7a989de662334.tar.gz |
Merge remote-tracking branch cros/main into firmware-fpmcu-dartmonkey-releasefirmware-fpmcu-dartmonkey-release
Generated by: ./util/update_release_branch.py --board dartmonkey --relevant_paths_file
./util/fingerprint-relevant-paths.txt firmware-fpmcu-dartmonkey-release
Relevant changes:
git log --oneline 6c1587ca70..28712dae9d -- board/nocturne_fp
board/dartmonkey common/fpsensor docs/fingerprint driver/fingerprint
util/getversion.sh
ded9307b79 util/getversion.sh: Fix version when not in a git repo
956055e692 board: change Google USB vendor info
71b2ef709d Update license boilerplate text in source code files
33e11afda0 Revert "fpsensor: Build fpsensor source file with C++"
c8d0360723 fpsensor: Build fpsensor source file with C++
bc113abd53 fpsensor: Fix g++ compiler error
150a58a0dc fpsensor: Fix fp_set_sensor_mode return type
b33b5ce85b fpsensor: Remove nested designators for C++ compatibility
2e864b2539 tree-wide: const-ify argv for console commands
56d8b360f9 test: Add test for get ikm failure when seed not set
3a3d6c3690 test: Add test for fpsensor trivial key failure
233e6bbd08 fpsensor_crypto: Abstract calls to hmac_SHA256
0a041b285b docs/fingerprint: Typo correction
c03fab67e2 docs/fingerprint: Fix the path of fputils.py
0b5d4baf5a util/getversion.sh: Fix empty file list handling
6e128fe760 FPMCU dev board environment with Satlab
3eb29b6aa5 builtin: Move ssize_t to sys/types.h
345d62ebd1 docs/fingerprint: Update power numbers for latest dartmonkey release
c25ffdb316 common: Conditionally support printf %l and %i modifiers
9a3c514b45 test: Add a test to check if the debugger is connected
54e603413f Move standard library tests to their own file
43fa6b4bf8 docs/fingerprint: Update power numbers for latest bloonchipper release
25536f9a84 driver/fingerprint/fpc/bep/fpc_sensor_spi.c: Format with clang-format
4face99efd driver/fingerprint/fpc/libfp/fpc_sensor_pal.h: Format with clang-format
738de2b575 trng: Rename rand to trng_rand
14b8270edd docs/fingerprint: Update dragonclaw power numbers
0b268f93d1 driver/fingerprint/fpc/libfp/fpc_private.c: Format with clang-format
f80da163f2 driver/fingerprint/fpc/libfp/fpc_private.h: Format with clang-format
a0751778f4 board/nocturne_fp/ro_workarounds.c: Format with clang-format
5e9c85c9b1 driver/fingerprint/fpc/libfp/fpc_sensor_pal.c: Format with clang-format
c1f9dd3cf8 driver/fingerprint/fpc/libfp/fpc_bio_algorithm.h: Format with clang-format
eb1e1bed8d driver/fingerprint/fpc/libfp/fpc1145_private.h: Format with clang-format
6e7b611821 driver/fingerprint/fpc/bep/fpc_bio_algorithm.h: Format with clang-format
e0589cd5e2 driver/fingerprint/fpc/bep/fpc1035_private.h: Format with clang-format
58f0246dbe board/nocturne_fp/board_ro.c: Format with clang-format
7905e556a0 common/fpsensor/fpsensor_crypto.c: Format with clang-format
21289d170c driver/fingerprint/fpc/bep/fpc1025_private.h: Format with clang-format
98a20f937e common/fpsensor/fpsensor_state.c: Format with clang-format
a2d255d8af common/fpsensor/fpsensor.c: Format with clang-format
84e53a65da board/nocturne_fp/board.h: Format with clang-format
73055eeb3f driver/fingerprint/fpc/bep/fpc_private.c: Format with clang-format
0f7b5cb509 common/fpsensor/fpsensor_private.h: Format with clang-format
1ceade6e65 driver/fingerprint/fpc/bep/fpc_private.h: Format with clang-format
dca9d74321 Revert "trng: Rename rand to trng_rand"
a6b0b3554f trng: Rename rand to trng_rand
28d0b75b70 third_party/boringssl: Remove unused header
BRANCH=None
BUG=b:244387210 b:242720240 b:215613183 b:242720910 b:236386294
BUG=b:234181908 b:244781166 b:234781655 b:234143158 b:234181908
BUG=b:237344361 b:236025198 b:234181908 b:180945056 chromium:1098010
BUG=b:246424843 b:234181908 b:131913998
TEST=`make -j buildall`
TEST=./util/run_device_tests.py --board dartmonkey
Test "aes": PASSED
Test "cec": PASSED
Test "cortexm_fpu": PASSED
Test "crc": PASSED
Test "flash_physical": PASSED
Test "flash_write_protect": PASSED
Test "fpsensor_hw": PASSED
Test "fpsensor_spi_ro": PASSED
Test "fpsensor_spi_rw": PASSED
Test "fpsensor_uart_ro": PASSED
Test "fpsensor_uart_rw": PASSED
Test "mpu_ro": PASSED
Test "mpu_rw": PASSED
Test "mutex": PASSED
Test "pingpong": PASSED
Test "printf": PASSED
Test "queue": PASSED
Test "rollback_region0": PASSED
Test "rollback_region1": PASSED
Test "rollback_entropy": PASSED
Test "rtc": PASSED
Test "sha256": PASSED
Test "sha256_unrolled": PASSED
Test "static_if": PASSED
Test "stdlib": PASSED
Test "system_is_locked_wp_on": PASSED
Test "system_is_locked_wp_off": PASSED
Test "timer_dos": PASSED
Test "utils": PASSED
Test "utils_str": PASSED
Test "panic_data_dartmonkey_v2.0.2887": PASSED
Test "panic_data_nocturne_fp_v2.2.64": PASSED
Test "panic_data_nami_fp_v2.2.144": PASSED
Force-Relevant-Builds: all
Signed-off-by: Tom Hughes <tomhughes@chromium.org>
Change-Id: I2c312583a709fedae8fe11d92c22328c3b634bc7
Diffstat (limited to 'extra/tigertool/ecusb/pty_driver.py')
-rw-r--r-- | extra/tigertool/ecusb/pty_driver.py | 559 |
1 files changed, 283 insertions, 276 deletions
diff --git a/extra/tigertool/ecusb/pty_driver.py b/extra/tigertool/ecusb/pty_driver.py index 09ef8c42e4..723bf41b57 100644 --- a/extra/tigertool/ecusb/pty_driver.py +++ b/extra/tigertool/ecusb/pty_driver.py @@ -1,10 +1,6 @@ -# Copyright 2017 The Chromium OS Authors. All rights reserved. +# Copyright 2017 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """ptyDriver class @@ -17,9 +13,10 @@ import ast import errno import fcntl import os -import pexpect import time -from pexpect import fdpexpect + +import pexpect # pylint:disable=import-error +from pexpect import fdpexpect # pylint:disable=import-error # Expecting a result in 3 seconds is plenty even for slow platforms. DEFAULT_UART_TIMEOUT = 3 @@ -27,281 +24,291 @@ FLUSH_UART_TIMEOUT = 1 class ptyError(Exception): - """Exception class for pty errors.""" + """Exception class for pty errors.""" UART_PARAMS = { - 'uart_cmd': None, - 'uart_multicmd': None, - 'uart_regexp': None, - 'uart_timeout': DEFAULT_UART_TIMEOUT, + "uart_cmd": None, + "uart_multicmd": None, + "uart_regexp": None, + "uart_timeout": DEFAULT_UART_TIMEOUT, } class ptyDriver(object): - """Automate interactive commands on a pty interface.""" - def __init__(self, interface, params, fast=False): - """Init class variables.""" - self._child = None - self._fd = None - self._interface = interface - self._pty_path = self._interface.get_pty() - self._dict = UART_PARAMS.copy() - self._fast = fast - - def __del__(self): - self.close() - - def close(self): - """Close any open files and interfaces.""" - if self._fd: - self._close() - self._interface.close() - - def _open(self): - """Connect to serial device and create pexpect interface.""" - assert self._fd is None - self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK) - # Don't allow forked processes to access. - fcntl.fcntl(self._fd, fcntl.F_SETFD, - fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) - self._child = fdpexpect.fdspawn(self._fd) - # pexpect defaults to a 100ms delay before sending characters, to - # work around race conditions in ssh. We don't need this feature - # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up. - if self._fast: - self._child.delaybeforesend = 0.001 - - def _close(self): - """Close serial device connection.""" - os.close(self._fd) - self._fd = None - self._child = None - - def _flush(self): - """Flush device output to prevent previous messages interfering.""" - if self._child.sendline('') != 1: - raise ptyError('Failed to send newline.') - # Have a maximum timeout for the flush operation. We should have cleared - # all data from the buffer, but if data is regularly being generated, we - # can't guarantee it will ever stop. - flush_end_time = time.time() + FLUSH_UART_TIMEOUT - while time.time() <= flush_end_time: - try: - self._child.expect('.', timeout=0.01) - except (pexpect.TIMEOUT, pexpect.EOF): - break - except OSError as e: - # EAGAIN indicates no data available, maybe we didn't wait long enough. - if e.errno != errno.EAGAIN: - raise - break - - def _send(self, cmds): - """Send command to EC. - - This function always flushes serial device before sending, and is used as - a wrapper function to make sure the channel is always flushed before - sending commands. - - Args: - cmds: The commands to send to the device, either a list or a string. - - Raises: - ptyError: Raised when writing to the device fails. - """ - self._flush() - if not isinstance(cmds, list): - cmds = [cmds] - for cmd in cmds: - if self._child.sendline(cmd) != len(cmd) + 1: - raise ptyError('Failed to send command.') - - def _issue_cmd(self, cmds): - """Send command to the device and do not wait for response. - - Args: - cmds: The commands to send to the device, either a list or a string. - """ - self._issue_cmd_get_results(cmds, []) - - def _issue_cmd_get_results(self, cmds, - regex_list, timeout=DEFAULT_UART_TIMEOUT): - """Send command to the device and wait for response. - - This function waits for response message matching a regular - expressions. - - Args: - cmds: The commands issued, either a list or a string. - regex_list: List of Regular expressions used to match response message. - Note1, list must be ordered. - Note2, empty list sends and returns. - timeout: time to wait for matching results before failing. - - Returns: - List of tuples, each of which contains the entire matched string and - all the subgroups of the match. None if not matched. - For example: - response of the given command: - High temp: 37.2 - Low temp: 36.4 - regex_list: - ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)'] - returns: - [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')] - - Raises: - ptyError: If timed out waiting for a response - """ - result_list = [] - self._open() - try: - self._send(cmds) - for regex in regex_list: - self._child.expect(regex, timeout) - match = self._child.match - lastindex = match.lastindex if match and match.lastindex else 0 - # Create a tuple which contains the entire matched string and all - # the subgroups of the match. - result = match.group(*range(lastindex + 1)) if match else None - if result: - result = tuple(res.decode('utf-8') for res in result) - result_list.append(result) - except pexpect.TIMEOUT: - raise ptyError('Timeout waiting for response.') - finally: - if not regex_list: - # Must be longer than delaybeforesend - time.sleep(0.1) - self._close() - return result_list - - def _issue_cmd_get_multi_results(self, cmd, regex): - """Send command to the device and wait for multiple response. - - This function waits for arbitrary number of response message - matching a regular expression. - - Args: - cmd: The command issued. - regex: Regular expression used to match response message. - - Returns: - List of tuples, each of which contains the entire matched string and - all the subgroups of the match. None if not matched. - """ - result_list = [] - self._open() - try: - self._send(cmd) - while True: + """Automate interactive commands on a pty interface.""" + + def __init__(self, interface, params, fast=False): + """Init class variables.""" + self._child = None + self._fd = None + self._interface = interface + self._pty_path = self._interface.get_pty() + self._dict = UART_PARAMS.copy() + self._fast = fast + + def __del__(self): + self.close() + + def close(self): + """Close any open files and interfaces.""" + if self._fd: + self._close() + self._interface.close() + + def _open(self): + """Connect to serial device and create pexpect interface.""" + assert self._fd is None + self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK) + # Don't allow forked processes to access. + fcntl.fcntl( + self._fd, + fcntl.F_SETFD, + fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC, + ) + self._child = fdpexpect.fdspawn(self._fd) + # pexpect defaults to a 100ms delay before sending characters, to + # work around race conditions in ssh. We don't need this feature + # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up. + if self._fast: + self._child.delaybeforesend = 0.001 + + def _close(self): + """Close serial device connection.""" + os.close(self._fd) + self._fd = None + self._child = None + + def _flush(self): + """Flush device output to prevent previous messages interfering.""" + if self._child.sendline("") != 1: + raise ptyError("Failed to send newline.") + # Have a maximum timeout for the flush operation. We should have cleared + # all data from the buffer, but if data is regularly being generated, we + # can't guarantee it will ever stop. + flush_end_time = time.time() + FLUSH_UART_TIMEOUT + while time.time() <= flush_end_time: + try: + self._child.expect(".", timeout=0.01) + except (pexpect.TIMEOUT, pexpect.EOF): + break + except OSError as e: + # EAGAIN indicates no data available, maybe we didn't wait long enough. + if e.errno != errno.EAGAIN: + raise + break + + def _send(self, cmds): + """Send command to EC. + + This function always flushes serial device before sending, and is used as + a wrapper function to make sure the channel is always flushed before + sending commands. + + Args: + cmds: The commands to send to the device, either a list or a string. + + Raises: + ptyError: Raised when writing to the device fails. + """ + self._flush() + if not isinstance(cmds, list): + cmds = [cmds] + for cmd in cmds: + if self._child.sendline(cmd) != len(cmd) + 1: + raise ptyError("Failed to send command.") + + def _issue_cmd(self, cmds): + """Send command to the device and do not wait for response. + + Args: + cmds: The commands to send to the device, either a list or a string. + """ + self._issue_cmd_get_results(cmds, []) + + def _issue_cmd_get_results( + self, cmds, regex_list, timeout=DEFAULT_UART_TIMEOUT + ): + """Send command to the device and wait for response. + + This function waits for response message matching a regular + expressions. + + Args: + cmds: The commands issued, either a list or a string. + regex_list: List of Regular expressions used to match response message. + Note1, list must be ordered. + Note2, empty list sends and returns. + timeout: time to wait for matching results before failing. + + Returns: + List of tuples, each of which contains the entire matched string and + all the subgroups of the match. None if not matched. + For example: + response of the given command: + High temp: 37.2 + Low temp: 36.4 + regex_list: + ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)'] + returns: + [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')] + + Raises: + ptyError: If timed out waiting for a response + """ + result_list = [] + self._open() try: - self._child.expect(regex, timeout=0.1) - match = self._child.match - lastindex = match.lastindex if match and match.lastindex else 0 - # Create a tuple which contains the entire matched string and all - # the subgroups of the match. - result = match.group(*range(lastindex + 1)) if match else None - if result: - result = tuple(res.decode('utf-8') for res in result) - result_list.append(result) + self._send(cmds) + for regex in regex_list: + self._child.expect(regex, timeout) + match = self._child.match + lastindex = match.lastindex if match and match.lastindex else 0 + # Create a tuple which contains the entire matched string and all + # the subgroups of the match. + result = match.group(*range(lastindex + 1)) if match else None + if result: + result = tuple(res.decode("utf-8") for res in result) + result_list.append(result) except pexpect.TIMEOUT: - break - finally: - self._close() - return result_list - - def _Set_uart_timeout(self, timeout): - """Set timeout value for waiting for the device response. - - Args: - timeout: Timeout value in second. - """ - self._dict['uart_timeout'] = timeout - - def _Get_uart_timeout(self): - """Get timeout value for waiting for the device response. - - Returns: - Timeout value in second. - """ - return self._dict['uart_timeout'] - - def _Set_uart_regexp(self, regexp): - """Set the list of regular expressions which matches the command response. - - Args: - regexp: A string which contains a list of regular expressions. - """ - if not isinstance(regexp, str): - raise ptyError('The argument regexp should be a string.') - self._dict['uart_regexp'] = ast.literal_eval(regexp) - - def _Get_uart_regexp(self): - """Get the list of regular expressions which matches the command response. - - Returns: - A string which contains a list of regular expressions. - """ - return str(self._dict['uart_regexp']) - - def _Set_uart_cmd(self, cmd): - """Set the UART command and send it to the device. - - If ec_uart_regexp is 'None', the command is just sent and it doesn't care - about its response. - - If ec_uart_regexp is not 'None', the command is send and its response, - which matches the regular expression of ec_uart_regexp, will be kept. - Use its getter to obtain this result. If no match after ec_uart_timeout - seconds, a timeout error will be raised. - - Args: - cmd: A string of UART command. - """ - if self._dict['uart_regexp']: - self._dict['uart_cmd'] = self._issue_cmd_get_results( - cmd, self._dict['uart_regexp'], self._dict['uart_timeout']) - else: - self._dict['uart_cmd'] = None - self._issue_cmd(cmd) - - def _Set_uart_multicmd(self, cmds): - """Set multiple UART commands and send them to the device. - - Note that ec_uart_regexp is not supported to match the results. - - Args: - cmds: A semicolon-separated string of UART commands. - """ - self._issue_cmd(cmds.split(';')) - - def _Get_uart_cmd(self): - """Get the result of the latest UART command. - - Returns: - A string which contains a list of tuples, each of which contains the - entire matched string and all the subgroups of the match. 'None' if - the ec_uart_regexp is 'None'. - """ - return str(self._dict['uart_cmd']) - - def _Set_uart_capture(self, cmd): - """Set UART capture mode (on or off). - - Once capture is enabled, UART output could be collected periodically by - invoking _Get_uart_stream() below. - - Args: - cmd: True for on, False for off - """ - self._interface.set_capture_active(cmd) - - def _Get_uart_capture(self): - """Get the UART capture mode (on or off).""" - return self._interface.get_capture_active() - - def _Get_uart_stream(self): - """Get uart stream generated since last time.""" - return self._interface.get_stream() + raise ptyError("Timeout waiting for response.") + finally: + if not regex_list: + # Must be longer than delaybeforesend + time.sleep(0.1) + self._close() + return result_list + + def _issue_cmd_get_multi_results(self, cmd, regex): + """Send command to the device and wait for multiple response. + + This function waits for arbitrary number of response message + matching a regular expression. + + Args: + cmd: The command issued. + regex: Regular expression used to match response message. + + Returns: + List of tuples, each of which contains the entire matched string and + all the subgroups of the match. None if not matched. + """ + result_list = [] + self._open() + try: + self._send(cmd) + while True: + try: + self._child.expect(regex, timeout=0.1) + match = self._child.match + lastindex = ( + match.lastindex if match and match.lastindex else 0 + ) + # Create a tuple which contains the entire matched string and all + # the subgroups of the match. + result = ( + match.group(*range(lastindex + 1)) if match else None + ) + if result: + result = tuple(res.decode("utf-8") for res in result) + result_list.append(result) + except pexpect.TIMEOUT: + break + finally: + self._close() + return result_list + + def _Set_uart_timeout(self, timeout): + """Set timeout value for waiting for the device response. + + Args: + timeout: Timeout value in second. + """ + self._dict["uart_timeout"] = timeout + + def _Get_uart_timeout(self): + """Get timeout value for waiting for the device response. + + Returns: + Timeout value in second. + """ + return self._dict["uart_timeout"] + + def _Set_uart_regexp(self, regexp): + """Set the list of regular expressions which matches the command response. + + Args: + regexp: A string which contains a list of regular expressions. + """ + if not isinstance(regexp, str): + raise ptyError("The argument regexp should be a string.") + self._dict["uart_regexp"] = ast.literal_eval(regexp) + + def _Get_uart_regexp(self): + """Get the list of regular expressions which matches the command response. + + Returns: + A string which contains a list of regular expressions. + """ + return str(self._dict["uart_regexp"]) + + def _Set_uart_cmd(self, cmd): + """Set the UART command and send it to the device. + + If ec_uart_regexp is 'None', the command is just sent and it doesn't care + about its response. + + If ec_uart_regexp is not 'None', the command is send and its response, + which matches the regular expression of ec_uart_regexp, will be kept. + Use its getter to obtain this result. If no match after ec_uart_timeout + seconds, a timeout error will be raised. + + Args: + cmd: A string of UART command. + """ + if self._dict["uart_regexp"]: + self._dict["uart_cmd"] = self._issue_cmd_get_results( + cmd, self._dict["uart_regexp"], self._dict["uart_timeout"] + ) + else: + self._dict["uart_cmd"] = None + self._issue_cmd(cmd) + + def _Set_uart_multicmd(self, cmds): + """Set multiple UART commands and send them to the device. + + Note that ec_uart_regexp is not supported to match the results. + + Args: + cmds: A semicolon-separated string of UART commands. + """ + self._issue_cmd(cmds.split(";")) + + def _Get_uart_cmd(self): + """Get the result of the latest UART command. + + Returns: + A string which contains a list of tuples, each of which contains the + entire matched string and all the subgroups of the match. 'None' if + the ec_uart_regexp is 'None'. + """ + return str(self._dict["uart_cmd"]) + + def _Set_uart_capture(self, cmd): + """Set UART capture mode (on or off). + + Once capture is enabled, UART output could be collected periodically by + invoking _Get_uart_stream() below. + + Args: + cmd: True for on, False for off + """ + self._interface.set_capture_active(cmd) + + def _Get_uart_capture(self): + """Get the UART capture mode (on or off).""" + return self._interface.get_capture_active() + + def _Get_uart_stream(self): + """Get uart stream generated since last time.""" + return self._interface.get_stream() |