summaryrefslogtreecommitdiff
path: root/extra/tigertool/ecusb
diff options
context:
space:
mode:
authorNick Sanders <nsanders@chromium.org>2017-05-02 21:31:10 -0700
committerchrome-bot <chrome-bot@chromium.org>2017-05-16 00:09:24 -0700
commit395fce39e2116e3aa7a90f98671ef2310e987dc2 (patch)
treed169e78382f68cea4a53a5f6fa7b918aa04aa3fa /extra/tigertool/ecusb
parentf09ac93aeabcab6e681128a6a186aa9b511bcab5 (diff)
downloadchrome-ec-395fce39e2116e3aa7a90f98671ef2310e987dc2.tar.gz
tigertail: tigertool command line api
This tool allows an easy commandline interface to set the USB-C mux position, as well as init and reboot. BRANCH=None BUG=b:35849284 TEST=flash, control tigertail successfully Change-Id: I8d60c215fee04de158c22edca5377c3c6cd48cf0 Reviewed-on: https://chromium-review.googlesource.com/493617 Commit-Ready: Nick Sanders <nsanders@chromium.org> Tested-by: Nick Sanders <nsanders@chromium.org> Reviewed-by: Aseda Aboagye <aaboagye@chromium.org>
Diffstat (limited to 'extra/tigertool/ecusb')
-rw-r--r--extra/tigertool/ecusb/__init__.py5
-rw-r--r--extra/tigertool/ecusb/pty_driver.py291
-rw-r--r--extra/tigertool/ecusb/stm32uart.py235
-rw-r--r--extra/tigertool/ecusb/stm32usb.py108
-rw-r--r--extra/tigertool/ecusb/tiny_servo_common.py145
5 files changed, 784 insertions, 0 deletions
diff --git a/extra/tigertool/ecusb/__init__.py b/extra/tigertool/ecusb/__init__.py
new file mode 100644
index 0000000000..c2af553d57
--- /dev/null
+++ b/extra/tigertool/ecusb/__init__.py
@@ -0,0 +1,5 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+__all__ = ['tiny_servo_common', 'stm32usb', 'stm32uart', 'pty_driver']
diff --git a/extra/tigertool/ecusb/pty_driver.py b/extra/tigertool/ecusb/pty_driver.py
new file mode 100644
index 0000000000..0802675634
--- /dev/null
+++ b/extra/tigertool/ecusb/pty_driver.py
@@ -0,0 +1,291 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""ptyDriver class
+
+This class takes a pty interface and can send commands and expect results
+as regex. This is useful for automating console based interfaces, such as
+the CrOS EC console commands.
+"""
+
+import ast
+import errno
+import fcntl
+import os
+import pexpect
+import fdpexpect
+
+
+# Expecting a result in 3 seconds is plenty even for slow platforms.
+DEFAULT_UART_TIMEOUT = 3
+
+
+class ptyError(Exception):
+ """Exception class for pty errors."""
+
+
+UART_PARAMS = {
+ 'uart_cmd': None,
+ 'uart_multicmd': None,
+ 'uart_regexp': None,
+ 'uart_timeout': DEFAULT_UART_TIMEOUT,
+}
+
+
+class ptyDriver(object):
+ """Automate interactive commands on a pty interface."""
+ def __init__(self, interface, params, fast=False):
+ """Init class variables."""
+ self._child = None
+ self._fd = None
+ self._interface = interface
+ self._pty_path = self._interface.get_pty()
+ self._dict = UART_PARAMS.copy()
+ self._fast = fast
+
+ def __del__(self):
+ self.close()
+
+ def close(self):
+ """Close any open files and interfaces."""
+ if self._fd:
+ self._close()
+ self._interface.close()
+
+ def _open(self):
+ """Connect to serial device and create pexpect interface."""
+ assert self._fd is None
+ self._fd = os.open(self._pty_path, os.O_RDWR | os.O_NONBLOCK)
+ # Don't allow forked processes to access.
+ fcntl.fcntl(self._fd, fcntl.F_SETFD,
+ fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
+ self._child = fdpexpect.fdspawn(self._fd)
+ # pexpect defaults to a 100ms delay before sending characters, to
+ # work around race conditions in ssh. We don't need this feature
+ # so we'll change delaybeforesend from 0.1 to 0.001 to speed things up.
+ if self._fast:
+ self._child.delaybeforesend = 0.001
+
+ def _close(self):
+ """Close serial device connection."""
+ os.close(self._fd)
+ self._fd = None
+ self._child = None
+
+ def _flush(self):
+ """Flush device output to prevent previous messages interfering."""
+ if self._child.sendline('') != 1:
+ raise ptyError('Failed to send newline.')
+ while True:
+ try:
+ self._child.expect('.', timeout=0.2)
+ except (pexpect.TIMEOUT, pexpect.EOF):
+ break
+ except OSError as e:
+ # EAGAIN indicates no data available, maybe we didn't wait long enough.
+ if e.errno != errno.EAGAIN:
+ raise
+ break
+
+ def _send(self, cmds):
+ """Send command to EC.
+
+ This function always flushes serial device before sending, and is used as
+ a wrapper function to make sure the channel is always flushed before
+ sending commands.
+
+ Args:
+ cmds: The commands to send to the device, either a list or a string.
+
+ Raises:
+ ptyError: Raised when writing to the device fails.
+ """
+ self._flush()
+ if not isinstance(cmds, list):
+ cmds = [cmds]
+ for cmd in cmds:
+ if self._child.sendline(cmd) != len(cmd) + 1:
+ raise ptyError('Failed to send command.')
+
+ def _issue_cmd(self, cmds):
+ """Send command to the device and do not wait for response.
+
+ Args:
+ cmds: The commands to send to the device, either a list or a string.
+ """
+ self._issue_cmd_get_results(cmds, [])
+
+ def _issue_cmd_get_results(self, cmds,
+ regex_list, timeout=DEFAULT_UART_TIMEOUT):
+ """Send command to the device and wait for response.
+
+ This function waits for response message matching a regular
+ expressions.
+
+ Args:
+ cmds: The commands issued, either a list or a string.
+ regex_list: List of Regular expressions used to match response message.
+ Note1, list must be ordered.
+ Note2, empty list sends and returns.
+ timeout: time to wait for matching results before failing.
+
+ Returns:
+ List of tuples, each of which contains the entire matched string and
+ all the subgroups of the match. None if not matched.
+ For example:
+ response of the given command:
+ High temp: 37.2
+ Low temp: 36.4
+ regex_list:
+ ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
+ returns:
+ [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]
+
+ Raises:
+ ptyError: If timed out waiting for a response
+ """
+ result_list = []
+ self._open()
+ try:
+ self._send(cmds)
+ for regex in regex_list:
+ self._child.expect(regex, timeout)
+ match = self._child.match
+ lastindex = match.lastindex if match and match.lastindex else 0
+ # Create a tuple which contains the entire matched string and all
+ # the subgroups of the match.
+ result = match.group(*range(lastindex + 1)) if match else None
+ result_list.append(result)
+ except pexpect.TIMEOUT:
+ raise ptyError('Timeout waiting for response.')
+ finally:
+ self._close()
+ return result_list
+
+ def _issue_cmd_get_multi_results(self, cmd, regex):
+ """Send command to the device and wait for multiple response.
+
+ This function waits for arbitrary number of response message
+ matching a regular expression.
+
+ Args:
+ cmd: The command issued.
+ regex: Regular expression used to match response message.
+
+ Returns:
+ List of tuples, each of which contains the entire matched string and
+ all the subgroups of the match. None if not matched.
+ """
+ result_list = []
+ self._open()
+ try:
+ self._send(cmd)
+ while True:
+ try:
+ self._child.expect(regex, timeout=0.1)
+ match = self._child.match
+ lastindex = match.lastindex if match and match.lastindex else 0
+ # Create a tuple which contains the entire matched string and all
+ # the subgroups of the match.
+ result = match.group(*range(lastindex + 1)) if match else None
+ result_list.append(result)
+ except pexpect.TIMEOUT:
+ break
+ finally:
+ self._close()
+ return result_list
+
+ def _Set_uart_timeout(self, timeout):
+ """Set timeout value for waiting for the device response.
+
+ Args:
+ timeout: Timeout value in second.
+ """
+ self._dict['uart_timeout'] = timeout
+
+ def _Get_uart_timeout(self):
+ """Get timeout value for waiting for the device response.
+
+ Returns:
+ Timeout value in second.
+ """
+ return self._dict['uart_timeout']
+
+ def _Set_uart_regexp(self, regexp):
+ """Set the list of regular expressions which matches the command response.
+
+ Args:
+ regexp: A string which contains a list of regular expressions.
+ """
+ if not isinstance(regexp, str):
+ raise ptyError('The argument regexp should be a string.')
+ self._dict['uart_regexp'] = ast.literal_eval(regexp)
+
+ def _Get_uart_regexp(self):
+ """Get the list of regular expressions which matches the command response.
+
+ Returns:
+ A string which contains a list of regular expressions.
+ """
+ return str(self._dict['uart_regexp'])
+
+ def _Set_uart_cmd(self, cmd):
+ """Set the UART command and send it to the device.
+
+ If ec_uart_regexp is 'None', the command is just sent and it doesn't care
+ about its response.
+
+ If ec_uart_regexp is not 'None', the command is send and its response,
+ which matches the regular expression of ec_uart_regexp, will be kept.
+ Use its getter to obtain this result. If no match after ec_uart_timeout
+ seconds, a timeout error will be raised.
+
+ Args:
+ cmd: A string of UART command.
+ """
+ if self._dict['uart_regexp']:
+ self._dict['uart_cmd'] = self._issue_cmd_get_results(
+ cmd, self._dict['uart_regexp'], self._dict['uart_timeout'])
+ else:
+ self._dict['uart_cmd'] = None
+ self._issue_cmd(cmd)
+
+ def _Set_uart_multicmd(self, cmds):
+ """Set multiple UART commands and send them to the device.
+
+ Note that ec_uart_regexp is not supported to match the results.
+
+ Args:
+ cmds: A semicolon-separated string of UART commands.
+ """
+ self._issue_cmd(cmds.split(';'))
+
+ def _Get_uart_cmd(self):
+ """Get the result of the latest UART command.
+
+ Returns:
+ A string which contains a list of tuples, each of which contains the
+ entire matched string and all the subgroups of the match. 'None' if
+ the ec_uart_regexp is 'None'.
+ """
+ return str(self._dict['uart_cmd'])
+
+ def _Set_uart_capture(self, cmd):
+ """Set UART capture mode (on or off).
+
+ Once capture is enabled, UART output could be collected periodically by
+ invoking _Get_uart_stream() below.
+
+ Args:
+ cmd: True for on, False for off
+ """
+ self._interface.set_capture_active(cmd)
+
+ def _Get_uart_capture(self):
+ """Get the UART capture mode (on or off)."""
+ return self._interface.get_capture_active()
+
+ def _Get_uart_stream(self):
+ """Get uart stream generated since last time."""
+ return self._interface.get_stream()
diff --git a/extra/tigertool/ecusb/stm32uart.py b/extra/tigertool/ecusb/stm32uart.py
new file mode 100644
index 0000000000..3f39b23faa
--- /dev/null
+++ b/extra/tigertool/ecusb/stm32uart.py
@@ -0,0 +1,235 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Allow creation of uart/console interface via stm32 usb endpoint."""
+
+import os
+import select
+import sys
+import termios
+import threading
+import time
+import tty
+import usb
+
+import stm32usb
+
+
+class SuartError(Exception):
+ """Class for exceptions of Suart."""
+ def __init__(self, msg, value=0):
+ """SuartError constructor.
+
+ Args:
+ msg: string, message describing error in detail
+ value: integer, value of error when non-zero status returned. Default=0
+ """
+ super(SuartError, self).__init__(msg, value)
+ self.msg = msg
+ self.value = value
+
+
+class Suart(object):
+ """Provide interface to stm32 serial usb endpoint."""
+ def __init__(self, vendor=0x18d1, product=0x501a, interface=0,
+ serialname=None, ftdi_context=None):
+ """Suart contstructor.
+
+ Initializes stm32 USB stream interface.
+
+ Args:
+ vendor: usb vendor id of stm32 device
+ product: usb product id of stm32 device
+ interface: interface number of stm32 device to use
+ serialname: n/a. Defaults to None.
+ ftdi_context: n/a. Defaults to None.
+
+ Raises:
+ SuartError: If init fails
+ """
+ self._ptym = None
+ self._ptys = None
+ self._ptyname = None
+ self._rx_thread = None
+ self._tx_thread = None
+ self._susb = stm32usb.Susb(vendor=vendor, product=product,
+ interface=interface, serialname=serialname)
+ self._running = False
+
+ def __del__(self):
+ """Suart destructor."""
+ self.close()
+
+ def close(self):
+ """Stop all running threads."""
+ self._running = False
+ if self._rx_thread:
+ self._rx_thread.join(2)
+ self._rx_thread = None
+ if self._tx_thread:
+ self._tx_thread.join(2)
+ self._tx_thread = None
+
+ def run_rx_thread(self):
+ """Background loop to pass data from USB to pty."""
+ ep = select.epoll()
+ ep.register(self._ptym, select.EPOLLHUP)
+ try:
+ while self._running:
+ events = ep.poll(0)
+ # Check if the pty is connected to anything, or hungup.
+ if not events:
+ try:
+ r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS)
+ if r:
+ os.write(self._ptym, r)
+
+ # If we miss some characters on pty disconnect, that's fine.
+ # ep.read() also throws USBError on timeout, which we discard.
+ except OSError:
+ pass
+ except usb.core.USBError:
+ pass
+ else:
+ time.sleep(.1)
+ except Exception as e:
+ raise e
+
+ def run_tx_thread(self):
+ """Background loop to pass data from pty to USB."""
+ ep = select.epoll()
+ ep.register(self._ptym, select.EPOLLHUP)
+ try:
+ while self._running:
+ events = ep.poll(0)
+ # Check if the pty is connected to anything, or hungup.
+ if not events:
+ try:
+ r = os.read(self._ptym, 64)
+ if r:
+ self._susb._write_ep.write(r, self._susb.TIMEOUT_MS)
+
+ except OSError:
+ pass
+ except usb.core.USBError:
+ pass
+ else:
+ time.sleep(.1)
+ except Exception as e:
+ raise e
+
+ def run(self):
+ """Creates pthreads to poll stm32 & PTY for data."""
+ m, s = os.openpty()
+ self._ptyname = os.ttyname(s)
+
+ self._ptym = m
+ self._ptys = s
+
+ os.fchmod(s, 0o660)
+
+ # Change the owner and group of the PTY to the user who started servod.
+ try:
+ uid = int(os.environ.get('SUDO_UID', -1))
+ except TypeError:
+ uid = -1
+
+ try:
+ gid = int(os.environ.get('SUDO_GID', -1))
+ except TypeError:
+ gid = -1
+ os.fchown(s, uid, gid)
+
+ tty.setraw(self._ptym, termios.TCSADRAIN)
+
+ # Generate a HUP flag on pty slave fd.
+ os.fdopen(s).close()
+
+ self._running = True
+
+ self._rx_thread = threading.Thread(target=self.run_rx_thread, args=[])
+ self._rx_thread.daemon = True
+ self._rx_thread.start()
+
+ self._tx_thread = threading.Thread(target=self.run_tx_thread, args=[])
+ self._tx_thread.daemon = True
+ self._tx_thread.start()
+
+ def get_uart_props(self):
+ """Get the uart's properties.
+
+ Returns:
+ dict where:
+ baudrate: integer of uarts baudrate
+ bits: integer, number of bits of data Can be 5|6|7|8 inclusive
+ parity: integer, parity of 0-2 inclusive where:
+ 0: no parity
+ 1: odd parity
+ 2: even parity
+ sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
+ 0: 1 stop bit
+ 1: 1.5 stop bits
+ 2: 2 stop bits
+ """
+ return {
+ 'baudrate': 115200,
+ 'bits': 8,
+ 'parity': 0,
+ 'sbits': 1,
+ }
+
+ def set_uart_props(self, line_props):
+ """Set the uart's properties.
+
+ Note that Suart cannot set properties
+ and will fail if the properties are not the default 115200,8n1.
+
+ Args:
+ line_props: dict where:
+ baudrate: integer of uarts baudrate
+ bits: integer, number of bits of data ( prior to stop bit)
+ parity: integer, parity of 0-2 inclusive where
+ 0: no parity
+ 1: odd parity
+ 2: even parity
+ sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
+ 0: 1 stop bit
+ 1: 1.5 stop bits
+ 2: 2 stop bits
+
+ Raises:
+ SuartError: If requested line properties are not the default.
+ """
+ curr_props = self.get_uart_props()
+ for prop in line_props:
+ if line_props[prop] != curr_props[prop]:
+ raise SuartError('Line property %s cannot be set from %s to %s' % (
+ prop, curr_props[prop], line_props[prop]))
+ return True
+
+ def get_pty(self):
+ """Gets path to pty for communication to/from uart.
+
+ Returns:
+ String path to the pty connected to the uart
+ """
+ return self._ptyname
+
+
+def main():
+ """Run a suart test with the default parameters."""
+ try:
+ sobj = Suart()
+ sobj.run()
+
+ # run() is a thread so just busy wait to mimic server.
+ while True:
+ # Ours sleeps to eleven!
+ time.sleep(11)
+ except KeyboardInterrupt:
+ sys.exit(0)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/extra/tigertool/ecusb/stm32usb.py b/extra/tigertool/ecusb/stm32usb.py
new file mode 100644
index 0000000000..4165997e5d
--- /dev/null
+++ b/extra/tigertool/ecusb/stm32usb.py
@@ -0,0 +1,108 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Allows creation of an interface via stm32 usb."""
+
+import usb
+
+
+class SusbError(Exception):
+ """Class for exceptions of Susb."""
+ def __init__(self, msg, value=0):
+ """SusbError constructor.
+
+ Args:
+ msg: string, message describing error in detail
+ value: integer, value of error when non-zero status returned. Default=0
+ """
+ super(SusbError, self).__init__(msg, value)
+ self.msg = msg
+ self.value = value
+
+
+class Susb(object):
+ """Provide stm32 USB functionality.
+
+ Instance Variables:
+ _read_ep: pyUSB read endpoint for this interface
+ _write_ep: pyUSB write endpoint for this interface
+ """
+ READ_ENDPOINT = 0x81
+ WRITE_ENDPOINT = 0x1
+ TIMEOUT_MS = 100
+
+ def __init__(self, vendor=0x18d1,
+ product=0x5027, interface=1, serialname=None, logger=None):
+ """Susb constructor.
+
+ Discovers and connects to stm32 USB endpoints.
+
+ Args:
+ vendor: usb vendor id of stm32 device.
+ product: usb product id of stm32 device.
+ interface: interface number ( 1 - 4 ) of stm32 device to use.
+ serialname: string of device serialname.
+ logger: none
+
+ Raises:
+ SusbError: An error accessing Susb object
+ """
+ self._vendor = vendor
+ self._product = product
+ self._interface = interface
+ self._serialname = serialname
+ self._find_device()
+
+ def _find_device(self):
+ """Set up the usb endpoint"""
+ # Find the stm32.
+ dev_list = usb.core.find(idVendor=self._vendor, idProduct=self._product,
+ find_all=True)
+ if not dev_list:
+ raise SusbError('USB device not found')
+
+ # Check if we have multiple stm32s and we've specified the serial.
+ dev = None
+ if self._serialname:
+ for d in dev_list:
+ dev_serial = usb.util.get_string(d, d.iSerialNumber)
+ if dev_serial == self._serialname:
+ dev = d
+ break
+ if dev is None:
+ raise SusbError('USB device(%s) not found' % self._serialname)
+ else:
+ try:
+ dev = dev_list.next()
+ except StopIteration:
+ raise SusbError('USB device %04x:%04x not found' % (
+ self._vendor, self._product))
+
+ # If we can't set configuration, it's already been set.
+ try:
+ dev.set_configuration()
+ except usb.core.USBError:
+ pass
+
+ # Get an endpoint instance.
+ cfg = dev.get_active_configuration()
+ intf = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interface)
+ self._intf = intf
+ if not intf:
+ raise SusbError('Interface %04x:%04x - 0x%x not found' % (
+ self._vendor, self._product, self._interface))
+
+ # Detach raiden.ko if it is loaded. CCD endpoints support either a kernel
+ # module driver that produces a ttyUSB, or direct endpoint access, but
+ # can't do both at the same time.
+ if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True:
+ dev.detach_kernel_driver(intf.bInterfaceNumber)
+
+ read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
+ read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
+ self._read_ep = read_ep
+
+ write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
+ write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
+ self._write_ep = write_ep
diff --git a/extra/tigertool/ecusb/tiny_servo_common.py b/extra/tigertool/ecusb/tiny_servo_common.py
new file mode 100644
index 0000000000..a65697b116
--- /dev/null
+++ b/extra/tigertool/ecusb/tiny_servo_common.py
@@ -0,0 +1,145 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Utilities for using lightweight console functions."""
+
+import datetime
+import errno
+import os
+import re
+import subprocess
+import sys
+import time
+
+import pty_driver
+import stm32uart
+
+
+class TinyServoError(Exception):
+ """Exceptions."""
+
+
+def log(output):
+ """Print output to console, logfiles can be added here.
+
+ Args:
+ output: string to output.
+ """
+ sys.stdout.write(output)
+ sys.stdout.write('\n')
+ sys.stdout.flush()
+
+def check_usb(vidpid):
+ """Check if |vidpid| is present on the system's USB.
+
+ Args:
+ vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
+
+ Returns: True if found, Flase, otherwise.
+ """
+ if subprocess.call(['lsusb', '-d', vidpid], stdout=open('/dev/null', 'w')):
+ return False
+ return True
+
+def check_usb_sn(vidpid):
+ """Return the serial number
+
+ Return the serial number of the first USB device with VID:PID vidpid,
+ or None if no device is found. This will not work well with two of
+ the same device attached.
+
+ Args:
+ vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
+
+ Returns: string serial number if found, None otherwise.
+ """
+ output = subprocess.check_output(['lsusb', '-v', '-d', vidpid])
+ m = re.search(r'^\s*iSerial\s+(.*)$', output, flags=re.M)
+ if m:
+ return m.group(1)
+
+ return None
+
+def wait_for_usb_remove(vidpid, timeout=None):
+ """Wait for USB device with vidpid to be removed.
+
+ Wrapper for wait_for_usb below
+ """
+ wait_for_usb(vidpid, timeout=timeout, desiredpresence=False)
+
+def wait_for_usb(vidpid, timeout=None, desiredpresence=True):
+ """Wait for usb device with vidpid to be present/absent.
+
+ Args:
+ vidpid: string representation of the usb vid:pid, eg. '18d1:2001'
+ timeout: timeout in seconds, None for no timeout.
+ desiredpresence: True for present, False for not present.
+
+ Raises:
+ TinyServoError: on timeout.
+ """
+ if timeout:
+ finish = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
+ while check_usb(vidpid) != desiredpresence:
+ time.sleep(.1)
+ if timeout:
+ if datetime.datetime.now() > finish:
+ raise TinyServoError('Timeout', 'Timeout waiting for USB %s' % vidpid)
+
+def do_serialno(serialno, pty):
+ """Set serialnumber 'serialno' via ec console 'pty'.
+
+ Commands are:
+ # > serialno set 1234
+ # Saving serial number
+ # Serial number: 1234
+
+ Args:
+ serialno: string serial number to set.
+ pty: tinyservo console to send commands.
+
+ Raises:
+ TinyServoError: on failure to set.
+ ptyError: on command interface error.
+ """
+ cmd = 'serialno set %s' % serialno
+ regex = 'Serial number: (.*)$'
+
+ results = pty._issue_cmd_get_results(cmd, [regex])[0]
+ sn = results[1].strip().strip('\n\r')
+
+ if sn == serialno:
+ log('Success !')
+ log('Serial set to %s' % sn)
+ else:
+ log('Serial number set to %s but saved as %s.' % (serialno, sn))
+ raise TinyServoError(
+ 'Serial Number',
+ 'Serial number set to %s but saved as %s.' % (serialno, sn))
+
+def setup_tinyservod(vidpid, interface, serialno=None):
+ """Set up a pty
+
+ Set up a pty to the ec console in order
+ to send commands. Returns a pty_driver object.
+
+ Args:
+ vidpid: string vidpid of device to access.
+ interface: not used.
+ serialno: string serial no of device requested, optional.
+
+ Returns: pty object
+
+ Raises:
+ UsbError, SusbError: on device not found
+ """
+ vidstr, pidstr = vidpid.split(':')
+ vid = int(vidstr, 16)
+ pid = int(pidstr, 16)
+ suart = stm32uart.Suart(vendor=vid, product=pid,
+ interface=interface, serialname=serialno)
+ suart.run()
+ pty = pty_driver.ptyDriver(suart, [])
+
+ return pty