summaryrefslogtreecommitdiff
path: root/extra/tigertool/ecusb/stm32uart.py
diff options
context:
space:
mode:
authorJack Rosenthal <jrosenth@chromium.org>2021-11-04 12:11:58 -0600
committerCommit Bot <commit-bot@chromium.org>2021-11-05 04:22:34 +0000
commit252457d4b21f46889eebad61d4c0a65331919cec (patch)
tree01856c4d31d710b20e85a74c8d7b5836e35c3b98 /extra/tigertool/ecusb/stm32uart.py
parent08f5a1e6fc2c9467230444ac9b582dcf4d9f0068 (diff)
downloadchrome-ec-stabilize-quickfix-14695.124.B-ish.tar.gz
In the interest of making long-term branch maintenance incur as little technical debt on us as possible, we should not maintain any files on the branch we are not actually using. This has the added effect of making it extremely clear when merging CLs from the main branch when changes have the possibility to affect us. The follow-on CL adds a convenience script to actually pull updates from the main branch and generate a CL for the update. BUG=b:204206272 BRANCH=ish TEST=make BOARD=arcada_ish && make BOARD=drallion_ish Signed-off-by: Jack Rosenthal <jrosenth@chromium.org> Change-Id: I17e4694c38219b5a0823e0a3e55a28d1348f4b18 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3262038 Reviewed-by: Jett Rink <jettrink@chromium.org> Reviewed-by: Tom Hughes <tomhughes@chromium.org>
Diffstat (limited to 'extra/tigertool/ecusb/stm32uart.py')
-rw-r--r--extra/tigertool/ecusb/stm32uart.py248
1 files changed, 0 insertions, 248 deletions
diff --git a/extra/tigertool/ecusb/stm32uart.py b/extra/tigertool/ecusb/stm32uart.py
deleted file mode 100644
index 95219455a9..0000000000
--- a/extra/tigertool/ecusb/stm32uart.py
+++ /dev/null
@@ -1,248 +0,0 @@
-# Copyright 2017 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Ignore indention messages, since legacy scripts use 2 spaces instead of 4.
-# pylint: disable=bad-indentation,docstring-section-indent
-# pylint: disable=docstring-trailing-quotes
-
-"""Allow creation of uart/console interface via stm32 usb endpoint."""
-
-from __future__ import print_function
-
-import os
-import select
-import sys
-import termios
-import threading
-import time
-import tty
-import usb
-
-from . import stm32usb
-
-
-class SuartError(Exception):
- """Class for exceptions of Suart."""
- def __init__(self, msg, value=0):
- """SuartError constructor.
-
- Args:
- msg: string, message describing error in detail
- value: integer, value of error when non-zero status returned. Default=0
- """
- super(SuartError, self).__init__(msg, value)
- self.msg = msg
- self.value = value
-
-
-class Suart(object):
- """Provide interface to stm32 serial usb endpoint."""
- def __init__(self, vendor=0x18d1, product=0x501a, interface=0,
- serialname=None, debuglog=False):
- """Suart contstructor.
-
- Initializes stm32 USB stream interface.
-
- Args:
- vendor: usb vendor id of stm32 device
- product: usb product id of stm32 device
- interface: interface number of stm32 device to use
- serialname: serial name to target. Defaults to None.
- debuglog: chatty output. Defaults to False.
-
- Raises:
- SuartError: If init fails
- """
- self._ptym = None
- self._ptys = None
- self._ptyname = None
- self._rx_thread = None
- self._tx_thread = None
- self._debuglog = debuglog
- self._susb = stm32usb.Susb(vendor=vendor, product=product,
- interface=interface, serialname=serialname)
- self._running = False
-
- def __del__(self):
- """Suart destructor."""
- self.close()
-
- def close(self):
- """Stop all running threads."""
- self._running = False
- if self._rx_thread:
- self._rx_thread.join(2)
- self._rx_thread = None
- if self._tx_thread:
- self._tx_thread.join(2)
- self._tx_thread = None
- self._susb.close()
-
- def run_rx_thread(self):
- """Background loop to pass data from USB to pty."""
- ep = select.epoll()
- ep.register(self._ptym, select.EPOLLHUP)
- try:
- while self._running:
- events = ep.poll(0)
- # Check if the pty is connected to anything, or hungup.
- if not events:
- try:
- r = self._susb._read_ep.read(64, self._susb.TIMEOUT_MS)
- if r:
- if self._debuglog:
- print(''.join([chr(x) for x in r]), end='')
- os.write(self._ptym, r)
-
- # If we miss some characters on pty disconnect, that's fine.
- # ep.read() also throws USBError on timeout, which we discard.
- except OSError:
- pass
- except usb.core.USBError:
- pass
- else:
- time.sleep(.1)
- except Exception as e:
- raise e
-
- def run_tx_thread(self):
- """Background loop to pass data from pty to USB."""
- ep = select.epoll()
- ep.register(self._ptym, select.EPOLLHUP)
- try:
- while self._running:
- events = ep.poll(0)
- # Check if the pty is connected to anything, or hungup.
- if not events:
- try:
- r = os.read(self._ptym, 64)
- # TODO(crosbug.com/936182): Remove when the servo v4/micro console
- # issues are fixed.
- time.sleep(0.001)
- if r:
- self._susb._write_ep.write(r, self._susb.TIMEOUT_MS)
-
- except OSError:
- pass
- except usb.core.USBError:
- pass
- else:
- time.sleep(.1)
- except Exception as e:
- raise e
-
- def run(self):
- """Creates pthreads to poll stm32 & PTY for data."""
- m, s = os.openpty()
- self._ptyname = os.ttyname(s)
-
- self._ptym = m
- self._ptys = s
-
- os.fchmod(s, 0o660)
-
- # Change the owner and group of the PTY to the user who started servod.
- try:
- uid = int(os.environ.get('SUDO_UID', -1))
- except TypeError:
- uid = -1
-
- try:
- gid = int(os.environ.get('SUDO_GID', -1))
- except TypeError:
- gid = -1
- os.fchown(s, uid, gid)
-
- tty.setraw(self._ptym, termios.TCSADRAIN)
-
- # Generate a HUP flag on pty slave fd.
- os.fdopen(s).close()
-
- self._running = True
-
- self._rx_thread = threading.Thread(target=self.run_rx_thread, args=[])
- self._rx_thread.daemon = True
- self._rx_thread.start()
-
- self._tx_thread = threading.Thread(target=self.run_tx_thread, args=[])
- self._tx_thread.daemon = True
- self._tx_thread.start()
-
- def get_uart_props(self):
- """Get the uart's properties.
-
- Returns:
- dict where:
- baudrate: integer of uarts baudrate
- bits: integer, number of bits of data Can be 5|6|7|8 inclusive
- parity: integer, parity of 0-2 inclusive where:
- 0: no parity
- 1: odd parity
- 2: even parity
- sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
- 0: 1 stop bit
- 1: 1.5 stop bits
- 2: 2 stop bits
- """
- return {
- 'baudrate': 115200,
- 'bits': 8,
- 'parity': 0,
- 'sbits': 1,
- }
-
- def set_uart_props(self, line_props):
- """Set the uart's properties.
-
- Note that Suart cannot set properties
- and will fail if the properties are not the default 115200,8n1.
-
- Args:
- line_props: dict where:
- baudrate: integer of uarts baudrate
- bits: integer, number of bits of data ( prior to stop bit)
- parity: integer, parity of 0-2 inclusive where
- 0: no parity
- 1: odd parity
- 2: even parity
- sbits: integer, number of stop bits. Can be 0|1|2 inclusive where:
- 0: 1 stop bit
- 1: 1.5 stop bits
- 2: 2 stop bits
-
- Raises:
- SuartError: If requested line properties are not the default.
- """
- curr_props = self.get_uart_props()
- for prop in line_props:
- if line_props[prop] != curr_props[prop]:
- raise SuartError('Line property %s cannot be set from %s to %s' % (
- prop, curr_props[prop], line_props[prop]))
- return True
-
- def get_pty(self):
- """Gets path to pty for communication to/from uart.
-
- Returns:
- String path to the pty connected to the uart
- """
- return self._ptyname
-
-
-def main():
- """Run a suart test with the default parameters."""
- try:
- sobj = Suart()
- sobj.run()
-
- # run() is a thread so just busy wait to mimic server.
- while True:
- # Ours sleeps to eleven!
- time.sleep(11)
- except KeyboardInterrupt:
- sys.exit(0)
-
-
-if __name__ == '__main__':
- main()