summaryrefslogtreecommitdiff
path: root/util/ec3po
diff options
context:
space:
mode:
authorJeremy Bettis <jbettis@google.com>2022-07-08 10:58:19 -0600
committerChromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com>2022-07-12 19:13:33 +0000
commit7540e7b47b55447475bb8191fb3520dd67cf7998 (patch)
tree13309dbcf1db48e60fa2c2e5aed79f63bce00b5e /util/ec3po
parent7c114b8e1a3bb29991da70b9de394ac5d4f6c909 (diff)
downloadchrome-ec-7540e7b47b55447475bb8191fb3520dd67cf7998.tar.gz
ec: Format all python files with black and isort
find . \( -path ./private -prune \) -o -name '*.py' -print | xargs black find . \( -path ./private -prune \) -o -name '*.py' -print | xargs ~/chromiumos/chromite/scripts/isort --settings-file=.isort.cfg BRANCH=None BUG=b:238434058 TEST=None Signed-off-by: Jeremy Bettis <jbettis@google.com> Change-Id: I63462d6f15d1eaf3db84eb20d1404ee976be8382 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3749242 Commit-Queue: Jeremy Bettis <jbettis@chromium.org> Reviewed-by: Tom Hughes <tomhughes@chromium.org> Tested-by: Jeremy Bettis <jbettis@chromium.org> Commit-Queue: Jack Rosenthal <jrosenth@chromium.org> Auto-Submit: Jeremy Bettis <jbettis@chromium.org> Reviewed-by: Jack Rosenthal <jrosenth@chromium.org>
Diffstat (limited to 'util/ec3po')
-rwxr-xr-xutil/ec3po/console.py2231
-rwxr-xr-xutil/ec3po/console_unittest.py2954
-rw-r--r--util/ec3po/interpreter.py818
-rwxr-xr-xutil/ec3po/interpreter_unittest.py728
-rw-r--r--util/ec3po/threadproc_shim.py31
5 files changed, 3450 insertions, 3312 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py
index e71216e3f2..33fa5a6775 100755
--- a/util/ec3po/console.py
+++ b/util/ec3po/console.py
@@ -17,7 +17,6 @@ from __future__ import print_function
import argparse
import binascii
import ctypes
-from datetime import datetime
import logging
import os
import pty
@@ -26,26 +25,25 @@ import select
import stat
import sys
import traceback
+from datetime import datetime
import six
+from ec3po import interpreter, threadproc_shim
-from ec3po import interpreter
-from ec3po import threadproc_shim
-
-
-PROMPT = b'> '
+PROMPT = b"> "
CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name.
CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user.
LOOK_BUFFER_SIZE = 256 # Size of search window when looking for the enhanced EC
- # image string.
+# image string.
# In console_init(), the EC will print a string saying that the EC console is
# enabled. Enhanced images will print a slightly different string. These
# regular expressions are used to determine at reboot whether the EC image is
# enhanced or not.
-ENHANCED_IMAGE_RE = re.compile(br'Enhanced Console is enabled '
- br'\(v([0-9]+\.[0-9]+\.[0-9]+)\)')
-NON_ENHANCED_IMAGE_RE = re.compile(br'Console is enabled; ')
+ENHANCED_IMAGE_RE = re.compile(
+ rb"Enhanced Console is enabled " rb"\(v([0-9]+\.[0-9]+\.[0-9]+)\)"
+)
+NON_ENHANCED_IMAGE_RE = re.compile(rb"Console is enabled; ")
# The timeouts are really only useful for enhanced EC images, but otherwise just
# serve as a delay for non-enhanced EC images. Therefore, we can keep this
@@ -54,1118 +52,1173 @@ NON_ENHANCED_IMAGE_RE = re.compile(br'Console is enabled; ')
# EC image, we can increase the timeout for stability just in case it takes a
# bit longer to receive an ACK for some reason.
NON_ENHANCED_EC_INTERROGATION_TIMEOUT = 0.3 # Maximum number of seconds to wait
- # for a response to an
- # interrogation of a non-enhanced
- # EC image.
+# for a response to an
+# interrogation of a non-enhanced
+# EC image.
ENHANCED_EC_INTERROGATION_TIMEOUT = 1.0 # Maximum number of seconds to wait for
- # a response to an interrogation of an
- # enhanced EC image.
+# a response to an interrogation of an
+# enhanced EC image.
# List of modes which control when interrogations are performed with the EC.
-INTERROGATION_MODES = [b'never', b'always', b'auto']
+INTERROGATION_MODES = [b"never", b"always", b"auto"]
# Format for printing host timestamp
-HOST_STRFTIME="%y-%m-%d %H:%M:%S.%f"
+HOST_STRFTIME = "%y-%m-%d %H:%M:%S.%f"
class EscState(object):
- """Class which contains an enumeration for states of ESC sequences."""
- ESC_START = 1
- ESC_BRACKET = 2
- ESC_BRACKET_1 = 3
- ESC_BRACKET_3 = 4
- ESC_BRACKET_8 = 5
-
+ """Class which contains an enumeration for states of ESC sequences."""
-class ControlKey(object):
- """Class which contains codes for various control keys."""
- BACKSPACE = 0x08
- CTRL_A = 0x01
- CTRL_B = 0x02
- CTRL_D = 0x04
- CTRL_E = 0x05
- CTRL_F = 0x06
- CTRL_K = 0x0b
- CTRL_N = 0xe
- CTRL_P = 0x10
- CARRIAGE_RETURN = 0x0d
- ESC = 0x1b
+ ESC_START = 1
+ ESC_BRACKET = 2
+ ESC_BRACKET_1 = 3
+ ESC_BRACKET_3 = 4
+ ESC_BRACKET_8 = 5
-class Console(object):
- """Class which provides the console interface between the EC and the user.
-
- This class essentially represents the console interface between the user and
- the EC. It handles all of the console editing behaviour
-
- Attributes:
- logger: A logger for this module.
- controller_pty: File descriptor to the controller side of the PTY. Used for
- driving output to the user and receiving user input.
- user_pty: A string representing the PTY name of the served console.
- cmd_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console side of the command pipe. This must be a
- bidirectional pipe. Console commands and responses utilize this pipe.
- dbg_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console's read-only side of the debug pipe. This must be a
- unidirectional pipe attached to the intepreter. EC debug messages use
- this pipe.
- oobm_queue: A queue.Queue or multiprocessing.Queue which is used for out of
- band management for the interactive console.
- input_buffer: A string representing the current input command.
- input_buffer_pos: An integer representing the current position in the buffer
- to insert a char.
- partial_cmd: A string representing the command entered on a line before
- pressing the up arrow keys.
- esc_state: An integer represeting the current state within an escape
- sequence.
- line_limit: An integer representing the maximum number of characters on a
- line.
- history: A list of strings containing the past entered console commands.
- history_pos: An integer representing the current history buffer position.
- This index is used to show previous commands.
- prompt: A string representing the console prompt displayed to the user.
- enhanced_ec: A boolean indicating if the EC image that we are currently
- communicating with is enhanced or not. Enhanced EC images will support
- packed commands and host commands over the UART. This defaults to False
- until we perform some handshaking.
- interrogation_timeout: A float representing the current maximum seconds to
- wait for a response to an interrogation.
- receiving_oobm_cmd: A boolean indicating whether or not the console is in
- the middle of receiving an out of band command.
- pending_oobm_cmd: A string containing the pending OOBM command.
- interrogation_mode: A string containing the current mode of whether
- interrogations are performed with the EC or not and how often.
- raw_debug: Flag to indicate whether per interrupt data should be logged to
- debug
- output_line_log_buffer: buffer for lines coming from the EC to log to debug
- """
-
- def __init__(self, controller_pty, user_pty, interface_pty, cmd_pipe, dbg_pipe,
- name=None):
- """Initalises a Console object with the provided arguments.
+class ControlKey(object):
+ """Class which contains codes for various control keys."""
- Args:
- controller_pty: File descriptor to the controller side of the PTY. Used for
- driving output to the user and receiving user input.
- user_pty: A string representing the PTY name of the served console.
- interface_pty: A string representing the PTY name of the served command
- interface.
- cmd_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console side of the command pipe. This must be a
- bidirectional pipe. Console commands and responses utilize this pipe.
- dbg_pipe: A socket.socket or multiprocessing.Connection object which
- represents the console's read-only side of the debug pipe. This must be a
- unidirectional pipe attached to the intepreter. EC debug messages use
- this pipe.
- name: the console source name
- """
- # Create a unique logger based on the console name
- console_prefix = ('%s - ' % name) if name else ''
- logger = logging.getLogger('%sEC3PO.Console' % console_prefix)
- self.logger = interpreter.LoggerAdapter(logger, {'pty': user_pty})
- self.controller_pty = controller_pty
- self.user_pty = user_pty
- self.interface_pty = interface_pty
- self.cmd_pipe = cmd_pipe
- self.dbg_pipe = dbg_pipe
- self.oobm_queue = threadproc_shim.Queue()
- self.input_buffer = b''
- self.input_buffer_pos = 0
- self.partial_cmd = b''
- self.esc_state = 0
- self.line_limit = CONSOLE_INPUT_LINE_SIZE
- self.history = []
- self.history_pos = 0
- self.prompt = PROMPT
- self.enhanced_ec = False
- self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
- self.receiving_oobm_cmd = False
- self.pending_oobm_cmd = b''
- self.interrogation_mode = b'auto'
- self.timestamp_enabled = True
- self.look_buffer = b''
- self.raw_debug = False
- self.output_line_log_buffer = []
-
- def __str__(self):
- """Show internal state of Console object as a string."""
- string = []
- string.append('controller_pty: %s' % self.controller_pty)
- string.append('user_pty: %s' % self.user_pty)
- string.append('interface_pty: %s' % self.interface_pty)
- string.append('cmd_pipe: %s' % self.cmd_pipe)
- string.append('dbg_pipe: %s' % self.dbg_pipe)
- string.append('oobm_queue: %s' % self.oobm_queue)
- string.append('input_buffer: %s' % self.input_buffer)
- string.append('input_buffer_pos: %d' % self.input_buffer_pos)
- string.append('esc_state: %d' % self.esc_state)
- string.append('line_limit: %d' % self.line_limit)
- string.append('history: %r' % self.history)
- string.append('history_pos: %d' % self.history_pos)
- string.append('prompt: %r' % self.prompt)
- string.append('partial_cmd: %r'% self.partial_cmd)
- string.append('interrogation_mode: %r' % self.interrogation_mode)
- string.append('look_buffer: %r' % self.look_buffer)
- return '\n'.join(string)
-
- def LogConsoleOutput(self, data):
- """Log to debug user MCU output to controller_pty when line is filled.
-
- The logging also suppresses the Cr50 spinner lines by removing characters
- when it sees backspaces.
+ BACKSPACE = 0x08
+ CTRL_A = 0x01
+ CTRL_B = 0x02
+ CTRL_D = 0x04
+ CTRL_E = 0x05
+ CTRL_F = 0x06
+ CTRL_K = 0x0B
+ CTRL_N = 0xE
+ CTRL_P = 0x10
+ CARRIAGE_RETURN = 0x0D
+ ESC = 0x1B
- Args:
- data: bytes - string received from MCU
- """
- data = list(data)
- # For compatibility with python2 and python3, standardize on the data
- # being a list of integers. This requires one more transformation in py2
- if not isinstance(data[0], int):
- data = [ord(c) for c in data]
-
- # This is a list of already filtered characters (or placeholders).
- line = self.output_line_log_buffer
-
- # TODO(b/177480273): use raw strings here
- symbols = {
- ord(b'\n'): u'\\n',
- ord(b'\r'): u'\\r',
- ord(b'\t'): u'\\t'
- }
- # self.logger.debug(u'%s + %r', u''.join(line), ''.join(data))
- while data:
- # Recall, data is a list of integers, namely the byte values sent by
- # the MCU.
- byte = data.pop(0)
- # This means that |byte| is an int.
- if byte == ord('\n'):
- line.append(symbols[byte])
- if line:
- self.logger.debug(u'%s', ''.join(line))
- line = []
- elif byte == ord('\b'):
- # Backspace: trim the last character off the buffer
- if line:
- line.pop(-1)
- elif byte in symbols:
- line.append(symbols[byte])
- elif byte < ord(' ') or byte > ord('~'):
- # Turn any character that isn't printable ASCII into escaped hex.
- # ' ' is chr(20), and 0-19 are unprintable control characters.
- # '~' is chr(126), and 127 is DELETE. 128-255 are control and Latin-1.
- line.append(u'\\x%02x' % byte)
- else:
- # byte is printable. Thus it is safe to use chr() to get the printable
- # character out of it again.
- line.append(u'%s' % chr(byte))
- self.output_line_log_buffer = line
-
- def PrintHistory(self):
- """Print the history of entered commands."""
- fd = self.controller_pty
- # Make it pretty by figuring out how wide to pad the numbers.
- wide = (len(self.history) // 10) + 1
- for i in range(len(self.history)):
- line = b' %*d %s\r\n' % (wide, i, self.history[i])
- os.write(fd, line)
-
- def ShowPreviousCommand(self):
- """Shows the previous command from the history list."""
- # There's nothing to do if there's no history at all.
- if not self.history:
- self.logger.debug('No history to print.')
- return
-
- # Don't do anything if there's no more history to show.
- if self.history_pos == 0:
- self.logger.debug('No more history to show.')
- return
-
- self.logger.debug('current history position: %d.', self.history_pos)
-
- # Decrement the history buffer position.
- self.history_pos -= 1
- self.logger.debug('new history position.: %d', self.history_pos)
-
- # Save the text entered on the console if any.
- if self.history_pos == len(self.history)-1:
- self.logger.debug('saving partial_cmd: %r', self.input_buffer)
- self.partial_cmd = self.input_buffer
-
- # Backspace the line.
- for _ in range(self.input_buffer_pos):
- self.SendBackspace()
-
- # Print the last entry in the history buffer.
- self.logger.debug('printing previous entry %d - %s', self.history_pos,
- self.history[self.history_pos])
- fd = self.controller_pty
- prev_cmd = self.history[self.history_pos]
- os.write(fd, prev_cmd)
- # Update the input buffer.
- self.input_buffer = prev_cmd
- self.input_buffer_pos = len(prev_cmd)
-
- def ShowNextCommand(self):
- """Shows the next command from the history list."""
- # Don't do anything if there's no history at all.
- if not self.history:
- self.logger.debug('History buffer is empty.')
- return
-
- fd = self.controller_pty
-
- self.logger.debug('current history position: %d', self.history_pos)
- # Increment the history position.
- self.history_pos += 1
-
- # Restore the partial cmd.
- if self.history_pos == len(self.history):
- self.logger.debug('Restoring partial command of %r', self.partial_cmd)
- # Backspace the line.
- for _ in range(self.input_buffer_pos):
- self.SendBackspace()
- # Print the partially entered command if any.
- os.write(fd, self.partial_cmd)
- self.input_buffer = self.partial_cmd
- self.input_buffer_pos = len(self.input_buffer)
- # Now that we've printed it, clear the partial cmd storage.
- self.partial_cmd = b''
- # Reset history position.
- self.history_pos = len(self.history)
- return
-
- self.logger.debug('new history position: %d', self.history_pos)
- if self.history_pos > len(self.history)-1:
- self.logger.debug('No more history to show.')
- self.history_pos -= 1
- self.logger.debug('Reset history position to %d', self.history_pos)
- return
-
- # Backspace the line.
- for _ in range(self.input_buffer_pos):
- self.SendBackspace()
-
- # Print the newer entry from the history buffer.
- self.logger.debug('printing next entry %d - %s', self.history_pos,
- self.history[self.history_pos])
- next_cmd = self.history[self.history_pos]
- os.write(fd, next_cmd)
- # Update the input buffer.
- self.input_buffer = next_cmd
- self.input_buffer_pos = len(next_cmd)
- self.logger.debug('new history position: %d.', self.history_pos)
-
- def SliceOutChar(self):
- """Remove a char from the line and shift everything over 1 column."""
- fd = self.controller_pty
- # Remove the character at the input_buffer_pos by slicing it out.
- self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + \
- self.input_buffer[self.input_buffer_pos+1:]
- # Write the rest of the line
- moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos:])
- # Write a space to clear out the last char
- moved_col += os.write(fd, b' ')
- # Update the input buffer position.
- self.input_buffer_pos += moved_col
- # Reset the cursor
- self.MoveCursor('left', moved_col)
-
- def HandleEsc(self, byte):
- """HandleEsc processes escape sequences.
- Args:
- byte: An integer representing the current byte in the sequence.
+class Console(object):
+ """Class which provides the console interface between the EC and the user.
+
+ This class essentially represents the console interface between the user and
+ the EC. It handles all of the console editing behaviour
+
+ Attributes:
+ logger: A logger for this module.
+ controller_pty: File descriptor to the controller side of the PTY. Used for
+ driving output to the user and receiving user input.
+ user_pty: A string representing the PTY name of the served console.
+ cmd_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console side of the command pipe. This must be a
+ bidirectional pipe. Console commands and responses utilize this pipe.
+ dbg_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console's read-only side of the debug pipe. This must be a
+ unidirectional pipe attached to the intepreter. EC debug messages use
+ this pipe.
+ oobm_queue: A queue.Queue or multiprocessing.Queue which is used for out of
+ band management for the interactive console.
+ input_buffer: A string representing the current input command.
+ input_buffer_pos: An integer representing the current position in the buffer
+ to insert a char.
+ partial_cmd: A string representing the command entered on a line before
+ pressing the up arrow keys.
+ esc_state: An integer represeting the current state within an escape
+ sequence.
+ line_limit: An integer representing the maximum number of characters on a
+ line.
+ history: A list of strings containing the past entered console commands.
+ history_pos: An integer representing the current history buffer position.
+ This index is used to show previous commands.
+ prompt: A string representing the console prompt displayed to the user.
+ enhanced_ec: A boolean indicating if the EC image that we are currently
+ communicating with is enhanced or not. Enhanced EC images will support
+ packed commands and host commands over the UART. This defaults to False
+ until we perform some handshaking.
+ interrogation_timeout: A float representing the current maximum seconds to
+ wait for a response to an interrogation.
+ receiving_oobm_cmd: A boolean indicating whether or not the console is in
+ the middle of receiving an out of band command.
+ pending_oobm_cmd: A string containing the pending OOBM command.
+ interrogation_mode: A string containing the current mode of whether
+ interrogations are performed with the EC or not and how often.
+ raw_debug: Flag to indicate whether per interrupt data should be logged to
+ debug
+ output_line_log_buffer: buffer for lines coming from the EC to log to debug
"""
- # We shouldn't be handling an escape sequence if we haven't seen one.
- assert self.esc_state != 0
-
- if self.esc_state is EscState.ESC_START:
- self.logger.debug('ESC_START')
- if byte == ord('['):
- self.esc_state = EscState.ESC_BRACKET
- return
- else:
- self.logger.error('Unexpected sequence. %c', byte)
- self.esc_state = 0
-
- elif self.esc_state is EscState.ESC_BRACKET:
- self.logger.debug('ESC_BRACKET')
- # Left Arrow key was pressed.
- if byte == ord('D'):
- self.logger.debug('Left arrow key pressed.')
- self.MoveCursor('left', 1)
- self.esc_state = 0 # Reset the state.
- return
-
- # Right Arrow key.
- elif byte == ord('C'):
- self.logger.debug('Right arrow key pressed.')
- self.MoveCursor('right', 1)
- self.esc_state = 0 # Reset the state.
- return
-
- # Up Arrow key.
- elif byte == ord('A'):
- self.logger.debug('Up arrow key pressed.')
- self.ShowPreviousCommand()
- # Reset the state.
- self.esc_state = 0 # Reset the state.
- return
-
- # Down Arrow key.
- elif byte == ord('B'):
- self.logger.debug('Down arrow key pressed.')
- self.ShowNextCommand()
- # Reset the state.
- self.esc_state = 0 # Reset the state.
- return
-
- # For some reason, minicom sends a 1 instead of 7. /shrug
- # TODO(aaboagye): Figure out why this happens.
- elif byte == ord('1') or byte == ord('7'):
- self.esc_state = EscState.ESC_BRACKET_1
-
- elif byte == ord('3'):
- self.esc_state = EscState.ESC_BRACKET_3
-
- elif byte == ord('8'):
- self.esc_state = EscState.ESC_BRACKET_8
-
- else:
- self.logger.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)',
- chr(byte), byte)
- self.esc_state = 0
- return
-
- elif self.esc_state is EscState.ESC_BRACKET_1:
- self.logger.debug('ESC_BRACKET_1')
- # HOME key.
- if byte == ord('~'):
- self.logger.debug('Home key pressed.')
- self.MoveCursor('left', self.input_buffer_pos)
- self.esc_state = 0 # Reset the state.
- self.logger.debug('ESC sequence complete.')
- return
-
- elif self.esc_state is EscState.ESC_BRACKET_3:
- self.logger.debug('ESC_BRACKET_3')
- # DEL key.
- if byte == ord('~'):
- self.logger.debug('Delete key pressed.')
- if self.input_buffer_pos != len(self.input_buffer):
- self.SliceOutChar()
- self.esc_state = 0 # Reset the state.
-
- elif self.esc_state is EscState.ESC_BRACKET_8:
- self.logger.debug('ESC_BRACKET_8')
- # END key.
- if byte == ord('~'):
- self.logger.debug('End key pressed.')
- self.MoveCursor('right',
- len(self.input_buffer) - self.input_buffer_pos)
- self.esc_state = 0 # Reset the state.
- self.logger.debug('ESC sequence complete.')
- return
-
- else:
- self.logger.error('Unexpected sequence. %c', byte)
+ def __init__(
+ self, controller_pty, user_pty, interface_pty, cmd_pipe, dbg_pipe, name=None
+ ):
+ """Initalises a Console object with the provided arguments.
+
+ Args:
+ controller_pty: File descriptor to the controller side of the PTY. Used for
+ driving output to the user and receiving user input.
+ user_pty: A string representing the PTY name of the served console.
+ interface_pty: A string representing the PTY name of the served command
+ interface.
+ cmd_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console side of the command pipe. This must be a
+ bidirectional pipe. Console commands and responses utilize this pipe.
+ dbg_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the console's read-only side of the debug pipe. This must be a
+ unidirectional pipe attached to the intepreter. EC debug messages use
+ this pipe.
+ name: the console source name
+ """
+ # Create a unique logger based on the console name
+ console_prefix = ("%s - " % name) if name else ""
+ logger = logging.getLogger("%sEC3PO.Console" % console_prefix)
+ self.logger = interpreter.LoggerAdapter(logger, {"pty": user_pty})
+ self.controller_pty = controller_pty
+ self.user_pty = user_pty
+ self.interface_pty = interface_pty
+ self.cmd_pipe = cmd_pipe
+ self.dbg_pipe = dbg_pipe
+ self.oobm_queue = threadproc_shim.Queue()
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ self.partial_cmd = b""
self.esc_state = 0
+ self.line_limit = CONSOLE_INPUT_LINE_SIZE
+ self.history = []
+ self.history_pos = 0
+ self.prompt = PROMPT
+ self.enhanced_ec = False
+ self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
+ self.receiving_oobm_cmd = False
+ self.pending_oobm_cmd = b""
+ self.interrogation_mode = b"auto"
+ self.timestamp_enabled = True
+ self.look_buffer = b""
+ self.raw_debug = False
+ self.output_line_log_buffer = []
+
+ def __str__(self):
+ """Show internal state of Console object as a string."""
+ string = []
+ string.append("controller_pty: %s" % self.controller_pty)
+ string.append("user_pty: %s" % self.user_pty)
+ string.append("interface_pty: %s" % self.interface_pty)
+ string.append("cmd_pipe: %s" % self.cmd_pipe)
+ string.append("dbg_pipe: %s" % self.dbg_pipe)
+ string.append("oobm_queue: %s" % self.oobm_queue)
+ string.append("input_buffer: %s" % self.input_buffer)
+ string.append("input_buffer_pos: %d" % self.input_buffer_pos)
+ string.append("esc_state: %d" % self.esc_state)
+ string.append("line_limit: %d" % self.line_limit)
+ string.append("history: %r" % self.history)
+ string.append("history_pos: %d" % self.history_pos)
+ string.append("prompt: %r" % self.prompt)
+ string.append("partial_cmd: %r" % self.partial_cmd)
+ string.append("interrogation_mode: %r" % self.interrogation_mode)
+ string.append("look_buffer: %r" % self.look_buffer)
+ return "\n".join(string)
+
+ def LogConsoleOutput(self, data):
+ """Log to debug user MCU output to controller_pty when line is filled.
+
+ The logging also suppresses the Cr50 spinner lines by removing characters
+ when it sees backspaces.
+
+ Args:
+ data: bytes - string received from MCU
+ """
+ data = list(data)
+ # For compatibility with python2 and python3, standardize on the data
+ # being a list of integers. This requires one more transformation in py2
+ if not isinstance(data[0], int):
+ data = [ord(c) for c in data]
+
+ # This is a list of already filtered characters (or placeholders).
+ line = self.output_line_log_buffer
+
+ # TODO(b/177480273): use raw strings here
+ symbols = {ord(b"\n"): "\\n", ord(b"\r"): "\\r", ord(b"\t"): "\\t"}
+ # self.logger.debug(u'%s + %r', u''.join(line), ''.join(data))
+ while data:
+ # Recall, data is a list of integers, namely the byte values sent by
+ # the MCU.
+ byte = data.pop(0)
+ # This means that |byte| is an int.
+ if byte == ord("\n"):
+ line.append(symbols[byte])
+ if line:
+ self.logger.debug("%s", "".join(line))
+ line = []
+ elif byte == ord("\b"):
+ # Backspace: trim the last character off the buffer
+ if line:
+ line.pop(-1)
+ elif byte in symbols:
+ line.append(symbols[byte])
+ elif byte < ord(" ") or byte > ord("~"):
+ # Turn any character that isn't printable ASCII into escaped hex.
+ # ' ' is chr(20), and 0-19 are unprintable control characters.
+ # '~' is chr(126), and 127 is DELETE. 128-255 are control and Latin-1.
+ line.append("\\x%02x" % byte)
+ else:
+ # byte is printable. Thus it is safe to use chr() to get the printable
+ # character out of it again.
+ line.append("%s" % chr(byte))
+ self.output_line_log_buffer = line
+
+ def PrintHistory(self):
+ """Print the history of entered commands."""
+ fd = self.controller_pty
+ # Make it pretty by figuring out how wide to pad the numbers.
+ wide = (len(self.history) // 10) + 1
+ for i in range(len(self.history)):
+ line = b" %*d %s\r\n" % (wide, i, self.history[i])
+ os.write(fd, line)
+
+ def ShowPreviousCommand(self):
+ """Shows the previous command from the history list."""
+ # There's nothing to do if there's no history at all.
+ if not self.history:
+ self.logger.debug("No history to print.")
+ return
+
+ # Don't do anything if there's no more history to show.
+ if self.history_pos == 0:
+ self.logger.debug("No more history to show.")
+ return
+
+ self.logger.debug("current history position: %d.", self.history_pos)
+
+ # Decrement the history buffer position.
+ self.history_pos -= 1
+ self.logger.debug("new history position.: %d", self.history_pos)
+
+ # Save the text entered on the console if any.
+ if self.history_pos == len(self.history) - 1:
+ self.logger.debug("saving partial_cmd: %r", self.input_buffer)
+ self.partial_cmd = self.input_buffer
+
+ # Backspace the line.
+ for _ in range(self.input_buffer_pos):
+ self.SendBackspace()
+
+ # Print the last entry in the history buffer.
+ self.logger.debug(
+ "printing previous entry %d - %s",
+ self.history_pos,
+ self.history[self.history_pos],
+ )
+ fd = self.controller_pty
+ prev_cmd = self.history[self.history_pos]
+ os.write(fd, prev_cmd)
+ # Update the input buffer.
+ self.input_buffer = prev_cmd
+ self.input_buffer_pos = len(prev_cmd)
+
+ def ShowNextCommand(self):
+ """Shows the next command from the history list."""
+ # Don't do anything if there's no history at all.
+ if not self.history:
+ self.logger.debug("History buffer is empty.")
+ return
+
+ fd = self.controller_pty
+
+ self.logger.debug("current history position: %d", self.history_pos)
+ # Increment the history position.
+ self.history_pos += 1
+
+ # Restore the partial cmd.
+ if self.history_pos == len(self.history):
+ self.logger.debug("Restoring partial command of %r", self.partial_cmd)
+ # Backspace the line.
+ for _ in range(self.input_buffer_pos):
+ self.SendBackspace()
+ # Print the partially entered command if any.
+ os.write(fd, self.partial_cmd)
+ self.input_buffer = self.partial_cmd
+ self.input_buffer_pos = len(self.input_buffer)
+ # Now that we've printed it, clear the partial cmd storage.
+ self.partial_cmd = b""
+ # Reset history position.
+ self.history_pos = len(self.history)
+ return
+
+ self.logger.debug("new history position: %d", self.history_pos)
+ if self.history_pos > len(self.history) - 1:
+ self.logger.debug("No more history to show.")
+ self.history_pos -= 1
+ self.logger.debug("Reset history position to %d", self.history_pos)
+ return
+
+ # Backspace the line.
+ for _ in range(self.input_buffer_pos):
+ self.SendBackspace()
+
+ # Print the newer entry from the history buffer.
+ self.logger.debug(
+ "printing next entry %d - %s",
+ self.history_pos,
+ self.history[self.history_pos],
+ )
+ next_cmd = self.history[self.history_pos]
+ os.write(fd, next_cmd)
+ # Update the input buffer.
+ self.input_buffer = next_cmd
+ self.input_buffer_pos = len(next_cmd)
+ self.logger.debug("new history position: %d.", self.history_pos)
+
+ def SliceOutChar(self):
+ """Remove a char from the line and shift everything over 1 column."""
+ fd = self.controller_pty
+ # Remove the character at the input_buffer_pos by slicing it out.
+ self.input_buffer = (
+ self.input_buffer[0 : self.input_buffer_pos]
+ + self.input_buffer[self.input_buffer_pos + 1 :]
+ )
+ # Write the rest of the line
+ moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos :])
+ # Write a space to clear out the last char
+ moved_col += os.write(fd, b" ")
+ # Update the input buffer position.
+ self.input_buffer_pos += moved_col
+ # Reset the cursor
+ self.MoveCursor("left", moved_col)
+
+ def HandleEsc(self, byte):
+ """HandleEsc processes escape sequences.
+
+ Args:
+ byte: An integer representing the current byte in the sequence.
+ """
+ # We shouldn't be handling an escape sequence if we haven't seen one.
+ assert self.esc_state != 0
+
+ if self.esc_state is EscState.ESC_START:
+ self.logger.debug("ESC_START")
+ if byte == ord("["):
+ self.esc_state = EscState.ESC_BRACKET
+ return
+
+ else:
+ self.logger.error("Unexpected sequence. %c", byte)
+ self.esc_state = 0
+
+ elif self.esc_state is EscState.ESC_BRACKET:
+ self.logger.debug("ESC_BRACKET")
+ # Left Arrow key was pressed.
+ if byte == ord("D"):
+ self.logger.debug("Left arrow key pressed.")
+ self.MoveCursor("left", 1)
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # Right Arrow key.
+ elif byte == ord("C"):
+ self.logger.debug("Right arrow key pressed.")
+ self.MoveCursor("right", 1)
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # Up Arrow key.
+ elif byte == ord("A"):
+ self.logger.debug("Up arrow key pressed.")
+ self.ShowPreviousCommand()
+ # Reset the state.
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # Down Arrow key.
+ elif byte == ord("B"):
+ self.logger.debug("Down arrow key pressed.")
+ self.ShowNextCommand()
+ # Reset the state.
+ self.esc_state = 0 # Reset the state.
+ return
+
+ # For some reason, minicom sends a 1 instead of 7. /shrug
+ # TODO(aaboagye): Figure out why this happens.
+ elif byte == ord("1") or byte == ord("7"):
+ self.esc_state = EscState.ESC_BRACKET_1
+
+ elif byte == ord("3"):
+ self.esc_state = EscState.ESC_BRACKET_3
+
+ elif byte == ord("8"):
+ self.esc_state = EscState.ESC_BRACKET_8
+
+ else:
+ self.logger.error(
+ r"Bad or unhandled escape sequence. got ^[%c\(%d)", chr(byte), byte
+ )
+ self.esc_state = 0
+ return
+
+ elif self.esc_state is EscState.ESC_BRACKET_1:
+ self.logger.debug("ESC_BRACKET_1")
+ # HOME key.
+ if byte == ord("~"):
+ self.logger.debug("Home key pressed.")
+ self.MoveCursor("left", self.input_buffer_pos)
+ self.esc_state = 0 # Reset the state.
+ self.logger.debug("ESC sequence complete.")
+ return
+
+ elif self.esc_state is EscState.ESC_BRACKET_3:
+ self.logger.debug("ESC_BRACKET_3")
+ # DEL key.
+ if byte == ord("~"):
+ self.logger.debug("Delete key pressed.")
+ if self.input_buffer_pos != len(self.input_buffer):
+ self.SliceOutChar()
+ self.esc_state = 0 # Reset the state.
+
+ elif self.esc_state is EscState.ESC_BRACKET_8:
+ self.logger.debug("ESC_BRACKET_8")
+ # END key.
+ if byte == ord("~"):
+ self.logger.debug("End key pressed.")
+ self.MoveCursor("right", len(self.input_buffer) - self.input_buffer_pos)
+ self.esc_state = 0 # Reset the state.
+ self.logger.debug("ESC sequence complete.")
+ return
+
+ else:
+ self.logger.error("Unexpected sequence. %c", byte)
+ self.esc_state = 0
+
+ else:
+ self.logger.error("Unexpected sequence. %c", byte)
+ self.esc_state = 0
+
+ def ProcessInput(self):
+ """Captures the input determines what actions to take."""
+ # There's nothing to do if the input buffer is empty.
+ if len(self.input_buffer) == 0:
+ return
+
+ # Don't store 2 consecutive identical commands in the history.
+ if self.history and self.history[-1] != self.input_buffer or not self.history:
+ self.history.append(self.input_buffer)
+
+ # Split the command up by spaces.
+ line = self.input_buffer.split(b" ")
+ self.logger.debug("cmd: %s", self.input_buffer)
+ cmd = line[0].lower()
+
+ # The 'history' command is a special case that we handle locally.
+ if cmd == "history":
+ self.PrintHistory()
+ return
+
+ # Send the command to the interpreter.
+ self.logger.debug("Sending command to interpreter.")
+ self.cmd_pipe.send(self.input_buffer)
+
+ def CheckForEnhancedECImage(self):
+ """Performs an interrogation of the EC image.
+
+ Send a SYN and expect an ACK. If no ACK or the response is incorrect, then
+ assume that the current EC image that we are talking to is not enhanced.
+
+ Returns:
+ is_enhanced: A boolean indicating whether the EC responded to the
+ interrogation correctly.
+
+ Raises:
+ EOFError: Allowed to propagate through from self.dbg_pipe.recv().
+ """
+ # Send interrogation byte and wait for the response.
+ self.logger.debug("Performing interrogation.")
+ self.cmd_pipe.send(interpreter.EC_SYN)
+
+ response = ""
+ if self.dbg_pipe.poll(self.interrogation_timeout):
+ response = self.dbg_pipe.recv()
+ self.logger.debug("response: %r", binascii.hexlify(response))
+ else:
+ self.logger.debug("Timed out waiting for EC_ACK")
+
+ # Verify the acknowledgment.
+ is_enhanced = response == interpreter.EC_ACK
+
+ if is_enhanced:
+ # Increase the interrogation timeout for stability purposes.
+ self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT
+ self.logger.debug(
+ "Increasing interrogation timeout to %rs.", self.interrogation_timeout
+ )
+ else:
+ # Reduce the timeout in order to reduce the perceivable delay.
+ self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
+ self.logger.debug(
+ "Reducing interrogation timeout to %rs.", self.interrogation_timeout
+ )
+
+ return is_enhanced
+
+ def HandleChar(self, byte):
+ """HandleChar does a certain action when it receives a character.
+
+ Args:
+ byte: An integer representing the character received from the user.
+
+ Raises:
+ EOFError: Allowed to propagate through from self.CheckForEnhancedECImage()
+ i.e. from self.dbg_pipe.recv().
+ """
+ fd = self.controller_pty
+
+ # Enter the OOBM prompt mode if the user presses '%'.
+ if byte == ord("%"):
+ self.logger.debug("Begin OOBM command.")
+ self.receiving_oobm_cmd = True
+ # Print a "prompt".
+ os.write(self.controller_pty, b"\r\n% ")
+ return
+
+ # Add chars to the pending OOBM command if we're currently receiving one.
+ if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN:
+ tmp_bytes = six.int2byte(byte)
+ self.pending_oobm_cmd += tmp_bytes
+ self.logger.debug("%s", tmp_bytes)
+ os.write(self.controller_pty, tmp_bytes)
+ return
+
+ if byte == ControlKey.CARRIAGE_RETURN:
+ if self.receiving_oobm_cmd:
+ # Terminate the command and place it in the OOBM queue.
+ self.logger.debug("End OOBM command.")
+ if self.pending_oobm_cmd:
+ self.oobm_queue.put(self.pending_oobm_cmd)
+ self.logger.debug(
+ "Placed %r into OOBM command queue.", self.pending_oobm_cmd
+ )
+
+ # Reset the state.
+ os.write(self.controller_pty, b"\r\n" + self.prompt)
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ self.receiving_oobm_cmd = False
+ self.pending_oobm_cmd = b""
+ return
+
+ if self.interrogation_mode == b"never":
+ self.logger.debug(
+ "Skipping interrogation because interrogation mode"
+ " is set to never."
+ )
+ elif self.interrogation_mode == b"always":
+ # Only interrogate the EC if the interrogation mode is set to 'always'.
+ self.enhanced_ec = self.CheckForEnhancedECImage()
+ self.logger.debug("Enhanced EC image? %r", self.enhanced_ec)
+
+ if not self.enhanced_ec:
+ # Send everything straight to the EC to handle.
+ self.cmd_pipe.send(six.int2byte(byte))
+ # Reset the input buffer.
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ self.logger.log(1, "Reset input buffer.")
+ return
+
+ # Keep handling the ESC sequence if we're in the middle of it.
+ if self.esc_state != 0:
+ self.HandleEsc(byte)
+ return
+
+ # When we're at the end of the line, we should only allow going backwards,
+ # backspace, carriage return, up, or down. The arrow keys are escape
+ # sequences, so we let the escape...escape.
+ if self.input_buffer_pos >= self.line_limit and byte not in [
+ ControlKey.CTRL_B,
+ ControlKey.ESC,
+ ControlKey.BACKSPACE,
+ ControlKey.CTRL_A,
+ ControlKey.CARRIAGE_RETURN,
+ ControlKey.CTRL_P,
+ ControlKey.CTRL_N,
+ ]:
+ return
+
+ # If the input buffer is full we can't accept new chars.
+ buffer_full = len(self.input_buffer) >= self.line_limit
+
+ # Carriage_Return/Enter
+ if byte == ControlKey.CARRIAGE_RETURN:
+ self.logger.debug("Enter key pressed.")
+ # Put a carriage return/newline and the print the prompt.
+ os.write(fd, b"\r\n")
+
+ # TODO(aaboagye): When we control the printing of all output, print the
+ # prompt AFTER printing all the output. We can't do it yet because we
+ # don't know how much is coming from the EC.
+
+ # Print the prompt.
+ os.write(fd, self.prompt)
+ # Process the input.
+ self.ProcessInput()
+ # Now, clear the buffer.
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ # Reset history buffer pos.
+ self.history_pos = len(self.history)
+ # Clear partial command.
+ self.partial_cmd = b""
+
+ # Backspace
+ elif byte == ControlKey.BACKSPACE:
+ self.logger.debug("Backspace pressed.")
+ if self.input_buffer_pos > 0:
+ # Move left 1 column.
+ self.MoveCursor("left", 1)
+ # Remove the character at the input_buffer_pos by slicing it out.
+ self.SliceOutChar()
+
+ self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos)
+
+ # Ctrl+A. Move cursor to beginning of the line
+ elif byte == ControlKey.CTRL_A:
+ self.logger.debug("Control+A pressed.")
+ self.MoveCursor("left", self.input_buffer_pos)
+
+ # Ctrl+B. Move cursor left 1 column.
+ elif byte == ControlKey.CTRL_B:
+ self.logger.debug("Control+B pressed.")
+ self.MoveCursor("left", 1)
+
+ # Ctrl+D. Delete a character.
+ elif byte == ControlKey.CTRL_D:
+ self.logger.debug("Control+D pressed.")
+ if self.input_buffer_pos != len(self.input_buffer):
+ # Remove the character by slicing it out.
+ self.SliceOutChar()
+
+ # Ctrl+E. Move cursor to end of the line.
+ elif byte == ControlKey.CTRL_E:
+ self.logger.debug("Control+E pressed.")
+ self.MoveCursor("right", len(self.input_buffer) - self.input_buffer_pos)
+
+ # Ctrl+F. Move cursor right 1 column.
+ elif byte == ControlKey.CTRL_F:
+ self.logger.debug("Control+F pressed.")
+ self.MoveCursor("right", 1)
+
+ # Ctrl+K. Kill line.
+ elif byte == ControlKey.CTRL_K:
+ self.logger.debug("Control+K pressed.")
+ self.KillLine()
+
+ # Ctrl+N. Next line.
+ elif byte == ControlKey.CTRL_N:
+ self.logger.debug("Control+N pressed.")
+ self.ShowNextCommand()
+
+ # Ctrl+P. Previous line.
+ elif byte == ControlKey.CTRL_P:
+ self.logger.debug("Control+P pressed.")
+ self.ShowPreviousCommand()
+
+ # ESC sequence
+ elif byte == ControlKey.ESC:
+ # Starting an ESC sequence
+ self.esc_state = EscState.ESC_START
+
+ # Only print printable chars.
+ elif IsPrintable(byte):
+ # Drop the character if we're full.
+ if buffer_full:
+ self.logger.debug("Dropped char: %c(%d)", byte, byte)
+ return
+ # Print the character.
+ os.write(fd, six.int2byte(byte))
+ # Print the rest of the line (if any).
+ extra_bytes_written = os.write(
+ fd, self.input_buffer[self.input_buffer_pos :]
+ )
+
+ # Recreate the input buffer.
+ self.input_buffer = (
+ self.input_buffer[0 : self.input_buffer_pos]
+ + six.int2byte(byte)
+ + self.input_buffer[self.input_buffer_pos :]
+ )
+ # Update the input buffer position.
+ self.input_buffer_pos += 1 + extra_bytes_written
+
+ # Reset the cursor if we wrote any extra bytes.
+ if extra_bytes_written:
+ self.MoveCursor("left", extra_bytes_written)
+
+ self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos)
+
+ def MoveCursor(self, direction, count):
+ """MoveCursor moves the cursor left or right by count columns.
+
+ Args:
+ direction: A string that should be either 'left' or 'right' representing
+ the direction to move the cursor on the console.
+ count: An integer representing how many columns the cursor should be
+ moved.
+
+ Raises:
+ AssertionError: If the direction is not equal to 'left' or 'right'.
+ """
+ # If there's nothing to move, we're done.
+ if not count:
+ return
+ fd = self.controller_pty
+ seq = b"\033[" + str(count).encode("ascii")
+ if direction == "left":
+ # Bind the movement.
+ if count > self.input_buffer_pos:
+ count = self.input_buffer_pos
+ seq += b"D"
+ self.logger.debug("move cursor left %d", count)
+ self.input_buffer_pos -= count
+
+ elif direction == "right":
+ # Bind the movement.
+ if (count + self.input_buffer_pos) > len(self.input_buffer):
+ count = 0
+ seq += b"C"
+ self.logger.debug("move cursor right %d", count)
+ self.input_buffer_pos += count
+
+ else:
+ raise AssertionError(
+ ("The only valid directions are 'left' and " "'right'")
+ )
+
+ self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos)
+ # Move the cursor.
+ if count != 0:
+ os.write(fd, seq)
+
+ def KillLine(self):
+ """Kill the rest of the line based on the input buffer position."""
+ # Killing the line is killing all the text to the right.
+ diff = len(self.input_buffer) - self.input_buffer_pos
+ self.logger.debug("diff: %d", diff)
+ # Diff shouldn't be negative, but if it is for some reason, let's try to
+ # correct the cursor.
+ if diff < 0:
+ self.logger.warning(
+ "Resetting input buffer position to %d...", len(self.input_buffer)
+ )
+ self.MoveCursor("left", -diff)
+ return
+ if diff:
+ self.MoveCursor("right", diff)
+ for _ in range(diff):
+ self.SendBackspace()
+ self.input_buffer_pos -= diff
+ self.input_buffer = self.input_buffer[0 : self.input_buffer_pos]
+
+ def SendBackspace(self):
+ """Backspace a character on the console."""
+ os.write(self.controller_pty, b"\033[1D \033[1D")
+
+ def ProcessOOBMQueue(self):
+ """Retrieve an item from the OOBM queue and process it."""
+ item = self.oobm_queue.get()
+ self.logger.debug("OOBM cmd: %r", item)
+ cmd = item.split(b" ")
+
+ if cmd[0] == b"loglevel":
+ # An integer is required in order to set the log level.
+ if len(cmd) < 2:
+ self.logger.debug("Insufficient args")
+ self.PrintOOBMHelp()
+ return
+ try:
+ self.logger.debug("Log level change request.")
+ new_log_level = int(cmd[1])
+ self.logger.logger.setLevel(new_log_level)
+ self.logger.info("Log level changed to %d.", new_log_level)
+
+ # Forward the request to the interpreter as well.
+ self.cmd_pipe.send(item)
+ except ValueError:
+ # Ignoring the request if an integer was not provided.
+ self.PrintOOBMHelp()
+
+ elif cmd[0] == b"timestamp":
+ mode = cmd[1].lower()
+ self.timestamp_enabled = mode == b"on"
+ self.logger.info(
+ "%sabling uart timestamps.", "En" if self.timestamp_enabled else "Dis"
+ )
+
+ elif cmd[0] == b"rawdebug":
+ mode = cmd[1].lower()
+ self.raw_debug = mode == b"on"
+ self.logger.info(
+ "%sabling per interrupt debug logs.", "En" if self.raw_debug else "Dis"
+ )
+
+ elif cmd[0] == b"interrogate" and len(cmd) >= 2:
+ enhanced = False
+ mode = cmd[1]
+ if len(cmd) >= 3 and cmd[2] == b"enhanced":
+ enhanced = True
+
+ # Set the mode if correct.
+ if mode in INTERROGATION_MODES:
+ self.interrogation_mode = mode
+ self.logger.debug("Updated interrogation mode to %s.", mode)
+
+ # Update the assumptions of the EC image.
+ self.enhanced_ec = enhanced
+ self.logger.debug("Enhanced EC image is now %r", self.enhanced_ec)
+
+ # Send command to interpreter as well.
+ self.cmd_pipe.send(b"enhanced " + str(self.enhanced_ec).encode("ascii"))
+ else:
+ self.PrintOOBMHelp()
+
+ else:
+ self.PrintOOBMHelp()
+
+ def PrintOOBMHelp(self):
+ """Prints out the OOBM help."""
+ # Print help syntax.
+ os.write(self.controller_pty, b"\r\n" + b"Known OOBM commands:\r\n")
+ os.write(
+ self.controller_pty,
+ b" interrogate <never | always | auto> " b"[enhanced]\r\n",
+ )
+ os.write(self.controller_pty, b" loglevel <int>\r\n")
+
+ def CheckBufferForEnhancedImage(self, data):
+ """Adds data to a look buffer and checks to see for enhanced EC image.
+
+ The EC's console task prints a string upon initialization which says that
+ "Console is enabled; type HELP for help.". The enhanced EC images print a
+ different string as a part of their init. This function searches through a
+ "look" buffer, scanning for the presence of either of those strings and
+ updating the enhanced_ec state accordingly.
+
+ Args:
+ data: A string containing the data sent from the interpreter.
+ """
+ self.look_buffer += data
+
+ # Search the buffer for any of the EC image strings.
+ enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer)
+ non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer)
+
+ # Update the state if any matches were found.
+ if enhanced_match or non_enhanced_match:
+ if enhanced_match:
+ self.enhanced_ec = True
+ elif non_enhanced_match:
+ self.enhanced_ec = False
+
+ # Inform the interpreter of the result.
+ self.cmd_pipe.send(b"enhanced " + str(self.enhanced_ec).encode("ascii"))
+ self.logger.debug("Enhanced EC image? %r", self.enhanced_ec)
+
+ # Clear look buffer since a match was found.
+ self.look_buffer = b""
+
+ # Move the sliding window.
+ self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:]
- else:
- self.logger.error('Unexpected sequence. %c', byte)
- self.esc_state = 0
-
- def ProcessInput(self):
- """Captures the input determines what actions to take."""
- # There's nothing to do if the input buffer is empty.
- if len(self.input_buffer) == 0:
- return
-
- # Don't store 2 consecutive identical commands in the history.
- if (self.history and self.history[-1] != self.input_buffer
- or not self.history):
- self.history.append(self.input_buffer)
-
- # Split the command up by spaces.
- line = self.input_buffer.split(b' ')
- self.logger.debug('cmd: %s', self.input_buffer)
- cmd = line[0].lower()
-
- # The 'history' command is a special case that we handle locally.
- if cmd == 'history':
- self.PrintHistory()
- return
-
- # Send the command to the interpreter.
- self.logger.debug('Sending command to interpreter.')
- self.cmd_pipe.send(self.input_buffer)
- def CheckForEnhancedECImage(self):
- """Performs an interrogation of the EC image.
+def CanonicalizeTimeString(timestr):
+ """Canonicalize the timestamp string.
- Send a SYN and expect an ACK. If no ACK or the response is incorrect, then
- assume that the current EC image that we are talking to is not enhanced.
+ Args:
+ timestr: A timestamp string ended with 6 digits msec.
Returns:
- is_enhanced: A boolean indicating whether the EC responded to the
- interrogation correctly.
-
- Raises:
- EOFError: Allowed to propagate through from self.dbg_pipe.recv().
+ A string with 3 digits msec and an extra space.
"""
- # Send interrogation byte and wait for the response.
- self.logger.debug('Performing interrogation.')
- self.cmd_pipe.send(interpreter.EC_SYN)
-
- response = ''
- if self.dbg_pipe.poll(self.interrogation_timeout):
- response = self.dbg_pipe.recv()
- self.logger.debug('response: %r', binascii.hexlify(response))
- else:
- self.logger.debug('Timed out waiting for EC_ACK')
+ return timestr[:-3].encode("ascii") + b" "
- # Verify the acknowledgment.
- is_enhanced = response == interpreter.EC_ACK
-
- if is_enhanced:
- # Increase the interrogation timeout for stability purposes.
- self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT
- self.logger.debug('Increasing interrogation timeout to %rs.',
- self.interrogation_timeout)
- else:
- # Reduce the timeout in order to reduce the perceivable delay.
- self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
- self.logger.debug('Reducing interrogation timeout to %rs.',
- self.interrogation_timeout)
-
- return is_enhanced
-
- def HandleChar(self, byte):
- """HandleChar does a certain action when it receives a character.
-
- Args:
- byte: An integer representing the character received from the user.
- Raises:
- EOFError: Allowed to propagate through from self.CheckForEnhancedECImage()
- i.e. from self.dbg_pipe.recv().
- """
- fd = self.controller_pty
-
- # Enter the OOBM prompt mode if the user presses '%'.
- if byte == ord('%'):
- self.logger.debug('Begin OOBM command.')
- self.receiving_oobm_cmd = True
- # Print a "prompt".
- os.write(self.controller_pty, b'\r\n% ')
- return
-
- # Add chars to the pending OOBM command if we're currently receiving one.
- if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN:
- tmp_bytes = six.int2byte(byte)
- self.pending_oobm_cmd += tmp_bytes
- self.logger.debug('%s', tmp_bytes)
- os.write(self.controller_pty, tmp_bytes)
- return
-
- if byte == ControlKey.CARRIAGE_RETURN:
- if self.receiving_oobm_cmd:
- # Terminate the command and place it in the OOBM queue.
- self.logger.debug('End OOBM command.')
- if self.pending_oobm_cmd:
- self.oobm_queue.put(self.pending_oobm_cmd)
- self.logger.debug('Placed %r into OOBM command queue.',
- self.pending_oobm_cmd)
-
- # Reset the state.
- os.write(self.controller_pty, b'\r\n' + self.prompt)
- self.input_buffer = b''
- self.input_buffer_pos = 0
- self.receiving_oobm_cmd = False
- self.pending_oobm_cmd = b''
- return
-
- if self.interrogation_mode == b'never':
- self.logger.debug('Skipping interrogation because interrogation mode'
- ' is set to never.')
- elif self.interrogation_mode == b'always':
- # Only interrogate the EC if the interrogation mode is set to 'always'.
- self.enhanced_ec = self.CheckForEnhancedECImage()
- self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
-
- if not self.enhanced_ec:
- # Send everything straight to the EC to handle.
- self.cmd_pipe.send(six.int2byte(byte))
- # Reset the input buffer.
- self.input_buffer = b''
- self.input_buffer_pos = 0
- self.logger.log(1, 'Reset input buffer.')
- return
-
- # Keep handling the ESC sequence if we're in the middle of it.
- if self.esc_state != 0:
- self.HandleEsc(byte)
- return
-
- # When we're at the end of the line, we should only allow going backwards,
- # backspace, carriage return, up, or down. The arrow keys are escape
- # sequences, so we let the escape...escape.
- if (self.input_buffer_pos >= self.line_limit and
- byte not in [ControlKey.CTRL_B, ControlKey.ESC, ControlKey.BACKSPACE,
- ControlKey.CTRL_A, ControlKey.CARRIAGE_RETURN,
- ControlKey.CTRL_P, ControlKey.CTRL_N]):
- return
-
- # If the input buffer is full we can't accept new chars.
- buffer_full = len(self.input_buffer) >= self.line_limit
-
-
- # Carriage_Return/Enter
- if byte == ControlKey.CARRIAGE_RETURN:
- self.logger.debug('Enter key pressed.')
- # Put a carriage return/newline and the print the prompt.
- os.write(fd, b'\r\n')
-
- # TODO(aaboagye): When we control the printing of all output, print the
- # prompt AFTER printing all the output. We can't do it yet because we
- # don't know how much is coming from the EC.
-
- # Print the prompt.
- os.write(fd, self.prompt)
- # Process the input.
- self.ProcessInput()
- # Now, clear the buffer.
- self.input_buffer = b''
- self.input_buffer_pos = 0
- # Reset history buffer pos.
- self.history_pos = len(self.history)
- # Clear partial command.
- self.partial_cmd = b''
-
- # Backspace
- elif byte == ControlKey.BACKSPACE:
- self.logger.debug('Backspace pressed.')
- if self.input_buffer_pos > 0:
- # Move left 1 column.
- self.MoveCursor('left', 1)
- # Remove the character at the input_buffer_pos by slicing it out.
- self.SliceOutChar()
-
- self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
-
- # Ctrl+A. Move cursor to beginning of the line
- elif byte == ControlKey.CTRL_A:
- self.logger.debug('Control+A pressed.')
- self.MoveCursor('left', self.input_buffer_pos)
-
- # Ctrl+B. Move cursor left 1 column.
- elif byte == ControlKey.CTRL_B:
- self.logger.debug('Control+B pressed.')
- self.MoveCursor('left', 1)
-
- # Ctrl+D. Delete a character.
- elif byte == ControlKey.CTRL_D:
- self.logger.debug('Control+D pressed.')
- if self.input_buffer_pos != len(self.input_buffer):
- # Remove the character by slicing it out.
- self.SliceOutChar()
-
- # Ctrl+E. Move cursor to end of the line.
- elif byte == ControlKey.CTRL_E:
- self.logger.debug('Control+E pressed.')
- self.MoveCursor('right',
- len(self.input_buffer) - self.input_buffer_pos)
-
- # Ctrl+F. Move cursor right 1 column.
- elif byte == ControlKey.CTRL_F:
- self.logger.debug('Control+F pressed.')
- self.MoveCursor('right', 1)
-
- # Ctrl+K. Kill line.
- elif byte == ControlKey.CTRL_K:
- self.logger.debug('Control+K pressed.')
- self.KillLine()
-
- # Ctrl+N. Next line.
- elif byte == ControlKey.CTRL_N:
- self.logger.debug('Control+N pressed.')
- self.ShowNextCommand()
-
- # Ctrl+P. Previous line.
- elif byte == ControlKey.CTRL_P:
- self.logger.debug('Control+P pressed.')
- self.ShowPreviousCommand()
-
- # ESC sequence
- elif byte == ControlKey.ESC:
- # Starting an ESC sequence
- self.esc_state = EscState.ESC_START
-
- # Only print printable chars.
- elif IsPrintable(byte):
- # Drop the character if we're full.
- if buffer_full:
- self.logger.debug('Dropped char: %c(%d)', byte, byte)
- return
- # Print the character.
- os.write(fd, six.int2byte(byte))
- # Print the rest of the line (if any).
- extra_bytes_written = os.write(fd,
- self.input_buffer[self.input_buffer_pos:])
-
- # Recreate the input buffer.
- self.input_buffer = (self.input_buffer[0:self.input_buffer_pos] +
- six.int2byte(byte) +
- self.input_buffer[self.input_buffer_pos:])
- # Update the input buffer position.
- self.input_buffer_pos += 1 + extra_bytes_written
-
- # Reset the cursor if we wrote any extra bytes.
- if extra_bytes_written:
- self.MoveCursor('left', extra_bytes_written)
-
- self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
-
- def MoveCursor(self, direction, count):
- """MoveCursor moves the cursor left or right by count columns.
+def IsPrintable(byte):
+ """Determines if a byte is printable.
Args:
- direction: A string that should be either 'left' or 'right' representing
- the direction to move the cursor on the console.
- count: An integer representing how many columns the cursor should be
- moved.
+ byte: An integer potentially representing a printable character.
- Raises:
- AssertionError: If the direction is not equal to 'left' or 'right'.
+ Returns:
+ A boolean indicating whether the byte is a printable character.
"""
- # If there's nothing to move, we're done.
- if not count:
- return
- fd = self.controller_pty
- seq = b'\033[' + str(count).encode('ascii')
- if direction == 'left':
- # Bind the movement.
- if count > self.input_buffer_pos:
- count = self.input_buffer_pos
- seq += b'D'
- self.logger.debug('move cursor left %d', count)
- self.input_buffer_pos -= count
-
- elif direction == 'right':
- # Bind the movement.
- if (count + self.input_buffer_pos) > len(self.input_buffer):
- count = 0
- seq += b'C'
- self.logger.debug('move cursor right %d', count)
- self.input_buffer_pos += count
-
- else:
- raise AssertionError(('The only valid directions are \'left\' and '
- '\'right\''))
-
- self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos)
- # Move the cursor.
- if count != 0:
- os.write(fd, seq)
-
- def KillLine(self):
- """Kill the rest of the line based on the input buffer position."""
- # Killing the line is killing all the text to the right.
- diff = len(self.input_buffer) - self.input_buffer_pos
- self.logger.debug('diff: %d', diff)
- # Diff shouldn't be negative, but if it is for some reason, let's try to
- # correct the cursor.
- if diff < 0:
- self.logger.warning('Resetting input buffer position to %d...',
- len(self.input_buffer))
- self.MoveCursor('left', -diff)
- return
- if diff:
- self.MoveCursor('right', diff)
- for _ in range(diff):
- self.SendBackspace()
- self.input_buffer_pos -= diff
- self.input_buffer = self.input_buffer[0:self.input_buffer_pos]
-
- def SendBackspace(self):
- """Backspace a character on the console."""
- os.write(self.controller_pty, b'\033[1D \033[1D')
-
- def ProcessOOBMQueue(self):
- """Retrieve an item from the OOBM queue and process it."""
- item = self.oobm_queue.get()
- self.logger.debug('OOBM cmd: %r', item)
- cmd = item.split(b' ')
-
- if cmd[0] == b'loglevel':
- # An integer is required in order to set the log level.
- if len(cmd) < 2:
- self.logger.debug('Insufficient args')
- self.PrintOOBMHelp()
- return
- try:
- self.logger.debug('Log level change request.')
- new_log_level = int(cmd[1])
- self.logger.logger.setLevel(new_log_level)
- self.logger.info('Log level changed to %d.', new_log_level)
-
- # Forward the request to the interpreter as well.
- self.cmd_pipe.send(item)
- except ValueError:
- # Ignoring the request if an integer was not provided.
- self.PrintOOBMHelp()
-
- elif cmd[0] == b'timestamp':
- mode = cmd[1].lower()
- self.timestamp_enabled = (mode == b'on')
- self.logger.info('%sabling uart timestamps.',
- 'En' if self.timestamp_enabled else 'Dis')
-
- elif cmd[0] == b'rawdebug':
- mode = cmd[1].lower()
- self.raw_debug = (mode == b'on')
- self.logger.info('%sabling per interrupt debug logs.',
- 'En' if self.raw_debug else 'Dis')
-
- elif cmd[0] == b'interrogate' and len(cmd) >= 2:
- enhanced = False
- mode = cmd[1]
- if len(cmd) >= 3 and cmd[2] == b'enhanced':
- enhanced = True
-
- # Set the mode if correct.
- if mode in INTERROGATION_MODES:
- self.interrogation_mode = mode
- self.logger.debug('Updated interrogation mode to %s.', mode)
-
- # Update the assumptions of the EC image.
- self.enhanced_ec = enhanced
- self.logger.debug('Enhanced EC image is now %r', self.enhanced_ec)
-
- # Send command to interpreter as well.
- self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii'))
- else:
- self.PrintOOBMHelp()
-
- else:
- self.PrintOOBMHelp()
+ return byte >= ord(" ") and byte <= ord("~")
- def PrintOOBMHelp(self):
- """Prints out the OOBM help."""
- # Print help syntax.
- os.write(self.controller_pty, b'\r\n' + b'Known OOBM commands:\r\n')
- os.write(self.controller_pty, b' interrogate <never | always | auto> '
- b'[enhanced]\r\n')
- os.write(self.controller_pty, b' loglevel <int>\r\n')
- def CheckBufferForEnhancedImage(self, data):
- """Adds data to a look buffer and checks to see for enhanced EC image.
-
- The EC's console task prints a string upon initialization which says that
- "Console is enabled; type HELP for help.". The enhanced EC images print a
- different string as a part of their init. This function searches through a
- "look" buffer, scanning for the presence of either of those strings and
- updating the enhanced_ec state accordingly.
+def StartLoop(console, command_active, shutdown_pipe=None):
+ """Starts the infinite loop of console processing.
Args:
- data: A string containing the data sent from the interpreter.
+ console: A Console object that has been properly initialzed.
+ command_active: ctypes data object or multiprocessing.Value indicating if
+ servod owns the console, or user owns the console. This prevents input
+ collisions.
+ shutdown_pipe: A file object for a pipe or equivalent that becomes readable
+ (not blocked) to indicate that the loop should exit. Can be None to never
+ exit the loop.
"""
- self.look_buffer += data
-
- # Search the buffer for any of the EC image strings.
- enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer)
- non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer)
-
- # Update the state if any matches were found.
- if enhanced_match or non_enhanced_match:
- if enhanced_match:
- self.enhanced_ec = True
- elif non_enhanced_match:
- self.enhanced_ec = False
-
- # Inform the interpreter of the result.
- self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii'))
- self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
-
- # Clear look buffer since a match was found.
- self.look_buffer = b''
-
- # Move the sliding window.
- self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:]
-
-
-def CanonicalizeTimeString(timestr):
- """Canonicalize the timestamp string.
-
- Args:
- timestr: A timestamp string ended with 6 digits msec.
-
- Returns:
- A string with 3 digits msec and an extra space.
- """
- return timestr[:-3].encode('ascii') + b' '
-
-
-def IsPrintable(byte):
- """Determines if a byte is printable.
-
- Args:
- byte: An integer potentially representing a printable character.
-
- Returns:
- A boolean indicating whether the byte is a printable character.
- """
- return byte >= ord(' ') and byte <= ord('~')
-
-
-def StartLoop(console, command_active, shutdown_pipe=None):
- """Starts the infinite loop of console processing.
-
- Args:
- console: A Console object that has been properly initialzed.
- command_active: ctypes data object or multiprocessing.Value indicating if
- servod owns the console, or user owns the console. This prevents input
- collisions.
- shutdown_pipe: A file object for a pipe or equivalent that becomes readable
- (not blocked) to indicate that the loop should exit. Can be None to never
- exit the loop.
- """
- try:
- console.logger.debug('Console is being served on %s.', console.user_pty)
- console.logger.debug('Console controller is on %s.', console.controller_pty)
- console.logger.debug('Command interface is being served on %s.',
- console.interface_pty)
- console.logger.debug(console)
-
- # This checks for HUP to indicate if the user has connected to the pty.
- ep = select.epoll()
- ep.register(console.controller_pty, select.EPOLLHUP)
-
- # This is used instead of "break" to avoid exiting the loop in the middle of
- # an iteration.
- continue_looping = True
-
- # Used for determining when to print host timestamps
- tm_req = True
-
- while continue_looping:
- # Check to see if pts is connected to anything
- events = ep.poll(0)
- controller_connected = not events
-
- # Check to see if pipes or the console are ready for reading.
- read_list = [console.interface_pty,
- console.cmd_pipe, console.dbg_pipe]
- if controller_connected:
- read_list.append(console.controller_pty)
- if shutdown_pipe is not None:
- read_list.append(shutdown_pipe)
-
- # Check if any input is ready, or wait for .1 sec and re-poll if
- # a user has connected to the pts.
- select_output = select.select(read_list, [], [], .1)
- if not select_output:
- continue
- ready_for_reading = select_output[0]
-
- for obj in ready_for_reading:
- if obj is console.controller_pty:
- if not command_active.value:
- # Convert to bytes so we can look for non-printable chars such as
- # Ctrl+A, Ctrl+E, etc.
- try:
- line = bytearray(os.read(console.controller_pty, CONSOLE_MAX_READ))
- console.logger.debug('Input from user: %s, locked:%s',
- str(line).strip(), command_active.value)
- for i in line:
- try:
- # Handle each character as it arrives.
- console.HandleChar(i)
- except EOFError:
- console.logger.debug(
- 'ec3po console received EOF from dbg_pipe in HandleChar()'
- ' while reading console.controller_pty')
- continue_looping = False
- break
- except OSError:
- console.logger.debug('Ptm read failed, probably user disconnect.')
-
- elif obj is console.interface_pty:
- if command_active.value:
- # Convert to bytes so we can look for non-printable chars such as
- # Ctrl+A, Ctrl+E, etc.
- line = bytearray(os.read(console.interface_pty, CONSOLE_MAX_READ))
- console.logger.debug('Input from interface: %s, locked:%s',
- str(line).strip(), command_active.value)
- for i in line:
- try:
- # Handle each character as it arrives.
- console.HandleChar(i)
- except EOFError:
- console.logger.debug(
- 'ec3po console received EOF from dbg_pipe in HandleChar()'
- ' while reading console.interface_pty')
- continue_looping = False
- break
-
- elif obj is console.cmd_pipe:
- try:
- data = console.cmd_pipe.recv()
- except EOFError:
- console.logger.debug('ec3po console received EOF from cmd_pipe')
- continue_looping = False
- else:
- # Write it to the user console.
- if console.raw_debug:
- console.logger.debug('|CMD|-%s->%r',
- ('u' if controller_connected else '') +
- ('i' if command_active.value else ''),
- data.strip())
+ try:
+ console.logger.debug("Console is being served on %s.", console.user_pty)
+ console.logger.debug("Console controller is on %s.", console.controller_pty)
+ console.logger.debug(
+ "Command interface is being served on %s.", console.interface_pty
+ )
+ console.logger.debug(console)
+
+ # This checks for HUP to indicate if the user has connected to the pty.
+ ep = select.epoll()
+ ep.register(console.controller_pty, select.EPOLLHUP)
+
+ # This is used instead of "break" to avoid exiting the loop in the middle of
+ # an iteration.
+ continue_looping = True
+
+ # Used for determining when to print host timestamps
+ tm_req = True
+
+ while continue_looping:
+ # Check to see if pts is connected to anything
+ events = ep.poll(0)
+ controller_connected = not events
+
+ # Check to see if pipes or the console are ready for reading.
+ read_list = [console.interface_pty, console.cmd_pipe, console.dbg_pipe]
if controller_connected:
- os.write(console.controller_pty, data)
- if command_active.value:
- os.write(console.interface_pty, data)
-
- elif obj is console.dbg_pipe:
- try:
- data = console.dbg_pipe.recv()
- except EOFError:
- console.logger.debug('ec3po console received EOF from dbg_pipe')
- continue_looping = False
- else:
- if console.interrogation_mode == b'auto':
- # Search look buffer for enhanced EC image string.
- console.CheckBufferForEnhancedImage(data)
- # Write it to the user console.
- if len(data) > 1 and console.raw_debug:
- console.logger.debug('|DBG|-%s->%r',
- ('u' if controller_connected else '') +
- ('i' if command_active.value else ''),
- data.strip())
- console.LogConsoleOutput(data)
- if controller_connected:
- end = len(data) - 1
- if console.timestamp_enabled:
- # A timestamp is required at the beginning of this line
- if tm_req is True:
- now = datetime.now()
- tm = CanonicalizeTimeString(now.strftime(HOST_STRFTIME))
- os.write(console.controller_pty, tm)
- tm_req = False
-
- # Insert timestamps into the middle where appropriate
- # except if the last character is a newline
- nls_found = data.count(b'\n', 0, end)
- now = datetime.now()
- tm = CanonicalizeTimeString(now.strftime('\n' + HOST_STRFTIME))
- data_tm = data.replace(b'\n', tm, nls_found)
- else:
- data_tm = data
-
- # timestamp required on next input
- if data[end] == b'\n'[0]:
- tm_req = True
- os.write(console.controller_pty, data_tm)
- if command_active.value:
- os.write(console.interface_pty, data)
-
- elif obj is shutdown_pipe:
- console.logger.debug(
- 'ec3po console received shutdown pipe unblocked notification')
- continue_looping = False
-
- while not console.oobm_queue.empty():
- console.logger.debug('OOBM queue ready for reading.')
- console.ProcessOOBMQueue()
-
- except KeyboardInterrupt:
- pass
-
- finally:
- ep.unregister(console.controller_pty)
- console.dbg_pipe.close()
- console.cmd_pipe.close()
- os.close(console.controller_pty)
- os.close(console.interface_pty)
- if shutdown_pipe is not None:
- shutdown_pipe.close()
- console.logger.debug('Exit ec3po console loop for %s', console.user_pty)
+ read_list.append(console.controller_pty)
+ if shutdown_pipe is not None:
+ read_list.append(shutdown_pipe)
+
+ # Check if any input is ready, or wait for .1 sec and re-poll if
+ # a user has connected to the pts.
+ select_output = select.select(read_list, [], [], 0.1)
+ if not select_output:
+ continue
+ ready_for_reading = select_output[0]
+
+ for obj in ready_for_reading:
+ if obj is console.controller_pty:
+ if not command_active.value:
+ # Convert to bytes so we can look for non-printable chars such as
+ # Ctrl+A, Ctrl+E, etc.
+ try:
+ line = bytearray(
+ os.read(console.controller_pty, CONSOLE_MAX_READ)
+ )
+ console.logger.debug(
+ "Input from user: %s, locked:%s",
+ str(line).strip(),
+ command_active.value,
+ )
+ for i in line:
+ try:
+ # Handle each character as it arrives.
+ console.HandleChar(i)
+ except EOFError:
+ console.logger.debug(
+ "ec3po console received EOF from dbg_pipe in HandleChar()"
+ " while reading console.controller_pty"
+ )
+ continue_looping = False
+ break
+ except OSError:
+ console.logger.debug(
+ "Ptm read failed, probably user disconnect."
+ )
+
+ elif obj is console.interface_pty:
+ if command_active.value:
+ # Convert to bytes so we can look for non-printable chars such as
+ # Ctrl+A, Ctrl+E, etc.
+ line = bytearray(
+ os.read(console.interface_pty, CONSOLE_MAX_READ)
+ )
+ console.logger.debug(
+ "Input from interface: %s, locked:%s",
+ str(line).strip(),
+ command_active.value,
+ )
+ for i in line:
+ try:
+ # Handle each character as it arrives.
+ console.HandleChar(i)
+ except EOFError:
+ console.logger.debug(
+ "ec3po console received EOF from dbg_pipe in HandleChar()"
+ " while reading console.interface_pty"
+ )
+ continue_looping = False
+ break
+
+ elif obj is console.cmd_pipe:
+ try:
+ data = console.cmd_pipe.recv()
+ except EOFError:
+ console.logger.debug("ec3po console received EOF from cmd_pipe")
+ continue_looping = False
+ else:
+ # Write it to the user console.
+ if console.raw_debug:
+ console.logger.debug(
+ "|CMD|-%s->%r",
+ ("u" if controller_connected else "")
+ + ("i" if command_active.value else ""),
+ data.strip(),
+ )
+ if controller_connected:
+ os.write(console.controller_pty, data)
+ if command_active.value:
+ os.write(console.interface_pty, data)
+
+ elif obj is console.dbg_pipe:
+ try:
+ data = console.dbg_pipe.recv()
+ except EOFError:
+ console.logger.debug("ec3po console received EOF from dbg_pipe")
+ continue_looping = False
+ else:
+ if console.interrogation_mode == b"auto":
+ # Search look buffer for enhanced EC image string.
+ console.CheckBufferForEnhancedImage(data)
+ # Write it to the user console.
+ if len(data) > 1 and console.raw_debug:
+ console.logger.debug(
+ "|DBG|-%s->%r",
+ ("u" if controller_connected else "")
+ + ("i" if command_active.value else ""),
+ data.strip(),
+ )
+ console.LogConsoleOutput(data)
+ if controller_connected:
+ end = len(data) - 1
+ if console.timestamp_enabled:
+ # A timestamp is required at the beginning of this line
+ if tm_req is True:
+ now = datetime.now()
+ tm = CanonicalizeTimeString(
+ now.strftime(HOST_STRFTIME)
+ )
+ os.write(console.controller_pty, tm)
+ tm_req = False
+
+ # Insert timestamps into the middle where appropriate
+ # except if the last character is a newline
+ nls_found = data.count(b"\n", 0, end)
+ now = datetime.now()
+ tm = CanonicalizeTimeString(
+ now.strftime("\n" + HOST_STRFTIME)
+ )
+ data_tm = data.replace(b"\n", tm, nls_found)
+ else:
+ data_tm = data
+
+ # timestamp required on next input
+ if data[end] == b"\n"[0]:
+ tm_req = True
+ os.write(console.controller_pty, data_tm)
+ if command_active.value:
+ os.write(console.interface_pty, data)
+
+ elif obj is shutdown_pipe:
+ console.logger.debug(
+ "ec3po console received shutdown pipe unblocked notification"
+ )
+ continue_looping = False
+
+ while not console.oobm_queue.empty():
+ console.logger.debug("OOBM queue ready for reading.")
+ console.ProcessOOBMQueue()
+
+ except KeyboardInterrupt:
+ pass
+
+ finally:
+ ep.unregister(console.controller_pty)
+ console.dbg_pipe.close()
+ console.cmd_pipe.close()
+ os.close(console.controller_pty)
+ os.close(console.interface_pty)
+ if shutdown_pipe is not None:
+ shutdown_pipe.close()
+ console.logger.debug("Exit ec3po console loop for %s", console.user_pty)
def main(argv):
- """Kicks off the EC-3PO interactive console interface and interpreter.
-
- We create some pipes to communicate with an interpreter, instantiate an
- interpreter, create a PTY pair, and begin serving the console interface.
-
- Args:
- argv: A list of strings containing the arguments this module was called
- with.
- """
- # Set up argument parser.
- parser = argparse.ArgumentParser(description=('Start interactive EC console '
- 'and interpreter.'))
- parser.add_argument('ec_uart_pty',
- help=('The full PTY name that the EC UART'
- ' is present on. eg: /dev/pts/12'))
- parser.add_argument('--log-level',
- default='info',
- help='info, debug, warning, error, or critical')
-
- # Parse arguments.
- opts = parser.parse_args(argv)
-
- # Set logging level.
- opts.log_level = opts.log_level.lower()
- if opts.log_level == 'info':
- log_level = logging.INFO
- elif opts.log_level == 'debug':
- log_level = logging.DEBUG
- elif opts.log_level == 'warning':
- log_level = logging.WARNING
- elif opts.log_level == 'error':
- log_level = logging.ERROR
- elif opts.log_level == 'critical':
- log_level = logging.CRITICAL
- else:
- parser.error('Invalid log level. (info, debug, warning, error, critical)')
-
- # Start logging with a timestamp, module, and log level shown in each log
- # entry.
- logging.basicConfig(level=log_level, format=('%(asctime)s - %(module)s -'
- ' %(levelname)s - %(message)s'))
-
- # Create some pipes to communicate between the interpreter and the console.
- # The command pipe is bidirectional.
- cmd_pipe_interactive, cmd_pipe_interp = threadproc_shim.Pipe()
- # The debug pipe is unidirectional from interpreter to console only.
- dbg_pipe_interactive, dbg_pipe_interp = threadproc_shim.Pipe(duplex=False)
-
- # Create an interpreter instance.
- itpr = interpreter.Interpreter(opts.ec_uart_pty, cmd_pipe_interp,
- dbg_pipe_interp, log_level)
-
- # Spawn an interpreter process.
- itpr_process = threadproc_shim.ThreadOrProcess(
- target=interpreter.StartLoop, args=(itpr,))
- # Make sure to kill the interpreter when we terminate.
- itpr_process.daemon = True
- # Start the interpreter.
- itpr_process.start()
-
- # Open a new pseudo-terminal pair
- (controller_pty, user_pty) = pty.openpty()
- # Set the permissions to 660.
- os.chmod(os.ttyname(user_pty), (stat.S_IRGRP | stat.S_IWGRP |
- stat.S_IRUSR | stat.S_IWUSR))
- # Create a console.
- console = Console(controller_pty, os.ttyname(user_pty), cmd_pipe_interactive,
- dbg_pipe_interactive)
- # Start serving the console.
- v = threadproc_shim.Value(ctypes.c_bool, False)
- StartLoop(console, v)
-
-
-if __name__ == '__main__':
- main(sys.argv[1:])
+ """Kicks off the EC-3PO interactive console interface and interpreter.
+
+ We create some pipes to communicate with an interpreter, instantiate an
+ interpreter, create a PTY pair, and begin serving the console interface.
+
+ Args:
+ argv: A list of strings containing the arguments this module was called
+ with.
+ """
+ # Set up argument parser.
+ parser = argparse.ArgumentParser(
+ description=("Start interactive EC console " "and interpreter.")
+ )
+ parser.add_argument(
+ "ec_uart_pty",
+ help=("The full PTY name that the EC UART" " is present on. eg: /dev/pts/12"),
+ )
+ parser.add_argument(
+ "--log-level", default="info", help="info, debug, warning, error, or critical"
+ )
+
+ # Parse arguments.
+ opts = parser.parse_args(argv)
+
+ # Set logging level.
+ opts.log_level = opts.log_level.lower()
+ if opts.log_level == "info":
+ log_level = logging.INFO
+ elif opts.log_level == "debug":
+ log_level = logging.DEBUG
+ elif opts.log_level == "warning":
+ log_level = logging.WARNING
+ elif opts.log_level == "error":
+ log_level = logging.ERROR
+ elif opts.log_level == "critical":
+ log_level = logging.CRITICAL
+ else:
+ parser.error("Invalid log level. (info, debug, warning, error, critical)")
+
+ # Start logging with a timestamp, module, and log level shown in each log
+ # entry.
+ logging.basicConfig(
+ level=log_level,
+ format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"),
+ )
+
+ # Create some pipes to communicate between the interpreter and the console.
+ # The command pipe is bidirectional.
+ cmd_pipe_interactive, cmd_pipe_interp = threadproc_shim.Pipe()
+ # The debug pipe is unidirectional from interpreter to console only.
+ dbg_pipe_interactive, dbg_pipe_interp = threadproc_shim.Pipe(duplex=False)
+
+ # Create an interpreter instance.
+ itpr = interpreter.Interpreter(
+ opts.ec_uart_pty, cmd_pipe_interp, dbg_pipe_interp, log_level
+ )
+
+ # Spawn an interpreter process.
+ itpr_process = threadproc_shim.ThreadOrProcess(
+ target=interpreter.StartLoop, args=(itpr,)
+ )
+ # Make sure to kill the interpreter when we terminate.
+ itpr_process.daemon = True
+ # Start the interpreter.
+ itpr_process.start()
+
+ # Open a new pseudo-terminal pair
+ (controller_pty, user_pty) = pty.openpty()
+ # Set the permissions to 660.
+ os.chmod(
+ os.ttyname(user_pty),
+ (stat.S_IRGRP | stat.S_IWGRP | stat.S_IRUSR | stat.S_IWUSR),
+ )
+ # Create a console.
+ console = Console(
+ controller_pty, os.ttyname(user_pty), cmd_pipe_interactive, dbg_pipe_interactive
+ )
+ # Start serving the console.
+ v = threadproc_shim.Value(ctypes.c_bool, False)
+ StartLoop(console, v)
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py
index 7e341e7e8d..41ae324ef4 100755
--- a/util/ec3po/console_unittest.py
+++ b/util/ec3po/console_unittest.py
@@ -11,1262 +11,1310 @@ from __future__ import print_function
import binascii
import logging
-import mock
import tempfile
import unittest
+import mock
import six
-
-from ec3po import console
-from ec3po import interpreter
-from ec3po import threadproc_shim
+from ec3po import console, interpreter, threadproc_shim
ESC_STRING = six.int2byte(console.ControlKey.ESC)
+
class Keys(object):
- """A class that contains the escape sequences for special keys."""
- LEFT_ARROW = [console.ControlKey.ESC, ord('['), ord('D')]
- RIGHT_ARROW = [console.ControlKey.ESC, ord('['), ord('C')]
- UP_ARROW = [console.ControlKey.ESC, ord('['), ord('A')]
- DOWN_ARROW = [console.ControlKey.ESC, ord('['), ord('B')]
- HOME = [console.ControlKey.ESC, ord('['), ord('1'), ord('~')]
- END = [console.ControlKey.ESC, ord('['), ord('8'), ord('~')]
- DEL = [console.ControlKey.ESC, ord('['), ord('3'), ord('~')]
+ """A class that contains the escape sequences for special keys."""
+
+ LEFT_ARROW = [console.ControlKey.ESC, ord("["), ord("D")]
+ RIGHT_ARROW = [console.ControlKey.ESC, ord("["), ord("C")]
+ UP_ARROW = [console.ControlKey.ESC, ord("["), ord("A")]
+ DOWN_ARROW = [console.ControlKey.ESC, ord("["), ord("B")]
+ HOME = [console.ControlKey.ESC, ord("["), ord("1"), ord("~")]
+ END = [console.ControlKey.ESC, ord("["), ord("8"), ord("~")]
+ DEL = [console.ControlKey.ESC, ord("["), ord("3"), ord("~")]
+
class OutputStream(object):
- """A class that has methods which return common console output."""
+ """A class that has methods which return common console output."""
- @staticmethod
- def MoveCursorLeft(count):
- """Produces what would be printed to the console if the cursor moved left.
+ @staticmethod
+ def MoveCursorLeft(count):
+ """Produces what would be printed to the console if the cursor moved left.
- Args:
- count: An integer representing how many columns to move left.
+ Args:
+ count: An integer representing how many columns to move left.
- Returns:
- string: A string which contains what would be printed to the console if
- the cursor moved left.
- """
- string = ESC_STRING
- string += b'[' + str(count).encode('ascii') + b'D'
- return string
+ Returns:
+ string: A string which contains what would be printed to the console if
+ the cursor moved left.
+ """
+ string = ESC_STRING
+ string += b"[" + str(count).encode("ascii") + b"D"
+ return string
- @staticmethod
- def MoveCursorRight(count):
- """Produces what would be printed to the console if the cursor moved right.
+ @staticmethod
+ def MoveCursorRight(count):
+ """Produces what would be printed to the console if the cursor moved right.
- Args:
- count: An integer representing how many columns to move right.
+ Args:
+ count: An integer representing how many columns to move right.
- Returns:
- string: A string which contains what would be printed to the console if
- the cursor moved right.
- """
- string = ESC_STRING
- string += b'[' + str(count).encode('ascii') + b'C'
- return string
+ Returns:
+ string: A string which contains what would be printed to the console if
+ the cursor moved right.
+ """
+ string = ESC_STRING
+ string += b"[" + str(count).encode("ascii") + b"C"
+ return string
-BACKSPACE_STRING = b''
+
+BACKSPACE_STRING = b""
# Move cursor left 1 column.
BACKSPACE_STRING += OutputStream.MoveCursorLeft(1)
# Write a space.
-BACKSPACE_STRING += b' '
+BACKSPACE_STRING += b" "
# Move cursor left 1 column.
BACKSPACE_STRING += OutputStream.MoveCursorLeft(1)
+
def BytesToByteList(string):
- """Converts a bytes string to list of bytes.
+ """Converts a bytes string to list of bytes.
+
+ Args:
+ string: A literal bytes to turn into a list of bytes.
- Args:
- string: A literal bytes to turn into a list of bytes.
+ Returns:
+ A list of integers representing the byte value of each character in the
+ string.
+ """
+ if six.PY3:
+ return [c for c in string]
+ return [ord(c) for c in string]
- Returns:
- A list of integers representing the byte value of each character in the
- string.
- """
- if six.PY3:
- return [c for c in string]
- return [ord(c) for c in string]
def CheckConsoleOutput(test_case, exp_console_out):
- """Verify what was sent out the console matches what we expect.
+ """Verify what was sent out the console matches what we expect.
- Args:
- test_case: A unittest.TestCase object representing the current unit test.
- exp_console_out: A string representing the console output stream.
- """
- # Read what was sent out the console.
- test_case.tempfile.seek(0)
- console_out = test_case.tempfile.read()
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_console_out: A string representing the console output stream.
+ """
+ # Read what was sent out the console.
+ test_case.tempfile.seek(0)
+ console_out = test_case.tempfile.read()
- test_case.assertEqual(exp_console_out, console_out)
+ test_case.assertEqual(exp_console_out, console_out)
-def CheckInputBuffer(test_case, exp_input_buffer):
- """Verify that the input buffer contains what we expect.
-
- Args:
- test_case: A unittest.TestCase object representing the current unit test.
- exp_input_buffer: A string containing the contents of the current input
- buffer.
- """
- test_case.assertEqual(exp_input_buffer, test_case.console.input_buffer,
- (b'input buffer does not match expected.\n'
- b'expected: |' + exp_input_buffer + b'|\n'
- b'got: |' + test_case.console.input_buffer +
- b'|\n' + str(test_case.console).encode('ascii')))
-def CheckInputBufferPosition(test_case, exp_pos):
- """Verify the input buffer position.
+def CheckInputBuffer(test_case, exp_input_buffer):
+ """Verify that the input buffer contains what we expect.
- Args:
- test_case: A unittest.TestCase object representing the current unit test.
- exp_pos: An integer representing the expected input buffer position.
- """
- test_case.assertEqual(exp_pos, test_case.console.input_buffer_pos,
- 'input buffer position is incorrect.\ngot: ' +
- str(test_case.console.input_buffer_pos) + '\nexp: ' +
- str(exp_pos) + '\n' + str(test_case.console))
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_input_buffer: A string containing the contents of the current input
+ buffer.
+ """
+ test_case.assertEqual(
+ exp_input_buffer,
+ test_case.console.input_buffer,
+ (
+ b"input buffer does not match expected.\n"
+ b"expected: |" + exp_input_buffer + b"|\n"
+ b"got: |"
+ + test_case.console.input_buffer
+ + b"|\n"
+ + str(test_case.console).encode("ascii")
+ ),
+ )
-def CheckHistoryBuffer(test_case, exp_history):
- """Verify that the items in the history buffer are what we expect.
-
- Args:
- test_case: A unittest.TestCase object representing the current unit test.
- exp_history: A list of strings representing the expected contents of the
- history buffer.
- """
- # First, check to see if the length is what we expect.
- test_case.assertEqual(len(exp_history), len(test_case.console.history),
- ('The number of items in the history is unexpected.\n'
- 'exp: ' + str(len(exp_history)) + '\n'
- 'got: ' + str(len(test_case.console.history)) + '\n'
- 'internal state:\n' + str(test_case.console)))
-
- # Next, check the actual contents of the history buffer.
- for i in range(len(exp_history)):
- test_case.assertEqual(exp_history[i], test_case.console.history[i],
- (b'history buffer contents are incorrect.\n'
- b'exp: ' + exp_history[i] + b'\n'
- b'got: ' + test_case.console.history[i] + b'\n'
- b'internal state:\n' +
- str(test_case.console).encode('ascii')))
+def CheckInputBufferPosition(test_case, exp_pos):
+ """Verify the input buffer position.
-class TestConsoleEditingMethods(unittest.TestCase):
- """Test case to verify all console editing methods."""
-
- def setUp(self):
- """Setup the test harness."""
- # Setup logging with a timestamp, the module, and the log level.
- logging.basicConfig(level=logging.DEBUG,
- format=('%(asctime)s - %(module)s -'
- ' %(levelname)s - %(message)s'))
-
- # Create a temp file and set both the controller and peripheral PTYs to the
- # file to create a loopback.
- self.tempfile = tempfile.TemporaryFile()
-
- # Create some mock pipes. These won't be used since we'll mock out sends
- # to the interpreter.
- mock_pipe_end_0, mock_pipe_end_1 = threadproc_shim.Pipe()
- self.console = console.Console(self.tempfile.fileno(), self.tempfile,
- tempfile.TemporaryFile(),
- mock_pipe_end_0, mock_pipe_end_1, "EC")
-
- # Console editing methods are only valid for enhanced EC images, therefore
- # we have to assume that the "EC" we're talking to is enhanced. By default,
- # the console believes that the EC it's communicating with is NOT enhanced
- # which is why we have to override it here.
- self.console.enhanced_ec = True
- self.console.CheckForEnhancedECImage = mock.MagicMock(return_value=True)
-
- def test_EnteringChars(self):
- """Verify that characters are echoed onto the console."""
- test_str = b'abc'
- input_stream = BytesToByteList(test_str)
-
- # Send the characters in.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Check the input position.
- exp_pos = len(test_str)
- CheckInputBufferPosition(self, exp_pos)
-
- # Verify that the input buffer is correct.
- expected_buffer = test_str
- CheckInputBuffer(self, expected_buffer)
-
- # Check console output
- exp_console_out = test_str
- CheckConsoleOutput(self, exp_console_out)
-
- def test_EnteringDeletingMoreCharsThanEntered(self):
- """Verify that we can press backspace more than we have entered chars."""
- test_str = b'spamspam'
- input_stream = BytesToByteList(test_str)
-
- # Send the characters in.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Now backspace 1 more than what we sent.
- input_stream = []
- for _ in range(len(test_str) + 1):
- input_stream.append(console.ControlKey.BACKSPACE)
-
- # Send that sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # First, verify that input buffer position is 0.
- CheckInputBufferPosition(self, 0)
-
- # Next, examine the output stream for the correct sequence.
- exp_console_out = test_str
- for _ in range(len(test_str)):
- exp_console_out += BACKSPACE_STRING
-
- # Now, verify that we got what we expected.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_EnteringMoreThanCharLimit(self):
- """Verify that we drop characters when the line is too long."""
- test_str = self.console.line_limit * b'o' # All allowed.
- test_str += 5 * b'x' # All should be dropped.
- input_stream = BytesToByteList(test_str)
-
- # Send the characters in.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # First, we expect that input buffer position should be equal to the line
- # limit.
- exp_pos = self.console.line_limit
- CheckInputBufferPosition(self, exp_pos)
-
- # The input buffer should only hold until the line limit.
- exp_buffer = test_str[0:self.console.line_limit]
- CheckInputBuffer(self, exp_buffer)
-
- # Lastly, check that the extra characters are not printed.
- exp_console_out = exp_buffer
- CheckConsoleOutput(self, exp_console_out)
-
- def test_ValidKeysOnLongLine(self):
- """Verify that we can still press valid keys if the line is too long."""
- # Fill the line.
- test_str = self.console.line_limit * b'o'
- exp_console_out = test_str
- # Try to fill it even more; these should all be dropped.
- test_str += 5 * b'x'
- input_stream = BytesToByteList(test_str)
-
- # We should be able to press the following keys:
- # - Backspace
- # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N
- # - Delete
- # - Home/CTRL+A
- # - End/CTRL+E
- # - Carriage Return
-
- # Backspace 1 character
- input_stream.append(console.ControlKey.BACKSPACE)
- exp_console_out += BACKSPACE_STRING
- # Refill the line.
- input_stream.extend(BytesToByteList(b'o'))
- exp_console_out += b'o'
-
- # Left arrow key.
- input_stream.extend(Keys.LEFT_ARROW)
- exp_console_out += OutputStream.MoveCursorLeft(1)
-
- # Right arrow key.
- input_stream.extend(Keys.RIGHT_ARROW)
- exp_console_out += OutputStream.MoveCursorRight(1)
-
- # CTRL+B
- input_stream.append(console.ControlKey.CTRL_B)
- exp_console_out += OutputStream.MoveCursorLeft(1)
-
- # CTRL+F
- input_stream.append(console.ControlKey.CTRL_F)
- exp_console_out += OutputStream.MoveCursorRight(1)
-
- # Let's press enter now so we can test up and down.
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- exp_console_out += b'\r\n' + self.console.prompt
-
- # Up arrow key.
- input_stream.extend(Keys.UP_ARROW)
- exp_console_out += test_str[:self.console.line_limit]
-
- # Down arrow key.
- input_stream.extend(Keys.DOWN_ARROW)
- # Since the line was blank, we have to backspace the entire line.
- exp_console_out += self.console.line_limit * BACKSPACE_STRING
-
- # CTRL+P
- input_stream.append(console.ControlKey.CTRL_P)
- exp_console_out += test_str[:self.console.line_limit]
-
- # CTRL+N
- input_stream.append(console.ControlKey.CTRL_N)
- # Since the line was blank, we have to backspace the entire line.
- exp_console_out += self.console.line_limit * BACKSPACE_STRING
-
- # Press the Up arrow key to reprint the long line.
- input_stream.extend(Keys.UP_ARROW)
- exp_console_out += test_str[:self.console.line_limit]
-
- # Press the Home key to jump to the beginning of the line.
- input_stream.extend(Keys.HOME)
- exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit)
-
- # Press the End key to jump to the end of the line.
- input_stream.extend(Keys.END)
- exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit)
-
- # Press CTRL+A to jump to the beginning of the line.
- input_stream.append(console.ControlKey.CTRL_A)
- exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit)
-
- # Press CTRL+E to jump to the end of the line.
- input_stream.extend(Keys.END)
- exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit)
-
- # Move left one column so we can delete a character.
- input_stream.extend(Keys.LEFT_ARROW)
- exp_console_out += OutputStream.MoveCursorLeft(1)
-
- # Press the delete key.
- input_stream.extend(Keys.DEL)
- # This should look like a space, and then move cursor left 1 column since
- # we're at the end of line.
- exp_console_out += b' ' + OutputStream.MoveCursorLeft(1)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify everything happened correctly.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_BackspaceOnEmptyLine(self):
- """Verify that we can backspace on an empty line with no bad effects."""
- # Send a single backspace.
- test_str = [console.ControlKey.BACKSPACE]
-
- # Send the characters in.
- for byte in test_str:
- self.console.HandleChar(byte)
-
- # Check the input position.
- exp_pos = 0
- CheckInputBufferPosition(self, exp_pos)
-
- # Check that buffer is empty.
- exp_input_buffer = b''
- CheckInputBuffer(self, exp_input_buffer)
-
- # Check that the console output is empty.
- exp_console_out = b''
- CheckConsoleOutput(self, exp_console_out)
-
- def test_BackspaceWithinLine(self):
- """Verify that we shift the chars over when backspacing within a line."""
- # Misspell 'help'
- test_str = b'heelp'
- input_stream = BytesToByteList(test_str)
- # Use the arrow key to go back to fix it.
- # Move cursor left 1 column.
- input_stream.extend(2*Keys.LEFT_ARROW)
- # Backspace once to remove the extra 'e'.
- input_stream.append(console.ControlKey.BACKSPACE)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify the input buffer
- exp_input_buffer = b'help'
- CheckInputBuffer(self, exp_input_buffer)
-
- # Verify the input buffer position. It should be at 2 (cursor over the 'l')
- CheckInputBufferPosition(self, 2)
-
- # We expect the console output to be the test string, with two moves to the
- # left, another move left, and then the rest of the line followed by a
- # space.
- exp_console_out = test_str
- exp_console_out += 2 * OutputStream.MoveCursorLeft(1)
-
- # Move cursor left 1 column.
- exp_console_out += OutputStream.MoveCursorLeft(1)
- # Rest of the line and a space. (test_str in this case)
- exp_console_out += b'lp '
- # Reset the cursor 2 + 1 to the left.
- exp_console_out += OutputStream.MoveCursorLeft(3)
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_JumpToBeginningOfLineViaCtrlA(self):
- """Verify that we can jump to the beginning of a line with Ctrl+A."""
- # Enter some chars and press CTRL+A
- test_str = b'abc'
- input_stream = BytesToByteList(test_str) + [console.ControlKey.CTRL_A]
-
- # Send the characters in.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # We expect to see our test string followed by a move cursor left.
- exp_console_out = test_str
- exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
-
- # Check to see what whas printed on the console.
- CheckConsoleOutput(self, exp_console_out)
-
- # Check that the input buffer position is now 0.
- CheckInputBufferPosition(self, 0)
-
- # Check input buffer still contains our test string.
- CheckInputBuffer(self, test_str)
-
- def test_JumpToBeginningOfLineViaHomeKey(self):
- """Jump to beginning of line via HOME key."""
- test_str = b'version'
- input_stream = BytesToByteList(test_str)
- input_stream.extend(Keys.HOME)
-
- # Send out the stream.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # First, verify that input buffer position is now 0.
- CheckInputBufferPosition(self, 0)
-
- # Next, verify that the input buffer did not change.
- CheckInputBuffer(self, test_str)
-
- # Lastly, check that the cursor moved correctly.
- exp_console_out = test_str
- exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
- CheckConsoleOutput(self, exp_console_out)
-
- def test_JumpToEndOfLineViaEndKey(self):
- """Jump to the end of the line using the END key."""
- test_str = b'version'
- input_stream = BytesToByteList(test_str)
- input_stream += [console.ControlKey.CTRL_A]
- # Now, jump to the end of the line.
- input_stream.extend(Keys.END)
-
- # Send out the stream.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify that the input buffer position is correct. This should be at the
- # end of the test string.
- CheckInputBufferPosition(self, len(test_str))
-
- # The expected output should be the test string, followed by a jump to the
- # beginning of the line, and lastly a jump to the end of the line.
- exp_console_out = test_str
- exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
- # Now the jump back to the end of the line.
- exp_console_out += OutputStream.MoveCursorRight(len(test_str))
-
- # Verify console output stream.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_JumpToEndOfLineViaCtrlE(self):
- """Enter some chars and then try to jump to the end. (Should be a no-op)"""
- test_str = b'sysinfo'
- input_stream = BytesToByteList(test_str)
- input_stream.append(console.ControlKey.CTRL_E)
-
- # Send out the stream
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify that the input buffer position isn't any further than we expect.
- # At this point, the position should be at the end of the test string.
- CheckInputBufferPosition(self, len(test_str))
-
- # Now, let's try to jump to the beginning and then jump back to the end.
- input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E]
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Perform the same verification.
- CheckInputBufferPosition(self, len(test_str))
-
- # Lastly try to jump again, beyond the end.
- input_stream = [console.ControlKey.CTRL_E]
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Perform the same verification.
- CheckInputBufferPosition(self, len(test_str))
-
- # We expect to see the test string, a jump to the beginning of the line, and
- # one jump to the end of the line.
- exp_console_out = test_str
- # Jump to beginning.
- exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
- # Jump back to end.
- exp_console_out += OutputStream.MoveCursorRight(len(test_str))
-
- # Verify the console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_MoveLeftWithArrowKey(self):
- """Move cursor left one column with arrow key."""
- test_str = b'tastyspam'
- input_stream = BytesToByteList(test_str)
- input_stream.extend(Keys.LEFT_ARROW)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify that the input buffer position is 1 less than the length.
- CheckInputBufferPosition(self, len(test_str) - 1)
-
- # Also, verify that the input buffer is not modified.
- CheckInputBuffer(self, test_str)
-
- # We expect the test string, followed by a one column move left.
- exp_console_out = test_str + OutputStream.MoveCursorLeft(1)
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_MoveLeftWithCtrlB(self):
- """Move cursor back one column with Ctrl+B."""
- test_str = b'tastyspam'
- input_stream = BytesToByteList(test_str)
- input_stream.append(console.ControlKey.CTRL_B)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify that the input buffer position is 1 less than the length.
- CheckInputBufferPosition(self, len(test_str) - 1)
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_pos: An integer representing the expected input buffer position.
+ """
+ test_case.assertEqual(
+ exp_pos,
+ test_case.console.input_buffer_pos,
+ "input buffer position is incorrect.\ngot: "
+ + str(test_case.console.input_buffer_pos)
+ + "\nexp: "
+ + str(exp_pos)
+ + "\n"
+ + str(test_case.console),
+ )
- # Also, verify that the input buffer is not modified.
- CheckInputBuffer(self, test_str)
- # We expect the test string, followed by a one column move left.
- exp_console_out = test_str + OutputStream.MoveCursorLeft(1)
+def CheckHistoryBuffer(test_case, exp_history):
+ """Verify that the items in the history buffer are what we expect.
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_history: A list of strings representing the expected contents of the
+ history buffer.
+ """
+ # First, check to see if the length is what we expect.
+ test_case.assertEqual(
+ len(exp_history),
+ len(test_case.console.history),
+ (
+ "The number of items in the history is unexpected.\n"
+ "exp: " + str(len(exp_history)) + "\n"
+ "got: " + str(len(test_case.console.history)) + "\n"
+ "internal state:\n" + str(test_case.console)
+ ),
+ )
+
+ # Next, check the actual contents of the history buffer.
+ for i in range(len(exp_history)):
+ test_case.assertEqual(
+ exp_history[i],
+ test_case.console.history[i],
+ (
+ b"history buffer contents are incorrect.\n"
+ b"exp: " + exp_history[i] + b"\n"
+ b"got: " + test_case.console.history[i] + b"\n"
+ b"internal state:\n" + str(test_case.console).encode("ascii")
+ ),
+ )
- def test_MoveRightWithArrowKey(self):
- """Move cursor one column to the right with the arrow key."""
- test_str = b'version'
- input_stream = BytesToByteList(test_str)
- # Jump to beginning of line.
- input_stream.append(console.ControlKey.CTRL_A)
- # Press right arrow key.
- input_stream.extend(Keys.RIGHT_ARROW)
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
+class TestConsoleEditingMethods(unittest.TestCase):
+ """Test case to verify all console editing methods."""
+
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"),
+ )
+
+ # Create a temp file and set both the controller and peripheral PTYs to the
+ # file to create a loopback.
+ self.tempfile = tempfile.TemporaryFile()
+
+ # Create some mock pipes. These won't be used since we'll mock out sends
+ # to the interpreter.
+ mock_pipe_end_0, mock_pipe_end_1 = threadproc_shim.Pipe()
+ self.console = console.Console(
+ self.tempfile.fileno(),
+ self.tempfile,
+ tempfile.TemporaryFile(),
+ mock_pipe_end_0,
+ mock_pipe_end_1,
+ "EC",
+ )
+
+ # Console editing methods are only valid for enhanced EC images, therefore
+ # we have to assume that the "EC" we're talking to is enhanced. By default,
+ # the console believes that the EC it's communicating with is NOT enhanced
+ # which is why we have to override it here.
+ self.console.enhanced_ec = True
+ self.console.CheckForEnhancedECImage = mock.MagicMock(return_value=True)
+
+ def test_EnteringChars(self):
+ """Verify that characters are echoed onto the console."""
+ test_str = b"abc"
+ input_stream = BytesToByteList(test_str)
+
+ # Send the characters in.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Check the input position.
+ exp_pos = len(test_str)
+ CheckInputBufferPosition(self, exp_pos)
+
+ # Verify that the input buffer is correct.
+ expected_buffer = test_str
+ CheckInputBuffer(self, expected_buffer)
+
+ # Check console output
+ exp_console_out = test_str
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_EnteringDeletingMoreCharsThanEntered(self):
+ """Verify that we can press backspace more than we have entered chars."""
+ test_str = b"spamspam"
+ input_stream = BytesToByteList(test_str)
+
+ # Send the characters in.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Now backspace 1 more than what we sent.
+ input_stream = []
+ for _ in range(len(test_str) + 1):
+ input_stream.append(console.ControlKey.BACKSPACE)
+
+ # Send that sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # First, verify that input buffer position is 0.
+ CheckInputBufferPosition(self, 0)
+
+ # Next, examine the output stream for the correct sequence.
+ exp_console_out = test_str
+ for _ in range(len(test_str)):
+ exp_console_out += BACKSPACE_STRING
+
+ # Now, verify that we got what we expected.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_EnteringMoreThanCharLimit(self):
+ """Verify that we drop characters when the line is too long."""
+ test_str = self.console.line_limit * b"o" # All allowed.
+ test_str += 5 * b"x" # All should be dropped.
+ input_stream = BytesToByteList(test_str)
+
+ # Send the characters in.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # First, we expect that input buffer position should be equal to the line
+ # limit.
+ exp_pos = self.console.line_limit
+ CheckInputBufferPosition(self, exp_pos)
+
+ # The input buffer should only hold until the line limit.
+ exp_buffer = test_str[0 : self.console.line_limit]
+ CheckInputBuffer(self, exp_buffer)
+
+ # Lastly, check that the extra characters are not printed.
+ exp_console_out = exp_buffer
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_ValidKeysOnLongLine(self):
+ """Verify that we can still press valid keys if the line is too long."""
+ # Fill the line.
+ test_str = self.console.line_limit * b"o"
+ exp_console_out = test_str
+ # Try to fill it even more; these should all be dropped.
+ test_str += 5 * b"x"
+ input_stream = BytesToByteList(test_str)
+
+ # We should be able to press the following keys:
+ # - Backspace
+ # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N
+ # - Delete
+ # - Home/CTRL+A
+ # - End/CTRL+E
+ # - Carriage Return
+
+ # Backspace 1 character
+ input_stream.append(console.ControlKey.BACKSPACE)
+ exp_console_out += BACKSPACE_STRING
+ # Refill the line.
+ input_stream.extend(BytesToByteList(b"o"))
+ exp_console_out += b"o"
+
+ # Left arrow key.
+ input_stream.extend(Keys.LEFT_ARROW)
+ exp_console_out += OutputStream.MoveCursorLeft(1)
+
+ # Right arrow key.
+ input_stream.extend(Keys.RIGHT_ARROW)
+ exp_console_out += OutputStream.MoveCursorRight(1)
+
+ # CTRL+B
+ input_stream.append(console.ControlKey.CTRL_B)
+ exp_console_out += OutputStream.MoveCursorLeft(1)
+
+ # CTRL+F
+ input_stream.append(console.ControlKey.CTRL_F)
+ exp_console_out += OutputStream.MoveCursorRight(1)
+
+ # Let's press enter now so we can test up and down.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ exp_console_out += b"\r\n" + self.console.prompt
+
+ # Up arrow key.
+ input_stream.extend(Keys.UP_ARROW)
+ exp_console_out += test_str[: self.console.line_limit]
+
+ # Down arrow key.
+ input_stream.extend(Keys.DOWN_ARROW)
+ # Since the line was blank, we have to backspace the entire line.
+ exp_console_out += self.console.line_limit * BACKSPACE_STRING
+
+ # CTRL+P
+ input_stream.append(console.ControlKey.CTRL_P)
+ exp_console_out += test_str[: self.console.line_limit]
+
+ # CTRL+N
+ input_stream.append(console.ControlKey.CTRL_N)
+ # Since the line was blank, we have to backspace the entire line.
+ exp_console_out += self.console.line_limit * BACKSPACE_STRING
+
+ # Press the Up arrow key to reprint the long line.
+ input_stream.extend(Keys.UP_ARROW)
+ exp_console_out += test_str[: self.console.line_limit]
+
+ # Press the Home key to jump to the beginning of the line.
+ input_stream.extend(Keys.HOME)
+ exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit)
+
+ # Press the End key to jump to the end of the line.
+ input_stream.extend(Keys.END)
+ exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit)
+
+ # Press CTRL+A to jump to the beginning of the line.
+ input_stream.append(console.ControlKey.CTRL_A)
+ exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit)
+
+ # Press CTRL+E to jump to the end of the line.
+ input_stream.extend(Keys.END)
+ exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit)
+
+ # Move left one column so we can delete a character.
+ input_stream.extend(Keys.LEFT_ARROW)
+ exp_console_out += OutputStream.MoveCursorLeft(1)
+
+ # Press the delete key.
+ input_stream.extend(Keys.DEL)
+ # This should look like a space, and then move cursor left 1 column since
+ # we're at the end of line.
+ exp_console_out += b" " + OutputStream.MoveCursorLeft(1)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify everything happened correctly.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_BackspaceOnEmptyLine(self):
+ """Verify that we can backspace on an empty line with no bad effects."""
+ # Send a single backspace.
+ test_str = [console.ControlKey.BACKSPACE]
+
+ # Send the characters in.
+ for byte in test_str:
+ self.console.HandleChar(byte)
+
+ # Check the input position.
+ exp_pos = 0
+ CheckInputBufferPosition(self, exp_pos)
+
+ # Check that buffer is empty.
+ exp_input_buffer = b""
+ CheckInputBuffer(self, exp_input_buffer)
+
+ # Check that the console output is empty.
+ exp_console_out = b""
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_BackspaceWithinLine(self):
+ """Verify that we shift the chars over when backspacing within a line."""
+ # Misspell 'help'
+ test_str = b"heelp"
+ input_stream = BytesToByteList(test_str)
+ # Use the arrow key to go back to fix it.
+ # Move cursor left 1 column.
+ input_stream.extend(2 * Keys.LEFT_ARROW)
+ # Backspace once to remove the extra 'e'.
+ input_stream.append(console.ControlKey.BACKSPACE)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify the input buffer
+ exp_input_buffer = b"help"
+ CheckInputBuffer(self, exp_input_buffer)
+
+ # Verify the input buffer position. It should be at 2 (cursor over the 'l')
+ CheckInputBufferPosition(self, 2)
+
+ # We expect the console output to be the test string, with two moves to the
+ # left, another move left, and then the rest of the line followed by a
+ # space.
+ exp_console_out = test_str
+ exp_console_out += 2 * OutputStream.MoveCursorLeft(1)
+
+ # Move cursor left 1 column.
+ exp_console_out += OutputStream.MoveCursorLeft(1)
+ # Rest of the line and a space. (test_str in this case)
+ exp_console_out += b"lp "
+ # Reset the cursor 2 + 1 to the left.
+ exp_console_out += OutputStream.MoveCursorLeft(3)
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_JumpToBeginningOfLineViaCtrlA(self):
+ """Verify that we can jump to the beginning of a line with Ctrl+A."""
+ # Enter some chars and press CTRL+A
+ test_str = b"abc"
+ input_stream = BytesToByteList(test_str) + [console.ControlKey.CTRL_A]
+
+ # Send the characters in.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # We expect to see our test string followed by a move cursor left.
+ exp_console_out = test_str
+ exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
+
+ # Check to see what whas printed on the console.
+ CheckConsoleOutput(self, exp_console_out)
+
+ # Check that the input buffer position is now 0.
+ CheckInputBufferPosition(self, 0)
+
+ # Check input buffer still contains our test string.
+ CheckInputBuffer(self, test_str)
+
+ def test_JumpToBeginningOfLineViaHomeKey(self):
+ """Jump to beginning of line via HOME key."""
+ test_str = b"version"
+ input_stream = BytesToByteList(test_str)
+ input_stream.extend(Keys.HOME)
+
+ # Send out the stream.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # First, verify that input buffer position is now 0.
+ CheckInputBufferPosition(self, 0)
+
+ # Next, verify that the input buffer did not change.
+ CheckInputBuffer(self, test_str)
+
+ # Lastly, check that the cursor moved correctly.
+ exp_console_out = test_str
+ exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_JumpToEndOfLineViaEndKey(self):
+ """Jump to the end of the line using the END key."""
+ test_str = b"version"
+ input_stream = BytesToByteList(test_str)
+ input_stream += [console.ControlKey.CTRL_A]
+ # Now, jump to the end of the line.
+ input_stream.extend(Keys.END)
+
+ # Send out the stream.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify that the input buffer position is correct. This should be at the
+ # end of the test string.
+ CheckInputBufferPosition(self, len(test_str))
+
+ # The expected output should be the test string, followed by a jump to the
+ # beginning of the line, and lastly a jump to the end of the line.
+ exp_console_out = test_str
+ exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
+ # Now the jump back to the end of the line.
+ exp_console_out += OutputStream.MoveCursorRight(len(test_str))
+
+ # Verify console output stream.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_JumpToEndOfLineViaCtrlE(self):
+ """Enter some chars and then try to jump to the end. (Should be a no-op)"""
+ test_str = b"sysinfo"
+ input_stream = BytesToByteList(test_str)
+ input_stream.append(console.ControlKey.CTRL_E)
+
+ # Send out the stream
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify that the input buffer position isn't any further than we expect.
+ # At this point, the position should be at the end of the test string.
+ CheckInputBufferPosition(self, len(test_str))
+
+ # Now, let's try to jump to the beginning and then jump back to the end.
+ input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E]
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Perform the same verification.
+ CheckInputBufferPosition(self, len(test_str))
+
+ # Lastly try to jump again, beyond the end.
+ input_stream = [console.ControlKey.CTRL_E]
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Perform the same verification.
+ CheckInputBufferPosition(self, len(test_str))
+
+ # We expect to see the test string, a jump to the beginning of the line, and
+ # one jump to the end of the line.
+ exp_console_out = test_str
+ # Jump to beginning.
+ exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
+ # Jump back to end.
+ exp_console_out += OutputStream.MoveCursorRight(len(test_str))
+
+ # Verify the console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_MoveLeftWithArrowKey(self):
+ """Move cursor left one column with arrow key."""
+ test_str = b"tastyspam"
+ input_stream = BytesToByteList(test_str)
+ input_stream.extend(Keys.LEFT_ARROW)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify that the input buffer position is 1 less than the length.
+ CheckInputBufferPosition(self, len(test_str) - 1)
+
+ # Also, verify that the input buffer is not modified.
+ CheckInputBuffer(self, test_str)
+
+ # We expect the test string, followed by a one column move left.
+ exp_console_out = test_str + OutputStream.MoveCursorLeft(1)
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_MoveLeftWithCtrlB(self):
+ """Move cursor back one column with Ctrl+B."""
+ test_str = b"tastyspam"
+ input_stream = BytesToByteList(test_str)
+ input_stream.append(console.ControlKey.CTRL_B)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify that the input buffer position is 1 less than the length.
+ CheckInputBufferPosition(self, len(test_str) - 1)
- # Verify that the input buffer position is 1.
- CheckInputBufferPosition(self, 1)
+ # Also, verify that the input buffer is not modified.
+ CheckInputBuffer(self, test_str)
- # Also, verify that the input buffer is not modified.
- CheckInputBuffer(self, test_str)
+ # We expect the test string, followed by a one column move left.
+ exp_console_out = test_str + OutputStream.MoveCursorLeft(1)
- # We expect the test string, followed by a jump to the beginning of the
- # line, and finally a move right 1.
- exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str)))
-
- # A move right 1 column.
- exp_console_out += OutputStream.MoveCursorRight(1)
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_MoveRightWithCtrlF(self):
- """Move cursor forward one column with Ctrl+F."""
- test_str = b'panicinfo'
- input_stream = BytesToByteList(test_str)
- input_stream.append(console.ControlKey.CTRL_A)
- # Now, move right one column.
- input_stream.append(console.ControlKey.CTRL_F)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify that the input buffer position is 1.
- CheckInputBufferPosition(self, 1)
-
- # Also, verify that the input buffer is not modified.
- CheckInputBuffer(self, test_str)
-
- # We expect the test string, followed by a jump to the beginning of the
- # line, and finally a move right 1.
- exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str)))
-
- # A move right 1 column.
- exp_console_out += OutputStream.MoveCursorRight(1)
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_ImpossibleMoveLeftWithArrowKey(self):
- """Verify that we can't move left at the beginning of the line."""
- # We shouldn't be able to move left if we're at the beginning of the line.
- input_stream = Keys.LEFT_ARROW
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Nothing should have been output.
- exp_console_output = b''
- CheckConsoleOutput(self, exp_console_output)
-
- # The input buffer position should still be 0.
- CheckInputBufferPosition(self, 0)
-
- # The input buffer itself should be empty.
- CheckInputBuffer(self, b'')
-
- def test_ImpossibleMoveRightWithArrowKey(self):
- """Verify that we can't move right at the end of the line."""
- # We shouldn't be able to move right if we're at the end of the line.
- input_stream = Keys.RIGHT_ARROW
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Nothing should have been output.
- exp_console_output = b''
- CheckConsoleOutput(self, exp_console_output)
-
- # The input buffer position should still be 0.
- CheckInputBufferPosition(self, 0)
-
- # The input buffer itself should be empty.
- CheckInputBuffer(self, b'')
-
- def test_KillEntireLine(self):
- """Verify that we can kill an entire line with Ctrl+K."""
- test_str = b'accelinfo on'
- input_stream = BytesToByteList(test_str)
- # Jump to beginning of line and then kill it with Ctrl+K.
- input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K])
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # First, we expect that the input buffer is empty.
- CheckInputBuffer(self, b'')
-
- # The buffer position should be 0.
- CheckInputBufferPosition(self, 0)
-
- # What we expect to see on the console stream should be the following. The
- # test string, a jump to the beginning of the line, then jump back to the
- # end of the line and replace the line with spaces.
- exp_console_out = test_str
- # Jump to beginning of line.
- exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
- # Jump to end of line.
- exp_console_out += OutputStream.MoveCursorRight(len(test_str))
- # Replace line with spaces, which looks like backspaces.
- for _ in range(len(test_str)):
- exp_console_out += BACKSPACE_STRING
-
- # Verify the console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_KillPartialLine(self):
- """Verify that we can kill a portion of a line."""
- test_str = b'accelread 0 1'
- input_stream = BytesToByteList(test_str)
- len_to_kill = 5
- for _ in range(len_to_kill):
- # Move cursor left
- input_stream.extend(Keys.LEFT_ARROW)
- # Now kill
- input_stream.append(console.ControlKey.CTRL_K)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # First, check that the input buffer was truncated.
- exp_input_buffer = test_str[:-len_to_kill]
- CheckInputBuffer(self, exp_input_buffer)
-
- # Verify the input buffer position.
- CheckInputBufferPosition(self, len(test_str) - len_to_kill)
-
- # The console output stream that we expect is the test string followed by a
- # move left of len_to_kill, then a jump to the end of the line and backspace
- # of len_to_kill.
- exp_console_out = test_str
- for _ in range(len_to_kill):
- # Move left 1 column.
- exp_console_out += OutputStream.MoveCursorLeft(1)
- # Then jump to the end of the line
- exp_console_out += OutputStream.MoveCursorRight(len_to_kill)
- # Backspace of len_to_kill
- for _ in range(len_to_kill):
- exp_console_out += BACKSPACE_STRING
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_InsertingCharacters(self):
- """Verify that we can insert characters within the line."""
- test_str = b'accel 0 1' # Here we forgot the 'read' part in 'accelread'
- input_stream = BytesToByteList(test_str)
- # We need to move over to the 'l' and add read.
- insertion_point = test_str.find(b'l') + 1
- for i in range(len(test_str) - insertion_point):
- # Move cursor left.
- input_stream.extend(Keys.LEFT_ARROW)
- # Now, add in 'read'
- added_str = b'read'
- input_stream.extend(BytesToByteList(added_str))
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # First, verify that the input buffer is correct.
- exp_input_buffer = test_str[:insertion_point] + added_str
- exp_input_buffer += test_str[insertion_point:]
- CheckInputBuffer(self, exp_input_buffer)
-
- # Verify that the input buffer position is correct.
- exp_input_buffer_pos = insertion_point + len(added_str)
- CheckInputBufferPosition(self, exp_input_buffer_pos)
-
- # The console output stream that we expect is the test string, followed by
- # move cursor left until the 'l' was found, the added test string while
- # shifting characters around.
- exp_console_out = test_str
- for i in range(len(test_str) - insertion_point):
- # Move cursor left.
- exp_console_out += OutputStream.MoveCursorLeft(1)
-
- # Now for each character, write the rest of the line will be shifted to the
- # right one column.
- for i in range(len(added_str)):
- # Printed character.
- exp_console_out += added_str[i:i+1]
- # The rest of the line
- exp_console_out += test_str[insertion_point:]
- # Reset the cursor back left
- reset_dist = len(test_str[insertion_point:])
- exp_console_out += OutputStream.MoveCursorLeft(reset_dist)
-
- # Verify the console output.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_StoreCommandHistory(self):
- """Verify that entered commands are stored in the history."""
- test_commands = []
- test_commands.append(b'help')
- test_commands.append(b'version')
- test_commands.append(b'accelread 0 1')
- input_stream = []
- for c in test_commands:
- input_stream.extend(BytesToByteList(c))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # We expect to have the test commands in the history buffer.
- exp_history_buf = test_commands
- CheckHistoryBuffer(self, exp_history_buf)
-
- def test_CycleUpThruCommandHistory(self):
- """Verify that the UP arrow key will print itmes in the history buffer."""
- # Enter some commands.
- test_commands = [b'version', b'accelrange 0', b'battery', b'gettime']
- input_stream = []
- for command in test_commands:
- input_stream.extend(BytesToByteList(command))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Now, hit the UP arrow key to print the previous entries.
- for i in range(len(test_commands)):
- input_stream.extend(Keys.UP_ARROW)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The expected output should be test commands with prompts printed in
- # between, followed by line kills with the previous test commands printed.
- exp_console_out = b''
- for i in range(len(test_commands)):
- exp_console_out += test_commands[i] + b'\r\n' + self.console.prompt
-
- # When we press up, the line should be cleared and print the previous buffer
- # entry.
- for i in range(len(test_commands)-1, 0, -1):
- exp_console_out += test_commands[i]
- # Backspace to the beginning.
- for i in range(len(test_commands[i])):
- exp_console_out += BACKSPACE_STRING
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
- # The last command should just be printed out with no backspacing.
- exp_console_out += test_commands[0]
-
- # Now, verify.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_UpArrowOnEmptyHistory(self):
- """Ensure nothing happens if the history is empty."""
- # Press the up arrow key twice.
- input_stream = 2 * Keys.UP_ARROW
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # We expect nothing to have happened.
- exp_console_out = b''
- exp_input_buffer = b''
- exp_input_buffer_pos = 0
- exp_history_buf = []
-
- # Verify.
- CheckConsoleOutput(self, exp_console_out)
- CheckInputBufferPosition(self, exp_input_buffer_pos)
- CheckInputBuffer(self, exp_input_buffer)
- CheckHistoryBuffer(self, exp_history_buf)
-
- def test_UpArrowDoesNotGoOutOfBounds(self):
- """Verify that pressing the up arrow many times won't go out of bounds."""
- # Enter one command.
- test_str = b'help version'
- input_stream = BytesToByteList(test_str)
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- # Then press the up arrow key twice.
- input_stream.extend(2 * Keys.UP_ARROW)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify that the history buffer is correct.
- exp_history_buf = [test_str]
- CheckHistoryBuffer(self, exp_history_buf)
-
- # We expect that the console output should only contain our entered command,
- # a new prompt, and then our command aggain.
- exp_console_out = test_str + b'\r\n' + self.console.prompt
- # Pressing up should reprint the command we entered.
- exp_console_out += test_str
-
- # Verify.
- CheckConsoleOutput(self, exp_console_out)
-
- def test_CycleDownThruCommandHistory(self):
- """Verify that we can select entries by hitting the down arrow."""
- # Enter at least 4 commands.
- test_commands = [b'version', b'accelrange 0', b'battery', b'gettime']
- input_stream = []
- for command in test_commands:
- input_stream.extend(BytesToByteList(command))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Now, hit the UP arrow key twice to print the previous two entries.
- for i in range(2):
- input_stream.extend(Keys.UP_ARROW)
-
- # Now, hit the DOWN arrow key twice to print the newer entries.
- input_stream.extend(2*Keys.DOWN_ARROW)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The expected output should be commands that we entered, followed by
- # prompts, then followed by our last two commands in reverse. Then, we
- # should see the last entry in the list, followed by the saved partial cmd
- # of a blank line.
- exp_console_out = b''
- for i in range(len(test_commands)):
- exp_console_out += test_commands[i] + b'\r\n' + self.console.prompt
-
- # When we press up, the line should be cleared and print the previous buffer
- # entry.
- for i in range(len(test_commands)-1, 1, -1):
- exp_console_out += test_commands[i]
- # Backspace to the beginning.
- for i in range(len(test_commands[i])):
- exp_console_out += BACKSPACE_STRING
+ def test_MoveRightWithArrowKey(self):
+ """Move cursor one column to the right with the arrow key."""
+ test_str = b"version"
+ input_stream = BytesToByteList(test_str)
+ # Jump to beginning of line.
+ input_stream.append(console.ControlKey.CTRL_A)
+ # Press right arrow key.
+ input_stream.extend(Keys.RIGHT_ARROW)
- # When we press down, it should have cleared the last command (which we
- # covered with the previous for loop), and then prints the next command.
- exp_console_out += test_commands[3]
- for i in range(len(test_commands[3])):
- exp_console_out += BACKSPACE_STRING
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- # Verify input buffer.
- exp_input_buffer = b'' # Empty because our partial command was empty.
- exp_input_buffer_pos = len(exp_input_buffer)
- CheckInputBuffer(self, exp_input_buffer)
- CheckInputBufferPosition(self, exp_input_buffer_pos)
-
- def test_SavingPartialCommandWhenNavigatingHistory(self):
- """Verify that partial commands are saved when navigating history."""
- # Enter a command.
- test_str = b'accelinfo'
- input_stream = BytesToByteList(test_str)
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Enter a partial command.
- partial_cmd = b'ver'
- input_stream.extend(BytesToByteList(partial_cmd))
-
- # Hit the UP arrow key.
- input_stream.extend(Keys.UP_ARROW)
- # Then, the DOWN arrow key.
- input_stream.extend(Keys.DOWN_ARROW)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The expected output should be the command we entered, a prompt, the
- # partial command, clearing of the partial command, the command entered,
- # clearing of the command entered, and then the partial command.
- exp_console_out = test_str + b'\r\n' + self.console.prompt
- exp_console_out += partial_cmd
- for _ in range(len(partial_cmd)):
- exp_console_out += BACKSPACE_STRING
- exp_console_out += test_str
- for _ in range(len(test_str)):
- exp_console_out += BACKSPACE_STRING
- exp_console_out += partial_cmd
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- # Verify input buffer.
- exp_input_buffer = partial_cmd
- exp_input_buffer_pos = len(exp_input_buffer)
- CheckInputBuffer(self, exp_input_buffer)
- CheckInputBufferPosition(self, exp_input_buffer_pos)
-
- def test_DownArrowOnEmptyHistory(self):
- """Ensure nothing happens if the history is empty."""
- # Then press the up down arrow twice.
- input_stream = 2 * Keys.DOWN_ARROW
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # We expect nothing to have happened.
- exp_console_out = b''
- exp_input_buffer = b''
- exp_input_buffer_pos = 0
- exp_history_buf = []
-
- # Verify.
- CheckConsoleOutput(self, exp_console_out)
- CheckInputBufferPosition(self, exp_input_buffer_pos)
- CheckInputBuffer(self, exp_input_buffer)
- CheckHistoryBuffer(self, exp_history_buf)
-
- def test_DeleteCharsUsingDELKey(self):
- """Verify that we can delete characters using the DEL key."""
- test_str = b'version'
- input_stream = BytesToByteList(test_str)
-
- # Hit the left arrow key 2 times.
- input_stream.extend(2 * Keys.LEFT_ARROW)
-
- # Press the DEL key.
- input_stream.extend(Keys.DEL)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The expected output should be the command we entered, 2 individual cursor
- # moves to the left, and then removing a char and shifting everything to the
- # left one column.
- exp_console_out = test_str
- exp_console_out += 2 * OutputStream.MoveCursorLeft(1)
-
- # Remove the char by shifting everything to the left one, slicing out the
- # remove char.
- exp_console_out += test_str[-1:] + b' '
-
- # Reset the cursor by moving back 2 columns because of the 'n' and space.
- exp_console_out += OutputStream.MoveCursorLeft(2)
-
- # Verify console output.
- CheckConsoleOutput(self, exp_console_out)
-
- # Verify input buffer. The input buffer should have the char sliced out and
- # be positioned where the char was removed.
- exp_input_buffer = test_str[:-2] + test_str[-1:]
- exp_input_buffer_pos = len(exp_input_buffer) - 1
- CheckInputBuffer(self, exp_input_buffer)
- CheckInputBufferPosition(self, exp_input_buffer_pos)
-
- def test_RepeatedCommandInHistory(self):
- """Verify that we don't store 2 consecutive identical commands in history"""
- # Enter a few commands.
- test_commands = [b'version', b'accelrange 0', b'battery', b'gettime']
- # Repeat the last command.
- test_commands.append(test_commands[len(test_commands)-1])
-
- input_stream = []
- for command in test_commands:
- input_stream.extend(BytesToByteList(command))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Verify that the history buffer is correct. The last command, since
- # it was repeated, should not have been added to the history.
- exp_history_buf = test_commands[0:len(test_commands)-1]
- CheckHistoryBuffer(self, exp_history_buf)
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+ # Verify that the input buffer position is 1.
+ CheckInputBufferPosition(self, 1)
-class TestConsoleCompatibility(unittest.TestCase):
- """Verify that console can speak to enhanced and non-enhanced EC images."""
- def setUp(self):
- """Setup the test harness."""
- # Setup logging with a timestamp, the module, and the log level.
- logging.basicConfig(level=logging.DEBUG,
- format=('%(asctime)s - %(module)s -'
- ' %(levelname)s - %(message)s'))
- # Create a temp file and set both the controller and peripheral PTYs to the
- # file to create a loopback.
- self.tempfile = tempfile.TemporaryFile()
-
- # Mock out the pipes.
- mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock()
- self.console = console.Console(self.tempfile.fileno(), self.tempfile,
- tempfile.TemporaryFile(),
- mock_pipe_end_0, mock_pipe_end_1, "EC")
-
- @mock.patch('ec3po.console.Console.CheckForEnhancedECImage')
- def test_ActAsPassThruInNonEnhancedMode(self, mock_check):
- """Verify we simply pass everything thru to non-enhanced ECs.
+ # Also, verify that the input buffer is not modified.
+ CheckInputBuffer(self, test_str)
- Args:
- mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
- method.
- """
- # Set the interrogation mode to always so that we actually interrogate.
- self.console.interrogation_mode = b'always'
-
- # Assume EC interrogations indicate that the image is non-enhanced.
- mock_check.return_value = False
-
- # Press enter, followed by the command, and another enter.
- input_stream = []
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- test_command = b'version'
- input_stream.extend(BytesToByteList(test_command))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Expected calls to send down the pipe would be each character of the test
- # command.
- expected_calls = []
- expected_calls.append(mock.call(
- six.int2byte(console.ControlKey.CARRIAGE_RETURN)))
- for char in test_command:
- if six.PY3:
- expected_calls.append(mock.call(bytes([char])))
- else:
- expected_calls.append(mock.call(char))
- expected_calls.append(mock.call(
- six.int2byte(console.ControlKey.CARRIAGE_RETURN)))
-
- # Verify that the calls happened.
- self.console.cmd_pipe.send.assert_has_calls(expected_calls)
-
- # Since we're acting as a pass-thru, the input buffer should be empty and
- # input_buffer_pos is 0.
- CheckInputBuffer(self, b'')
- CheckInputBufferPosition(self, 0)
-
- @mock.patch('ec3po.console.Console.CheckForEnhancedECImage')
- def test_TransitionFromNonEnhancedToEnhanced(self, mock_check):
- """Verify that we transition correctly to enhanced mode.
+ # We expect the test string, followed by a jump to the beginning of the
+ # line, and finally a move right 1.
+ exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str)))
+
+ # A move right 1 column.
+ exp_console_out += OutputStream.MoveCursorRight(1)
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_MoveRightWithCtrlF(self):
+ """Move cursor forward one column with Ctrl+F."""
+ test_str = b"panicinfo"
+ input_stream = BytesToByteList(test_str)
+ input_stream.append(console.ControlKey.CTRL_A)
+ # Now, move right one column.
+ input_stream.append(console.ControlKey.CTRL_F)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify that the input buffer position is 1.
+ CheckInputBufferPosition(self, 1)
+
+ # Also, verify that the input buffer is not modified.
+ CheckInputBuffer(self, test_str)
+
+ # We expect the test string, followed by a jump to the beginning of the
+ # line, and finally a move right 1.
+ exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str)))
+
+ # A move right 1 column.
+ exp_console_out += OutputStream.MoveCursorRight(1)
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_ImpossibleMoveLeftWithArrowKey(self):
+ """Verify that we can't move left at the beginning of the line."""
+ # We shouldn't be able to move left if we're at the beginning of the line.
+ input_stream = Keys.LEFT_ARROW
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Nothing should have been output.
+ exp_console_output = b""
+ CheckConsoleOutput(self, exp_console_output)
+
+ # The input buffer position should still be 0.
+ CheckInputBufferPosition(self, 0)
+
+ # The input buffer itself should be empty.
+ CheckInputBuffer(self, b"")
+
+ def test_ImpossibleMoveRightWithArrowKey(self):
+ """Verify that we can't move right at the end of the line."""
+ # We shouldn't be able to move right if we're at the end of the line.
+ input_stream = Keys.RIGHT_ARROW
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Nothing should have been output.
+ exp_console_output = b""
+ CheckConsoleOutput(self, exp_console_output)
+
+ # The input buffer position should still be 0.
+ CheckInputBufferPosition(self, 0)
+
+ # The input buffer itself should be empty.
+ CheckInputBuffer(self, b"")
+
+ def test_KillEntireLine(self):
+ """Verify that we can kill an entire line with Ctrl+K."""
+ test_str = b"accelinfo on"
+ input_stream = BytesToByteList(test_str)
+ # Jump to beginning of line and then kill it with Ctrl+K.
+ input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K])
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # First, we expect that the input buffer is empty.
+ CheckInputBuffer(self, b"")
+
+ # The buffer position should be 0.
+ CheckInputBufferPosition(self, 0)
+
+ # What we expect to see on the console stream should be the following. The
+ # test string, a jump to the beginning of the line, then jump back to the
+ # end of the line and replace the line with spaces.
+ exp_console_out = test_str
+ # Jump to beginning of line.
+ exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
+ # Jump to end of line.
+ exp_console_out += OutputStream.MoveCursorRight(len(test_str))
+ # Replace line with spaces, which looks like backspaces.
+ for _ in range(len(test_str)):
+ exp_console_out += BACKSPACE_STRING
+
+ # Verify the console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_KillPartialLine(self):
+ """Verify that we can kill a portion of a line."""
+ test_str = b"accelread 0 1"
+ input_stream = BytesToByteList(test_str)
+ len_to_kill = 5
+ for _ in range(len_to_kill):
+ # Move cursor left
+ input_stream.extend(Keys.LEFT_ARROW)
+ # Now kill
+ input_stream.append(console.ControlKey.CTRL_K)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # First, check that the input buffer was truncated.
+ exp_input_buffer = test_str[:-len_to_kill]
+ CheckInputBuffer(self, exp_input_buffer)
+
+ # Verify the input buffer position.
+ CheckInputBufferPosition(self, len(test_str) - len_to_kill)
+
+ # The console output stream that we expect is the test string followed by a
+ # move left of len_to_kill, then a jump to the end of the line and backspace
+ # of len_to_kill.
+ exp_console_out = test_str
+ for _ in range(len_to_kill):
+ # Move left 1 column.
+ exp_console_out += OutputStream.MoveCursorLeft(1)
+ # Then jump to the end of the line
+ exp_console_out += OutputStream.MoveCursorRight(len_to_kill)
+ # Backspace of len_to_kill
+ for _ in range(len_to_kill):
+ exp_console_out += BACKSPACE_STRING
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_InsertingCharacters(self):
+ """Verify that we can insert characters within the line."""
+ test_str = b"accel 0 1" # Here we forgot the 'read' part in 'accelread'
+ input_stream = BytesToByteList(test_str)
+ # We need to move over to the 'l' and add read.
+ insertion_point = test_str.find(b"l") + 1
+ for i in range(len(test_str) - insertion_point):
+ # Move cursor left.
+ input_stream.extend(Keys.LEFT_ARROW)
+ # Now, add in 'read'
+ added_str = b"read"
+ input_stream.extend(BytesToByteList(added_str))
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # First, verify that the input buffer is correct.
+ exp_input_buffer = test_str[:insertion_point] + added_str
+ exp_input_buffer += test_str[insertion_point:]
+ CheckInputBuffer(self, exp_input_buffer)
+
+ # Verify that the input buffer position is correct.
+ exp_input_buffer_pos = insertion_point + len(added_str)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+
+ # The console output stream that we expect is the test string, followed by
+ # move cursor left until the 'l' was found, the added test string while
+ # shifting characters around.
+ exp_console_out = test_str
+ for i in range(len(test_str) - insertion_point):
+ # Move cursor left.
+ exp_console_out += OutputStream.MoveCursorLeft(1)
+
+ # Now for each character, write the rest of the line will be shifted to the
+ # right one column.
+ for i in range(len(added_str)):
+ # Printed character.
+ exp_console_out += added_str[i : i + 1]
+ # The rest of the line
+ exp_console_out += test_str[insertion_point:]
+ # Reset the cursor back left
+ reset_dist = len(test_str[insertion_point:])
+ exp_console_out += OutputStream.MoveCursorLeft(reset_dist)
+
+ # Verify the console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_StoreCommandHistory(self):
+ """Verify that entered commands are stored in the history."""
+ test_commands = []
+ test_commands.append(b"help")
+ test_commands.append(b"version")
+ test_commands.append(b"accelread 0 1")
+ input_stream = []
+ for c in test_commands:
+ input_stream.extend(BytesToByteList(c))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # We expect to have the test commands in the history buffer.
+ exp_history_buf = test_commands
+ CheckHistoryBuffer(self, exp_history_buf)
+
+ def test_CycleUpThruCommandHistory(self):
+ """Verify that the UP arrow key will print itmes in the history buffer."""
+ # Enter some commands.
+ test_commands = [b"version", b"accelrange 0", b"battery", b"gettime"]
+ input_stream = []
+ for command in test_commands:
+ input_stream.extend(BytesToByteList(command))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Now, hit the UP arrow key to print the previous entries.
+ for i in range(len(test_commands)):
+ input_stream.extend(Keys.UP_ARROW)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The expected output should be test commands with prompts printed in
+ # between, followed by line kills with the previous test commands printed.
+ exp_console_out = b""
+ for i in range(len(test_commands)):
+ exp_console_out += test_commands[i] + b"\r\n" + self.console.prompt
+
+ # When we press up, the line should be cleared and print the previous buffer
+ # entry.
+ for i in range(len(test_commands) - 1, 0, -1):
+ exp_console_out += test_commands[i]
+ # Backspace to the beginning.
+ for i in range(len(test_commands[i])):
+ exp_console_out += BACKSPACE_STRING
+
+ # The last command should just be printed out with no backspacing.
+ exp_console_out += test_commands[0]
+
+ # Now, verify.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_UpArrowOnEmptyHistory(self):
+ """Ensure nothing happens if the history is empty."""
+ # Press the up arrow key twice.
+ input_stream = 2 * Keys.UP_ARROW
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # We expect nothing to have happened.
+ exp_console_out = b""
+ exp_input_buffer = b""
+ exp_input_buffer_pos = 0
+ exp_history_buf = []
+
+ # Verify.
+ CheckConsoleOutput(self, exp_console_out)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckHistoryBuffer(self, exp_history_buf)
+
+ def test_UpArrowDoesNotGoOutOfBounds(self):
+ """Verify that pressing the up arrow many times won't go out of bounds."""
+ # Enter one command.
+ test_str = b"help version"
+ input_stream = BytesToByteList(test_str)
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ # Then press the up arrow key twice.
+ input_stream.extend(2 * Keys.UP_ARROW)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify that the history buffer is correct.
+ exp_history_buf = [test_str]
+ CheckHistoryBuffer(self, exp_history_buf)
+
+ # We expect that the console output should only contain our entered command,
+ # a new prompt, and then our command aggain.
+ exp_console_out = test_str + b"\r\n" + self.console.prompt
+ # Pressing up should reprint the command we entered.
+ exp_console_out += test_str
+
+ # Verify.
+ CheckConsoleOutput(self, exp_console_out)
+
+ def test_CycleDownThruCommandHistory(self):
+ """Verify that we can select entries by hitting the down arrow."""
+ # Enter at least 4 commands.
+ test_commands = [b"version", b"accelrange 0", b"battery", b"gettime"]
+ input_stream = []
+ for command in test_commands:
+ input_stream.extend(BytesToByteList(command))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Now, hit the UP arrow key twice to print the previous two entries.
+ for i in range(2):
+ input_stream.extend(Keys.UP_ARROW)
+
+ # Now, hit the DOWN arrow key twice to print the newer entries.
+ input_stream.extend(2 * Keys.DOWN_ARROW)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The expected output should be commands that we entered, followed by
+ # prompts, then followed by our last two commands in reverse. Then, we
+ # should see the last entry in the list, followed by the saved partial cmd
+ # of a blank line.
+ exp_console_out = b""
+ for i in range(len(test_commands)):
+ exp_console_out += test_commands[i] + b"\r\n" + self.console.prompt
+
+ # When we press up, the line should be cleared and print the previous buffer
+ # entry.
+ for i in range(len(test_commands) - 1, 1, -1):
+ exp_console_out += test_commands[i]
+ # Backspace to the beginning.
+ for i in range(len(test_commands[i])):
+ exp_console_out += BACKSPACE_STRING
+
+ # When we press down, it should have cleared the last command (which we
+ # covered with the previous for loop), and then prints the next command.
+ exp_console_out += test_commands[3]
+ for i in range(len(test_commands[3])):
+ exp_console_out += BACKSPACE_STRING
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ # Verify input buffer.
+ exp_input_buffer = b"" # Empty because our partial command was empty.
+ exp_input_buffer_pos = len(exp_input_buffer)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+
+ def test_SavingPartialCommandWhenNavigatingHistory(self):
+ """Verify that partial commands are saved when navigating history."""
+ # Enter a command.
+ test_str = b"accelinfo"
+ input_stream = BytesToByteList(test_str)
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Enter a partial command.
+ partial_cmd = b"ver"
+ input_stream.extend(BytesToByteList(partial_cmd))
+
+ # Hit the UP arrow key.
+ input_stream.extend(Keys.UP_ARROW)
+ # Then, the DOWN arrow key.
+ input_stream.extend(Keys.DOWN_ARROW)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The expected output should be the command we entered, a prompt, the
+ # partial command, clearing of the partial command, the command entered,
+ # clearing of the command entered, and then the partial command.
+ exp_console_out = test_str + b"\r\n" + self.console.prompt
+ exp_console_out += partial_cmd
+ for _ in range(len(partial_cmd)):
+ exp_console_out += BACKSPACE_STRING
+ exp_console_out += test_str
+ for _ in range(len(test_str)):
+ exp_console_out += BACKSPACE_STRING
+ exp_console_out += partial_cmd
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ # Verify input buffer.
+ exp_input_buffer = partial_cmd
+ exp_input_buffer_pos = len(exp_input_buffer)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+
+ def test_DownArrowOnEmptyHistory(self):
+ """Ensure nothing happens if the history is empty."""
+ # Then press the up down arrow twice.
+ input_stream = 2 * Keys.DOWN_ARROW
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # We expect nothing to have happened.
+ exp_console_out = b""
+ exp_input_buffer = b""
+ exp_input_buffer_pos = 0
+ exp_history_buf = []
+
+ # Verify.
+ CheckConsoleOutput(self, exp_console_out)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckHistoryBuffer(self, exp_history_buf)
+
+ def test_DeleteCharsUsingDELKey(self):
+ """Verify that we can delete characters using the DEL key."""
+ test_str = b"version"
+ input_stream = BytesToByteList(test_str)
+
+ # Hit the left arrow key 2 times.
+ input_stream.extend(2 * Keys.LEFT_ARROW)
+
+ # Press the DEL key.
+ input_stream.extend(Keys.DEL)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The expected output should be the command we entered, 2 individual cursor
+ # moves to the left, and then removing a char and shifting everything to the
+ # left one column.
+ exp_console_out = test_str
+ exp_console_out += 2 * OutputStream.MoveCursorLeft(1)
+
+ # Remove the char by shifting everything to the left one, slicing out the
+ # remove char.
+ exp_console_out += test_str[-1:] + b" "
+
+ # Reset the cursor by moving back 2 columns because of the 'n' and space.
+ exp_console_out += OutputStream.MoveCursorLeft(2)
+
+ # Verify console output.
+ CheckConsoleOutput(self, exp_console_out)
+
+ # Verify input buffer. The input buffer should have the char sliced out and
+ # be positioned where the char was removed.
+ exp_input_buffer = test_str[:-2] + test_str[-1:]
+ exp_input_buffer_pos = len(exp_input_buffer) - 1
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+
+ def test_RepeatedCommandInHistory(self):
+ """Verify that we don't store 2 consecutive identical commands in history"""
+ # Enter a few commands.
+ test_commands = [b"version", b"accelrange 0", b"battery", b"gettime"]
+ # Repeat the last command.
+ test_commands.append(test_commands[len(test_commands) - 1])
+
+ input_stream = []
+ for command in test_commands:
+ input_stream.extend(BytesToByteList(command))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Verify that the history buffer is correct. The last command, since
+ # it was repeated, should not have been added to the history.
+ exp_history_buf = test_commands[0 : len(test_commands) - 1]
+ CheckHistoryBuffer(self, exp_history_buf)
- Args:
- mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
- method.
- """
- # Set the interrogation mode to always so that we actually interrogate.
- self.console.interrogation_mode = b'always'
-
- # First, assume that the EC interrogations indicate an enhanced EC image.
- mock_check.return_value = True
- # But our current knowledge of the EC image (which was actually the
- # 'previous' EC) was a non-enhanced image.
- self.console.enhanced_ec = False
-
- test_command = b'sysinfo'
- input_stream = []
- input_stream.extend(BytesToByteList(test_command))
-
- expected_calls = []
- # All keystrokes to the console should be directed straight through to the
- # EC until we press the enter key.
- for char in test_command:
- if six.PY3:
- expected_calls.append(mock.call(bytes([char])))
- else:
- expected_calls.append(mock.call(char))
-
- # Press the enter key.
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- # The enter key should not be sent to the pipe since we should negotiate
- # to an enhanced EC image.
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # At this point, we should have negotiated to enhanced.
- self.assertTrue(self.console.enhanced_ec, msg=('Did not negotiate to '
- 'enhanced EC image.'))
-
- # The command would have been dropped however, so verify this...
- CheckInputBuffer(self, b'')
- CheckInputBufferPosition(self, 0)
- # ...and repeat the command.
- input_stream = BytesToByteList(test_command)
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Since we're enhanced now, we should have sent the entire command as one
- # string with no trailing carriage return
- expected_calls.append(mock.call(test_command))
-
- # Verify all of the calls.
- self.console.cmd_pipe.send.assert_has_calls(expected_calls)
-
- @mock.patch('ec3po.console.Console.CheckForEnhancedECImage')
- def test_TransitionFromEnhancedToNonEnhanced(self, mock_check):
- """Verify that we transition correctly to non-enhanced mode.
- Args:
- mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
- method.
- """
- # Set the interrogation mode to always so that we actually interrogate.
- self.console.interrogation_mode = b'always'
-
- # First, assume that the EC interrogations indicate an non-enhanced EC
- # image.
- mock_check.return_value = False
- # But our current knowledge of the EC image (which was actually the
- # 'previous' EC) was an enhanced image.
- self.console.enhanced_ec = True
-
- test_command = b'sysinfo'
- input_stream = []
- input_stream.extend(BytesToByteList(test_command))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # But, we will negotiate to non-enhanced however, dropping this command.
- # Verify this.
- self.assertFalse(self.console.enhanced_ec, msg=('Did not negotiate to'
- 'non-enhanced EC image.'))
- CheckInputBuffer(self, b'')
- CheckInputBufferPosition(self, 0)
-
- # The carriage return should have passed through though.
- expected_calls = []
- expected_calls.append(mock.call(
- six.int2byte(console.ControlKey.CARRIAGE_RETURN)))
-
- # Since the command was dropped, repeat the command.
- input_stream = BytesToByteList(test_command)
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # Since we're not enhanced now, we should have sent each character in the
- # entire command separately and a carriage return.
- for char in test_command:
- if six.PY3:
- expected_calls.append(mock.call(bytes([char])))
- else:
- expected_calls.append(mock.call(char))
- expected_calls.append(mock.call(
- six.int2byte(console.ControlKey.CARRIAGE_RETURN)))
-
- # Verify all of the calls.
- self.console.cmd_pipe.send.assert_has_calls(expected_calls)
-
- def test_EnhancedCheckIfTimedOut(self):
- """Verify that the check returns false if it times out."""
- # Make the debug pipe "time out".
- self.console.dbg_pipe.poll.return_value = False
- self.assertFalse(self.console.CheckForEnhancedECImage())
-
- def test_EnhancedCheckIfACKReceived(self):
- """Verify that the check returns true if the ACK is received."""
- # Make the debug pipe return EC_ACK.
- self.console.dbg_pipe.poll.return_value = True
- self.console.dbg_pipe.recv.return_value = interpreter.EC_ACK
- self.assertTrue(self.console.CheckForEnhancedECImage())
-
- def test_EnhancedCheckIfWrong(self):
- """Verify that the check returns false if byte received is wrong."""
- # Make the debug pipe return the wrong byte.
- self.console.dbg_pipe.poll.return_value = True
- self.console.dbg_pipe.recv.return_value = b'\xff'
- self.assertFalse(self.console.CheckForEnhancedECImage())
-
- def test_EnhancedCheckUsingBuffer(self):
- """Verify that given reboot output, enhanced EC images are detected."""
- enhanced_output_stream = b"""
+class TestConsoleCompatibility(unittest.TestCase):
+ """Verify that console can speak to enhanced and non-enhanced EC images."""
+
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"),
+ )
+ # Create a temp file and set both the controller and peripheral PTYs to the
+ # file to create a loopback.
+ self.tempfile = tempfile.TemporaryFile()
+
+ # Mock out the pipes.
+ mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock()
+ self.console = console.Console(
+ self.tempfile.fileno(),
+ self.tempfile,
+ tempfile.TemporaryFile(),
+ mock_pipe_end_0,
+ mock_pipe_end_1,
+ "EC",
+ )
+
+ @mock.patch("ec3po.console.Console.CheckForEnhancedECImage")
+ def test_ActAsPassThruInNonEnhancedMode(self, mock_check):
+ """Verify we simply pass everything thru to non-enhanced ECs.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
+ method.
+ """
+ # Set the interrogation mode to always so that we actually interrogate.
+ self.console.interrogation_mode = b"always"
+
+ # Assume EC interrogations indicate that the image is non-enhanced.
+ mock_check.return_value = False
+
+ # Press enter, followed by the command, and another enter.
+ input_stream = []
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ test_command = b"version"
+ input_stream.extend(BytesToByteList(test_command))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Expected calls to send down the pipe would be each character of the test
+ # command.
+ expected_calls = []
+ expected_calls.append(
+ mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN))
+ )
+ for char in test_command:
+ if six.PY3:
+ expected_calls.append(mock.call(bytes([char])))
+ else:
+ expected_calls.append(mock.call(char))
+ expected_calls.append(
+ mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN))
+ )
+
+ # Verify that the calls happened.
+ self.console.cmd_pipe.send.assert_has_calls(expected_calls)
+
+ # Since we're acting as a pass-thru, the input buffer should be empty and
+ # input_buffer_pos is 0.
+ CheckInputBuffer(self, b"")
+ CheckInputBufferPosition(self, 0)
+
+ @mock.patch("ec3po.console.Console.CheckForEnhancedECImage")
+ def test_TransitionFromNonEnhancedToEnhanced(self, mock_check):
+ """Verify that we transition correctly to enhanced mode.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
+ method.
+ """
+ # Set the interrogation mode to always so that we actually interrogate.
+ self.console.interrogation_mode = b"always"
+
+ # First, assume that the EC interrogations indicate an enhanced EC image.
+ mock_check.return_value = True
+ # But our current knowledge of the EC image (which was actually the
+ # 'previous' EC) was a non-enhanced image.
+ self.console.enhanced_ec = False
+
+ test_command = b"sysinfo"
+ input_stream = []
+ input_stream.extend(BytesToByteList(test_command))
+
+ expected_calls = []
+ # All keystrokes to the console should be directed straight through to the
+ # EC until we press the enter key.
+ for char in test_command:
+ if six.PY3:
+ expected_calls.append(mock.call(bytes([char])))
+ else:
+ expected_calls.append(mock.call(char))
+
+ # Press the enter key.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ # The enter key should not be sent to the pipe since we should negotiate
+ # to an enhanced EC image.
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # At this point, we should have negotiated to enhanced.
+ self.assertTrue(
+ self.console.enhanced_ec, msg=("Did not negotiate to " "enhanced EC image.")
+ )
+
+ # The command would have been dropped however, so verify this...
+ CheckInputBuffer(self, b"")
+ CheckInputBufferPosition(self, 0)
+ # ...and repeat the command.
+ input_stream = BytesToByteList(test_command)
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Since we're enhanced now, we should have sent the entire command as one
+ # string with no trailing carriage return
+ expected_calls.append(mock.call(test_command))
+
+ # Verify all of the calls.
+ self.console.cmd_pipe.send.assert_has_calls(expected_calls)
+
+ @mock.patch("ec3po.console.Console.CheckForEnhancedECImage")
+ def test_TransitionFromEnhancedToNonEnhanced(self, mock_check):
+ """Verify that we transition correctly to non-enhanced mode.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
+ method.
+ """
+ # Set the interrogation mode to always so that we actually interrogate.
+ self.console.interrogation_mode = b"always"
+
+ # First, assume that the EC interrogations indicate an non-enhanced EC
+ # image.
+ mock_check.return_value = False
+ # But our current knowledge of the EC image (which was actually the
+ # 'previous' EC) was an enhanced image.
+ self.console.enhanced_ec = True
+
+ test_command = b"sysinfo"
+ input_stream = []
+ input_stream.extend(BytesToByteList(test_command))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # But, we will negotiate to non-enhanced however, dropping this command.
+ # Verify this.
+ self.assertFalse(
+ self.console.enhanced_ec,
+ msg=("Did not negotiate to" "non-enhanced EC image."),
+ )
+ CheckInputBuffer(self, b"")
+ CheckInputBufferPosition(self, 0)
+
+ # The carriage return should have passed through though.
+ expected_calls = []
+ expected_calls.append(
+ mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN))
+ )
+
+ # Since the command was dropped, repeat the command.
+ input_stream = BytesToByteList(test_command)
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Since we're not enhanced now, we should have sent each character in the
+ # entire command separately and a carriage return.
+ for char in test_command:
+ if six.PY3:
+ expected_calls.append(mock.call(bytes([char])))
+ else:
+ expected_calls.append(mock.call(char))
+ expected_calls.append(
+ mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN))
+ )
+
+ # Verify all of the calls.
+ self.console.cmd_pipe.send.assert_has_calls(expected_calls)
+
+ def test_EnhancedCheckIfTimedOut(self):
+ """Verify that the check returns false if it times out."""
+ # Make the debug pipe "time out".
+ self.console.dbg_pipe.poll.return_value = False
+ self.assertFalse(self.console.CheckForEnhancedECImage())
+
+ def test_EnhancedCheckIfACKReceived(self):
+ """Verify that the check returns true if the ACK is received."""
+ # Make the debug pipe return EC_ACK.
+ self.console.dbg_pipe.poll.return_value = True
+ self.console.dbg_pipe.recv.return_value = interpreter.EC_ACK
+ self.assertTrue(self.console.CheckForEnhancedECImage())
+
+ def test_EnhancedCheckIfWrong(self):
+ """Verify that the check returns false if byte received is wrong."""
+ # Make the debug pipe return the wrong byte.
+ self.console.dbg_pipe.poll.return_value = True
+ self.console.dbg_pipe.recv.return_value = b"\xff"
+ self.assertFalse(self.console.CheckForEnhancedECImage())
+
+ def test_EnhancedCheckUsingBuffer(self):
+ """Verify that given reboot output, enhanced EC images are detected."""
+ enhanced_output_stream = b"""
--- UART initialized after reboot ---
[Reset cause: reset-pin soft]
[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:26:20 aaboagye@lithium.mtv.corp.google.com]
@@ -1295,19 +1343,19 @@ Enhanced Console is enabled (v1.0.0); type HELP for help.
[0.224060 hash done 41dac382e3a6e3d2ea5b4d789c1bc46525cae7cc5ff6758f0de8d8369b506f57]
[0.375150 POWER_GOOD seen]
"""
- for line in enhanced_output_stream.split(b'\n'):
- self.console.CheckBufferForEnhancedImage(line)
+ for line in enhanced_output_stream.split(b"\n"):
+ self.console.CheckBufferForEnhancedImage(line)
- # Since the enhanced console string was present in the output, the console
- # should have caught it.
- self.assertTrue(self.console.enhanced_ec)
+ # Since the enhanced console string was present in the output, the console
+ # should have caught it.
+ self.assertTrue(self.console.enhanced_ec)
- # Also should check that the command was sent to the interpreter.
- self.console.cmd_pipe.send.assert_called_once_with(b'enhanced True')
+ # Also should check that the command was sent to the interpreter.
+ self.console.cmd_pipe.send.assert_called_once_with(b"enhanced True")
- # Now test the non-enhanced EC image.
- self.console.cmd_pipe.reset_mock()
- non_enhanced_output_stream = b"""
+ # Now test the non-enhanced EC image.
+ self.console.cmd_pipe.reset_mock()
+ non_enhanced_output_stream = b"""
--- UART initialized after reboot ---
[Reset cause: reset-pin soft]
[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:03:15 aaboagye@lithium.mtv.corp.google.com]
@@ -1331,239 +1379,253 @@ Console is enabled; type HELP for help.
[0.010285 power on 2]
[0.010385 power state 5 = S5->S3, in 0x0000]
"""
- for line in non_enhanced_output_stream.split(b'\n'):
- self.console.CheckBufferForEnhancedImage(line)
+ for line in non_enhanced_output_stream.split(b"\n"):
+ self.console.CheckBufferForEnhancedImage(line)
- # Since the default console string is present in the output, it should be
- # determined to be non enhanced now.
- self.assertFalse(self.console.enhanced_ec)
+ # Since the default console string is present in the output, it should be
+ # determined to be non enhanced now.
+ self.assertFalse(self.console.enhanced_ec)
- # Check that command was also sent to the interpreter.
- self.console.cmd_pipe.send.assert_called_once_with(b'enhanced False')
+ # Check that command was also sent to the interpreter.
+ self.console.cmd_pipe.send.assert_called_once_with(b"enhanced False")
class TestOOBMConsoleCommands(unittest.TestCase):
- """Verify that OOBM console commands work correctly."""
- def setUp(self):
- """Setup the test harness."""
- # Setup logging with a timestamp, the module, and the log level.
- logging.basicConfig(level=logging.DEBUG,
- format=('%(asctime)s - %(module)s -'
- ' %(levelname)s - %(message)s'))
- # Create a temp file and set both the controller and peripheral PTYs to the
- # file to create a loopback.
- self.tempfile = tempfile.TemporaryFile()
-
- # Mock out the pipes.
- mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock()
- self.console = console.Console(self.tempfile.fileno(), self.tempfile,
- tempfile.TemporaryFile(),
- mock_pipe_end_0, mock_pipe_end_1, "EC")
- self.console.oobm_queue = mock.MagicMock()
-
- @mock.patch('ec3po.console.Console.CheckForEnhancedECImage')
- def test_InterrogateCommand(self, mock_check):
- """Verify that 'interrogate' command works as expected.
-
- Args:
- mock_check: A MagicMock object replacing the CheckForEnhancedECIMage()
- method.
- """
- input_stream = []
- expected_calls = []
- mock_check.side_effect = [False]
-
- # 'interrogate never' should disable the interrogation from happening at
- # all.
- cmd = b'interrogate never'
- # Enter the OOBM prompt.
- input_stream.extend(BytesToByteList(b'%'))
- # Type the command
- input_stream.extend(BytesToByteList(cmd))
- # Press enter.
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- input_stream = []
-
- # The OOBM queue should have been called with the command being put.
- expected_calls.append(mock.call.put(cmd))
- self.console.oobm_queue.assert_has_calls(expected_calls)
-
- # Process the OOBM queue.
- self.console.oobm_queue.get.side_effect = [cmd]
- self.console.ProcessOOBMQueue()
-
- # Type out a few commands.
- input_stream.extend(BytesToByteList(b'version'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'flashinfo'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'sysinfo'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The Check function should NOT have been called at all.
- mock_check.assert_not_called()
-
- # The EC image should be assumed to be not enhanced.
- self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to'
- ' be NOT enhanced.')
-
- # Reset the mocks.
- mock_check.reset_mock()
- self.console.oobm_queue.reset_mock()
-
- # 'interrogate auto' should not interrogate at all. It should only be
- # scanning the output stream for the 'console is enabled' strings.
- cmd = b'interrogate auto'
- # Enter the OOBM prompt.
- input_stream.extend(BytesToByteList(b'%'))
- # Type the command
- input_stream.extend(BytesToByteList(cmd))
- # Press enter.
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- input_stream = []
- expected_calls = []
-
- # The OOBM queue should have been called with the command being put.
- expected_calls.append(mock.call.put(cmd))
- self.console.oobm_queue.assert_has_calls(expected_calls)
-
- # Process the OOBM queue.
- self.console.oobm_queue.get.side_effect = [cmd]
- self.console.ProcessOOBMQueue()
-
- # Type out a few commands.
- input_stream.extend(BytesToByteList(b'version'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'flashinfo'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'sysinfo'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The Check function should NOT have been called at all.
- mock_check.assert_not_called()
-
- # The EC image should be assumed to be not enhanced.
- self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to'
- ' be NOT enhanced.')
-
- # Reset the mocks.
- mock_check.reset_mock()
- self.console.oobm_queue.reset_mock()
-
- # 'interrogate always' should, like its name implies, interrogate always
- # after each press of the enter key. This was the former way of doing
- # interrogation.
- cmd = b'interrogate always'
- # Enter the OOBM prompt.
- input_stream.extend(BytesToByteList(b'%'))
- # Type the command
- input_stream.extend(BytesToByteList(cmd))
- # Press enter.
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- input_stream = []
- expected_calls = []
-
- # The OOBM queue should have been called with the command being put.
- expected_calls.append(mock.call.put(cmd))
- self.console.oobm_queue.assert_has_calls(expected_calls)
-
- # Process the OOBM queue.
- self.console.oobm_queue.get.side_effect = [cmd]
- self.console.ProcessOOBMQueue()
-
- # The Check method should be called 3 times here.
- mock_check.side_effect = [False, False, False]
-
- # Type out a few commands.
- input_stream.extend(BytesToByteList(b'help list'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'taskinfo'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'hibdelay'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The Check method should have been called 3 times here.
- expected_calls = [mock.call(), mock.call(), mock.call()]
- mock_check.assert_has_calls(expected_calls)
-
- # The EC image should be assumed to be not enhanced.
- self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to'
- ' be NOT enhanced.')
-
- # Now, let's try to assume that the image is enhanced while still disabling
- # interrogation.
- mock_check.reset_mock()
- self.console.oobm_queue.reset_mock()
- input_stream = []
- cmd = b'interrogate never enhanced'
- # Enter the OOBM prompt.
- input_stream.extend(BytesToByteList(b'%'))
- # Type the command
- input_stream.extend(BytesToByteList(cmd))
- # Press enter.
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- input_stream = []
- expected_calls = []
-
- # The OOBM queue should have been called with the command being put.
- expected_calls.append(mock.call.put(cmd))
- self.console.oobm_queue.assert_has_calls(expected_calls)
-
- # Process the OOBM queue.
- self.console.oobm_queue.get.side_effect = [cmd]
- self.console.ProcessOOBMQueue()
-
- # Type out a few commands.
- input_stream.extend(BytesToByteList(b'chgstate'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'hash'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
- input_stream.extend(BytesToByteList(b'sysjump rw'))
- input_stream.append(console.ControlKey.CARRIAGE_RETURN)
-
- # Send the sequence out.
- for byte in input_stream:
- self.console.HandleChar(byte)
-
- # The check method should have never been called.
- mock_check.assert_not_called()
-
- # The EC image should be assumed to be enhanced.
- self.assertTrue(self.console.enhanced_ec, 'The image should be'
- ' assumed to be enhanced.')
-
-
-if __name__ == '__main__':
- unittest.main()
+ """Verify that OOBM console commands work correctly."""
+
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"),
+ )
+ # Create a temp file and set both the controller and peripheral PTYs to the
+ # file to create a loopback.
+ self.tempfile = tempfile.TemporaryFile()
+
+ # Mock out the pipes.
+ mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock()
+ self.console = console.Console(
+ self.tempfile.fileno(),
+ self.tempfile,
+ tempfile.TemporaryFile(),
+ mock_pipe_end_0,
+ mock_pipe_end_1,
+ "EC",
+ )
+ self.console.oobm_queue = mock.MagicMock()
+
+ @mock.patch("ec3po.console.Console.CheckForEnhancedECImage")
+ def test_InterrogateCommand(self, mock_check):
+ """Verify that 'interrogate' command works as expected.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECIMage()
+ method.
+ """
+ input_stream = []
+ expected_calls = []
+ mock_check.side_effect = [False]
+
+ # 'interrogate never' should disable the interrogation from happening at
+ # all.
+ cmd = b"interrogate never"
+ # Enter the OOBM prompt.
+ input_stream.extend(BytesToByteList(b"%"))
+ # Type the command
+ input_stream.extend(BytesToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
+ # Type out a few commands.
+ input_stream.extend(BytesToByteList(b"version"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"flashinfo"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"sysinfo"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The Check function should NOT have been called at all.
+ mock_check.assert_not_called()
+
+ # The EC image should be assumed to be not enhanced.
+ self.assertFalse(
+ self.console.enhanced_ec,
+ "The image should be assumed to" " be NOT enhanced.",
+ )
+
+ # Reset the mocks.
+ mock_check.reset_mock()
+ self.console.oobm_queue.reset_mock()
+
+ # 'interrogate auto' should not interrogate at all. It should only be
+ # scanning the output stream for the 'console is enabled' strings.
+ cmd = b"interrogate auto"
+ # Enter the OOBM prompt.
+ input_stream.extend(BytesToByteList(b"%"))
+ # Type the command
+ input_stream.extend(BytesToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+ expected_calls = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
+ # Type out a few commands.
+ input_stream.extend(BytesToByteList(b"version"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"flashinfo"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"sysinfo"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The Check function should NOT have been called at all.
+ mock_check.assert_not_called()
+
+ # The EC image should be assumed to be not enhanced.
+ self.assertFalse(
+ self.console.enhanced_ec,
+ "The image should be assumed to" " be NOT enhanced.",
+ )
+
+ # Reset the mocks.
+ mock_check.reset_mock()
+ self.console.oobm_queue.reset_mock()
+
+ # 'interrogate always' should, like its name implies, interrogate always
+ # after each press of the enter key. This was the former way of doing
+ # interrogation.
+ cmd = b"interrogate always"
+ # Enter the OOBM prompt.
+ input_stream.extend(BytesToByteList(b"%"))
+ # Type the command
+ input_stream.extend(BytesToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+ expected_calls = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
+ # The Check method should be called 3 times here.
+ mock_check.side_effect = [False, False, False]
+
+ # Type out a few commands.
+ input_stream.extend(BytesToByteList(b"help list"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"taskinfo"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"hibdelay"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The Check method should have been called 3 times here.
+ expected_calls = [mock.call(), mock.call(), mock.call()]
+ mock_check.assert_has_calls(expected_calls)
+
+ # The EC image should be assumed to be not enhanced.
+ self.assertFalse(
+ self.console.enhanced_ec,
+ "The image should be assumed to" " be NOT enhanced.",
+ )
+
+ # Now, let's try to assume that the image is enhanced while still disabling
+ # interrogation.
+ mock_check.reset_mock()
+ self.console.oobm_queue.reset_mock()
+ input_stream = []
+ cmd = b"interrogate never enhanced"
+ # Enter the OOBM prompt.
+ input_stream.extend(BytesToByteList(b"%"))
+ # Type the command
+ input_stream.extend(BytesToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+ expected_calls = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
+ # Type out a few commands.
+ input_stream.extend(BytesToByteList(b"chgstate"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"hash"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(BytesToByteList(b"sysjump rw"))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The check method should have never been called.
+ mock_check.assert_not_called()
+
+ # The EC image should be assumed to be enhanced.
+ self.assertTrue(
+ self.console.enhanced_ec, "The image should be" " assumed to be enhanced."
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py
index 4e151083bd..591603e038 100644
--- a/util/ec3po/interpreter.py
+++ b/util/ec3po/interpreter.py
@@ -25,443 +25,447 @@ import traceback
import six
-
COMMAND_RETRIES = 3 # Number of attempts to retry a command.
EC_MAX_READ = 1024 # Max bytes to read at a time from the EC.
-EC_SYN = b'\xec' # Byte indicating EC interrogation.
-EC_ACK = b'\xc0' # Byte representing correct EC response to interrogation.
+EC_SYN = b"\xec" # Byte indicating EC interrogation.
+EC_ACK = b"\xc0" # Byte representing correct EC response to interrogation.
class LoggerAdapter(logging.LoggerAdapter):
- """Class which provides a small adapter for the logger."""
+ """Class which provides a small adapter for the logger."""
- def process(self, msg, kwargs):
- """Prepends the served PTY to the beginning of the log message."""
- return '%s - %s' % (self.extra['pty'], msg), kwargs
+ def process(self, msg, kwargs):
+ """Prepends the served PTY to the beginning of the log message."""
+ return "%s - %s" % (self.extra["pty"], msg), kwargs
class Interpreter(object):
- """Class which provides the interpretation layer between the EC and user.
-
- This class essentially performs all of the intepretation for the EC and the
- user. It handles all of the automatic command retrying as well as the
- formation of commands for EC images which support that.
-
- Attributes:
- logger: A logger for this module.
- ec_uart_pty: An opened file object to the raw EC UART PTY.
- ec_uart_pty_name: A string containing the name of the raw EC UART PTY.
- cmd_pipe: A socket.socket or multiprocessing.Connection object which
- represents the Interpreter side of the command pipe. This must be a
- bidirectional pipe. Commands and responses will utilize this pipe.
- dbg_pipe: A socket.socket or multiprocessing.Connection object which
- represents the Interpreter side of the debug pipe. This must be a
- unidirectional pipe with write capabilities. EC debug output will utilize
- this pipe.
- cmd_retries: An integer representing the number of attempts the console
- should retry commands if it receives an error.
- log_level: An integer representing the numeric value of the log level.
- inputs: A list of objects that the intpreter selects for reading.
- Initially, these are the EC UART and the command pipe.
- outputs: A list of objects that the interpreter selects for writing.
- ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART.
- last_cmd: A string that represents the last command sent to the EC. If an
- error is encountered, the interpreter will attempt to retry this command
- up to COMMAND_RETRIES.
- enhanced_ec: A boolean indicating if the EC image that we are currently
- communicating with is enhanced or not. Enhanced EC images will support
- packed commands and host commands over the UART. This defaults to False
- and is changed depending on the result of an interrogation.
- interrogating: A boolean indicating if we are in the middle of interrogating
- the EC.
- connected: A boolean indicating if the interpreter is actually connected to
- the UART and listening.
- """
- def __init__(self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO,
- name=None):
- """Intializes an Interpreter object with the provided args.
+ """Class which provides the interpretation layer between the EC and user.
- Args:
- ec_uart_pty: A string representing the EC UART to connect to.
+ This class essentially performs all of the intepretation for the EC and the
+ user. It handles all of the automatic command retrying as well as the
+ formation of commands for EC images which support that.
+
+ Attributes:
+ logger: A logger for this module.
+ ec_uart_pty: An opened file object to the raw EC UART PTY.
+ ec_uart_pty_name: A string containing the name of the raw EC UART PTY.
cmd_pipe: A socket.socket or multiprocessing.Connection object which
represents the Interpreter side of the command pipe. This must be a
bidirectional pipe. Commands and responses will utilize this pipe.
dbg_pipe: A socket.socket or multiprocessing.Connection object which
represents the Interpreter side of the debug pipe. This must be a
- unidirectional pipe with write capabilities. EC debug output will
- utilize this pipe.
+ unidirectional pipe with write capabilities. EC debug output will utilize
+ this pipe.
cmd_retries: An integer representing the number of attempts the console
should retry commands if it receives an error.
- log_level: An optional integer representing the numeric value of the log
- level. By default, the log level will be logging.INFO (20).
- name: the console source name
- """
- # Create a unique logger based on the interpreter name
- interpreter_prefix = ('%s - ' % name) if name else ''
- logger = logging.getLogger('%sEC3PO.Interpreter' % interpreter_prefix)
- self.logger = LoggerAdapter(logger, {'pty': ec_uart_pty})
- # TODO(https://crbug.com/1162189): revist the 2 TODOs below
- # TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+
- # TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when
- # that gets fixed, or keep two pty: one for reading and one for writing
- self.ec_uart_pty = open(ec_uart_pty, 'r+b', buffering=0)
- self.ec_uart_pty_name = ec_uart_pty
- self.cmd_pipe = cmd_pipe
- self.dbg_pipe = dbg_pipe
- self.cmd_retries = COMMAND_RETRIES
- self.log_level = log_level
- self.inputs = [self.ec_uart_pty, self.cmd_pipe]
- self.outputs = []
- self.ec_cmd_queue = six.moves.queue.Queue()
- self.last_cmd = b''
- self.enhanced_ec = False
- self.interrogating = False
- self.connected = True
-
- def __str__(self):
- """Show internal state of the Interpreter object.
-
- Returns:
- A string that shows the values of the attributes.
- """
- string = []
- string.append('%r' % self)
- string.append('ec_uart_pty: %s' % self.ec_uart_pty)
- string.append('cmd_pipe: %r' % self.cmd_pipe)
- string.append('dbg_pipe: %r' % self.dbg_pipe)
- string.append('cmd_retries: %d' % self.cmd_retries)
- string.append('log_level: %d' % self.log_level)
- string.append('inputs: %r' % self.inputs)
- string.append('outputs: %r' % self.outputs)
- string.append('ec_cmd_queue: %r' % self.ec_cmd_queue)
- string.append('last_cmd: \'%s\'' % self.last_cmd)
- string.append('enhanced_ec: %r' % self.enhanced_ec)
- string.append('interrogating: %r' % self.interrogating)
- return '\n'.join(string)
-
- def EnqueueCmd(self, command):
- """Enqueue a command to be sent to the EC UART.
-
- Args:
- command: A string which contains the command to be sent.
+ log_level: An integer representing the numeric value of the log level.
+ inputs: A list of objects that the intpreter selects for reading.
+ Initially, these are the EC UART and the command pipe.
+ outputs: A list of objects that the interpreter selects for writing.
+ ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART.
+ last_cmd: A string that represents the last command sent to the EC. If an
+ error is encountered, the interpreter will attempt to retry this command
+ up to COMMAND_RETRIES.
+ enhanced_ec: A boolean indicating if the EC image that we are currently
+ communicating with is enhanced or not. Enhanced EC images will support
+ packed commands and host commands over the UART. This defaults to False
+ and is changed depending on the result of an interrogation.
+ interrogating: A boolean indicating if we are in the middle of interrogating
+ the EC.
+ connected: A boolean indicating if the interpreter is actually connected to
+ the UART and listening.
"""
- self.ec_cmd_queue.put(command)
- self.logger.log(1, 'Commands now in queue: %d', self.ec_cmd_queue.qsize())
- # Add the EC UART as an output to be serviced.
- if self.connected and self.ec_uart_pty not in self.outputs:
- self.outputs.append(self.ec_uart_pty)
-
- def PackCommand(self, raw_cmd):
- r"""Packs a command for use with error checking.
-
- For error checking, we pack console commands in a particular format. The
- format is as follows:
-
- &&[x][x][x][x]&{cmd}\n\n
- ^ ^ ^^ ^^ ^ ^-- 2 newlines.
- | | || || |-- the raw console command.
- | | || ||-- 1 ampersand.
- | | ||____|--- 2 hex digits representing the CRC8 of cmd.
- | |____|-- 2 hex digits reprsenting the length of cmd.
- |-- 2 ampersands
-
- Args:
- raw_cmd: A pre-packed string which contains the raw command.
-
- Returns:
- A string which contains the packed command.
- """
- # Don't pack a single carriage return.
- if raw_cmd != b'\r':
- # The command format is as follows.
- # &&[x][x][x][x]&{cmd}\n\n
- packed_cmd = []
- packed_cmd.append(b'&&')
- # The first pair of hex digits are the length of the command.
- packed_cmd.append(b'%02x' % len(raw_cmd))
- # Then the CRC8 of cmd.
- packed_cmd.append(b'%02x' % Crc8(raw_cmd))
- packed_cmd.append(b'&')
- # Now, the raw command followed by 2 newlines.
- packed_cmd.append(raw_cmd)
- packed_cmd.append(b'\n\n')
- return b''.join(packed_cmd)
- else:
- return raw_cmd
-
- def ProcessCommand(self, command):
- """Captures the input determines what actions to take.
-
- Args:
- command: A string representing the command sent by the user.
- """
- if command == b'disconnect':
- if self.connected:
- self.logger.debug('UART disconnect request.')
- # Drop all pending commands if any.
- while not self.ec_cmd_queue.empty():
- c = self.ec_cmd_queue.get()
- self.logger.debug('dropped: \'%s\'', c)
- if self.enhanced_ec:
- # Reset retry state.
- self.cmd_retries = COMMAND_RETRIES
- self.last_cmd = b''
- # Get the UART that the interpreter is attached to.
- fileobj = self.ec_uart_pty
- self.logger.debug('fileobj: %r', fileobj)
- # Remove the descriptor from the inputs and outputs.
- self.inputs.remove(fileobj)
- if fileobj in self.outputs:
- self.outputs.remove(fileobj)
- self.logger.debug('Removed fileobj. Remaining inputs: %r', self.inputs)
- # Close the file.
- fileobj.close()
- # Mark the interpreter as disconnected now.
- self.connected = False
- self.logger.debug('Disconnected from %s.', self.ec_uart_pty_name)
- return
-
- elif command == b'reconnect':
- if not self.connected:
- self.logger.debug('UART reconnect request.')
- # Reopen the PTY.
+ def __init__(
+ self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO, name=None
+ ):
+ """Intializes an Interpreter object with the provided args.
+
+ Args:
+ ec_uart_pty: A string representing the EC UART to connect to.
+ cmd_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the Interpreter side of the command pipe. This must be a
+ bidirectional pipe. Commands and responses will utilize this pipe.
+ dbg_pipe: A socket.socket or multiprocessing.Connection object which
+ represents the Interpreter side of the debug pipe. This must be a
+ unidirectional pipe with write capabilities. EC debug output will
+ utilize this pipe.
+ cmd_retries: An integer representing the number of attempts the console
+ should retry commands if it receives an error.
+ log_level: An optional integer representing the numeric value of the log
+ level. By default, the log level will be logging.INFO (20).
+ name: the console source name
+ """
+ # Create a unique logger based on the interpreter name
+ interpreter_prefix = ("%s - " % name) if name else ""
+ logger = logging.getLogger("%sEC3PO.Interpreter" % interpreter_prefix)
+ self.logger = LoggerAdapter(logger, {"pty": ec_uart_pty})
+ # TODO(https://crbug.com/1162189): revist the 2 TODOs below
# TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+
# TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when
# that gets fixed, or keep two pty: one for reading and one for writing
- fileobj = open(self.ec_uart_pty_name, 'r+b', buffering=0)
- self.logger.debug('fileobj: %r', fileobj)
- self.ec_uart_pty = fileobj
- # Add the descriptor to the inputs.
- self.inputs.append(fileobj)
- self.logger.debug('fileobj added. curr inputs: %r', self.inputs)
- # Mark the interpreter as connected now.
- self.connected = True
- self.logger.debug('Connected to %s.', self.ec_uart_pty_name)
- return
-
- elif command.startswith(b'enhanced'):
- self.enhanced_ec = command.split(b' ')[1] == b'True'
- return
-
- # Ignore any other commands while in the disconnected state.
- self.logger.log(1, 'command: \'%s\'', command)
- if not self.connected:
- self.logger.debug('Ignoring command because currently disconnected.')
- return
-
- # Remove leading and trailing spaces only if this is an enhanced EC image.
- # For non-enhanced EC images, commands will be single characters at a time
- # and can be spaces.
- if self.enhanced_ec:
- command = command.strip(b' ')
-
- # There's nothing to do if the command is empty.
- if len(command) == 0:
- return
-
- # Handle log level change requests.
- if command.startswith(b'loglevel'):
- self.logger.debug('Log level change request.')
- new_log_level = int(command.split(b' ')[1])
- self.logger.logger.setLevel(new_log_level)
- self.logger.info('Log level changed to %d.', new_log_level)
- return
-
- # Check for interrogation command.
- if command == EC_SYN:
- # User is requesting interrogation. Send SYN as is.
- self.logger.debug('User requesting interrogation.')
- self.interrogating = True
- # Assume the EC isn't enhanced until we get a response.
- self.enhanced_ec = False
- elif self.enhanced_ec:
- # Enhanced EC images require the plaintext commands to be packed.
- command = self.PackCommand(command)
- # TODO(aaboagye): Make a dict of commands and keys and eventually,
- # handle partial matching based on unique prefixes.
-
- self.EnqueueCmd(command)
-
- def HandleCmdRetries(self):
- """Attempts to retry commands if possible."""
- if self.cmd_retries > 0:
- # The EC encountered an error. We'll have to retry again.
- self.logger.warning('Retrying command...')
- self.cmd_retries -= 1
- self.logger.warning('Retries remaining: %d', self.cmd_retries)
- # Retry the command and add the EC UART to the writers again.
- self.EnqueueCmd(self.last_cmd)
- self.outputs.append(self.ec_uart_pty)
- else:
- # We're out of retries, so just give up.
- self.logger.error('Command failed. No retries left.')
- # Clear the command in progress.
- self.last_cmd = b''
- # Reset the retry count.
- self.cmd_retries = COMMAND_RETRIES
-
- def SendCmdToEC(self):
- """Sends a command to the EC."""
- # If we're retrying a command, just try to send it again.
- if self.cmd_retries < COMMAND_RETRIES:
- cmd = self.last_cmd
- else:
- # If we're not retrying, we should not be writing to the EC if we have no
- # items in our command queue.
- assert not self.ec_cmd_queue.empty()
- # Get the command to send.
- cmd = self.ec_cmd_queue.get()
-
- # Send the command.
- self.ec_uart_pty.write(cmd)
- self.ec_uart_pty.flush()
- self.logger.log(1, 'Sent command to EC.')
-
- if self.enhanced_ec and cmd != EC_SYN:
- # Now, that we've sent the command, store the current command as the last
- # command sent. If we encounter an error string, we will attempt to retry
- # this command.
- if cmd != self.last_cmd:
- self.last_cmd = cmd
- # Reset the retry count.
+ self.ec_uart_pty = open(ec_uart_pty, "r+b", buffering=0)
+ self.ec_uart_pty_name = ec_uart_pty
+ self.cmd_pipe = cmd_pipe
+ self.dbg_pipe = dbg_pipe
self.cmd_retries = COMMAND_RETRIES
+ self.log_level = log_level
+ self.inputs = [self.ec_uart_pty, self.cmd_pipe]
+ self.outputs = []
+ self.ec_cmd_queue = six.moves.queue.Queue()
+ self.last_cmd = b""
+ self.enhanced_ec = False
+ self.interrogating = False
+ self.connected = True
- # If no command is pending to be sent, then we can remove the EC UART from
- # writers. Might need better checking for command retry logic in here.
- if self.ec_cmd_queue.empty():
- # Remove the EC UART from the writers while we wait for a response.
- self.logger.debug('Removing EC UART from writers.')
- self.outputs.remove(self.ec_uart_pty)
-
- def HandleECData(self):
- """Handle any debug prints from the EC."""
- self.logger.log(1, 'EC has data')
- # Read what the EC sent us.
- data = os.read(self.ec_uart_pty.fileno(), EC_MAX_READ)
- self.logger.log(1, 'got: \'%s\'', binascii.hexlify(data))
- if b'&E' in data and self.enhanced_ec:
- # We received an error, so we should retry it if possible.
- self.logger.warning('Error string found in data.')
- self.HandleCmdRetries()
- return
-
- # If we were interrogating, check the response and update our knowledge
- # of the current EC image.
- if self.interrogating:
- self.enhanced_ec = data == EC_ACK
- if self.enhanced_ec:
- self.logger.debug('The current EC image seems enhanced.')
- else:
- self.logger.debug('The current EC image does NOT seem enhanced.')
- # Done interrogating.
- self.interrogating = False
- # For now, just forward everything the EC sends us.
- self.logger.log(1, 'Forwarding to user...')
- self.dbg_pipe.send(data)
-
- def HandleUserData(self):
- """Handle any incoming commands from the user.
-
- Raises:
- EOFError: Allowed to propagate through from self.cmd_pipe.recv().
- """
- self.logger.log(1, 'Command data available. Begin processing.')
- data = self.cmd_pipe.recv()
- # Process the command.
- self.ProcessCommand(data)
+ def __str__(self):
+ """Show internal state of the Interpreter object.
+
+ Returns:
+ A string that shows the values of the attributes.
+ """
+ string = []
+ string.append("%r" % self)
+ string.append("ec_uart_pty: %s" % self.ec_uart_pty)
+ string.append("cmd_pipe: %r" % self.cmd_pipe)
+ string.append("dbg_pipe: %r" % self.dbg_pipe)
+ string.append("cmd_retries: %d" % self.cmd_retries)
+ string.append("log_level: %d" % self.log_level)
+ string.append("inputs: %r" % self.inputs)
+ string.append("outputs: %r" % self.outputs)
+ string.append("ec_cmd_queue: %r" % self.ec_cmd_queue)
+ string.append("last_cmd: '%s'" % self.last_cmd)
+ string.append("enhanced_ec: %r" % self.enhanced_ec)
+ string.append("interrogating: %r" % self.interrogating)
+ return "\n".join(string)
+
+ def EnqueueCmd(self, command):
+ """Enqueue a command to be sent to the EC UART.
+
+ Args:
+ command: A string which contains the command to be sent.
+ """
+ self.ec_cmd_queue.put(command)
+ self.logger.log(1, "Commands now in queue: %d", self.ec_cmd_queue.qsize())
+
+ # Add the EC UART as an output to be serviced.
+ if self.connected and self.ec_uart_pty not in self.outputs:
+ self.outputs.append(self.ec_uart_pty)
+
+ def PackCommand(self, raw_cmd):
+ r"""Packs a command for use with error checking.
+
+ For error checking, we pack console commands in a particular format. The
+ format is as follows:
+
+ &&[x][x][x][x]&{cmd}\n\n
+ ^ ^ ^^ ^^ ^ ^-- 2 newlines.
+ | | || || |-- the raw console command.
+ | | || ||-- 1 ampersand.
+ | | ||____|--- 2 hex digits representing the CRC8 of cmd.
+ | |____|-- 2 hex digits reprsenting the length of cmd.
+ |-- 2 ampersands
+
+ Args:
+ raw_cmd: A pre-packed string which contains the raw command.
+
+ Returns:
+ A string which contains the packed command.
+ """
+ # Don't pack a single carriage return.
+ if raw_cmd != b"\r":
+ # The command format is as follows.
+ # &&[x][x][x][x]&{cmd}\n\n
+ packed_cmd = []
+ packed_cmd.append(b"&&")
+ # The first pair of hex digits are the length of the command.
+ packed_cmd.append(b"%02x" % len(raw_cmd))
+ # Then the CRC8 of cmd.
+ packed_cmd.append(b"%02x" % Crc8(raw_cmd))
+ packed_cmd.append(b"&")
+ # Now, the raw command followed by 2 newlines.
+ packed_cmd.append(raw_cmd)
+ packed_cmd.append(b"\n\n")
+ return b"".join(packed_cmd)
+ else:
+ return raw_cmd
+
+ def ProcessCommand(self, command):
+ """Captures the input determines what actions to take.
+
+ Args:
+ command: A string representing the command sent by the user.
+ """
+ if command == b"disconnect":
+ if self.connected:
+ self.logger.debug("UART disconnect request.")
+ # Drop all pending commands if any.
+ while not self.ec_cmd_queue.empty():
+ c = self.ec_cmd_queue.get()
+ self.logger.debug("dropped: '%s'", c)
+ if self.enhanced_ec:
+ # Reset retry state.
+ self.cmd_retries = COMMAND_RETRIES
+ self.last_cmd = b""
+ # Get the UART that the interpreter is attached to.
+ fileobj = self.ec_uart_pty
+ self.logger.debug("fileobj: %r", fileobj)
+ # Remove the descriptor from the inputs and outputs.
+ self.inputs.remove(fileobj)
+ if fileobj in self.outputs:
+ self.outputs.remove(fileobj)
+ self.logger.debug("Removed fileobj. Remaining inputs: %r", self.inputs)
+ # Close the file.
+ fileobj.close()
+ # Mark the interpreter as disconnected now.
+ self.connected = False
+ self.logger.debug("Disconnected from %s.", self.ec_uart_pty_name)
+ return
+
+ elif command == b"reconnect":
+ if not self.connected:
+ self.logger.debug("UART reconnect request.")
+ # Reopen the PTY.
+ # TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+
+ # TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when
+ # that gets fixed, or keep two pty: one for reading and one for writing
+ fileobj = open(self.ec_uart_pty_name, "r+b", buffering=0)
+ self.logger.debug("fileobj: %r", fileobj)
+ self.ec_uart_pty = fileobj
+ # Add the descriptor to the inputs.
+ self.inputs.append(fileobj)
+ self.logger.debug("fileobj added. curr inputs: %r", self.inputs)
+ # Mark the interpreter as connected now.
+ self.connected = True
+ self.logger.debug("Connected to %s.", self.ec_uart_pty_name)
+ return
+
+ elif command.startswith(b"enhanced"):
+ self.enhanced_ec = command.split(b" ")[1] == b"True"
+ return
+
+ # Ignore any other commands while in the disconnected state.
+ self.logger.log(1, "command: '%s'", command)
+ if not self.connected:
+ self.logger.debug("Ignoring command because currently disconnected.")
+ return
+
+ # Remove leading and trailing spaces only if this is an enhanced EC image.
+ # For non-enhanced EC images, commands will be single characters at a time
+ # and can be spaces.
+ if self.enhanced_ec:
+ command = command.strip(b" ")
+
+ # There's nothing to do if the command is empty.
+ if len(command) == 0:
+ return
+
+ # Handle log level change requests.
+ if command.startswith(b"loglevel"):
+ self.logger.debug("Log level change request.")
+ new_log_level = int(command.split(b" ")[1])
+ self.logger.logger.setLevel(new_log_level)
+ self.logger.info("Log level changed to %d.", new_log_level)
+ return
+
+ # Check for interrogation command.
+ if command == EC_SYN:
+ # User is requesting interrogation. Send SYN as is.
+ self.logger.debug("User requesting interrogation.")
+ self.interrogating = True
+ # Assume the EC isn't enhanced until we get a response.
+ self.enhanced_ec = False
+ elif self.enhanced_ec:
+ # Enhanced EC images require the plaintext commands to be packed.
+ command = self.PackCommand(command)
+ # TODO(aaboagye): Make a dict of commands and keys and eventually,
+ # handle partial matching based on unique prefixes.
+
+ self.EnqueueCmd(command)
+
+ def HandleCmdRetries(self):
+ """Attempts to retry commands if possible."""
+ if self.cmd_retries > 0:
+ # The EC encountered an error. We'll have to retry again.
+ self.logger.warning("Retrying command...")
+ self.cmd_retries -= 1
+ self.logger.warning("Retries remaining: %d", self.cmd_retries)
+ # Retry the command and add the EC UART to the writers again.
+ self.EnqueueCmd(self.last_cmd)
+ self.outputs.append(self.ec_uart_pty)
+ else:
+ # We're out of retries, so just give up.
+ self.logger.error("Command failed. No retries left.")
+ # Clear the command in progress.
+ self.last_cmd = b""
+ # Reset the retry count.
+ self.cmd_retries = COMMAND_RETRIES
+
+ def SendCmdToEC(self):
+ """Sends a command to the EC."""
+ # If we're retrying a command, just try to send it again.
+ if self.cmd_retries < COMMAND_RETRIES:
+ cmd = self.last_cmd
+ else:
+ # If we're not retrying, we should not be writing to the EC if we have no
+ # items in our command queue.
+ assert not self.ec_cmd_queue.empty()
+ # Get the command to send.
+ cmd = self.ec_cmd_queue.get()
+
+ # Send the command.
+ self.ec_uart_pty.write(cmd)
+ self.ec_uart_pty.flush()
+ self.logger.log(1, "Sent command to EC.")
+
+ if self.enhanced_ec and cmd != EC_SYN:
+ # Now, that we've sent the command, store the current command as the last
+ # command sent. If we encounter an error string, we will attempt to retry
+ # this command.
+ if cmd != self.last_cmd:
+ self.last_cmd = cmd
+ # Reset the retry count.
+ self.cmd_retries = COMMAND_RETRIES
+
+ # If no command is pending to be sent, then we can remove the EC UART from
+ # writers. Might need better checking for command retry logic in here.
+ if self.ec_cmd_queue.empty():
+ # Remove the EC UART from the writers while we wait for a response.
+ self.logger.debug("Removing EC UART from writers.")
+ self.outputs.remove(self.ec_uart_pty)
+
+ def HandleECData(self):
+ """Handle any debug prints from the EC."""
+ self.logger.log(1, "EC has data")
+ # Read what the EC sent us.
+ data = os.read(self.ec_uart_pty.fileno(), EC_MAX_READ)
+ self.logger.log(1, "got: '%s'", binascii.hexlify(data))
+ if b"&E" in data and self.enhanced_ec:
+ # We received an error, so we should retry it if possible.
+ self.logger.warning("Error string found in data.")
+ self.HandleCmdRetries()
+ return
+
+ # If we were interrogating, check the response and update our knowledge
+ # of the current EC image.
+ if self.interrogating:
+ self.enhanced_ec = data == EC_ACK
+ if self.enhanced_ec:
+ self.logger.debug("The current EC image seems enhanced.")
+ else:
+ self.logger.debug("The current EC image does NOT seem enhanced.")
+ # Done interrogating.
+ self.interrogating = False
+ # For now, just forward everything the EC sends us.
+ self.logger.log(1, "Forwarding to user...")
+ self.dbg_pipe.send(data)
+
+ def HandleUserData(self):
+ """Handle any incoming commands from the user.
+
+ Raises:
+ EOFError: Allowed to propagate through from self.cmd_pipe.recv().
+ """
+ self.logger.log(1, "Command data available. Begin processing.")
+ data = self.cmd_pipe.recv()
+ # Process the command.
+ self.ProcessCommand(data)
def Crc8(data):
- """Calculates the CRC8 of data.
+ """Calculates the CRC8 of data.
- The generator polynomial used is: x^8 + x^2 + x + 1.
- This is the same implementation that is used in the EC.
+ The generator polynomial used is: x^8 + x^2 + x + 1.
+ This is the same implementation that is used in the EC.
- Args:
- data: A string of data that we wish to calculate the CRC8 on.
+ Args:
+ data: A string of data that we wish to calculate the CRC8 on.
- Returns:
- crc >> 8: An integer representing the CRC8 value.
- """
- crc = 0
- for byte in six.iterbytes(data):
- crc ^= (byte << 8)
- for _ in range(8):
- if crc & 0x8000:
- crc ^= (0x1070 << 3)
- crc <<= 1
- return crc >> 8
+ Returns:
+ crc >> 8: An integer representing the CRC8 value.
+ """
+ crc = 0
+ for byte in six.iterbytes(data):
+ crc ^= byte << 8
+ for _ in range(8):
+ if crc & 0x8000:
+ crc ^= 0x1070 << 3
+ crc <<= 1
+ return crc >> 8
def StartLoop(interp, shutdown_pipe=None):
- """Starts an infinite loop of servicing the user and the EC.
-
- StartLoop checks to see if there are any commands to process, processing them
- if any, and forwards EC output to the user.
-
- When sending a command to the EC, we send the command once and check the
- response to see if the EC encountered an error when receiving the command. An
- error condition is reported to the interpreter by a string with at least one
- '&' and 'E'. The full string is actually '&&EE', however it's possible that
- the leading ampersand or trailing 'E' could be dropped. If an error is
- encountered, the interpreter will retry up to the amount configured.
-
- Args:
- interp: An Interpreter object that has been properly initialised.
- shutdown_pipe: A file object for a pipe or equivalent that becomes readable
- (not blocked) to indicate that the loop should exit. Can be None to never
- exit the loop.
- """
- try:
- # This is used instead of "break" to avoid exiting the loop in the middle of
- # an iteration.
- continue_looping = True
-
- while continue_looping:
- # The inputs list is created anew in each loop iteration because the
- # Interpreter class sometimes modifies the interp.inputs list.
- if shutdown_pipe is None:
- inputs = interp.inputs
- else:
- inputs = list(interp.inputs)
- inputs.append(shutdown_pipe)
-
- readable, writeable, _ = select.select(inputs, interp.outputs, [])
-
- for obj in readable:
- # Handle any debug prints from the EC.
- if obj is interp.ec_uart_pty:
- interp.HandleECData()
-
- # Handle any commands from the user.
- elif obj is interp.cmd_pipe:
- try:
- interp.HandleUserData()
- except EOFError:
- interp.logger.debug(
- 'ec3po interpreter received EOF from cmd_pipe in '
- 'HandleUserData()')
- continue_looping = False
-
- elif obj is shutdown_pipe:
- interp.logger.debug(
- 'ec3po interpreter received shutdown pipe unblocked notification')
- continue_looping = False
-
- for obj in writeable:
- # Send a command to the EC.
- if obj is interp.ec_uart_pty:
- interp.SendCmdToEC()
-
- except KeyboardInterrupt:
- pass
-
- finally:
- interp.cmd_pipe.close()
- interp.dbg_pipe.close()
- interp.ec_uart_pty.close()
- if shutdown_pipe is not None:
- shutdown_pipe.close()
- interp.logger.debug('Exit ec3po interpreter loop for %s',
- interp.ec_uart_pty_name)
+ """Starts an infinite loop of servicing the user and the EC.
+
+ StartLoop checks to see if there are any commands to process, processing them
+ if any, and forwards EC output to the user.
+
+ When sending a command to the EC, we send the command once and check the
+ response to see if the EC encountered an error when receiving the command. An
+ error condition is reported to the interpreter by a string with at least one
+ '&' and 'E'. The full string is actually '&&EE', however it's possible that
+ the leading ampersand or trailing 'E' could be dropped. If an error is
+ encountered, the interpreter will retry up to the amount configured.
+
+ Args:
+ interp: An Interpreter object that has been properly initialised.
+ shutdown_pipe: A file object for a pipe or equivalent that becomes readable
+ (not blocked) to indicate that the loop should exit. Can be None to never
+ exit the loop.
+ """
+ try:
+ # This is used instead of "break" to avoid exiting the loop in the middle of
+ # an iteration.
+ continue_looping = True
+
+ while continue_looping:
+ # The inputs list is created anew in each loop iteration because the
+ # Interpreter class sometimes modifies the interp.inputs list.
+ if shutdown_pipe is None:
+ inputs = interp.inputs
+ else:
+ inputs = list(interp.inputs)
+ inputs.append(shutdown_pipe)
+
+ readable, writeable, _ = select.select(inputs, interp.outputs, [])
+
+ for obj in readable:
+ # Handle any debug prints from the EC.
+ if obj is interp.ec_uart_pty:
+ interp.HandleECData()
+
+ # Handle any commands from the user.
+ elif obj is interp.cmd_pipe:
+ try:
+ interp.HandleUserData()
+ except EOFError:
+ interp.logger.debug(
+ "ec3po interpreter received EOF from cmd_pipe in "
+ "HandleUserData()"
+ )
+ continue_looping = False
+
+ elif obj is shutdown_pipe:
+ interp.logger.debug(
+ "ec3po interpreter received shutdown pipe unblocked notification"
+ )
+ continue_looping = False
+
+ for obj in writeable:
+ # Send a command to the EC.
+ if obj is interp.ec_uart_pty:
+ interp.SendCmdToEC()
+
+ except KeyboardInterrupt:
+ pass
+
+ finally:
+ interp.cmd_pipe.close()
+ interp.dbg_pipe.close()
+ interp.ec_uart_pty.close()
+ if shutdown_pipe is not None:
+ shutdown_pipe.close()
+ interp.logger.debug(
+ "Exit ec3po interpreter loop for %s", interp.ec_uart_pty_name
+ )
diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py
index fe4d43c351..509b90f667 100755
--- a/util/ec3po/interpreter_unittest.py
+++ b/util/ec3po/interpreter_unittest.py
@@ -10,371 +10,389 @@
from __future__ import print_function
import logging
-import mock
import tempfile
import unittest
+import mock
import six
-
-from ec3po import interpreter
-from ec3po import threadproc_shim
+from ec3po import interpreter, threadproc_shim
def GetBuiltins(func):
- if six.PY2:
- return '__builtin__.' + func
- return 'builtins.' + func
+ if six.PY2:
+ return "__builtin__." + func
+ return "builtins." + func
class TestEnhancedECBehaviour(unittest.TestCase):
- """Test case to verify all enhanced EC interpretation tasks."""
- def setUp(self):
- """Setup the test harness."""
- # Setup logging with a timestamp, the module, and the log level.
- logging.basicConfig(level=logging.DEBUG,
- format=('%(asctime)s - %(module)s -'
- ' %(levelname)s - %(message)s'))
-
- # Create a tempfile that would represent the EC UART PTY.
- self.tempfile = tempfile.NamedTemporaryFile()
-
- # Create the pipes that the interpreter will use.
- self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
- self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
-
- # Mock the open() function so we can inspect reads/writes to the EC.
- self.ec_uart_pty = mock.mock_open()
-
- with mock.patch(GetBuiltins('open'), self.ec_uart_pty):
- # Create an interpreter.
- self.itpr = interpreter.Interpreter(self.tempfile.name,
- self.cmd_pipe_itpr,
- self.dbg_pipe_itpr,
- log_level=logging.DEBUG,
- name="EC")
-
- @mock.patch('ec3po.interpreter.os')
- def test_HandlingCommandsThatProduceNoOutput(self, mock_os):
- """Verify that the Interpreter correctly handles non-output commands.
-
- Args:
- mock_os: MagicMock object replacing the 'os' module for this test
- case.
- """
- # The interpreter init should open the EC UART PTY.
- expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)]
- # Have a command come in the command pipe. The first command will be an
- # interrogation to determine if the EC is enhanced or not.
- self.cmd_pipe_user.send(interpreter.EC_SYN)
- self.itpr.HandleUserData()
- # At this point, the command should be queued up waiting to be sent, so
- # let's actually send it to the EC.
- self.itpr.SendCmdToEC()
- expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
- mock.call().flush()])
- # Now, assume that the EC sends only 1 response back of EC_ACK.
- mock_os.read.side_effect = [interpreter.EC_ACK]
- # When reading the EC, the interpreter will call file.fileno() to pass to
- # os.read().
- expected_ec_calls.append(mock.call().fileno())
- # Simulate the response.
- self.itpr.HandleECData()
-
- # Now that the interrogation was complete, it's time to send down the real
- # command.
- test_cmd = b'chan save'
- # Send the test command down the pipe.
- self.cmd_pipe_user.send(test_cmd)
- self.itpr.HandleUserData()
- self.itpr.SendCmdToEC()
- # Since the EC image is enhanced, we should have sent a packed command.
- expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
- expected_ec_calls.append(mock.call().flush())
-
- # Now that the first command was sent, we should send another command which
- # produces no output. The console would send another interrogation.
- self.cmd_pipe_user.send(interpreter.EC_SYN)
- self.itpr.HandleUserData()
- self.itpr.SendCmdToEC()
- expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
- mock.call().flush()])
- # Again, assume that the EC sends only 1 response back of EC_ACK.
- mock_os.read.side_effect = [interpreter.EC_ACK]
- # When reading the EC, the interpreter will call file.fileno() to pass to
- # os.read().
- expected_ec_calls.append(mock.call().fileno())
- # Simulate the response.
- self.itpr.HandleECData()
-
- # Now send the second test command.
- test_cmd = b'chan 0'
- self.cmd_pipe_user.send(test_cmd)
- self.itpr.HandleUserData()
- self.itpr.SendCmdToEC()
- # Since the EC image is enhanced, we should have sent a packed command.
- expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
- expected_ec_calls.append(mock.call().flush())
-
- # Finally, verify that the appropriate writes were actually sent to the EC.
- self.ec_uart_pty.assert_has_calls(expected_ec_calls)
-
- @mock.patch('ec3po.interpreter.os')
- def test_CommandRetryingOnError(self, mock_os):
- """Verify that commands are retried if an error is encountered.
-
- Args:
- mock_os: MagicMock object replacing the 'os' module for this test
- case.
- """
- # The interpreter init should open the EC UART PTY.
- expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)]
- # Have a command come in the command pipe. The first command will be an
- # interrogation to determine if the EC is enhanced or not.
- self.cmd_pipe_user.send(interpreter.EC_SYN)
- self.itpr.HandleUserData()
- # At this point, the command should be queued up waiting to be sent, so
- # let's actually send it to the EC.
- self.itpr.SendCmdToEC()
- expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
- mock.call().flush()])
- # Now, assume that the EC sends only 1 response back of EC_ACK.
- mock_os.read.side_effect = [interpreter.EC_ACK]
- # When reading the EC, the interpreter will call file.fileno() to pass to
- # os.read().
- expected_ec_calls.append(mock.call().fileno())
- # Simulate the response.
- self.itpr.HandleECData()
-
- # Let's send a command that is received on the EC-side with an error.
- test_cmd = b'accelinfo'
- self.cmd_pipe_user.send(test_cmd)
- self.itpr.HandleUserData()
- self.itpr.SendCmdToEC()
- packed_cmd = self.itpr.PackCommand(test_cmd)
- expected_ec_calls.extend([mock.call().write(packed_cmd),
- mock.call().flush()])
- # Have the EC return the error string twice.
- mock_os.read.side_effect = [b'&&EE', b'&&EE']
- for i in range(2):
- # When reading the EC, the interpreter will call file.fileno() to pass to
- # os.read().
- expected_ec_calls.append(mock.call().fileno())
- # Simulate the response.
- self.itpr.HandleECData()
-
- # Since an error was received, the EC should attempt to retry the command.
- expected_ec_calls.extend([mock.call().write(packed_cmd),
- mock.call().flush()])
- # Verify that the retry count was decremented.
- self.assertEqual(interpreter.COMMAND_RETRIES-i-1, self.itpr.cmd_retries,
- 'Unexpected cmd_remaining count.')
- # Actually retry the command.
- self.itpr.SendCmdToEC()
-
- # Now assume that the last one goes through with no trouble.
- expected_ec_calls.extend([mock.call().write(packed_cmd),
- mock.call().flush()])
- self.itpr.SendCmdToEC()
-
- # Verify all the calls.
- self.ec_uart_pty.assert_has_calls(expected_ec_calls)
-
- def test_PackCommandsForEnhancedEC(self):
- """Verify that the interpreter packs commands for enhanced EC images."""
- # Assume current EC image is enhanced.
- self.itpr.enhanced_ec = True
- # Receive a command from the user.
- test_cmd = b'gettime'
- self.cmd_pipe_user.send(test_cmd)
- # Mock out PackCommand to see if it was called.
- self.itpr.PackCommand = mock.MagicMock()
- # Have the interpreter handle the command.
- self.itpr.HandleUserData()
- # Verify that PackCommand() was called.
- self.itpr.PackCommand.assert_called_once_with(test_cmd)
-
- def test_DontPackCommandsForNonEnhancedEC(self):
- """Verify the interpreter doesn't pack commands for non-enhanced images."""
- # Assume current EC image is not enhanced.
- self.itpr.enhanced_ec = False
- # Receive a command from the user.
- test_cmd = b'gettime'
- self.cmd_pipe_user.send(test_cmd)
- # Mock out PackCommand to see if it was called.
- self.itpr.PackCommand = mock.MagicMock()
- # Have the interpreter handle the command.
- self.itpr.HandleUserData()
- # Verify that PackCommand() was called.
- self.itpr.PackCommand.assert_not_called()
-
- @mock.patch('ec3po.interpreter.os')
- def test_KeepingTrackOfInterrogation(self, mock_os):
- """Verify that the interpreter can track the state of the interrogation.
-
- Args:
- mock_os: MagicMock object replacing the 'os' module. for this test
- case.
- """
- # Upon init, the interpreter should assume that the current EC image is not
- # enhanced.
- self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec upon'
- ' init is not False.'))
-
- # Assume an interrogation request comes in from the user.
- self.cmd_pipe_user.send(interpreter.EC_SYN)
- self.itpr.HandleUserData()
-
- # Verify the state is now within an interrogation.
- self.assertTrue(self.itpr.interrogating, 'interrogating should be True')
- # The state of enhanced_ec should not be changed yet because we haven't
- # received a valid response yet.
- self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec is '
- 'not False.'))
-
- # Assume that the EC responds with an EC_ACK.
- mock_os.read.side_effect = [interpreter.EC_ACK]
- self.itpr.HandleECData()
-
- # Now, the interrogation should be complete and we should know that the
- # current EC image is enhanced.
- self.assertFalse(self.itpr.interrogating, msg=('interrogating should be '
- 'False'))
- self.assertTrue(self.itpr.enhanced_ec, msg='enhanced_ec sholud be True')
-
- # Now let's perform another interrogation, but pretend that the EC ignores
- # it.
- self.cmd_pipe_user.send(interpreter.EC_SYN)
- self.itpr.HandleUserData()
-
- # Verify interrogating state.
- self.assertTrue(self.itpr.interrogating, 'interrogating sholud be True')
- # We should assume that the image is not enhanced until we get the valid
- # response.
- self.assertFalse(self.itpr.enhanced_ec, 'enhanced_ec should be False now.')
-
- # Let's pretend that we get a random debug print. This should clear the
- # interrogating flag.
- mock_os.read.side_effect = [b'[1660.593076 HC 0x103]']
- self.itpr.HandleECData()
-
- # Verify that interrogating flag is cleared and enhanced_ec is still False.
- self.assertFalse(self.itpr.interrogating, 'interrogating should be False.')
- self.assertFalse(self.itpr.enhanced_ec,
- 'enhanced_ec should still be False.')
+ """Test case to verify all enhanced EC interpretation tasks."""
+
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"),
+ )
+
+ # Create a tempfile that would represent the EC UART PTY.
+ self.tempfile = tempfile.NamedTemporaryFile()
+
+ # Create the pipes that the interpreter will use.
+ self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
+ self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
+
+ # Mock the open() function so we can inspect reads/writes to the EC.
+ self.ec_uart_pty = mock.mock_open()
+
+ with mock.patch(GetBuiltins("open"), self.ec_uart_pty):
+ # Create an interpreter.
+ self.itpr = interpreter.Interpreter(
+ self.tempfile.name,
+ self.cmd_pipe_itpr,
+ self.dbg_pipe_itpr,
+ log_level=logging.DEBUG,
+ name="EC",
+ )
+
+ @mock.patch("ec3po.interpreter.os")
+ def test_HandlingCommandsThatProduceNoOutput(self, mock_os):
+ """Verify that the Interpreter correctly handles non-output commands.
+
+ Args:
+ mock_os: MagicMock object replacing the 'os' module for this test
+ case.
+ """
+ # The interpreter init should open the EC UART PTY.
+ expected_ec_calls = [mock.call(self.tempfile.name, "r+b", buffering=0)]
+ # Have a command come in the command pipe. The first command will be an
+ # interrogation to determine if the EC is enhanced or not.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+ # At this point, the command should be queued up waiting to be sent, so
+ # let's actually send it to the EC.
+ self.itpr.SendCmdToEC()
+ expected_ec_calls.extend(
+ [mock.call().write(interpreter.EC_SYN), mock.call().flush()]
+ )
+ # Now, assume that the EC sends only 1 response back of EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Now that the interrogation was complete, it's time to send down the real
+ # command.
+ test_cmd = b"chan save"
+ # Send the test command down the pipe.
+ self.cmd_pipe_user.send(test_cmd)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ # Since the EC image is enhanced, we should have sent a packed command.
+ expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
+ expected_ec_calls.append(mock.call().flush())
+
+ # Now that the first command was sent, we should send another command which
+ # produces no output. The console would send another interrogation.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ expected_ec_calls.extend(
+ [mock.call().write(interpreter.EC_SYN), mock.call().flush()]
+ )
+ # Again, assume that the EC sends only 1 response back of EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Now send the second test command.
+ test_cmd = b"chan 0"
+ self.cmd_pipe_user.send(test_cmd)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ # Since the EC image is enhanced, we should have sent a packed command.
+ expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
+ expected_ec_calls.append(mock.call().flush())
+
+ # Finally, verify that the appropriate writes were actually sent to the EC.
+ self.ec_uart_pty.assert_has_calls(expected_ec_calls)
+
+ @mock.patch("ec3po.interpreter.os")
+ def test_CommandRetryingOnError(self, mock_os):
+ """Verify that commands are retried if an error is encountered.
+
+ Args:
+ mock_os: MagicMock object replacing the 'os' module for this test
+ case.
+ """
+ # The interpreter init should open the EC UART PTY.
+ expected_ec_calls = [mock.call(self.tempfile.name, "r+b", buffering=0)]
+ # Have a command come in the command pipe. The first command will be an
+ # interrogation to determine if the EC is enhanced or not.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+ # At this point, the command should be queued up waiting to be sent, so
+ # let's actually send it to the EC.
+ self.itpr.SendCmdToEC()
+ expected_ec_calls.extend(
+ [mock.call().write(interpreter.EC_SYN), mock.call().flush()]
+ )
+ # Now, assume that the EC sends only 1 response back of EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Let's send a command that is received on the EC-side with an error.
+ test_cmd = b"accelinfo"
+ self.cmd_pipe_user.send(test_cmd)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ packed_cmd = self.itpr.PackCommand(test_cmd)
+ expected_ec_calls.extend([mock.call().write(packed_cmd), mock.call().flush()])
+ # Have the EC return the error string twice.
+ mock_os.read.side_effect = [b"&&EE", b"&&EE"]
+ for i in range(2):
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Since an error was received, the EC should attempt to retry the command.
+ expected_ec_calls.extend(
+ [mock.call().write(packed_cmd), mock.call().flush()]
+ )
+ # Verify that the retry count was decremented.
+ self.assertEqual(
+ interpreter.COMMAND_RETRIES - i - 1,
+ self.itpr.cmd_retries,
+ "Unexpected cmd_remaining count.",
+ )
+ # Actually retry the command.
+ self.itpr.SendCmdToEC()
+
+ # Now assume that the last one goes through with no trouble.
+ expected_ec_calls.extend([mock.call().write(packed_cmd), mock.call().flush()])
+ self.itpr.SendCmdToEC()
+
+ # Verify all the calls.
+ self.ec_uart_pty.assert_has_calls(expected_ec_calls)
+
+ def test_PackCommandsForEnhancedEC(self):
+ """Verify that the interpreter packs commands for enhanced EC images."""
+ # Assume current EC image is enhanced.
+ self.itpr.enhanced_ec = True
+ # Receive a command from the user.
+ test_cmd = b"gettime"
+ self.cmd_pipe_user.send(test_cmd)
+ # Mock out PackCommand to see if it was called.
+ self.itpr.PackCommand = mock.MagicMock()
+ # Have the interpreter handle the command.
+ self.itpr.HandleUserData()
+ # Verify that PackCommand() was called.
+ self.itpr.PackCommand.assert_called_once_with(test_cmd)
+
+ def test_DontPackCommandsForNonEnhancedEC(self):
+ """Verify the interpreter doesn't pack commands for non-enhanced images."""
+ # Assume current EC image is not enhanced.
+ self.itpr.enhanced_ec = False
+ # Receive a command from the user.
+ test_cmd = b"gettime"
+ self.cmd_pipe_user.send(test_cmd)
+ # Mock out PackCommand to see if it was called.
+ self.itpr.PackCommand = mock.MagicMock()
+ # Have the interpreter handle the command.
+ self.itpr.HandleUserData()
+ # Verify that PackCommand() was called.
+ self.itpr.PackCommand.assert_not_called()
+
+ @mock.patch("ec3po.interpreter.os")
+ def test_KeepingTrackOfInterrogation(self, mock_os):
+ """Verify that the interpreter can track the state of the interrogation.
+
+ Args:
+ mock_os: MagicMock object replacing the 'os' module. for this test
+ case.
+ """
+ # Upon init, the interpreter should assume that the current EC image is not
+ # enhanced.
+ self.assertFalse(
+ self.itpr.enhanced_ec,
+ msg=("State of enhanced_ec upon" " init is not False."),
+ )
+
+ # Assume an interrogation request comes in from the user.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+
+ # Verify the state is now within an interrogation.
+ self.assertTrue(self.itpr.interrogating, "interrogating should be True")
+ # The state of enhanced_ec should not be changed yet because we haven't
+ # received a valid response yet.
+ self.assertFalse(
+ self.itpr.enhanced_ec, msg=("State of enhanced_ec is " "not False.")
+ )
+
+ # Assume that the EC responds with an EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ self.itpr.HandleECData()
+
+ # Now, the interrogation should be complete and we should know that the
+ # current EC image is enhanced.
+ self.assertFalse(
+ self.itpr.interrogating, msg=("interrogating should be " "False")
+ )
+ self.assertTrue(self.itpr.enhanced_ec, msg="enhanced_ec sholud be True")
+
+ # Now let's perform another interrogation, but pretend that the EC ignores
+ # it.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+
+ # Verify interrogating state.
+ self.assertTrue(self.itpr.interrogating, "interrogating sholud be True")
+ # We should assume that the image is not enhanced until we get the valid
+ # response.
+ self.assertFalse(self.itpr.enhanced_ec, "enhanced_ec should be False now.")
+
+ # Let's pretend that we get a random debug print. This should clear the
+ # interrogating flag.
+ mock_os.read.side_effect = [b"[1660.593076 HC 0x103]"]
+ self.itpr.HandleECData()
+
+ # Verify that interrogating flag is cleared and enhanced_ec is still False.
+ self.assertFalse(self.itpr.interrogating, "interrogating should be False.")
+ self.assertFalse(self.itpr.enhanced_ec, "enhanced_ec should still be False.")
class TestUARTDisconnection(unittest.TestCase):
- """Test case to verify interpreter disconnection/reconnection."""
- def setUp(self):
- """Setup the test harness."""
- # Setup logging with a timestamp, the module, and the log level.
- logging.basicConfig(level=logging.DEBUG,
- format=('%(asctime)s - %(module)s -'
- ' %(levelname)s - %(message)s'))
-
- # Create a tempfile that would represent the EC UART PTY.
- self.tempfile = tempfile.NamedTemporaryFile()
-
- # Create the pipes that the interpreter will use.
- self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
- self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
-
- # Mock the open() function so we can inspect reads/writes to the EC.
- self.ec_uart_pty = mock.mock_open()
-
- with mock.patch(GetBuiltins('open'), self.ec_uart_pty):
- # Create an interpreter.
- self.itpr = interpreter.Interpreter(self.tempfile.name,
- self.cmd_pipe_itpr,
- self.dbg_pipe_itpr,
- log_level=logging.DEBUG,
- name="EC")
-
- # First, check that interpreter is initialized to connected.
- self.assertTrue(self.itpr.connected, ('The interpreter should be'
- ' initialized in a connected state'))
-
- def test_DisconnectStopsECTraffic(self):
- """Verify that when in disconnected state, no debug prints are sent."""
- # Let's send a disconnect command through the command pipe.
- self.cmd_pipe_user.send(b'disconnect')
- self.itpr.HandleUserData()
-
- # Verify interpreter is disconnected from EC.
- self.assertFalse(self.itpr.connected, ('The interpreter should be'
- 'disconnected.'))
- # Verify that the EC UART is no longer a member of the inputs. The
- # interpreter will never pull data from the EC if it's not a member of the
- # inputs list.
- self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
-
- def test_CommandsDroppedWhenDisconnected(self):
- """Verify that when in disconnected state, commands are dropped."""
- # Send a command, followed by 'disconnect'.
- self.cmd_pipe_user.send(b'taskinfo')
- self.itpr.HandleUserData()
- self.cmd_pipe_user.send(b'disconnect')
- self.itpr.HandleUserData()
-
- # Verify interpreter is disconnected from EC.
- self.assertFalse(self.itpr.connected, ('The interpreter should be'
- 'disconnected.'))
- # Verify that the EC UART is no longer a member of the inputs nor outputs.
- self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
- self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
-
- # Have the user send a few more commands in the disconnected state.
- command = 'help\n'
- for char in command:
- self.cmd_pipe_user.send(char.encode('utf-8'))
- self.itpr.HandleUserData()
-
- # The command queue should be empty.
- self.assertEqual(0, self.itpr.ec_cmd_queue.qsize())
-
- # Now send the reconnect command.
- self.cmd_pipe_user.send(b'reconnect')
-
- with mock.patch(GetBuiltins('open'), mock.mock_open()):
- self.itpr.HandleUserData()
-
- # Verify interpreter is connected.
- self.assertTrue(self.itpr.connected)
- # Verify that EC UART is a member of the inputs.
- self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
- # Since no command was sent after reconnection, verify that the EC UART is
- # not a member of the outputs.
- self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
-
- def test_ReconnectAllowsECTraffic(self):
- """Verify that when connected, EC UART traffic is allowed."""
- # Let's send a disconnect command through the command pipe.
- self.cmd_pipe_user.send(b'disconnect')
- self.itpr.HandleUserData()
-
- # Verify interpreter is disconnected.
- self.assertFalse(self.itpr.connected, ('The interpreter should be'
- 'disconnected.'))
- # Verify that the EC UART is no longer a member of the inputs nor outputs.
- self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
- self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
-
- # Issue reconnect command through the command pipe.
- self.cmd_pipe_user.send(b'reconnect')
-
- with mock.patch(GetBuiltins('open'), mock.mock_open()):
- self.itpr.HandleUserData()
-
- # Verify interpreter is connected.
- self.assertTrue(self.itpr.connected, ('The interpreter should be'
- 'connected.'))
- # Verify that the EC UART is now a member of the inputs.
- self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
- # Since we have issued no commands during the disconnected state, no
- # commands are pending and therefore the PTY should not be added to the
- # outputs.
- self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
-
-
-if __name__ == '__main__':
- unittest.main()
+ """Test case to verify interpreter disconnection/reconnection."""
+
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"),
+ )
+
+ # Create a tempfile that would represent the EC UART PTY.
+ self.tempfile = tempfile.NamedTemporaryFile()
+
+ # Create the pipes that the interpreter will use.
+ self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
+ self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)
+
+ # Mock the open() function so we can inspect reads/writes to the EC.
+ self.ec_uart_pty = mock.mock_open()
+
+ with mock.patch(GetBuiltins("open"), self.ec_uart_pty):
+ # Create an interpreter.
+ self.itpr = interpreter.Interpreter(
+ self.tempfile.name,
+ self.cmd_pipe_itpr,
+ self.dbg_pipe_itpr,
+ log_level=logging.DEBUG,
+ name="EC",
+ )
+
+ # First, check that interpreter is initialized to connected.
+ self.assertTrue(
+ self.itpr.connected,
+ ("The interpreter should be" " initialized in a connected state"),
+ )
+
+ def test_DisconnectStopsECTraffic(self):
+ """Verify that when in disconnected state, no debug prints are sent."""
+ # Let's send a disconnect command through the command pipe.
+ self.cmd_pipe_user.send(b"disconnect")
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is disconnected from EC.
+ self.assertFalse(
+ self.itpr.connected, ("The interpreter should be" "disconnected.")
+ )
+ # Verify that the EC UART is no longer a member of the inputs. The
+ # interpreter will never pull data from the EC if it's not a member of the
+ # inputs list.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
+
+ def test_CommandsDroppedWhenDisconnected(self):
+ """Verify that when in disconnected state, commands are dropped."""
+ # Send a command, followed by 'disconnect'.
+ self.cmd_pipe_user.send(b"taskinfo")
+ self.itpr.HandleUserData()
+ self.cmd_pipe_user.send(b"disconnect")
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is disconnected from EC.
+ self.assertFalse(
+ self.itpr.connected, ("The interpreter should be" "disconnected.")
+ )
+ # Verify that the EC UART is no longer a member of the inputs nor outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
+ # Have the user send a few more commands in the disconnected state.
+ command = "help\n"
+ for char in command:
+ self.cmd_pipe_user.send(char.encode("utf-8"))
+ self.itpr.HandleUserData()
+
+ # The command queue should be empty.
+ self.assertEqual(0, self.itpr.ec_cmd_queue.qsize())
+
+ # Now send the reconnect command.
+ self.cmd_pipe_user.send(b"reconnect")
+
+ with mock.patch(GetBuiltins("open"), mock.mock_open()):
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is connected.
+ self.assertTrue(self.itpr.connected)
+ # Verify that EC UART is a member of the inputs.
+ self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
+ # Since no command was sent after reconnection, verify that the EC UART is
+ # not a member of the outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
+ def test_ReconnectAllowsECTraffic(self):
+ """Verify that when connected, EC UART traffic is allowed."""
+ # Let's send a disconnect command through the command pipe.
+ self.cmd_pipe_user.send(b"disconnect")
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is disconnected.
+ self.assertFalse(
+ self.itpr.connected, ("The interpreter should be" "disconnected.")
+ )
+ # Verify that the EC UART is no longer a member of the inputs nor outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
+ # Issue reconnect command through the command pipe.
+ self.cmd_pipe_user.send(b"reconnect")
+
+ with mock.patch(GetBuiltins("open"), mock.mock_open()):
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is connected.
+ self.assertTrue(self.itpr.connected, ("The interpreter should be" "connected."))
+ # Verify that the EC UART is now a member of the inputs.
+ self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
+ # Since we have issued no commands during the disconnected state, no
+ # commands are pending and therefore the PTY should not be added to the
+ # outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/util/ec3po/threadproc_shim.py b/util/ec3po/threadproc_shim.py
index da5440b1f3..c0b3ce0bf4 100644
--- a/util/ec3po/threadproc_shim.py
+++ b/util/ec3po/threadproc_shim.py
@@ -34,33 +34,34 @@ wait until after completing the TODO above to stop using multiprocessing.Pipe!
# Imports to bring objects into this namespace for users of this module.
from multiprocessing import Pipe
-from six.moves.queue import Queue
from threading import Thread as ThreadOrProcess
+from six.moves.queue import Queue
+
# True if this module has ec3po using subprocesses, False if using threads.
USING_SUBPROCS = False
def _DoNothing():
- """Do-nothing function for use as a callback with DoIf()."""
+ """Do-nothing function for use as a callback with DoIf()."""
def DoIf(subprocs=_DoNothing, threads=_DoNothing):
- """Return a callback or not based on ec3po use of subprocesses or threads.
+ """Return a callback or not based on ec3po use of subprocesses or threads.
- Args:
- subprocs: callback that does not require any args - This will be returned
- (not called!) if and only if ec3po is using subprocesses. This is
- OPTIONAL, the default value is a do-nothing callback that returns None.
- threads: callback that does not require any args - This will be returned
- (not called!) if and only if ec3po is using threads. This is OPTIONAL,
- the default value is a do-nothing callback that returns None.
+ Args:
+ subprocs: callback that does not require any args - This will be returned
+ (not called!) if and only if ec3po is using subprocesses. This is
+ OPTIONAL, the default value is a do-nothing callback that returns None.
+ threads: callback that does not require any args - This will be returned
+ (not called!) if and only if ec3po is using threads. This is OPTIONAL,
+ the default value is a do-nothing callback that returns None.
- Returns:
- Either the subprocs or threads argument will be returned.
- """
- return subprocs if USING_SUBPROCS else threads
+ Returns:
+ Either the subprocs or threads argument will be returned.
+ """
+ return subprocs if USING_SUBPROCS else threads
def Value(ctype, *args):
- return ctype(*args)
+ return ctype(*args)