diff options
Diffstat (limited to 'extra/tigertool/ecusb/pty_driver.py')
-rw-r--r-- | extra/tigertool/ecusb/pty_driver.py | 559 |
1 files changed, 283 insertions, 276 deletions
diff --git a/extra/tigertool/ecusb/pty_driver.py b/extra/tigertool/ecusb/pty_driver.py index 09ef8c42e4..723bf41b57 100644 --- a/extra/tigertool/ecusb/pty_driver.py +++ b/extra/tigertool/ecusb/pty_driver.py @@ -1,10 +1,6 @@ -# Copyright 2017 The Chromium OS Authors. All rights reserved. +# Copyright 2017 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes """ptyDriver class @@ -17,9 +13,10 @@ import ast import errno import fcntl import os -import pexpect import time -from pexpect import fdpexpect + +import pexpect # pylint:disable=import-error +from pexpect import fdpexpect # pylint:disable=import-error # Expecting a result in 3 seconds is plenty even for slow platforms. DEFAULT_UART_TIMEOUT = 3 @@ -27,281 +24,291 @@ FLUSH_UART_TIMEOUT = 1 class ptyError(Exception): - """Exception class for pty errors.""" + """Exception class for pty errors.""" UART_PARAMS = { - 'uart_cmd': None, - 'uart_multicmd': None, - 'uart_regexp': None, - 'uart_timeout': DEFAULT_UART_TIMEOUT, + "uart_cmd": None, + "uart_multicmd": None, + "uart_regexp": None, + "uart_timeout": DEFAULT_UART_TIMEOUT, } class ptyDriver(object): - """Automate interactive commands on a pty interface.""" - def __init__(self, interface, params, fast=False): - """Init class variables.""" - self._child = None - self._fd = None - self._interface = interface - self._pty_path = self._interface.get_pty() - self._dict = UART_PARAMS.copy() - self._fast = fast - - def __del__(self): - self.close() - - def close(self): - """Close any open files and interfaces.""" - if self._fd: - self._close() - self._interface.close() - - def _open(self): - """Connect to serial device and create pexpect interface.""" - assert self._fd is None - self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK) - # Don't allow forked processes to access. - fcntl.fcntl(self._fd, fcntl.F_SETFD, - fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) - self._child = fdpexpect.fdspawn(self._fd) - # pexpect defaults to a 100ms delay before sending characters, to - # work around race conditions in ssh. We don't need this feature - # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up. - if self._fast: - self._child.delaybeforesend = 0.001 - - def _close(self): - """Close serial device connection.""" - os.close(self._fd) - self._fd = None - self._child = None - - def _flush(self): - """Flush device output to prevent previous messages interfering.""" - if self._child.sendline('') != 1: - raise ptyError('Failed to send newline.') - # Have a maximum timeout for the flush operation. We should have cleared - # all data from the buffer, but if data is regularly being generated, we - # can't guarantee it will ever stop. - flush_end_time = time.time() + FLUSH_UART_TIMEOUT - while time.time() <= flush_end_time: - try: - self._child.expect('.', timeout=0.01) - except (pexpect.TIMEOUT, pexpect.EOF): - break - except OSError as e: - # EAGAIN indicates no data available, maybe we didn't wait long enough. - if e.errno != errno.EAGAIN: - raise - break - - def _send(self, cmds): - """Send command to EC. - - This function always flushes serial device before sending, and is used as - a wrapper function to make sure the channel is always flushed before - sending commands. - - Args: - cmds: The commands to send to the device, either a list or a string. - - Raises: - ptyError: Raised when writing to the device fails. - """ - self._flush() - if not isinstance(cmds, list): - cmds = [cmds] - for cmd in cmds: - if self._child.sendline(cmd) != len(cmd) + 1: - raise ptyError('Failed to send command.') - - def _issue_cmd(self, cmds): - """Send command to the device and do not wait for response. - - Args: - cmds: The commands to send to the device, either a list or a string. - """ - self._issue_cmd_get_results(cmds, []) - - def _issue_cmd_get_results(self, cmds, - regex_list, timeout=DEFAULT_UART_TIMEOUT): - """Send command to the device and wait for response. - - This function waits for response message matching a regular - expressions. - - Args: - cmds: The commands issued, either a list or a string. - regex_list: List of Regular expressions used to match response message. - Note1, list must be ordered. - Note2, empty list sends and returns. - timeout: time to wait for matching results before failing. - - Returns: - List of tuples, each of which contains the entire matched string and - all the subgroups of the match. None if not matched. - For example: - response of the given command: - High temp: 37.2 - Low temp: 36.4 - regex_list: - ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)'] - returns: - [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')] - - Raises: - ptyError: If timed out waiting for a response - """ - result_list = [] - self._open() - try: - self._send(cmds) - for regex in regex_list: - self._child.expect(regex, timeout) - match = self._child.match - lastindex = match.lastindex if match and match.lastindex else 0 - # Create a tuple which contains the entire matched string and all - # the subgroups of the match. - result = match.group(*range(lastindex + 1)) if match else None - if result: - result = tuple(res.decode('utf-8') for res in result) - result_list.append(result) - except pexpect.TIMEOUT: - raise ptyError('Timeout waiting for response.') - finally: - if not regex_list: - # Must be longer than delaybeforesend - time.sleep(0.1) - self._close() - return result_list - - def _issue_cmd_get_multi_results(self, cmd, regex): - """Send command to the device and wait for multiple response. - - This function waits for arbitrary number of response message - matching a regular expression. - - Args: - cmd: The command issued. - regex: Regular expression used to match response message. - - Returns: - List of tuples, each of which contains the entire matched string and - all the subgroups of the match. None if not matched. - """ - result_list = [] - self._open() - try: - self._send(cmd) - while True: + """Automate interactive commands on a pty interface.""" + + def __init__(self, interface, params, fast=False): + """Init class variables.""" + self._child = None + self._fd = None + self._interface = interface + self._pty_path = self._interface.get_pty() + self._dict = UART_PARAMS.copy() + self._fast = fast + + def __del__(self): + self.close() + + def close(self): + """Close any open files and interfaces.""" + if self._fd: + self._close() + self._interface.close() + + def _open(self): + """Connect to serial device and create pexpect interface.""" + assert self._fd is None + self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK) + # Don't allow forked processes to access. + fcntl.fcntl( + self._fd, + fcntl.F_SETFD, + fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC, + ) + self._child = fdpexpect.fdspawn(self._fd) + # pexpect defaults to a 100ms delay before sending characters, to + # work around race conditions in ssh. We don't need this feature + # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up. + if self._fast: + self._child.delaybeforesend = 0.001 + + def _close(self): + """Close serial device connection.""" + os.close(self._fd) + self._fd = None + self._child = None + + def _flush(self): + """Flush device output to prevent previous messages interfering.""" + if self._child.sendline("") != 1: + raise ptyError("Failed to send newline.") + # Have a maximum timeout for the flush operation. We should have cleared + # all data from the buffer, but if data is regularly being generated, we + # can't guarantee it will ever stop. + flush_end_time = time.time() + FLUSH_UART_TIMEOUT + while time.time() <= flush_end_time: + try: + self._child.expect(".", timeout=0.01) + except (pexpect.TIMEOUT, pexpect.EOF): + break + except OSError as e: + # EAGAIN indicates no data available, maybe we didn't wait long enough. + if e.errno != errno.EAGAIN: + raise + break + + def _send(self, cmds): + """Send command to EC. + + This function always flushes serial device before sending, and is used as + a wrapper function to make sure the channel is always flushed before + sending commands. + + Args: + cmds: The commands to send to the device, either a list or a string. + + Raises: + ptyError: Raised when writing to the device fails. + """ + self._flush() + if not isinstance(cmds, list): + cmds = [cmds] + for cmd in cmds: + if self._child.sendline(cmd) != len(cmd) + 1: + raise ptyError("Failed to send command.") + + def _issue_cmd(self, cmds): + """Send command to the device and do not wait for response. + + Args: + cmds: The commands to send to the device, either a list or a string. + """ + self._issue_cmd_get_results(cmds, []) + + def _issue_cmd_get_results( + self, cmds, regex_list, timeout=DEFAULT_UART_TIMEOUT + ): + """Send command to the device and wait for response. + + This function waits for response message matching a regular + expressions. + + Args: + cmds: The commands issued, either a list or a string. + regex_list: List of Regular expressions used to match response message. + Note1, list must be ordered. + Note2, empty list sends and returns. + timeout: time to wait for matching results before failing. + + Returns: + List of tuples, each of which contains the entire matched string and + all the subgroups of the match. None if not matched. + For example: + response of the given command: + High temp: 37.2 + Low temp: 36.4 + regex_list: + ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)'] + returns: + [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')] + + Raises: + ptyError: If timed out waiting for a response + """ + result_list = [] + self._open() try: - self._child.expect(regex, timeout=0.1) - match = self._child.match - lastindex = match.lastindex if match and match.lastindex else 0 - # Create a tuple which contains the entire matched string and all - # the subgroups of the match. - result = match.group(*range(lastindex + 1)) if match else None - if result: - result = tuple(res.decode('utf-8') for res in result) - result_list.append(result) + self._send(cmds) + for regex in regex_list: + self._child.expect(regex, timeout) + match = self._child.match + lastindex = match.lastindex if match and match.lastindex else 0 + # Create a tuple which contains the entire matched string and all + # the subgroups of the match. + result = match.group(*range(lastindex + 1)) if match else None + if result: + result = tuple(res.decode("utf-8") for res in result) + result_list.append(result) except pexpect.TIMEOUT: - break - finally: - self._close() - return result_list - - def _Set_uart_timeout(self, timeout): - """Set timeout value for waiting for the device response. - - Args: - timeout: Timeout value in second. - """ - self._dict['uart_timeout'] = timeout - - def _Get_uart_timeout(self): - """Get timeout value for waiting for the device response. - - Returns: - Timeout value in second. - """ - return self._dict['uart_timeout'] - - def _Set_uart_regexp(self, regexp): - """Set the list of regular expressions which matches the command response. - - Args: - regexp: A string which contains a list of regular expressions. - """ - if not isinstance(regexp, str): - raise ptyError('The argument regexp should be a string.') - self._dict['uart_regexp'] = ast.literal_eval(regexp) - - def _Get_uart_regexp(self): - """Get the list of regular expressions which matches the command response. - - Returns: - A string which contains a list of regular expressions. - """ - return str(self._dict['uart_regexp']) - - def _Set_uart_cmd(self, cmd): - """Set the UART command and send it to the device. - - If ec_uart_regexp is 'None', the command is just sent and it doesn't care - about its response. - - If ec_uart_regexp is not 'None', the command is send and its response, - which matches the regular expression of ec_uart_regexp, will be kept. - Use its getter to obtain this result. If no match after ec_uart_timeout - seconds, a timeout error will be raised. - - Args: - cmd: A string of UART command. - """ - if self._dict['uart_regexp']: - self._dict['uart_cmd'] = self._issue_cmd_get_results( - cmd, self._dict['uart_regexp'], self._dict['uart_timeout']) - else: - self._dict['uart_cmd'] = None - self._issue_cmd(cmd) - - def _Set_uart_multicmd(self, cmds): - """Set multiple UART commands and send them to the device. - - Note that ec_uart_regexp is not supported to match the results. - - Args: - cmds: A semicolon-separated string of UART commands. - """ - self._issue_cmd(cmds.split(';')) - - def _Get_uart_cmd(self): - """Get the result of the latest UART command. - - Returns: - A string which contains a list of tuples, each of which contains the - entire matched string and all the subgroups of the match. 'None' if - the ec_uart_regexp is 'None'. - """ - return str(self._dict['uart_cmd']) - - def _Set_uart_capture(self, cmd): - """Set UART capture mode (on or off). - - Once capture is enabled, UART output could be collected periodically by - invoking _Get_uart_stream() below. - - Args: - cmd: True for on, False for off - """ - self._interface.set_capture_active(cmd) - - def _Get_uart_capture(self): - """Get the UART capture mode (on or off).""" - return self._interface.get_capture_active() - - def _Get_uart_stream(self): - """Get uart stream generated since last time.""" - return self._interface.get_stream() + raise ptyError("Timeout waiting for response.") + finally: + if not regex_list: + # Must be longer than delaybeforesend + time.sleep(0.1) + self._close() + return result_list + + def _issue_cmd_get_multi_results(self, cmd, regex): + """Send command to the device and wait for multiple response. + + This function waits for arbitrary number of response message + matching a regular expression. + + Args: + cmd: The command issued. + regex: Regular expression used to match response message. + + Returns: + List of tuples, each of which contains the entire matched string and + all the subgroups of the match. None if not matched. + """ + result_list = [] + self._open() + try: + self._send(cmd) + while True: + try: + self._child.expect(regex, timeout=0.1) + match = self._child.match + lastindex = ( + match.lastindex if match and match.lastindex else 0 + ) + # Create a tuple which contains the entire matched string and all + # the subgroups of the match. + result = ( + match.group(*range(lastindex + 1)) if match else None + ) + if result: + result = tuple(res.decode("utf-8") for res in result) + result_list.append(result) + except pexpect.TIMEOUT: + break + finally: + self._close() + return result_list + + def _Set_uart_timeout(self, timeout): + """Set timeout value for waiting for the device response. + + Args: + timeout: Timeout value in second. + """ + self._dict["uart_timeout"] = timeout + + def _Get_uart_timeout(self): + """Get timeout value for waiting for the device response. + + Returns: + Timeout value in second. + """ + return self._dict["uart_timeout"] + + def _Set_uart_regexp(self, regexp): + """Set the list of regular expressions which matches the command response. + + Args: + regexp: A string which contains a list of regular expressions. + """ + if not isinstance(regexp, str): + raise ptyError("The argument regexp should be a string.") + self._dict["uart_regexp"] = ast.literal_eval(regexp) + + def _Get_uart_regexp(self): + """Get the list of regular expressions which matches the command response. + + Returns: + A string which contains a list of regular expressions. + """ + return str(self._dict["uart_regexp"]) + + def _Set_uart_cmd(self, cmd): + """Set the UART command and send it to the device. + + If ec_uart_regexp is 'None', the command is just sent and it doesn't care + about its response. + + If ec_uart_regexp is not 'None', the command is send and its response, + which matches the regular expression of ec_uart_regexp, will be kept. + Use its getter to obtain this result. If no match after ec_uart_timeout + seconds, a timeout error will be raised. + + Args: + cmd: A string of UART command. + """ + if self._dict["uart_regexp"]: + self._dict["uart_cmd"] = self._issue_cmd_get_results( + cmd, self._dict["uart_regexp"], self._dict["uart_timeout"] + ) + else: + self._dict["uart_cmd"] = None + self._issue_cmd(cmd) + + def _Set_uart_multicmd(self, cmds): + """Set multiple UART commands and send them to the device. + + Note that ec_uart_regexp is not supported to match the results. + + Args: + cmds: A semicolon-separated string of UART commands. + """ + self._issue_cmd(cmds.split(";")) + + def _Get_uart_cmd(self): + """Get the result of the latest UART command. + + Returns: + A string which contains a list of tuples, each of which contains the + entire matched string and all the subgroups of the match. 'None' if + the ec_uart_regexp is 'None'. + """ + return str(self._dict["uart_cmd"]) + + def _Set_uart_capture(self, cmd): + """Set UART capture mode (on or off). + + Once capture is enabled, UART output could be collected periodically by + invoking _Get_uart_stream() below. + + Args: + cmd: True for on, False for off + """ + self._interface.set_capture_active(cmd) + + def _Get_uart_capture(self): + """Get the UART capture mode (on or off).""" + return self._interface.get_capture_active() + + def _Get_uart_stream(self): + """Get uart stream generated since last time.""" + return self._interface.get_stream() |