diff options
author | Jack Rosenthal <jrosenth@chromium.org> | 2021-11-04 12:11:58 -0600 |
---|---|---|
committer | Commit Bot <commit-bot@chromium.org> | 2021-11-05 04:22:34 +0000 |
commit | 252457d4b21f46889eebad61d4c0a65331919cec (patch) | |
tree | 01856c4d31d710b20e85a74c8d7b5836e35c3b98 /util/ec3po | |
parent | 08f5a1e6fc2c9467230444ac9b582dcf4d9f0068 (diff) | |
download | chrome-ec-stabilize-14333.B-ish.tar.gz |
ish: Trim down the release branchstabilize-wristpin-14469.59.B-ishstabilize-voshyr-14637.B-ishstabilize-quickfix-14695.187.B-ishstabilize-quickfix-14695.124.B-ishstabilize-quickfix-14526.91.B-ishstabilize-14695.85.B-ishstabilize-14695.107.B-ishstabilize-14682.B-ishstabilize-14633.B-ishstabilize-14616.B-ishstabilize-14589.B-ishstabilize-14588.98.B-ishstabilize-14588.14.B-ishstabilize-14588.123.B-ishstabilize-14536.B-ishstabilize-14532.B-ishstabilize-14528.B-ishstabilize-14526.89.B-ishstabilize-14526.84.B-ishstabilize-14526.73.B-ishstabilize-14526.67.B-ishstabilize-14526.57.B-ishstabilize-14498.B-ishstabilize-14496.B-ishstabilize-14477.B-ishstabilize-14469.9.B-ishstabilize-14469.8.B-ishstabilize-14469.58.B-ishstabilize-14469.41.B-ishstabilize-14442.B-ishstabilize-14438.B-ishstabilize-14411.B-ishstabilize-14396.B-ishstabilize-14395.B-ishstabilize-14388.62.B-ishstabilize-14388.61.B-ishstabilize-14388.52.B-ishstabilize-14385.B-ishstabilize-14345.B-ishstabilize-14336.B-ishstabilize-14333.B-ishrelease-R99-14469.B-ishrelease-R98-14388.B-ishrelease-R102-14695.B-ishrelease-R101-14588.B-ishrelease-R100-14526.B-ishfirmware-cherry-14454.B-ishfirmware-brya-14505.B-ishfirmware-brya-14505.71.B-ishfactory-kukui-14374.B-ishfactory-guybrush-14600.B-ishfactory-cherry-14455.B-ishfactory-brya-14517.B-ish
In the interest of making long-term branch maintenance incur as little
technical debt on us as possible, we should not maintain any files on
the branch we are not actually using.
This has the added effect of making it extremely clear when merging CLs
from the main branch when changes have the possibility to affect us.
The follow-on CL adds a convenience script to actually pull updates from
the main branch and generate a CL for the update.
BUG=b:204206272
BRANCH=ish
TEST=make BOARD=arcada_ish && make BOARD=drallion_ish
Signed-off-by: Jack Rosenthal <jrosenth@chromium.org>
Change-Id: I17e4694c38219b5a0823e0a3e55a28d1348f4b18
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3262038
Reviewed-by: Jett Rink <jettrink@chromium.org>
Reviewed-by: Tom Hughes <tomhughes@chromium.org>
Diffstat (limited to 'util/ec3po')
-rw-r--r-- | util/ec3po/OWNERS | 3 | ||||
-rw-r--r-- | util/ec3po/__init__.py | 21 | ||||
-rwxr-xr-x | util/ec3po/console.py | 1171 | ||||
-rwxr-xr-x | util/ec3po/console_unittest.py | 1569 | ||||
-rw-r--r-- | util/ec3po/interpreter.py | 467 | ||||
-rwxr-xr-x | util/ec3po/interpreter_unittest.py | 380 | ||||
-rwxr-xr-x | util/ec3po/run_tests.sh | 15 | ||||
-rw-r--r-- | util/ec3po/threadproc_shim.py | 66 |
8 files changed, 0 insertions, 3692 deletions
diff --git a/util/ec3po/OWNERS b/util/ec3po/OWNERS deleted file mode 100644 index 5d4c97339d..0000000000 --- a/util/ec3po/OWNERS +++ /dev/null @@ -1,3 +0,0 @@ -aaboagye@chromium.org -coconutruben@chromium.org -matthewb@chromium.org diff --git a/util/ec3po/__init__.py b/util/ec3po/__init__.py deleted file mode 100644 index 376ffdba04..0000000000 --- a/util/ec3po/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2015 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""The EC console interpreter. - -EC-3PO is a console interpreter which migrates the rich debug console from the -EC itself to the host. This allows for a rich debug console without impacting -EC image sizes while also allowing the development of new console features. - -The package consists of two modules: console and interpreter. The console -module provides the interactive console interface between the user and the -interpreter. It handles the presentation of the EC console including editing -methods as well as session-persistent command history. - -The interpreter module provides the interpretation layer between the EC UART and -the user. The user does not necessarily have to be the interactive console, but -could be something like autotest. The interpreter is also responsible for the -automatic command retrying if the EC drops a character in a command. This is a -stopgap until all commands are communicated via host commands. -""" diff --git a/util/ec3po/console.py b/util/ec3po/console.py deleted file mode 100755 index 9f28c8b7bf..0000000000 --- a/util/ec3po/console.py +++ /dev/null @@ -1,1171 +0,0 @@ -#!/usr/bin/env python -# Copyright 2015 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""EC-3PO Interactive Console Interface - -console provides the console interface between the user and the interpreter. It -handles the presentation of the EC console including editing methods as well as -session-persistent command history. -""" - -# Note: This is a py2/3 compatible file. - -from __future__ import print_function - -import argparse -import binascii -import ctypes -from datetime import datetime -import logging -import os -import pty -import re -import select -import stat -import sys -import traceback - -import six - -from ec3po import interpreter -from ec3po import threadproc_shim - - -PROMPT = b'> ' -CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name. -CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user. -LOOK_BUFFER_SIZE = 256 # Size of search window when looking for the enhanced EC - # image string. - -# In console_init(), the EC will print a string saying that the EC console is -# enabled. Enhanced images will print a slightly different string. These -# regular expressions are used to determine at reboot whether the EC image is -# enhanced or not. -ENHANCED_IMAGE_RE = re.compile(br'Enhanced Console is enabled ' - br'\(v([0-9]+\.[0-9]+\.[0-9]+)\)') -NON_ENHANCED_IMAGE_RE = re.compile(br'Console is enabled; ') - -# The timeouts are really only useful for enhanced EC images, but otherwise just -# serve as a delay for non-enhanced EC images. Therefore, we can keep this -# value small enough so that there's not too much of a delay, but long enough -# that enhanced EC images can respond in time. Once we've detected an enhanced -# EC image, we can increase the timeout for stability just in case it takes a -# bit longer to receive an ACK for some reason. -NON_ENHANCED_EC_INTERROGATION_TIMEOUT = 0.3 # Maximum number of seconds to wait - # for a response to an - # interrogation of a non-enhanced - # EC image. -ENHANCED_EC_INTERROGATION_TIMEOUT = 1.0 # Maximum number of seconds to wait for - # a response to an interrogation of an - # enhanced EC image. -# List of modes which control when interrogations are performed with the EC. -INTERROGATION_MODES = [b'never', b'always', b'auto'] -# Format for printing host timestamp -HOST_STRFTIME="%y-%m-%d %H:%M:%S.%f" - - -class EscState(object): - """Class which contains an enumeration for states of ESC sequences.""" - ESC_START = 1 - ESC_BRACKET = 2 - ESC_BRACKET_1 = 3 - ESC_BRACKET_3 = 4 - ESC_BRACKET_8 = 5 - - -class ControlKey(object): - """Class which contains codes for various control keys.""" - BACKSPACE = 0x08 - CTRL_A = 0x01 - CTRL_B = 0x02 - CTRL_D = 0x04 - CTRL_E = 0x05 - CTRL_F = 0x06 - CTRL_K = 0x0b - CTRL_N = 0xe - CTRL_P = 0x10 - CARRIAGE_RETURN = 0x0d - ESC = 0x1b - - -class Console(object): - """Class which provides the console interface between the EC and the user. - - This class essentially represents the console interface between the user and - the EC. It handles all of the console editing behaviour - - Attributes: - logger: A logger for this module. - master_pty: File descriptor to the master side of the PTY. Used for driving - output to the user and receiving user input. - user_pty: A string representing the PTY name of the served console. - cmd_pipe: A socket.socket or multiprocessing.Connection object which - represents the console side of the command pipe. This must be a - bidirectional pipe. Console commands and responses utilize this pipe. - dbg_pipe: A socket.socket or multiprocessing.Connection object which - represents the console's read-only side of the debug pipe. This must be a - unidirectional pipe attached to the intepreter. EC debug messages use - this pipe. - oobm_queue: A queue.Queue or multiprocessing.Queue which is used for out of - band management for the interactive console. - input_buffer: A string representing the current input command. - input_buffer_pos: An integer representing the current position in the buffer - to insert a char. - partial_cmd: A string representing the command entered on a line before - pressing the up arrow keys. - esc_state: An integer represeting the current state within an escape - sequence. - line_limit: An integer representing the maximum number of characters on a - line. - history: A list of strings containing the past entered console commands. - history_pos: An integer representing the current history buffer position. - This index is used to show previous commands. - prompt: A string representing the console prompt displayed to the user. - enhanced_ec: A boolean indicating if the EC image that we are currently - communicating with is enhanced or not. Enhanced EC images will support - packed commands and host commands over the UART. This defaults to False - until we perform some handshaking. - interrogation_timeout: A float representing the current maximum seconds to - wait for a response to an interrogation. - receiving_oobm_cmd: A boolean indicating whether or not the console is in - the middle of receiving an out of band command. - pending_oobm_cmd: A string containing the pending OOBM command. - interrogation_mode: A string containing the current mode of whether - interrogations are performed with the EC or not and how often. - raw_debug: Flag to indicate whether per interrupt data should be logged to - debug - output_line_log_buffer: buffer for lines coming from the EC to log to debug - """ - - def __init__(self, master_pty, user_pty, interface_pty, cmd_pipe, dbg_pipe, - name=None): - """Initalises a Console object with the provided arguments. - - Args: - master_pty: File descriptor to the master side of the PTY. Used for driving - output to the user and receiving user input. - user_pty: A string representing the PTY name of the served console. - interface_pty: A string representing the PTY name of the served command - interface. - cmd_pipe: A socket.socket or multiprocessing.Connection object which - represents the console side of the command pipe. This must be a - bidirectional pipe. Console commands and responses utilize this pipe. - dbg_pipe: A socket.socket or multiprocessing.Connection object which - represents the console's read-only side of the debug pipe. This must be a - unidirectional pipe attached to the intepreter. EC debug messages use - this pipe. - name: the console source name - """ - # Create a unique logger based on the console name - console_prefix = ('%s - ' % name) if name else '' - logger = logging.getLogger('%sEC3PO.Console' % console_prefix) - self.logger = interpreter.LoggerAdapter(logger, {'pty': user_pty}) - self.master_pty = master_pty - self.user_pty = user_pty - self.interface_pty = interface_pty - self.cmd_pipe = cmd_pipe - self.dbg_pipe = dbg_pipe - self.oobm_queue = threadproc_shim.Queue() - self.input_buffer = b'' - self.input_buffer_pos = 0 - self.partial_cmd = b'' - self.esc_state = 0 - self.line_limit = CONSOLE_INPUT_LINE_SIZE - self.history = [] - self.history_pos = 0 - self.prompt = PROMPT - self.enhanced_ec = False - self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT - self.receiving_oobm_cmd = False - self.pending_oobm_cmd = b'' - self.interrogation_mode = b'auto' - self.timestamp_enabled = True - self.look_buffer = b'' - self.raw_debug = False - self.output_line_log_buffer = [] - - def __str__(self): - """Show internal state of Console object as a string.""" - string = [] - string.append('master_pty: %s' % self.master_pty) - string.append('user_pty: %s' % self.user_pty) - string.append('interface_pty: %s' % self.interface_pty) - string.append('cmd_pipe: %s' % self.cmd_pipe) - string.append('dbg_pipe: %s' % self.dbg_pipe) - string.append('oobm_queue: %s' % self.oobm_queue) - string.append('input_buffer: %s' % self.input_buffer) - string.append('input_buffer_pos: %d' % self.input_buffer_pos) - string.append('esc_state: %d' % self.esc_state) - string.append('line_limit: %d' % self.line_limit) - string.append('history: %r' % self.history) - string.append('history_pos: %d' % self.history_pos) - string.append('prompt: %r' % self.prompt) - string.append('partial_cmd: %r'% self.partial_cmd) - string.append('interrogation_mode: %r' % self.interrogation_mode) - string.append('look_buffer: %r' % self.look_buffer) - return '\n'.join(string) - - def LogConsoleOutput(self, data): - """Log to debug user MCU output to master_pty when line is filled. - - The logging also suppresses the Cr50 spinner lines by removing characters - when it sees backspaces. - - Args: - data: bytes - string received from MCU - """ - data = list(data) - # For compatibility with python2 and python3, standardize on the data - # being a list of integers. This requires one more transformation in py2 - if not isinstance(data[0], int): - data = [ord(c) for c in data] - - # This is a list of already filtered characters (or placeholders). - line = self.output_line_log_buffer - - # TODO(b/177480273): use raw strings here - symbols = { - ord(b'\n'): u'\\n', - ord(b'\r'): u'\\r', - ord(b'\t'): u'\\t' - } - # self.logger.debug(u'%s + %r', u''.join(line), ''.join(data)) - while data: - # Recall, data is a list of integers, namely the byte values sent by - # the MCU. - byte = data.pop(0) - # This means that |byte| is an int. - if byte == ord('\n'): - line.append(symbols[byte]) - if line: - self.logger.debug(u'%s', ''.join(line)) - line = [] - elif byte == ord('\b'): - # Backspace: trim the last character off the buffer - if line: - line.pop(-1) - elif byte in symbols: - line.append(symbols[byte]) - elif byte < ord(' ') or byte > ord('~'): - # Turn any character that isn't printable ASCII into escaped hex. - # ' ' is chr(20), and 0-19 are unprintable control characters. - # '~' is chr(126), and 127 is DELETE. 128-255 are control and Latin-1. - line.append(u'\\x%02x' % byte) - else: - # byte is printable. Thus it is safe to use chr() to get the printable - # character out of it again. - line.append(u'%s' % chr(byte)) - self.output_line_log_buffer = line - - def PrintHistory(self): - """Print the history of entered commands.""" - fd = self.master_pty - # Make it pretty by figuring out how wide to pad the numbers. - wide = (len(self.history) // 10) + 1 - for i in range(len(self.history)): - line = b' %*d %s\r\n' % (wide, i, self.history[i]) - os.write(fd, line) - - def ShowPreviousCommand(self): - """Shows the previous command from the history list.""" - # There's nothing to do if there's no history at all. - if not self.history: - self.logger.debug('No history to print.') - return - - # Don't do anything if there's no more history to show. - if self.history_pos == 0: - self.logger.debug('No more history to show.') - return - - self.logger.debug('current history position: %d.', self.history_pos) - - # Decrement the history buffer position. - self.history_pos -= 1 - self.logger.debug('new history position.: %d', self.history_pos) - - # Save the text entered on the console if any. - if self.history_pos == len(self.history)-1: - self.logger.debug('saving partial_cmd: %r', self.input_buffer) - self.partial_cmd = self.input_buffer - - # Backspace the line. - for _ in range(self.input_buffer_pos): - self.SendBackspace() - - # Print the last entry in the history buffer. - self.logger.debug('printing previous entry %d - %s', self.history_pos, - self.history[self.history_pos]) - fd = self.master_pty - prev_cmd = self.history[self.history_pos] - os.write(fd, prev_cmd) - # Update the input buffer. - self.input_buffer = prev_cmd - self.input_buffer_pos = len(prev_cmd) - - def ShowNextCommand(self): - """Shows the next command from the history list.""" - # Don't do anything if there's no history at all. - if not self.history: - self.logger.debug('History buffer is empty.') - return - - fd = self.master_pty - - self.logger.debug('current history position: %d', self.history_pos) - # Increment the history position. - self.history_pos += 1 - - # Restore the partial cmd. - if self.history_pos == len(self.history): - self.logger.debug('Restoring partial command of %r', self.partial_cmd) - # Backspace the line. - for _ in range(self.input_buffer_pos): - self.SendBackspace() - # Print the partially entered command if any. - os.write(fd, self.partial_cmd) - self.input_buffer = self.partial_cmd - self.input_buffer_pos = len(self.input_buffer) - # Now that we've printed it, clear the partial cmd storage. - self.partial_cmd = b'' - # Reset history position. - self.history_pos = len(self.history) - return - - self.logger.debug('new history position: %d', self.history_pos) - if self.history_pos > len(self.history)-1: - self.logger.debug('No more history to show.') - self.history_pos -= 1 - self.logger.debug('Reset history position to %d', self.history_pos) - return - - # Backspace the line. - for _ in range(self.input_buffer_pos): - self.SendBackspace() - - # Print the newer entry from the history buffer. - self.logger.debug('printing next entry %d - %s', self.history_pos, - self.history[self.history_pos]) - next_cmd = self.history[self.history_pos] - os.write(fd, next_cmd) - # Update the input buffer. - self.input_buffer = next_cmd - self.input_buffer_pos = len(next_cmd) - self.logger.debug('new history position: %d.', self.history_pos) - - def SliceOutChar(self): - """Remove a char from the line and shift everything over 1 column.""" - fd = self.master_pty - # Remove the character at the input_buffer_pos by slicing it out. - self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + \ - self.input_buffer[self.input_buffer_pos+1:] - # Write the rest of the line - moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos:]) - # Write a space to clear out the last char - moved_col += os.write(fd, b' ') - # Update the input buffer position. - self.input_buffer_pos += moved_col - # Reset the cursor - self.MoveCursor('left', moved_col) - - def HandleEsc(self, byte): - """HandleEsc processes escape sequences. - - Args: - byte: An integer representing the current byte in the sequence. - """ - # We shouldn't be handling an escape sequence if we haven't seen one. - assert self.esc_state != 0 - - if self.esc_state is EscState.ESC_START: - self.logger.debug('ESC_START') - if byte == ord('['): - self.esc_state = EscState.ESC_BRACKET - return - - else: - self.logger.error('Unexpected sequence. %c', byte) - self.esc_state = 0 - - elif self.esc_state is EscState.ESC_BRACKET: - self.logger.debug('ESC_BRACKET') - # Left Arrow key was pressed. - if byte == ord('D'): - self.logger.debug('Left arrow key pressed.') - self.MoveCursor('left', 1) - self.esc_state = 0 # Reset the state. - return - - # Right Arrow key. - elif byte == ord('C'): - self.logger.debug('Right arrow key pressed.') - self.MoveCursor('right', 1) - self.esc_state = 0 # Reset the state. - return - - # Up Arrow key. - elif byte == ord('A'): - self.logger.debug('Up arrow key pressed.') - self.ShowPreviousCommand() - # Reset the state. - self.esc_state = 0 # Reset the state. - return - - # Down Arrow key. - elif byte == ord('B'): - self.logger.debug('Down arrow key pressed.') - self.ShowNextCommand() - # Reset the state. - self.esc_state = 0 # Reset the state. - return - - # For some reason, minicom sends a 1 instead of 7. /shrug - # TODO(aaboagye): Figure out why this happens. - elif byte == ord('1') or byte == ord('7'): - self.esc_state = EscState.ESC_BRACKET_1 - - elif byte == ord('3'): - self.esc_state = EscState.ESC_BRACKET_3 - - elif byte == ord('8'): - self.esc_state = EscState.ESC_BRACKET_8 - - else: - self.logger.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)', - chr(byte), byte) - self.esc_state = 0 - return - - elif self.esc_state is EscState.ESC_BRACKET_1: - self.logger.debug('ESC_BRACKET_1') - # HOME key. - if byte == ord('~'): - self.logger.debug('Home key pressed.') - self.MoveCursor('left', self.input_buffer_pos) - self.esc_state = 0 # Reset the state. - self.logger.debug('ESC sequence complete.') - return - - elif self.esc_state is EscState.ESC_BRACKET_3: - self.logger.debug('ESC_BRACKET_3') - # DEL key. - if byte == ord('~'): - self.logger.debug('Delete key pressed.') - if self.input_buffer_pos != len(self.input_buffer): - self.SliceOutChar() - self.esc_state = 0 # Reset the state. - - elif self.esc_state is EscState.ESC_BRACKET_8: - self.logger.debug('ESC_BRACKET_8') - # END key. - if byte == ord('~'): - self.logger.debug('End key pressed.') - self.MoveCursor('right', - len(self.input_buffer) - self.input_buffer_pos) - self.esc_state = 0 # Reset the state. - self.logger.debug('ESC sequence complete.') - return - - else: - self.logger.error('Unexpected sequence. %c', byte) - self.esc_state = 0 - - else: - self.logger.error('Unexpected sequence. %c', byte) - self.esc_state = 0 - - def ProcessInput(self): - """Captures the input determines what actions to take.""" - # There's nothing to do if the input buffer is empty. - if len(self.input_buffer) == 0: - return - - # Don't store 2 consecutive identical commands in the history. - if (self.history and self.history[-1] != self.input_buffer - or not self.history): - self.history.append(self.input_buffer) - - # Split the command up by spaces. - line = self.input_buffer.split(b' ') - self.logger.debug('cmd: %s', self.input_buffer) - cmd = line[0].lower() - - # The 'history' command is a special case that we handle locally. - if cmd == 'history': - self.PrintHistory() - return - - # Send the command to the interpreter. - self.logger.debug('Sending command to interpreter.') - self.cmd_pipe.send(self.input_buffer) - - def CheckForEnhancedECImage(self): - """Performs an interrogation of the EC image. - - Send a SYN and expect an ACK. If no ACK or the response is incorrect, then - assume that the current EC image that we are talking to is not enhanced. - - Returns: - is_enhanced: A boolean indicating whether the EC responded to the - interrogation correctly. - - Raises: - EOFError: Allowed to propagate through from self.dbg_pipe.recv(). - """ - # Send interrogation byte and wait for the response. - self.logger.debug('Performing interrogation.') - self.cmd_pipe.send(interpreter.EC_SYN) - - response = '' - if self.dbg_pipe.poll(self.interrogation_timeout): - response = self.dbg_pipe.recv() - self.logger.debug('response: %r', binascii.hexlify(response)) - else: - self.logger.debug('Timed out waiting for EC_ACK') - - # Verify the acknowledgment. - is_enhanced = response == interpreter.EC_ACK - - if is_enhanced: - # Increase the interrogation timeout for stability purposes. - self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT - self.logger.debug('Increasing interrogation timeout to %rs.', - self.interrogation_timeout) - else: - # Reduce the timeout in order to reduce the perceivable delay. - self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT - self.logger.debug('Reducing interrogation timeout to %rs.', - self.interrogation_timeout) - - return is_enhanced - - def HandleChar(self, byte): - """HandleChar does a certain action when it receives a character. - - Args: - byte: An integer representing the character received from the user. - - Raises: - EOFError: Allowed to propagate through from self.CheckForEnhancedECImage() - i.e. from self.dbg_pipe.recv(). - """ - fd = self.master_pty - - # Enter the OOBM prompt mode if the user presses '%'. - if byte == ord('%'): - self.logger.debug('Begin OOBM command.') - self.receiving_oobm_cmd = True - # Print a "prompt". - os.write(self.master_pty, b'\r\n% ') - return - - # Add chars to the pending OOBM command if we're currently receiving one. - if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN: - tmp_bytes = six.int2byte(byte) - self.pending_oobm_cmd += tmp_bytes - self.logger.debug('%s', tmp_bytes) - os.write(self.master_pty, tmp_bytes) - return - - if byte == ControlKey.CARRIAGE_RETURN: - if self.receiving_oobm_cmd: - # Terminate the command and place it in the OOBM queue. - self.logger.debug('End OOBM command.') - if self.pending_oobm_cmd: - self.oobm_queue.put(self.pending_oobm_cmd) - self.logger.debug('Placed %r into OOBM command queue.', - self.pending_oobm_cmd) - - # Reset the state. - os.write(self.master_pty, b'\r\n' + self.prompt) - self.input_buffer = b'' - self.input_buffer_pos = 0 - self.receiving_oobm_cmd = False - self.pending_oobm_cmd = b'' - return - - if self.interrogation_mode == b'never': - self.logger.debug('Skipping interrogation because interrogation mode' - ' is set to never.') - elif self.interrogation_mode == b'always': - # Only interrogate the EC if the interrogation mode is set to 'always'. - self.enhanced_ec = self.CheckForEnhancedECImage() - self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) - - if not self.enhanced_ec: - # Send everything straight to the EC to handle. - self.cmd_pipe.send(six.int2byte(byte)) - # Reset the input buffer. - self.input_buffer = b'' - self.input_buffer_pos = 0 - self.logger.log(1, 'Reset input buffer.') - return - - # Keep handling the ESC sequence if we're in the middle of it. - if self.esc_state != 0: - self.HandleEsc(byte) - return - - # When we're at the end of the line, we should only allow going backwards, - # backspace, carriage return, up, or down. The arrow keys are escape - # sequences, so we let the escape...escape. - if (self.input_buffer_pos >= self.line_limit and - byte not in [ControlKey.CTRL_B, ControlKey.ESC, ControlKey.BACKSPACE, - ControlKey.CTRL_A, ControlKey.CARRIAGE_RETURN, - ControlKey.CTRL_P, ControlKey.CTRL_N]): - return - - # If the input buffer is full we can't accept new chars. - buffer_full = len(self.input_buffer) >= self.line_limit - - - # Carriage_Return/Enter - if byte == ControlKey.CARRIAGE_RETURN: - self.logger.debug('Enter key pressed.') - # Put a carriage return/newline and the print the prompt. - os.write(fd, b'\r\n') - - # TODO(aaboagye): When we control the printing of all output, print the - # prompt AFTER printing all the output. We can't do it yet because we - # don't know how much is coming from the EC. - - # Print the prompt. - os.write(fd, self.prompt) - # Process the input. - self.ProcessInput() - # Now, clear the buffer. - self.input_buffer = b'' - self.input_buffer_pos = 0 - # Reset history buffer pos. - self.history_pos = len(self.history) - # Clear partial command. - self.partial_cmd = b'' - - # Backspace - elif byte == ControlKey.BACKSPACE: - self.logger.debug('Backspace pressed.') - if self.input_buffer_pos > 0: - # Move left 1 column. - self.MoveCursor('left', 1) - # Remove the character at the input_buffer_pos by slicing it out. - self.SliceOutChar() - - self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos) - - # Ctrl+A. Move cursor to beginning of the line - elif byte == ControlKey.CTRL_A: - self.logger.debug('Control+A pressed.') - self.MoveCursor('left', self.input_buffer_pos) - - # Ctrl+B. Move cursor left 1 column. - elif byte == ControlKey.CTRL_B: - self.logger.debug('Control+B pressed.') - self.MoveCursor('left', 1) - - # Ctrl+D. Delete a character. - elif byte == ControlKey.CTRL_D: - self.logger.debug('Control+D pressed.') - if self.input_buffer_pos != len(self.input_buffer): - # Remove the character by slicing it out. - self.SliceOutChar() - - # Ctrl+E. Move cursor to end of the line. - elif byte == ControlKey.CTRL_E: - self.logger.debug('Control+E pressed.') - self.MoveCursor('right', - len(self.input_buffer) - self.input_buffer_pos) - - # Ctrl+F. Move cursor right 1 column. - elif byte == ControlKey.CTRL_F: - self.logger.debug('Control+F pressed.') - self.MoveCursor('right', 1) - - # Ctrl+K. Kill line. - elif byte == ControlKey.CTRL_K: - self.logger.debug('Control+K pressed.') - self.KillLine() - - # Ctrl+N. Next line. - elif byte == ControlKey.CTRL_N: - self.logger.debug('Control+N pressed.') - self.ShowNextCommand() - - # Ctrl+P. Previous line. - elif byte == ControlKey.CTRL_P: - self.logger.debug('Control+P pressed.') - self.ShowPreviousCommand() - - # ESC sequence - elif byte == ControlKey.ESC: - # Starting an ESC sequence - self.esc_state = EscState.ESC_START - - # Only print printable chars. - elif IsPrintable(byte): - # Drop the character if we're full. - if buffer_full: - self.logger.debug('Dropped char: %c(%d)', byte, byte) - return - # Print the character. - os.write(fd, six.int2byte(byte)) - # Print the rest of the line (if any). - extra_bytes_written = os.write(fd, - self.input_buffer[self.input_buffer_pos:]) - - # Recreate the input buffer. - self.input_buffer = (self.input_buffer[0:self.input_buffer_pos] + - six.int2byte(byte) + - self.input_buffer[self.input_buffer_pos:]) - # Update the input buffer position. - self.input_buffer_pos += 1 + extra_bytes_written - - # Reset the cursor if we wrote any extra bytes. - if extra_bytes_written: - self.MoveCursor('left', extra_bytes_written) - - self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos) - - def MoveCursor(self, direction, count): - """MoveCursor moves the cursor left or right by count columns. - - Args: - direction: A string that should be either 'left' or 'right' representing - the direction to move the cursor on the console. - count: An integer representing how many columns the cursor should be - moved. - - Raises: - AssertionError: If the direction is not equal to 'left' or 'right'. - """ - # If there's nothing to move, we're done. - if not count: - return - fd = self.master_pty - seq = b'\033[' + str(count).encode('ascii') - if direction == 'left': - # Bind the movement. - if count > self.input_buffer_pos: - count = self.input_buffer_pos - seq += b'D' - self.logger.debug('move cursor left %d', count) - self.input_buffer_pos -= count - - elif direction == 'right': - # Bind the movement. - if (count + self.input_buffer_pos) > len(self.input_buffer): - count = 0 - seq += b'C' - self.logger.debug('move cursor right %d', count) - self.input_buffer_pos += count - - else: - raise AssertionError(('The only valid directions are \'left\' and ' - '\'right\'')) - - self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos) - # Move the cursor. - if count != 0: - os.write(fd, seq) - - def KillLine(self): - """Kill the rest of the line based on the input buffer position.""" - # Killing the line is killing all the text to the right. - diff = len(self.input_buffer) - self.input_buffer_pos - self.logger.debug('diff: %d', diff) - # Diff shouldn't be negative, but if it is for some reason, let's try to - # correct the cursor. - if diff < 0: - self.logger.warning('Resetting input buffer position to %d...', - len(self.input_buffer)) - self.MoveCursor('left', -diff) - return - if diff: - self.MoveCursor('right', diff) - for _ in range(diff): - self.SendBackspace() - self.input_buffer_pos -= diff - self.input_buffer = self.input_buffer[0:self.input_buffer_pos] - - def SendBackspace(self): - """Backspace a character on the console.""" - os.write(self.master_pty, b'\033[1D \033[1D') - - def ProcessOOBMQueue(self): - """Retrieve an item from the OOBM queue and process it.""" - item = self.oobm_queue.get() - self.logger.debug('OOBM cmd: %r', item) - cmd = item.split(b' ') - - if cmd[0] == b'loglevel': - # An integer is required in order to set the log level. - if len(cmd) < 2: - self.logger.debug('Insufficient args') - self.PrintOOBMHelp() - return - try: - self.logger.debug('Log level change request.') - new_log_level = int(cmd[1]) - self.logger.logger.setLevel(new_log_level) - self.logger.info('Log level changed to %d.', new_log_level) - - # Forward the request to the interpreter as well. - self.cmd_pipe.send(item) - except ValueError: - # Ignoring the request if an integer was not provided. - self.PrintOOBMHelp() - - elif cmd[0] == b'timestamp': - mode = cmd[1].lower() - self.timestamp_enabled = (mode == b'on') - self.logger.info('%sabling uart timestamps.', - 'En' if self.timestamp_enabled else 'Dis') - - elif cmd[0] == b'rawdebug': - mode = cmd[1].lower() - self.raw_debug = (mode == b'on') - self.logger.info('%sabling per interrupt debug logs.', - 'En' if self.raw_debug else 'Dis') - - elif cmd[0] == b'interrogate' and len(cmd) >= 2: - enhanced = False - mode = cmd[1] - if len(cmd) >= 3 and cmd[2] == b'enhanced': - enhanced = True - - # Set the mode if correct. - if mode in INTERROGATION_MODES: - self.interrogation_mode = mode - self.logger.debug('Updated interrogation mode to %s.', mode) - - # Update the assumptions of the EC image. - self.enhanced_ec = enhanced - self.logger.debug('Enhanced EC image is now %r', self.enhanced_ec) - - # Send command to interpreter as well. - self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii')) - else: - self.PrintOOBMHelp() - - else: - self.PrintOOBMHelp() - - def PrintOOBMHelp(self): - """Prints out the OOBM help.""" - # Print help syntax. - os.write(self.master_pty, b'\r\n' + b'Known OOBM commands:\r\n') - os.write(self.master_pty, b' interrogate <never | always | auto> ' - b'[enhanced]\r\n') - os.write(self.master_pty, b' loglevel <int>\r\n') - - def CheckBufferForEnhancedImage(self, data): - """Adds data to a look buffer and checks to see for enhanced EC image. - - The EC's console task prints a string upon initialization which says that - "Console is enabled; type HELP for help.". The enhanced EC images print a - different string as a part of their init. This function searches through a - "look" buffer, scanning for the presence of either of those strings and - updating the enhanced_ec state accordingly. - - Args: - data: A string containing the data sent from the interpreter. - """ - self.look_buffer += data - - # Search the buffer for any of the EC image strings. - enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer) - non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer) - - # Update the state if any matches were found. - if enhanced_match or non_enhanced_match: - if enhanced_match: - self.enhanced_ec = True - elif non_enhanced_match: - self.enhanced_ec = False - - # Inform the interpreter of the result. - self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii')) - self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) - - # Clear look buffer since a match was found. - self.look_buffer = b'' - - # Move the sliding window. - self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:] - - -def CanonicalizeTimeString(timestr): - """Canonicalize the timestamp string. - - Args: - timestr: A timestamp string ended with 6 digits msec. - - Returns: - A string with 3 digits msec and an extra space. - """ - return timestr[:-3].encode('ascii') + b' ' - - -def IsPrintable(byte): - """Determines if a byte is printable. - - Args: - byte: An integer potentially representing a printable character. - - Returns: - A boolean indicating whether the byte is a printable character. - """ - return byte >= ord(' ') and byte <= ord('~') - - -def StartLoop(console, command_active, shutdown_pipe=None): - """Starts the infinite loop of console processing. - - Args: - console: A Console object that has been properly initialzed. - command_active: ctypes data object or multiprocessing.Value indicating if - servod owns the console, or user owns the console. This prevents input - collisions. - shutdown_pipe: A file object for a pipe or equivalent that becomes readable - (not blocked) to indicate that the loop should exit. Can be None to never - exit the loop. - """ - try: - console.logger.debug('Console is being served on %s.', console.user_pty) - console.logger.debug('Console master is on %s.', console.master_pty) - console.logger.debug('Command interface is being served on %s.', - console.interface_pty) - console.logger.debug(console) - - # This checks for HUP to indicate if the user has connected to the pty. - ep = select.epoll() - ep.register(console.master_pty, select.EPOLLHUP) - - # This is used instead of "break" to avoid exiting the loop in the middle of - # an iteration. - continue_looping = True - - # Used for determining when to print host timestamps - tm_req = True - - while continue_looping: - # Check to see if pts is connected to anything - events = ep.poll(0) - master_connected = not events - - # Check to see if pipes or the console are ready for reading. - read_list = [console.interface_pty, - console.cmd_pipe, console.dbg_pipe] - if master_connected: - read_list.append(console.master_pty) - if shutdown_pipe is not None: - read_list.append(shutdown_pipe) - - # Check if any input is ready, or wait for .1 sec and re-poll if - # a user has connected to the pts. - select_output = select.select(read_list, [], [], .1) - if not select_output: - continue - ready_for_reading = select_output[0] - - for obj in ready_for_reading: - if obj is console.master_pty: - if not command_active.value: - # Convert to bytes so we can look for non-printable chars such as - # Ctrl+A, Ctrl+E, etc. - try: - line = bytearray(os.read(console.master_pty, CONSOLE_MAX_READ)) - console.logger.debug('Input from user: %s, locked:%s', - str(line).strip(), command_active.value) - for i in line: - try: - # Handle each character as it arrives. - console.HandleChar(i) - except EOFError: - console.logger.debug( - 'ec3po console received EOF from dbg_pipe in HandleChar()' - ' while reading console.master_pty') - continue_looping = False - break - except OSError: - console.logger.debug('Ptm read failed, probably user disconnect.') - - elif obj is console.interface_pty: - if command_active.value: - # Convert to bytes so we can look for non-printable chars such as - # Ctrl+A, Ctrl+E, etc. - line = bytearray(os.read(console.interface_pty, CONSOLE_MAX_READ)) - console.logger.debug('Input from interface: %s, locked:%s', - str(line).strip(), command_active.value) - for i in line: - try: - # Handle each character as it arrives. - console.HandleChar(i) - except EOFError: - console.logger.debug( - 'ec3po console received EOF from dbg_pipe in HandleChar()' - ' while reading console.interface_pty') - continue_looping = False - break - - elif obj is console.cmd_pipe: - try: - data = console.cmd_pipe.recv() - except EOFError: - console.logger.debug('ec3po console received EOF from cmd_pipe') - continue_looping = False - else: - # Write it to the user console. - if console.raw_debug: - console.logger.debug('|CMD|-%s->%r', - ('u' if master_connected else '') + - ('i' if command_active.value else ''), - data.strip()) - if master_connected: - os.write(console.master_pty, data) - if command_active.value: - os.write(console.interface_pty, data) - - elif obj is console.dbg_pipe: - try: - data = console.dbg_pipe.recv() - except EOFError: - console.logger.debug('ec3po console received EOF from dbg_pipe') - continue_looping = False - else: - if console.interrogation_mode == b'auto': - # Search look buffer for enhanced EC image string. - console.CheckBufferForEnhancedImage(data) - # Write it to the user console. - if len(data) > 1 and console.raw_debug: - console.logger.debug('|DBG|-%s->%r', - ('u' if master_connected else '') + - ('i' if command_active.value else ''), - data.strip()) - console.LogConsoleOutput(data) - if master_connected: - end = len(data) - 1 - if console.timestamp_enabled: - # A timestamp is required at the beginning of this line - if tm_req is True: - now = datetime.now() - tm = CanonicalizeTimeString(now.strftime(HOST_STRFTIME)) - os.write(console.master_pty, tm) - tm_req = False - - # Insert timestamps into the middle where appropriate - # except if the last character is a newline - nls_found = data.count(b'\n', 0, end) - now = datetime.now() - tm = CanonicalizeTimeString(now.strftime('\n' + HOST_STRFTIME)) - data_tm = data.replace(b'\n', tm, nls_found) - else: - data_tm = data - - # timestamp required on next input - if data[end] == b'\n'[0]: - tm_req = True - os.write(console.master_pty, data_tm) - if command_active.value: - os.write(console.interface_pty, data) - - elif obj is shutdown_pipe: - console.logger.debug( - 'ec3po console received shutdown pipe unblocked notification') - continue_looping = False - - while not console.oobm_queue.empty(): - console.logger.debug('OOBM queue ready for reading.') - console.ProcessOOBMQueue() - - except KeyboardInterrupt: - pass - - finally: - ep.unregister(console.master_pty) - console.dbg_pipe.close() - console.cmd_pipe.close() - os.close(console.master_pty) - os.close(console.interface_pty) - if shutdown_pipe is not None: - shutdown_pipe.close() - console.logger.debug('Exit ec3po console loop for %s', console.user_pty) - - -def main(argv): - """Kicks off the EC-3PO interactive console interface and interpreter. - - We create some pipes to communicate with an interpreter, instantiate an - interpreter, create a PTY pair, and begin serving the console interface. - - Args: - argv: A list of strings containing the arguments this module was called - with. - """ - # Set up argument parser. - parser = argparse.ArgumentParser(description=('Start interactive EC console ' - 'and interpreter.')) - parser.add_argument('ec_uart_pty', - help=('The full PTY name that the EC UART' - ' is present on. eg: /dev/pts/12')) - parser.add_argument('--log-level', - default='info', - help='info, debug, warning, error, or critical') - - # Parse arguments. - opts = parser.parse_args(argv) - - # Set logging level. - opts.log_level = opts.log_level.lower() - if opts.log_level == 'info': - log_level = logging.INFO - elif opts.log_level == 'debug': - log_level = logging.DEBUG - elif opts.log_level == 'warning': - log_level = logging.WARNING - elif opts.log_level == 'error': - log_level = logging.ERROR - elif opts.log_level == 'critical': - log_level = logging.CRITICAL - else: - parser.error('Invalid log level. (info, debug, warning, error, critical)') - - # Start logging with a timestamp, module, and log level shown in each log - # entry. - logging.basicConfig(level=log_level, format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create some pipes to communicate between the interpreter and the console. - # The command pipe is bidirectional. - cmd_pipe_interactive, cmd_pipe_interp = threadproc_shim.Pipe() - # The debug pipe is unidirectional from interpreter to console only. - dbg_pipe_interactive, dbg_pipe_interp = threadproc_shim.Pipe(duplex=False) - - # Create an interpreter instance. - itpr = interpreter.Interpreter(opts.ec_uart_pty, cmd_pipe_interp, - dbg_pipe_interp, log_level) - - # Spawn an interpreter process. - itpr_process = threadproc_shim.ThreadOrProcess( - target=interpreter.StartLoop, args=(itpr,)) - # Make sure to kill the interpreter when we terminate. - itpr_process.daemon = True - # Start the interpreter. - itpr_process.start() - - # Open a new pseudo-terminal pair - (master_pty, user_pty) = pty.openpty() - # Set the permissions to 660. - os.chmod(os.ttyname(user_pty), (stat.S_IRGRP | stat.S_IWGRP | - stat.S_IRUSR | stat.S_IWUSR)) - # Create a console. - console = Console(master_pty, os.ttyname(user_pty), cmd_pipe_interactive, - dbg_pipe_interactive) - # Start serving the console. - v = threadproc_shim.Value(ctypes.c_bool, False) - StartLoop(console, v) - - -if __name__ == '__main__': - main(sys.argv[1:]) diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py deleted file mode 100755 index 3a44e0efce..0000000000 --- a/util/ec3po/console_unittest.py +++ /dev/null @@ -1,1569 +0,0 @@ -#!/usr/bin/env python -# Copyright 2015 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Unit tests for the EC-3PO Console interface.""" - -# Note: This is a py2/3 compatible file. - -from __future__ import print_function - -import binascii -import logging -import mock -import tempfile -import unittest - -import six - -from ec3po import console -from ec3po import interpreter -from ec3po import threadproc_shim - -ESC_STRING = six.int2byte(console.ControlKey.ESC) - -class Keys(object): - """A class that contains the escape sequences for special keys.""" - LEFT_ARROW = [console.ControlKey.ESC, ord('['), ord('D')] - RIGHT_ARROW = [console.ControlKey.ESC, ord('['), ord('C')] - UP_ARROW = [console.ControlKey.ESC, ord('['), ord('A')] - DOWN_ARROW = [console.ControlKey.ESC, ord('['), ord('B')] - HOME = [console.ControlKey.ESC, ord('['), ord('1'), ord('~')] - END = [console.ControlKey.ESC, ord('['), ord('8'), ord('~')] - DEL = [console.ControlKey.ESC, ord('['), ord('3'), ord('~')] - -class OutputStream(object): - """A class that has methods which return common console output.""" - - @staticmethod - def MoveCursorLeft(count): - """Produces what would be printed to the console if the cursor moved left. - - Args: - count: An integer representing how many columns to move left. - - Returns: - string: A string which contains what would be printed to the console if - the cursor moved left. - """ - string = ESC_STRING - string += b'[' + str(count).encode('ascii') + b'D' - return string - - @staticmethod - def MoveCursorRight(count): - """Produces what would be printed to the console if the cursor moved right. - - Args: - count: An integer representing how many columns to move right. - - Returns: - string: A string which contains what would be printed to the console if - the cursor moved right. - """ - string = ESC_STRING - string += b'[' + str(count).encode('ascii') + b'C' - return string - -BACKSPACE_STRING = b'' -# Move cursor left 1 column. -BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) -# Write a space. -BACKSPACE_STRING += b' ' -# Move cursor left 1 column. -BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) - -def BytesToByteList(string): - """Converts a bytes string to list of bytes. - - Args: - string: A literal bytes to turn into a list of bytes. - - Returns: - A list of integers representing the byte value of each character in the - string. - """ - if six.PY3: - return [c for c in string] - return [ord(c) for c in string] - -def CheckConsoleOutput(test_case, exp_console_out): - """Verify what was sent out the console matches what we expect. - - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_console_out: A string representing the console output stream. - """ - # Read what was sent out the console. - test_case.tempfile.seek(0) - console_out = test_case.tempfile.read() - - test_case.assertEqual(exp_console_out, console_out) - -def CheckInputBuffer(test_case, exp_input_buffer): - """Verify that the input buffer contains what we expect. - - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_input_buffer: A string containing the contents of the current input - buffer. - """ - test_case.assertEqual(exp_input_buffer, test_case.console.input_buffer, - (b'input buffer does not match expected.\n' - b'expected: |' + exp_input_buffer + b'|\n' - b'got: |' + test_case.console.input_buffer + - b'|\n' + str(test_case.console).encode('ascii'))) - -def CheckInputBufferPosition(test_case, exp_pos): - """Verify the input buffer position. - - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_pos: An integer representing the expected input buffer position. - """ - test_case.assertEqual(exp_pos, test_case.console.input_buffer_pos, - 'input buffer position is incorrect.\ngot: ' + - str(test_case.console.input_buffer_pos) + '\nexp: ' + - str(exp_pos) + '\n' + str(test_case.console)) - -def CheckHistoryBuffer(test_case, exp_history): - """Verify that the items in the history buffer are what we expect. - - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_history: A list of strings representing the expected contents of the - history buffer. - """ - # First, check to see if the length is what we expect. - test_case.assertEqual(len(exp_history), len(test_case.console.history), - ('The number of items in the history is unexpected.\n' - 'exp: ' + str(len(exp_history)) + '\n' - 'got: ' + str(len(test_case.console.history)) + '\n' - 'internal state:\n' + str(test_case.console))) - - # Next, check the actual contents of the history buffer. - for i in range(len(exp_history)): - test_case.assertEqual(exp_history[i], test_case.console.history[i], - (b'history buffer contents are incorrect.\n' - b'exp: ' + exp_history[i] + b'\n' - b'got: ' + test_case.console.history[i] + b'\n' - b'internal state:\n' + - str(test_case.console).encode('ascii'))) - - -class TestConsoleEditingMethods(unittest.TestCase): - """Test case to verify all console editing methods.""" - - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create a temp file and set both the master and slave PTYs to the file to - # create a loopback. - self.tempfile = tempfile.TemporaryFile() - - # Create some mock pipes. These won't be used since we'll mock out sends - # to the interpreter. - mock_pipe_end_0, mock_pipe_end_1 = threadproc_shim.Pipe() - self.console = console.Console(self.tempfile.fileno(), self.tempfile, - tempfile.TemporaryFile(), - mock_pipe_end_0, mock_pipe_end_1, "EC") - - # Console editing methods are only valid for enhanced EC images, therefore - # we have to assume that the "EC" we're talking to is enhanced. By default, - # the console believes that the EC it's communicating with is NOT enhanced - # which is why we have to override it here. - self.console.enhanced_ec = True - self.console.CheckForEnhancedECImage = mock.MagicMock(return_value=True) - - def test_EnteringChars(self): - """Verify that characters are echoed onto the console.""" - test_str = b'abc' - input_stream = BytesToByteList(test_str) - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # Check the input position. - exp_pos = len(test_str) - CheckInputBufferPosition(self, exp_pos) - - # Verify that the input buffer is correct. - expected_buffer = test_str - CheckInputBuffer(self, expected_buffer) - - # Check console output - exp_console_out = test_str - CheckConsoleOutput(self, exp_console_out) - - def test_EnteringDeletingMoreCharsThanEntered(self): - """Verify that we can press backspace more than we have entered chars.""" - test_str = b'spamspam' - input_stream = BytesToByteList(test_str) - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # Now backspace 1 more than what we sent. - input_stream = [] - for _ in range(len(test_str) + 1): - input_stream.append(console.ControlKey.BACKSPACE) - - # Send that sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, verify that input buffer position is 0. - CheckInputBufferPosition(self, 0) - - # Next, examine the output stream for the correct sequence. - exp_console_out = test_str - for _ in range(len(test_str)): - exp_console_out += BACKSPACE_STRING - - # Now, verify that we got what we expected. - CheckConsoleOutput(self, exp_console_out) - - def test_EnteringMoreThanCharLimit(self): - """Verify that we drop characters when the line is too long.""" - test_str = self.console.line_limit * b'o' # All allowed. - test_str += 5 * b'x' # All should be dropped. - input_stream = BytesToByteList(test_str) - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, we expect that input buffer position should be equal to the line - # limit. - exp_pos = self.console.line_limit - CheckInputBufferPosition(self, exp_pos) - - # The input buffer should only hold until the line limit. - exp_buffer = test_str[0:self.console.line_limit] - CheckInputBuffer(self, exp_buffer) - - # Lastly, check that the extra characters are not printed. - exp_console_out = exp_buffer - CheckConsoleOutput(self, exp_console_out) - - def test_ValidKeysOnLongLine(self): - """Verify that we can still press valid keys if the line is too long.""" - # Fill the line. - test_str = self.console.line_limit * b'o' - exp_console_out = test_str - # Try to fill it even more; these should all be dropped. - test_str += 5 * b'x' - input_stream = BytesToByteList(test_str) - - # We should be able to press the following keys: - # - Backspace - # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N - # - Delete - # - Home/CTRL+A - # - End/CTRL+E - # - Carriage Return - - # Backspace 1 character - input_stream.append(console.ControlKey.BACKSPACE) - exp_console_out += BACKSPACE_STRING - # Refill the line. - input_stream.extend(BytesToByteList(b'o')) - exp_console_out += b'o' - - # Left arrow key. - input_stream.extend(Keys.LEFT_ARROW) - exp_console_out += OutputStream.MoveCursorLeft(1) - - # Right arrow key. - input_stream.extend(Keys.RIGHT_ARROW) - exp_console_out += OutputStream.MoveCursorRight(1) - - # CTRL+B - input_stream.append(console.ControlKey.CTRL_B) - exp_console_out += OutputStream.MoveCursorLeft(1) - - # CTRL+F - input_stream.append(console.ControlKey.CTRL_F) - exp_console_out += OutputStream.MoveCursorRight(1) - - # Let's press enter now so we can test up and down. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - exp_console_out += b'\r\n' + self.console.prompt - - # Up arrow key. - input_stream.extend(Keys.UP_ARROW) - exp_console_out += test_str[:self.console.line_limit] - - # Down arrow key. - input_stream.extend(Keys.DOWN_ARROW) - # Since the line was blank, we have to backspace the entire line. - exp_console_out += self.console.line_limit * BACKSPACE_STRING - - # CTRL+P - input_stream.append(console.ControlKey.CTRL_P) - exp_console_out += test_str[:self.console.line_limit] - - # CTRL+N - input_stream.append(console.ControlKey.CTRL_N) - # Since the line was blank, we have to backspace the entire line. - exp_console_out += self.console.line_limit * BACKSPACE_STRING - - # Press the Up arrow key to reprint the long line. - input_stream.extend(Keys.UP_ARROW) - exp_console_out += test_str[:self.console.line_limit] - - # Press the Home key to jump to the beginning of the line. - input_stream.extend(Keys.HOME) - exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) - - # Press the End key to jump to the end of the line. - input_stream.extend(Keys.END) - exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) - - # Press CTRL+A to jump to the beginning of the line. - input_stream.append(console.ControlKey.CTRL_A) - exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) - - # Press CTRL+E to jump to the end of the line. - input_stream.extend(Keys.END) - exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) - - # Move left one column so we can delete a character. - input_stream.extend(Keys.LEFT_ARROW) - exp_console_out += OutputStream.MoveCursorLeft(1) - - # Press the delete key. - input_stream.extend(Keys.DEL) - # This should look like a space, and then move cursor left 1 column since - # we're at the end of line. - exp_console_out += b' ' + OutputStream.MoveCursorLeft(1) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify everything happened correctly. - CheckConsoleOutput(self, exp_console_out) - - def test_BackspaceOnEmptyLine(self): - """Verify that we can backspace on an empty line with no bad effects.""" - # Send a single backspace. - test_str = [console.ControlKey.BACKSPACE] - - # Send the characters in. - for byte in test_str: - self.console.HandleChar(byte) - - # Check the input position. - exp_pos = 0 - CheckInputBufferPosition(self, exp_pos) - - # Check that buffer is empty. - exp_input_buffer = b'' - CheckInputBuffer(self, exp_input_buffer) - - # Check that the console output is empty. - exp_console_out = b'' - CheckConsoleOutput(self, exp_console_out) - - def test_BackspaceWithinLine(self): - """Verify that we shift the chars over when backspacing within a line.""" - # Misspell 'help' - test_str = b'heelp' - input_stream = BytesToByteList(test_str) - # Use the arrow key to go back to fix it. - # Move cursor left 1 column. - input_stream.extend(2*Keys.LEFT_ARROW) - # Backspace once to remove the extra 'e'. - input_stream.append(console.ControlKey.BACKSPACE) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify the input buffer - exp_input_buffer = b'help' - CheckInputBuffer(self, exp_input_buffer) - - # Verify the input buffer position. It should be at 2 (cursor over the 'l') - CheckInputBufferPosition(self, 2) - - # We expect the console output to be the test string, with two moves to the - # left, another move left, and then the rest of the line followed by a - # space. - exp_console_out = test_str - exp_console_out += 2 * OutputStream.MoveCursorLeft(1) - - # Move cursor left 1 column. - exp_console_out += OutputStream.MoveCursorLeft(1) - # Rest of the line and a space. (test_str in this case) - exp_console_out += b'lp ' - # Reset the cursor 2 + 1 to the left. - exp_console_out += OutputStream.MoveCursorLeft(3) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_JumpToBeginningOfLineViaCtrlA(self): - """Verify that we can jump to the beginning of a line with Ctrl+A.""" - # Enter some chars and press CTRL+A - test_str = b'abc' - input_stream = BytesToByteList(test_str) + [console.ControlKey.CTRL_A] - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect to see our test string followed by a move cursor left. - exp_console_out = test_str - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - - # Check to see what whas printed on the console. - CheckConsoleOutput(self, exp_console_out) - - # Check that the input buffer position is now 0. - CheckInputBufferPosition(self, 0) - - # Check input buffer still contains our test string. - CheckInputBuffer(self, test_str) - - def test_JumpToBeginningOfLineViaHomeKey(self): - """Jump to beginning of line via HOME key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - input_stream.extend(Keys.HOME) - - # Send out the stream. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, verify that input buffer position is now 0. - CheckInputBufferPosition(self, 0) - - # Next, verify that the input buffer did not change. - CheckInputBuffer(self, test_str) - - # Lastly, check that the cursor moved correctly. - exp_console_out = test_str - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - CheckConsoleOutput(self, exp_console_out) - - def test_JumpToEndOfLineViaEndKey(self): - """Jump to the end of the line using the END key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - input_stream += [console.ControlKey.CTRL_A] - # Now, jump to the end of the line. - input_stream.extend(Keys.END) - - # Send out the stream. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is correct. This should be at the - # end of the test string. - CheckInputBufferPosition(self, len(test_str)) - - # The expected output should be the test string, followed by a jump to the - # beginning of the line, and lastly a jump to the end of the line. - exp_console_out = test_str - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - # Now the jump back to the end of the line. - exp_console_out += OutputStream.MoveCursorRight(len(test_str)) - - # Verify console output stream. - CheckConsoleOutput(self, exp_console_out) - - def test_JumpToEndOfLineViaCtrlE(self): - """Enter some chars and then try to jump to the end. (Should be a no-op)""" - test_str = b'sysinfo' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CTRL_E) - - # Send out the stream - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position isn't any further than we expect. - # At this point, the position should be at the end of the test string. - CheckInputBufferPosition(self, len(test_str)) - - # Now, let's try to jump to the beginning and then jump back to the end. - input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E] - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Perform the same verification. - CheckInputBufferPosition(self, len(test_str)) - - # Lastly try to jump again, beyond the end. - input_stream = [console.ControlKey.CTRL_E] - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Perform the same verification. - CheckInputBufferPosition(self, len(test_str)) - - # We expect to see the test string, a jump to the beginning of the line, and - # one jump to the end of the line. - exp_console_out = test_str - # Jump to beginning. - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - # Jump back to end. - exp_console_out += OutputStream.MoveCursorRight(len(test_str)) - - # Verify the console output. - CheckConsoleOutput(self, exp_console_out) - - def test_MoveLeftWithArrowKey(self): - """Move cursor left one column with arrow key.""" - test_str = b'tastyspam' - input_stream = BytesToByteList(test_str) - input_stream.extend(Keys.LEFT_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is 1 less than the length. - CheckInputBufferPosition(self, len(test_str) - 1) - - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) - - # We expect the test string, followed by a one column move left. - exp_console_out = test_str + OutputStream.MoveCursorLeft(1) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_MoveLeftWithCtrlB(self): - """Move cursor back one column with Ctrl+B.""" - test_str = b'tastyspam' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CTRL_B) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is 1 less than the length. - CheckInputBufferPosition(self, len(test_str) - 1) - - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) - - # We expect the test string, followed by a one column move left. - exp_console_out = test_str + OutputStream.MoveCursorLeft(1) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_MoveRightWithArrowKey(self): - """Move cursor one column to the right with the arrow key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - # Jump to beginning of line. - input_stream.append(console.ControlKey.CTRL_A) - # Press right arrow key. - input_stream.extend(Keys.RIGHT_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is 1. - CheckInputBufferPosition(self, 1) - - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) - - # We expect the test string, followed by a jump to the beginning of the - # line, and finally a move right 1. - exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) - - # A move right 1 column. - exp_console_out += OutputStream.MoveCursorRight(1) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_MoveRightWithCtrlF(self): - """Move cursor forward one column with Ctrl+F.""" - test_str = b'panicinfo' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CTRL_A) - # Now, move right one column. - input_stream.append(console.ControlKey.CTRL_F) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is 1. - CheckInputBufferPosition(self, 1) - - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) - - # We expect the test string, followed by a jump to the beginning of the - # line, and finally a move right 1. - exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) - - # A move right 1 column. - exp_console_out += OutputStream.MoveCursorRight(1) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_ImpossibleMoveLeftWithArrowKey(self): - """Verify that we can't move left at the beginning of the line.""" - # We shouldn't be able to move left if we're at the beginning of the line. - input_stream = Keys.LEFT_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Nothing should have been output. - exp_console_output = b'' - CheckConsoleOutput(self, exp_console_output) - - # The input buffer position should still be 0. - CheckInputBufferPosition(self, 0) - - # The input buffer itself should be empty. - CheckInputBuffer(self, b'') - - def test_ImpossibleMoveRightWithArrowKey(self): - """Verify that we can't move right at the end of the line.""" - # We shouldn't be able to move right if we're at the end of the line. - input_stream = Keys.RIGHT_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Nothing should have been output. - exp_console_output = b'' - CheckConsoleOutput(self, exp_console_output) - - # The input buffer position should still be 0. - CheckInputBufferPosition(self, 0) - - # The input buffer itself should be empty. - CheckInputBuffer(self, b'') - - def test_KillEntireLine(self): - """Verify that we can kill an entire line with Ctrl+K.""" - test_str = b'accelinfo on' - input_stream = BytesToByteList(test_str) - # Jump to beginning of line and then kill it with Ctrl+K. - input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K]) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, we expect that the input buffer is empty. - CheckInputBuffer(self, b'') - - # The buffer position should be 0. - CheckInputBufferPosition(self, 0) - - # What we expect to see on the console stream should be the following. The - # test string, a jump to the beginning of the line, then jump back to the - # end of the line and replace the line with spaces. - exp_console_out = test_str - # Jump to beginning of line. - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - # Jump to end of line. - exp_console_out += OutputStream.MoveCursorRight(len(test_str)) - # Replace line with spaces, which looks like backspaces. - for _ in range(len(test_str)): - exp_console_out += BACKSPACE_STRING - - # Verify the console output. - CheckConsoleOutput(self, exp_console_out) - - def test_KillPartialLine(self): - """Verify that we can kill a portion of a line.""" - test_str = b'accelread 0 1' - input_stream = BytesToByteList(test_str) - len_to_kill = 5 - for _ in range(len_to_kill): - # Move cursor left - input_stream.extend(Keys.LEFT_ARROW) - # Now kill - input_stream.append(console.ControlKey.CTRL_K) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, check that the input buffer was truncated. - exp_input_buffer = test_str[:-len_to_kill] - CheckInputBuffer(self, exp_input_buffer) - - # Verify the input buffer position. - CheckInputBufferPosition(self, len(test_str) - len_to_kill) - - # The console output stream that we expect is the test string followed by a - # move left of len_to_kill, then a jump to the end of the line and backspace - # of len_to_kill. - exp_console_out = test_str - for _ in range(len_to_kill): - # Move left 1 column. - exp_console_out += OutputStream.MoveCursorLeft(1) - # Then jump to the end of the line - exp_console_out += OutputStream.MoveCursorRight(len_to_kill) - # Backspace of len_to_kill - for _ in range(len_to_kill): - exp_console_out += BACKSPACE_STRING - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_InsertingCharacters(self): - """Verify that we can insert characters within the line.""" - test_str = b'accel 0 1' # Here we forgot the 'read' part in 'accelread' - input_stream = BytesToByteList(test_str) - # We need to move over to the 'l' and add read. - insertion_point = test_str.find(b'l') + 1 - for i in range(len(test_str) - insertion_point): - # Move cursor left. - input_stream.extend(Keys.LEFT_ARROW) - # Now, add in 'read' - added_str = b'read' - input_stream.extend(BytesToByteList(added_str)) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, verify that the input buffer is correct. - exp_input_buffer = test_str[:insertion_point] + added_str - exp_input_buffer += test_str[insertion_point:] - CheckInputBuffer(self, exp_input_buffer) - - # Verify that the input buffer position is correct. - exp_input_buffer_pos = insertion_point + len(added_str) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - # The console output stream that we expect is the test string, followed by - # move cursor left until the 'l' was found, the added test string while - # shifting characters around. - exp_console_out = test_str - for i in range(len(test_str) - insertion_point): - # Move cursor left. - exp_console_out += OutputStream.MoveCursorLeft(1) - - # Now for each character, write the rest of the line will be shifted to the - # right one column. - for i in range(len(added_str)): - # Printed character. - exp_console_out += added_str[i:i+1] - # The rest of the line - exp_console_out += test_str[insertion_point:] - # Reset the cursor back left - reset_dist = len(test_str[insertion_point:]) - exp_console_out += OutputStream.MoveCursorLeft(reset_dist) - - # Verify the console output. - CheckConsoleOutput(self, exp_console_out) - - def test_StoreCommandHistory(self): - """Verify that entered commands are stored in the history.""" - test_commands = [] - test_commands.append(b'help') - test_commands.append(b'version') - test_commands.append(b'accelread 0 1') - input_stream = [] - for c in test_commands: - input_stream.extend(BytesToByteList(c)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect to have the test commands in the history buffer. - exp_history_buf = test_commands - CheckHistoryBuffer(self, exp_history_buf) - - def test_CycleUpThruCommandHistory(self): - """Verify that the UP arrow key will print itmes in the history buffer.""" - # Enter some commands. - test_commands = [b'version', b'accelrange 0', b'battery', b'gettime'] - input_stream = [] - for command in test_commands: - input_stream.extend(BytesToByteList(command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Now, hit the UP arrow key to print the previous entries. - for i in range(len(test_commands)): - input_stream.extend(Keys.UP_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be test commands with prompts printed in - # between, followed by line kills with the previous test commands printed. - exp_console_out = b'' - for i in range(len(test_commands)): - exp_console_out += test_commands[i] + b'\r\n' + self.console.prompt - - # When we press up, the line should be cleared and print the previous buffer - # entry. - for i in range(len(test_commands)-1, 0, -1): - exp_console_out += test_commands[i] - # Backspace to the beginning. - for i in range(len(test_commands[i])): - exp_console_out += BACKSPACE_STRING - - # The last command should just be printed out with no backspacing. - exp_console_out += test_commands[0] - - # Now, verify. - CheckConsoleOutput(self, exp_console_out) - - def test_UpArrowOnEmptyHistory(self): - """Ensure nothing happens if the history is empty.""" - # Press the up arrow key twice. - input_stream = 2 * Keys.UP_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect nothing to have happened. - exp_console_out = b'' - exp_input_buffer = b'' - exp_input_buffer_pos = 0 - exp_history_buf = [] - - # Verify. - CheckConsoleOutput(self, exp_console_out) - CheckInputBufferPosition(self, exp_input_buffer_pos) - CheckInputBuffer(self, exp_input_buffer) - CheckHistoryBuffer(self, exp_history_buf) - - def test_UpArrowDoesNotGoOutOfBounds(self): - """Verify that pressing the up arrow many times won't go out of bounds.""" - # Enter one command. - test_str = b'help version' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - # Then press the up arrow key twice. - input_stream.extend(2 * Keys.UP_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the history buffer is correct. - exp_history_buf = [test_str] - CheckHistoryBuffer(self, exp_history_buf) - - # We expect that the console output should only contain our entered command, - # a new prompt, and then our command aggain. - exp_console_out = test_str + b'\r\n' + self.console.prompt - # Pressing up should reprint the command we entered. - exp_console_out += test_str - - # Verify. - CheckConsoleOutput(self, exp_console_out) - - def test_CycleDownThruCommandHistory(self): - """Verify that we can select entries by hitting the down arrow.""" - # Enter at least 4 commands. - test_commands = [b'version', b'accelrange 0', b'battery', b'gettime'] - input_stream = [] - for command in test_commands: - input_stream.extend(BytesToByteList(command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Now, hit the UP arrow key twice to print the previous two entries. - for i in range(2): - input_stream.extend(Keys.UP_ARROW) - - # Now, hit the DOWN arrow key twice to print the newer entries. - input_stream.extend(2*Keys.DOWN_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be commands that we entered, followed by - # prompts, then followed by our last two commands in reverse. Then, we - # should see the last entry in the list, followed by the saved partial cmd - # of a blank line. - exp_console_out = b'' - for i in range(len(test_commands)): - exp_console_out += test_commands[i] + b'\r\n' + self.console.prompt - - # When we press up, the line should be cleared and print the previous buffer - # entry. - for i in range(len(test_commands)-1, 1, -1): - exp_console_out += test_commands[i] - # Backspace to the beginning. - for i in range(len(test_commands[i])): - exp_console_out += BACKSPACE_STRING - - # When we press down, it should have cleared the last command (which we - # covered with the previous for loop), and then prints the next command. - exp_console_out += test_commands[3] - for i in range(len(test_commands[3])): - exp_console_out += BACKSPACE_STRING - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - # Verify input buffer. - exp_input_buffer = b'' # Empty because our partial command was empty. - exp_input_buffer_pos = len(exp_input_buffer) - CheckInputBuffer(self, exp_input_buffer) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - def test_SavingPartialCommandWhenNavigatingHistory(self): - """Verify that partial commands are saved when navigating history.""" - # Enter a command. - test_str = b'accelinfo' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Enter a partial command. - partial_cmd = b'ver' - input_stream.extend(BytesToByteList(partial_cmd)) - - # Hit the UP arrow key. - input_stream.extend(Keys.UP_ARROW) - # Then, the DOWN arrow key. - input_stream.extend(Keys.DOWN_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be the command we entered, a prompt, the - # partial command, clearing of the partial command, the command entered, - # clearing of the command entered, and then the partial command. - exp_console_out = test_str + b'\r\n' + self.console.prompt - exp_console_out += partial_cmd - for _ in range(len(partial_cmd)): - exp_console_out += BACKSPACE_STRING - exp_console_out += test_str - for _ in range(len(test_str)): - exp_console_out += BACKSPACE_STRING - exp_console_out += partial_cmd - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - # Verify input buffer. - exp_input_buffer = partial_cmd - exp_input_buffer_pos = len(exp_input_buffer) - CheckInputBuffer(self, exp_input_buffer) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - def test_DownArrowOnEmptyHistory(self): - """Ensure nothing happens if the history is empty.""" - # Then press the up down arrow twice. - input_stream = 2 * Keys.DOWN_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect nothing to have happened. - exp_console_out = b'' - exp_input_buffer = b'' - exp_input_buffer_pos = 0 - exp_history_buf = [] - - # Verify. - CheckConsoleOutput(self, exp_console_out) - CheckInputBufferPosition(self, exp_input_buffer_pos) - CheckInputBuffer(self, exp_input_buffer) - CheckHistoryBuffer(self, exp_history_buf) - - def test_DeleteCharsUsingDELKey(self): - """Verify that we can delete characters using the DEL key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - - # Hit the left arrow key 2 times. - input_stream.extend(2 * Keys.LEFT_ARROW) - - # Press the DEL key. - input_stream.extend(Keys.DEL) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be the command we entered, 2 individual cursor - # moves to the left, and then removing a char and shifting everything to the - # left one column. - exp_console_out = test_str - exp_console_out += 2 * OutputStream.MoveCursorLeft(1) - - # Remove the char by shifting everything to the left one, slicing out the - # remove char. - exp_console_out += test_str[-1:] + b' ' - - # Reset the cursor by moving back 2 columns because of the 'n' and space. - exp_console_out += OutputStream.MoveCursorLeft(2) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - # Verify input buffer. The input buffer should have the char sliced out and - # be positioned where the char was removed. - exp_input_buffer = test_str[:-2] + test_str[-1:] - exp_input_buffer_pos = len(exp_input_buffer) - 1 - CheckInputBuffer(self, exp_input_buffer) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - def test_RepeatedCommandInHistory(self): - """Verify that we don't store 2 consecutive identical commands in history""" - # Enter a few commands. - test_commands = [b'version', b'accelrange 0', b'battery', b'gettime'] - # Repeat the last command. - test_commands.append(test_commands[len(test_commands)-1]) - - input_stream = [] - for command in test_commands: - input_stream.extend(BytesToByteList(command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the history buffer is correct. The last command, since - # it was repeated, should not have been added to the history. - exp_history_buf = test_commands[0:len(test_commands)-1] - CheckHistoryBuffer(self, exp_history_buf) - - -class TestConsoleCompatibility(unittest.TestCase): - """Verify that console can speak to enhanced and non-enhanced EC images.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - # Create a temp file and set both the master and slave PTYs to the file to - # create a loopback. - self.tempfile = tempfile.TemporaryFile() - - # Mock out the pipes. - mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock() - self.console = console.Console(self.tempfile.fileno(), self.tempfile, - tempfile.TemporaryFile(), - mock_pipe_end_0, mock_pipe_end_1, "EC") - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_ActAsPassThruInNonEnhancedMode(self, mock_check): - """Verify we simply pass everything thru to non-enhanced ECs. - - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECImage() - method. - """ - # Set the interrogation mode to always so that we actually interrogate. - self.console.interrogation_mode = b'always' - - # Assume EC interrogations indicate that the image is non-enhanced. - mock_check.return_value = False - - # Press enter, followed by the command, and another enter. - input_stream = [] - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - test_command = b'version' - input_stream.extend(BytesToByteList(test_command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Expected calls to send down the pipe would be each character of the test - # command. - expected_calls = [] - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - for char in test_command: - if six.PY3: - expected_calls.append(mock.call(bytes([char]))) - else: - expected_calls.append(mock.call(char)) - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - - # Verify that the calls happened. - self.console.cmd_pipe.send.assert_has_calls(expected_calls) - - # Since we're acting as a pass-thru, the input buffer should be empty and - # input_buffer_pos is 0. - CheckInputBuffer(self, b'') - CheckInputBufferPosition(self, 0) - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_TransitionFromNonEnhancedToEnhanced(self, mock_check): - """Verify that we transition correctly to enhanced mode. - - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECImage() - method. - """ - # Set the interrogation mode to always so that we actually interrogate. - self.console.interrogation_mode = b'always' - - # First, assume that the EC interrogations indicate an enhanced EC image. - mock_check.return_value = True - # But our current knowledge of the EC image (which was actually the - # 'previous' EC) was a non-enhanced image. - self.console.enhanced_ec = False - - test_command = b'sysinfo' - input_stream = [] - input_stream.extend(BytesToByteList(test_command)) - - expected_calls = [] - # All keystrokes to the console should be directed straight through to the - # EC until we press the enter key. - for char in test_command: - if six.PY3: - expected_calls.append(mock.call(bytes([char]))) - else: - expected_calls.append(mock.call(char)) - - # Press the enter key. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - # The enter key should not be sent to the pipe since we should negotiate - # to an enhanced EC image. - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # At this point, we should have negotiated to enhanced. - self.assertTrue(self.console.enhanced_ec, msg=('Did not negotiate to ' - 'enhanced EC image.')) - - # The command would have been dropped however, so verify this... - CheckInputBuffer(self, b'') - CheckInputBufferPosition(self, 0) - # ...and repeat the command. - input_stream = BytesToByteList(test_command) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Since we're enhanced now, we should have sent the entire command as one - # string with no trailing carriage return - expected_calls.append(mock.call(test_command)) - - # Verify all of the calls. - self.console.cmd_pipe.send.assert_has_calls(expected_calls) - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_TransitionFromEnhancedToNonEnhanced(self, mock_check): - """Verify that we transition correctly to non-enhanced mode. - - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECImage() - method. - """ - # Set the interrogation mode to always so that we actually interrogate. - self.console.interrogation_mode = b'always' - - # First, assume that the EC interrogations indicate an non-enhanced EC - # image. - mock_check.return_value = False - # But our current knowledge of the EC image (which was actually the - # 'previous' EC) was an enhanced image. - self.console.enhanced_ec = True - - test_command = b'sysinfo' - input_stream = [] - input_stream.extend(BytesToByteList(test_command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # But, we will negotiate to non-enhanced however, dropping this command. - # Verify this. - self.assertFalse(self.console.enhanced_ec, msg=('Did not negotiate to' - 'non-enhanced EC image.')) - CheckInputBuffer(self, b'') - CheckInputBufferPosition(self, 0) - - # The carriage return should have passed through though. - expected_calls = [] - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - - # Since the command was dropped, repeat the command. - input_stream = BytesToByteList(test_command) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Since we're not enhanced now, we should have sent each character in the - # entire command separately and a carriage return. - for char in test_command: - if six.PY3: - expected_calls.append(mock.call(bytes([char]))) - else: - expected_calls.append(mock.call(char)) - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - - # Verify all of the calls. - self.console.cmd_pipe.send.assert_has_calls(expected_calls) - - def test_EnhancedCheckIfTimedOut(self): - """Verify that the check returns false if it times out.""" - # Make the debug pipe "time out". - self.console.dbg_pipe.poll.return_value = False - self.assertFalse(self.console.CheckForEnhancedECImage()) - - def test_EnhancedCheckIfACKReceived(self): - """Verify that the check returns true if the ACK is received.""" - # Make the debug pipe return EC_ACK. - self.console.dbg_pipe.poll.return_value = True - self.console.dbg_pipe.recv.return_value = interpreter.EC_ACK - self.assertTrue(self.console.CheckForEnhancedECImage()) - - def test_EnhancedCheckIfWrong(self): - """Verify that the check returns false if byte received is wrong.""" - # Make the debug pipe return the wrong byte. - self.console.dbg_pipe.poll.return_value = True - self.console.dbg_pipe.recv.return_value = b'\xff' - self.assertFalse(self.console.CheckForEnhancedECImage()) - - def test_EnhancedCheckUsingBuffer(self): - """Verify that given reboot output, enhanced EC images are detected.""" - enhanced_output_stream = b""" ---- UART initialized after reboot --- -[Reset cause: reset-pin soft] -[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:26:20 aaboagye@lithium.mtv.corp.google.com] -[0.001695 KB boot key 0] -[0.001790 Inits done] -[0.001923 not sysjump; forcing AP shutdown] -[0.002047 EC triggered warm reboot] -[0.002155 assert GPIO_PMIC_WARM_RESET_L for 4 ms] -[0.006326 auto_power_on set due to reset_flag 0x22] -[0.006477 Wait for battery stabilized during 1000000] -[0.007368 battery responded with status c0] -[0.009099 hash start 0x00010000 0x0000eb7c] -[0.009307 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --] -[0.009531 KB wait] -Enhanced Console is enabled (v1.0.0); type HELP for help. -> [0.009782 event set 0x00002000] -[0.009903 hostcmd init 0x2000] -[0.010031 power state 0 = G3, in 0x0000] -[0.010173 power state 4 = G3->S5, in 0x0000] -[0.010324 power state 1 = S5, in 0x0000] -[0.010466 power on 2] -[0.010566 power state 5 = S5->S3, in 0x0000] -[0.037713 event set 0x00000080] -[0.037836 event set 0x00400000] -[0.038675 Battery 89% / 1092h:15 to empty] -[0.224060 hash done 41dac382e3a6e3d2ea5b4d789c1bc46525cae7cc5ff6758f0de8d8369b506f57] -[0.375150 POWER_GOOD seen] -""" - for line in enhanced_output_stream.split(b'\n'): - self.console.CheckBufferForEnhancedImage(line) - - # Since the enhanced console string was present in the output, the console - # should have caught it. - self.assertTrue(self.console.enhanced_ec) - - # Also should check that the command was sent to the interpreter. - self.console.cmd_pipe.send.assert_called_once_with(b'enhanced True') - - # Now test the non-enhanced EC image. - self.console.cmd_pipe.reset_mock() - non_enhanced_output_stream = b""" ---- UART initialized after reboot --- -[Reset cause: reset-pin soft] -[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:03:15 aaboagye@lithium.mtv.corp.google.com] -[0.001695 KB boot key 0] -[0.001790 Inits done] -[0.001923 not sysjump; forcing AP shutdown] -[0.002047 EC triggered warm reboot] -[0.002156 assert GPIO_PMIC_WARM_RESET_L for 4 ms] -[0.006326 auto_power_on set due to reset_flag 0x22] -[0.006477 Wait for battery stabilized during 1000000] -[0.007368 battery responded with status c0] -[0.008951 hash start 0x00010000 0x0000ed78] -[0.009159 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --] -[0.009383 KB wait] -Console is enabled; type HELP for help. -> [0.009602 event set 0x00002000] -[0.009722 hostcmd init 0x2000] -[0.009851 power state 0 = G3, in 0x0000] -[0.009993 power state 4 = G3->S5, in 0x0000] -[0.010144 power state 1 = S5, in 0x0000] -[0.010285 power on 2] -[0.010385 power state 5 = S5->S3, in 0x0000] -""" - for line in non_enhanced_output_stream.split(b'\n'): - self.console.CheckBufferForEnhancedImage(line) - - # Since the default console string is present in the output, it should be - # determined to be non enhanced now. - self.assertFalse(self.console.enhanced_ec) - - # Check that command was also sent to the interpreter. - self.console.cmd_pipe.send.assert_called_once_with(b'enhanced False') - - -class TestOOBMConsoleCommands(unittest.TestCase): - """Verify that OOBM console commands work correctly.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - # Create a temp file and set both the master and slave PTYs to the file to - # create a loopback. - self.tempfile = tempfile.TemporaryFile() - - # Mock out the pipes. - mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock() - self.console = console.Console(self.tempfile.fileno(), self.tempfile, - tempfile.TemporaryFile(), - mock_pipe_end_0, mock_pipe_end_1, "EC") - self.console.oobm_queue = mock.MagicMock() - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_InterrogateCommand(self, mock_check): - """Verify that 'interrogate' command works as expected. - - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECIMage() - method. - """ - input_stream = [] - expected_calls = [] - mock_check.side_effect = [False] - - # 'interrogate never' should disable the interrogation from happening at - # all. - cmd = b'interrogate never' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'version')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'flashinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'sysinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The Check function should NOT have been called at all. - mock_check.assert_not_called() - - # The EC image should be assumed to be not enhanced. - self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' - ' be NOT enhanced.') - - # Reset the mocks. - mock_check.reset_mock() - self.console.oobm_queue.reset_mock() - - # 'interrogate auto' should not interrogate at all. It should only be - # scanning the output stream for the 'console is enabled' strings. - cmd = b'interrogate auto' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - expected_calls = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'version')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'flashinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'sysinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The Check function should NOT have been called at all. - mock_check.assert_not_called() - - # The EC image should be assumed to be not enhanced. - self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' - ' be NOT enhanced.') - - # Reset the mocks. - mock_check.reset_mock() - self.console.oobm_queue.reset_mock() - - # 'interrogate always' should, like its name implies, interrogate always - # after each press of the enter key. This was the former way of doing - # interrogation. - cmd = b'interrogate always' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - expected_calls = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # The Check method should be called 3 times here. - mock_check.side_effect = [False, False, False] - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'help list')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'taskinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'hibdelay')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The Check method should have been called 3 times here. - expected_calls = [mock.call(), mock.call(), mock.call()] - mock_check.assert_has_calls(expected_calls) - - # The EC image should be assumed to be not enhanced. - self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' - ' be NOT enhanced.') - - # Now, let's try to assume that the image is enhanced while still disabling - # interrogation. - mock_check.reset_mock() - self.console.oobm_queue.reset_mock() - input_stream = [] - cmd = b'interrogate never enhanced' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - expected_calls = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'chgstate')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'hash')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'sysjump rw')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The check method should have never been called. - mock_check.assert_not_called() - - # The EC image should be assumed to be enhanced. - self.assertTrue(self.console.enhanced_ec, 'The image should be' - ' assumed to be enhanced.') - - -if __name__ == '__main__': - unittest.main() diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py deleted file mode 100644 index 4e151083bd..0000000000 --- a/util/ec3po/interpreter.py +++ /dev/null @@ -1,467 +0,0 @@ -# Copyright 2015 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""EC-3PO EC Interpreter - -interpreter provides the interpretation layer between the EC UART and the user. -It receives commands through its command pipe, formats the commands for the EC, -and sends the command to the EC. It also presents data from the EC to either be -displayed via the interactive console interface, or some other consumer. It -additionally supports automatic command retrying if the EC drops a character in -a command. -""" - -# Note: This is a py2/3 compatible file. - -from __future__ import print_function - -import binascii -import copy -import logging -import os -import select -import traceback - -import six - - -COMMAND_RETRIES = 3 # Number of attempts to retry a command. -EC_MAX_READ = 1024 # Max bytes to read at a time from the EC. -EC_SYN = b'\xec' # Byte indicating EC interrogation. -EC_ACK = b'\xc0' # Byte representing correct EC response to interrogation. - - -class LoggerAdapter(logging.LoggerAdapter): - """Class which provides a small adapter for the logger.""" - - def process(self, msg, kwargs): - """Prepends the served PTY to the beginning of the log message.""" - return '%s - %s' % (self.extra['pty'], msg), kwargs - - -class Interpreter(object): - """Class which provides the interpretation layer between the EC and user. - - This class essentially performs all of the intepretation for the EC and the - user. It handles all of the automatic command retrying as well as the - formation of commands for EC images which support that. - - Attributes: - logger: A logger for this module. - ec_uart_pty: An opened file object to the raw EC UART PTY. - ec_uart_pty_name: A string containing the name of the raw EC UART PTY. - cmd_pipe: A socket.socket or multiprocessing.Connection object which - represents the Interpreter side of the command pipe. This must be a - bidirectional pipe. Commands and responses will utilize this pipe. - dbg_pipe: A socket.socket or multiprocessing.Connection object which - represents the Interpreter side of the debug pipe. This must be a - unidirectional pipe with write capabilities. EC debug output will utilize - this pipe. - cmd_retries: An integer representing the number of attempts the console - should retry commands if it receives an error. - log_level: An integer representing the numeric value of the log level. - inputs: A list of objects that the intpreter selects for reading. - Initially, these are the EC UART and the command pipe. - outputs: A list of objects that the interpreter selects for writing. - ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART. - last_cmd: A string that represents the last command sent to the EC. If an - error is encountered, the interpreter will attempt to retry this command - up to COMMAND_RETRIES. - enhanced_ec: A boolean indicating if the EC image that we are currently - communicating with is enhanced or not. Enhanced EC images will support - packed commands and host commands over the UART. This defaults to False - and is changed depending on the result of an interrogation. - interrogating: A boolean indicating if we are in the middle of interrogating - the EC. - connected: A boolean indicating if the interpreter is actually connected to - the UART and listening. - """ - def __init__(self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO, - name=None): - """Intializes an Interpreter object with the provided args. - - Args: - ec_uart_pty: A string representing the EC UART to connect to. - cmd_pipe: A socket.socket or multiprocessing.Connection object which - represents the Interpreter side of the command pipe. This must be a - bidirectional pipe. Commands and responses will utilize this pipe. - dbg_pipe: A socket.socket or multiprocessing.Connection object which - represents the Interpreter side of the debug pipe. This must be a - unidirectional pipe with write capabilities. EC debug output will - utilize this pipe. - cmd_retries: An integer representing the number of attempts the console - should retry commands if it receives an error. - log_level: An optional integer representing the numeric value of the log - level. By default, the log level will be logging.INFO (20). - name: the console source name - """ - # Create a unique logger based on the interpreter name - interpreter_prefix = ('%s - ' % name) if name else '' - logger = logging.getLogger('%sEC3PO.Interpreter' % interpreter_prefix) - self.logger = LoggerAdapter(logger, {'pty': ec_uart_pty}) - # TODO(https://crbug.com/1162189): revist the 2 TODOs below - # TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+ - # TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when - # that gets fixed, or keep two pty: one for reading and one for writing - self.ec_uart_pty = open(ec_uart_pty, 'r+b', buffering=0) - self.ec_uart_pty_name = ec_uart_pty - self.cmd_pipe = cmd_pipe - self.dbg_pipe = dbg_pipe - self.cmd_retries = COMMAND_RETRIES - self.log_level = log_level - self.inputs = [self.ec_uart_pty, self.cmd_pipe] - self.outputs = [] - self.ec_cmd_queue = six.moves.queue.Queue() - self.last_cmd = b'' - self.enhanced_ec = False - self.interrogating = False - self.connected = True - - def __str__(self): - """Show internal state of the Interpreter object. - - Returns: - A string that shows the values of the attributes. - """ - string = [] - string.append('%r' % self) - string.append('ec_uart_pty: %s' % self.ec_uart_pty) - string.append('cmd_pipe: %r' % self.cmd_pipe) - string.append('dbg_pipe: %r' % self.dbg_pipe) - string.append('cmd_retries: %d' % self.cmd_retries) - string.append('log_level: %d' % self.log_level) - string.append('inputs: %r' % self.inputs) - string.append('outputs: %r' % self.outputs) - string.append('ec_cmd_queue: %r' % self.ec_cmd_queue) - string.append('last_cmd: \'%s\'' % self.last_cmd) - string.append('enhanced_ec: %r' % self.enhanced_ec) - string.append('interrogating: %r' % self.interrogating) - return '\n'.join(string) - - def EnqueueCmd(self, command): - """Enqueue a command to be sent to the EC UART. - - Args: - command: A string which contains the command to be sent. - """ - self.ec_cmd_queue.put(command) - self.logger.log(1, 'Commands now in queue: %d', self.ec_cmd_queue.qsize()) - - # Add the EC UART as an output to be serviced. - if self.connected and self.ec_uart_pty not in self.outputs: - self.outputs.append(self.ec_uart_pty) - - def PackCommand(self, raw_cmd): - r"""Packs a command for use with error checking. - - For error checking, we pack console commands in a particular format. The - format is as follows: - - &&[x][x][x][x]&{cmd}\n\n - ^ ^ ^^ ^^ ^ ^-- 2 newlines. - | | || || |-- the raw console command. - | | || ||-- 1 ampersand. - | | ||____|--- 2 hex digits representing the CRC8 of cmd. - | |____|-- 2 hex digits reprsenting the length of cmd. - |-- 2 ampersands - - Args: - raw_cmd: A pre-packed string which contains the raw command. - - Returns: - A string which contains the packed command. - """ - # Don't pack a single carriage return. - if raw_cmd != b'\r': - # The command format is as follows. - # &&[x][x][x][x]&{cmd}\n\n - packed_cmd = [] - packed_cmd.append(b'&&') - # The first pair of hex digits are the length of the command. - packed_cmd.append(b'%02x' % len(raw_cmd)) - # Then the CRC8 of cmd. - packed_cmd.append(b'%02x' % Crc8(raw_cmd)) - packed_cmd.append(b'&') - # Now, the raw command followed by 2 newlines. - packed_cmd.append(raw_cmd) - packed_cmd.append(b'\n\n') - return b''.join(packed_cmd) - else: - return raw_cmd - - def ProcessCommand(self, command): - """Captures the input determines what actions to take. - - Args: - command: A string representing the command sent by the user. - """ - if command == b'disconnect': - if self.connected: - self.logger.debug('UART disconnect request.') - # Drop all pending commands if any. - while not self.ec_cmd_queue.empty(): - c = self.ec_cmd_queue.get() - self.logger.debug('dropped: \'%s\'', c) - if self.enhanced_ec: - # Reset retry state. - self.cmd_retries = COMMAND_RETRIES - self.last_cmd = b'' - # Get the UART that the interpreter is attached to. - fileobj = self.ec_uart_pty - self.logger.debug('fileobj: %r', fileobj) - # Remove the descriptor from the inputs and outputs. - self.inputs.remove(fileobj) - if fileobj in self.outputs: - self.outputs.remove(fileobj) - self.logger.debug('Removed fileobj. Remaining inputs: %r', self.inputs) - # Close the file. - fileobj.close() - # Mark the interpreter as disconnected now. - self.connected = False - self.logger.debug('Disconnected from %s.', self.ec_uart_pty_name) - return - - elif command == b'reconnect': - if not self.connected: - self.logger.debug('UART reconnect request.') - # Reopen the PTY. - # TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+ - # TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when - # that gets fixed, or keep two pty: one for reading and one for writing - fileobj = open(self.ec_uart_pty_name, 'r+b', buffering=0) - self.logger.debug('fileobj: %r', fileobj) - self.ec_uart_pty = fileobj - # Add the descriptor to the inputs. - self.inputs.append(fileobj) - self.logger.debug('fileobj added. curr inputs: %r', self.inputs) - # Mark the interpreter as connected now. - self.connected = True - self.logger.debug('Connected to %s.', self.ec_uart_pty_name) - return - - elif command.startswith(b'enhanced'): - self.enhanced_ec = command.split(b' ')[1] == b'True' - return - - # Ignore any other commands while in the disconnected state. - self.logger.log(1, 'command: \'%s\'', command) - if not self.connected: - self.logger.debug('Ignoring command because currently disconnected.') - return - - # Remove leading and trailing spaces only if this is an enhanced EC image. - # For non-enhanced EC images, commands will be single characters at a time - # and can be spaces. - if self.enhanced_ec: - command = command.strip(b' ') - - # There's nothing to do if the command is empty. - if len(command) == 0: - return - - # Handle log level change requests. - if command.startswith(b'loglevel'): - self.logger.debug('Log level change request.') - new_log_level = int(command.split(b' ')[1]) - self.logger.logger.setLevel(new_log_level) - self.logger.info('Log level changed to %d.', new_log_level) - return - - # Check for interrogation command. - if command == EC_SYN: - # User is requesting interrogation. Send SYN as is. - self.logger.debug('User requesting interrogation.') - self.interrogating = True - # Assume the EC isn't enhanced until we get a response. - self.enhanced_ec = False - elif self.enhanced_ec: - # Enhanced EC images require the plaintext commands to be packed. - command = self.PackCommand(command) - # TODO(aaboagye): Make a dict of commands and keys and eventually, - # handle partial matching based on unique prefixes. - - self.EnqueueCmd(command) - - def HandleCmdRetries(self): - """Attempts to retry commands if possible.""" - if self.cmd_retries > 0: - # The EC encountered an error. We'll have to retry again. - self.logger.warning('Retrying command...') - self.cmd_retries -= 1 - self.logger.warning('Retries remaining: %d', self.cmd_retries) - # Retry the command and add the EC UART to the writers again. - self.EnqueueCmd(self.last_cmd) - self.outputs.append(self.ec_uart_pty) - else: - # We're out of retries, so just give up. - self.logger.error('Command failed. No retries left.') - # Clear the command in progress. - self.last_cmd = b'' - # Reset the retry count. - self.cmd_retries = COMMAND_RETRIES - - def SendCmdToEC(self): - """Sends a command to the EC.""" - # If we're retrying a command, just try to send it again. - if self.cmd_retries < COMMAND_RETRIES: - cmd = self.last_cmd - else: - # If we're not retrying, we should not be writing to the EC if we have no - # items in our command queue. - assert not self.ec_cmd_queue.empty() - # Get the command to send. - cmd = self.ec_cmd_queue.get() - - # Send the command. - self.ec_uart_pty.write(cmd) - self.ec_uart_pty.flush() - self.logger.log(1, 'Sent command to EC.') - - if self.enhanced_ec and cmd != EC_SYN: - # Now, that we've sent the command, store the current command as the last - # command sent. If we encounter an error string, we will attempt to retry - # this command. - if cmd != self.last_cmd: - self.last_cmd = cmd - # Reset the retry count. - self.cmd_retries = COMMAND_RETRIES - - # If no command is pending to be sent, then we can remove the EC UART from - # writers. Might need better checking for command retry logic in here. - if self.ec_cmd_queue.empty(): - # Remove the EC UART from the writers while we wait for a response. - self.logger.debug('Removing EC UART from writers.') - self.outputs.remove(self.ec_uart_pty) - - def HandleECData(self): - """Handle any debug prints from the EC.""" - self.logger.log(1, 'EC has data') - # Read what the EC sent us. - data = os.read(self.ec_uart_pty.fileno(), EC_MAX_READ) - self.logger.log(1, 'got: \'%s\'', binascii.hexlify(data)) - if b'&E' in data and self.enhanced_ec: - # We received an error, so we should retry it if possible. - self.logger.warning('Error string found in data.') - self.HandleCmdRetries() - return - - # If we were interrogating, check the response and update our knowledge - # of the current EC image. - if self.interrogating: - self.enhanced_ec = data == EC_ACK - if self.enhanced_ec: - self.logger.debug('The current EC image seems enhanced.') - else: - self.logger.debug('The current EC image does NOT seem enhanced.') - # Done interrogating. - self.interrogating = False - # For now, just forward everything the EC sends us. - self.logger.log(1, 'Forwarding to user...') - self.dbg_pipe.send(data) - - def HandleUserData(self): - """Handle any incoming commands from the user. - - Raises: - EOFError: Allowed to propagate through from self.cmd_pipe.recv(). - """ - self.logger.log(1, 'Command data available. Begin processing.') - data = self.cmd_pipe.recv() - # Process the command. - self.ProcessCommand(data) - - -def Crc8(data): - """Calculates the CRC8 of data. - - The generator polynomial used is: x^8 + x^2 + x + 1. - This is the same implementation that is used in the EC. - - Args: - data: A string of data that we wish to calculate the CRC8 on. - - Returns: - crc >> 8: An integer representing the CRC8 value. - """ - crc = 0 - for byte in six.iterbytes(data): - crc ^= (byte << 8) - for _ in range(8): - if crc & 0x8000: - crc ^= (0x1070 << 3) - crc <<= 1 - return crc >> 8 - - -def StartLoop(interp, shutdown_pipe=None): - """Starts an infinite loop of servicing the user and the EC. - - StartLoop checks to see if there are any commands to process, processing them - if any, and forwards EC output to the user. - - When sending a command to the EC, we send the command once and check the - response to see if the EC encountered an error when receiving the command. An - error condition is reported to the interpreter by a string with at least one - '&' and 'E'. The full string is actually '&&EE', however it's possible that - the leading ampersand or trailing 'E' could be dropped. If an error is - encountered, the interpreter will retry up to the amount configured. - - Args: - interp: An Interpreter object that has been properly initialised. - shutdown_pipe: A file object for a pipe or equivalent that becomes readable - (not blocked) to indicate that the loop should exit. Can be None to never - exit the loop. - """ - try: - # This is used instead of "break" to avoid exiting the loop in the middle of - # an iteration. - continue_looping = True - - while continue_looping: - # The inputs list is created anew in each loop iteration because the - # Interpreter class sometimes modifies the interp.inputs list. - if shutdown_pipe is None: - inputs = interp.inputs - else: - inputs = list(interp.inputs) - inputs.append(shutdown_pipe) - - readable, writeable, _ = select.select(inputs, interp.outputs, []) - - for obj in readable: - # Handle any debug prints from the EC. - if obj is interp.ec_uart_pty: - interp.HandleECData() - - # Handle any commands from the user. - elif obj is interp.cmd_pipe: - try: - interp.HandleUserData() - except EOFError: - interp.logger.debug( - 'ec3po interpreter received EOF from cmd_pipe in ' - 'HandleUserData()') - continue_looping = False - - elif obj is shutdown_pipe: - interp.logger.debug( - 'ec3po interpreter received shutdown pipe unblocked notification') - continue_looping = False - - for obj in writeable: - # Send a command to the EC. - if obj is interp.ec_uart_pty: - interp.SendCmdToEC() - - except KeyboardInterrupt: - pass - - finally: - interp.cmd_pipe.close() - interp.dbg_pipe.close() - interp.ec_uart_pty.close() - if shutdown_pipe is not None: - shutdown_pipe.close() - interp.logger.debug('Exit ec3po interpreter loop for %s', - interp.ec_uart_pty_name) diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py deleted file mode 100755 index fe4d43c351..0000000000 --- a/util/ec3po/interpreter_unittest.py +++ /dev/null @@ -1,380 +0,0 @@ -#!/usr/bin/env python -# Copyright 2015 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Unit tests for the EC-3PO interpreter.""" - -# Note: This is a py2/3 compatible file. - -from __future__ import print_function - -import logging -import mock -import tempfile -import unittest - -import six - -from ec3po import interpreter -from ec3po import threadproc_shim - - -def GetBuiltins(func): - if six.PY2: - return '__builtin__.' + func - return 'builtins.' + func - - -class TestEnhancedECBehaviour(unittest.TestCase): - """Test case to verify all enhanced EC interpretation tasks.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create a tempfile that would represent the EC UART PTY. - self.tempfile = tempfile.NamedTemporaryFile() - - # Create the pipes that the interpreter will use. - self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() - self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False) - - # Mock the open() function so we can inspect reads/writes to the EC. - self.ec_uart_pty = mock.mock_open() - - with mock.patch(GetBuiltins('open'), self.ec_uart_pty): - # Create an interpreter. - self.itpr = interpreter.Interpreter(self.tempfile.name, - self.cmd_pipe_itpr, - self.dbg_pipe_itpr, - log_level=logging.DEBUG, - name="EC") - - @mock.patch('ec3po.interpreter.os') - def test_HandlingCommandsThatProduceNoOutput(self, mock_os): - """Verify that the Interpreter correctly handles non-output commands. - - Args: - mock_os: MagicMock object replacing the 'os' module for this test - case. - """ - # The interpreter init should open the EC UART PTY. - expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)] - # Have a command come in the command pipe. The first command will be an - # interrogation to determine if the EC is enhanced or not. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - # At this point, the command should be queued up waiting to be sent, so - # let's actually send it to the EC. - self.itpr.SendCmdToEC() - expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN), - mock.call().flush()]) - # Now, assume that the EC sends only 1 response back of EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Now that the interrogation was complete, it's time to send down the real - # command. - test_cmd = b'chan save' - # Send the test command down the pipe. - self.cmd_pipe_user.send(test_cmd) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - # Since the EC image is enhanced, we should have sent a packed command. - expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd))) - expected_ec_calls.append(mock.call().flush()) - - # Now that the first command was sent, we should send another command which - # produces no output. The console would send another interrogation. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN), - mock.call().flush()]) - # Again, assume that the EC sends only 1 response back of EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Now send the second test command. - test_cmd = b'chan 0' - self.cmd_pipe_user.send(test_cmd) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - # Since the EC image is enhanced, we should have sent a packed command. - expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd))) - expected_ec_calls.append(mock.call().flush()) - - # Finally, verify that the appropriate writes were actually sent to the EC. - self.ec_uart_pty.assert_has_calls(expected_ec_calls) - - @mock.patch('ec3po.interpreter.os') - def test_CommandRetryingOnError(self, mock_os): - """Verify that commands are retried if an error is encountered. - - Args: - mock_os: MagicMock object replacing the 'os' module for this test - case. - """ - # The interpreter init should open the EC UART PTY. - expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)] - # Have a command come in the command pipe. The first command will be an - # interrogation to determine if the EC is enhanced or not. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - # At this point, the command should be queued up waiting to be sent, so - # let's actually send it to the EC. - self.itpr.SendCmdToEC() - expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN), - mock.call().flush()]) - # Now, assume that the EC sends only 1 response back of EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Let's send a command that is received on the EC-side with an error. - test_cmd = b'accelinfo' - self.cmd_pipe_user.send(test_cmd) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - packed_cmd = self.itpr.PackCommand(test_cmd) - expected_ec_calls.extend([mock.call().write(packed_cmd), - mock.call().flush()]) - # Have the EC return the error string twice. - mock_os.read.side_effect = [b'&&EE', b'&&EE'] - for i in range(2): - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Since an error was received, the EC should attempt to retry the command. - expected_ec_calls.extend([mock.call().write(packed_cmd), - mock.call().flush()]) - # Verify that the retry count was decremented. - self.assertEqual(interpreter.COMMAND_RETRIES-i-1, self.itpr.cmd_retries, - 'Unexpected cmd_remaining count.') - # Actually retry the command. - self.itpr.SendCmdToEC() - - # Now assume that the last one goes through with no trouble. - expected_ec_calls.extend([mock.call().write(packed_cmd), - mock.call().flush()]) - self.itpr.SendCmdToEC() - - # Verify all the calls. - self.ec_uart_pty.assert_has_calls(expected_ec_calls) - - def test_PackCommandsForEnhancedEC(self): - """Verify that the interpreter packs commands for enhanced EC images.""" - # Assume current EC image is enhanced. - self.itpr.enhanced_ec = True - # Receive a command from the user. - test_cmd = b'gettime' - self.cmd_pipe_user.send(test_cmd) - # Mock out PackCommand to see if it was called. - self.itpr.PackCommand = mock.MagicMock() - # Have the interpreter handle the command. - self.itpr.HandleUserData() - # Verify that PackCommand() was called. - self.itpr.PackCommand.assert_called_once_with(test_cmd) - - def test_DontPackCommandsForNonEnhancedEC(self): - """Verify the interpreter doesn't pack commands for non-enhanced images.""" - # Assume current EC image is not enhanced. - self.itpr.enhanced_ec = False - # Receive a command from the user. - test_cmd = b'gettime' - self.cmd_pipe_user.send(test_cmd) - # Mock out PackCommand to see if it was called. - self.itpr.PackCommand = mock.MagicMock() - # Have the interpreter handle the command. - self.itpr.HandleUserData() - # Verify that PackCommand() was called. - self.itpr.PackCommand.assert_not_called() - - @mock.patch('ec3po.interpreter.os') - def test_KeepingTrackOfInterrogation(self, mock_os): - """Verify that the interpreter can track the state of the interrogation. - - Args: - mock_os: MagicMock object replacing the 'os' module. for this test - case. - """ - # Upon init, the interpreter should assume that the current EC image is not - # enhanced. - self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec upon' - ' init is not False.')) - - # Assume an interrogation request comes in from the user. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - - # Verify the state is now within an interrogation. - self.assertTrue(self.itpr.interrogating, 'interrogating should be True') - # The state of enhanced_ec should not be changed yet because we haven't - # received a valid response yet. - self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec is ' - 'not False.')) - - # Assume that the EC responds with an EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - self.itpr.HandleECData() - - # Now, the interrogation should be complete and we should know that the - # current EC image is enhanced. - self.assertFalse(self.itpr.interrogating, msg=('interrogating should be ' - 'False')) - self.assertTrue(self.itpr.enhanced_ec, msg='enhanced_ec sholud be True') - - # Now let's perform another interrogation, but pretend that the EC ignores - # it. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - - # Verify interrogating state. - self.assertTrue(self.itpr.interrogating, 'interrogating sholud be True') - # We should assume that the image is not enhanced until we get the valid - # response. - self.assertFalse(self.itpr.enhanced_ec, 'enhanced_ec should be False now.') - - # Let's pretend that we get a random debug print. This should clear the - # interrogating flag. - mock_os.read.side_effect = [b'[1660.593076 HC 0x103]'] - self.itpr.HandleECData() - - # Verify that interrogating flag is cleared and enhanced_ec is still False. - self.assertFalse(self.itpr.interrogating, 'interrogating should be False.') - self.assertFalse(self.itpr.enhanced_ec, - 'enhanced_ec should still be False.') - - -class TestUARTDisconnection(unittest.TestCase): - """Test case to verify interpreter disconnection/reconnection.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create a tempfile that would represent the EC UART PTY. - self.tempfile = tempfile.NamedTemporaryFile() - - # Create the pipes that the interpreter will use. - self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() - self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False) - - # Mock the open() function so we can inspect reads/writes to the EC. - self.ec_uart_pty = mock.mock_open() - - with mock.patch(GetBuiltins('open'), self.ec_uart_pty): - # Create an interpreter. - self.itpr = interpreter.Interpreter(self.tempfile.name, - self.cmd_pipe_itpr, - self.dbg_pipe_itpr, - log_level=logging.DEBUG, - name="EC") - - # First, check that interpreter is initialized to connected. - self.assertTrue(self.itpr.connected, ('The interpreter should be' - ' initialized in a connected state')) - - def test_DisconnectStopsECTraffic(self): - """Verify that when in disconnected state, no debug prints are sent.""" - # Let's send a disconnect command through the command pipe. - self.cmd_pipe_user.send(b'disconnect') - self.itpr.HandleUserData() - - # Verify interpreter is disconnected from EC. - self.assertFalse(self.itpr.connected, ('The interpreter should be' - 'disconnected.')) - # Verify that the EC UART is no longer a member of the inputs. The - # interpreter will never pull data from the EC if it's not a member of the - # inputs list. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) - - def test_CommandsDroppedWhenDisconnected(self): - """Verify that when in disconnected state, commands are dropped.""" - # Send a command, followed by 'disconnect'. - self.cmd_pipe_user.send(b'taskinfo') - self.itpr.HandleUserData() - self.cmd_pipe_user.send(b'disconnect') - self.itpr.HandleUserData() - - # Verify interpreter is disconnected from EC. - self.assertFalse(self.itpr.connected, ('The interpreter should be' - 'disconnected.')) - # Verify that the EC UART is no longer a member of the inputs nor outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - # Have the user send a few more commands in the disconnected state. - command = 'help\n' - for char in command: - self.cmd_pipe_user.send(char.encode('utf-8')) - self.itpr.HandleUserData() - - # The command queue should be empty. - self.assertEqual(0, self.itpr.ec_cmd_queue.qsize()) - - # Now send the reconnect command. - self.cmd_pipe_user.send(b'reconnect') - - with mock.patch(GetBuiltins('open'), mock.mock_open()): - self.itpr.HandleUserData() - - # Verify interpreter is connected. - self.assertTrue(self.itpr.connected) - # Verify that EC UART is a member of the inputs. - self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) - # Since no command was sent after reconnection, verify that the EC UART is - # not a member of the outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - def test_ReconnectAllowsECTraffic(self): - """Verify that when connected, EC UART traffic is allowed.""" - # Let's send a disconnect command through the command pipe. - self.cmd_pipe_user.send(b'disconnect') - self.itpr.HandleUserData() - - # Verify interpreter is disconnected. - self.assertFalse(self.itpr.connected, ('The interpreter should be' - 'disconnected.')) - # Verify that the EC UART is no longer a member of the inputs nor outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - # Issue reconnect command through the command pipe. - self.cmd_pipe_user.send(b'reconnect') - - with mock.patch(GetBuiltins('open'), mock.mock_open()): - self.itpr.HandleUserData() - - # Verify interpreter is connected. - self.assertTrue(self.itpr.connected, ('The interpreter should be' - 'connected.')) - # Verify that the EC UART is now a member of the inputs. - self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) - # Since we have issued no commands during the disconnected state, no - # commands are pending and therefore the PTY should not be added to the - # outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - -if __name__ == '__main__': - unittest.main() diff --git a/util/ec3po/run_tests.sh b/util/ec3po/run_tests.sh deleted file mode 100755 index ba513abe30..0000000000 --- a/util/ec3po/run_tests.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash -# -# Copyright 2015 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -set -e - -my_dir="$(realpath -e -- "$(dirname -- "$0")")" -parent_dir="$(realpath -e -- "$my_dir/..")" - -PYTHONPATH="$parent_dir" python3 -s -m unittest \ - ec3po.console_unittest \ - ec3po.interpreter_unittest \ - && touch -- "$my_dir/.tests-passed" diff --git a/util/ec3po/threadproc_shim.py b/util/ec3po/threadproc_shim.py deleted file mode 100644 index da5440b1f3..0000000000 --- a/util/ec3po/threadproc_shim.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2018 The Chromium OS Authors. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""This is a shim library for the ec3po transition from subprocesses to threads. - -This is necessary because ec3po is split between the platform/ec/ and -third_party/hdctools/ repositories, so the transition cannot happen atomically -in one change. See http://b/79684405 #39. - -This contains only the multiprocessing objects or threading-oriented equivalents -that are actually in use by ec3po. There is no need for further functionality, -because this shim will be deleted after the migration is complete. - -TODO(b/79684405): Stop using multiprocessing.Pipe. The -multiprocessing.Connection objects it returns serialize and deserialize objects -(via Python pickling), which is necessary for sending them between processes, -but is unnecessary overhead between threads. This will not be a simple change, -because the ec3po Console and Interpreter classes use the underlying pipe/socket -pairs with select/poll/epoll alongside other file descriptors. A drop-in -replacement would be non-trivial and add undesirable complexity. The correct -solution will be to split off the polling of the pipes/queues from this module -into separate threads, so that they can be transitioned to another form of -cross-thread synchronization, e.g. directly waiting on queue.Queue.get() or a -lower-level thread synchronization primitive. - -TODO(b/79684405): After this library has been updated to contain -threading-oriented equivalents to its original multiprocessing implementations, -and some reasonable amount of time has elapsed for thread-based ec3po problems -to be discovered, migrate both the platform/ec/ and third_party/hdctools/ sides -of ec3po off of this shim and then delete this file. IMPORTANT: This should -wait until after completing the TODO above to stop using multiprocessing.Pipe! -""" - -# Imports to bring objects into this namespace for users of this module. -from multiprocessing import Pipe -from six.moves.queue import Queue -from threading import Thread as ThreadOrProcess - -# True if this module has ec3po using subprocesses, False if using threads. -USING_SUBPROCS = False - - -def _DoNothing(): - """Do-nothing function for use as a callback with DoIf().""" - - -def DoIf(subprocs=_DoNothing, threads=_DoNothing): - """Return a callback or not based on ec3po use of subprocesses or threads. - - Args: - subprocs: callback that does not require any args - This will be returned - (not called!) if and only if ec3po is using subprocesses. This is - OPTIONAL, the default value is a do-nothing callback that returns None. - threads: callback that does not require any args - This will be returned - (not called!) if and only if ec3po is using threads. This is OPTIONAL, - the default value is a do-nothing callback that returns None. - - Returns: - Either the subprocs or threads argument will be returned. - """ - return subprocs if USING_SUBPROCS else threads - - -def Value(ctype, *args): - return ctype(*args) |