From d1e1f87dd72650f5265f33b3f69ca8aa4d063162 Mon Sep 17 00:00:00 2001 From: Vadim Bendebury Date: Fri, 28 Feb 2020 18:44:56 -0800 Subject: Bring in acroterm from the Dauntless project. acroterm.py is a copy taken as if from Acropora core tree at sha 80bf39f and modified to add the Chromium OS authors headers. BUG=b:149964350 TEST=./util/test_acroterm.py Change-Id: I48e866e205ef62f6e776e6c50b0f970c9df5202a Signed-off-by: Vadim Bendebury Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/2114240 Reviewed-by: Mary Ruthven --- util/acroterm.py | 1102 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1102 insertions(+) create mode 100755 util/acroterm.py (limited to 'util/acroterm.py') diff --git a/util/acroterm.py b/util/acroterm.py new file mode 100755 index 0000000000..5b79e059e0 --- /dev/null +++ b/util/acroterm.py @@ -0,0 +1,1102 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*-" +# Copyright 2020 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +"""acroterm.py - Terminal program for Acropora RTOS + +Loosely based on miniterm.py from PySerial. + +Eventually: +- Switch to/from binary cmsg() mode +- Local console history/editing +- Integrate with coverage tool? +""" + +import argparse +import atexit +import fcntl +import glob +import struct +import subprocess +import sys +import termios +import threading + +import serial + +# ------------------------------------------------------------------------------ + +def fatal(desc): + """Print an error and exit.""" + sys.stderr.write('\nacroterm error: %s\n' % desc) + sys.exit(1) + + +def notice(desc): + """Print a notification to stderr.""" + sys.stderr.write('\nacroterm: %s\n' % desc) + + +def crc8(buf): + """CRC-8""" + c = 0 + for d in buf: + c ^= d << 8 + for _ in range(8): + if c & 0x8000: + c ^= (0x1070 << 3) + c <<= 1 + return c >> 8 + +# ------------------------------------------------------------------------------ + +class PacketError(Exception): + """Base packet error exception. + + TODO(vbendeb): should this be rolled into PacketDataLenError()? + """ + def __init__(self, packet): + super().__init__() + self.packet = packet + + +class PacketDataLenError(PacketError): + """Exception to throw if packet length error is detected""" + def __init__(self, packet, count): + super().__init__(packet) + self.count = count + +# Constants from enum cmsg_channel +CMSG_CHAN_DEFAULT = 0 +CMSG_CHAN_TASK_CMSG = 1 +CMSG_CHAN_SYSTEM = 2 +CMSG_CHAN_INTERRUPT = 3 +CMSG_CHAN_INIT = 4 +CMSG_CHAN_EXCEPTION = 5 +CMSG_CHAN_CUSTOM = 0x40 +CMSG_CHAN_CUSTOM_LAST = 0x7f +CMSG_CHAN_FLAG_TASK = 0x80 +CMSG_CHAN_FLAG_ASYNC = 0x40 +CMSG_CHAN_FLAG_SYSCALL = 0x20 +CMSG_CHAN_MASK_TASK_ID = 0x1f + +# Constants from enum syscall_cmsg_format +CMSG_FORMAT_DONE = 0 +CMSG_FORMAT_SIGNED = 1 +CMSG_FORMAT_UNSIGNED = 2 +CMSG_FORMAT_HEX = 3 +CMSG_FORMAT_CHAR = 4 +CMSG_FORMAT_ERR = 5 +CMSG_FORMAT_STRING = 6 +CMSG_FORMAT_64BIT = 7 +CMSG_FORMAT_BUFFER_BASED = 12 +CMSG_FORMAT_BUF_STRUCT = CMSG_FORMAT_BUFFER_BASED +CMSG_FORMAT_BUF_STRING = 13 +CMSG_FORMAT_BUF_BYTES = 14 +CMSG_FORMAT_BUF_WORDS = 15 +# 64-bit pseudo-formats +CMSG_FORMAT_SIGNED64 = 100 + CMSG_FORMAT_SIGNED +CMSG_FORMAT_UNSIGNED64 = 100 + CMSG_FORMAT_UNSIGNED +CMSG_FORMAT_HEX64 = 100 + CMSG_FORMAT_HEX +CMSG_FORMAT_TIME64 = 100 + CMSG_FORMAT_CHAR + +class Packet(object): + """Console output packet from acropora""" + + MAGIC = 0xc0 + END_MAGIC = 0xc1 + DATA_LEN_OFFSET = 10 + CRC_OFFSET = 12 + HEADER_LEN = 13 + + # Dict of struct handlers indexed by type. Add with set_struct_handler(). + # Handlers take (struct type index, bytearray) and return string. + struct_handlers = {} + + @staticmethod + def default_struct_handler(stype, buf): + """Default struct handler""" + return 'BadStruct#%d(%d)' % (stype, len(buf)) + + @staticmethod + def set_struct_handler(stype, handler): + """Assign structure handler for a structure type""" + Packet.struct_handlers[stype] = handler + + def __init__(self): + self.reset() + self.decoded = '' + self.errors = [] + self.next_seq = None + self.last_timestamp = 0 + + def reset(self): + """Reset the packet state.""" + self.data = bytearray() + self.expect_len = 0 + + def get_decoded(self): + """Return the last decoded packet, or an empty string if none.""" + d = self.decoded + self.decoded = '' + return d + + def get_errors(self): + """Return the last errors or an empty list if none.""" + e = self.errors + self.errors = [] + return e + + def add_byte(self, b): + """Add a byte to the packet. Returns True if the byte was consumed.""" + + if not self.expect_len: + # Not in a packet + if b != Packet.MAGIC: + return False + + # Now starting a packet + self.expect_len = Packet.HEADER_LEN + + self.data.append(b) + + if len(self.data) == self.expect_len: + if self.expect_len == Packet.HEADER_LEN: + if not self.validate_header(): + self.reset() + else: + self.decode_packet() + self.reset() + + return True + + def next_data(self, count): + """Returns the next bytes of data as a bytearray.""" + + if len(self.data) < count: + raise PacketDataLenError(self, count) + + d = self.data[:count] + self.data = self.data[count:] + return d + + def validate_header(self): + """Validate the packet header. + + Returns: + True if there is more data needed. + """ + if self.expect_len != Packet.HEADER_LEN: + return False + + if self.data[Packet.CRC_OFFSET] != crc8(self.data[:Packet.CRC_OFFSET]): + print('Bad packet') + return False + + data_len = struct.unpack('=H', self.data[Packet.DATA_LEN_OFFSET: + Packet.DATA_LEN_OFFSET + 2])[0] + if not data_len: + # No data; just header + self.decode_packet() + return False + + self.expect_len += data_len + 1 # +1 for packet end + return True + + def decode_param(self, fmt, param): + """Decode one param and return as string""" + dout = '' + + # TODO: maybe tidier to pass param in as a bytearray and unpack it here? + if fmt == CMSG_FORMAT_SIGNED: + if param > 1 << 31: + param -= 1 << 32 + dout += '%d' % param + elif fmt == CMSG_FORMAT_SIGNED64: + if param > 1 << 63: + param -= 1 << 64 + dout += '%d' % param + elif fmt == CMSG_FORMAT_UNSIGNED or fmt == CMSG_FORMAT_UNSIGNED64: + dout += '%u' % param + elif fmt == CMSG_FORMAT_HEX: + dout += '0x%08x' % param + elif fmt == CMSG_FORMAT_HEX64: + dout += '0x%016x' % param + elif fmt == CMSG_FORMAT_CHAR: + dout += "'%c'" % param + elif fmt == CMSG_FORMAT_TIME64: + if param == (1 << 64) - 1: + dout += '(FOREVER)' + else: + dout += '%d.%06d' % (param // 1000000, param % 1000000) + elif fmt == CMSG_FORMAT_ERR: + err_type = param >> 30 + if param == 0: + dout += 'ErrNone' + elif err_type < 2: + fileno = (param >> 19) & 0x7ff + lineno = (param >> 8) & 0x7ff # Or instance + dout += 'Err#%d' % (param & 0xff) + if fileno: + dout += ':File#%d' % fileno + if lineno: + dout += ':%s#%d' % ( + 'Instance' if err_type else 'Line', lineno) + elif err_type == 2: + err_subtype = (param >> 28) & 0x03 + if err_subtype == 0: + dout += 'ErrSub#%d:Code#%d' % ( + (param >> 16) & 0xfff, param & 0xffff) + else: + dout += 'ErrReserved#%08x' % param + else: + dout += 'ErrLegacy0x%08x' % (param & 0x3fffffff) + elif fmt in (CMSG_FORMAT_STRING, CMSG_FORMAT_BUF_STRING, + CMSG_FORMAT_BUF_STRUCT, CMSG_FORMAT_BUF_BYTES, + CMSG_FORMAT_BUF_WORDS): + size = param & 0xffff + if size == 0xffff: + dout += ('(BadStrPtr)' if fmt == CMSG_FORMAT_STRING else + '(bad size/offs)') + else: + buf = self.next_data(size) + if fmt == CMSG_FORMAT_STRING or fmt == CMSG_FORMAT_BUF_STRING: + dout += buf.decode(encoding='utf-8', errors='replace') + elif fmt == CMSG_FORMAT_BUF_STRUCT: + stype = param >> 16 + handler = Packet.struct_handlers.get( + stype, Packet.default_struct_handler) + dout += handler(stype, buf) + elif fmt == CMSG_FORMAT_BUF_BYTES: + dout += ' '.join('%02x' % x for x in buf) + elif fmt == CMSG_FORMAT_BUF_WORDS: + count = (param & 0xffff) // 4 + words = struct.unpack('=%dL' % count, buf) + dout += ' '.join('%08x' % x for x in words) + else: + if fmt > 100: + dout += 'BadFormatL%d' % (fmt - 100) + else: + dout += 'BadFormat%d' % fmt + + return dout + + def decode_packet(self): + """Decode a packet, now that it's all shown up.""" + + # Decode header + header = self.next_data(Packet.HEADER_LEN) + (b1, channel, const_str_len, time_lo, time_hi, + data_len) = struct.unpack('=xBBBLHHx', header) + sequence = b1 & 0x0f + sender_dropped = b1 & 0x10 + param_count = b1 >> 5 + timestamp = time_hi << 32 | time_lo + + # If timestamp decreased, board rebooted + if timestamp < self.last_timestamp: + # Reboot will restart the sequence at 0 + self.next_seq = 0 + self.last_timestamp = timestamp + + # Flag dropped packets + if sender_dropped: + self.errors.append('(sender dropped packet(s))') + if self.next_seq is not None and sequence != self.next_seq: + self.errors.append('(missing packet(s)); got %d expect %d' % + (sequence, self.next_seq)) + self.next_seq = (sequence + 1) % 16 + + self.decoded += '[%d.%06d/' % ( + timestamp // 1000000, timestamp % 1000000) + if channel == CMSG_CHAN_DEFAULT: + self.decoded += '??' + elif channel == CMSG_CHAN_INTERRUPT: + self.decoded += 'I.' + elif channel == CMSG_CHAN_INIT: + self.decoded += 'i.' + elif channel == CMSG_CHAN_EXCEPTION: + self.decoded += 'E.' + elif channel & CMSG_CHAN_TASK_CMSG: + if channel & CMSG_CHAN_FLAG_SYSCALL: + self.decoded += 'A' if channel & CMSG_CHAN_FLAG_ASYNC else 'T' + else: + self.decoded += 'a' if channel & CMSG_CHAN_FLAG_ASYNC else 't' + self.decoded += '%d' % (channel & CMSG_CHAN_MASK_TASK_ID) + else: + self.decoded += '%02x' % channel + self.decoded += ']' + + # Flag (but print) bad data; header is still fine + if data_len and ((not self.data) or self.data[-1] != Packet.END_MAGIC): + self.errors.append( + '(packet data missing end magic; may be corrupt)') + + # Decode data + const_str = '' + param_decoded = [] + try: + if const_str_len: + const_str = self.next_data(const_str_len).decode( + encoding='utf-8', errors='replace') + + if param_count: + # Unpack format nibbles + formats = [] + fbuf = self.next_data((param_count + 1) // 2) + for f in fbuf: + formats += [f & 0xf, f >> 4] + + # Unpack params + params = struct.unpack('=' + ('L' * param_count), + self.next_data(4 * param_count)) + + p = 0 + while p < param_count: + param = params[p] + fmt = formats[p] + p += 1 + if fmt == CMSG_FORMAT_64BIT: + param |= params[p] << 32 + fmt = formats[p] + 100 # TODO: define constant + p += 1 + + param_decoded.append(self.decode_param(fmt, param)) + + except PacketDataLenError: + print(' (bad len)') + + # Handle format chars inside the decoded string + # TODO: use regex rather than scanning a char at a time + if const_str: + self.decoded += ' ' + + const_chars = [c for c in const_str] + while const_chars: + c = const_chars.pop(0) + if c == '$': + if param_decoded: + self.decoded += param_decoded.pop(0) + elif c == '%': + if const_chars: + self.decoded += const_chars.pop(0) + else: + self.decoded += c + + # Consume remaining params + if param_decoded: + self.decoded += ' ' + ' '.join(param_decoded) + + # Clear packet for next time + self.reset() + +# ------------------------------------------------------------------------------ +# Packet struct handlers + +CMSG_STRUCT_PMP_CSRS = 0 +CMSG_STRUCT_MGPSCRATCH_CSRS = 1 +CMSG_STRUCT_EXCEPTION_FRAME = 2 +CMSG_STRUCT_COVERAGE_COUNTERS = 3 +CMSG_STRUCT_TASK_PRINT_ONE = 4 +CMSG_STRUCT_SHMEM_PRINT_ONE = 5 +CMSG_STRUCT_64BIT_POINTER = 6 + +def handle_struct_pmp_csrs(_, buf): + """Format for printing pmp_csrs acropora structure""" + fields = struct.unpack('=4L16L', buf) + pmpcfg = fields[0:4] + pmpaddr = fields[4:20] + out = '\n' + + for i in range(0, 16, 4): + out += 'pmp%02d-%02d %08x: ' % (i, i + 3, pmpcfg[i // 4]) + out += '%08x-%08x %08x-%08x\n' % pmpaddr[i : i + 4] + + for i in range(16): + cfg = pmpcfg[i >> 2] >> (8 * (i & 3)) + if cfg == 0: + continue + + out += 'pmp%-2d: %02x %s%s%s%s' % (i, cfg, + 'R' if (cfg & (1 << 0)) else '-', + 'W' if (cfg & (1 << 1)) else '-', + 'X' if (cfg & (1 << 2)) else '-', + 'L' if (cfg & (1 << 7)) else '-') + cfg_type = (cfg >> 3) & 3 + if cfg_type == 1: # TOR + out += '%08x - %08x' % (pmpaddr[i - 1] if i else 0, pmpaddr[i]) + elif cfg_type == 2: # NA4 + out += '%08x' % pmpaddr[i] + elif cfg_type == 3: # NAPOT + addr = pmpaddr[i] + out += '%08x size %08x' % ((addr & (addr + 1)) << 2, + (addr ^ (addr + 1) + 1) << 2) + else: + out += '-' + + out += '\n' + + return out + +Packet.set_struct_handler(CMSG_STRUCT_PMP_CSRS, handle_struct_pmp_csrs) + + +def handle_struct_mgpscratch_csrs(_, buf): + """Format for printing mgpscratch_csrs acropora structure""" + fields = struct.unpack('=16L', buf) + out = '\n' + + for i in range(0, 16, 4): + out += 'gen%02d-%02d ' % (i, i + 3) + out += '%08x %08x %08x %08x\n' % fields[i : i + 4] + + return out + +Packet.set_struct_handler(CMSG_STRUCT_MGPSCRATCH_CSRS, + handle_struct_mgpscratch_csrs) + + +mcause_desc = { + 0:'Instruction address misaligned', + 1:'Instruction access fault', + 2:'Illegal instruction', + 3:'Breakpoint', + 4:'Load address misaligned', + 5:'Load access fault', + 6:'Store/AMO address misaligned', + 7:'Store/AMO access fault', + 8:'Environment call from U-mode', + 9:'Environment call from S-mode', + 10:'NMI', + 11:'Environment call from M-mode', + 12:'Instruction page fault', + 13:'Load page fault', + 14:'Reserved', + 15:'Store/AMO page fault', + 48:'Watchdog' +} + +def handle_struct_exception_frame(_, buf): + """Format for printing exception_frame acropora structure""" + out = '' + + fieldvals = struct.unpack('=14L12L7L8LL', buf) + + fieldnames = ['gp', 'tp', 'sp', 'mcause', 'mepc', 'mtval', 'mstatus', + 'mscratch', 'mie', 'mip', 'mtvec', 'mnmivec', 'trapflgs', + 'reserved0'] + fieldnames += ['s%d' % i for i in range(12)] + fieldnames += ['t%d' % i for i in range(7)] + fieldnames += ['a%d' % i for i in range(8)] + fieldnames += ['ra'] + fields = dict(zip(fieldnames, fieldvals)) + + mstatus = fields['mstatus'] + out += '%s-MODE EXCEPTION ' % ('M' if (mstatus & (3 << 11)) else 'U') + + mcause = fields['mcause'] + out += '%08x: %s\n' % (mcause, mcause_desc.get(mcause, '?')) + + fieldlines = (('s0', 'gp', 'mstatus', 'mie'), + ('s1', 'ra', 'mepc', 'mip'), + ('s2', 'sp', 'mtvec', 'mnmivec'), + ('s3', 'tp', 'mtval', 'mscratch'), + ('s4', 'a0', 't0', 'trapflgs'), + ('s5', 'a1', 't1'), + ('s6', 'a2', 't2'), + ('s7', 'a3', 't3'), + ('s8', 'a4', 't4'), + ('s9', 'a5', 't5'), + ('s10', 'a6', 't6'), + ('s11', 'a7')) + fieldlinelen = (3, 3, 7, 8) + + for fline in fieldlines: + fl = zip(fline, fieldlinelen) + out += ' '.join('%-*s %08x' % + (l, f, fields[f]) for f, l in fl) + '\n' + + return out + +Packet.set_struct_handler(CMSG_STRUCT_EXCEPTION_FRAME, + handle_struct_exception_frame) + + +def handle_struct_coverage_counters(_, buf): + """Format for printing coverage_counters acropora structure""" + # TODO: this should just write them to coverage directly; no need to parse + # later. + count = len(buf) // 8 + counters = struct.unpack('=%dQ' % count, buf) + return ' '.join(str(c) for c in counters) + +Packet.set_struct_handler(CMSG_STRUCT_COVERAGE_COUNTERS, + handle_struct_coverage_counters) + + +def handle_struct_task_print_one(_, buf): + """Format for printing task_print_one acropora structure""" + fieldvals = struct.unpack('=LLLLQLLLL16c', buf) + fieldnames = ['flags', 'events', 'events_enabled', 'events_async_enabled', + 'wake_time', 'mepc', 'msg_head', 'msg_tail', 'msg_queue_mask', + 'name'] + fields = dict(zip(fieldnames, fieldvals)) + out = '' + + flags = fields['flags'] + out += 'R' if (flags & (1 << 0)) else 's' + out += 'T' if (flags & (1 << 1)) else '.' + + out += ' %(mepc)08x %(events)08x' % fields + out += ' %(events_enabled)08x %(events_async_enabled)08x' % fields + out += ' %(msg_head)04x %(msg_tail)04x %(msg_queue_mask)04x' % fields + + wake_time = fields['wake_time'] + if wake_time == 0xffffffffffffffff: + out += ' (FOREVER)' + else: + out += '%7d.%06d' % (wake_time // 1000000, wake_time % 1000000) + + out += ' ' + fields['name'].decode() + return out + +Packet.set_struct_handler(CMSG_STRUCT_TASK_PRINT_ONE, + handle_struct_task_print_one) + + +def handle_struct_shmem_print_one(_, buf): + """Format for printing shmem_print _one acropora structure""" + fieldvals = struct.unpack('=LLLLLBBBB', buf) + fieldnames = ['addr', 'max_size', 'allocated_size', 'allocated_owner_mask', + 'flags', 'handle', 'owner', 'requested_owner', 'map_count'] + fields = dict(zip(fieldnames, fieldvals)) + out = '' + + out += '%(handle)2d %(flags)02x %(addr)08x' % fields + out += ' %(allocated_size)6d/%(max_size)6d m%(map_count)d' % fields + + if fields['owner'] != 32: # SHMEM_OWNER_NONE + out += ' o%d' % fields['owner'] + if fields['requested_owner'] != 32: # SHMEM_OWNER_NONE + out += '->%d' % fields['requested_owner'] + + return out + +Packet.set_struct_handler(CMSG_STRUCT_SHMEM_PRINT_ONE, + handle_struct_shmem_print_one) + +def handle_struct_64bit_pointer(_, buf): + """Unpack and convert to text a 64 bit value""" + fields = struct.unpack('=Q', buf) + + return '0x%16x' % fields + +Packet.set_struct_handler(CMSG_STRUCT_64BIT_POINTER, + handle_struct_64bit_pointer) + +# ------------------------------------------------------------------------------ + +class Console(object): + """OS abstraction for console (input/output codec, no echo)""" + + def __init__(self): + self.fd = sys.stdin.fileno() + self.old = termios.tcgetattr(self.fd) + atexit.register(self.cleanup) + + def setup(self): + """Set console to read single characters, no echo""" + attr = termios.tcgetattr(self.fd) + attr[3] = attr[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG + attr[6][termios.VMIN] = 1 + attr[6][termios.VTIME] = 0 + termios.tcsetattr(self.fd, termios.TCSANOW, attr) + + def cleanup(self): + """Restore default console settings""" + termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old) + + @staticmethod + def getkey(): + """Read a single key from the console""" + while True: + try: + c = sys.stdin.read(1) + if c == '\x7f': + c = '\x08' # Map the BS key (which yields DEL) to backspace + return c + except IOError as e: + # When the programming command is running, it can eat stdin. + if e.errno == 11: # Resource temporarily unavailable + pass + else: + raise + @staticmethod + def write(text): + """Write string""" + sys.stdout.write(text) + sys.stdout.flush() + + def cancel(self): + """Cancel getkey operation""" + fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0') + + def __enter__(self): + self.cleanup() + return self + + def __exit__(self, *args, **kwargs): + self.setup() + +# ------------------------------------------------------------------------------ + +class Acroterm(object): + """Acropora terminal class. + + Receives characters from a serial interface or a program, passes them + through a packet processing and/or directly sends them to the console + depending on the input stream contents. + """ + COLOR_DEFAULT = '\x1b[0m' + COLOR_BLACK = '\x1b[0;30m' + COLOR_RED = '\x1b[0;31m' + COLOR_GREEN = '\x1b[0;32m' + COLOR_BROWN = '\x1b[0;33m' + COLOR_BLUE = '\x1b[0;34m' + COLOR_PURPLE = '\x1b[0;35m' + COLOR_CYAN = '\x1b[0;36m' + COLOR_LGRAY = '\x1b[0;37m' + COLOR_DGRAY = '\x1b[1;30m' + COLOR_LRED = '\x1b[1;31m' + COLOR_LGREEN = '\x1b[1;32m' + COLOR_YELLOW = '\x1b[1;33m' + COLOR_LBLUE = '\x1b[1;34m' + COLOR_LPURPLE = '\x1b[1;35m' + COLOR_LCYAN = '\x1b[1;36m' + COLOR_LWHITE = '\x1b[1;37m' + COLOR_BOLD = '\x1b[1m' + COLOR_FAINT = '\x1b[2m' + COLOR_ITALIC = '\x1b[3m' + COLOR_UNDERLINE = '\x1b[4m' + COLOR_BLINK = '\x1b[5m' + COLOR_NEGATIVE = '\x1b[7m' + COLOR_CROSSED = '\x1b[9m' + + color_theme = { + 'none': {}, + 'light': {'default': COLOR_DEFAULT, 'normal': COLOR_BLUE, + 'error': COLOR_RED, 'not_packet': COLOR_GREEN, + 'program': COLOR_PURPLE}, + 'dark': {'default': COLOR_DEFAULT, 'normal': COLOR_LGREEN, + 'error': COLOR_LRED, 'not_packet': COLOR_LBLUE, + 'program': COLOR_PURPLE}, + 'mono': {'default': COLOR_DEFAULT, 'normal': COLOR_BOLD, + 'error': COLOR_NEGATIVE, 'not_packet': COLOR_FAINT, + 'program': COLOR_ITALIC} + } + + def color(self, vtype): + """Return text color according to the color scheme. + + Args: + vtype: A string, type of the text (see color_theme dictionary + above) + """ + # Fall back to default and then empty string + return self.color_themes.get(vtype, self.color_theme.get('default', '')) + + def __init__(self, args): + # Config + # TODO: maybe just save args? + self.exit_char = '\x03' # Ctrl+C + self.exit_sequence = args.remote_exit + self.exit_seq_fail = args.remote_fail + self.log_filename = args.log + self.cmd_filter = args.cmd_filter + self.timeout_secs = args.timeout + self.coverage_filter = args.coverage_filter + self.coverage_filter_active = False + + # Other state + self.packet = Packet() + self.current_line = '' + self.exit_code = 0 + self.alive = True + + # Set up the console + self.console = Console() + self.console.setup() + + # Start logging + if self.log_filename: + self.log_file = open(self.log_filename, 'wb') + else: + self.log_file = None + + # Colors for different display items + self.color_themes = self.color_theme.get(args.color, 'none') + + # Start serial thread + if args.tty: + notice('Starting on %s; ^C=exit' % args.tty) + self.serial = serial.Serial(port=args.tty, + baudrate=args.baud, + timeout=0) + self.rx_thread = threading.Thread(target=self.serial_reader, + name='rx') + self.rx_thread.daemon = True + self.rx_thread.start() + else: + self.serial = self.rx_thread = None + + # Start local input thread + self.tx_thread = threading.Thread(target=self.writer, name='tx') + self.tx_thread.daemon = True + self.tx_thread.start() + + # Start command process + if args.cmd: + notice('Running command...') + self.cmd_proc = subprocess.Popen( + args.cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=True) + self.cmd_thread = threading.Thread(target=self.cmd_reader, + name='cmd') + self.cmd_thread.start() + else: + self.cmd_proc = self.cmd_thread = None + + # Start timeout if necessary + if self.timeout_secs > 0: + self.timer = threading.Timer(self.timeout_secs, + self.timeout_handler) + self.timer.start() + else: + self.timer = None + + def stop(self): + """Set flag to stop worker threads""" + self.alive = False + if self.timer: + self.timer.cancel() + if self.serial: + self.serial.cancel_read() + self.console.cancel() + if self.cmd_proc: + self.cmd_proc.terminate() + + def join_tx(self): + """Wait for the transmit thread to terminate""" + self.tx_thread.join() + + def join_all(self): + """Wait for all worker threads to terminate""" + self.join_tx() + + if self.serial: + self.serial.cancel_read() + if self.rx_thread: + self.rx_thread.join() + if self.cmd_thread: + self.cmd_thread.join() + + def close(self): + """Close the terminal session""" + if self.serial: + self.serial.close() + if self.log_file: + self.log_file.close() + + def process_line(self, line): + """Process a line of text for special sequences.""" + # TODO: Those really should be special packets, not just strings, now + # that we can send packets... + + if self.exit_sequence and self.exit_sequence in line: + notice('Exit from remote') + self.stop() + elif self.exit_seq_fail and self.exit_seq_fail in line: + notice('Exit from remote - failure!') + self.exit_code = 1 + self.stop() + elif (self.coverage_filter and + '***Dumping coverage***' in line): + self.coverage_filter_active = True + self.console.write('\nSaving') + elif self.coverage_filter_active: + if ':COV:' in line: + self.console.write('.') + if '***Coverage end***' in line: + self.coverage_filter_active = False + self.console.write('done.\n') + + def process_output(self, data): + """Process chunk of received data. + + Could come over serial line or from a program, depending on how this + script was invoked. + + If inside the packet - send the data to the packet handler, otherwise + print it on the console. + + Args: + data: a byte array, the received chunk. + """ + # Scan for magic sequences + for c in data: + + # If processing a packet or line requested exit, stop now + if not self.alive: + break + + # Handle packets + if self.packet.add_byte(c): + e = self.packet.get_errors() + if e: + self.log_file.write(('\n'.join(e) + '\n').encode()) + self.console.write(self.color('error')) + self.console.write('\n'.join(e)) + self.console.write(self.color('default')+'\n') + d = self.packet.get_decoded() + if d: + self.log_file.write((d + '\n').encode()) + if not self.coverage_filter_active: + self.console.write(self.color('normal')) + self.console.write(d) + self.console.write(self.color('default')+'\n') + self.process_line(d) + continue + + # Note that unicode will only work *inside* packets. It can't + # work outside because Unicode code points overlap the packet + # start/end byte values. + c = chr(c) + + self.log_file.write(c.encode()) + + if not self.coverage_filter_active: + self.console.write(self.color('not_packet')) + self.console.write(c) + self.console.write(self.color('default')) + + if c in '\r\n': + self.process_line(self.current_line) + self.current_line = '' + else: + self.current_line += c + + def serial_reader(self): + """Loop and copy serial->console""" + + # TODO: option to dump data which is waiting at start (from a previous + # run) + + try: + while self.alive: + # Read all that is there or wait for one byte + data = self.serial.read(self.serial.in_waiting or 1) + if data: + self.process_output(data) + + except serial.SerialException: + self.stop() + raise + + def writer(self): + """Loop and copy console->serial until self.exit_char is received.""" + try: + while self.alive: + try: + c = self.console.getkey() + except KeyboardInterrupt: + c = self.exit_char + + # Could have died while waiting for key + if not self.alive: + break + + if c == self.exit_char: + notice('Exit from console') + self.stop() # exit app + break + + if self.serial: + self.serial.write(bytes(c, encoding='utf-8')) + else: + self.cmd_proc.stdin.write(bytes(c, encoding='utf-8')) + self.cmd_proc.stdin.flush() + except: + self.stop() + raise + + def cmd_reader(self): + """Shell out to programming tool and filter output.""" + + proc = self.cmd_proc + + if self.serial: + # Command is for programming, with output filtering + while True: + raw_line = proc.stdout.readline() + if raw_line: + line = raw_line.decode() + if self.cmd_filter and self.cmd_filter in line: + line = '#' + sys.stdout.write(self.color('program') + line + + self.color('default')) + sys.stdout.flush() + elif proc.poll() is not None: + break + else: + # Command is the program we want to run, with packet I/O + while self.alive: + data = proc.stdout.read1(1) + if data: + self.process_output(data) + else: + if proc.poll() is not None: + break + + self.cmd_proc = None + + notice('Command exited with code %d' % proc.returncode) + if not self.serial: + self.stop() + + def timeout_handler(self): + """Close terminal on timeout.""" + notice('Hit timeout') + self.stop() + +# ------------------------------------------------------------------------------ + +def main(): + """Main function. + + Parse command line arguments and start operation accordingly. + """ + parser = argparse.ArgumentParser(description='Acropora terminal') + + group = parser.add_argument_group('port settings') + + group.add_argument( + '--tty', + help=('TTY to use (can have wildcards), or "host" to run --cmd as ' + 'the build target(default=%(default)s)'), + default='/dev/ttyUltraTarget_*') + + group.add_argument( + '--baud', + help='Baud rate (default=%(default)s)', + type=int, + default=115200) + + group.add_argument( + '--target', + help='Run host executable instead of connecting to tty') + + group = parser.add_argument_group('exit settings') + + group.add_argument( + '--timeout', + help='Timeout in seconds (default=None)', + type=int, + default=0) + + group.add_argument( + '--remote-exit', + help='Exit if remote emits this string (default=%(default)s)', + metavar='MATCH', + default='***HANGUP***') + + group.add_argument( + '--remote-fail', + help='Exit with error if remote emits this string' + '(default=%(default)s)', + metavar='MATCH', + default='***HANGUP-FAIL***') + + group = parser.add_argument_group('logging') + + group.add_argument( + '--log', + help='Logfile (default=%(default)s)', + metavar='LOGFILE', + default='acroterm.log') + + group.add_argument( + '--color', + help='Set color theme - none/light/dark/bw (default=%(default)s)', + default='light') + + group.add_argument( + '--no-log', + help='Disable logfile', + action='store_true') + + group.add_argument( + '--no-cov-filter', + help='Display coverage output instead of just logging it', + dest='coverage_filter', + action='store_false') + + group = parser.add_argument_group('command on host') + + group.add_argument( + '--cmd', + help='Command to run after terminal starts') + + group.add_argument( + '--cmd-filter', + help='Filter programming output containing this (default=%(default)s)', + metavar='MATCH', + default='adr:') + + group.add_argument( + '--no-cmd-filter', + help='Do not filter programming tool output', + dest='cmd_filter', + action='store_const', const=None) + + args = parser.parse_args() + + # Look for exactly one matching TTY, unless running target on host + if args.tty == 'host': + args.tty = None + if not args.cmd: + fatal('Running as host but no command specified') + else: + ttylist = glob.glob(args.tty) + if not ttylist: + fatal('TTY %s not found' % args.tty) + elif len(ttylist) != 1: + fatal('Multiple matches for TTY %s found:\n%s' % + (args.tty, '\n'.join(ttylist))) + else: + args.tty = ttylist[0] + + # Disable logging if needed + if args.no_log: + args.log = None + + # Start the terminal + term = Acroterm(args) + + # Keep running until the other side exits + try: + term.join_tx() + except KeyboardInterrupt: + pass + + notice('Cleaning up') + term.join_all() + term.close() + + sys.exit(term.exit_code) + +if __name__ == '__main__': + main() -- cgit v1.2.1