summaryrefslogtreecommitdiff
path: root/extra/tigertool/ecusb/pty_driver.py
diff options
context:
space:
mode:
Diffstat (limited to 'extra/tigertool/ecusb/pty_driver.py')
-rw-r--r--extra/tigertool/ecusb/pty_driver.py559
1 files changed, 283 insertions, 276 deletions
diff --git a/extra/tigertool/ecusb/pty_driver.py b/extra/tigertool/ecusb/pty_driver.py
index 09ef8c42e4..723bf41b57 100644
--- a/extra/tigertool/ecusb/pty_driver.py
+++ b/extra/tigertool/ecusb/pty_driver.py
@@ -1,10 +1,6 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Copyright 2017 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
"""ptyDriver class
@@ -17,9 +13,10 @@ import ast
import errno
import fcntl
import os
-import pexpect
import time
-from pexpect import fdpexpect
+
+import pexpect # pylint:disable=import-error
+from pexpect import fdpexpect # pylint:disable=import-error
# Expecting a result in 3 seconds is plenty even for slow platforms.
DEFAULT_UART_TIMEOUT = 3
@@ -27,281 +24,291 @@ FLUSH_UART_TIMEOUT = 1
class ptyError(Exception):
- """Exception class for pty errors."""
+ """Exception class for pty errors."""
UART_PARAMS = {
- 'uart_cmd': None,
- 'uart_multicmd': None,
- 'uart_regexp': None,
- 'uart_timeout': DEFAULT_UART_TIMEOUT,
+ "uart_cmd": None,
+ "uart_multicmd": None,
+ "uart_regexp": None,
+ "uart_timeout": DEFAULT_UART_TIMEOUT,
}
class ptyDriver(object):
- """Automate interactive commands on a pty interface."""
- def __init__(self, interface, params, fast=False):
- """Init class variables."""
- self._child = None
- self._fd = None
- self._interface = interface
- self._pty_path = self._interface.get_pty()
- self._dict = UART_PARAMS.copy()
- self._fast = fast
-
- def __del__(self):
- self.close()
-
- def close(self):
- """Close any open files and interfaces."""
- if self._fd:
- self._close()
- self._interface.close()
-
- def _open(self):
- """Connect to serial device and create pexpect interface."""
- assert self._fd is None
- self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK)
- # Don't allow forked processes to access.
- fcntl.fcntl(self._fd, fcntl.F_SETFD,
- fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
- self._child = fdpexpect.fdspawn(self._fd)
- # pexpect defaults to a 100ms delay before sending characters, to
- # work around race conditions in ssh. We don't need this feature
- # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up.
- if self._fast:
- self._child.delaybeforesend = 0.001
-
- def _close(self):
- """Close serial device connection."""
- os.close(self._fd)
- self._fd = None
- self._child = None
-
- def _flush(self):
- """Flush device output to prevent previous messages interfering."""
- if self._child.sendline('') != 1:
- raise ptyError('Failed to send newline.')
- # Have a maximum timeout for the flush operation. We should have cleared
- # all data from the buffer, but if data is regularly being generated, we
- # can't guarantee it will ever stop.
- flush_end_time = time.time() + FLUSH_UART_TIMEOUT
- while time.time() <= flush_end_time:
- try:
- self._child.expect('.', timeout=0.01)
- except (pexpect.TIMEOUT, pexpect.EOF):
- break
- except OSError as e:
- # EAGAIN indicates no data available, maybe we didn't wait long enough.
- if e.errno != errno.EAGAIN:
- raise
- break
-
- def _send(self, cmds):
- """Send command to EC.
-
- This function always flushes serial device before sending, and is used as
- a wrapper function to make sure the channel is always flushed before
- sending commands.
-
- Args:
- cmds: The commands to send to the device, either a list or a string.
-
- Raises:
- ptyError: Raised when writing to the device fails.
- """
- self._flush()
- if not isinstance(cmds, list):
- cmds = [cmds]
- for cmd in cmds:
- if self._child.sendline(cmd) != len(cmd) + 1:
- raise ptyError('Failed to send command.')
-
- def _issue_cmd(self, cmds):
- """Send command to the device and do not wait for response.
-
- Args:
- cmds: The commands to send to the device, either a list or a string.
- """
- self._issue_cmd_get_results(cmds, [])
-
- def _issue_cmd_get_results(self, cmds,
- regex_list, timeout=DEFAULT_UART_TIMEOUT):
- """Send command to the device and wait for response.
-
- This function waits for response message matching a regular
- expressions.
-
- Args:
- cmds: The commands issued, either a list or a string.
- regex_list: List of Regular expressions used to match response message.
- Note1, list must be ordered.
- Note2, empty list sends and returns.
- timeout: time to wait for matching results before failing.
-
- Returns:
- List of tuples, each of which contains the entire matched string and
- all the subgroups of the match. None if not matched.
- For example:
- response of the given command:
- High temp: 37.2
- Low temp: 36.4
- regex_list:
- ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
- returns:
- [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
-
- Raises:
- ptyError: If timed out waiting for a response
- """
- result_list = []
- self._open()
- try:
- self._send(cmds)
- for regex in regex_list:
- self._child.expect(regex, timeout)
- match = self._child.match
- lastindex = match.lastindex if match and match.lastindex else 0
- # Create a tuple which contains the entire matched string and all
- # the subgroups of the match.
- result = match.group(*range(lastindex + 1)) if match else None
- if result:
- result = tuple(res.decode('utf-8') for res in result)
- result_list.append(result)
- except pexpect.TIMEOUT:
- raise ptyError('Timeout waiting for response.')
- finally:
- if not regex_list:
- # Must be longer than delaybeforesend
- time.sleep(0.1)
- self._close()
- return result_list
-
- def _issue_cmd_get_multi_results(self, cmd, regex):
- """Send command to the device and wait for multiple response.
-
- This function waits for arbitrary number of response message
- matching a regular expression.
-
- Args:
- cmd: The command issued.
- regex: Regular expression used to match response message.
-
- Returns:
- List of tuples, each of which contains the entire matched string and
- all the subgroups of the match. None if not matched.
- """
- result_list = []
- self._open()
- try:
- self._send(cmd)
- while True:
+ """Automate interactive commands on a pty interface."""
+
+ def __init__(self, interface, params, fast=False):
+ """Init class variables."""
+ self._child = None
+ self._fd = None
+ self._interface = interface
+ self._pty_path = self._interface.get_pty()
+ self._dict = UART_PARAMS.copy()
+ self._fast = fast
+
+ def __del__(self):
+ self.close()
+
+ def close(self):
+ """Close any open files and interfaces."""
+ if self._fd:
+ self._close()
+ self._interface.close()
+
+ def _open(self):
+ """Connect to serial device and create pexpect interface."""
+ assert self._fd is None
+ self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK)
+ # Don't allow forked processes to access.
+ fcntl.fcntl(
+ self._fd,
+ fcntl.F_SETFD,
+ fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC,
+ )
+ self._child = fdpexpect.fdspawn(self._fd)
+ # pexpect defaults to a 100ms delay before sending characters, to
+ # work around race conditions in ssh. We don't need this feature
+ # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up.
+ if self._fast:
+ self._child.delaybeforesend = 0.001
+
+ def _close(self):
+ """Close serial device connection."""
+ os.close(self._fd)
+ self._fd = None
+ self._child = None
+
+ def _flush(self):
+ """Flush device output to prevent previous messages interfering."""
+ if self._child.sendline("") != 1:
+ raise ptyError("Failed to send newline.")
+ # Have a maximum timeout for the flush operation. We should have cleared
+ # all data from the buffer, but if data is regularly being generated, we
+ # can't guarantee it will ever stop.
+ flush_end_time = time.time() + FLUSH_UART_TIMEOUT
+ while time.time() <= flush_end_time:
+ try:
+ self._child.expect(".", timeout=0.01)
+ except (pexpect.TIMEOUT, pexpect.EOF):
+ break
+ except OSError as e:
+ # EAGAIN indicates no data available, maybe we didn't wait long enough.
+ if e.errno != errno.EAGAIN:
+ raise
+ break
+
+ def _send(self, cmds):
+ """Send command to EC.
+
+ This function always flushes serial device before sending, and is used as
+ a wrapper function to make sure the channel is always flushed before
+ sending commands.
+
+ Args:
+ cmds: The commands to send to the device, either a list or a string.
+
+ Raises:
+ ptyError: Raised when writing to the device fails.
+ """
+ self._flush()
+ if not isinstance(cmds, list):
+ cmds = [cmds]
+ for cmd in cmds:
+ if self._child.sendline(cmd) != len(cmd) + 1:
+ raise ptyError("Failed to send command.")
+
+ def _issue_cmd(self, cmds):
+ """Send command to the device and do not wait for response.
+
+ Args:
+ cmds: The commands to send to the device, either a list or a string.
+ """
+ self._issue_cmd_get_results(cmds, [])
+
+ def _issue_cmd_get_results(
+ self, cmds, regex_list, timeout=DEFAULT_UART_TIMEOUT
+ ):
+ """Send command to the device and wait for response.
+
+ This function waits for response message matching a regular
+ expressions.
+
+ Args:
+ cmds: The commands issued, either a list or a string.
+ regex_list: List of Regular expressions used to match response message.
+ Note1, list must be ordered.
+ Note2, empty list sends and returns.
+ timeout: time to wait for matching results before failing.
+
+ Returns:
+ List of tuples, each of which contains the entire matched string and
+ all the subgroups of the match. None if not matched.
+ For example:
+ response of the given command:
+ High temp: 37.2
+ Low temp: 36.4
+ regex_list:
+ ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
+ returns:
+ [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
+
+ Raises:
+ ptyError: If timed out waiting for a response
+ """
+ result_list = []
+ self._open()
try:
- self._child.expect(regex, timeout=0.1)
- match = self._child.match
- lastindex = match.lastindex if match and match.lastindex else 0
- # Create a tuple which contains the entire matched string and all
- # the subgroups of the match.
- result = match.group(*range(lastindex + 1)) if match else None
- if result:
- result = tuple(res.decode('utf-8') for res in result)
- result_list.append(result)
+ self._send(cmds)
+ for regex in regex_list:
+ self._child.expect(regex, timeout)
+ match = self._child.match
+ lastindex = match.lastindex if match and match.lastindex else 0
+ # Create a tuple which contains the entire matched string and all
+ # the subgroups of the match.
+ result = match.group(*range(lastindex + 1)) if match else None
+ if result:
+ result = tuple(res.decode("utf-8") for res in result)
+ result_list.append(result)
except pexpect.TIMEOUT:
- break
- finally:
- self._close()
- return result_list
-
- def _Set_uart_timeout(self, timeout):
- """Set timeout value for waiting for the device response.
-
- Args:
- timeout: Timeout value in second.
- """
- self._dict['uart_timeout'] = timeout
-
- def _Get_uart_timeout(self):
- """Get timeout value for waiting for the device response.
-
- Returns:
- Timeout value in second.
- """
- return self._dict['uart_timeout']
-
- def _Set_uart_regexp(self, regexp):
- """Set the list of regular expressions which matches the command response.
-
- Args:
- regexp: A string which contains a list of regular expressions.
- """
- if not isinstance(regexp, str):
- raise ptyError('The argument regexp should be a string.')
- self._dict['uart_regexp'] = ast.literal_eval(regexp)
-
- def _Get_uart_regexp(self):
- """Get the list of regular expressions which matches the command response.
-
- Returns:
- A string which contains a list of regular expressions.
- """
- return str(self._dict['uart_regexp'])
-
- def _Set_uart_cmd(self, cmd):
- """Set the UART command and send it to the device.
-
- If ec_uart_regexp is 'None', the command is just sent and it doesn't care
- about its response.
-
- If ec_uart_regexp is not 'None', the command is send and its response,
- which matches the regular expression of ec_uart_regexp, will be kept.
- Use its getter to obtain this result. If no match after ec_uart_timeout
- seconds, a timeout error will be raised.
-
- Args:
- cmd: A string of UART command.
- """
- if self._dict['uart_regexp']:
- self._dict['uart_cmd'] = self._issue_cmd_get_results(
- cmd, self._dict['uart_regexp'], self._dict['uart_timeout'])
- else:
- self._dict['uart_cmd'] = None
- self._issue_cmd(cmd)
-
- def _Set_uart_multicmd(self, cmds):
- """Set multiple UART commands and send them to the device.
-
- Note that ec_uart_regexp is not supported to match the results.
-
- Args:
- cmds: A semicolon-separated string of UART commands.
- """
- self._issue_cmd(cmds.split(';'))
-
- def _Get_uart_cmd(self):
- """Get the result of the latest UART command.
-
- Returns:
- A string which contains a list of tuples, each of which contains the
- entire matched string and all the subgroups of the match. 'None' if
- the ec_uart_regexp is 'None'.
- """
- return str(self._dict['uart_cmd'])
-
- def _Set_uart_capture(self, cmd):
- """Set UART capture mode (on or off).
-
- Once capture is enabled, UART output could be collected periodically by
- invoking _Get_uart_stream() below.
-
- Args:
- cmd: True for on, False for off
- """
- self._interface.set_capture_active(cmd)
-
- def _Get_uart_capture(self):
- """Get the UART capture mode (on or off)."""
- return self._interface.get_capture_active()
-
- def _Get_uart_stream(self):
- """Get uart stream generated since last time."""
- return self._interface.get_stream()
+ raise ptyError("Timeout waiting for response.")
+ finally:
+ if not regex_list:
+ # Must be longer than delaybeforesend
+ time.sleep(0.1)
+ self._close()
+ return result_list
+
+ def _issue_cmd_get_multi_results(self, cmd, regex):
+ """Send command to the device and wait for multiple response.
+
+ This function waits for arbitrary number of response message
+ matching a regular expression.
+
+ Args:
+ cmd: The command issued.
+ regex: Regular expression used to match response message.
+
+ Returns:
+ List of tuples, each of which contains the entire matched string and
+ all the subgroups of the match. None if not matched.
+ """
+ result_list = []
+ self._open()
+ try:
+ self._send(cmd)
+ while True:
+ try:
+ self._child.expect(regex, timeout=0.1)
+ match = self._child.match
+ lastindex = (
+ match.lastindex if match and match.lastindex else 0
+ )
+ # Create a tuple which contains the entire matched string and all
+ # the subgroups of the match.
+ result = (
+ match.group(*range(lastindex + 1)) if match else None
+ )
+ if result:
+ result = tuple(res.decode("utf-8") for res in result)
+ result_list.append(result)
+ except pexpect.TIMEOUT:
+ break
+ finally:
+ self._close()
+ return result_list
+
+ def _Set_uart_timeout(self, timeout):
+ """Set timeout value for waiting for the device response.
+
+ Args:
+ timeout: Timeout value in second.
+ """
+ self._dict["uart_timeout"] = timeout
+
+ def _Get_uart_timeout(self):
+ """Get timeout value for waiting for the device response.
+
+ Returns:
+ Timeout value in second.
+ """
+ return self._dict["uart_timeout"]
+
+ def _Set_uart_regexp(self, regexp):
+ """Set the list of regular expressions which matches the command response.
+
+ Args:
+ regexp: A string which contains a list of regular expressions.
+ """
+ if not isinstance(regexp, str):
+ raise ptyError("The argument regexp should be a string.")
+ self._dict["uart_regexp"] = ast.literal_eval(regexp)
+
+ def _Get_uart_regexp(self):
+ """Get the list of regular expressions which matches the command response.
+
+ Returns:
+ A string which contains a list of regular expressions.
+ """
+ return str(self._dict["uart_regexp"])
+
+ def _Set_uart_cmd(self, cmd):
+ """Set the UART command and send it to the device.
+
+ If ec_uart_regexp is 'None', the command is just sent and it doesn't care
+ about its response.
+
+ If ec_uart_regexp is not 'None', the command is send and its response,
+ which matches the regular expression of ec_uart_regexp, will be kept.
+ Use its getter to obtain this result. If no match after ec_uart_timeout
+ seconds, a timeout error will be raised.
+
+ Args:
+ cmd: A string of UART command.
+ """
+ if self._dict["uart_regexp"]:
+ self._dict["uart_cmd"] = self._issue_cmd_get_results(
+ cmd, self._dict["uart_regexp"], self._dict["uart_timeout"]
+ )
+ else:
+ self._dict["uart_cmd"] = None
+ self._issue_cmd(cmd)
+
+ def _Set_uart_multicmd(self, cmds):
+ """Set multiple UART commands and send them to the device.
+
+ Note that ec_uart_regexp is not supported to match the results.
+
+ Args:
+ cmds: A semicolon-separated string of UART commands.
+ """
+ self._issue_cmd(cmds.split(";"))
+
+ def _Get_uart_cmd(self):
+ """Get the result of the latest UART command.
+
+ Returns:
+ A string which contains a list of tuples, each of which contains the
+ entire matched string and all the subgroups of the match. 'None' if
+ the ec_uart_regexp is 'None'.
+ """
+ return str(self._dict["uart_cmd"])
+
+ def _Set_uart_capture(self, cmd):
+ """Set UART capture mode (on or off).
+
+ Once capture is enabled, UART output could be collected periodically by
+ invoking _Get_uart_stream() below.
+
+ Args:
+ cmd: True for on, False for off
+ """
+ self._interface.set_capture_active(cmd)
+
+ def _Get_uart_capture(self):
+ """Get the UART capture mode (on or off)."""
+ return self._interface.get_capture_active()
+
+ def _Get_uart_stream(self):
+ """Get uart stream generated since last time."""
+ return self._interface.get_stream()