diff options
Diffstat (limited to 'extra/usb_serial/console.py')
-rwxr-xr-x | extra/usb_serial/console.py | 298 |
1 files changed, 0 insertions, 298 deletions
diff --git a/extra/usb_serial/console.py b/extra/usb_serial/console.py deleted file mode 100755 index 7b3bacd903..0000000000 --- a/extra/usb_serial/console.py +++ /dev/null @@ -1,298 +0,0 @@ -#!/usr/bin/env python -# Copyright 2016 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. -# -# Ignore indention messages, since legacy scripts use 2 spaces instead of 4. -# pylint: disable=bad-indentation,docstring-section-indent -# pylint: disable=docstring-trailing-quotes - -"""Allow creation of uart/console interface via usb google serial endpoint.""" - -# Note: This is a py2/3 compatible file. - -from __future__ import print_function -import argparse -import array -import os -import sys -import termios -import threading -import time -import traceback -import tty -try: - import usb -except: - print("import usb failed") - print("try running these commands:") - print(" sudo apt-get install python-pip") - print(" sudo pip install --pre pyusb") - print() - sys.exit(-1) - -import six - - -def GetBuffer(stream): - if six.PY3: - return stream.buffer - return stream - - -"""Class Susb covers USB device discovery and initialization. - - It can find a particular endpoint by vid:pid, serial number, - and interface number. -""" - -class SusbError(Exception): - """Class for exceptions of Susb.""" - def __init__(self, msg, value=0): - """SusbError constructor. - - Args: - msg: string, message describing error in detail - value: integer, value of error when non-zero status returned. Default=0 - """ - super(SusbError, self).__init__(msg, value) - self.msg = msg - self.value = value - -class Susb(): - """Provide USB functionality. - - Instance Variables: - _read_ep: pyUSB read endpoint for this interface - _write_ep: pyUSB write endpoint for this interface - """ - READ_ENDPOINT = 0x81 - WRITE_ENDPOINT = 0x1 - TIMEOUT_MS = 100 - - def __init__(self, vendor=0x18d1, - product=0x500f, interface=1, serialname=None): - """Susb constructor. - - Discovers and connects to USB endpoints. - - Args: - vendor : usb vendor id of device - product : usb product id of device - interface : interface number ( 1 - 8 ) of device to use - serialname: string of device serialnumber. - - Raises: - SusbError: An error accessing Susb object - """ - # Find the device. - dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True) - dev_list = list(dev_g) - if dev_list is None: - raise SusbError("USB device not found") - - # Check if we have multiple devices. - dev = None - if serialname: - for d in dev_list: - dev_serial = "PyUSB doesn't have a stable interface" - try: - dev_serial = usb.util.get_string(d, 256, d.iSerialNumber) - except: - dev_serial = usb.util.get_string(d, d.iSerialNumber) - if dev_serial == serialname: - dev = d - break - if dev is None: - raise SusbError("USB device(%s) not found" % (serialname,)) - else: - try: - dev = dev_list[0] - except: - try: - dev = dev_list.next() - except: - raise SusbError("USB device %04x:%04x not found" % (vendor, product)) - - # If we can't set configuration, it's already been set. - try: - dev.set_configuration() - except usb.core.USBError: - pass - - # Get an endpoint instance. - cfg = dev.get_active_configuration() - intf = usb.util.find_descriptor(cfg, bInterfaceNumber=interface) - self._intf = intf - - if not intf: - raise SusbError("Interface not found") - - # Detach raiden.ko if it is loaded. - if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True: - dev.detach_kernel_driver(intf.bInterfaceNumber) - - read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT - read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number) - self._read_ep = read_ep - - write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT - write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number) - self._write_ep = write_ep - - -"""Suart class implements a stream interface, to access Google's USB class. - - This creates a send and receive thread that monitors USB and console input - and forwards them across. This particular class is hardcoded to stdin/out. -""" - -class SuartError(Exception): - """Class for exceptions of Suart.""" - def __init__(self, msg, value=0): - """SuartError constructor. - - Args: - msg: string, message describing error in detail - value: integer, value of error when non-zero status returned. Default=0 - """ - super(SuartError, self).__init__(msg, value) - self.msg = msg - self.value = value - - -class Suart(): - """Provide interface to serial usb endpoint.""" - - def __init__(self, vendor=0x18d1, product=0x501c, interface=0, - serialname=None): - """Suart contstructor. - - Initializes USB stream interface. - - Args: - vendor: usb vendor id of device - product: usb product id of device - interface: interface number of device to use - serialname: Defaults to None. - - Raises: - SuartError: If init fails - """ - self._done = threading.Event() - self._susb = Susb(vendor=vendor, product=product, - interface=interface, serialname=serialname) - - def wait_until_done(self, timeout=None): - return self._done.wait(timeout=timeout) - - def run_rx_thread(self): - try: - while True: - try: - r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS) - if r: - GetBuffer(sys.stdout).write(r.tostring()) - GetBuffer(sys.stdout).flush() - - except Exception as e: - # If we miss some characters on pty disconnect, that's fine. - # ep.read() also throws USBError on timeout, which we discard. - if not isinstance(e, (OSError, usb.core.USBError)): - print("rx %s" % e) - finally: - self._done.set() - - def run_tx_thread(self): - try: - while True: - try: - r = GetBuffer(sys.stdin).read(1) - if not r or r == b"\x03": - break - if r: - self._susb._write_ep.write(array.array('B', r), - self._susb.TIMEOUT_MS) - except Exception as e: - print("tx %s" % e) - finally: - self._done.set() - - def run(self): - """Creates pthreads to poll USB & PTY for data. - """ - self._exit = False - - self._rx_thread = threading.Thread(target=self.run_rx_thread) - self._rx_thread.daemon = True - self._rx_thread.start() - - self._tx_thread = threading.Thread(target=self.run_tx_thread) - self._tx_thread.daemon = True - self._tx_thread.start() - - - -"""Command line functionality - - Allows specifying vid:pid, serialnumber, interface. - Ctrl-C exits. -""" - -parser = argparse.ArgumentParser(description="Open a console to a USB device") -parser.add_argument('-d', '--device', type=str, - help="vid:pid of target device", default="18d1:501c") -parser.add_argument('-i', '--interface', type=int, - help="interface number of console", default=0) -parser.add_argument('-s', '--serialno', type=str, - help="serial number of device", default="") -parser.add_argument('-S', '--notty-exit-sleep', type=float, default=0.2, - help="When stdin is *not* a TTY, wait this many seconds after EOF from " - "stdin before exiting, to give time for receiving a reply from the USB " - "device.") - - -def runconsole(): - """Run the usb console code - - Starts the pty thread, and idles until a ^C is caught. - """ - args = parser.parse_args() - - vidstr, pidstr = args.device.split(':') - vid = int(vidstr, 16) - pid = int(pidstr, 16) - - serialno = args.serialno - interface = args.interface - - sobj = Suart(vendor=vid, product=pid, interface=interface, - serialname=serialno) - if sys.stdin.isatty(): - tty.setraw(sys.stdin.fileno()) - sobj.run() - sobj.wait_until_done() - if not sys.stdin.isatty() and args.notty_exit_sleep > 0: - time.sleep(args.notty_exit_sleep) - - -def main(): - stdin_isatty = sys.stdin.isatty() - if stdin_isatty: - fd = sys.stdin.fileno() - os.system("stty -echo") - old_settings = termios.tcgetattr(fd) - - try: - runconsole() - finally: - if stdin_isatty: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - os.system("stty echo") - # Avoid having the user's shell prompt start mid-line after the final output - # from this program. - print() - - -if __name__ == '__main__': - main() |