#!/usr/bin/env python3 # Copyright 2016 The ChromiumOS Authors # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Allow creation of uart/console interface via usb google serial endpoint.""" # Note: This is a py2/3 compatible file. from __future__ import print_function import argparse import array import os import sys import termios import threading import time import tty try: import usb # pylint:disable=import-error except ModuleNotFoundError: print("import usb failed") print("try running these commands:") print(" sudo apt-get install python-pip") print(" sudo pip install --pre pyusb") print() sys.exit(-1) import six def GetBuffer(stream): if six.PY3: return stream.buffer return stream """Class Susb covers USB device discovery and initialization. It can find a particular endpoint by vid:pid, serial number, and interface number. """ class SusbError(Exception): """Class for exceptions of Susb.""" def __init__(self, msg, value=0): """SusbError constructor. Args: msg: string, message describing error in detail value: integer, value of error when non-zero status returned. Default=0 """ super(SusbError, self).__init__(msg, value) self.msg = msg self.value = value class Susb: """Provide USB functionality. Instance Variables: _read_ep: pyUSB read endpoint for this interface _write_ep: pyUSB write endpoint for this interface """ READ_ENDPOINT = 0x81 WRITE_ENDPOINT = 0x1 TIMEOUT_MS = 100 def __init__( self, vendor=0x18D1, product=0x500F, interface=1, serialname=None ): """Susb constructor. Discovers and connects to USB endpoints. Args: vendor: usb vendor id of device product: usb product id of device interface: interface number ( 1 - 8 ) of device to use serialname: string of device serialnumber. Raises: SusbError: An error accessing Susb object """ # Find the device. dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True) dev_list = list(dev_g) if dev_list is None: raise SusbError("USB device not found") # Check if we have multiple devices. dev = None if serialname: for d in dev_list: dev_serial = usb.util.get_string(d, d.iSerialNumber) if dev_serial == serialname: dev = d break if dev is None: raise SusbError("USB device(%s) not found" % (serialname,)) else: try: dev = dev_list[0] except IndexError: raise SusbError( "USB device %04x:%04x not found" % (vendor, product) ) # If we can't set configuration, it's already been set. try: dev.set_configuration() except usb.core.USBError: pass # Get an endpoint instance. cfg = dev.get_active_configuration() intf = usb.util.find_descriptor(cfg, bInterfaceNumber=interface) self._intf = intf if not intf: raise SusbError("Interface not found") # Detach raiden.ko if it is loaded. if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True: dev.detach_kernel_driver(intf.bInterfaceNumber) read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT read_ep = usb.util.find_descriptor( intf, bEndpointAddress=read_ep_number ) self._read_ep = read_ep write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT write_ep = usb.util.find_descriptor( intf, bEndpointAddress=write_ep_number ) self._write_ep = write_ep """Suart class implements a stream interface, to access Google's USB class. This creates a send and receive thread that monitors USB and console input and forwards them across. This particular class is hardcoded to stdin/out. """ class SuartError(Exception): """Class for exceptions of Suart.""" def __init__(self, msg, value=0): """SuartError constructor. Args: msg: string, message describing error in detail value: integer, value of error when non-zero status returned. Default=0 """ super(SuartError, self).__init__(msg, value) self.msg = msg self.value = value class Suart: """Provide interface to serial usb endpoint.""" def __init__( self, vendor=0x18D1, product=0x501C, interface=0, serialname=None ): """Suart contstructor. Initializes USB stream interface. Args: vendor: usb vendor id of device product: usb product id of device interface: interface number of device to use serialname: Defaults to None. Raises: SuartError: If init fails """ self._done = threading.Event() self._susb = Susb( vendor=vendor, product=product, interface=interface, serialname=serialname, ) def wait_until_done(self, timeout=None): return self._done.wait(timeout=timeout) def run_rx_thread(self): try: while True: try: r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS) if r: GetBuffer(sys.stdout).write(r.tobytes()) GetBuffer(sys.stdout).flush() except Exception as e: # If we miss some characters on pty disconnect, that's fine. # ep.read() also throws USBError on timeout, which we discard. if not isinstance(e, (OSError, usb.core.USBError)): print("rx %s" % e) finally: self._done.set() def run_tx_thread(self): try: while True: try: r = GetBuffer(sys.stdin).read(1) if not r or r == b"\x03": break if r: self._susb._write_ep.write( array.array("B", r), self._susb.TIMEOUT_MS ) except Exception as e: print("tx %s" % e) finally: self._done.set() def run(self): """Creates pthreads to poll USB & PTY for data.""" self._exit = False self._rx_thread = threading.Thread(target=self.run_rx_thread) self._rx_thread.daemon = True self._rx_thread.start() self._tx_thread = threading.Thread(target=self.run_tx_thread) self._tx_thread.daemon = True self._tx_thread.start() """Command line functionality Allows specifying vid:pid, serialnumber, interface. Ctrl-C exits. """ parser = argparse.ArgumentParser( description="Open a console to a USB device", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument( "-d", "--device", type=str, help="vid:pid of target device", default="18d1:501c", ) parser.add_argument( "-i", "--interface", type=int, help="interface number of console", default=0 ) parser.add_argument( "-s", "--serialno", type=str, help="serial number of device", default="" ) parser.add_argument( "-S", "--notty-exit-sleep", type=float, default=0.2, help="When stdin is *not* a TTY, wait this many seconds " "after EOF from stdin before exiting, to give time for " "receiving a reply from the USB device.", ) def runconsole(): """Run the usb console code Starts the pty thread, and idles until a ^C is caught. """ args = parser.parse_args() vidstr, pidstr = args.device.split(":") vid = int(vidstr, 16) pid = int(pidstr, 16) serialno = args.serialno interface = args.interface sobj = Suart( vendor=vid, product=pid, interface=interface, serialname=serialno ) if sys.stdin.isatty(): tty.setraw(sys.stdin.fileno()) sobj.run() sobj.wait_until_done() if not sys.stdin.isatty() and args.notty_exit_sleep > 0: time.sleep(args.notty_exit_sleep) def main(): stdin_isatty = sys.stdin.isatty() if stdin_isatty: fd = sys.stdin.fileno() os.system("stty -echo") old_settings = termios.tcgetattr(fd) try: runconsole() finally: if stdin_isatty: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) os.system("stty echo") # Avoid having the user's shell prompt start mid-line after the final output # from this program. print() if __name__ == "__main__": main()