summaryrefslogtreecommitdiff
path: root/util/ec3po/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'util/ec3po/console.py')
-rwxr-xr-xutil/ec3po/console.py2285
1 files changed, 1195 insertions, 1090 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py
index e71216e3f2..12b31baa60 100755
--- a/util/ec3po/console.py
+++ b/util/ec3po/console.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -17,7 +17,6 @@ from __future__ import print_function
import argparse
import binascii
import ctypes
-from datetime import datetime
import logging
import os
import pty
@@ -26,26 +25,25 @@ import select
import stat
import sys
import traceback
+from datetime import datetime
import six
+from ec3po import interpreter, threadproc_shim
-from ec3po import interpreter
-from ec3po import threadproc_shim
-
-
-PROMPT = b'> '
+PROMPT = b"> "
CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name.
CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user.
LOOK_BUFFER_SIZE = 256 # Size of search window when looking for the enhanced EC
- # image string.
+# image string.
# In console_init(), the EC will print a string saying that the EC console is
# enabled. Enhanced images will print a slightly different string. These
# regular expressions are used to determine at reboot whether the EC image is
# enhanced or not.
-ENHANCED_IMAGE_RE = re.compile(br'Enhanced Console is enabled '
- br'\(v([0-9]+\.[0-9]+\.[0-9]+)\)')
-NON_ENHANCED_IMAGE_RE = re.compile(br'Console is enabled; ')
+ENHANCED_IMAGE_RE = re.compile(
+ rb"Enhanced Console is enabled " rb"\(v([0-9]+\.[0-9]+\.[0-9]+)\)"
+)
+NON_ENHANCED_IMAGE_RE = re.compile(rb"Console is enabled; ")
# The timeouts are really only useful for enhanced EC images, but otherwise just
# serve as a delay for non-enhanced EC images. Therefore, we can keep this
@@ -54,1118 +52,1225 @@ NON_ENHANCED_IMAGE_RE = re.compile(br'Console is enabled; ')
# EC image, we can increase the timeout for stability just in case it takes a
# bit longer to receive an ACK for some reason.
NON_ENHANCED_EC_INTERROGATION_TIMEOUT = 0.3 # Maximum number of seconds to wait
- # for a response to an
- # interrogation of a non-enhanced
- # EC image.
+# for a response to an
+# interrogation of a non-enhanced
+# EC image.
ENHANCED_EC_INTERROGATION_TIMEOUT = 1.0 # Maximum number of seconds to wait for
- # a response to an interrogation of an
- # enhanced EC image.
+# a response to an interrogation of an
+# enhanced EC image.
# List of modes which control when interrogations are performed with the EC.
-INTERROGATION_MODES = [b'never', b'always', b'auto']
+INTERROGATION_MODES = [b"never", b"always", b"auto"]
# Format for printing host timestamp
-HOST_STRFTIME="%y-%m-%d %H:%M:%S.%f"
+HOST_STRFTIME = "%y-%m-%d %H:%M:%S.%f"
class EscState(object):
- """Class which contains an enumeration for states of ESC sequences."""
- ESC_START = 1
- ESC_BRACKET = 2
- ESC_BRACKET_1 = 3
- ESC_BRACKET_3 = 4
- ESC_BRACKET_8 = 5
-
+ """Class which contains an enumeration for states of ESC sequences."""
-class ControlKey(object):
- """Class which contains codes for various control keys."""
- BACKSPACE = 0x08
- CTRL_A = 0x01
- CTRL_B = 0x02
- CTRL_D = 0x04
- CTRL_E = 0x05
- CTRL_F = 0x06
- CTRL_K = 0x0b
- CTRL_N = 0xe
- CTRL_P = 0x10
- CARRIAGE_RETURN = 0x0d
- ESC = 0x1b
+ ESC_START = 1
+ ESC_BRACKET = 2
+ ESC_BRACKET_1 = 3
+ ESC_BRACKET_3 = 4
+ ESC_BRACKET_8 = 5
-class Console(object):
- """Class which provides the console interface between the EC and the user.
-
- This class essentially represents the console interface between the user and
- the EC. It handles all of the console editing behaviour
-
- Attributes:
- logger: A logger for this module.
- controller_pty: File descriptor to the controller side of the PTY. Used for
- driving output to the user and receiving user input.
- user_pty: A string representing the PTY name of the served console.
- cmd_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console side of the command pipe. This must be a
- bidirectional pipe. Console commands and responses utilize this pipe.
- dbg_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console's read-only side of the debug pipe. This must be a
- unidirectional pipe attached to the intepreter. EC debug messages use
- this pipe.
- oobm_queue: A queue.Queue or multiprocessing.Queue which is used for out of
- band management for the interactive console.
- input_buffer: A string representing the current input command.
- input_buffer_pos: An integer representing the current position in the buffer
- to insert a char.
- partial_cmd: A string representing the command entered on a line before
- pressing the up arrow keys.
- esc_state: An integer represeting the current state within an escape
- sequence.
- line_limit: An integer representing the maximum number of characters on a
- line.
- history: A list of strings containing the past entered console commands.
- history_pos: An integer representing the current history buffer position.
- This index is used to show previous commands.
- prompt: A string representing the console prompt displayed to the user.
- enhanced_ec: A boolean indicating if the EC image that we are currently
- communicating with is enhanced or not. Enhanced EC images will support
- packed commands and host commands over the UART. This defaults to False
- until we perform some handshaking.
- interrogation_timeout: A float representing the current maximum seconds to
- wait for a response to an interrogation.
- receiving_oobm_cmd: A boolean indicating whether or not the console is in
- the middle of receiving an out of band command.
- pending_oobm_cmd: A string containing the pending OOBM command.
- interrogation_mode: A string containing the current mode of whether
- interrogations are performed with the EC or not and how often.
- raw_debug: Flag to indicate whether per interrupt data should be logged to
- debug
- output_line_log_buffer: buffer for lines coming from the EC to log to debug
- """
-
- def __init__(self, controller_pty, user_pty, interface_pty, cmd_pipe, dbg_pipe,
- name=None):
- """Initalises a Console object with the provided arguments.
+class ControlKey(object):
+ """Class which contains codes for various control keys."""
- Args:
- controller_pty: File descriptor to the controller side of the PTY. Used for
- driving output to the user and receiving user input.
- user_pty: A string representing the PTY name of the served console.
- interface_pty: A string representing the PTY name of the served command
- interface.
- cmd_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console side of the command pipe. This must be a
- bidirectional pipe. Console commands and responses utilize this pipe.
- dbg_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console's read-only side of the debug pipe. This must be a
- unidirectional pipe attached to the intepreter. EC debug messages use
- this pipe.
- name: the console source name
- """
- # Create a unique logger based on the console name
- console_prefix = ('%s - ' % name) if name else ''
- logger = logging.getLogger('%sEC3PO.Console' % console_prefix)
- self.logger = interpreter.LoggerAdapter(logger, {'pty': user_pty})
- self.controller_pty = controller_pty
- self.user_pty = user_pty
- self.interface_pty = interface_pty
- self.cmd_pipe = cmd_pipe
- self.dbg_pipe = dbg_pipe
- self.oobm_queue = threadproc_shim.Queue()
- self.input_buffer = b''
- self.input_buffer_pos = 0
- self.partial_cmd = b''
- self.esc_state = 0
- self.line_limit = CONSOLE_INPUT_LINE_SIZE
- self.history = []
- self.history_pos = 0
- self.prompt = PROMPT
- self.enhanced_ec = False
- self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
- self.receiving_oobm_cmd = False
- self.pending_oobm_cmd = b''
- self.interrogation_mode = b'auto'
- self.timestamp_enabled = True
- self.look_buffer = b''
- self.raw_debug = False
- self.output_line_log_buffer = []
-
- def __str__(self):
- """Show internal state of Console object as a string."""
- string = []
- string.append('controller_pty: %s' % self.controller_pty)
- string.append('user_pty: %s' % self.user_pty)
- string.append('interface_pty: %s' % self.interface_pty)
- string.append('cmd_pipe: %s' % self.cmd_pipe)
- string.append('dbg_pipe: %s' % self.dbg_pipe)
- string.append('oobm_queue: %s' % self.oobm_queue)
- string.append('input_buffer: %s' % self.input_buffer)
- string.append('input_buffer_pos: %d' % self.input_buffer_pos)
- string.append('esc_state: %d' % self.esc_state)
- string.append('line_limit: %d' % self.line_limit)
- string.append('history: %r' % self.history)
- string.append('history_pos: %d' % self.history_pos)
- string.append('prompt: %r' % self.prompt)
- string.append('partial_cmd: %r'% self.partial_cmd)
- string.append('interrogation_mode: %r' % self.interrogation_mode)
- string.append('look_buffer: %r' % self.look_buffer)
- return '\n'.join(string)
-
- def LogConsoleOutput(self, data):
- """Log to debug user MCU output to controller_pty when line is filled.
-
- The logging also suppresses the Cr50 spinner lines by removing characters
- when it sees backspaces.
+ BACKSPACE = 0x08
+ CTRL_A = 0x01
+ CTRL_B = 0x02
+ CTRL_D = 0x04
+ CTRL_E = 0x05
+ CTRL_F = 0x06
+ CTRL_K = 0x0B
+ CTRL_N = 0xE
+ CTRL_P = 0x10
+ CARRIAGE_RETURN = 0x0D
+ ESC = 0x1B
- Args:
- data: bytes - string received from MCU
- """
- data = list(data)
- # For compatibility with python2 and python3, standardize on the data
- # being a list of integers. This requires one more transformation in py2
- if not isinstance(data[0], int):
- data = [ord(c) for c in data]
-
- # This is a list of already filtered characters (or placeholders).
- line = self.output_line_log_buffer
-
- # TODO(b/177480273): use raw strings here
- symbols = {
- ord(b'\n'): u'\\n',
- ord(b'\r'): u'\\r',
- ord(b'\t'): u'\\t'
- }
- # self.logger.debug(u'%s + %r', u''.join(line), ''.join(data))
- while data:
- # Recall, data is a list of integers, namely the byte values sent by
- # the MCU.
- byte = data.pop(0)
- # This means that |byte| is an int.
- if byte == ord('\n'):
- line.append(symbols[byte])
- if line:
- self.logger.debug(u'%s', ''.join(line))
- line = []
- elif byte == ord('\b'):
- # Backspace: trim the last character off the buffer
- if line:
- line.pop(-1)
- elif byte in symbols:
- line.append(symbols[byte])
- elif byte < ord(' ') or byte > ord('~'):
- # Turn any character that isn't printable ASCII into escaped hex.
- # ' ' is chr(20), and 0-19 are unprintable control characters.
- # '~' is chr(126), and 127 is DELETE. 128-255 are control and Latin-1.
- line.append(u'\\x%02x' % byte)
- else:
- # byte is printable. Thus it is safe to use chr() to get the printable
- # character out of it again.
- line.append(u'%s' % chr(byte))
- self.output_line_log_buffer = line
-
- def PrintHistory(self):
- """Print the history of entered commands."""
- fd = self.controller_pty
- # Make it pretty by figuring out how wide to pad the numbers.
- wide = (len(self.history) // 10) + 1
- for i in range(len(self.history)):
- line = b' %*d %s\r\n' % (wide, i, self.history[i])
- os.write(fd, line)
-
- def ShowPreviousCommand(self):
- """Shows the previous command from the history list."""
- # There's nothing to do if there's no history at all.
- if not self.history:
- self.logger.debug('No history to print.')
- return
-
- # Don't do anything if there's no more history to show.
- if self.history_pos == 0:
- self.logger.debug('No more history to show.')
- return
-
- self.logger.debug('current history position: %d.', self.history_pos)
-
- # Decrement the history buffer position.
- self.history_pos -= 1
- self.logger.debug('new history position.: %d', self.history_pos)
-
- # Save the text entered on the console if any.
- if self.history_pos == len(self.history)-1:
- self.logger.debug('saving partial_cmd: %r', self.input_buffer)
- self.partial_cmd = self.input_buffer
-
- # Backspace the line.
- for _ in range(self.input_buffer_pos):
- self.SendBackspace()
-
- # Print the last entry in the history buffer.
- self.logger.debug('printing previous entry %d - %s', self.history_pos,
- self.history[self.history_pos])
- fd = self.controller_pty
- prev_cmd = self.history[self.history_pos]
- os.write(fd, prev_cmd)
- # Update the input buffer.
- self.input_buffer = prev_cmd
- self.input_buffer_pos = len(prev_cmd)
-
- def ShowNextCommand(self):
- """Shows the next command from the history list."""
- # Don't do anything if there's no history at all.
- if not self.history:
- self.logger.debug('History buffer is empty.')
- return
-
- fd = self.controller_pty
-
- self.logger.debug('current history position: %d', self.history_pos)
- # Increment the history position.
- self.history_pos += 1
-
- # Restore the partial cmd.
- if self.history_pos == len(self.history):
- self.logger.debug('Restoring partial command of %r', self.partial_cmd)
- # Backspace the line.
- for _ in range(self.input_buffer_pos):
- self.SendBackspace()
- # Print the partially entered command if any.
- os.write(fd, self.partial_cmd)
- self.input_buffer = self.partial_cmd
- self.input_buffer_pos = len(self.input_buffer)
- # Now that we've printed it, clear the partial cmd storage.
- self.partial_cmd = b''
- # Reset history position.
- self.history_pos = len(self.history)
- return
-
- self.logger.debug('new history position: %d', self.history_pos)
- if self.history_pos > len(self.history)-1:
- self.logger.debug('No more history to show.')
- self.history_pos -= 1
- self.logger.debug('Reset history position to %d', self.history_pos)
- return
-
- # Backspace the line.
- for _ in range(self.input_buffer_pos):
- self.SendBackspace()
-
- # Print the newer entry from the history buffer.
- self.logger.debug('printing next entry %d - %s', self.history_pos,
- self.history[self.history_pos])
- next_cmd = self.history[self.history_pos]
- os.write(fd, next_cmd)
- # Update the input buffer.
- self.input_buffer = next_cmd
- self.input_buffer_pos = len(next_cmd)
- self.logger.debug('new history position: %d.', self.history_pos)
-
- def SliceOutChar(self):
- """Remove a char from the line and shift everything over 1 column."""
- fd = self.controller_pty
- # Remove the character at the input_buffer_pos by slicing it out.
- self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + \
- self.input_buffer[self.input_buffer_pos+1:]
- # Write the rest of the line
- moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos:])
- # Write a space to clear out the last char
- moved_col += os.write(fd, b' ')
- # Update the input buffer position.
- self.input_buffer_pos += moved_col
- # Reset the cursor
- self.MoveCursor('left', moved_col)
-
- def HandleEsc(self, byte):
- """HandleEsc processes escape sequences.
- Args:
- byte: An integer representing the current byte in the sequence.
+class Console(object):
+ """Class which provides the console interface between the EC and the user.
+
+ This class essentially represents the console interface between the user and
+ the EC. It handles all of the console editing behaviour
+
+ Attributes:
+ logger: A logger for this module.
+ controller_pty: File descriptor to the controller side of the PTY. Used for
+ driving output to the user and receiving user input.
+ user_pty: A string representing the PTY name of the served console.
+ cmd_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console side of the command pipe. This must be a
+ bidirectional pipe. Console commands and responses utilize this pipe.
+ dbg_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console's read-only side of the debug pipe. This must be a
+ unidirectional pipe attached to the intepreter. EC debug messages use
+ this pipe.
+ oobm_queue: A queue.Queue or multiprocessing.Queue which is used for out of
+ band management for the interactive console.
+ input_buffer: A string representing the current input command.
+ input_buffer_pos: An integer representing the current position in the buffer
+ to insert a char.
+ partial_cmd: A string representing the command entered on a line before
+ pressing the up arrow keys.
+ esc_state: An integer represeting the current state within an escape
+ sequence.
+ line_limit: An integer representing the maximum number of characters on a
+ line.
+ history: A list of strings containing the past entered console commands.
+ history_pos: An integer representing the current history buffer position.
+ This index is used to show previous commands.
+ prompt: A string representing the console prompt displayed to the user.
+ enhanced_ec: A boolean indicating if the EC image that we are currently
+ communicating with is enhanced or not. Enhanced EC images will support
+ packed commands and host commands over the UART. This defaults to False
+ until we perform some handshaking.
+ interrogation_timeout: A float representing the current maximum seconds to
+ wait for a response to an interrogation.
+ receiving_oobm_cmd: A boolean indicating whether or not the console is in
+ the middle of receiving an out of band command.
+ pending_oobm_cmd: A string containing the pending OOBM command.
+ interrogation_mode: A string containing the current mode of whether
+ interrogations are performed with the EC or not and how often.
+ raw_debug: Flag to indicate whether per interrupt data should be logged to
+ debug
+ output_line_log_buffer: buffer for lines coming from the EC to log to debug
"""
- # We shouldn't be handling an escape sequence if we haven't seen one.
- assert self.esc_state != 0
-
- if self.esc_state is EscState.ESC_START:
- self.logger.debug('ESC_START')
- if byte == ord('['):
- self.esc_state = EscState.ESC_BRACKET
- return
- else:
- self.logger.error('Unexpected sequence. %c', byte)
- self.esc_state = 0
-
- elif self.esc_state is EscState.ESC_BRACKET:
- self.logger.debug('ESC_BRACKET')
- # Left Arrow key was pressed.
- if byte == ord('D'):
- self.logger.debug('Left arrow key pressed.')
- self.MoveCursor('left', 1)
- self.esc_state = 0 # Reset the state.
- return
-
- # Right Arrow key.
- elif byte == ord('C'):
- self.logger.debug('Right arrow key pressed.')
- self.MoveCursor('right', 1)
- self.esc_state = 0 # Reset the state.
- return
-
- # Up Arrow key.
- elif byte == ord('A'):
- self.logger.debug('Up arrow key pressed.')
- self.ShowPreviousCommand()
- # Reset the state.
- self.esc_state = 0 # Reset the state.
- return
-
- # Down Arrow key.
- elif byte == ord('B'):
- self.logger.debug('Down arrow key pressed.')
- self.ShowNextCommand()
- # Reset the state.
- self.esc_state = 0 # Reset the state.
- return
-
- # For some reason, minicom sends a 1 instead of 7. /shrug
- # TODO(aaboagye): Figure out why this happens.
- elif byte == ord('1') or byte == ord('7'):
- self.esc_state = EscState.ESC_BRACKET_1
-
- elif byte == ord('3'):
- self.esc_state = EscState.ESC_BRACKET_3
-
- elif byte == ord('8'):
- self.esc_state = EscState.ESC_BRACKET_8
-
- else:
- self.logger.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)',
- chr(byte), byte)
- self.esc_state = 0
- return
-
- elif self.esc_state is EscState.ESC_BRACKET_1:
- self.logger.debug('ESC_BRACKET_1')
- # HOME key.
- if byte == ord('~'):
- self.logger.debug('Home key pressed.')
- self.MoveCursor('left', self.input_buffer_pos)
- self.esc_state = 0 # Reset the state.
- self.logger.debug('ESC sequence complete.')
- return
-
- elif self.esc_state is EscState.ESC_BRACKET_3:
- self.logger.debug('ESC_BRACKET_3')
- # DEL key.
- if byte == ord('~'):
- self.logger.debug('Delete key pressed.')
- if self.input_buffer_pos != len(self.input_buffer):
- self.SliceOutChar()
- self.esc_state = 0 # Reset the state.
-
- elif self.esc_state is EscState.ESC_BRACKET_8:
- self.logger.debug('ESC_BRACKET_8')
- # END key.
- if byte == ord('~'):
- self.logger.debug('End key pressed.')
- self.MoveCursor('right',
- len(self.input_buffer) - self.input_buffer_pos)
- self.esc_state = 0 # Reset the state.
- self.logger.debug('ESC sequence complete.')
- return
-
- else:
- self.logger.error('Unexpected sequence. %c', byte)
+ def __init__(
+ self,
+ controller_pty,
+ user_pty,
+ interface_pty,
+ cmd_pipe,
+ dbg_pipe,
+ name=None,
+ ):
+ """Initalises a Console object with the provided arguments.
+
+ Args:
+ controller_pty: File descriptor to the controller side of the PTY. Used for
+ driving output to the user and receiving user input.
+ user_pty: A string representing the PTY name of the served console.
+ interface_pty: A string representing the PTY name of the served command
+ interface.
+ cmd_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console side of the command pipe. This must be a
+ bidirectional pipe. Console commands and responses utilize this pipe.
+ dbg_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console's read-only side of the debug pipe. This must be a
+ unidirectional pipe attached to the intepreter. EC debug messages use
+ this pipe.
+ name: the console source name
+ """
+ # Create a unique logger based on the console name
+ console_prefix = ("%s - " % name) if name else ""
+ logger = logging.getLogger("%sEC3PO.Console" % console_prefix)
+ self.logger = interpreter.LoggerAdapter(logger, {"pty": user_pty})
+ self.controller_pty = controller_pty
+ self.user_pty = user_pty
+ self.interface_pty = interface_pty
+ self.cmd_pipe = cmd_pipe
+ self.dbg_pipe = dbg_pipe
+ self.oobm_queue = threadproc_shim.Queue()
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ self.partial_cmd = b""
self.esc_state = 0
+ self.line_limit = CONSOLE_INPUT_LINE_SIZE
+ self.history = []
+ self.history_pos = 0
+ self.prompt = PROMPT
+ self.enhanced_ec = False
+ self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
+ self.receiving_oobm_cmd = False
+ self.pending_oobm_cmd = b""
+ self.interrogation_mode = b"auto"
+ self.timestamp_enabled = True
+ self.look_buffer = b""
+ self.raw_debug = False
+ self.output_line_log_buffer = []
+
+ def __str__(self):
+ """Show internal state of Console object as a string."""
+ string = []
+ string.append("controller_pty: %s" % self.controller_pty)
+ string.append("user_pty: %s" % self.user_pty)
+ string.append("interface_pty: %s" % self.interface_pty)
+ string.append("cmd_pipe: %s" % self.cmd_pipe)
+ string.append("dbg_pipe: %s" % self.dbg_pipe)
+ string.append("oobm_queue: %s" % self.oobm_queue)
+ string.append("input_buffer: %s" % self.input_buffer)
+ string.append("input_buffer_pos: %d" % self.input_buffer_pos)
+ string.append("esc_state: %d" % self.esc_state)
+ string.append("line_limit: %d" % self.line_limit)
+ string.append("history: %r" % self.history)
+ string.append("history_pos: %d" % self.history_pos)
+ string.append("prompt: %r" % self.prompt)
+ string.append("partial_cmd: %r" % self.partial_cmd)
+ string.append("interrogation_mode: %r" % self.interrogation_mode)
+ string.append("look_buffer: %r" % self.look_buffer)
+ return "\n".join(string)
+
+ def LogConsoleOutput(self, data):
+ """Log to debug user MCU output to controller_pty when line is filled.
+
+ The logging also suppresses the Cr50 spinner lines by removing characters
+ when it sees backspaces.
+
+ Args:
+ data: bytes - string received from MCU
+ """
+ data = list(data)
+ # For compatibility with python2 and python3, standardize on the data
+ # being a list of integers. This requires one more transformation in py2
+ if not isinstance(data[0], int):
+ data = [ord(c) for c in data]
+
+ # This is a list of already filtered characters (or placeholders).
+ line = self.output_line_log_buffer
+
+ # TODO(b/177480273): use raw strings here
+ symbols = {ord(b"\n"): "\\n", ord(b"\r"): "\\r", ord(b"\t"): "\\t"}
+ # self.logger.debug(u'%s + %r', u''.join(line), ''.join(data))
+ while data:
+ # Recall, data is a list of integers, namely the byte values sent by
+ # the MCU.
+ byte = data.pop(0)
+ # This means that |byte| is an int.
+ if byte == ord("\n"):
+ line.append(symbols[byte])
+ if line:
+ self.logger.debug("%s", "".join(line))
+ line = []
+ elif byte == ord("\b"):
+ # Backspace: trim the last character off the buffer
+ if line:
+ line.pop(-1)
+ elif byte in symbols:
+ line.append(symbols[byte])
+ elif byte < ord(" ") or byte > ord("~"):
+ # Turn any character that isn't printable ASCII into escaped hex.
+ # ' ' is chr(20), and 0-19 are unprintable control characters.
+ # '~' is chr(126), and 127 is DELETE. 128-255 are control and Latin-1.
+ line.append("\\x%02x" % byte)
+ else:
+ # byte is printable. Thus it is safe to use chr() to get the printable
+ # character out of it again.
+ line.append("%s" % chr(byte))
+ self.output_line_log_buffer = line
+
+ def PrintHistory(self):
+ """Print the history of entered commands."""
+ fd = self.controller_pty
+ # Make it pretty by figuring out how wide to pad the numbers.
+ wide = (len(self.history) // 10) + 1
+ for i in range(len(self.history)):
+ line = b" %*d %s\r\n" % (wide, i, self.history[i])
+ os.write(fd, line)
+
+ def ShowPreviousCommand(self):
+ """Shows the previous command from the history list."""
+ # There's nothing to do if there's no history at all.
+ if not self.history:
+ self.logger.debug("No history to print.")
+ return
+
+ # Don't do anything if there's no more history to show.
+ if self.history_pos == 0:
+ self.logger.debug("No more history to show.")
+ return
+
+ self.logger.debug("current history position: %d.", self.history_pos)
+
+ # Decrement the history buffer position.
+ self.history_pos -= 1
+ self.logger.debug("new history position.: %d", self.history_pos)
+
+ # Save the text entered on the console if any.
+ if self.history_pos == len(self.history) - 1:
+ self.logger.debug("saving partial_cmd: %r", self.input_buffer)
+ self.partial_cmd = self.input_buffer
+
+ # Backspace the line.
+ for _ in range(self.input_buffer_pos):
+ self.SendBackspace()
+
+ # Print the last entry in the history buffer.
+ self.logger.debug(
+ "printing previous entry %d - %s",
+ self.history_pos,
+ self.history[self.history_pos],
+ )
+ fd = self.controller_pty
+ prev_cmd = self.history[self.history_pos]
+ os.write(fd, prev_cmd)
+ # Update the input buffer.
+ self.input_buffer = prev_cmd
+ self.input_buffer_pos = len(prev_cmd)
+
+ def ShowNextCommand(self):
+ """Shows the next command from the history list."""
+ # Don't do anything if there's no history at all.
+ if not self.history:
+ self.logger.debug("History buffer is empty.")
+ return
+
+ fd = self.controller_pty
+
+ self.logger.debug("current history position: %d", self.history_pos)
+ # Increment the history position.
+ self.history_pos += 1
+
+ # Restore the partial cmd.
+ if self.history_pos == len(self.history):
+ self.logger.debug(
+ "Restoring partial command of %r", self.partial_cmd
+ )
+ # Backspace the line.
+ for _ in range(self.input_buffer_pos):
+ self.SendBackspace()
+ # Print the partially entered command if any.
+ os.write(fd, self.partial_cmd)
+ self.input_buffer = self.partial_cmd
+ self.input_buffer_pos = len(self.input_buffer)
+ # Now that we've printed it, clear the partial cmd storage.
+ self.partial_cmd = b""
+ # Reset history position.
+ self.history_pos = len(self.history)
+ return
+
+ self.logger.debug("new history position: %d", self.history_pos)
+ if self.history_pos > len(self.history) - 1:
+ self.logger.debug("No more history to show.")
+ self.history_pos -= 1
+ self.logger.debug("Reset history position to %d", self.history_pos)
+ return
+
+ # Backspace the line.
+ for _ in range(self.input_buffer_pos):
+ self.SendBackspace()
+
+ # Print the newer entry from the history buffer.
+ self.logger.debug(
+ "printing next entry %d - %s",
+ self.history_pos,
+ self.history[self.history_pos],
+ )
+ next_cmd = self.history[self.history_pos]
+ os.write(fd, next_cmd)
+ # Update the input buffer.
+ self.input_buffer = next_cmd
+ self.input_buffer_pos = len(next_cmd)
+ self.logger.debug("new history position: %d.", self.history_pos)
+
+ def SliceOutChar(self):
+ """Remove a char from the line and shift everything over 1 column."""
+ fd = self.controller_pty
+ # Remove the character at the input_buffer_pos by slicing it out.
+ self.input_buffer = (
+ self.input_buffer[0 : self.input_buffer_pos]
+ + self.input_buffer[self.input_buffer_pos + 1 :]
+ )
+ # Write the rest of the line
+ moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos :])
+ # Write a space to clear out the last char
+ moved_col += os.write(fd, b" ")
+ # Update the input buffer position.
+ self.input_buffer_pos += moved_col
+ # Reset the cursor
+ self.MoveCursor("left", moved_col)
+
+ def HandleEsc(self, byte):
+ """HandleEsc processes escape sequences.
+
+ Args:
+ byte: An integer representing the current byte in the sequence.
+ """
+ # We shouldn't be handling an escape sequence if we haven't seen one.
+ assert self.esc_state != 0
+
+ if self.esc_state is EscState.ESC_START:
+ self.logger.debug("ESC_START")
+ if byte == ord("["):
+ self.esc_state = EscState.ESC_BRACKET
+ return
+
+ else:
+ self.logger.error("Unexpected sequence. %c", byte)
+ self.esc_state = 0
+
+ elif self.esc_state is EscState.ESC_BRACKET:
+ self.logger.debug("ESC_BRACKET")
+ # Left Arrow key was pressed.
+ if byte == ord("D"):
+ self.logger.debug("Left arrow key pressed.")
+ self.MoveCursor("left", 1)
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # Right Arrow key.
+ elif byte == ord("C"):
+ self.logger.debug("Right arrow key pressed.")
+ self.MoveCursor("right", 1)
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # Up Arrow key.
+ elif byte == ord("A"):
+ self.logger.debug("Up arrow key pressed.")
+ self.ShowPreviousCommand()
+ # Reset the state.
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # Down Arrow key.
+ elif byte == ord("B"):
+ self.logger.debug("Down arrow key pressed.")
+ self.ShowNextCommand()
+ # Reset the state.
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # For some reason, minicom sends a 1 instead of 7. /shrug
+ # TODO(aaboagye): Figure out why this happens.
+ elif byte == ord("1") or byte == ord("7"):
+ self.esc_state = EscState.ESC_BRACKET_1
+
+ elif byte == ord("3"):
+ self.esc_state = EscState.ESC_BRACKET_3
+
+ elif byte == ord("8"):
+ self.esc_state = EscState.ESC_BRACKET_8
+
+ else:
+ self.logger.error(
+ r"Bad or unhandled escape sequence. got ^[%c\(%d)",
+ chr(byte),
+ byte,
+ )
+ self.esc_state = 0
+ return
+
+ elif self.esc_state is EscState.ESC_BRACKET_1:
+ self.logger.debug("ESC_BRACKET_1")
+ # HOME key.
+ if byte == ord("~"):
+ self.logger.debug("Home key pressed.")
+ self.MoveCursor("left", self.input_buffer_pos)
+ self.esc_state = 0 # Reset the state.
+ self.logger.debug("ESC sequence complete.")
+ return
+
+ elif self.esc_state is EscState.ESC_BRACKET_3:
+ self.logger.debug("ESC_BRACKET_3")
+ # DEL key.
+ if byte == ord("~"):
+ self.logger.debug("Delete key pressed.")
+ if self.input_buffer_pos != len(self.input_buffer):
+ self.SliceOutChar()
+ self.esc_state = 0 # Reset the state.
+
+ elif self.esc_state is EscState.ESC_BRACKET_8:
+ self.logger.debug("ESC_BRACKET_8")
+ # END key.
+ if byte == ord("~"):
+ self.logger.debug("End key pressed.")
+ self.MoveCursor(
+ "right", len(self.input_buffer) - self.input_buffer_pos
+ )
+ self.esc_state = 0 # Reset the state.
+ self.logger.debug("ESC sequence complete.")
+ return
+
+ else:
+ self.logger.error("Unexpected sequence. %c", byte)
+ self.esc_state = 0
+
+ else:
+ self.logger.error("Unexpected sequence. %c", byte)
+ self.esc_state = 0
+
+ def ProcessInput(self):
+ """Captures the input determines what actions to take."""
+ # There's nothing to do if the input buffer is empty.
+ if len(self.input_buffer) == 0:
+ return
+
+ # Don't store 2 consecutive identical commands in the history.
+ if (
+ self.history
+ and self.history[-1] != self.input_buffer
+ or not self.history
+ ):
+ self.history.append(self.input_buffer)
+
+ # Split the command up by spaces.
+ line = self.input_buffer.split(b" ")
+ self.logger.debug("cmd: %s", self.input_buffer)
+ cmd = line[0].lower()
+
+ # The 'history' command is a special case that we handle locally.
+ if cmd == "history":
+ self.PrintHistory()
+ return
+
+ # Send the command to the interpreter.
+ self.logger.debug("Sending command to interpreter.")
+ self.cmd_pipe.send(self.input_buffer)
+
+ def CheckForEnhancedECImage(self):
+ """Performs an interrogation of the EC image.
+
+ Send a SYN and expect an ACK. If no ACK or the response is incorrect, then
+ assume that the current EC image that we are talking to is not enhanced.
+
+ Returns:
+ is_enhanced: A boolean indicating whether the EC responded to the
+ interrogation correctly.
+
+ Raises:
+ EOFError: Allowed to propagate through from self.dbg_pipe.recv().
+ """
+ # Send interrogation byte and wait for the response.
+ self.logger.debug("Performing interrogation.")
+ self.cmd_pipe.send(interpreter.EC_SYN)
+
+ response = ""
+ if self.dbg_pipe.poll(self.interrogation_timeout):
+ response = self.dbg_pipe.recv()
+ self.logger.debug("response: %r", binascii.hexlify(response))
+ else:
+ self.logger.debug("Timed out waiting for EC_ACK")
+
+ # Verify the acknowledgment.
+ is_enhanced = response == interpreter.EC_ACK
+
+ if is_enhanced:
+ # Increase the interrogation timeout for stability purposes.
+ self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT
+ self.logger.debug(
+ "Increasing interrogation timeout to %rs.",
+ self.interrogation_timeout,
+ )
+ else:
+ # Reduce the timeout in order to reduce the perceivable delay.
+ self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
+ self.logger.debug(
+ "Reducing interrogation timeout to %rs.",
+ self.interrogation_timeout,
+ )
+
+ return is_enhanced
+
+ def HandleChar(self, byte):
+ """HandleChar does a certain action when it receives a character.
+
+ Args:
+ byte: An integer representing the character received from the user.
+
+ Raises:
+ EOFError: Allowed to propagate through from self.CheckForEnhancedECImage()
+ i.e. from self.dbg_pipe.recv().
+ """
+ fd = self.controller_pty
+
+ # Enter the OOBM prompt mode if the user presses '%'.
+ if byte == ord("%"):
+ self.logger.debug("Begin OOBM command.")
+ self.receiving_oobm_cmd = True
+ # Print a "prompt".
+ os.write(self.controller_pty, b"\r\n% ")
+ return
+
+ # Add chars to the pending OOBM command if we're currently receiving one.
+ if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN:
+ tmp_bytes = six.int2byte(byte)
+ self.pending_oobm_cmd += tmp_bytes
+ self.logger.debug("%s", tmp_bytes)
+ os.write(self.controller_pty, tmp_bytes)
+ return
+
+ if byte == ControlKey.CARRIAGE_RETURN:
+ if self.receiving_oobm_cmd:
+ # Terminate the command and place it in the OOBM queue.
+ self.logger.debug("End OOBM command.")
+ if self.pending_oobm_cmd:
+ self.oobm_queue.put(self.pending_oobm_cmd)
+ self.logger.debug(
+ "Placed %r into OOBM command queue.",
+ self.pending_oobm_cmd,
+ )
+
+ # Reset the state.
+ os.write(self.controller_pty, b"\r\n" + self.prompt)
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ self.receiving_oobm_cmd = False
+ self.pending_oobm_cmd = b""
+ return
+
+ if self.interrogation_mode == b"never":
+ self.logger.debug(
+ "Skipping interrogation because interrogation mode"
+ " is set to never."
+ )
+ elif self.interrogation_mode == b"always":
+ # Only interrogate the EC if the interrogation mode is set to 'always'.
+ self.enhanced_ec = self.CheckForEnhancedECImage()
+ self.logger.debug("Enhanced EC image? %r", self.enhanced_ec)
+
+ if not self.enhanced_ec:
+ # Send everything straight to the EC to handle.
+ self.cmd_pipe.send(six.int2byte(byte))
+ # Reset the input buffer.
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ self.logger.log(1, "Reset input buffer.")
+ return
+
+ # Keep handling the ESC sequence if we're in the middle of it.
+ if self.esc_state != 0:
+ self.HandleEsc(byte)
+ return
+
+ # When we're at the end of the line, we should only allow going backwards,
+ # backspace, carriage return, up, or down. The arrow keys are escape
+ # sequences, so we let the escape...escape.
+ if self.input_buffer_pos >= self.line_limit and byte not in [
+ ControlKey.CTRL_B,
+ ControlKey.ESC,
+ ControlKey.BACKSPACE,
+ ControlKey.CTRL_A,
+ ControlKey.CARRIAGE_RETURN,
+ ControlKey.CTRL_P,
+ ControlKey.CTRL_N,
+ ]:
+ return
+
+ # If the input buffer is full we can't accept new chars.
+ buffer_full = len(self.input_buffer) >= self.line_limit
+
+ # Carriage_Return/Enter
+ if byte == ControlKey.CARRIAGE_RETURN:
+ self.logger.debug("Enter key pressed.")
+ # Put a carriage return/newline and the print the prompt.
+ os.write(fd, b"\r\n")
+
+ # TODO(aaboagye): When we control the printing of all output, print the
+ # prompt AFTER printing all the output. We can't do it yet because we
+ # don't know how much is coming from the EC.
+
+ # Print the prompt.
+ os.write(fd, self.prompt)
+ # Process the input.
+ self.ProcessInput()
+ # Now, clear the buffer.
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ # Reset history buffer pos.
+ self.history_pos = len(self.history)
+ # Clear partial command.
+ self.partial_cmd = b""
+
+ # Backspace
+ elif byte == ControlKey.BACKSPACE:
+ self.logger.debug("Backspace pressed.")
+ if self.input_buffer_pos > 0:
+ # Move left 1 column.
+ self.MoveCursor("left", 1)
+ # Remove the character at the input_buffer_pos by slicing it out.
+ self.SliceOutChar()
+
+ self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos)
+
+ # Ctrl+A. Move cursor to beginning of the line
+ elif byte == ControlKey.CTRL_A:
+ self.logger.debug("Control+A pressed.")
+ self.MoveCursor("left", self.input_buffer_pos)
+
+ # Ctrl+B. Move cursor left 1 column.
+ elif byte == ControlKey.CTRL_B:
+ self.logger.debug("Control+B pressed.")
+ self.MoveCursor("left", 1)
+
+ # Ctrl+D. Delete a character.
+ elif byte == ControlKey.CTRL_D:
+ self.logger.debug("Control+D pressed.")
+ if self.input_buffer_pos != len(self.input_buffer):
+ # Remove the character by slicing it out.
+ self.SliceOutChar()
+
+ # Ctrl+E. Move cursor to end of the line.
+ elif byte == ControlKey.CTRL_E:
+ self.logger.debug("Control+E pressed.")
+ self.MoveCursor(
+ "right", len(self.input_buffer) - self.input_buffer_pos
+ )
+
+ # Ctrl+F. Move cursor right 1 column.
+ elif byte == ControlKey.CTRL_F:
+ self.logger.debug("Control+F pressed.")
+ self.MoveCursor("right", 1)
+
+ # Ctrl+K. Kill line.
+ elif byte == ControlKey.CTRL_K:
+ self.logger.debug("Control+K pressed.")
+ self.KillLine()
+
+ # Ctrl+N. Next line.
+ elif byte == ControlKey.CTRL_N:
+ self.logger.debug("Control+N pressed.")
+ self.ShowNextCommand()
+
+ # Ctrl+P. Previous line.
+ elif byte == ControlKey.CTRL_P:
+ self.logger.debug("Control+P pressed.")
+ self.ShowPreviousCommand()
+
+ # ESC sequence
+ elif byte == ControlKey.ESC:
+ # Starting an ESC sequence
+ self.esc_state = EscState.ESC_START
+
+ # Only print printable chars.
+ elif IsPrintable(byte):
+ # Drop the character if we're full.
+ if buffer_full:
+ self.logger.debug("Dropped char: %c(%d)", byte, byte)
+ return
+ # Print the character.
+ os.write(fd, six.int2byte(byte))
+ # Print the rest of the line (if any).
+ extra_bytes_written = os.write(
+ fd, self.input_buffer[self.input_buffer_pos :]
+ )
+
+ # Recreate the input buffer.
+ self.input_buffer = (
+ self.input_buffer[0 : self.input_buffer_pos]
+ + six.int2byte(byte)
+ + self.input_buffer[self.input_buffer_pos :]
+ )
+ # Update the input buffer position.
+ self.input_buffer_pos += 1 + extra_bytes_written
+
+ # Reset the cursor if we wrote any extra bytes.
+ if extra_bytes_written:
+ self.MoveCursor("left", extra_bytes_written)
+
+ self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos)
+
+ def MoveCursor(self, direction, count):
+ """MoveCursor moves the cursor left or right by count columns.
+
+ Args:
+ direction: A string that should be either 'left' or 'right' representing
+ the direction to move the cursor on the console.
+ count: An integer representing how many columns the cursor should be
+ moved.
+
+ Raises:
+ AssertionError: If the direction is not equal to 'left' or 'right'.
+ """
+ # If there's nothing to move, we're done.
+ if not count:
+ return
+ fd = self.controller_pty
+ seq = b"\033[" + str(count).encode("ascii")
+ if direction == "left":
+ # Bind the movement.
+ if count > self.input_buffer_pos:
+ count = self.input_buffer_pos
+ seq += b"D"
+ self.logger.debug("move cursor left %d", count)
+ self.input_buffer_pos -= count
+
+ elif direction == "right":
+ # Bind the movement.
+ if (count + self.input_buffer_pos) > len(self.input_buffer):
+ count = 0
+ seq += b"C"
+ self.logger.debug("move cursor right %d", count)
+ self.input_buffer_pos += count
+
+ else:
+ raise AssertionError(
+ ("The only valid directions are 'left' and 'right'")
+ )
+
+ self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos)
+ # Move the cursor.
+ if count != 0:
+ os.write(fd, seq)
+
+ def KillLine(self):
+ """Kill the rest of the line based on the input buffer position."""
+ # Killing the line is killing all the text to the right.
+ diff = len(self.input_buffer) - self.input_buffer_pos
+ self.logger.debug("diff: %d", diff)
+ # Diff shouldn't be negative, but if it is for some reason, let's try to
+ # correct the cursor.
+ if diff < 0:
+ self.logger.warning(
+ "Resetting input buffer position to %d...",
+ len(self.input_buffer),
+ )
+ self.MoveCursor("left", -diff)
+ return
+ if diff:
+ self.MoveCursor("right", diff)
+ for _ in range(diff):
+ self.SendBackspace()
+ self.input_buffer_pos -= diff
+ self.input_buffer = self.input_buffer[0 : self.input_buffer_pos]
+
+ def SendBackspace(self):
+ """Backspace a character on the console."""
+ os.write(self.controller_pty, b"\033[1D \033[1D")
+
+ def ProcessOOBMQueue(self):
+ """Retrieve an item from the OOBM queue and process it."""
+ item = self.oobm_queue.get()
+ self.logger.debug("OOBM cmd: %r", item)
+ cmd = item.split(b" ")
+
+ if cmd[0] == b"loglevel":
+ # An integer is required in order to set the log level.
+ if len(cmd) < 2:
+ self.logger.debug("Insufficient args")
+ self.PrintOOBMHelp()
+ return
+ try:
+ self.logger.debug("Log level change request.")
+ new_log_level = int(cmd[1])
+ self.logger.logger.setLevel(new_log_level)
+ self.logger.info("Log level changed to %d.", new_log_level)
+
+ # Forward the request to the interpreter as well.
+ self.cmd_pipe.send(item)
+ except ValueError:
+ # Ignoring the request if an integer was not provided.
+ self.PrintOOBMHelp()
+
+ elif cmd[0] == b"timestamp":
+ mode = cmd[1].lower()
+ self.timestamp_enabled = mode == b"on"
+ self.logger.info(
+ "%sabling uart timestamps.",
+ "En" if self.timestamp_enabled else "Dis",
+ )
+
+ elif cmd[0] == b"rawdebug":
+ mode = cmd[1].lower()
+ self.raw_debug = mode == b"on"
+ self.logger.info(
+ "%sabling per interrupt debug logs.",
+ "En" if self.raw_debug else "Dis",
+ )
+
+ elif cmd[0] == b"interrogate" and len(cmd) >= 2:
+ enhanced = False
+ mode = cmd[1]
+ if len(cmd) >= 3 and cmd[2] == b"enhanced":
+ enhanced = True
+
+ # Set the mode if correct.
+ if mode in INTERROGATION_MODES:
+ self.interrogation_mode = mode
+ self.logger.debug("Updated interrogation mode to %s.", mode)
+
+ # Update the assumptions of the EC image.
+ self.enhanced_ec = enhanced
+ self.logger.debug(
+ "Enhanced EC image is now %r", self.enhanced_ec
+ )
+
+ # Send command to interpreter as well.
+ self.cmd_pipe.send(
+ b"enhanced " + str(self.enhanced_ec).encode("ascii")
+ )
+ else:
+ self.PrintOOBMHelp()
+
+ else:
+ self.PrintOOBMHelp()
+
+ def PrintOOBMHelp(self):
+ """Prints out the OOBM help."""
+ # Print help syntax.
+ os.write(self.controller_pty, b"\r\n" + b"Known OOBM commands:\r\n")
+ os.write(
+ self.controller_pty,
+ b" interrogate <never | always | auto> " b"[enhanced]\r\n",
+ )
+ os.write(self.controller_pty, b" loglevel <int>\r\n")
+
+ def CheckBufferForEnhancedImage(self, data):
+ """Adds data to a look buffer and checks to see for enhanced EC image.
+
+ The EC's console task prints a string upon initialization which says that
+ "Console is enabled; type HELP for help.". The enhanced EC images print a
+ different string as a part of their init. This function searches through a
+ "look" buffer, scanning for the presence of either of those strings and
+ updating the enhanced_ec state accordingly.
+
+ Args:
+ data: A string containing the data sent from the interpreter.
+ """
+ self.look_buffer += data
+
+ # Search the buffer for any of the EC image strings.
+ enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer)
+ non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer)
+
+ # Update the state if any matches were found.
+ if enhanced_match or non_enhanced_match:
+ if enhanced_match:
+ self.enhanced_ec = True
+ elif non_enhanced_match:
+ self.enhanced_ec = False
+
+ # Inform the interpreter of the result.
+ self.cmd_pipe.send(
+ b"enhanced " + str(self.enhanced_ec).encode("ascii")
+ )
+ self.logger.debug("Enhanced EC image? %r", self.enhanced_ec)
+
+ # Clear look buffer since a match was found.
+ self.look_buffer = b""
+
+ # Move the sliding window.
+ self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:]
- else:
- self.logger.error('Unexpected sequence. %c', byte)
- self.esc_state = 0
-
- def ProcessInput(self):
- """Captures the input determines what actions to take."""
- # There's nothing to do if the input buffer is empty.
- if len(self.input_buffer) == 0:
- return
-
- # Don't store 2 consecutive identical commands in the history.
- if (self.history and self.history[-1] != self.input_buffer
- or not self.history):
- self.history.append(self.input_buffer)
-
- # Split the command up by spaces.
- line = self.input_buffer.split(b' ')
- self.logger.debug('cmd: %s', self.input_buffer)
- cmd = line[0].lower()
-
- # The 'history' command is a special case that we handle locally.
- if cmd == 'history':
- self.PrintHistory()
- return
-
- # Send the command to the interpreter.
- self.logger.debug('Sending command to interpreter.')
- self.cmd_pipe.send(self.input_buffer)
- def CheckForEnhancedECImage(self):
- """Performs an interrogation of the EC image.
+def CanonicalizeTimeString(timestr):
+ """Canonicalize the timestamp string.
- Send a SYN and expect an ACK. If no ACK or the response is incorrect, then
- assume that the current EC image that we are talking to is not enhanced.
+ Args:
+ timestr: A timestamp string ended with 6 digits msec.
Returns:
- is_enhanced: A boolean indicating whether the EC responded to the
- interrogation correctly.
-
- Raises:
- EOFError: Allowed to propagate through from self.dbg_pipe.recv().
+ A string with 3 digits msec and an extra space.
"""
- # Send interrogation byte and wait for the response.
- self.logger.debug('Performing interrogation.')
- self.cmd_pipe.send(interpreter.EC_SYN)
-
- response = ''
- if self.dbg_pipe.poll(self.interrogation_timeout):
- response = self.dbg_pipe.recv()
- self.logger.debug('response: %r', binascii.hexlify(response))
- else:
- self.logger.debug('Timed out waiting for EC_ACK')
+ return timestr[:-3].encode("ascii") + b" "
- # Verify the acknowledgment.
- is_enhanced = response == interpreter.EC_ACK
-
- if is_enhanced:
- # Increase the interrogation timeout for stability purposes.
- self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT
- self.logger.debug('Increasing interrogation timeout to %rs.',
- self.interrogation_timeout)
- else:
- # Reduce the timeout in order to reduce the perceivable delay.
- self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
- self.logger.debug('Reducing interrogation timeout to %rs.',
- self.interrogation_timeout)
-
- return is_enhanced
-
- def HandleChar(self, byte):
- """HandleChar does a certain action when it receives a character.
-
- Args:
- byte: An integer representing the character received from the user.
- Raises:
- EOFError: Allowed to propagate through from self.CheckForEnhancedECImage()
- i.e. from self.dbg_pipe.recv().
- """
- fd = self.controller_pty
-
- # Enter the OOBM prompt mode if the user presses '%'.
- if byte == ord('%'):
- self.logger.debug('Begin OOBM command.')
- self.receiving_oobm_cmd = True
- # Print a "prompt".
- os.write(self.controller_pty, b'\r\n% ')
- return
-
- # Add chars to the pending OOBM command if we're currently receiving one.
- if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN:
- tmp_bytes = six.int2byte(byte)
- self.pending_oobm_cmd += tmp_bytes
- self.logger.debug('%s', tmp_bytes)
- os.write(self.controller_pty, tmp_bytes)
- return
-
- if byte == ControlKey.CARRIAGE_RETURN:
- if self.receiving_oobm_cmd:
- # Terminate the command and place it in the OOBM queue.
- self.logger.debug('End OOBM command.')
- if self.pending_oobm_cmd:
- self.oobm_queue.put(self.pending_oobm_cmd)
- self.logger.debug('Placed %r into OOBM command queue.',
- self.pending_oobm_cmd)
-
- # Reset the state.
- os.write(self.controller_pty, b'\r\n' + self.prompt)
- self.input_buffer = b''
- self.input_buffer_pos = 0
- self.receiving_oobm_cmd = False
- self.pending_oobm_cmd = b''
- return
-
- if self.interrogation_mode == b'never':
- self.logger.debug('Skipping interrogation because interrogation mode'
- ' is set to never.')
- elif self.interrogation_mode == b'always':
- # Only interrogate the EC if the interrogation mode is set to 'always'.
- self.enhanced_ec = self.CheckForEnhancedECImage()
- self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
-
- if not self.enhanced_ec:
- # Send everything straight to the EC to handle.
- self.cmd_pipe.send(six.int2byte(byte))
- # Reset the input buffer.
- self.input_buffer = b''
- self.input_buffer_pos = 0
- self.logger.log(1, 'Reset input buffer.')
- return
-
- # Keep handling the ESC sequence if we're in the middle of it.
- if self.esc_state != 0:
- self.HandleEsc(byte)
- return
-
- # When we're at the end of the line, we should only allow going backwards,
- # backspace, carriage return, up, or down. The arrow keys are escape
- # sequences, so we let the escape...escape.
- if (self.input_buffer_pos >= self.line_limit and
- byte not in [ControlKey.CTRL_B, ControlKey.ESC, ControlKey.BACKSPACE,
- ControlKey.CTRL_A, ControlKey.CARRIAGE_RETURN,
- ControlKey.CTRL_P, ControlKey.CTRL_N]):
- return
-
- # If the input buffer is full we can't accept new chars.
- buffer_full = len(self.input_buffer) >= self.line_limit
-
-
- # Carriage_Return/Enter
- if byte == ControlKey.CARRIAGE_RETURN:
- self.logger.debug('Enter key pressed.')
- # Put a carriage return/newline and the print the prompt.
- os.write(fd, b'\r\n')
-
- # TODO(aaboagye): When we control the printing of all output, print the
- # prompt AFTER printing all the output. We can't do it yet because we
- # don't know how much is coming from the EC.
-
- # Print the prompt.
- os.write(fd, self.prompt)
- # Process the input.
- self.ProcessInput()
- # Now, clear the buffer.
- self.input_buffer = b''
- self.input_buffer_pos = 0
- # Reset history buffer pos.
- self.history_pos = len(self.history)
- # Clear partial command.
- self.partial_cmd = b''
-
- # Backspace
- elif byte == ControlKey.BACKSPACE:
- self.logger.debug('Backspace pressed.')
- if self.input_buffer_pos > 0:
- # Move left 1 column.
- self.MoveCursor('left', 1)
- # Remove the character at the input_buffer_pos by slicing it out.
- self.SliceOutChar()
-
- self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
-
- # Ctrl+A. Move cursor to beginning of the line
- elif byte == ControlKey.CTRL_A:
- self.logger.debug('Control+A pressed.')
- self.MoveCursor('left', self.input_buffer_pos)
-
- # Ctrl+B. Move cursor left 1 column.
- elif byte == ControlKey.CTRL_B:
- self.logger.debug('Control+B pressed.')
- self.MoveCursor('left', 1)
-
- # Ctrl+D. Delete a character.
- elif byte == ControlKey.CTRL_D:
- self.logger.debug('Control+D pressed.')
- if self.input_buffer_pos != len(self.input_buffer):
- # Remove the character by slicing it out.
- self.SliceOutChar()
-
- # Ctrl+E. Move cursor to end of the line.
- elif byte == ControlKey.CTRL_E:
- self.logger.debug('Control+E pressed.')
- self.MoveCursor('right',
- len(self.input_buffer) - self.input_buffer_pos)
-
- # Ctrl+F. Move cursor right 1 column.
- elif byte == ControlKey.CTRL_F:
- self.logger.debug('Control+F pressed.')
- self.MoveCursor('right', 1)
-
- # Ctrl+K. Kill line.
- elif byte == ControlKey.CTRL_K:
- self.logger.debug('Control+K pressed.')
- self.KillLine()
-
- # Ctrl+N. Next line.
- elif byte == ControlKey.CTRL_N:
- self.logger.debug('Control+N pressed.')
- self.ShowNextCommand()
-
- # Ctrl+P. Previous line.
- elif byte == ControlKey.CTRL_P:
- self.logger.debug('Control+P pressed.')
- self.ShowPreviousCommand()
-
- # ESC sequence
- elif byte == ControlKey.ESC:
- # Starting an ESC sequence
- self.esc_state = EscState.ESC_START
-
- # Only print printable chars.
- elif IsPrintable(byte):
- # Drop the character if we're full.
- if buffer_full:
- self.logger.debug('Dropped char: %c(%d)', byte, byte)
- return
- # Print the character.
- os.write(fd, six.int2byte(byte))
- # Print the rest of the line (if any).
- extra_bytes_written = os.write(fd,
- self.input_buffer[self.input_buffer_pos:])
-
- # Recreate the input buffer.
- self.input_buffer = (self.input_buffer[0:self.input_buffer_pos] +
- six.int2byte(byte) +
- self.input_buffer[self.input_buffer_pos:])
- # Update the input buffer position.
- self.input_buffer_pos += 1 + extra_bytes_written
-
- # Reset the cursor if we wrote any extra bytes.
- if extra_bytes_written:
- self.MoveCursor('left', extra_bytes_written)
-
- self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
-
- def MoveCursor(self, direction, count):
- """MoveCursor moves the cursor left or right by count columns.
+def IsPrintable(byte):
+ """Determines if a byte is printable.
Args:
- direction: A string that should be either 'left' or 'right' representing
- the direction to move the cursor on the console.
- count: An integer representing how many columns the cursor should be
- moved.
+ byte: An integer potentially representing a printable character.
- Raises:
- AssertionError: If the direction is not equal to 'left' or 'right'.
+ Returns:
+ A boolean indicating whether the byte is a printable character.
"""
- # If there's nothing to move, we're done.
- if not count:
- return
- fd = self.controller_pty
- seq = b'\033[' + str(count).encode('ascii')
- if direction == 'left':
- # Bind the movement.
- if count > self.input_buffer_pos:
- count = self.input_buffer_pos
- seq += b'D'
- self.logger.debug('move cursor left %d', count)
- self.input_buffer_pos -= count
-
- elif direction == 'right':
- # Bind the movement.
- if (count + self.input_buffer_pos) > len(self.input_buffer):
- count = 0
- seq += b'C'
- self.logger.debug('move cursor right %d', count)
- self.input_buffer_pos += count
-
- else:
- raise AssertionError(('The only valid directions are \'left\' and '
- '\'right\''))
-
- self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
- # Move the cursor.
- if count != 0:
- os.write(fd, seq)
-
- def KillLine(self):
- """Kill the rest of the line based on the input buffer position."""
- # Killing the line is killing all the text to the right.
- diff = len(self.input_buffer) - self.input_buffer_pos
- self.logger.debug('diff: %d', diff)
- # Diff shouldn't be negative, but if it is for some reason, let's try to
- # correct the cursor.
- if diff < 0:
- self.logger.warning('Resetting input buffer position to %d...',
- len(self.input_buffer))
- self.MoveCursor('left', -diff)
- return
- if diff:
- self.MoveCursor('right', diff)
- for _ in range(diff):
- self.SendBackspace()
- self.input_buffer_pos -= diff
- self.input_buffer = self.input_buffer[0:self.input_buffer_pos]
-
- def SendBackspace(self):
- """Backspace a character on the console."""
- os.write(self.controller_pty, b'\033[1D \033[1D')
-
- def ProcessOOBMQueue(self):
- """Retrieve an item from the OOBM queue and process it."""
- item = self.oobm_queue.get()
- self.logger.debug('OOBM cmd: %r', item)
- cmd = item.split(b' ')
-
- if cmd[0] == b'loglevel':
- # An integer is required in order to set the log level.
- if len(cmd) < 2:
- self.logger.debug('Insufficient args')
- self.PrintOOBMHelp()
- return
- try:
- self.logger.debug('Log level change request.')
- new_log_level = int(cmd[1])
- self.logger.logger.setLevel(new_log_level)
- self.logger.info('Log level changed to %d.', new_log_level)
-
- # Forward the request to the interpreter as well.
- self.cmd_pipe.send(item)
- except ValueError:
- # Ignoring the request if an integer was not provided.
- self.PrintOOBMHelp()
-
- elif cmd[0] == b'timestamp':
- mode = cmd[1].lower()
- self.timestamp_enabled = (mode == b'on')
- self.logger.info('%sabling uart timestamps.',
- 'En' if self.timestamp_enabled else 'Dis')
-
- elif cmd[0] == b'rawdebug':
- mode = cmd[1].lower()
- self.raw_debug = (mode == b'on')
- self.logger.info('%sabling per interrupt debug logs.',
- 'En' if self.raw_debug else 'Dis')
-
- elif cmd[0] == b'interrogate' and len(cmd) >= 2:
- enhanced = False
- mode = cmd[1]
- if len(cmd) >= 3 and cmd[2] == b'enhanced':
- enhanced = True
-
- # Set the mode if correct.
- if mode in INTERROGATION_MODES:
- self.interrogation_mode = mode
- self.logger.debug('Updated interrogation mode to %s.', mode)
-
- # Update the assumptions of the EC image.
- self.enhanced_ec = enhanced
- self.logger.debug('Enhanced EC image is now %r', self.enhanced_ec)
-
- # Send command to interpreter as well.
- self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii'))
- else:
- self.PrintOOBMHelp()
-
- else:
- self.PrintOOBMHelp()
+ return byte >= ord(" ") and byte <= ord("~")
- def PrintOOBMHelp(self):
- """Prints out the OOBM help."""
- # Print help syntax.
- os.write(self.controller_pty, b'\r\n' + b'Known OOBM commands:\r\n')
- os.write(self.controller_pty, b' interrogate <never | always | auto> '
- b'[enhanced]\r\n')
- os.write(self.controller_pty, b' loglevel <int>\r\n')
- def CheckBufferForEnhancedImage(self, data):
- """Adds data to a look buffer and checks to see for enhanced EC image.
-
- The EC's console task prints a string upon initialization which says that
- "Console is enabled; type HELP for help.". The enhanced EC images print a
- different string as a part of their init. This function searches through a
- "look" buffer, scanning for the presence of either of those strings and
- updating the enhanced_ec state accordingly.
+def StartLoop(console, command_active, shutdown_pipe=None):
+ """Starts the infinite loop of console processing.
Args:
- data: A string containing the data sent from the interpreter.
+ console: A Console object that has been properly initialzed.
+ command_active: ctypes data object or multiprocessing.Value indicating if
+ servod owns the console, or user owns the console. This prevents input
+ collisions.
+ shutdown_pipe: A file object for a pipe or equivalent that becomes readable
+ (not blocked) to indicate that the loop should exit. Can be None to never
+ exit the loop.
"""
- self.look_buffer += data
-
- # Search the buffer for any of the EC image strings.
- enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer)
- non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer)
-
- # Update the state if any matches were found.
- if enhanced_match or non_enhanced_match:
- if enhanced_match:
- self.enhanced_ec = True
- elif non_enhanced_match:
- self.enhanced_ec = False
-
- # Inform the interpreter of the result.
- self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii'))
- self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
-
- # Clear look buffer since a match was found.
- self.look_buffer = b''
-
- # Move the sliding window.
- self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:]
-
-
-def CanonicalizeTimeString(timestr):
- """Canonicalize the timestamp string.
-
- Args:
- timestr: A timestamp string ended with 6 digits msec.
-
- Returns:
- A string with 3 digits msec and an extra space.
- """
- return timestr[:-3].encode('ascii') + b' '
-
-
-def IsPrintable(byte):
- """Determines if a byte is printable.
-
- Args:
- byte: An integer potentially representing a printable character.
-
- Returns:
- A boolean indicating whether the byte is a printable character.
- """
- return byte >= ord(' ') and byte <= ord('~')
-
-
-def StartLoop(console, command_active, shutdown_pipe=None):
- """Starts the infinite loop of console processing.
-
- Args:
- console: A Console object that has been properly initialzed.
- command_active: ctypes data object or multiprocessing.Value indicating if
- servod owns the console, or user owns the console. This prevents input
- collisions.
- shutdown_pipe: A file object for a pipe or equivalent that becomes readable
- (not blocked) to indicate that the loop should exit. Can be None to never
- exit the loop.
- """
- try:
- console.logger.debug('Console is being served on %s.', console.user_pty)
- console.logger.debug('Console controller is on %s.', console.controller_pty)
- console.logger.debug('Command interface is being served on %s.',
- console.interface_pty)
- console.logger.debug(console)
-
- # This checks for HUP to indicate if the user has connected to the pty.
- ep = select.epoll()
- ep.register(console.controller_pty, select.EPOLLHUP)
-
- # This is used instead of "break" to avoid exiting the loop in the middle of
- # an iteration.
- continue_looping = True
-
- # Used for determining when to print host timestamps
- tm_req = True
-
- while continue_looping:
- # Check to see if pts is connected to anything
- events = ep.poll(0)
- controller_connected = not events
-
- # Check to see if pipes or the console are ready for reading.
- read_list = [console.interface_pty,
- console.cmd_pipe, console.dbg_pipe]
- if controller_connected:
- read_list.append(console.controller_pty)
- if shutdown_pipe is not None:
- read_list.append(shutdown_pipe)
-
- # Check if any input is ready, or wait for .1 sec and re-poll if
- # a user has connected to the pts.
- select_output = select.select(read_list, [], [], .1)
- if not select_output:
- continue
- ready_for_reading = select_output[0]
-
- for obj in ready_for_reading:
- if obj is console.controller_pty:
- if not command_active.value:
- # Convert to bytes so we can look for non-printable chars such as
- # Ctrl+A, Ctrl+E, etc.
- try:
- line = bytearray(os.read(console.controller_pty, CONSOLE_MAX_READ))
- console.logger.debug('Input from user: %s, locked:%s',
- str(line).strip(), command_active.value)
- for i in line:
- try:
- # Handle each character as it arrives.
- console.HandleChar(i)
- except EOFError:
- console.logger.debug(
- 'ec3po console received EOF from dbg_pipe in HandleChar()'
- ' while reading console.controller_pty')
- continue_looping = False
- break
- except OSError:
- console.logger.debug('Ptm read failed, probably user disconnect.')
-
- elif obj is console.interface_pty:
- if command_active.value:
- # Convert to bytes so we can look for non-printable chars such as
- # Ctrl+A, Ctrl+E, etc.
- line = bytearray(os.read(console.interface_pty, CONSOLE_MAX_READ))
- console.logger.debug('Input from interface: %s, locked:%s',
- str(line).strip(), command_active.value)
- for i in line:
- try:
- # Handle each character as it arrives.
- console.HandleChar(i)
- except EOFError:
- console.logger.debug(
- 'ec3po console received EOF from dbg_pipe in HandleChar()'
- ' while reading console.interface_pty')
- continue_looping = False
- break
-
- elif obj is console.cmd_pipe:
- try:
- data = console.cmd_pipe.recv()
- except EOFError:
- console.logger.debug('ec3po console received EOF from cmd_pipe')
- continue_looping = False
- else:
- # Write it to the user console.
- if console.raw_debug:
- console.logger.debug('|CMD|-%s->%r',
- ('u' if controller_connected else '') +
- ('i' if command_active.value else ''),
- data.strip())
+ try:
+ console.logger.debug("Console is being served on %s.", console.user_pty)
+ console.logger.debug(
+ "Console controller is on %s.", console.controller_pty
+ )
+ console.logger.debug(
+ "Command interface is being served on %s.", console.interface_pty
+ )
+ console.logger.debug(console)
+
+ # This checks for HUP to indicate if the user has connected to the pty.
+ ep = select.epoll()
+ ep.register(console.controller_pty, select.EPOLLHUP)
+
+ # This is used instead of "break" to avoid exiting the loop in the middle of
+ # an iteration.
+ continue_looping = True
+
+ # Used for determining when to print host timestamps
+ tm_req = True
+
+ while continue_looping:
+ # Check to see if pts is connected to anything
+ events = ep.poll(0)
+ controller_connected = not events
+
+ # Check to see if pipes or the console are ready for reading.
+ read_list = [
+ console.interface_pty,
+ console.cmd_pipe,
+ console.dbg_pipe,
+ ]
if controller_connected:
- os.write(console.controller_pty, data)
- if command_active.value:
- os.write(console.interface_pty, data)
-
- elif obj is console.dbg_pipe:
- try:
- data = console.dbg_pipe.recv()
- except EOFError:
- console.logger.debug('ec3po console received EOF from dbg_pipe')
- continue_looping = False
- else:
- if console.interrogation_mode == b'auto':
- # Search look buffer for enhanced EC image string.
- console.CheckBufferForEnhancedImage(data)
- # Write it to the user console.
- if len(data) > 1 and console.raw_debug:
- console.logger.debug('|DBG|-%s->%r',
- ('u' if controller_connected else '') +
- ('i' if command_active.value else ''),
- data.strip())
- console.LogConsoleOutput(data)
- if controller_connected:
- end = len(data) - 1
- if console.timestamp_enabled:
- # A timestamp is required at the beginning of this line
- if tm_req is True:
- now = datetime.now()
- tm = CanonicalizeTimeString(now.strftime(HOST_STRFTIME))
- os.write(console.controller_pty, tm)
- tm_req = False
-
- # Insert timestamps into the middle where appropriate
- # except if the last character is a newline
- nls_found = data.count(b'\n', 0, end)
- now = datetime.now()
- tm = CanonicalizeTimeString(now.strftime('\n' + HOST_STRFTIME))
- data_tm = data.replace(b'\n', tm, nls_found)
- else:
- data_tm = data
-
- # timestamp required on next input
- if data[end] == b'\n'[0]:
- tm_req = True
- os.write(console.controller_pty, data_tm)
- if command_active.value:
- os.write(console.interface_pty, data)
-
- elif obj is shutdown_pipe:
- console.logger.debug(
- 'ec3po console received shutdown pipe unblocked notification')
- continue_looping = False
-
- while not console.oobm_queue.empty():
- console.logger.debug('OOBM queue ready for reading.')
- console.ProcessOOBMQueue()
-
- except KeyboardInterrupt:
- pass
-
- finally:
- ep.unregister(console.controller_pty)
- console.dbg_pipe.close()
- console.cmd_pipe.close()
- os.close(console.controller_pty)
- os.close(console.interface_pty)
- if shutdown_pipe is not None:
- shutdown_pipe.close()
- console.logger.debug('Exit ec3po console loop for %s', console.user_pty)
+ read_list.append(console.controller_pty)
+ if shutdown_pipe is not None:
+ read_list.append(shutdown_pipe)
+
+ # Check if any input is ready, or wait for .1 sec and re-poll if
+ # a user has connected to the pts.
+ select_output = select.select(read_list, [], [], 0.1)
+ if not select_output:
+ continue
+ ready_for_reading = select_output[0]
+
+ for obj in ready_for_reading:
+ if obj is console.controller_pty:
+ if not command_active.value:
+ # Convert to bytes so we can look for non-printable chars such as
+ # Ctrl+A, Ctrl+E, etc.
+ try:
+ line = bytearray(
+ os.read(
+ console.controller_pty, CONSOLE_MAX_READ
+ )
+ )
+ console.logger.debug(
+ "Input from user: %s, locked:%s",
+ str(line).strip(),
+ command_active.value,
+ )
+ for i in line:
+ try:
+ # Handle each character as it arrives.
+ console.HandleChar(i)
+ except EOFError:
+ console.logger.debug(
+ "ec3po console received EOF from dbg_pipe in HandleChar()"
+ " while reading console.controller_pty"
+ )
+ continue_looping = False
+ break
+ except OSError:
+ console.logger.debug(
+ "Ptm read failed, probably user disconnect."
+ )
+
+ elif obj is console.interface_pty:
+ if command_active.value:
+ # Convert to bytes so we can look for non-printable chars such as
+ # Ctrl+A, Ctrl+E, etc.
+ line = bytearray(
+ os.read(console.interface_pty, CONSOLE_MAX_READ)
+ )
+ console.logger.debug(
+ "Input from interface: %s, locked:%s",
+ str(line).strip(),
+ command_active.value,
+ )
+ for i in line:
+ try:
+ # Handle each character as it arrives.
+ console.HandleChar(i)
+ except EOFError:
+ console.logger.debug(
+ "ec3po console received EOF from dbg_pipe in HandleChar()"
+ " while reading console.interface_pty"
+ )
+ continue_looping = False
+ break
+
+ elif obj is console.cmd_pipe:
+ try:
+ data = console.cmd_pipe.recv()
+ except EOFError:
+ console.logger.debug(
+ "ec3po console received EOF from cmd_pipe"
+ )
+ continue_looping = False
+ else:
+ # Write it to the user console.
+ if console.raw_debug:
+ console.logger.debug(
+ "|CMD|-%s->%r",
+ ("u" if controller_connected else "")
+ + ("i" if command_active.value else ""),
+ data.strip(),
+ )
+ if controller_connected:
+ os.write(console.controller_pty, data)
+ if command_active.value:
+ os.write(console.interface_pty, data)
+
+ elif obj is console.dbg_pipe:
+ try:
+ data = console.dbg_pipe.recv()
+ except EOFError:
+ console.logger.debug(
+ "ec3po console received EOF from dbg_pipe"
+ )
+ continue_looping = False
+ else:
+ if console.interrogation_mode == b"auto":
+ # Search look buffer for enhanced EC image string.
+ console.CheckBufferForEnhancedImage(data)
+ # Write it to the user console.
+ if len(data) > 1 and console.raw_debug:
+ console.logger.debug(
+ "|DBG|-%s->%r",
+ ("u" if controller_connected else "")
+ + ("i" if command_active.value else ""),
+ data.strip(),
+ )
+ console.LogConsoleOutput(data)
+ if controller_connected:
+ end = len(data) - 1
+ if console.timestamp_enabled:
+ # A timestamp is required at the beginning of this line
+ if tm_req is True:
+ now = datetime.now()
+ tm = CanonicalizeTimeString(
+ now.strftime(HOST_STRFTIME)
+ )
+ os.write(console.controller_pty, tm)
+ tm_req = False
+
+ # Insert timestamps into the middle where appropriate
+ # except if the last character is a newline
+ nls_found = data.count(b"\n", 0, end)
+ now = datetime.now()
+ tm = CanonicalizeTimeString(
+ now.strftime("\n" + HOST_STRFTIME)
+ )
+ data_tm = data.replace(b"\n", tm, nls_found)
+ else:
+ data_tm = data
+
+ # timestamp required on next input
+ if data[end] == b"\n"[0]:
+ tm_req = True
+ os.write(console.controller_pty, data_tm)
+ if command_active.value:
+ os.write(console.interface_pty, data)
+
+ elif obj is shutdown_pipe:
+ console.logger.debug(
+ "ec3po console received shutdown pipe unblocked notification"
+ )
+ continue_looping = False
+
+ while not console.oobm_queue.empty():
+ console.logger.debug("OOBM queue ready for reading.")
+ console.ProcessOOBMQueue()
+
+ except KeyboardInterrupt:
+ pass
+
+ finally:
+ ep.unregister(console.controller_pty)
+ console.dbg_pipe.close()
+ console.cmd_pipe.close()
+ os.close(console.controller_pty)
+ os.close(console.interface_pty)
+ if shutdown_pipe is not None:
+ shutdown_pipe.close()
+ console.logger.debug("Exit ec3po console loop for %s", console.user_pty)
def main(argv):
- """Kicks off the EC-3PO interactive console interface and interpreter.
-
- We create some pipes to communicate with an interpreter, instantiate an
- interpreter, create a PTY pair, and begin serving the console interface.
-
- Args:
- argv: A list of strings containing the arguments this module was called
- with.
- """
- # Set up argument parser.
- parser = argparse.ArgumentParser(description=('Start interactive EC console '
- 'and interpreter.'))
- parser.add_argument('ec_uart_pty',
- help=('The full PTY name that the EC UART'
- ' is present on. eg: /dev/pts/12'))
- parser.add_argument('--log-level',
- default='info',
- help='info, debug, warning, error, or critical')
-
- # Parse arguments.
- opts = parser.parse_args(argv)
-
- # Set logging level.
- opts.log_level = opts.log_level.lower()
- if opts.log_level == 'info':
- log_level = logging.INFO
- elif opts.log_level == 'debug':
- log_level = logging.DEBUG
- elif opts.log_level == 'warning':
- log_level = logging.WARNING
- elif opts.log_level == 'error':
- log_level = logging.ERROR
- elif opts.log_level == 'critical':
- log_level = logging.CRITICAL
- else:
- parser.error('Invalid log level. (info, debug, warning, error, critical)')
-
- # Start logging with a timestamp, module, and log level shown in each log
- # entry.
- logging.basicConfig(level=log_level, format=('%(asctime)s - %(module)s -'
- ' %(levelname)s - %(message)s'))
-
- # Create some pipes to communicate between the interpreter and the console.
- # The command pipe is bidirectional.
- cmd_pipe_interactive, cmd_pipe_interp = threadproc_shim.Pipe()
- # The debug pipe is unidirectional from interpreter to console only.
- dbg_pipe_interactive, dbg_pipe_interp = threadproc_shim.Pipe(duplex=False)
-
- # Create an interpreter instance.
- itpr = interpreter.Interpreter(opts.ec_uart_pty, cmd_pipe_interp,
- dbg_pipe_interp, log_level)
-
- # Spawn an interpreter process.
- itpr_process = threadproc_shim.ThreadOrProcess(
- target=interpreter.StartLoop, args=(itpr,))
- # Make sure to kill the interpreter when we terminate.
- itpr_process.daemon = True
- # Start the interpreter.
- itpr_process.start()
-
- # Open a new pseudo-terminal pair
- (controller_pty, user_pty) = pty.openpty()
- # Set the permissions to 660.
- os.chmod(os.ttyname(user_pty), (stat.S_IRGRP | stat.S_IWGRP |
- stat.S_IRUSR | stat.S_IWUSR))
- # Create a console.
- console = Console(controller_pty, os.ttyname(user_pty), cmd_pipe_interactive,
- dbg_pipe_interactive)
- # Start serving the console.
- v = threadproc_shim.Value(ctypes.c_bool, False)
- StartLoop(console, v)
-
-
-if __name__ == '__main__':
- main(sys.argv[1:])
+ """Kicks off the EC-3PO interactive console interface and interpreter.
+
+ We create some pipes to communicate with an interpreter, instantiate an
+ interpreter, create a PTY pair, and begin serving the console interface.
+
+ Args:
+ argv: A list of strings containing the arguments this module was called
+ with.
+ """
+ # Set up argument parser.
+ parser = argparse.ArgumentParser(
+ description=("Start interactive EC console and interpreter.")
+ )
+ parser.add_argument(
+ "ec_uart_pty",
+ help=(
+ "The full PTY name that the EC UART is present on. eg: /dev/pts/12"
+ ),
+ )
+ parser.add_argument(
+ "--log-level",
+ default="info",
+ help="info, debug, warning, error, or critical",
+ )
+
+ # Parse arguments.
+ opts = parser.parse_args(argv)
+
+ # Set logging level.
+ opts.log_level = opts.log_level.lower()
+ if opts.log_level == "info":
+ log_level = logging.INFO
+ elif opts.log_level == "debug":
+ log_level = logging.DEBUG
+ elif opts.log_level == "warning":
+ log_level = logging.WARNING
+ elif opts.log_level == "error":
+ log_level = logging.ERROR
+ elif opts.log_level == "critical":
+ log_level = logging.CRITICAL
+ else:
+ parser.error(
+ "Invalid log level. (info, debug, warning, error, critical)"
+ )
+
+ # Start logging with a timestamp, module, and log level shown in each log
+ # entry.
+ logging.basicConfig(
+ level=log_level,
+ format=("%(asctime)s - %(module)s - %(levelname)s - %(message)s"),
+ )
+
+ # Create some pipes to communicate between the interpreter and the console.
+ # The command pipe is bidirectional.
+ cmd_pipe_interactive, cmd_pipe_interp = threadproc_shim.Pipe()
+ # The debug pipe is unidirectional from interpreter to console only.
+ dbg_pipe_interactive, dbg_pipe_interp = threadproc_shim.Pipe(duplex=False)
+
+ # Create an interpreter instance.
+ itpr = interpreter.Interpreter(
+ opts.ec_uart_pty, cmd_pipe_interp, dbg_pipe_interp, log_level
+ )
+
+ # Spawn an interpreter process.
+ itpr_process = threadproc_shim.ThreadOrProcess(
+ target=interpreter.StartLoop, args=(itpr,)
+ )
+ # Make sure to kill the interpreter when we terminate.
+ itpr_process.daemon = True
+ # Start the interpreter.
+ itpr_process.start()
+
+ # Open a new pseudo-terminal pair
+ (controller_pty, user_pty) = pty.openpty()
+ # Set the permissions to 660.
+ os.chmod(
+ os.ttyname(user_pty),
+ (stat.S_IRGRP | stat.S_IWGRP | stat.S_IRUSR | stat.S_IWUSR),
+ )
+ # Create a console.
+ console = Console(
+ controller_pty,
+ os.ttyname(user_pty),
+ os.ttyname(controller_pty),
+ cmd_pipe_interactive,
+ dbg_pipe_interactive,
+ )
+ # Start serving the console.
+ v = threadproc_shim.Value(ctypes.c_bool, False)
+ StartLoop(console, v)
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])