summaryrefslogtreecommitdiff
path: root/extra/tigertool/ecusb
diff options
context:
space:
mode:
authorJack Rosenthal <jrosenth@chromium.org>2021-11-04 12:11:58 -0600
committerCommit Bot <commit-bot@chromium.org>2021-11-05 04:22:34 +0000
commit252457d4b21f46889eebad61d4c0a65331919cec (patch)
tree01856c4d31d710b20e85a74c8d7b5836e35c3b98 /extra/tigertool/ecusb
parent08f5a1e6fc2c9467230444ac9b582dcf4d9f0068 (diff)
downloadchrome-ec-release-R98-14388.B-ish.tar.gz
In the interest of making long-term branch maintenance incur as little technical debt on us as possible, we should not maintain any files on the branch we are not actually using. This has the added effect of making it extremely clear when merging CLs from the main branch when changes have the possibility to affect us. The follow-on CL adds a convenience script to actually pull updates from the main branch and generate a CL for the update. BUG=b:204206272 BRANCH=ish TEST=make BOARD=arcada_ish && make BOARD=drallion_ish Signed-off-by: Jack Rosenthal <jrosenth@chromium.org> Change-Id: I17e4694c38219b5a0823e0a3e55a28d1348f4b18 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3262038 Reviewed-by: Jett Rink <jettrink@chromium.org> Reviewed-by: Tom Hughes <tomhughes@chromium.org>
Diffstat (limited to 'extra/tigertool/ecusb')
-rw-r--r--extra/tigertool/ecusb/__init__.py9
-rw-r--r--extra/tigertool/ecusb/pty_driver.py304
-rw-r--r--extra/tigertool/ecusb/stm32uart.py248
-rw-r--r--extra/tigertool/ecusb/stm32usb.py119
-rw-r--r--extra/tigertool/ecusb/tiny_servo_common.py174
-rw-r--r--extra/tigertool/ecusb/tiny_servod.py54
6 files changed, 0 insertions, 908 deletions
diff --git a/extra/tigertool/ecusb/__init__.py b/extra/tigertool/ecusb/__init__.py
deleted file mode 100644
index fe4dbc6749..0000000000
--- a/extra/tigertool/ecusb/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-__all__ = ['tiny_servo_common', 'stm32usb', 'stm32uart', 'pty_driver']
diff --git a/extra/tigertool/ecusb/pty_driver.py b/extra/tigertool/ecusb/pty_driver.py
deleted file mode 100644
index 137cbc149b..0000000000
--- a/extra/tigertool/ecusb/pty_driver.py
+++ /dev/null
@@ -1,304 +0,0 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""ptyDriver class
-
-This class takes a pty interface and can send commands and expect results
-as regex. This is useful for automating console based interfaces, such as
-the CrOS EC console commands.
-"""
-
-import ast
-import errno
-import fcntl
-import os
-import pexpect
-import time
-from pexpect import fdpexpect
-
-# Expecting a result in 3 seconds is plenty even for slow platforms.
-DEFAULT_UART_TIMEOUT = 3
-FLUSH_UART_TIMEOUT = 1
-
-
-class ptyError(Exception):
- """Exception class for pty errors."""
-
-
-UART_PARAMS = {
- 'uart_cmd': None,
- 'uart_multicmd': None,
- 'uart_regexp': None,
- 'uart_timeout': DEFAULT_UART_TIMEOUT,
-}
-
-
-class ptyDriver(object):
- """Automate interactive commands on a pty interface."""
- def __init__(self, interface, params, fast=False):
- """Init class variables."""
- self._child = None
- self._fd = None
- self._interface = interface
- self._pty_path = self._interface.get_pty()
- self._dict = UART_PARAMS.copy()
- self._fast = fast
-
- def __del__(self):
- self.close()
-
- def close(self):
- """Close any open files and interfaces."""
- if self._fd:
- self._close()
- self._interface.close()
-
- def _open(self):
- """Connect to serial device and create pexpect interface."""
- assert self._fd is None
- self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK)
- # Don't allow forked processes to access.
- fcntl.fcntl(self._fd, fcntl.F_SETFD,
- fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
- self._child = fdpexpect.fdspawn(self._fd)
- # pexpect defaults to a 100ms delay before sending characters, to
- # work around race conditions in ssh. We don't need this feature
- # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up.
- if self._fast:
- self._child.delaybeforesend = 0.001
-
- def _close(self):
- """Close serial device connection."""
- os.close(self._fd)
- self._fd = None
- self._child = None
-
- def _flush(self):
- """Flush device output to prevent previous messages interfering."""
- if self._child.sendline('') != 1:
- raise ptyError('Failed to send newline.')
- # Have a maximum timeout for the flush operation. We should have cleared
- # all data from the buffer, but if data is regularly being generated, we
- # can't guarantee it will ever stop.
- flush_end_time = time.time() + FLUSH_UART_TIMEOUT
- while time.time() <= flush_end_time:
- try:
- self._child.expect('.', timeout=0.01)
- except (pexpect.TIMEOUT, pexpect.EOF):
- break
- except OSError as e:
- # EAGAIN indicates no data available, maybe we didn't wait long enough.
- if e.errno != errno.EAGAIN:
- raise
- break
-
- def _send(self, cmds):
- """Send command to EC.
-
- This function always flushes serial device before sending, and is used as
- a wrapper function to make sure the channel is always flushed before
- sending commands.
-
- Args:
- cmds: The commands to send to the device, either a list or a string.
-
- Raises:
- ptyError: Raised when writing to the device fails.
- """
- self._flush()
- if not isinstance(cmds, list):
- cmds = [cmds]
- for cmd in cmds:
- if self._child.sendline(cmd) != len(cmd) + 1:
- raise ptyError('Failed to send command.')
-
- def _issue_cmd(self, cmds):
- """Send command to the device and do not wait for response.
-
- Args:
- cmds: The commands to send to the device, either a list or a string.
- """
- self._issue_cmd_get_results(cmds, [])
-
- def _issue_cmd_get_results(self, cmds,
- regex_list, timeout=DEFAULT_UART_TIMEOUT):
- """Send command to the device and wait for response.
-
- This function waits for response message matching a regular
- expressions.
-
- Args:
- cmds: The commands issued, either a list or a string.
- regex_list: List of Regular expressions used to match response message.
- Note1, list must be ordered.
- Note2, empty list sends and returns.
- timeout: time to wait for matching results before failing.
-
- Returns:
- List of tuples, each of which contains the entire matched string and
- all the subgroups of the match. None if not matched.
- For example:
- response of the given command:
- High temp: 37.2
- Low temp: 36.4
- regex_list:
- ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
- returns:
- [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
-
- Raises:
- ptyError: If timed out waiting for a response
- """
- result_list = []
- self._open()
- try:
- self._send(cmds)
- for regex in regex_list:
- self._child.expect(regex, timeout)
- match = self._child.match
- lastindex = match.lastindex if match and match.lastindex else 0
- # Create a tuple which contains the entire matched string and all
- # the subgroups of the match.
- result = match.group(*range(lastindex + 1)) if match else None
- if result:
- result = tuple(res.decode('utf-8') for res in result)
- result_list.append(result)
- except pexpect.TIMEOUT:
- raise ptyError('Timeout waiting for response.')
- finally:
- self._close()
- return result_list
-
- def _issue_cmd_get_multi_results(self, cmd, regex):
- """Send command to the device and wait for multiple response.
-
- This function waits for arbitrary number of response message
- matching a regular expression.
-
- Args:
- cmd: The command issued.
- regex: Regular expression used to match response message.
-
- Returns:
- List of tuples, each of which contains the entire matched string and
- all the subgroups of the match. None if not matched.
- """
- result_list = []
- self._open()
- try:
- self._send(cmd)
- while True:
- try:
- self._child.expect(regex, timeout=0.1)
- match = self._child.match
- lastindex = match.lastindex if match and match.lastindex else 0
- # Create a tuple which contains the entire matched string and all
- # the subgroups of the match.
- result = match.group(*range(lastindex + 1)) if match else None
- if result:
- result = tuple(res.decode('utf-8') for res in result)
- result_list.append(result)
- except pexpect.TIMEOUT:
- break
- finally:
- self._close()
- return result_list
-
- def _Set_uart_timeout(self, timeout):
- """Set timeout value for waiting for the device response.
-
- Args:
- timeout: Timeout value in second.
- """
- self._dict['uart_timeout'] = timeout
-
- def _Get_uart_timeout(self):
- """Get timeout value for waiting for the device response.
-
- Returns:
- Timeout value in second.
- """
- return self._dict['uart_timeout']
-
- def _Set_uart_regexp(self, regexp):
- """Set the list of regular expressions which matches the command response.
-
- Args:
- regexp: A string which contains a list of regular expressions.
- """
- if not isinstance(regexp, str):
- raise ptyError('The argument regexp should be a string.')
- self._dict['uart_regexp'] = ast.literal_eval(regexp)
-
- def _Get_uart_regexp(self):
- """Get the list of regular expressions which matches the command response.
-
- Returns:
- A string which contains a list of regular expressions.
- """
- return str(self._dict['uart_regexp'])
-
- def _Set_uart_cmd(self, cmd):
- """Set the UART command and send it to the device.
-
- If ec_uart_regexp is 'None', the command is just sent and it doesn't care
- about its response.
-
- If ec_uart_regexp is not 'None', the command is send and its response,
- which matches the regular expression of ec_uart_regexp, will be kept.
- Use its getter to obtain this result. If no match after ec_uart_timeout
- seconds, a timeout error will be raised.
-
- Args:
- cmd: A string of UART command.
- """
- if self._dict['uart_regexp']:
- self._dict['uart_cmd'] = self._issue_cmd_get_results(
- cmd, self._dict['uart_regexp'], self._dict['uart_timeout'])
- else:
- self._dict['uart_cmd'] = None
- self._issue_cmd(cmd)
-
- def _Set_uart_multicmd(self, cmds):
- """Set multiple UART commands and send them to the device.
-
- Note that ec_uart_regexp is not supported to match the results.
-
- Args:
- cmds: A semicolon-separated string of UART commands.
- """
- self._issue_cmd(cmds.split(';'))
-
- def _Get_uart_cmd(self):
- """Get the result of the latest UART command.
-
- Returns:
- A string which contains a list of tuples, each of which contains the
- entire matched string and all the subgroups of the match. 'None' if
- the ec_uart_regexp is 'None'.
- """
- return str(self._dict['uart_cmd'])
-
- def _Set_uart_capture(self, cmd):
- """Set UART capture mode (on or off).
-
- Once capture is enabled, UART output could be collected periodically by
- invoking _Get_uart_stream() below.
-
- Args:
- cmd: True for on, False for off
- """
- self._interface.set_capture_active(cmd)
-
- def _Get_uart_capture(self):
- """Get the UART capture mode (on or off)."""
- return self._interface.get_capture_active()
-
- def _Get_uart_stream(self):
- """Get uart stream generated since last time."""
- return self._interface.get_stream()
diff --git a/extra/tigertool/ecusb/stm32uart.py b/extra/tigertool/ecusb/stm32uart.py
deleted file mode 100644
index 95219455a9..0000000000
--- a/extra/tigertool/ecusb/stm32uart.py
+++ /dev/null
@@ -1,248 +0,0 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""Allow creation of uart/console interface via stm32 usb endpoint."""
-
-from __future__ import print_function
-
-import os
-import select
-import sys
-import termios
-import threading
-import time
-import tty
-import usb
-
-from . import stm32usb
-
-
-class SuartError(Exception):
- """Class for exceptions of Suart."""
- def __init__(self, msg, value=0):
- """SuartError constructor.
-
- Args:
- msg: string, message describing error in detail
- value: integer, value of error when non-zero status returned. Default=0
- """
- super(SuartError, self).__init__(msg, value)
- self.msg = msg
- self.value = value
-
-
-class Suart(object):
- """Provide interface to stm32 serial usb endpoint."""
- def __init__(self, vendor=0x18d1, product=0x501a, interface=0,
- serialname=None, debuglog=False):
- """Suart contstructor.
-
- Initializes stm32 USB stream interface.
-
- Args:
- vendor: usb vendor id of stm32 device
- product: usb product id of stm32 device
- interface: interface number of stm32 device to use
- serialname: serial name to target. Defaults to None.
- debuglog: chatty output. Defaults to False.
-
- Raises:
- SuartError: If init fails
- """
- self._ptym = None
- self._ptys = None
- self._ptyname = None
- self._rx_thread = None
- self._tx_thread = None
- self._debuglog = debuglog
- self._susb = stm32usb.Susb(vendor=vendor, product=product,
- interface=interface, serialname=serialname)
- self._running = False
-
- def __del__(self):
- """Suart destructor."""
- self.close()
-
- def close(self):
- """Stop all running threads."""
- self._running = False
- if self._rx_thread:
- self._rx_thread.join(2)
- self._rx_thread = None
- if self._tx_thread:
- self._tx_thread.join(2)
- self._tx_thread = None
- self._susb.close()
-
- def run_rx_thread(self):
- """Background loop to pass data from USB to pty."""
- ep = select.epoll()
- ep.register(self._ptym, select.EPOLLHUP)
- try:
- while self._running:
- events = ep.poll(0)
- # Check if the pty is connected to anything, or hungup.
- if not events:
- try:
- r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS)
- if r:
- if self._debuglog:
- print(''.join([chr(x) for x in r]), end='')
- os.write(self._ptym, r)
-
- # If we miss some characters on pty disconnect, that's fine.
- # ep.read() also throws USBError on timeout, which we discard.
- except OSError:
- pass
- except usb.core.USBError:
- pass
- else:
- time.sleep(.1)
- except Exception as e:
- raise e
-
- def run_tx_thread(self):
- """Background loop to pass data from pty to USB."""
- ep = select.epoll()
- ep.register(self._ptym, select.EPOLLHUP)
- try:
- while self._running:
- events = ep.poll(0)
- # Check if the pty is connected to anything, or hungup.
- if not events:
- try:
- r = os.read(self._ptym, 64)
- # TODO(crosbug.com/936182): Remove when the servo v4/micro console
- # issues are fixed.
- time.sleep(0.001)
- if r:
- self._susb._write_ep.write(r, self._susb.TIMEOUT_MS)
-
- except OSError:
- pass
- except usb.core.USBError:
- pass
- else:
- time.sleep(.1)
- except Exception as e:
- raise e
-
- def run(self):
- """Creates pthreads to poll stm32 & PTY for data."""
- m, s = os.openpty()
- self._ptyname = os.ttyname(s)
-
- self._ptym = m
- self._ptys = s
-
- os.fchmod(s, 0o660)
-
- # Change the owner and group of the PTY to the user who started servod.
- try:
- uid = int(os.environ.get('SUDO_UID', -1))
- except TypeError:
- uid = -1
-
- try:
- gid = int(os.environ.get('SUDO_GID', -1))
- except TypeError:
- gid = -1
- os.fchown(s, uid, gid)
-
- tty.setraw(self._ptym, termios.TCSADRAIN)
-
- # Generate a HUP flag on pty slave fd.
- os.fdopen(s).close()
-
- self._running = True
-
- self._rx_thread = threading.Thread(target=self.run_rx_thread, args=[])
- self._rx_thread.daemon = True
- self._rx_thread.start()
-
- self._tx_thread = threading.Thread(target=self.run_tx_thread, args=[])
- self._tx_thread.daemon = True
- self._tx_thread.start()
-
- def get_uart_props(self):
- """Get the uart's properties.
-
- Returns:
- dict where:
- baudrate: integer of uarts baudrate
- bits: integer, number of bits of data Can be 5|6|7|8 inclusive
- parity: integer, parity of 0-2 inclusive where:
- 0: no parity
- 1: odd parity
- 2: even parity
- sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
- 0: 1 stop bit
- 1: 1.5 stop bits
- 2: 2 stop bits
- """
- return {
- 'baudrate': 115200,
- 'bits': 8,
- 'parity': 0,
- 'sbits': 1,
- }
-
- def set_uart_props(self, line_props):
- """Set the uart's properties.
-
- Note that Suart cannot set properties
- and will fail if the properties are not the default 115200,8n1.
-
- Args:
- line_props: dict where:
- baudrate: integer of uarts baudrate
- bits: integer, number of bits of data ( prior to stop bit)
- parity: integer, parity of 0-2 inclusive where
- 0: no parity
- 1: odd parity
- 2: even parity
- sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
- 0: 1 stop bit
- 1: 1.5 stop bits
- 2: 2 stop bits
-
- Raises:
- SuartError: If requested line properties are not the default.
- """
- curr_props = self.get_uart_props()
- for prop in line_props:
- if line_props[prop] != curr_props[prop]:
- raise SuartError('Line property %s cannot be set from %s to %s' % (
- prop, curr_props[prop], line_props[prop]))
- return True
-
- def get_pty(self):
- """Gets path to pty for communication to/from uart.
-
- Returns:
- String path to the pty connected to the uart
- """
- return self._ptyname
-
-
-def main():
- """Run a suart test with the default parameters."""
- try:
- sobj = Suart()
- sobj.run()
-
- # run() is a thread so just busy wait to mimic server.
- while True:
- # Ours sleeps to eleven!
- time.sleep(11)
- except KeyboardInterrupt:
- sys.exit(0)
-
-
-if __name__ == '__main__':
- main()
diff --git a/extra/tigertool/ecusb/stm32usb.py b/extra/tigertool/ecusb/stm32usb.py
deleted file mode 100644
index bfd5fbb1fb..0000000000
--- a/extra/tigertool/ecusb/stm32usb.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""Allows creation of an interface via stm32 usb."""
-
-import usb
-
-
-class SusbError(Exception):
- """Class for exceptions of Susb."""
- def __init__(self, msg, value=0):
- """SusbError constructor.
-
- Args:
- msg: string, message describing error in detail
- value: integer, value of error when non-zero status returned. Default=0
- """
- super(SusbError, self).__init__(msg, value)
- self.msg = msg
- self.value = value
-
-
-class Susb(object):
- """Provide stm32 USB functionality.
-
- Instance Variables:
- _read_ep: pyUSB read endpoint for this interface
- _write_ep: pyUSB write endpoint for this interface
- """
- READ_ENDPOINT = 0x81
- WRITE_ENDPOINT = 0x1
- TIMEOUT_MS = 100
-
- def __init__(self, vendor=0x18d1,
- product=0x5027, interface=1, serialname=None, logger=None):
- """Susb constructor.
-
- Discovers and connects to stm32 USB endpoints.
-
- Args:
- vendor: usb vendor id of stm32 device.
- product: usb product id of stm32 device.
- interface: interface number ( 1 - 4 ) of stm32 device to use.
- serialname: string of device serialname.
- logger: none
-
- Raises:
- SusbError: An error accessing Susb object
- """
- self._vendor = vendor
- self._product = product
- self._interface = interface
- self._serialname = serialname
- self._find_device()
-
- def _find_device(self):
- """Set up the usb endpoint"""
- # Find the stm32.
- dev_g = usb.core.find(idVendor=self._vendor, idProduct=self._product,
- find_all=True)
- dev_list = list(dev_g)
-
- if not dev_list:
- raise SusbError('USB device not found')
-
- # Check if we have multiple stm32s and we've specified the serial.
- dev = None
- if self._serialname:
- for d in dev_list:
- dev_serial = usb.util.get_string(d, d.iSerialNumber)
- if dev_serial == self._serialname:
- dev = d
- break
- if dev is None:
- raise SusbError('USB device(%s) not found' % self._serialname)
- else:
- try:
- dev = dev_list[0]
- except StopIteration:
- raise SusbError('USB device %04x:%04x not found' % (
- self._vendor, self._product))
-
- # If we can't set configuration, it's already been set.
- try:
- dev.set_configuration()
- except usb.core.USBError:
- pass
-
- self._dev = dev
-
- # Get an endpoint instance.
- cfg = dev.get_active_configuration()
- intf = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interface)
- self._intf = intf
- if not intf:
- raise SusbError('Interface %04x:%04x - 0x%x not found' % (
- self._vendor, self._product, self._interface))
-
- # Detach raiden.ko if it is loaded. CCD endpoints support either a kernel
- # module driver that produces a ttyUSB, or direct endpoint access, but
- # can't do both at the same time.
- if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True:
- dev.detach_kernel_driver(intf.bInterfaceNumber)
-
- read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
- read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
- self._read_ep = read_ep
-
- write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
- write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
- self._write_ep = write_ep
-
- def close(self):
- usb.util.dispose_resources(self._dev)
diff --git a/extra/tigertool/ecusb/tiny_servo_common.py b/extra/tigertool/ecusb/tiny_servo_common.py
deleted file mode 100644
index 152c238bdf..0000000000
--- a/extra/tigertool/ecusb/tiny_servo_common.py
+++ /dev/null
@@ -1,174 +0,0 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""Utilities for using lightweight console functions."""
-
-# Note: This is a py2/3 compatible file.
-
-import datetime
-import errno
-import os
-import re
-import subprocess
-import sys
-import time
-
-import six
-
-from . import pty_driver
-from . import stm32uart
-
-
-def get_subprocess_args():
- if six.PY3:
- return {'encoding': 'utf-8'}
- return {}
-
-
-class TinyServoError(Exception):
- """Exceptions."""
-
-
-def log(output):
- """Print output to console, logfiles can be added here.
-
- Args:
- output: string to output.
- """
- sys.stdout.write(output)
- sys.stdout.write('\n')
- sys.stdout.flush()
-
-def check_usb(vidpid, serialname=None):
- """Check if |vidpid| is present on the system's USB.
-
- Args:
- vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
- serialname: serialname if specified.
-
- Returns: True if found, False, otherwise.
- """
- if serialname:
- output = subprocess.check_output(['lsusb', '-v', '-d', vidpid],
- **get_subprocess_args())
- m = re.search(r'^\s*iSerial\s+\d+\s+%s$' % serialname, output, flags=re.M)
- if m:
- return True
-
- return False
- else:
- if subprocess.call(['lsusb', '-d', vidpid], stdout=open('/dev/null', 'w')):
- return False
- return True
-
-def check_usb_sn(vidpid):
- """Return the serial number
-
- Return the serial number of the first USB device with VID:PID vidpid,
- or None if no device is found. This will not work well with two of
- the same device attached.
-
- Args:
- vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
-
- Returns: string serial number if found, None otherwise.
- """
- output = subprocess.check_output(['lsusb', '-v', '-d', vidpid],
- **get_subprocess_args())
- m = re.search(r'^\s*iSerial\s+(.*)$', output, flags=re.M)
- if m:
- return m.group(1)
-
- return None
-
-def wait_for_usb_remove(vidpid, serialname=None, timeout=None):
- """Wait for USB device with vidpid to be removed.
-
- Wrapper for wait_for_usb below
- """
- wait_for_usb(vidpid, serialname=serialname,
- timeout=timeout, desiredpresence=False)
-
-def wait_for_usb(vidpid, serialname=None, timeout=None, desiredpresence=True):
- """Wait for usb device with vidpid to be present/absent.
-
- Args:
- vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
- serialname: serialname if specificed.
- timeout: timeout in seconds, None for no timeout.
- desiredpresence: True for present, False for not present.
-
- Raises:
- TinyServoError: on timeout.
- """
- if timeout:
- finish = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
- while check_usb(vidpid, serialname) != desiredpresence:
- time.sleep(.1)
- if timeout:
- if datetime.datetime.now() > finish:
- raise TinyServoError('Timeout', 'Timeout waiting for USB %s' % vidpid)
-
-def do_serialno(serialno, pty):
- """Set serialnumber 'serialno' via ec console 'pty'.
-
- Commands are:
- # > serialno set 1234
- # Saving serial number
- # Serial number: 1234
-
- Args:
- serialno: string serial number to set.
- pty: tinyservo console to send commands.
-
- Raises:
- TinyServoError: on failure to set.
- ptyError: on command interface error.
- """
- cmd = 'serialno set %s' % serialno
- regex = 'Serial number:\s+(\S+)'
-
- results = pty._issue_cmd_get_results(cmd, [regex])[0]
- sn = results[1].strip().strip('\n\r')
-
- if sn == serialno:
- log('Success !')
- log('Serial set to %s' % sn)
- else:
- log('Serial number set to %s but saved as %s.' % (serialno, sn))
- raise TinyServoError(
- 'Serial Number',
- 'Serial number set to %s but saved as %s.' % (serialno, sn))
-
-def setup_tinyservod(vidpid, interface, serialname=None, debuglog=False):
- """Set up a pty
-
- Set up a pty to the ec console in order
- to send commands. Returns a pty_driver object.
-
- Args:
- vidpid: string vidpid of device to access.
- interface: not used.
- serialname: string serial name of device requested, optional.
- debuglog: chatty printout (boolean)
-
- Returns: pty object
-
- Raises:
- UsbError, SusbError: on device not found
- """
- vidstr, pidstr = vidpid.split(':')
- vid = int(vidstr, 16)
- pid = int(pidstr, 16)
- suart = stm32uart.Suart(vendor=vid, product=pid,
- interface=interface, serialname=serialname,
- debuglog=debuglog)
- suart.run()
- pty = pty_driver.ptyDriver(suart, [])
-
- return pty
diff --git a/extra/tigertool/ecusb/tiny_servod.py b/extra/tigertool/ecusb/tiny_servod.py
deleted file mode 100644
index 632d9c3a20..0000000000
--- a/extra/tigertool/ecusb/tiny_servod.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2020 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""Helper class to facilitate communication to servo ec console."""
-
-from ecusb import pty_driver
-from ecusb import stm32uart
-
-
-class TinyServod(object):
- """Helper class to wrap a pty_driver with interface."""
-
- def __init__(self, vid, pid, interface, serialname=None, debug=False):
- """Build the driver and interface.
-
- Args:
- vid: servo device vid
- pid: servo device pid
- interface: which usb interface the servo console is on
- serialname: the servo device serial (if available)
- """
- self._vid = vid
- self._pid = pid
- self._interface = interface
- self._serial = serialname
- self._debug = debug
- self._init()
-
- def _init(self):
- self.suart = stm32uart.Suart(vendor=self._vid,
- product=self._pid,
- interface=self._interface,
- serialname=self._serial,
- debuglog=self._debug)
- self.suart.run()
- self.pty = pty_driver.ptyDriver(self.suart, [])
-
- def reinitialize(self):
- """Reinitialize the connect after a reset/disconnect/etc."""
- self.close()
- self._init()
-
- def close(self):
- """Close out the connection and release resources.
-
- Note: if another TinyServod process or servod itself needs the same device
- it's necessary to call this to ensure the usb device is available.
- """
- self.suart.close()