summaryrefslogtreecommitdiff
path: root/extra/usb_serial/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'extra/usb_serial/console.py')
-rwxr-xr-xextra/usb_serial/console.py298
1 files changed, 0 insertions, 298 deletions
diff --git a/extra/usb_serial/console.py b/extra/usb_serial/console.py
deleted file mode 100755
index 7b3bacd903..0000000000
--- a/extra/usb_serial/console.py
+++ /dev/null
@@ -1,298 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2016 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""Allow creation of uart/console interface via usb google serial endpoint."""
-
-# Note: This is a py2/3 compatible file.
-
-from __future__ import print_function
-import argparse
-import array
-import os
-import sys
-import termios
-import threading
-import time
-import traceback
-import tty
-try:
- import usb
-except:
- print("import usb failed")
- print("try running these commands:")
- print(" sudo apt-get install python-pip")
- print(" sudo pip install --pre pyusb")
- print()
- sys.exit(-1)
-
-import six
-
-
-def GetBuffer(stream):
- if six.PY3:
- return stream.buffer
- return stream
-
-
-"""Class Susb covers USB device discovery and initialization.
-
- It can find a particular endpoint by vid:pid, serial number,
- and interface number.
-"""
-
-class SusbError(Exception):
- """Class for exceptions of Susb."""
- def __init__(self, msg, value=0):
- """SusbError constructor.
-
- Args:
- msg: string, message describing error in detail
- value: integer, value of error when non-zero status returned. Default=0
- """
- super(SusbError, self).__init__(msg, value)
- self.msg = msg
- self.value = value
-
-class Susb():
- """Provide USB functionality.
-
- Instance Variables:
- _read_ep: pyUSB read endpoint for this interface
- _write_ep: pyUSB write endpoint for this interface
- """
- READ_ENDPOINT = 0x81
- WRITE_ENDPOINT = 0x1
- TIMEOUT_MS = 100
-
- def __init__(self, vendor=0x18d1,
- product=0x500f, interface=1, serialname=None):
- """Susb constructor.
-
- Discovers and connects to USB endpoints.
-
- Args:
- vendor : usb vendor id of device
- product : usb product id of device
- interface : interface number ( 1 - 8 ) of device to use
- serialname: string of device serialnumber.
-
- Raises:
- SusbError: An error accessing Susb object
- """
- # Find the device.
- dev_g = usb.core.find(idVendor=vendor, idProduct=product, find_all=True)
- dev_list = list(dev_g)
- if dev_list is None:
- raise SusbError("USB device not found")
-
- # Check if we have multiple devices.
- dev = None
- if serialname:
- for d in dev_list:
- dev_serial = "PyUSB doesn't have a stable interface"
- try:
- dev_serial = usb.util.get_string(d, 256, d.iSerialNumber)
- except:
- dev_serial = usb.util.get_string(d, d.iSerialNumber)
- if dev_serial == serialname:
- dev = d
- break
- if dev is None:
- raise SusbError("USB device(%s) not found" % (serialname,))
- else:
- try:
- dev = dev_list[0]
- except:
- try:
- dev = dev_list.next()
- except:
- raise SusbError("USB device %04x:%04x not found" % (vendor, product))
-
- # If we can't set configuration, it's already been set.
- try:
- dev.set_configuration()
- except usb.core.USBError:
- pass
-
- # Get an endpoint instance.
- cfg = dev.get_active_configuration()
- intf = usb.util.find_descriptor(cfg, bInterfaceNumber=interface)
- self._intf = intf
-
- if not intf:
- raise SusbError("Interface not found")
-
- # Detach raiden.ko if it is loaded.
- if dev.is_kernel_driver_active(intf.bInterfaceNumber) is True:
- dev.detach_kernel_driver(intf.bInterfaceNumber)
-
- read_ep_number = intf.bInterfaceNumber + self.READ_ENDPOINT
- read_ep = usb.util.find_descriptor(intf, bEndpointAddress=read_ep_number)
- self._read_ep = read_ep
-
- write_ep_number = intf.bInterfaceNumber + self.WRITE_ENDPOINT
- write_ep = usb.util.find_descriptor(intf, bEndpointAddress=write_ep_number)
- self._write_ep = write_ep
-
-
-"""Suart class implements a stream interface, to access Google's USB class.
-
- This creates a send and receive thread that monitors USB and console input
- and forwards them across. This particular class is hardcoded to stdin/out.
-"""
-
-class SuartError(Exception):
- """Class for exceptions of Suart."""
- def __init__(self, msg, value=0):
- """SuartError constructor.
-
- Args:
- msg: string, message describing error in detail
- value: integer, value of error when non-zero status returned. Default=0
- """
- super(SuartError, self).__init__(msg, value)
- self.msg = msg
- self.value = value
-
-
-class Suart():
- """Provide interface to serial usb endpoint."""
-
- def __init__(self, vendor=0x18d1, product=0x501c, interface=0,
- serialname=None):
- """Suart contstructor.
-
- Initializes USB stream interface.
-
- Args:
- vendor: usb vendor id of device
- product: usb product id of device
- interface: interface number of device to use
- serialname: Defaults to None.
-
- Raises:
- SuartError: If init fails
- """
- self._done = threading.Event()
- self._susb = Susb(vendor=vendor, product=product,
- interface=interface, serialname=serialname)
-
- def wait_until_done(self, timeout=None):
- return self._done.wait(timeout=timeout)
-
- def run_rx_thread(self):
- try:
- while True:
- try:
- r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS)
- if r:
- GetBuffer(sys.stdout).write(r.tostring())
- GetBuffer(sys.stdout).flush()
-
- except Exception as e:
- # If we miss some characters on pty disconnect, that's fine.
- # ep.read() also throws USBError on timeout, which we discard.
- if not isinstance(e, (OSError, usb.core.USBError)):
- print("rx %s" % e)
- finally:
- self._done.set()
-
- def run_tx_thread(self):
- try:
- while True:
- try:
- r = GetBuffer(sys.stdin).read(1)
- if not r or r == b"\x03":
- break
- if r:
- self._susb._write_ep.write(array.array('B', r),
- self._susb.TIMEOUT_MS)
- except Exception as e:
- print("tx %s" % e)
- finally:
- self._done.set()
-
- def run(self):
- """Creates pthreads to poll USB & PTY for data.
- """
- self._exit = False
-
- self._rx_thread = threading.Thread(target=self.run_rx_thread)
- self._rx_thread.daemon = True
- self._rx_thread.start()
-
- self._tx_thread = threading.Thread(target=self.run_tx_thread)
- self._tx_thread.daemon = True
- self._tx_thread.start()
-
-
-
-"""Command line functionality
-
- Allows specifying vid:pid, serialnumber, interface.
- Ctrl-C exits.
-"""
-
-parser = argparse.ArgumentParser(description="Open a console to a USB device")
-parser.add_argument('-d', '--device', type=str,
- help="vid:pid of target device", default="18d1:501c")
-parser.add_argument('-i', '--interface', type=int,
- help="interface number of console", default=0)
-parser.add_argument('-s', '--serialno', type=str,
- help="serial number of device", default="")
-parser.add_argument('-S', '--notty-exit-sleep', type=float, default=0.2,
- help="When stdin is *not* a TTY, wait this many seconds after EOF from "
- "stdin before exiting, to give time for receiving a reply from the USB "
- "device.")
-
-
-def runconsole():
- """Run the usb console code
-
- Starts the pty thread, and idles until a ^C is caught.
- """
- args = parser.parse_args()
-
- vidstr, pidstr = args.device.split(':')
- vid = int(vidstr, 16)
- pid = int(pidstr, 16)
-
- serialno = args.serialno
- interface = args.interface
-
- sobj = Suart(vendor=vid, product=pid, interface=interface,
- serialname=serialno)
- if sys.stdin.isatty():
- tty.setraw(sys.stdin.fileno())
- sobj.run()
- sobj.wait_until_done()
- if not sys.stdin.isatty() and args.notty_exit_sleep > 0:
- time.sleep(args.notty_exit_sleep)
-
-
-def main():
- stdin_isatty = sys.stdin.isatty()
- if stdin_isatty:
- fd = sys.stdin.fileno()
- os.system("stty -echo")
- old_settings = termios.tcgetattr(fd)
-
- try:
- runconsole()
- finally:
- if stdin_isatty:
- termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
- os.system("stty echo")
- # Avoid having the user's shell prompt start mid-line after the final output
- # from this program.
- print()
-
-
-if __name__ == '__main__':
- main()