diff options
author | Jeremy Bettis <jbettis@google.com> | 2022-07-08 10:58:19 -0600 |
---|---|---|
committer | Chromeos LUCI <chromeos-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2022-07-12 19:13:33 +0000 |
commit | 7540e7b47b55447475bb8191fb3520dd67cf7998 (patch) | |
tree | 13309dbcf1db48e60fa2c2e5aed79f63bce00b5e /util/ec3po | |
parent | 7c114b8e1a3bb29991da70b9de394ac5d4f6c909 (diff) | |
download | chrome-ec-7540e7b47b55447475bb8191fb3520dd67cf7998.tar.gz |
ec: Format all python files with black and isort
find . \( -path ./private -prune \) -o -name '*.py' -print | xargs black
find . \( -path ./private -prune \) -o -name '*.py' -print | xargs ~/chromiumos/chromite/scripts/isort --settings-file=.isort.cfg
BRANCH=None
BUG=b:238434058
TEST=None
Signed-off-by: Jeremy Bettis <jbettis@google.com>
Change-Id: I63462d6f15d1eaf3db84eb20d1404ee976be8382
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3749242
Commit-Queue: Jeremy Bettis <jbettis@chromium.org>
Reviewed-by: Tom Hughes <tomhughes@chromium.org>
Tested-by: Jeremy Bettis <jbettis@chromium.org>
Commit-Queue: Jack Rosenthal <jrosenth@chromium.org>
Auto-Submit: Jeremy Bettis <jbettis@chromium.org>
Reviewed-by: Jack Rosenthal <jrosenth@chromium.org>
Diffstat (limited to 'util/ec3po')
-rwxr-xr-x | util/ec3po/console.py | 2231 | ||||
-rwxr-xr-x | util/ec3po/console_unittest.py | 2954 | ||||
-rw-r--r-- | util/ec3po/interpreter.py | 818 | ||||
-rwxr-xr-x | util/ec3po/interpreter_unittest.py | 728 | ||||
-rw-r--r-- | util/ec3po/threadproc_shim.py | 31 |
5 files changed, 3450 insertions, 3312 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py index e71216e3f2..33fa5a6775 100755 --- a/util/ec3po/console.py +++ b/util/ec3po/console.py @@ -17,7 +17,6 @@ from __future__ import print_function import argparse import binascii import ctypes -from datetime import datetime import logging import os import pty @@ -26,26 +25,25 @@ import select import stat import sys import traceback +from datetime import datetime import six +from ec3po import interpreter, threadproc_shim -from ec3po import interpreter -from ec3po import threadproc_shim - - -PROMPT = b'> ' +PROMPT = b"> " CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name. CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user. LOOK_BUFFER_SIZE = 256 # Size of search window when looking for the enhanced EC - # image string. +# image string. # In console_init(), the EC will print a string saying that the EC console is # enabled. Enhanced images will print a slightly different string. These # regular expressions are used to determine at reboot whether the EC image is # enhanced or not. -ENHANCED_IMAGE_RE = re.compile(br'Enhanced Console is enabled ' - br'\(v([0-9]+\.[0-9]+\.[0-9]+)\)') -NON_ENHANCED_IMAGE_RE = re.compile(br'Console is enabled; ') +ENHANCED_IMAGE_RE = re.compile( + rb"Enhanced Console is enabled " rb"\(v([0-9]+\.[0-9]+\.[0-9]+)\)" +) +NON_ENHANCED_IMAGE_RE = re.compile(rb"Console is enabled; ") # The timeouts are really only useful for enhanced EC images, but otherwise just # serve as a delay for non-enhanced EC images. Therefore, we can keep this @@ -54,1118 +52,1173 @@ NON_ENHANCED_IMAGE_RE = re.compile(br'Console is enabled; ') # EC image, we can increase the timeout for stability just in case it takes a # bit longer to receive an ACK for some reason. NON_ENHANCED_EC_INTERROGATION_TIMEOUT = 0.3 # Maximum number of seconds to wait - # for a response to an - # interrogation of a non-enhanced - # EC image. +# for a response to an +# interrogation of a non-enhanced +# EC image. ENHANCED_EC_INTERROGATION_TIMEOUT = 1.0 # Maximum number of seconds to wait for - # a response to an interrogation of an - # enhanced EC image. +# a response to an interrogation of an +# enhanced EC image. # List of modes which control when interrogations are performed with the EC. -INTERROGATION_MODES = [b'never', b'always', b'auto'] +INTERROGATION_MODES = [b"never", b"always", b"auto"] # Format for printing host timestamp -HOST_STRFTIME="%y-%m-%d %H:%M:%S.%f" +HOST_STRFTIME = "%y-%m-%d %H:%M:%S.%f" class EscState(object): - """Class which contains an enumeration for states of ESC sequences.""" - ESC_START = 1 - ESC_BRACKET = 2 - ESC_BRACKET_1 = 3 - ESC_BRACKET_3 = 4 - ESC_BRACKET_8 = 5 - + """Class which contains an enumeration for states of ESC sequences.""" -class ControlKey(object): - """Class which contains codes for various control keys.""" - BACKSPACE = 0x08 - CTRL_A = 0x01 - CTRL_B = 0x02 - CTRL_D = 0x04 - CTRL_E = 0x05 - CTRL_F = 0x06 - CTRL_K = 0x0b - CTRL_N = 0xe - CTRL_P = 0x10 - CARRIAGE_RETURN = 0x0d - ESC = 0x1b + ESC_START = 1 + ESC_BRACKET = 2 + ESC_BRACKET_1 = 3 + ESC_BRACKET_3 = 4 + ESC_BRACKET_8 = 5 -class Console(object): - """Class which provides the console interface between the EC and the user. - - This class essentially represents the console interface between the user and - the EC. It handles all of the console editing behaviour - - Attributes: - logger: A logger for this module. - controller_pty: File descriptor to the controller side of the PTY. Used for - driving output to the user and receiving user input. - user_pty: A string representing the PTY name of the served console. - cmd_pipe: A socket.socket or multiprocessing.Connection object which - represents the console side of the command pipe. This must be a - bidirectional pipe. Console commands and responses utilize this pipe. - dbg_pipe: A socket.socket or multiprocessing.Connection object which - represents the console's read-only side of the debug pipe. This must be a - unidirectional pipe attached to the intepreter. EC debug messages use - this pipe. - oobm_queue: A queue.Queue or multiprocessing.Queue which is used for out of - band management for the interactive console. - input_buffer: A string representing the current input command. - input_buffer_pos: An integer representing the current position in the buffer - to insert a char. - partial_cmd: A string representing the command entered on a line before - pressing the up arrow keys. - esc_state: An integer represeting the current state within an escape - sequence. - line_limit: An integer representing the maximum number of characters on a - line. - history: A list of strings containing the past entered console commands. - history_pos: An integer representing the current history buffer position. - This index is used to show previous commands. - prompt: A string representing the console prompt displayed to the user. - enhanced_ec: A boolean indicating if the EC image that we are currently - communicating with is enhanced or not. Enhanced EC images will support - packed commands and host commands over the UART. This defaults to False - until we perform some handshaking. - interrogation_timeout: A float representing the current maximum seconds to - wait for a response to an interrogation. - receiving_oobm_cmd: A boolean indicating whether or not the console is in - the middle of receiving an out of band command. - pending_oobm_cmd: A string containing the pending OOBM command. - interrogation_mode: A string containing the current mode of whether - interrogations are performed with the EC or not and how often. - raw_debug: Flag to indicate whether per interrupt data should be logged to - debug - output_line_log_buffer: buffer for lines coming from the EC to log to debug - """ - - def __init__(self, controller_pty, user_pty, interface_pty, cmd_pipe, dbg_pipe, - name=None): - """Initalises a Console object with the provided arguments. +class ControlKey(object): + """Class which contains codes for various control keys.""" - Args: - controller_pty: File descriptor to the controller side of the PTY. Used for - driving output to the user and receiving user input. - user_pty: A string representing the PTY name of the served console. - interface_pty: A string representing the PTY name of the served command - interface. - cmd_pipe: A socket.socket or multiprocessing.Connection object which - represents the console side of the command pipe. This must be a - bidirectional pipe. Console commands and responses utilize this pipe. - dbg_pipe: A socket.socket or multiprocessing.Connection object which - represents the console's read-only side of the debug pipe. This must be a - unidirectional pipe attached to the intepreter. EC debug messages use - this pipe. - name: the console source name - """ - # Create a unique logger based on the console name - console_prefix = ('%s - ' % name) if name else '' - logger = logging.getLogger('%sEC3PO.Console' % console_prefix) - self.logger = interpreter.LoggerAdapter(logger, {'pty': user_pty}) - self.controller_pty = controller_pty - self.user_pty = user_pty - self.interface_pty = interface_pty - self.cmd_pipe = cmd_pipe - self.dbg_pipe = dbg_pipe - self.oobm_queue = threadproc_shim.Queue() - self.input_buffer = b'' - self.input_buffer_pos = 0 - self.partial_cmd = b'' - self.esc_state = 0 - self.line_limit = CONSOLE_INPUT_LINE_SIZE - self.history = [] - self.history_pos = 0 - self.prompt = PROMPT - self.enhanced_ec = False - self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT - self.receiving_oobm_cmd = False - self.pending_oobm_cmd = b'' - self.interrogation_mode = b'auto' - self.timestamp_enabled = True - self.look_buffer = b'' - self.raw_debug = False - self.output_line_log_buffer = [] - - def __str__(self): - """Show internal state of Console object as a string.""" - string = [] - string.append('controller_pty: %s' % self.controller_pty) - string.append('user_pty: %s' % self.user_pty) - string.append('interface_pty: %s' % self.interface_pty) - string.append('cmd_pipe: %s' % self.cmd_pipe) - string.append('dbg_pipe: %s' % self.dbg_pipe) - string.append('oobm_queue: %s' % self.oobm_queue) - string.append('input_buffer: %s' % self.input_buffer) - string.append('input_buffer_pos: %d' % self.input_buffer_pos) - string.append('esc_state: %d' % self.esc_state) - string.append('line_limit: %d' % self.line_limit) - string.append('history: %r' % self.history) - string.append('history_pos: %d' % self.history_pos) - string.append('prompt: %r' % self.prompt) - string.append('partial_cmd: %r'% self.partial_cmd) - string.append('interrogation_mode: %r' % self.interrogation_mode) - string.append('look_buffer: %r' % self.look_buffer) - return '\n'.join(string) - - def LogConsoleOutput(self, data): - """Log to debug user MCU output to controller_pty when line is filled. - - The logging also suppresses the Cr50 spinner lines by removing characters - when it sees backspaces. + BACKSPACE = 0x08 + CTRL_A = 0x01 + CTRL_B = 0x02 + CTRL_D = 0x04 + CTRL_E = 0x05 + CTRL_F = 0x06 + CTRL_K = 0x0B + CTRL_N = 0xE + CTRL_P = 0x10 + CARRIAGE_RETURN = 0x0D + ESC = 0x1B - Args: - data: bytes - string received from MCU - """ - data = list(data) - # For compatibility with python2 and python3, standardize on the data - # being a list of integers. This requires one more transformation in py2 - if not isinstance(data[0], int): - data = [ord(c) for c in data] - - # This is a list of already filtered characters (or placeholders). - line = self.output_line_log_buffer - - # TODO(b/177480273): use raw strings here - symbols = { - ord(b'\n'): u'\\n', - ord(b'\r'): u'\\r', - ord(b'\t'): u'\\t' - } - # self.logger.debug(u'%s + %r', u''.join(line), ''.join(data)) - while data: - # Recall, data is a list of integers, namely the byte values sent by - # the MCU. - byte = data.pop(0) - # This means that |byte| is an int. - if byte == ord('\n'): - line.append(symbols[byte]) - if line: - self.logger.debug(u'%s', ''.join(line)) - line = [] - elif byte == ord('\b'): - # Backspace: trim the last character off the buffer - if line: - line.pop(-1) - elif byte in symbols: - line.append(symbols[byte]) - elif byte < ord(' ') or byte > ord('~'): - # Turn any character that isn't printable ASCII into escaped hex. - # ' ' is chr(20), and 0-19 are unprintable control characters. - # '~' is chr(126), and 127 is DELETE. 128-255 are control and Latin-1. - line.append(u'\\x%02x' % byte) - else: - # byte is printable. Thus it is safe to use chr() to get the printable - # character out of it again. - line.append(u'%s' % chr(byte)) - self.output_line_log_buffer = line - - def PrintHistory(self): - """Print the history of entered commands.""" - fd = self.controller_pty - # Make it pretty by figuring out how wide to pad the numbers. - wide = (len(self.history) // 10) + 1 - for i in range(len(self.history)): - line = b' %*d %s\r\n' % (wide, i, self.history[i]) - os.write(fd, line) - - def ShowPreviousCommand(self): - """Shows the previous command from the history list.""" - # There's nothing to do if there's no history at all. - if not self.history: - self.logger.debug('No history to print.') - return - - # Don't do anything if there's no more history to show. - if self.history_pos == 0: - self.logger.debug('No more history to show.') - return - - self.logger.debug('current history position: %d.', self.history_pos) - - # Decrement the history buffer position. - self.history_pos -= 1 - self.logger.debug('new history position.: %d', self.history_pos) - - # Save the text entered on the console if any. - if self.history_pos == len(self.history)-1: - self.logger.debug('saving partial_cmd: %r', self.input_buffer) - self.partial_cmd = self.input_buffer - - # Backspace the line. - for _ in range(self.input_buffer_pos): - self.SendBackspace() - - # Print the last entry in the history buffer. - self.logger.debug('printing previous entry %d - %s', self.history_pos, - self.history[self.history_pos]) - fd = self.controller_pty - prev_cmd = self.history[self.history_pos] - os.write(fd, prev_cmd) - # Update the input buffer. - self.input_buffer = prev_cmd - self.input_buffer_pos = len(prev_cmd) - - def ShowNextCommand(self): - """Shows the next command from the history list.""" - # Don't do anything if there's no history at all. - if not self.history: - self.logger.debug('History buffer is empty.') - return - - fd = self.controller_pty - - self.logger.debug('current history position: %d', self.history_pos) - # Increment the history position. - self.history_pos += 1 - - # Restore the partial cmd. - if self.history_pos == len(self.history): - self.logger.debug('Restoring partial command of %r', self.partial_cmd) - # Backspace the line. - for _ in range(self.input_buffer_pos): - self.SendBackspace() - # Print the partially entered command if any. - os.write(fd, self.partial_cmd) - self.input_buffer = self.partial_cmd - self.input_buffer_pos = len(self.input_buffer) - # Now that we've printed it, clear the partial cmd storage. - self.partial_cmd = b'' - # Reset history position. - self.history_pos = len(self.history) - return - - self.logger.debug('new history position: %d', self.history_pos) - if self.history_pos > len(self.history)-1: - self.logger.debug('No more history to show.') - self.history_pos -= 1 - self.logger.debug('Reset history position to %d', self.history_pos) - return - - # Backspace the line. - for _ in range(self.input_buffer_pos): - self.SendBackspace() - - # Print the newer entry from the history buffer. - self.logger.debug('printing next entry %d - %s', self.history_pos, - self.history[self.history_pos]) - next_cmd = self.history[self.history_pos] - os.write(fd, next_cmd) - # Update the input buffer. - self.input_buffer = next_cmd - self.input_buffer_pos = len(next_cmd) - self.logger.debug('new history position: %d.', self.history_pos) - - def SliceOutChar(self): - """Remove a char from the line and shift everything over 1 column.""" - fd = self.controller_pty - # Remove the character at the input_buffer_pos by slicing it out. - self.input_buffer = self.input_buffer[0:self.input_buffer_pos] + \ - self.input_buffer[self.input_buffer_pos+1:] - # Write the rest of the line - moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos:]) - # Write a space to clear out the last char - moved_col += os.write(fd, b' ') - # Update the input buffer position. - self.input_buffer_pos += moved_col - # Reset the cursor - self.MoveCursor('left', moved_col) - - def HandleEsc(self, byte): - """HandleEsc processes escape sequences. - Args: - byte: An integer representing the current byte in the sequence. +class Console(object): + """Class which provides the console interface between the EC and the user. + + This class essentially represents the console interface between the user and + the EC. It handles all of the console editing behaviour + + Attributes: + logger: A logger for this module. + controller_pty: File descriptor to the controller side of the PTY. Used for + driving output to the user and receiving user input. + user_pty: A string representing the PTY name of the served console. + cmd_pipe: A socket.socket or multiprocessing.Connection object which + represents the console side of the command pipe. This must be a + bidirectional pipe. Console commands and responses utilize this pipe. + dbg_pipe: A socket.socket or multiprocessing.Connection object which + represents the console's read-only side of the debug pipe. This must be a + unidirectional pipe attached to the intepreter. EC debug messages use + this pipe. + oobm_queue: A queue.Queue or multiprocessing.Queue which is used for out of + band management for the interactive console. + input_buffer: A string representing the current input command. + input_buffer_pos: An integer representing the current position in the buffer + to insert a char. + partial_cmd: A string representing the command entered on a line before + pressing the up arrow keys. + esc_state: An integer represeting the current state within an escape + sequence. + line_limit: An integer representing the maximum number of characters on a + line. + history: A list of strings containing the past entered console commands. + history_pos: An integer representing the current history buffer position. + This index is used to show previous commands. + prompt: A string representing the console prompt displayed to the user. + enhanced_ec: A boolean indicating if the EC image that we are currently + communicating with is enhanced or not. Enhanced EC images will support + packed commands and host commands over the UART. This defaults to False + until we perform some handshaking. + interrogation_timeout: A float representing the current maximum seconds to + wait for a response to an interrogation. + receiving_oobm_cmd: A boolean indicating whether or not the console is in + the middle of receiving an out of band command. + pending_oobm_cmd: A string containing the pending OOBM command. + interrogation_mode: A string containing the current mode of whether + interrogations are performed with the EC or not and how often. + raw_debug: Flag to indicate whether per interrupt data should be logged to + debug + output_line_log_buffer: buffer for lines coming from the EC to log to debug """ - # We shouldn't be handling an escape sequence if we haven't seen one. - assert self.esc_state != 0 - - if self.esc_state is EscState.ESC_START: - self.logger.debug('ESC_START') - if byte == ord('['): - self.esc_state = EscState.ESC_BRACKET - return - else: - self.logger.error('Unexpected sequence. %c', byte) - self.esc_state = 0 - - elif self.esc_state is EscState.ESC_BRACKET: - self.logger.debug('ESC_BRACKET') - # Left Arrow key was pressed. - if byte == ord('D'): - self.logger.debug('Left arrow key pressed.') - self.MoveCursor('left', 1) - self.esc_state = 0 # Reset the state. - return - - # Right Arrow key. - elif byte == ord('C'): - self.logger.debug('Right arrow key pressed.') - self.MoveCursor('right', 1) - self.esc_state = 0 # Reset the state. - return - - # Up Arrow key. - elif byte == ord('A'): - self.logger.debug('Up arrow key pressed.') - self.ShowPreviousCommand() - # Reset the state. - self.esc_state = 0 # Reset the state. - return - - # Down Arrow key. - elif byte == ord('B'): - self.logger.debug('Down arrow key pressed.') - self.ShowNextCommand() - # Reset the state. - self.esc_state = 0 # Reset the state. - return - - # For some reason, minicom sends a 1 instead of 7. /shrug - # TODO(aaboagye): Figure out why this happens. - elif byte == ord('1') or byte == ord('7'): - self.esc_state = EscState.ESC_BRACKET_1 - - elif byte == ord('3'): - self.esc_state = EscState.ESC_BRACKET_3 - - elif byte == ord('8'): - self.esc_state = EscState.ESC_BRACKET_8 - - else: - self.logger.error(r'Bad or unhandled escape sequence. got ^[%c\(%d)', - chr(byte), byte) - self.esc_state = 0 - return - - elif self.esc_state is EscState.ESC_BRACKET_1: - self.logger.debug('ESC_BRACKET_1') - # HOME key. - if byte == ord('~'): - self.logger.debug('Home key pressed.') - self.MoveCursor('left', self.input_buffer_pos) - self.esc_state = 0 # Reset the state. - self.logger.debug('ESC sequence complete.') - return - - elif self.esc_state is EscState.ESC_BRACKET_3: - self.logger.debug('ESC_BRACKET_3') - # DEL key. - if byte == ord('~'): - self.logger.debug('Delete key pressed.') - if self.input_buffer_pos != len(self.input_buffer): - self.SliceOutChar() - self.esc_state = 0 # Reset the state. - - elif self.esc_state is EscState.ESC_BRACKET_8: - self.logger.debug('ESC_BRACKET_8') - # END key. - if byte == ord('~'): - self.logger.debug('End key pressed.') - self.MoveCursor('right', - len(self.input_buffer) - self.input_buffer_pos) - self.esc_state = 0 # Reset the state. - self.logger.debug('ESC sequence complete.') - return - - else: - self.logger.error('Unexpected sequence. %c', byte) + def __init__( + self, controller_pty, user_pty, interface_pty, cmd_pipe, dbg_pipe, name=None + ): + """Initalises a Console object with the provided arguments. + + Args: + controller_pty: File descriptor to the controller side of the PTY. Used for + driving output to the user and receiving user input. + user_pty: A string representing the PTY name of the served console. + interface_pty: A string representing the PTY name of the served command + interface. + cmd_pipe: A socket.socket or multiprocessing.Connection object which + represents the console side of the command pipe. This must be a + bidirectional pipe. Console commands and responses utilize this pipe. + dbg_pipe: A socket.socket or multiprocessing.Connection object which + represents the console's read-only side of the debug pipe. This must be a + unidirectional pipe attached to the intepreter. EC debug messages use + this pipe. + name: the console source name + """ + # Create a unique logger based on the console name + console_prefix = ("%s - " % name) if name else "" + logger = logging.getLogger("%sEC3PO.Console" % console_prefix) + self.logger = interpreter.LoggerAdapter(logger, {"pty": user_pty}) + self.controller_pty = controller_pty + self.user_pty = user_pty + self.interface_pty = interface_pty + self.cmd_pipe = cmd_pipe + self.dbg_pipe = dbg_pipe + self.oobm_queue = threadproc_shim.Queue() + self.input_buffer = b"" + self.input_buffer_pos = 0 + self.partial_cmd = b"" self.esc_state = 0 + self.line_limit = CONSOLE_INPUT_LINE_SIZE + self.history = [] + self.history_pos = 0 + self.prompt = PROMPT + self.enhanced_ec = False + self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT + self.receiving_oobm_cmd = False + self.pending_oobm_cmd = b"" + self.interrogation_mode = b"auto" + self.timestamp_enabled = True + self.look_buffer = b"" + self.raw_debug = False + self.output_line_log_buffer = [] + + def __str__(self): + """Show internal state of Console object as a string.""" + string = [] + string.append("controller_pty: %s" % self.controller_pty) + string.append("user_pty: %s" % self.user_pty) + string.append("interface_pty: %s" % self.interface_pty) + string.append("cmd_pipe: %s" % self.cmd_pipe) + string.append("dbg_pipe: %s" % self.dbg_pipe) + string.append("oobm_queue: %s" % self.oobm_queue) + string.append("input_buffer: %s" % self.input_buffer) + string.append("input_buffer_pos: %d" % self.input_buffer_pos) + string.append("esc_state: %d" % self.esc_state) + string.append("line_limit: %d" % self.line_limit) + string.append("history: %r" % self.history) + string.append("history_pos: %d" % self.history_pos) + string.append("prompt: %r" % self.prompt) + string.append("partial_cmd: %r" % self.partial_cmd) + string.append("interrogation_mode: %r" % self.interrogation_mode) + string.append("look_buffer: %r" % self.look_buffer) + return "\n".join(string) + + def LogConsoleOutput(self, data): + """Log to debug user MCU output to controller_pty when line is filled. + + The logging also suppresses the Cr50 spinner lines by removing characters + when it sees backspaces. + + Args: + data: bytes - string received from MCU + """ + data = list(data) + # For compatibility with python2 and python3, standardize on the data + # being a list of integers. This requires one more transformation in py2 + if not isinstance(data[0], int): + data = [ord(c) for c in data] + + # This is a list of already filtered characters (or placeholders). + line = self.output_line_log_buffer + + # TODO(b/177480273): use raw strings here + symbols = {ord(b"\n"): "\\n", ord(b"\r"): "\\r", ord(b"\t"): "\\t"} + # self.logger.debug(u'%s + %r', u''.join(line), ''.join(data)) + while data: + # Recall, data is a list of integers, namely the byte values sent by + # the MCU. + byte = data.pop(0) + # This means that |byte| is an int. + if byte == ord("\n"): + line.append(symbols[byte]) + if line: + self.logger.debug("%s", "".join(line)) + line = [] + elif byte == ord("\b"): + # Backspace: trim the last character off the buffer + if line: + line.pop(-1) + elif byte in symbols: + line.append(symbols[byte]) + elif byte < ord(" ") or byte > ord("~"): + # Turn any character that isn't printable ASCII into escaped hex. + # ' ' is chr(20), and 0-19 are unprintable control characters. + # '~' is chr(126), and 127 is DELETE. 128-255 are control and Latin-1. + line.append("\\x%02x" % byte) + else: + # byte is printable. Thus it is safe to use chr() to get the printable + # character out of it again. + line.append("%s" % chr(byte)) + self.output_line_log_buffer = line + + def PrintHistory(self): + """Print the history of entered commands.""" + fd = self.controller_pty + # Make it pretty by figuring out how wide to pad the numbers. + wide = (len(self.history) // 10) + 1 + for i in range(len(self.history)): + line = b" %*d %s\r\n" % (wide, i, self.history[i]) + os.write(fd, line) + + def ShowPreviousCommand(self): + """Shows the previous command from the history list.""" + # There's nothing to do if there's no history at all. + if not self.history: + self.logger.debug("No history to print.") + return + + # Don't do anything if there's no more history to show. + if self.history_pos == 0: + self.logger.debug("No more history to show.") + return + + self.logger.debug("current history position: %d.", self.history_pos) + + # Decrement the history buffer position. + self.history_pos -= 1 + self.logger.debug("new history position.: %d", self.history_pos) + + # Save the text entered on the console if any. + if self.history_pos == len(self.history) - 1: + self.logger.debug("saving partial_cmd: %r", self.input_buffer) + self.partial_cmd = self.input_buffer + + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + + # Print the last entry in the history buffer. + self.logger.debug( + "printing previous entry %d - %s", + self.history_pos, + self.history[self.history_pos], + ) + fd = self.controller_pty + prev_cmd = self.history[self.history_pos] + os.write(fd, prev_cmd) + # Update the input buffer. + self.input_buffer = prev_cmd + self.input_buffer_pos = len(prev_cmd) + + def ShowNextCommand(self): + """Shows the next command from the history list.""" + # Don't do anything if there's no history at all. + if not self.history: + self.logger.debug("History buffer is empty.") + return + + fd = self.controller_pty + + self.logger.debug("current history position: %d", self.history_pos) + # Increment the history position. + self.history_pos += 1 + + # Restore the partial cmd. + if self.history_pos == len(self.history): + self.logger.debug("Restoring partial command of %r", self.partial_cmd) + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + # Print the partially entered command if any. + os.write(fd, self.partial_cmd) + self.input_buffer = self.partial_cmd + self.input_buffer_pos = len(self.input_buffer) + # Now that we've printed it, clear the partial cmd storage. + self.partial_cmd = b"" + # Reset history position. + self.history_pos = len(self.history) + return + + self.logger.debug("new history position: %d", self.history_pos) + if self.history_pos > len(self.history) - 1: + self.logger.debug("No more history to show.") + self.history_pos -= 1 + self.logger.debug("Reset history position to %d", self.history_pos) + return + + # Backspace the line. + for _ in range(self.input_buffer_pos): + self.SendBackspace() + + # Print the newer entry from the history buffer. + self.logger.debug( + "printing next entry %d - %s", + self.history_pos, + self.history[self.history_pos], + ) + next_cmd = self.history[self.history_pos] + os.write(fd, next_cmd) + # Update the input buffer. + self.input_buffer = next_cmd + self.input_buffer_pos = len(next_cmd) + self.logger.debug("new history position: %d.", self.history_pos) + + def SliceOutChar(self): + """Remove a char from the line and shift everything over 1 column.""" + fd = self.controller_pty + # Remove the character at the input_buffer_pos by slicing it out. + self.input_buffer = ( + self.input_buffer[0 : self.input_buffer_pos] + + self.input_buffer[self.input_buffer_pos + 1 :] + ) + # Write the rest of the line + moved_col = os.write(fd, self.input_buffer[self.input_buffer_pos :]) + # Write a space to clear out the last char + moved_col += os.write(fd, b" ") + # Update the input buffer position. + self.input_buffer_pos += moved_col + # Reset the cursor + self.MoveCursor("left", moved_col) + + def HandleEsc(self, byte): + """HandleEsc processes escape sequences. + + Args: + byte: An integer representing the current byte in the sequence. + """ + # We shouldn't be handling an escape sequence if we haven't seen one. + assert self.esc_state != 0 + + if self.esc_state is EscState.ESC_START: + self.logger.debug("ESC_START") + if byte == ord("["): + self.esc_state = EscState.ESC_BRACKET + return + + else: + self.logger.error("Unexpected sequence. %c", byte) + self.esc_state = 0 + + elif self.esc_state is EscState.ESC_BRACKET: + self.logger.debug("ESC_BRACKET") + # Left Arrow key was pressed. + if byte == ord("D"): + self.logger.debug("Left arrow key pressed.") + self.MoveCursor("left", 1) + self.esc_state = 0 # Reset the state. + return + + # Right Arrow key. + elif byte == ord("C"): + self.logger.debug("Right arrow key pressed.") + self.MoveCursor("right", 1) + self.esc_state = 0 # Reset the state. + return + + # Up Arrow key. + elif byte == ord("A"): + self.logger.debug("Up arrow key pressed.") + self.ShowPreviousCommand() + # Reset the state. + self.esc_state = 0 # Reset the state. + return + + # Down Arrow key. + elif byte == ord("B"): + self.logger.debug("Down arrow key pressed.") + self.ShowNextCommand() + # Reset the state. + self.esc_state = 0 # Reset the state. + return + + # For some reason, minicom sends a 1 instead of 7. /shrug + # TODO(aaboagye): Figure out why this happens. + elif byte == ord("1") or byte == ord("7"): + self.esc_state = EscState.ESC_BRACKET_1 + + elif byte == ord("3"): + self.esc_state = EscState.ESC_BRACKET_3 + + elif byte == ord("8"): + self.esc_state = EscState.ESC_BRACKET_8 + + else: + self.logger.error( + r"Bad or unhandled escape sequence. got ^[%c\(%d)", chr(byte), byte + ) + self.esc_state = 0 + return + + elif self.esc_state is EscState.ESC_BRACKET_1: + self.logger.debug("ESC_BRACKET_1") + # HOME key. + if byte == ord("~"): + self.logger.debug("Home key pressed.") + self.MoveCursor("left", self.input_buffer_pos) + self.esc_state = 0 # Reset the state. + self.logger.debug("ESC sequence complete.") + return + + elif self.esc_state is EscState.ESC_BRACKET_3: + self.logger.debug("ESC_BRACKET_3") + # DEL key. + if byte == ord("~"): + self.logger.debug("Delete key pressed.") + if self.input_buffer_pos != len(self.input_buffer): + self.SliceOutChar() + self.esc_state = 0 # Reset the state. + + elif self.esc_state is EscState.ESC_BRACKET_8: + self.logger.debug("ESC_BRACKET_8") + # END key. + if byte == ord("~"): + self.logger.debug("End key pressed.") + self.MoveCursor("right", len(self.input_buffer) - self.input_buffer_pos) + self.esc_state = 0 # Reset the state. + self.logger.debug("ESC sequence complete.") + return + + else: + self.logger.error("Unexpected sequence. %c", byte) + self.esc_state = 0 + + else: + self.logger.error("Unexpected sequence. %c", byte) + self.esc_state = 0 + + def ProcessInput(self): + """Captures the input determines what actions to take.""" + # There's nothing to do if the input buffer is empty. + if len(self.input_buffer) == 0: + return + + # Don't store 2 consecutive identical commands in the history. + if self.history and self.history[-1] != self.input_buffer or not self.history: + self.history.append(self.input_buffer) + + # Split the command up by spaces. + line = self.input_buffer.split(b" ") + self.logger.debug("cmd: %s", self.input_buffer) + cmd = line[0].lower() + + # The 'history' command is a special case that we handle locally. + if cmd == "history": + self.PrintHistory() + return + + # Send the command to the interpreter. + self.logger.debug("Sending command to interpreter.") + self.cmd_pipe.send(self.input_buffer) + + def CheckForEnhancedECImage(self): + """Performs an interrogation of the EC image. + + Send a SYN and expect an ACK. If no ACK or the response is incorrect, then + assume that the current EC image that we are talking to is not enhanced. + + Returns: + is_enhanced: A boolean indicating whether the EC responded to the + interrogation correctly. + + Raises: + EOFError: Allowed to propagate through from self.dbg_pipe.recv(). + """ + # Send interrogation byte and wait for the response. + self.logger.debug("Performing interrogation.") + self.cmd_pipe.send(interpreter.EC_SYN) + + response = "" + if self.dbg_pipe.poll(self.interrogation_timeout): + response = self.dbg_pipe.recv() + self.logger.debug("response: %r", binascii.hexlify(response)) + else: + self.logger.debug("Timed out waiting for EC_ACK") + + # Verify the acknowledgment. + is_enhanced = response == interpreter.EC_ACK + + if is_enhanced: + # Increase the interrogation timeout for stability purposes. + self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT + self.logger.debug( + "Increasing interrogation timeout to %rs.", self.interrogation_timeout + ) + else: + # Reduce the timeout in order to reduce the perceivable delay. + self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT + self.logger.debug( + "Reducing interrogation timeout to %rs.", self.interrogation_timeout + ) + + return is_enhanced + + def HandleChar(self, byte): + """HandleChar does a certain action when it receives a character. + + Args: + byte: An integer representing the character received from the user. + + Raises: + EOFError: Allowed to propagate through from self.CheckForEnhancedECImage() + i.e. from self.dbg_pipe.recv(). + """ + fd = self.controller_pty + + # Enter the OOBM prompt mode if the user presses '%'. + if byte == ord("%"): + self.logger.debug("Begin OOBM command.") + self.receiving_oobm_cmd = True + # Print a "prompt". + os.write(self.controller_pty, b"\r\n% ") + return + + # Add chars to the pending OOBM command if we're currently receiving one. + if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN: + tmp_bytes = six.int2byte(byte) + self.pending_oobm_cmd += tmp_bytes + self.logger.debug("%s", tmp_bytes) + os.write(self.controller_pty, tmp_bytes) + return + + if byte == ControlKey.CARRIAGE_RETURN: + if self.receiving_oobm_cmd: + # Terminate the command and place it in the OOBM queue. + self.logger.debug("End OOBM command.") + if self.pending_oobm_cmd: + self.oobm_queue.put(self.pending_oobm_cmd) + self.logger.debug( + "Placed %r into OOBM command queue.", self.pending_oobm_cmd + ) + + # Reset the state. + os.write(self.controller_pty, b"\r\n" + self.prompt) + self.input_buffer = b"" + self.input_buffer_pos = 0 + self.receiving_oobm_cmd = False + self.pending_oobm_cmd = b"" + return + + if self.interrogation_mode == b"never": + self.logger.debug( + "Skipping interrogation because interrogation mode" + " is set to never." + ) + elif self.interrogation_mode == b"always": + # Only interrogate the EC if the interrogation mode is set to 'always'. + self.enhanced_ec = self.CheckForEnhancedECImage() + self.logger.debug("Enhanced EC image? %r", self.enhanced_ec) + + if not self.enhanced_ec: + # Send everything straight to the EC to handle. + self.cmd_pipe.send(six.int2byte(byte)) + # Reset the input buffer. + self.input_buffer = b"" + self.input_buffer_pos = 0 + self.logger.log(1, "Reset input buffer.") + return + + # Keep handling the ESC sequence if we're in the middle of it. + if self.esc_state != 0: + self.HandleEsc(byte) + return + + # When we're at the end of the line, we should only allow going backwards, + # backspace, carriage return, up, or down. The arrow keys are escape + # sequences, so we let the escape...escape. + if self.input_buffer_pos >= self.line_limit and byte not in [ + ControlKey.CTRL_B, + ControlKey.ESC, + ControlKey.BACKSPACE, + ControlKey.CTRL_A, + ControlKey.CARRIAGE_RETURN, + ControlKey.CTRL_P, + ControlKey.CTRL_N, + ]: + return + + # If the input buffer is full we can't accept new chars. + buffer_full = len(self.input_buffer) >= self.line_limit + + # Carriage_Return/Enter + if byte == ControlKey.CARRIAGE_RETURN: + self.logger.debug("Enter key pressed.") + # Put a carriage return/newline and the print the prompt. + os.write(fd, b"\r\n") + + # TODO(aaboagye): When we control the printing of all output, print the + # prompt AFTER printing all the output. We can't do it yet because we + # don't know how much is coming from the EC. + + # Print the prompt. + os.write(fd, self.prompt) + # Process the input. + self.ProcessInput() + # Now, clear the buffer. + self.input_buffer = b"" + self.input_buffer_pos = 0 + # Reset history buffer pos. + self.history_pos = len(self.history) + # Clear partial command. + self.partial_cmd = b"" + + # Backspace + elif byte == ControlKey.BACKSPACE: + self.logger.debug("Backspace pressed.") + if self.input_buffer_pos > 0: + # Move left 1 column. + self.MoveCursor("left", 1) + # Remove the character at the input_buffer_pos by slicing it out. + self.SliceOutChar() + + self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos) + + # Ctrl+A. Move cursor to beginning of the line + elif byte == ControlKey.CTRL_A: + self.logger.debug("Control+A pressed.") + self.MoveCursor("left", self.input_buffer_pos) + + # Ctrl+B. Move cursor left 1 column. + elif byte == ControlKey.CTRL_B: + self.logger.debug("Control+B pressed.") + self.MoveCursor("left", 1) + + # Ctrl+D. Delete a character. + elif byte == ControlKey.CTRL_D: + self.logger.debug("Control+D pressed.") + if self.input_buffer_pos != len(self.input_buffer): + # Remove the character by slicing it out. + self.SliceOutChar() + + # Ctrl+E. Move cursor to end of the line. + elif byte == ControlKey.CTRL_E: + self.logger.debug("Control+E pressed.") + self.MoveCursor("right", len(self.input_buffer) - self.input_buffer_pos) + + # Ctrl+F. Move cursor right 1 column. + elif byte == ControlKey.CTRL_F: + self.logger.debug("Control+F pressed.") + self.MoveCursor("right", 1) + + # Ctrl+K. Kill line. + elif byte == ControlKey.CTRL_K: + self.logger.debug("Control+K pressed.") + self.KillLine() + + # Ctrl+N. Next line. + elif byte == ControlKey.CTRL_N: + self.logger.debug("Control+N pressed.") + self.ShowNextCommand() + + # Ctrl+P. Previous line. + elif byte == ControlKey.CTRL_P: + self.logger.debug("Control+P pressed.") + self.ShowPreviousCommand() + + # ESC sequence + elif byte == ControlKey.ESC: + # Starting an ESC sequence + self.esc_state = EscState.ESC_START + + # Only print printable chars. + elif IsPrintable(byte): + # Drop the character if we're full. + if buffer_full: + self.logger.debug("Dropped char: %c(%d)", byte, byte) + return + # Print the character. + os.write(fd, six.int2byte(byte)) + # Print the rest of the line (if any). + extra_bytes_written = os.write( + fd, self.input_buffer[self.input_buffer_pos :] + ) + + # Recreate the input buffer. + self.input_buffer = ( + self.input_buffer[0 : self.input_buffer_pos] + + six.int2byte(byte) + + self.input_buffer[self.input_buffer_pos :] + ) + # Update the input buffer position. + self.input_buffer_pos += 1 + extra_bytes_written + + # Reset the cursor if we wrote any extra bytes. + if extra_bytes_written: + self.MoveCursor("left", extra_bytes_written) + + self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos) + + def MoveCursor(self, direction, count): + """MoveCursor moves the cursor left or right by count columns. + + Args: + direction: A string that should be either 'left' or 'right' representing + the direction to move the cursor on the console. + count: An integer representing how many columns the cursor should be + moved. + + Raises: + AssertionError: If the direction is not equal to 'left' or 'right'. + """ + # If there's nothing to move, we're done. + if not count: + return + fd = self.controller_pty + seq = b"\033[" + str(count).encode("ascii") + if direction == "left": + # Bind the movement. + if count > self.input_buffer_pos: + count = self.input_buffer_pos + seq += b"D" + self.logger.debug("move cursor left %d", count) + self.input_buffer_pos -= count + + elif direction == "right": + # Bind the movement. + if (count + self.input_buffer_pos) > len(self.input_buffer): + count = 0 + seq += b"C" + self.logger.debug("move cursor right %d", count) + self.input_buffer_pos += count + + else: + raise AssertionError( + ("The only valid directions are 'left' and " "'right'") + ) + + self.logger.debug("input_buffer_pos: %d", self.input_buffer_pos) + # Move the cursor. + if count != 0: + os.write(fd, seq) + + def KillLine(self): + """Kill the rest of the line based on the input buffer position.""" + # Killing the line is killing all the text to the right. + diff = len(self.input_buffer) - self.input_buffer_pos + self.logger.debug("diff: %d", diff) + # Diff shouldn't be negative, but if it is for some reason, let's try to + # correct the cursor. + if diff < 0: + self.logger.warning( + "Resetting input buffer position to %d...", len(self.input_buffer) + ) + self.MoveCursor("left", -diff) + return + if diff: + self.MoveCursor("right", diff) + for _ in range(diff): + self.SendBackspace() + self.input_buffer_pos -= diff + self.input_buffer = self.input_buffer[0 : self.input_buffer_pos] + + def SendBackspace(self): + """Backspace a character on the console.""" + os.write(self.controller_pty, b"\033[1D \033[1D") + + def ProcessOOBMQueue(self): + """Retrieve an item from the OOBM queue and process it.""" + item = self.oobm_queue.get() + self.logger.debug("OOBM cmd: %r", item) + cmd = item.split(b" ") + + if cmd[0] == b"loglevel": + # An integer is required in order to set the log level. + if len(cmd) < 2: + self.logger.debug("Insufficient args") + self.PrintOOBMHelp() + return + try: + self.logger.debug("Log level change request.") + new_log_level = int(cmd[1]) + self.logger.logger.setLevel(new_log_level) + self.logger.info("Log level changed to %d.", new_log_level) + + # Forward the request to the interpreter as well. + self.cmd_pipe.send(item) + except ValueError: + # Ignoring the request if an integer was not provided. + self.PrintOOBMHelp() + + elif cmd[0] == b"timestamp": + mode = cmd[1].lower() + self.timestamp_enabled = mode == b"on" + self.logger.info( + "%sabling uart timestamps.", "En" if self.timestamp_enabled else "Dis" + ) + + elif cmd[0] == b"rawdebug": + mode = cmd[1].lower() + self.raw_debug = mode == b"on" + self.logger.info( + "%sabling per interrupt debug logs.", "En" if self.raw_debug else "Dis" + ) + + elif cmd[0] == b"interrogate" and len(cmd) >= 2: + enhanced = False + mode = cmd[1] + if len(cmd) >= 3 and cmd[2] == b"enhanced": + enhanced = True + + # Set the mode if correct. + if mode in INTERROGATION_MODES: + self.interrogation_mode = mode + self.logger.debug("Updated interrogation mode to %s.", mode) + + # Update the assumptions of the EC image. + self.enhanced_ec = enhanced + self.logger.debug("Enhanced EC image is now %r", self.enhanced_ec) + + # Send command to interpreter as well. + self.cmd_pipe.send(b"enhanced " + str(self.enhanced_ec).encode("ascii")) + else: + self.PrintOOBMHelp() + + else: + self.PrintOOBMHelp() + + def PrintOOBMHelp(self): + """Prints out the OOBM help.""" + # Print help syntax. + os.write(self.controller_pty, b"\r\n" + b"Known OOBM commands:\r\n") + os.write( + self.controller_pty, + b" interrogate <never | always | auto> " b"[enhanced]\r\n", + ) + os.write(self.controller_pty, b" loglevel <int>\r\n") + + def CheckBufferForEnhancedImage(self, data): + """Adds data to a look buffer and checks to see for enhanced EC image. + + The EC's console task prints a string upon initialization which says that + "Console is enabled; type HELP for help.". The enhanced EC images print a + different string as a part of their init. This function searches through a + "look" buffer, scanning for the presence of either of those strings and + updating the enhanced_ec state accordingly. + + Args: + data: A string containing the data sent from the interpreter. + """ + self.look_buffer += data + + # Search the buffer for any of the EC image strings. + enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer) + non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer) + + # Update the state if any matches were found. + if enhanced_match or non_enhanced_match: + if enhanced_match: + self.enhanced_ec = True + elif non_enhanced_match: + self.enhanced_ec = False + + # Inform the interpreter of the result. + self.cmd_pipe.send(b"enhanced " + str(self.enhanced_ec).encode("ascii")) + self.logger.debug("Enhanced EC image? %r", self.enhanced_ec) + + # Clear look buffer since a match was found. + self.look_buffer = b"" + + # Move the sliding window. + self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:] - else: - self.logger.error('Unexpected sequence. %c', byte) - self.esc_state = 0 - - def ProcessInput(self): - """Captures the input determines what actions to take.""" - # There's nothing to do if the input buffer is empty. - if len(self.input_buffer) == 0: - return - - # Don't store 2 consecutive identical commands in the history. - if (self.history and self.history[-1] != self.input_buffer - or not self.history): - self.history.append(self.input_buffer) - - # Split the command up by spaces. - line = self.input_buffer.split(b' ') - self.logger.debug('cmd: %s', self.input_buffer) - cmd = line[0].lower() - - # The 'history' command is a special case that we handle locally. - if cmd == 'history': - self.PrintHistory() - return - - # Send the command to the interpreter. - self.logger.debug('Sending command to interpreter.') - self.cmd_pipe.send(self.input_buffer) - def CheckForEnhancedECImage(self): - """Performs an interrogation of the EC image. +def CanonicalizeTimeString(timestr): + """Canonicalize the timestamp string. - Send a SYN and expect an ACK. If no ACK or the response is incorrect, then - assume that the current EC image that we are talking to is not enhanced. + Args: + timestr: A timestamp string ended with 6 digits msec. Returns: - is_enhanced: A boolean indicating whether the EC responded to the - interrogation correctly. - - Raises: - EOFError: Allowed to propagate through from self.dbg_pipe.recv(). + A string with 3 digits msec and an extra space. """ - # Send interrogation byte and wait for the response. - self.logger.debug('Performing interrogation.') - self.cmd_pipe.send(interpreter.EC_SYN) - - response = '' - if self.dbg_pipe.poll(self.interrogation_timeout): - response = self.dbg_pipe.recv() - self.logger.debug('response: %r', binascii.hexlify(response)) - else: - self.logger.debug('Timed out waiting for EC_ACK') + return timestr[:-3].encode("ascii") + b" " - # Verify the acknowledgment. - is_enhanced = response == interpreter.EC_ACK - - if is_enhanced: - # Increase the interrogation timeout for stability purposes. - self.interrogation_timeout = ENHANCED_EC_INTERROGATION_TIMEOUT - self.logger.debug('Increasing interrogation timeout to %rs.', - self.interrogation_timeout) - else: - # Reduce the timeout in order to reduce the perceivable delay. - self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT - self.logger.debug('Reducing interrogation timeout to %rs.', - self.interrogation_timeout) - - return is_enhanced - - def HandleChar(self, byte): - """HandleChar does a certain action when it receives a character. - - Args: - byte: An integer representing the character received from the user. - Raises: - EOFError: Allowed to propagate through from self.CheckForEnhancedECImage() - i.e. from self.dbg_pipe.recv(). - """ - fd = self.controller_pty - - # Enter the OOBM prompt mode if the user presses '%'. - if byte == ord('%'): - self.logger.debug('Begin OOBM command.') - self.receiving_oobm_cmd = True - # Print a "prompt". - os.write(self.controller_pty, b'\r\n% ') - return - - # Add chars to the pending OOBM command if we're currently receiving one. - if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN: - tmp_bytes = six.int2byte(byte) - self.pending_oobm_cmd += tmp_bytes - self.logger.debug('%s', tmp_bytes) - os.write(self.controller_pty, tmp_bytes) - return - - if byte == ControlKey.CARRIAGE_RETURN: - if self.receiving_oobm_cmd: - # Terminate the command and place it in the OOBM queue. - self.logger.debug('End OOBM command.') - if self.pending_oobm_cmd: - self.oobm_queue.put(self.pending_oobm_cmd) - self.logger.debug('Placed %r into OOBM command queue.', - self.pending_oobm_cmd) - - # Reset the state. - os.write(self.controller_pty, b'\r\n' + self.prompt) - self.input_buffer = b'' - self.input_buffer_pos = 0 - self.receiving_oobm_cmd = False - self.pending_oobm_cmd = b'' - return - - if self.interrogation_mode == b'never': - self.logger.debug('Skipping interrogation because interrogation mode' - ' is set to never.') - elif self.interrogation_mode == b'always': - # Only interrogate the EC if the interrogation mode is set to 'always'. - self.enhanced_ec = self.CheckForEnhancedECImage() - self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) - - if not self.enhanced_ec: - # Send everything straight to the EC to handle. - self.cmd_pipe.send(six.int2byte(byte)) - # Reset the input buffer. - self.input_buffer = b'' - self.input_buffer_pos = 0 - self.logger.log(1, 'Reset input buffer.') - return - - # Keep handling the ESC sequence if we're in the middle of it. - if self.esc_state != 0: - self.HandleEsc(byte) - return - - # When we're at the end of the line, we should only allow going backwards, - # backspace, carriage return, up, or down. The arrow keys are escape - # sequences, so we let the escape...escape. - if (self.input_buffer_pos >= self.line_limit and - byte not in [ControlKey.CTRL_B, ControlKey.ESC, ControlKey.BACKSPACE, - ControlKey.CTRL_A, ControlKey.CARRIAGE_RETURN, - ControlKey.CTRL_P, ControlKey.CTRL_N]): - return - - # If the input buffer is full we can't accept new chars. - buffer_full = len(self.input_buffer) >= self.line_limit - - - # Carriage_Return/Enter - if byte == ControlKey.CARRIAGE_RETURN: - self.logger.debug('Enter key pressed.') - # Put a carriage return/newline and the print the prompt. - os.write(fd, b'\r\n') - - # TODO(aaboagye): When we control the printing of all output, print the - # prompt AFTER printing all the output. We can't do it yet because we - # don't know how much is coming from the EC. - - # Print the prompt. - os.write(fd, self.prompt) - # Process the input. - self.ProcessInput() - # Now, clear the buffer. - self.input_buffer = b'' - self.input_buffer_pos = 0 - # Reset history buffer pos. - self.history_pos = len(self.history) - # Clear partial command. - self.partial_cmd = b'' - - # Backspace - elif byte == ControlKey.BACKSPACE: - self.logger.debug('Backspace pressed.') - if self.input_buffer_pos > 0: - # Move left 1 column. - self.MoveCursor('left', 1) - # Remove the character at the input_buffer_pos by slicing it out. - self.SliceOutChar() - - self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos) - - # Ctrl+A. Move cursor to beginning of the line - elif byte == ControlKey.CTRL_A: - self.logger.debug('Control+A pressed.') - self.MoveCursor('left', self.input_buffer_pos) - - # Ctrl+B. Move cursor left 1 column. - elif byte == ControlKey.CTRL_B: - self.logger.debug('Control+B pressed.') - self.MoveCursor('left', 1) - - # Ctrl+D. Delete a character. - elif byte == ControlKey.CTRL_D: - self.logger.debug('Control+D pressed.') - if self.input_buffer_pos != len(self.input_buffer): - # Remove the character by slicing it out. - self.SliceOutChar() - - # Ctrl+E. Move cursor to end of the line. - elif byte == ControlKey.CTRL_E: - self.logger.debug('Control+E pressed.') - self.MoveCursor('right', - len(self.input_buffer) - self.input_buffer_pos) - - # Ctrl+F. Move cursor right 1 column. - elif byte == ControlKey.CTRL_F: - self.logger.debug('Control+F pressed.') - self.MoveCursor('right', 1) - - # Ctrl+K. Kill line. - elif byte == ControlKey.CTRL_K: - self.logger.debug('Control+K pressed.') - self.KillLine() - - # Ctrl+N. Next line. - elif byte == ControlKey.CTRL_N: - self.logger.debug('Control+N pressed.') - self.ShowNextCommand() - - # Ctrl+P. Previous line. - elif byte == ControlKey.CTRL_P: - self.logger.debug('Control+P pressed.') - self.ShowPreviousCommand() - - # ESC sequence - elif byte == ControlKey.ESC: - # Starting an ESC sequence - self.esc_state = EscState.ESC_START - - # Only print printable chars. - elif IsPrintable(byte): - # Drop the character if we're full. - if buffer_full: - self.logger.debug('Dropped char: %c(%d)', byte, byte) - return - # Print the character. - os.write(fd, six.int2byte(byte)) - # Print the rest of the line (if any). - extra_bytes_written = os.write(fd, - self.input_buffer[self.input_buffer_pos:]) - - # Recreate the input buffer. - self.input_buffer = (self.input_buffer[0:self.input_buffer_pos] + - six.int2byte(byte) + - self.input_buffer[self.input_buffer_pos:]) - # Update the input buffer position. - self.input_buffer_pos += 1 + extra_bytes_written - - # Reset the cursor if we wrote any extra bytes. - if extra_bytes_written: - self.MoveCursor('left', extra_bytes_written) - - self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos) - - def MoveCursor(self, direction, count): - """MoveCursor moves the cursor left or right by count columns. +def IsPrintable(byte): + """Determines if a byte is printable. Args: - direction: A string that should be either 'left' or 'right' representing - the direction to move the cursor on the console. - count: An integer representing how many columns the cursor should be - moved. + byte: An integer potentially representing a printable character. - Raises: - AssertionError: If the direction is not equal to 'left' or 'right'. + Returns: + A boolean indicating whether the byte is a printable character. """ - # If there's nothing to move, we're done. - if not count: - return - fd = self.controller_pty - seq = b'\033[' + str(count).encode('ascii') - if direction == 'left': - # Bind the movement. - if count > self.input_buffer_pos: - count = self.input_buffer_pos - seq += b'D' - self.logger.debug('move cursor left %d', count) - self.input_buffer_pos -= count - - elif direction == 'right': - # Bind the movement. - if (count + self.input_buffer_pos) > len(self.input_buffer): - count = 0 - seq += b'C' - self.logger.debug('move cursor right %d', count) - self.input_buffer_pos += count - - else: - raise AssertionError(('The only valid directions are \'left\' and ' - '\'right\'')) - - self.logger.debug('input_buffer_pos: %d', self.input_buffer_pos) - # Move the cursor. - if count != 0: - os.write(fd, seq) - - def KillLine(self): - """Kill the rest of the line based on the input buffer position.""" - # Killing the line is killing all the text to the right. - diff = len(self.input_buffer) - self.input_buffer_pos - self.logger.debug('diff: %d', diff) - # Diff shouldn't be negative, but if it is for some reason, let's try to - # correct the cursor. - if diff < 0: - self.logger.warning('Resetting input buffer position to %d...', - len(self.input_buffer)) - self.MoveCursor('left', -diff) - return - if diff: - self.MoveCursor('right', diff) - for _ in range(diff): - self.SendBackspace() - self.input_buffer_pos -= diff - self.input_buffer = self.input_buffer[0:self.input_buffer_pos] - - def SendBackspace(self): - """Backspace a character on the console.""" - os.write(self.controller_pty, b'\033[1D \033[1D') - - def ProcessOOBMQueue(self): - """Retrieve an item from the OOBM queue and process it.""" - item = self.oobm_queue.get() - self.logger.debug('OOBM cmd: %r', item) - cmd = item.split(b' ') - - if cmd[0] == b'loglevel': - # An integer is required in order to set the log level. - if len(cmd) < 2: - self.logger.debug('Insufficient args') - self.PrintOOBMHelp() - return - try: - self.logger.debug('Log level change request.') - new_log_level = int(cmd[1]) - self.logger.logger.setLevel(new_log_level) - self.logger.info('Log level changed to %d.', new_log_level) - - # Forward the request to the interpreter as well. - self.cmd_pipe.send(item) - except ValueError: - # Ignoring the request if an integer was not provided. - self.PrintOOBMHelp() - - elif cmd[0] == b'timestamp': - mode = cmd[1].lower() - self.timestamp_enabled = (mode == b'on') - self.logger.info('%sabling uart timestamps.', - 'En' if self.timestamp_enabled else 'Dis') - - elif cmd[0] == b'rawdebug': - mode = cmd[1].lower() - self.raw_debug = (mode == b'on') - self.logger.info('%sabling per interrupt debug logs.', - 'En' if self.raw_debug else 'Dis') - - elif cmd[0] == b'interrogate' and len(cmd) >= 2: - enhanced = False - mode = cmd[1] - if len(cmd) >= 3 and cmd[2] == b'enhanced': - enhanced = True - - # Set the mode if correct. - if mode in INTERROGATION_MODES: - self.interrogation_mode = mode - self.logger.debug('Updated interrogation mode to %s.', mode) - - # Update the assumptions of the EC image. - self.enhanced_ec = enhanced - self.logger.debug('Enhanced EC image is now %r', self.enhanced_ec) - - # Send command to interpreter as well. - self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii')) - else: - self.PrintOOBMHelp() - - else: - self.PrintOOBMHelp() + return byte >= ord(" ") and byte <= ord("~") - def PrintOOBMHelp(self): - """Prints out the OOBM help.""" - # Print help syntax. - os.write(self.controller_pty, b'\r\n' + b'Known OOBM commands:\r\n') - os.write(self.controller_pty, b' interrogate <never | always | auto> ' - b'[enhanced]\r\n') - os.write(self.controller_pty, b' loglevel <int>\r\n') - def CheckBufferForEnhancedImage(self, data): - """Adds data to a look buffer and checks to see for enhanced EC image. - - The EC's console task prints a string upon initialization which says that - "Console is enabled; type HELP for help.". The enhanced EC images print a - different string as a part of their init. This function searches through a - "look" buffer, scanning for the presence of either of those strings and - updating the enhanced_ec state accordingly. +def StartLoop(console, command_active, shutdown_pipe=None): + """Starts the infinite loop of console processing. Args: - data: A string containing the data sent from the interpreter. + console: A Console object that has been properly initialzed. + command_active: ctypes data object or multiprocessing.Value indicating if + servod owns the console, or user owns the console. This prevents input + collisions. + shutdown_pipe: A file object for a pipe or equivalent that becomes readable + (not blocked) to indicate that the loop should exit. Can be None to never + exit the loop. """ - self.look_buffer += data - - # Search the buffer for any of the EC image strings. - enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer) - non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer) - - # Update the state if any matches were found. - if enhanced_match or non_enhanced_match: - if enhanced_match: - self.enhanced_ec = True - elif non_enhanced_match: - self.enhanced_ec = False - - # Inform the interpreter of the result. - self.cmd_pipe.send(b'enhanced ' + str(self.enhanced_ec).encode('ascii')) - self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) - - # Clear look buffer since a match was found. - self.look_buffer = b'' - - # Move the sliding window. - self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:] - - -def CanonicalizeTimeString(timestr): - """Canonicalize the timestamp string. - - Args: - timestr: A timestamp string ended with 6 digits msec. - - Returns: - A string with 3 digits msec and an extra space. - """ - return timestr[:-3].encode('ascii') + b' ' - - -def IsPrintable(byte): - """Determines if a byte is printable. - - Args: - byte: An integer potentially representing a printable character. - - Returns: - A boolean indicating whether the byte is a printable character. - """ - return byte >= ord(' ') and byte <= ord('~') - - -def StartLoop(console, command_active, shutdown_pipe=None): - """Starts the infinite loop of console processing. - - Args: - console: A Console object that has been properly initialzed. - command_active: ctypes data object or multiprocessing.Value indicating if - servod owns the console, or user owns the console. This prevents input - collisions. - shutdown_pipe: A file object for a pipe or equivalent that becomes readable - (not blocked) to indicate that the loop should exit. Can be None to never - exit the loop. - """ - try: - console.logger.debug('Console is being served on %s.', console.user_pty) - console.logger.debug('Console controller is on %s.', console.controller_pty) - console.logger.debug('Command interface is being served on %s.', - console.interface_pty) - console.logger.debug(console) - - # This checks for HUP to indicate if the user has connected to the pty. - ep = select.epoll() - ep.register(console.controller_pty, select.EPOLLHUP) - - # This is used instead of "break" to avoid exiting the loop in the middle of - # an iteration. - continue_looping = True - - # Used for determining when to print host timestamps - tm_req = True - - while continue_looping: - # Check to see if pts is connected to anything - events = ep.poll(0) - controller_connected = not events - - # Check to see if pipes or the console are ready for reading. - read_list = [console.interface_pty, - console.cmd_pipe, console.dbg_pipe] - if controller_connected: - read_list.append(console.controller_pty) - if shutdown_pipe is not None: - read_list.append(shutdown_pipe) - - # Check if any input is ready, or wait for .1 sec and re-poll if - # a user has connected to the pts. - select_output = select.select(read_list, [], [], .1) - if not select_output: - continue - ready_for_reading = select_output[0] - - for obj in ready_for_reading: - if obj is console.controller_pty: - if not command_active.value: - # Convert to bytes so we can look for non-printable chars such as - # Ctrl+A, Ctrl+E, etc. - try: - line = bytearray(os.read(console.controller_pty, CONSOLE_MAX_READ)) - console.logger.debug('Input from user: %s, locked:%s', - str(line).strip(), command_active.value) - for i in line: - try: - # Handle each character as it arrives. - console.HandleChar(i) - except EOFError: - console.logger.debug( - 'ec3po console received EOF from dbg_pipe in HandleChar()' - ' while reading console.controller_pty') - continue_looping = False - break - except OSError: - console.logger.debug('Ptm read failed, probably user disconnect.') - - elif obj is console.interface_pty: - if command_active.value: - # Convert to bytes so we can look for non-printable chars such as - # Ctrl+A, Ctrl+E, etc. - line = bytearray(os.read(console.interface_pty, CONSOLE_MAX_READ)) - console.logger.debug('Input from interface: %s, locked:%s', - str(line).strip(), command_active.value) - for i in line: - try: - # Handle each character as it arrives. - console.HandleChar(i) - except EOFError: - console.logger.debug( - 'ec3po console received EOF from dbg_pipe in HandleChar()' - ' while reading console.interface_pty') - continue_looping = False - break - - elif obj is console.cmd_pipe: - try: - data = console.cmd_pipe.recv() - except EOFError: - console.logger.debug('ec3po console received EOF from cmd_pipe') - continue_looping = False - else: - # Write it to the user console. - if console.raw_debug: - console.logger.debug('|CMD|-%s->%r', - ('u' if controller_connected else '') + - ('i' if command_active.value else ''), - data.strip()) + try: + console.logger.debug("Console is being served on %s.", console.user_pty) + console.logger.debug("Console controller is on %s.", console.controller_pty) + console.logger.debug( + "Command interface is being served on %s.", console.interface_pty + ) + console.logger.debug(console) + + # This checks for HUP to indicate if the user has connected to the pty. + ep = select.epoll() + ep.register(console.controller_pty, select.EPOLLHUP) + + # This is used instead of "break" to avoid exiting the loop in the middle of + # an iteration. + continue_looping = True + + # Used for determining when to print host timestamps + tm_req = True + + while continue_looping: + # Check to see if pts is connected to anything + events = ep.poll(0) + controller_connected = not events + + # Check to see if pipes or the console are ready for reading. + read_list = [console.interface_pty, console.cmd_pipe, console.dbg_pipe] if controller_connected: - os.write(console.controller_pty, data) - if command_active.value: - os.write(console.interface_pty, data) - - elif obj is console.dbg_pipe: - try: - data = console.dbg_pipe.recv() - except EOFError: - console.logger.debug('ec3po console received EOF from dbg_pipe') - continue_looping = False - else: - if console.interrogation_mode == b'auto': - # Search look buffer for enhanced EC image string. - console.CheckBufferForEnhancedImage(data) - # Write it to the user console. - if len(data) > 1 and console.raw_debug: - console.logger.debug('|DBG|-%s->%r', - ('u' if controller_connected else '') + - ('i' if command_active.value else ''), - data.strip()) - console.LogConsoleOutput(data) - if controller_connected: - end = len(data) - 1 - if console.timestamp_enabled: - # A timestamp is required at the beginning of this line - if tm_req is True: - now = datetime.now() - tm = CanonicalizeTimeString(now.strftime(HOST_STRFTIME)) - os.write(console.controller_pty, tm) - tm_req = False - - # Insert timestamps into the middle where appropriate - # except if the last character is a newline - nls_found = data.count(b'\n', 0, end) - now = datetime.now() - tm = CanonicalizeTimeString(now.strftime('\n' + HOST_STRFTIME)) - data_tm = data.replace(b'\n', tm, nls_found) - else: - data_tm = data - - # timestamp required on next input - if data[end] == b'\n'[0]: - tm_req = True - os.write(console.controller_pty, data_tm) - if command_active.value: - os.write(console.interface_pty, data) - - elif obj is shutdown_pipe: - console.logger.debug( - 'ec3po console received shutdown pipe unblocked notification') - continue_looping = False - - while not console.oobm_queue.empty(): - console.logger.debug('OOBM queue ready for reading.') - console.ProcessOOBMQueue() - - except KeyboardInterrupt: - pass - - finally: - ep.unregister(console.controller_pty) - console.dbg_pipe.close() - console.cmd_pipe.close() - os.close(console.controller_pty) - os.close(console.interface_pty) - if shutdown_pipe is not None: - shutdown_pipe.close() - console.logger.debug('Exit ec3po console loop for %s', console.user_pty) + read_list.append(console.controller_pty) + if shutdown_pipe is not None: + read_list.append(shutdown_pipe) + + # Check if any input is ready, or wait for .1 sec and re-poll if + # a user has connected to the pts. + select_output = select.select(read_list, [], [], 0.1) + if not select_output: + continue + ready_for_reading = select_output[0] + + for obj in ready_for_reading: + if obj is console.controller_pty: + if not command_active.value: + # Convert to bytes so we can look for non-printable chars such as + # Ctrl+A, Ctrl+E, etc. + try: + line = bytearray( + os.read(console.controller_pty, CONSOLE_MAX_READ) + ) + console.logger.debug( + "Input from user: %s, locked:%s", + str(line).strip(), + command_active.value, + ) + for i in line: + try: + # Handle each character as it arrives. + console.HandleChar(i) + except EOFError: + console.logger.debug( + "ec3po console received EOF from dbg_pipe in HandleChar()" + " while reading console.controller_pty" + ) + continue_looping = False + break + except OSError: + console.logger.debug( + "Ptm read failed, probably user disconnect." + ) + + elif obj is console.interface_pty: + if command_active.value: + # Convert to bytes so we can look for non-printable chars such as + # Ctrl+A, Ctrl+E, etc. + line = bytearray( + os.read(console.interface_pty, CONSOLE_MAX_READ) + ) + console.logger.debug( + "Input from interface: %s, locked:%s", + str(line).strip(), + command_active.value, + ) + for i in line: + try: + # Handle each character as it arrives. + console.HandleChar(i) + except EOFError: + console.logger.debug( + "ec3po console received EOF from dbg_pipe in HandleChar()" + " while reading console.interface_pty" + ) + continue_looping = False + break + + elif obj is console.cmd_pipe: + try: + data = console.cmd_pipe.recv() + except EOFError: + console.logger.debug("ec3po console received EOF from cmd_pipe") + continue_looping = False + else: + # Write it to the user console. + if console.raw_debug: + console.logger.debug( + "|CMD|-%s->%r", + ("u" if controller_connected else "") + + ("i" if command_active.value else ""), + data.strip(), + ) + if controller_connected: + os.write(console.controller_pty, data) + if command_active.value: + os.write(console.interface_pty, data) + + elif obj is console.dbg_pipe: + try: + data = console.dbg_pipe.recv() + except EOFError: + console.logger.debug("ec3po console received EOF from dbg_pipe") + continue_looping = False + else: + if console.interrogation_mode == b"auto": + # Search look buffer for enhanced EC image string. + console.CheckBufferForEnhancedImage(data) + # Write it to the user console. + if len(data) > 1 and console.raw_debug: + console.logger.debug( + "|DBG|-%s->%r", + ("u" if controller_connected else "") + + ("i" if command_active.value else ""), + data.strip(), + ) + console.LogConsoleOutput(data) + if controller_connected: + end = len(data) - 1 + if console.timestamp_enabled: + # A timestamp is required at the beginning of this line + if tm_req is True: + now = datetime.now() + tm = CanonicalizeTimeString( + now.strftime(HOST_STRFTIME) + ) + os.write(console.controller_pty, tm) + tm_req = False + + # Insert timestamps into the middle where appropriate + # except if the last character is a newline + nls_found = data.count(b"\n", 0, end) + now = datetime.now() + tm = CanonicalizeTimeString( + now.strftime("\n" + HOST_STRFTIME) + ) + data_tm = data.replace(b"\n", tm, nls_found) + else: + data_tm = data + + # timestamp required on next input + if data[end] == b"\n"[0]: + tm_req = True + os.write(console.controller_pty, data_tm) + if command_active.value: + os.write(console.interface_pty, data) + + elif obj is shutdown_pipe: + console.logger.debug( + "ec3po console received shutdown pipe unblocked notification" + ) + continue_looping = False + + while not console.oobm_queue.empty(): + console.logger.debug("OOBM queue ready for reading.") + console.ProcessOOBMQueue() + + except KeyboardInterrupt: + pass + + finally: + ep.unregister(console.controller_pty) + console.dbg_pipe.close() + console.cmd_pipe.close() + os.close(console.controller_pty) + os.close(console.interface_pty) + if shutdown_pipe is not None: + shutdown_pipe.close() + console.logger.debug("Exit ec3po console loop for %s", console.user_pty) def main(argv): - """Kicks off the EC-3PO interactive console interface and interpreter. - - We create some pipes to communicate with an interpreter, instantiate an - interpreter, create a PTY pair, and begin serving the console interface. - - Args: - argv: A list of strings containing the arguments this module was called - with. - """ - # Set up argument parser. - parser = argparse.ArgumentParser(description=('Start interactive EC console ' - 'and interpreter.')) - parser.add_argument('ec_uart_pty', - help=('The full PTY name that the EC UART' - ' is present on. eg: /dev/pts/12')) - parser.add_argument('--log-level', - default='info', - help='info, debug, warning, error, or critical') - - # Parse arguments. - opts = parser.parse_args(argv) - - # Set logging level. - opts.log_level = opts.log_level.lower() - if opts.log_level == 'info': - log_level = logging.INFO - elif opts.log_level == 'debug': - log_level = logging.DEBUG - elif opts.log_level == 'warning': - log_level = logging.WARNING - elif opts.log_level == 'error': - log_level = logging.ERROR - elif opts.log_level == 'critical': - log_level = logging.CRITICAL - else: - parser.error('Invalid log level. (info, debug, warning, error, critical)') - - # Start logging with a timestamp, module, and log level shown in each log - # entry. - logging.basicConfig(level=log_level, format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create some pipes to communicate between the interpreter and the console. - # The command pipe is bidirectional. - cmd_pipe_interactive, cmd_pipe_interp = threadproc_shim.Pipe() - # The debug pipe is unidirectional from interpreter to console only. - dbg_pipe_interactive, dbg_pipe_interp = threadproc_shim.Pipe(duplex=False) - - # Create an interpreter instance. - itpr = interpreter.Interpreter(opts.ec_uart_pty, cmd_pipe_interp, - dbg_pipe_interp, log_level) - - # Spawn an interpreter process. - itpr_process = threadproc_shim.ThreadOrProcess( - target=interpreter.StartLoop, args=(itpr,)) - # Make sure to kill the interpreter when we terminate. - itpr_process.daemon = True - # Start the interpreter. - itpr_process.start() - - # Open a new pseudo-terminal pair - (controller_pty, user_pty) = pty.openpty() - # Set the permissions to 660. - os.chmod(os.ttyname(user_pty), (stat.S_IRGRP | stat.S_IWGRP | - stat.S_IRUSR | stat.S_IWUSR)) - # Create a console. - console = Console(controller_pty, os.ttyname(user_pty), cmd_pipe_interactive, - dbg_pipe_interactive) - # Start serving the console. - v = threadproc_shim.Value(ctypes.c_bool, False) - StartLoop(console, v) - - -if __name__ == '__main__': - main(sys.argv[1:]) + """Kicks off the EC-3PO interactive console interface and interpreter. + + We create some pipes to communicate with an interpreter, instantiate an + interpreter, create a PTY pair, and begin serving the console interface. + + Args: + argv: A list of strings containing the arguments this module was called + with. + """ + # Set up argument parser. + parser = argparse.ArgumentParser( + description=("Start interactive EC console " "and interpreter.") + ) + parser.add_argument( + "ec_uart_pty", + help=("The full PTY name that the EC UART" " is present on. eg: /dev/pts/12"), + ) + parser.add_argument( + "--log-level", default="info", help="info, debug, warning, error, or critical" + ) + + # Parse arguments. + opts = parser.parse_args(argv) + + # Set logging level. + opts.log_level = opts.log_level.lower() + if opts.log_level == "info": + log_level = logging.INFO + elif opts.log_level == "debug": + log_level = logging.DEBUG + elif opts.log_level == "warning": + log_level = logging.WARNING + elif opts.log_level == "error": + log_level = logging.ERROR + elif opts.log_level == "critical": + log_level = logging.CRITICAL + else: + parser.error("Invalid log level. (info, debug, warning, error, critical)") + + # Start logging with a timestamp, module, and log level shown in each log + # entry. + logging.basicConfig( + level=log_level, + format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"), + ) + + # Create some pipes to communicate between the interpreter and the console. + # The command pipe is bidirectional. + cmd_pipe_interactive, cmd_pipe_interp = threadproc_shim.Pipe() + # The debug pipe is unidirectional from interpreter to console only. + dbg_pipe_interactive, dbg_pipe_interp = threadproc_shim.Pipe(duplex=False) + + # Create an interpreter instance. + itpr = interpreter.Interpreter( + opts.ec_uart_pty, cmd_pipe_interp, dbg_pipe_interp, log_level + ) + + # Spawn an interpreter process. + itpr_process = threadproc_shim.ThreadOrProcess( + target=interpreter.StartLoop, args=(itpr,) + ) + # Make sure to kill the interpreter when we terminate. + itpr_process.daemon = True + # Start the interpreter. + itpr_process.start() + + # Open a new pseudo-terminal pair + (controller_pty, user_pty) = pty.openpty() + # Set the permissions to 660. + os.chmod( + os.ttyname(user_pty), + (stat.S_IRGRP | stat.S_IWGRP | stat.S_IRUSR | stat.S_IWUSR), + ) + # Create a console. + console = Console( + controller_pty, os.ttyname(user_pty), cmd_pipe_interactive, dbg_pipe_interactive + ) + # Start serving the console. + v = threadproc_shim.Value(ctypes.c_bool, False) + StartLoop(console, v) + + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py index 7e341e7e8d..41ae324ef4 100755 --- a/util/ec3po/console_unittest.py +++ b/util/ec3po/console_unittest.py @@ -11,1262 +11,1310 @@ from __future__ import print_function import binascii import logging -import mock import tempfile import unittest +import mock import six - -from ec3po import console -from ec3po import interpreter -from ec3po import threadproc_shim +from ec3po import console, interpreter, threadproc_shim ESC_STRING = six.int2byte(console.ControlKey.ESC) + class Keys(object): - """A class that contains the escape sequences for special keys.""" - LEFT_ARROW = [console.ControlKey.ESC, ord('['), ord('D')] - RIGHT_ARROW = [console.ControlKey.ESC, ord('['), ord('C')] - UP_ARROW = [console.ControlKey.ESC, ord('['), ord('A')] - DOWN_ARROW = [console.ControlKey.ESC, ord('['), ord('B')] - HOME = [console.ControlKey.ESC, ord('['), ord('1'), ord('~')] - END = [console.ControlKey.ESC, ord('['), ord('8'), ord('~')] - DEL = [console.ControlKey.ESC, ord('['), ord('3'), ord('~')] + """A class that contains the escape sequences for special keys.""" + + LEFT_ARROW = [console.ControlKey.ESC, ord("["), ord("D")] + RIGHT_ARROW = [console.ControlKey.ESC, ord("["), ord("C")] + UP_ARROW = [console.ControlKey.ESC, ord("["), ord("A")] + DOWN_ARROW = [console.ControlKey.ESC, ord("["), ord("B")] + HOME = [console.ControlKey.ESC, ord("["), ord("1"), ord("~")] + END = [console.ControlKey.ESC, ord("["), ord("8"), ord("~")] + DEL = [console.ControlKey.ESC, ord("["), ord("3"), ord("~")] + class OutputStream(object): - """A class that has methods which return common console output.""" + """A class that has methods which return common console output.""" - @staticmethod - def MoveCursorLeft(count): - """Produces what would be printed to the console if the cursor moved left. + @staticmethod + def MoveCursorLeft(count): + """Produces what would be printed to the console if the cursor moved left. - Args: - count: An integer representing how many columns to move left. + Args: + count: An integer representing how many columns to move left. - Returns: - string: A string which contains what would be printed to the console if - the cursor moved left. - """ - string = ESC_STRING - string += b'[' + str(count).encode('ascii') + b'D' - return string + Returns: + string: A string which contains what would be printed to the console if + the cursor moved left. + """ + string = ESC_STRING + string += b"[" + str(count).encode("ascii") + b"D" + return string - @staticmethod - def MoveCursorRight(count): - """Produces what would be printed to the console if the cursor moved right. + @staticmethod + def MoveCursorRight(count): + """Produces what would be printed to the console if the cursor moved right. - Args: - count: An integer representing how many columns to move right. + Args: + count: An integer representing how many columns to move right. - Returns: - string: A string which contains what would be printed to the console if - the cursor moved right. - """ - string = ESC_STRING - string += b'[' + str(count).encode('ascii') + b'C' - return string + Returns: + string: A string which contains what would be printed to the console if + the cursor moved right. + """ + string = ESC_STRING + string += b"[" + str(count).encode("ascii") + b"C" + return string -BACKSPACE_STRING = b'' + +BACKSPACE_STRING = b"" # Move cursor left 1 column. BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) # Write a space. -BACKSPACE_STRING += b' ' +BACKSPACE_STRING += b" " # Move cursor left 1 column. BACKSPACE_STRING += OutputStream.MoveCursorLeft(1) + def BytesToByteList(string): - """Converts a bytes string to list of bytes. + """Converts a bytes string to list of bytes. + + Args: + string: A literal bytes to turn into a list of bytes. - Args: - string: A literal bytes to turn into a list of bytes. + Returns: + A list of integers representing the byte value of each character in the + string. + """ + if six.PY3: + return [c for c in string] + return [ord(c) for c in string] - Returns: - A list of integers representing the byte value of each character in the - string. - """ - if six.PY3: - return [c for c in string] - return [ord(c) for c in string] def CheckConsoleOutput(test_case, exp_console_out): - """Verify what was sent out the console matches what we expect. + """Verify what was sent out the console matches what we expect. - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_console_out: A string representing the console output stream. - """ - # Read what was sent out the console. - test_case.tempfile.seek(0) - console_out = test_case.tempfile.read() + Args: + test_case: A unittest.TestCase object representing the current unit test. + exp_console_out: A string representing the console output stream. + """ + # Read what was sent out the console. + test_case.tempfile.seek(0) + console_out = test_case.tempfile.read() - test_case.assertEqual(exp_console_out, console_out) + test_case.assertEqual(exp_console_out, console_out) -def CheckInputBuffer(test_case, exp_input_buffer): - """Verify that the input buffer contains what we expect. - - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_input_buffer: A string containing the contents of the current input - buffer. - """ - test_case.assertEqual(exp_input_buffer, test_case.console.input_buffer, - (b'input buffer does not match expected.\n' - b'expected: |' + exp_input_buffer + b'|\n' - b'got: |' + test_case.console.input_buffer + - b'|\n' + str(test_case.console).encode('ascii'))) -def CheckInputBufferPosition(test_case, exp_pos): - """Verify the input buffer position. +def CheckInputBuffer(test_case, exp_input_buffer): + """Verify that the input buffer contains what we expect. - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_pos: An integer representing the expected input buffer position. - """ - test_case.assertEqual(exp_pos, test_case.console.input_buffer_pos, - 'input buffer position is incorrect.\ngot: ' + - str(test_case.console.input_buffer_pos) + '\nexp: ' + - str(exp_pos) + '\n' + str(test_case.console)) + Args: + test_case: A unittest.TestCase object representing the current unit test. + exp_input_buffer: A string containing the contents of the current input + buffer. + """ + test_case.assertEqual( + exp_input_buffer, + test_case.console.input_buffer, + ( + b"input buffer does not match expected.\n" + b"expected: |" + exp_input_buffer + b"|\n" + b"got: |" + + test_case.console.input_buffer + + b"|\n" + + str(test_case.console).encode("ascii") + ), + ) -def CheckHistoryBuffer(test_case, exp_history): - """Verify that the items in the history buffer are what we expect. - - Args: - test_case: A unittest.TestCase object representing the current unit test. - exp_history: A list of strings representing the expected contents of the - history buffer. - """ - # First, check to see if the length is what we expect. - test_case.assertEqual(len(exp_history), len(test_case.console.history), - ('The number of items in the history is unexpected.\n' - 'exp: ' + str(len(exp_history)) + '\n' - 'got: ' + str(len(test_case.console.history)) + '\n' - 'internal state:\n' + str(test_case.console))) - - # Next, check the actual contents of the history buffer. - for i in range(len(exp_history)): - test_case.assertEqual(exp_history[i], test_case.console.history[i], - (b'history buffer contents are incorrect.\n' - b'exp: ' + exp_history[i] + b'\n' - b'got: ' + test_case.console.history[i] + b'\n' - b'internal state:\n' + - str(test_case.console).encode('ascii'))) +def CheckInputBufferPosition(test_case, exp_pos): + """Verify the input buffer position. -class TestConsoleEditingMethods(unittest.TestCase): - """Test case to verify all console editing methods.""" - - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create a temp file and set both the controller and peripheral PTYs to the - # file to create a loopback. - self.tempfile = tempfile.TemporaryFile() - - # Create some mock pipes. These won't be used since we'll mock out sends - # to the interpreter. - mock_pipe_end_0, mock_pipe_end_1 = threadproc_shim.Pipe() - self.console = console.Console(self.tempfile.fileno(), self.tempfile, - tempfile.TemporaryFile(), - mock_pipe_end_0, mock_pipe_end_1, "EC") - - # Console editing methods are only valid for enhanced EC images, therefore - # we have to assume that the "EC" we're talking to is enhanced. By default, - # the console believes that the EC it's communicating with is NOT enhanced - # which is why we have to override it here. - self.console.enhanced_ec = True - self.console.CheckForEnhancedECImage = mock.MagicMock(return_value=True) - - def test_EnteringChars(self): - """Verify that characters are echoed onto the console.""" - test_str = b'abc' - input_stream = BytesToByteList(test_str) - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # Check the input position. - exp_pos = len(test_str) - CheckInputBufferPosition(self, exp_pos) - - # Verify that the input buffer is correct. - expected_buffer = test_str - CheckInputBuffer(self, expected_buffer) - - # Check console output - exp_console_out = test_str - CheckConsoleOutput(self, exp_console_out) - - def test_EnteringDeletingMoreCharsThanEntered(self): - """Verify that we can press backspace more than we have entered chars.""" - test_str = b'spamspam' - input_stream = BytesToByteList(test_str) - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # Now backspace 1 more than what we sent. - input_stream = [] - for _ in range(len(test_str) + 1): - input_stream.append(console.ControlKey.BACKSPACE) - - # Send that sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, verify that input buffer position is 0. - CheckInputBufferPosition(self, 0) - - # Next, examine the output stream for the correct sequence. - exp_console_out = test_str - for _ in range(len(test_str)): - exp_console_out += BACKSPACE_STRING - - # Now, verify that we got what we expected. - CheckConsoleOutput(self, exp_console_out) - - def test_EnteringMoreThanCharLimit(self): - """Verify that we drop characters when the line is too long.""" - test_str = self.console.line_limit * b'o' # All allowed. - test_str += 5 * b'x' # All should be dropped. - input_stream = BytesToByteList(test_str) - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, we expect that input buffer position should be equal to the line - # limit. - exp_pos = self.console.line_limit - CheckInputBufferPosition(self, exp_pos) - - # The input buffer should only hold until the line limit. - exp_buffer = test_str[0:self.console.line_limit] - CheckInputBuffer(self, exp_buffer) - - # Lastly, check that the extra characters are not printed. - exp_console_out = exp_buffer - CheckConsoleOutput(self, exp_console_out) - - def test_ValidKeysOnLongLine(self): - """Verify that we can still press valid keys if the line is too long.""" - # Fill the line. - test_str = self.console.line_limit * b'o' - exp_console_out = test_str - # Try to fill it even more; these should all be dropped. - test_str += 5 * b'x' - input_stream = BytesToByteList(test_str) - - # We should be able to press the following keys: - # - Backspace - # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N - # - Delete - # - Home/CTRL+A - # - End/CTRL+E - # - Carriage Return - - # Backspace 1 character - input_stream.append(console.ControlKey.BACKSPACE) - exp_console_out += BACKSPACE_STRING - # Refill the line. - input_stream.extend(BytesToByteList(b'o')) - exp_console_out += b'o' - - # Left arrow key. - input_stream.extend(Keys.LEFT_ARROW) - exp_console_out += OutputStream.MoveCursorLeft(1) - - # Right arrow key. - input_stream.extend(Keys.RIGHT_ARROW) - exp_console_out += OutputStream.MoveCursorRight(1) - - # CTRL+B - input_stream.append(console.ControlKey.CTRL_B) - exp_console_out += OutputStream.MoveCursorLeft(1) - - # CTRL+F - input_stream.append(console.ControlKey.CTRL_F) - exp_console_out += OutputStream.MoveCursorRight(1) - - # Let's press enter now so we can test up and down. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - exp_console_out += b'\r\n' + self.console.prompt - - # Up arrow key. - input_stream.extend(Keys.UP_ARROW) - exp_console_out += test_str[:self.console.line_limit] - - # Down arrow key. - input_stream.extend(Keys.DOWN_ARROW) - # Since the line was blank, we have to backspace the entire line. - exp_console_out += self.console.line_limit * BACKSPACE_STRING - - # CTRL+P - input_stream.append(console.ControlKey.CTRL_P) - exp_console_out += test_str[:self.console.line_limit] - - # CTRL+N - input_stream.append(console.ControlKey.CTRL_N) - # Since the line was blank, we have to backspace the entire line. - exp_console_out += self.console.line_limit * BACKSPACE_STRING - - # Press the Up arrow key to reprint the long line. - input_stream.extend(Keys.UP_ARROW) - exp_console_out += test_str[:self.console.line_limit] - - # Press the Home key to jump to the beginning of the line. - input_stream.extend(Keys.HOME) - exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) - - # Press the End key to jump to the end of the line. - input_stream.extend(Keys.END) - exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) - - # Press CTRL+A to jump to the beginning of the line. - input_stream.append(console.ControlKey.CTRL_A) - exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) - - # Press CTRL+E to jump to the end of the line. - input_stream.extend(Keys.END) - exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) - - # Move left one column so we can delete a character. - input_stream.extend(Keys.LEFT_ARROW) - exp_console_out += OutputStream.MoveCursorLeft(1) - - # Press the delete key. - input_stream.extend(Keys.DEL) - # This should look like a space, and then move cursor left 1 column since - # we're at the end of line. - exp_console_out += b' ' + OutputStream.MoveCursorLeft(1) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify everything happened correctly. - CheckConsoleOutput(self, exp_console_out) - - def test_BackspaceOnEmptyLine(self): - """Verify that we can backspace on an empty line with no bad effects.""" - # Send a single backspace. - test_str = [console.ControlKey.BACKSPACE] - - # Send the characters in. - for byte in test_str: - self.console.HandleChar(byte) - - # Check the input position. - exp_pos = 0 - CheckInputBufferPosition(self, exp_pos) - - # Check that buffer is empty. - exp_input_buffer = b'' - CheckInputBuffer(self, exp_input_buffer) - - # Check that the console output is empty. - exp_console_out = b'' - CheckConsoleOutput(self, exp_console_out) - - def test_BackspaceWithinLine(self): - """Verify that we shift the chars over when backspacing within a line.""" - # Misspell 'help' - test_str = b'heelp' - input_stream = BytesToByteList(test_str) - # Use the arrow key to go back to fix it. - # Move cursor left 1 column. - input_stream.extend(2*Keys.LEFT_ARROW) - # Backspace once to remove the extra 'e'. - input_stream.append(console.ControlKey.BACKSPACE) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify the input buffer - exp_input_buffer = b'help' - CheckInputBuffer(self, exp_input_buffer) - - # Verify the input buffer position. It should be at 2 (cursor over the 'l') - CheckInputBufferPosition(self, 2) - - # We expect the console output to be the test string, with two moves to the - # left, another move left, and then the rest of the line followed by a - # space. - exp_console_out = test_str - exp_console_out += 2 * OutputStream.MoveCursorLeft(1) - - # Move cursor left 1 column. - exp_console_out += OutputStream.MoveCursorLeft(1) - # Rest of the line and a space. (test_str in this case) - exp_console_out += b'lp ' - # Reset the cursor 2 + 1 to the left. - exp_console_out += OutputStream.MoveCursorLeft(3) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_JumpToBeginningOfLineViaCtrlA(self): - """Verify that we can jump to the beginning of a line with Ctrl+A.""" - # Enter some chars and press CTRL+A - test_str = b'abc' - input_stream = BytesToByteList(test_str) + [console.ControlKey.CTRL_A] - - # Send the characters in. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect to see our test string followed by a move cursor left. - exp_console_out = test_str - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - - # Check to see what whas printed on the console. - CheckConsoleOutput(self, exp_console_out) - - # Check that the input buffer position is now 0. - CheckInputBufferPosition(self, 0) - - # Check input buffer still contains our test string. - CheckInputBuffer(self, test_str) - - def test_JumpToBeginningOfLineViaHomeKey(self): - """Jump to beginning of line via HOME key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - input_stream.extend(Keys.HOME) - - # Send out the stream. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, verify that input buffer position is now 0. - CheckInputBufferPosition(self, 0) - - # Next, verify that the input buffer did not change. - CheckInputBuffer(self, test_str) - - # Lastly, check that the cursor moved correctly. - exp_console_out = test_str - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - CheckConsoleOutput(self, exp_console_out) - - def test_JumpToEndOfLineViaEndKey(self): - """Jump to the end of the line using the END key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - input_stream += [console.ControlKey.CTRL_A] - # Now, jump to the end of the line. - input_stream.extend(Keys.END) - - # Send out the stream. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is correct. This should be at the - # end of the test string. - CheckInputBufferPosition(self, len(test_str)) - - # The expected output should be the test string, followed by a jump to the - # beginning of the line, and lastly a jump to the end of the line. - exp_console_out = test_str - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - # Now the jump back to the end of the line. - exp_console_out += OutputStream.MoveCursorRight(len(test_str)) - - # Verify console output stream. - CheckConsoleOutput(self, exp_console_out) - - def test_JumpToEndOfLineViaCtrlE(self): - """Enter some chars and then try to jump to the end. (Should be a no-op)""" - test_str = b'sysinfo' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CTRL_E) - - # Send out the stream - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position isn't any further than we expect. - # At this point, the position should be at the end of the test string. - CheckInputBufferPosition(self, len(test_str)) - - # Now, let's try to jump to the beginning and then jump back to the end. - input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E] - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Perform the same verification. - CheckInputBufferPosition(self, len(test_str)) - - # Lastly try to jump again, beyond the end. - input_stream = [console.ControlKey.CTRL_E] - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Perform the same verification. - CheckInputBufferPosition(self, len(test_str)) - - # We expect to see the test string, a jump to the beginning of the line, and - # one jump to the end of the line. - exp_console_out = test_str - # Jump to beginning. - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - # Jump back to end. - exp_console_out += OutputStream.MoveCursorRight(len(test_str)) - - # Verify the console output. - CheckConsoleOutput(self, exp_console_out) - - def test_MoveLeftWithArrowKey(self): - """Move cursor left one column with arrow key.""" - test_str = b'tastyspam' - input_stream = BytesToByteList(test_str) - input_stream.extend(Keys.LEFT_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is 1 less than the length. - CheckInputBufferPosition(self, len(test_str) - 1) - - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) - - # We expect the test string, followed by a one column move left. - exp_console_out = test_str + OutputStream.MoveCursorLeft(1) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_MoveLeftWithCtrlB(self): - """Move cursor back one column with Ctrl+B.""" - test_str = b'tastyspam' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CTRL_B) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is 1 less than the length. - CheckInputBufferPosition(self, len(test_str) - 1) + Args: + test_case: A unittest.TestCase object representing the current unit test. + exp_pos: An integer representing the expected input buffer position. + """ + test_case.assertEqual( + exp_pos, + test_case.console.input_buffer_pos, + "input buffer position is incorrect.\ngot: " + + str(test_case.console.input_buffer_pos) + + "\nexp: " + + str(exp_pos) + + "\n" + + str(test_case.console), + ) - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) - # We expect the test string, followed by a one column move left. - exp_console_out = test_str + OutputStream.MoveCursorLeft(1) +def CheckHistoryBuffer(test_case, exp_history): + """Verify that the items in the history buffer are what we expect. - # Verify console output. - CheckConsoleOutput(self, exp_console_out) + Args: + test_case: A unittest.TestCase object representing the current unit test. + exp_history: A list of strings representing the expected contents of the + history buffer. + """ + # First, check to see if the length is what we expect. + test_case.assertEqual( + len(exp_history), + len(test_case.console.history), + ( + "The number of items in the history is unexpected.\n" + "exp: " + str(len(exp_history)) + "\n" + "got: " + str(len(test_case.console.history)) + "\n" + "internal state:\n" + str(test_case.console) + ), + ) + + # Next, check the actual contents of the history buffer. + for i in range(len(exp_history)): + test_case.assertEqual( + exp_history[i], + test_case.console.history[i], + ( + b"history buffer contents are incorrect.\n" + b"exp: " + exp_history[i] + b"\n" + b"got: " + test_case.console.history[i] + b"\n" + b"internal state:\n" + str(test_case.console).encode("ascii") + ), + ) - def test_MoveRightWithArrowKey(self): - """Move cursor one column to the right with the arrow key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - # Jump to beginning of line. - input_stream.append(console.ControlKey.CTRL_A) - # Press right arrow key. - input_stream.extend(Keys.RIGHT_ARROW) - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) +class TestConsoleEditingMethods(unittest.TestCase): + """Test case to verify all console editing methods.""" + + def setUp(self): + """Setup the test harness.""" + # Setup logging with a timestamp, the module, and the log level. + logging.basicConfig( + level=logging.DEBUG, + format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"), + ) + + # Create a temp file and set both the controller and peripheral PTYs to the + # file to create a loopback. + self.tempfile = tempfile.TemporaryFile() + + # Create some mock pipes. These won't be used since we'll mock out sends + # to the interpreter. + mock_pipe_end_0, mock_pipe_end_1 = threadproc_shim.Pipe() + self.console = console.Console( + self.tempfile.fileno(), + self.tempfile, + tempfile.TemporaryFile(), + mock_pipe_end_0, + mock_pipe_end_1, + "EC", + ) + + # Console editing methods are only valid for enhanced EC images, therefore + # we have to assume that the "EC" we're talking to is enhanced. By default, + # the console believes that the EC it's communicating with is NOT enhanced + # which is why we have to override it here. + self.console.enhanced_ec = True + self.console.CheckForEnhancedECImage = mock.MagicMock(return_value=True) + + def test_EnteringChars(self): + """Verify that characters are echoed onto the console.""" + test_str = b"abc" + input_stream = BytesToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # Check the input position. + exp_pos = len(test_str) + CheckInputBufferPosition(self, exp_pos) + + # Verify that the input buffer is correct. + expected_buffer = test_str + CheckInputBuffer(self, expected_buffer) + + # Check console output + exp_console_out = test_str + CheckConsoleOutput(self, exp_console_out) + + def test_EnteringDeletingMoreCharsThanEntered(self): + """Verify that we can press backspace more than we have entered chars.""" + test_str = b"spamspam" + input_stream = BytesToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # Now backspace 1 more than what we sent. + input_stream = [] + for _ in range(len(test_str) + 1): + input_stream.append(console.ControlKey.BACKSPACE) + + # Send that sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that input buffer position is 0. + CheckInputBufferPosition(self, 0) + + # Next, examine the output stream for the correct sequence. + exp_console_out = test_str + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + + # Now, verify that we got what we expected. + CheckConsoleOutput(self, exp_console_out) + + def test_EnteringMoreThanCharLimit(self): + """Verify that we drop characters when the line is too long.""" + test_str = self.console.line_limit * b"o" # All allowed. + test_str += 5 * b"x" # All should be dropped. + input_stream = BytesToByteList(test_str) + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, we expect that input buffer position should be equal to the line + # limit. + exp_pos = self.console.line_limit + CheckInputBufferPosition(self, exp_pos) + + # The input buffer should only hold until the line limit. + exp_buffer = test_str[0 : self.console.line_limit] + CheckInputBuffer(self, exp_buffer) + + # Lastly, check that the extra characters are not printed. + exp_console_out = exp_buffer + CheckConsoleOutput(self, exp_console_out) + + def test_ValidKeysOnLongLine(self): + """Verify that we can still press valid keys if the line is too long.""" + # Fill the line. + test_str = self.console.line_limit * b"o" + exp_console_out = test_str + # Try to fill it even more; these should all be dropped. + test_str += 5 * b"x" + input_stream = BytesToByteList(test_str) + + # We should be able to press the following keys: + # - Backspace + # - Arrow Keys/CTRL+B/CTRL+F/CTRL+P/CTRL+N + # - Delete + # - Home/CTRL+A + # - End/CTRL+E + # - Carriage Return + + # Backspace 1 character + input_stream.append(console.ControlKey.BACKSPACE) + exp_console_out += BACKSPACE_STRING + # Refill the line. + input_stream.extend(BytesToByteList(b"o")) + exp_console_out += b"o" + + # Left arrow key. + input_stream.extend(Keys.LEFT_ARROW) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Right arrow key. + input_stream.extend(Keys.RIGHT_ARROW) + exp_console_out += OutputStream.MoveCursorRight(1) + + # CTRL+B + input_stream.append(console.ControlKey.CTRL_B) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # CTRL+F + input_stream.append(console.ControlKey.CTRL_F) + exp_console_out += OutputStream.MoveCursorRight(1) + + # Let's press enter now so we can test up and down. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + exp_console_out += b"\r\n" + self.console.prompt + + # Up arrow key. + input_stream.extend(Keys.UP_ARROW) + exp_console_out += test_str[: self.console.line_limit] + + # Down arrow key. + input_stream.extend(Keys.DOWN_ARROW) + # Since the line was blank, we have to backspace the entire line. + exp_console_out += self.console.line_limit * BACKSPACE_STRING + + # CTRL+P + input_stream.append(console.ControlKey.CTRL_P) + exp_console_out += test_str[: self.console.line_limit] + + # CTRL+N + input_stream.append(console.ControlKey.CTRL_N) + # Since the line was blank, we have to backspace the entire line. + exp_console_out += self.console.line_limit * BACKSPACE_STRING + + # Press the Up arrow key to reprint the long line. + input_stream.extend(Keys.UP_ARROW) + exp_console_out += test_str[: self.console.line_limit] + + # Press the Home key to jump to the beginning of the line. + input_stream.extend(Keys.HOME) + exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) + + # Press the End key to jump to the end of the line. + input_stream.extend(Keys.END) + exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) + + # Press CTRL+A to jump to the beginning of the line. + input_stream.append(console.ControlKey.CTRL_A) + exp_console_out += OutputStream.MoveCursorLeft(self.console.line_limit) + + # Press CTRL+E to jump to the end of the line. + input_stream.extend(Keys.END) + exp_console_out += OutputStream.MoveCursorRight(self.console.line_limit) + + # Move left one column so we can delete a character. + input_stream.extend(Keys.LEFT_ARROW) + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Press the delete key. + input_stream.extend(Keys.DEL) + # This should look like a space, and then move cursor left 1 column since + # we're at the end of line. + exp_console_out += b" " + OutputStream.MoveCursorLeft(1) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify everything happened correctly. + CheckConsoleOutput(self, exp_console_out) + + def test_BackspaceOnEmptyLine(self): + """Verify that we can backspace on an empty line with no bad effects.""" + # Send a single backspace. + test_str = [console.ControlKey.BACKSPACE] + + # Send the characters in. + for byte in test_str: + self.console.HandleChar(byte) + + # Check the input position. + exp_pos = 0 + CheckInputBufferPosition(self, exp_pos) + + # Check that buffer is empty. + exp_input_buffer = b"" + CheckInputBuffer(self, exp_input_buffer) + + # Check that the console output is empty. + exp_console_out = b"" + CheckConsoleOutput(self, exp_console_out) + + def test_BackspaceWithinLine(self): + """Verify that we shift the chars over when backspacing within a line.""" + # Misspell 'help' + test_str = b"heelp" + input_stream = BytesToByteList(test_str) + # Use the arrow key to go back to fix it. + # Move cursor left 1 column. + input_stream.extend(2 * Keys.LEFT_ARROW) + # Backspace once to remove the extra 'e'. + input_stream.append(console.ControlKey.BACKSPACE) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify the input buffer + exp_input_buffer = b"help" + CheckInputBuffer(self, exp_input_buffer) + + # Verify the input buffer position. It should be at 2 (cursor over the 'l') + CheckInputBufferPosition(self, 2) + + # We expect the console output to be the test string, with two moves to the + # left, another move left, and then the rest of the line followed by a + # space. + exp_console_out = test_str + exp_console_out += 2 * OutputStream.MoveCursorLeft(1) + + # Move cursor left 1 column. + exp_console_out += OutputStream.MoveCursorLeft(1) + # Rest of the line and a space. (test_str in this case) + exp_console_out += b"lp " + # Reset the cursor 2 + 1 to the left. + exp_console_out += OutputStream.MoveCursorLeft(3) + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + def test_JumpToBeginningOfLineViaCtrlA(self): + """Verify that we can jump to the beginning of a line with Ctrl+A.""" + # Enter some chars and press CTRL+A + test_str = b"abc" + input_stream = BytesToByteList(test_str) + [console.ControlKey.CTRL_A] + + # Send the characters in. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect to see our test string followed by a move cursor left. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + + # Check to see what whas printed on the console. + CheckConsoleOutput(self, exp_console_out) + + # Check that the input buffer position is now 0. + CheckInputBufferPosition(self, 0) + + # Check input buffer still contains our test string. + CheckInputBuffer(self, test_str) + + def test_JumpToBeginningOfLineViaHomeKey(self): + """Jump to beginning of line via HOME key.""" + test_str = b"version" + input_stream = BytesToByteList(test_str) + input_stream.extend(Keys.HOME) + + # Send out the stream. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that input buffer position is now 0. + CheckInputBufferPosition(self, 0) + + # Next, verify that the input buffer did not change. + CheckInputBuffer(self, test_str) + + # Lastly, check that the cursor moved correctly. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + CheckConsoleOutput(self, exp_console_out) + + def test_JumpToEndOfLineViaEndKey(self): + """Jump to the end of the line using the END key.""" + test_str = b"version" + input_stream = BytesToByteList(test_str) + input_stream += [console.ControlKey.CTRL_A] + # Now, jump to the end of the line. + input_stream.extend(Keys.END) + + # Send out the stream. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is correct. This should be at the + # end of the test string. + CheckInputBufferPosition(self, len(test_str)) + + # The expected output should be the test string, followed by a jump to the + # beginning of the line, and lastly a jump to the end of the line. + exp_console_out = test_str + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Now the jump back to the end of the line. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + + # Verify console output stream. + CheckConsoleOutput(self, exp_console_out) + + def test_JumpToEndOfLineViaCtrlE(self): + """Enter some chars and then try to jump to the end. (Should be a no-op)""" + test_str = b"sysinfo" + input_stream = BytesToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_E) + + # Send out the stream + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position isn't any further than we expect. + # At this point, the position should be at the end of the test string. + CheckInputBufferPosition(self, len(test_str)) + + # Now, let's try to jump to the beginning and then jump back to the end. + input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E] + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Perform the same verification. + CheckInputBufferPosition(self, len(test_str)) + + # Lastly try to jump again, beyond the end. + input_stream = [console.ControlKey.CTRL_E] + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Perform the same verification. + CheckInputBufferPosition(self, len(test_str)) + + # We expect to see the test string, a jump to the beginning of the line, and + # one jump to the end of the line. + exp_console_out = test_str + # Jump to beginning. + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Jump back to end. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + + # Verify the console output. + CheckConsoleOutput(self, exp_console_out) + + def test_MoveLeftWithArrowKey(self): + """Move cursor left one column with arrow key.""" + test_str = b"tastyspam" + input_stream = BytesToByteList(test_str) + input_stream.extend(Keys.LEFT_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1 less than the length. + CheckInputBufferPosition(self, len(test_str) - 1) + + # Also, verify that the input buffer is not modified. + CheckInputBuffer(self, test_str) + + # We expect the test string, followed by a one column move left. + exp_console_out = test_str + OutputStream.MoveCursorLeft(1) + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + def test_MoveLeftWithCtrlB(self): + """Move cursor back one column with Ctrl+B.""" + test_str = b"tastyspam" + input_stream = BytesToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_B) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1 less than the length. + CheckInputBufferPosition(self, len(test_str) - 1) - # Verify that the input buffer position is 1. - CheckInputBufferPosition(self, 1) + # Also, verify that the input buffer is not modified. + CheckInputBuffer(self, test_str) - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) + # We expect the test string, followed by a one column move left. + exp_console_out = test_str + OutputStream.MoveCursorLeft(1) - # We expect the test string, followed by a jump to the beginning of the - # line, and finally a move right 1. - exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) - - # A move right 1 column. - exp_console_out += OutputStream.MoveCursorRight(1) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_MoveRightWithCtrlF(self): - """Move cursor forward one column with Ctrl+F.""" - test_str = b'panicinfo' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CTRL_A) - # Now, move right one column. - input_stream.append(console.ControlKey.CTRL_F) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the input buffer position is 1. - CheckInputBufferPosition(self, 1) - - # Also, verify that the input buffer is not modified. - CheckInputBuffer(self, test_str) - - # We expect the test string, followed by a jump to the beginning of the - # line, and finally a move right 1. - exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) - - # A move right 1 column. - exp_console_out += OutputStream.MoveCursorRight(1) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_ImpossibleMoveLeftWithArrowKey(self): - """Verify that we can't move left at the beginning of the line.""" - # We shouldn't be able to move left if we're at the beginning of the line. - input_stream = Keys.LEFT_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Nothing should have been output. - exp_console_output = b'' - CheckConsoleOutput(self, exp_console_output) - - # The input buffer position should still be 0. - CheckInputBufferPosition(self, 0) - - # The input buffer itself should be empty. - CheckInputBuffer(self, b'') - - def test_ImpossibleMoveRightWithArrowKey(self): - """Verify that we can't move right at the end of the line.""" - # We shouldn't be able to move right if we're at the end of the line. - input_stream = Keys.RIGHT_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Nothing should have been output. - exp_console_output = b'' - CheckConsoleOutput(self, exp_console_output) - - # The input buffer position should still be 0. - CheckInputBufferPosition(self, 0) - - # The input buffer itself should be empty. - CheckInputBuffer(self, b'') - - def test_KillEntireLine(self): - """Verify that we can kill an entire line with Ctrl+K.""" - test_str = b'accelinfo on' - input_stream = BytesToByteList(test_str) - # Jump to beginning of line and then kill it with Ctrl+K. - input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K]) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, we expect that the input buffer is empty. - CheckInputBuffer(self, b'') - - # The buffer position should be 0. - CheckInputBufferPosition(self, 0) - - # What we expect to see on the console stream should be the following. The - # test string, a jump to the beginning of the line, then jump back to the - # end of the line and replace the line with spaces. - exp_console_out = test_str - # Jump to beginning of line. - exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) - # Jump to end of line. - exp_console_out += OutputStream.MoveCursorRight(len(test_str)) - # Replace line with spaces, which looks like backspaces. - for _ in range(len(test_str)): - exp_console_out += BACKSPACE_STRING - - # Verify the console output. - CheckConsoleOutput(self, exp_console_out) - - def test_KillPartialLine(self): - """Verify that we can kill a portion of a line.""" - test_str = b'accelread 0 1' - input_stream = BytesToByteList(test_str) - len_to_kill = 5 - for _ in range(len_to_kill): - # Move cursor left - input_stream.extend(Keys.LEFT_ARROW) - # Now kill - input_stream.append(console.ControlKey.CTRL_K) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, check that the input buffer was truncated. - exp_input_buffer = test_str[:-len_to_kill] - CheckInputBuffer(self, exp_input_buffer) - - # Verify the input buffer position. - CheckInputBufferPosition(self, len(test_str) - len_to_kill) - - # The console output stream that we expect is the test string followed by a - # move left of len_to_kill, then a jump to the end of the line and backspace - # of len_to_kill. - exp_console_out = test_str - for _ in range(len_to_kill): - # Move left 1 column. - exp_console_out += OutputStream.MoveCursorLeft(1) - # Then jump to the end of the line - exp_console_out += OutputStream.MoveCursorRight(len_to_kill) - # Backspace of len_to_kill - for _ in range(len_to_kill): - exp_console_out += BACKSPACE_STRING - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - def test_InsertingCharacters(self): - """Verify that we can insert characters within the line.""" - test_str = b'accel 0 1' # Here we forgot the 'read' part in 'accelread' - input_stream = BytesToByteList(test_str) - # We need to move over to the 'l' and add read. - insertion_point = test_str.find(b'l') + 1 - for i in range(len(test_str) - insertion_point): - # Move cursor left. - input_stream.extend(Keys.LEFT_ARROW) - # Now, add in 'read' - added_str = b'read' - input_stream.extend(BytesToByteList(added_str)) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # First, verify that the input buffer is correct. - exp_input_buffer = test_str[:insertion_point] + added_str - exp_input_buffer += test_str[insertion_point:] - CheckInputBuffer(self, exp_input_buffer) - - # Verify that the input buffer position is correct. - exp_input_buffer_pos = insertion_point + len(added_str) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - # The console output stream that we expect is the test string, followed by - # move cursor left until the 'l' was found, the added test string while - # shifting characters around. - exp_console_out = test_str - for i in range(len(test_str) - insertion_point): - # Move cursor left. - exp_console_out += OutputStream.MoveCursorLeft(1) - - # Now for each character, write the rest of the line will be shifted to the - # right one column. - for i in range(len(added_str)): - # Printed character. - exp_console_out += added_str[i:i+1] - # The rest of the line - exp_console_out += test_str[insertion_point:] - # Reset the cursor back left - reset_dist = len(test_str[insertion_point:]) - exp_console_out += OutputStream.MoveCursorLeft(reset_dist) - - # Verify the console output. - CheckConsoleOutput(self, exp_console_out) - - def test_StoreCommandHistory(self): - """Verify that entered commands are stored in the history.""" - test_commands = [] - test_commands.append(b'help') - test_commands.append(b'version') - test_commands.append(b'accelread 0 1') - input_stream = [] - for c in test_commands: - input_stream.extend(BytesToByteList(c)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect to have the test commands in the history buffer. - exp_history_buf = test_commands - CheckHistoryBuffer(self, exp_history_buf) - - def test_CycleUpThruCommandHistory(self): - """Verify that the UP arrow key will print itmes in the history buffer.""" - # Enter some commands. - test_commands = [b'version', b'accelrange 0', b'battery', b'gettime'] - input_stream = [] - for command in test_commands: - input_stream.extend(BytesToByteList(command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Now, hit the UP arrow key to print the previous entries. - for i in range(len(test_commands)): - input_stream.extend(Keys.UP_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be test commands with prompts printed in - # between, followed by line kills with the previous test commands printed. - exp_console_out = b'' - for i in range(len(test_commands)): - exp_console_out += test_commands[i] + b'\r\n' + self.console.prompt - - # When we press up, the line should be cleared and print the previous buffer - # entry. - for i in range(len(test_commands)-1, 0, -1): - exp_console_out += test_commands[i] - # Backspace to the beginning. - for i in range(len(test_commands[i])): - exp_console_out += BACKSPACE_STRING + # Verify console output. + CheckConsoleOutput(self, exp_console_out) - # The last command should just be printed out with no backspacing. - exp_console_out += test_commands[0] - - # Now, verify. - CheckConsoleOutput(self, exp_console_out) - - def test_UpArrowOnEmptyHistory(self): - """Ensure nothing happens if the history is empty.""" - # Press the up arrow key twice. - input_stream = 2 * Keys.UP_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect nothing to have happened. - exp_console_out = b'' - exp_input_buffer = b'' - exp_input_buffer_pos = 0 - exp_history_buf = [] - - # Verify. - CheckConsoleOutput(self, exp_console_out) - CheckInputBufferPosition(self, exp_input_buffer_pos) - CheckInputBuffer(self, exp_input_buffer) - CheckHistoryBuffer(self, exp_history_buf) - - def test_UpArrowDoesNotGoOutOfBounds(self): - """Verify that pressing the up arrow many times won't go out of bounds.""" - # Enter one command. - test_str = b'help version' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - # Then press the up arrow key twice. - input_stream.extend(2 * Keys.UP_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the history buffer is correct. - exp_history_buf = [test_str] - CheckHistoryBuffer(self, exp_history_buf) - - # We expect that the console output should only contain our entered command, - # a new prompt, and then our command aggain. - exp_console_out = test_str + b'\r\n' + self.console.prompt - # Pressing up should reprint the command we entered. - exp_console_out += test_str - - # Verify. - CheckConsoleOutput(self, exp_console_out) - - def test_CycleDownThruCommandHistory(self): - """Verify that we can select entries by hitting the down arrow.""" - # Enter at least 4 commands. - test_commands = [b'version', b'accelrange 0', b'battery', b'gettime'] - input_stream = [] - for command in test_commands: - input_stream.extend(BytesToByteList(command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Now, hit the UP arrow key twice to print the previous two entries. - for i in range(2): - input_stream.extend(Keys.UP_ARROW) - - # Now, hit the DOWN arrow key twice to print the newer entries. - input_stream.extend(2*Keys.DOWN_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be commands that we entered, followed by - # prompts, then followed by our last two commands in reverse. Then, we - # should see the last entry in the list, followed by the saved partial cmd - # of a blank line. - exp_console_out = b'' - for i in range(len(test_commands)): - exp_console_out += test_commands[i] + b'\r\n' + self.console.prompt - - # When we press up, the line should be cleared and print the previous buffer - # entry. - for i in range(len(test_commands)-1, 1, -1): - exp_console_out += test_commands[i] - # Backspace to the beginning. - for i in range(len(test_commands[i])): - exp_console_out += BACKSPACE_STRING + def test_MoveRightWithArrowKey(self): + """Move cursor one column to the right with the arrow key.""" + test_str = b"version" + input_stream = BytesToByteList(test_str) + # Jump to beginning of line. + input_stream.append(console.ControlKey.CTRL_A) + # Press right arrow key. + input_stream.extend(Keys.RIGHT_ARROW) - # When we press down, it should have cleared the last command (which we - # covered with the previous for loop), and then prints the next command. - exp_console_out += test_commands[3] - for i in range(len(test_commands[3])): - exp_console_out += BACKSPACE_STRING - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - # Verify input buffer. - exp_input_buffer = b'' # Empty because our partial command was empty. - exp_input_buffer_pos = len(exp_input_buffer) - CheckInputBuffer(self, exp_input_buffer) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - def test_SavingPartialCommandWhenNavigatingHistory(self): - """Verify that partial commands are saved when navigating history.""" - # Enter a command. - test_str = b'accelinfo' - input_stream = BytesToByteList(test_str) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Enter a partial command. - partial_cmd = b'ver' - input_stream.extend(BytesToByteList(partial_cmd)) - - # Hit the UP arrow key. - input_stream.extend(Keys.UP_ARROW) - # Then, the DOWN arrow key. - input_stream.extend(Keys.DOWN_ARROW) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be the command we entered, a prompt, the - # partial command, clearing of the partial command, the command entered, - # clearing of the command entered, and then the partial command. - exp_console_out = test_str + b'\r\n' + self.console.prompt - exp_console_out += partial_cmd - for _ in range(len(partial_cmd)): - exp_console_out += BACKSPACE_STRING - exp_console_out += test_str - for _ in range(len(test_str)): - exp_console_out += BACKSPACE_STRING - exp_console_out += partial_cmd - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - # Verify input buffer. - exp_input_buffer = partial_cmd - exp_input_buffer_pos = len(exp_input_buffer) - CheckInputBuffer(self, exp_input_buffer) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - def test_DownArrowOnEmptyHistory(self): - """Ensure nothing happens if the history is empty.""" - # Then press the up down arrow twice. - input_stream = 2 * Keys.DOWN_ARROW - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # We expect nothing to have happened. - exp_console_out = b'' - exp_input_buffer = b'' - exp_input_buffer_pos = 0 - exp_history_buf = [] - - # Verify. - CheckConsoleOutput(self, exp_console_out) - CheckInputBufferPosition(self, exp_input_buffer_pos) - CheckInputBuffer(self, exp_input_buffer) - CheckHistoryBuffer(self, exp_history_buf) - - def test_DeleteCharsUsingDELKey(self): - """Verify that we can delete characters using the DEL key.""" - test_str = b'version' - input_stream = BytesToByteList(test_str) - - # Hit the left arrow key 2 times. - input_stream.extend(2 * Keys.LEFT_ARROW) - - # Press the DEL key. - input_stream.extend(Keys.DEL) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The expected output should be the command we entered, 2 individual cursor - # moves to the left, and then removing a char and shifting everything to the - # left one column. - exp_console_out = test_str - exp_console_out += 2 * OutputStream.MoveCursorLeft(1) - - # Remove the char by shifting everything to the left one, slicing out the - # remove char. - exp_console_out += test_str[-1:] + b' ' - - # Reset the cursor by moving back 2 columns because of the 'n' and space. - exp_console_out += OutputStream.MoveCursorLeft(2) - - # Verify console output. - CheckConsoleOutput(self, exp_console_out) - - # Verify input buffer. The input buffer should have the char sliced out and - # be positioned where the char was removed. - exp_input_buffer = test_str[:-2] + test_str[-1:] - exp_input_buffer_pos = len(exp_input_buffer) - 1 - CheckInputBuffer(self, exp_input_buffer) - CheckInputBufferPosition(self, exp_input_buffer_pos) - - def test_RepeatedCommandInHistory(self): - """Verify that we don't store 2 consecutive identical commands in history""" - # Enter a few commands. - test_commands = [b'version', b'accelrange 0', b'battery', b'gettime'] - # Repeat the last command. - test_commands.append(test_commands[len(test_commands)-1]) - - input_stream = [] - for command in test_commands: - input_stream.extend(BytesToByteList(command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Verify that the history buffer is correct. The last command, since - # it was repeated, should not have been added to the history. - exp_history_buf = test_commands[0:len(test_commands)-1] - CheckHistoryBuffer(self, exp_history_buf) + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + # Verify that the input buffer position is 1. + CheckInputBufferPosition(self, 1) -class TestConsoleCompatibility(unittest.TestCase): - """Verify that console can speak to enhanced and non-enhanced EC images.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - # Create a temp file and set both the controller and peripheral PTYs to the - # file to create a loopback. - self.tempfile = tempfile.TemporaryFile() - - # Mock out the pipes. - mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock() - self.console = console.Console(self.tempfile.fileno(), self.tempfile, - tempfile.TemporaryFile(), - mock_pipe_end_0, mock_pipe_end_1, "EC") - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_ActAsPassThruInNonEnhancedMode(self, mock_check): - """Verify we simply pass everything thru to non-enhanced ECs. + # Also, verify that the input buffer is not modified. + CheckInputBuffer(self, test_str) - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECImage() - method. - """ - # Set the interrogation mode to always so that we actually interrogate. - self.console.interrogation_mode = b'always' - - # Assume EC interrogations indicate that the image is non-enhanced. - mock_check.return_value = False - - # Press enter, followed by the command, and another enter. - input_stream = [] - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - test_command = b'version' - input_stream.extend(BytesToByteList(test_command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Expected calls to send down the pipe would be each character of the test - # command. - expected_calls = [] - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - for char in test_command: - if six.PY3: - expected_calls.append(mock.call(bytes([char]))) - else: - expected_calls.append(mock.call(char)) - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - - # Verify that the calls happened. - self.console.cmd_pipe.send.assert_has_calls(expected_calls) - - # Since we're acting as a pass-thru, the input buffer should be empty and - # input_buffer_pos is 0. - CheckInputBuffer(self, b'') - CheckInputBufferPosition(self, 0) - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_TransitionFromNonEnhancedToEnhanced(self, mock_check): - """Verify that we transition correctly to enhanced mode. + # We expect the test string, followed by a jump to the beginning of the + # line, and finally a move right 1. + exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) + + # A move right 1 column. + exp_console_out += OutputStream.MoveCursorRight(1) + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + def test_MoveRightWithCtrlF(self): + """Move cursor forward one column with Ctrl+F.""" + test_str = b"panicinfo" + input_stream = BytesToByteList(test_str) + input_stream.append(console.ControlKey.CTRL_A) + # Now, move right one column. + input_stream.append(console.ControlKey.CTRL_F) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the input buffer position is 1. + CheckInputBufferPosition(self, 1) + + # Also, verify that the input buffer is not modified. + CheckInputBuffer(self, test_str) + + # We expect the test string, followed by a jump to the beginning of the + # line, and finally a move right 1. + exp_console_out = test_str + OutputStream.MoveCursorLeft(len((test_str))) + + # A move right 1 column. + exp_console_out += OutputStream.MoveCursorRight(1) + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + def test_ImpossibleMoveLeftWithArrowKey(self): + """Verify that we can't move left at the beginning of the line.""" + # We shouldn't be able to move left if we're at the beginning of the line. + input_stream = Keys.LEFT_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Nothing should have been output. + exp_console_output = b"" + CheckConsoleOutput(self, exp_console_output) + + # The input buffer position should still be 0. + CheckInputBufferPosition(self, 0) + + # The input buffer itself should be empty. + CheckInputBuffer(self, b"") + + def test_ImpossibleMoveRightWithArrowKey(self): + """Verify that we can't move right at the end of the line.""" + # We shouldn't be able to move right if we're at the end of the line. + input_stream = Keys.RIGHT_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Nothing should have been output. + exp_console_output = b"" + CheckConsoleOutput(self, exp_console_output) + + # The input buffer position should still be 0. + CheckInputBufferPosition(self, 0) + + # The input buffer itself should be empty. + CheckInputBuffer(self, b"") + + def test_KillEntireLine(self): + """Verify that we can kill an entire line with Ctrl+K.""" + test_str = b"accelinfo on" + input_stream = BytesToByteList(test_str) + # Jump to beginning of line and then kill it with Ctrl+K. + input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K]) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, we expect that the input buffer is empty. + CheckInputBuffer(self, b"") + + # The buffer position should be 0. + CheckInputBufferPosition(self, 0) + + # What we expect to see on the console stream should be the following. The + # test string, a jump to the beginning of the line, then jump back to the + # end of the line and replace the line with spaces. + exp_console_out = test_str + # Jump to beginning of line. + exp_console_out += OutputStream.MoveCursorLeft(len(test_str)) + # Jump to end of line. + exp_console_out += OutputStream.MoveCursorRight(len(test_str)) + # Replace line with spaces, which looks like backspaces. + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + + # Verify the console output. + CheckConsoleOutput(self, exp_console_out) + + def test_KillPartialLine(self): + """Verify that we can kill a portion of a line.""" + test_str = b"accelread 0 1" + input_stream = BytesToByteList(test_str) + len_to_kill = 5 + for _ in range(len_to_kill): + # Move cursor left + input_stream.extend(Keys.LEFT_ARROW) + # Now kill + input_stream.append(console.ControlKey.CTRL_K) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, check that the input buffer was truncated. + exp_input_buffer = test_str[:-len_to_kill] + CheckInputBuffer(self, exp_input_buffer) + + # Verify the input buffer position. + CheckInputBufferPosition(self, len(test_str) - len_to_kill) + + # The console output stream that we expect is the test string followed by a + # move left of len_to_kill, then a jump to the end of the line and backspace + # of len_to_kill. + exp_console_out = test_str + for _ in range(len_to_kill): + # Move left 1 column. + exp_console_out += OutputStream.MoveCursorLeft(1) + # Then jump to the end of the line + exp_console_out += OutputStream.MoveCursorRight(len_to_kill) + # Backspace of len_to_kill + for _ in range(len_to_kill): + exp_console_out += BACKSPACE_STRING + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + def test_InsertingCharacters(self): + """Verify that we can insert characters within the line.""" + test_str = b"accel 0 1" # Here we forgot the 'read' part in 'accelread' + input_stream = BytesToByteList(test_str) + # We need to move over to the 'l' and add read. + insertion_point = test_str.find(b"l") + 1 + for i in range(len(test_str) - insertion_point): + # Move cursor left. + input_stream.extend(Keys.LEFT_ARROW) + # Now, add in 'read' + added_str = b"read" + input_stream.extend(BytesToByteList(added_str)) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # First, verify that the input buffer is correct. + exp_input_buffer = test_str[:insertion_point] + added_str + exp_input_buffer += test_str[insertion_point:] + CheckInputBuffer(self, exp_input_buffer) + + # Verify that the input buffer position is correct. + exp_input_buffer_pos = insertion_point + len(added_str) + CheckInputBufferPosition(self, exp_input_buffer_pos) + + # The console output stream that we expect is the test string, followed by + # move cursor left until the 'l' was found, the added test string while + # shifting characters around. + exp_console_out = test_str + for i in range(len(test_str) - insertion_point): + # Move cursor left. + exp_console_out += OutputStream.MoveCursorLeft(1) + + # Now for each character, write the rest of the line will be shifted to the + # right one column. + for i in range(len(added_str)): + # Printed character. + exp_console_out += added_str[i : i + 1] + # The rest of the line + exp_console_out += test_str[insertion_point:] + # Reset the cursor back left + reset_dist = len(test_str[insertion_point:]) + exp_console_out += OutputStream.MoveCursorLeft(reset_dist) + + # Verify the console output. + CheckConsoleOutput(self, exp_console_out) + + def test_StoreCommandHistory(self): + """Verify that entered commands are stored in the history.""" + test_commands = [] + test_commands.append(b"help") + test_commands.append(b"version") + test_commands.append(b"accelread 0 1") + input_stream = [] + for c in test_commands: + input_stream.extend(BytesToByteList(c)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect to have the test commands in the history buffer. + exp_history_buf = test_commands + CheckHistoryBuffer(self, exp_history_buf) + + def test_CycleUpThruCommandHistory(self): + """Verify that the UP arrow key will print itmes in the history buffer.""" + # Enter some commands. + test_commands = [b"version", b"accelrange 0", b"battery", b"gettime"] + input_stream = [] + for command in test_commands: + input_stream.extend(BytesToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Now, hit the UP arrow key to print the previous entries. + for i in range(len(test_commands)): + input_stream.extend(Keys.UP_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be test commands with prompts printed in + # between, followed by line kills with the previous test commands printed. + exp_console_out = b"" + for i in range(len(test_commands)): + exp_console_out += test_commands[i] + b"\r\n" + self.console.prompt + + # When we press up, the line should be cleared and print the previous buffer + # entry. + for i in range(len(test_commands) - 1, 0, -1): + exp_console_out += test_commands[i] + # Backspace to the beginning. + for i in range(len(test_commands[i])): + exp_console_out += BACKSPACE_STRING + + # The last command should just be printed out with no backspacing. + exp_console_out += test_commands[0] + + # Now, verify. + CheckConsoleOutput(self, exp_console_out) + + def test_UpArrowOnEmptyHistory(self): + """Ensure nothing happens if the history is empty.""" + # Press the up arrow key twice. + input_stream = 2 * Keys.UP_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect nothing to have happened. + exp_console_out = b"" + exp_input_buffer = b"" + exp_input_buffer_pos = 0 + exp_history_buf = [] + + # Verify. + CheckConsoleOutput(self, exp_console_out) + CheckInputBufferPosition(self, exp_input_buffer_pos) + CheckInputBuffer(self, exp_input_buffer) + CheckHistoryBuffer(self, exp_history_buf) + + def test_UpArrowDoesNotGoOutOfBounds(self): + """Verify that pressing the up arrow many times won't go out of bounds.""" + # Enter one command. + test_str = b"help version" + input_stream = BytesToByteList(test_str) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + # Then press the up arrow key twice. + input_stream.extend(2 * Keys.UP_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the history buffer is correct. + exp_history_buf = [test_str] + CheckHistoryBuffer(self, exp_history_buf) + + # We expect that the console output should only contain our entered command, + # a new prompt, and then our command aggain. + exp_console_out = test_str + b"\r\n" + self.console.prompt + # Pressing up should reprint the command we entered. + exp_console_out += test_str + + # Verify. + CheckConsoleOutput(self, exp_console_out) + + def test_CycleDownThruCommandHistory(self): + """Verify that we can select entries by hitting the down arrow.""" + # Enter at least 4 commands. + test_commands = [b"version", b"accelrange 0", b"battery", b"gettime"] + input_stream = [] + for command in test_commands: + input_stream.extend(BytesToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Now, hit the UP arrow key twice to print the previous two entries. + for i in range(2): + input_stream.extend(Keys.UP_ARROW) + + # Now, hit the DOWN arrow key twice to print the newer entries. + input_stream.extend(2 * Keys.DOWN_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be commands that we entered, followed by + # prompts, then followed by our last two commands in reverse. Then, we + # should see the last entry in the list, followed by the saved partial cmd + # of a blank line. + exp_console_out = b"" + for i in range(len(test_commands)): + exp_console_out += test_commands[i] + b"\r\n" + self.console.prompt + + # When we press up, the line should be cleared and print the previous buffer + # entry. + for i in range(len(test_commands) - 1, 1, -1): + exp_console_out += test_commands[i] + # Backspace to the beginning. + for i in range(len(test_commands[i])): + exp_console_out += BACKSPACE_STRING + + # When we press down, it should have cleared the last command (which we + # covered with the previous for loop), and then prints the next command. + exp_console_out += test_commands[3] + for i in range(len(test_commands[3])): + exp_console_out += BACKSPACE_STRING + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + # Verify input buffer. + exp_input_buffer = b"" # Empty because our partial command was empty. + exp_input_buffer_pos = len(exp_input_buffer) + CheckInputBuffer(self, exp_input_buffer) + CheckInputBufferPosition(self, exp_input_buffer_pos) + + def test_SavingPartialCommandWhenNavigatingHistory(self): + """Verify that partial commands are saved when navigating history.""" + # Enter a command. + test_str = b"accelinfo" + input_stream = BytesToByteList(test_str) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Enter a partial command. + partial_cmd = b"ver" + input_stream.extend(BytesToByteList(partial_cmd)) + + # Hit the UP arrow key. + input_stream.extend(Keys.UP_ARROW) + # Then, the DOWN arrow key. + input_stream.extend(Keys.DOWN_ARROW) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be the command we entered, a prompt, the + # partial command, clearing of the partial command, the command entered, + # clearing of the command entered, and then the partial command. + exp_console_out = test_str + b"\r\n" + self.console.prompt + exp_console_out += partial_cmd + for _ in range(len(partial_cmd)): + exp_console_out += BACKSPACE_STRING + exp_console_out += test_str + for _ in range(len(test_str)): + exp_console_out += BACKSPACE_STRING + exp_console_out += partial_cmd + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + # Verify input buffer. + exp_input_buffer = partial_cmd + exp_input_buffer_pos = len(exp_input_buffer) + CheckInputBuffer(self, exp_input_buffer) + CheckInputBufferPosition(self, exp_input_buffer_pos) + + def test_DownArrowOnEmptyHistory(self): + """Ensure nothing happens if the history is empty.""" + # Then press the up down arrow twice. + input_stream = 2 * Keys.DOWN_ARROW + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # We expect nothing to have happened. + exp_console_out = b"" + exp_input_buffer = b"" + exp_input_buffer_pos = 0 + exp_history_buf = [] + + # Verify. + CheckConsoleOutput(self, exp_console_out) + CheckInputBufferPosition(self, exp_input_buffer_pos) + CheckInputBuffer(self, exp_input_buffer) + CheckHistoryBuffer(self, exp_history_buf) + + def test_DeleteCharsUsingDELKey(self): + """Verify that we can delete characters using the DEL key.""" + test_str = b"version" + input_stream = BytesToByteList(test_str) + + # Hit the left arrow key 2 times. + input_stream.extend(2 * Keys.LEFT_ARROW) + + # Press the DEL key. + input_stream.extend(Keys.DEL) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The expected output should be the command we entered, 2 individual cursor + # moves to the left, and then removing a char and shifting everything to the + # left one column. + exp_console_out = test_str + exp_console_out += 2 * OutputStream.MoveCursorLeft(1) + + # Remove the char by shifting everything to the left one, slicing out the + # remove char. + exp_console_out += test_str[-1:] + b" " + + # Reset the cursor by moving back 2 columns because of the 'n' and space. + exp_console_out += OutputStream.MoveCursorLeft(2) + + # Verify console output. + CheckConsoleOutput(self, exp_console_out) + + # Verify input buffer. The input buffer should have the char sliced out and + # be positioned where the char was removed. + exp_input_buffer = test_str[:-2] + test_str[-1:] + exp_input_buffer_pos = len(exp_input_buffer) - 1 + CheckInputBuffer(self, exp_input_buffer) + CheckInputBufferPosition(self, exp_input_buffer_pos) + + def test_RepeatedCommandInHistory(self): + """Verify that we don't store 2 consecutive identical commands in history""" + # Enter a few commands. + test_commands = [b"version", b"accelrange 0", b"battery", b"gettime"] + # Repeat the last command. + test_commands.append(test_commands[len(test_commands) - 1]) + + input_stream = [] + for command in test_commands: + input_stream.extend(BytesToByteList(command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Verify that the history buffer is correct. The last command, since + # it was repeated, should not have been added to the history. + exp_history_buf = test_commands[0 : len(test_commands) - 1] + CheckHistoryBuffer(self, exp_history_buf) - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECImage() - method. - """ - # Set the interrogation mode to always so that we actually interrogate. - self.console.interrogation_mode = b'always' - - # First, assume that the EC interrogations indicate an enhanced EC image. - mock_check.return_value = True - # But our current knowledge of the EC image (which was actually the - # 'previous' EC) was a non-enhanced image. - self.console.enhanced_ec = False - - test_command = b'sysinfo' - input_stream = [] - input_stream.extend(BytesToByteList(test_command)) - - expected_calls = [] - # All keystrokes to the console should be directed straight through to the - # EC until we press the enter key. - for char in test_command: - if six.PY3: - expected_calls.append(mock.call(bytes([char]))) - else: - expected_calls.append(mock.call(char)) - - # Press the enter key. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - # The enter key should not be sent to the pipe since we should negotiate - # to an enhanced EC image. - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # At this point, we should have negotiated to enhanced. - self.assertTrue(self.console.enhanced_ec, msg=('Did not negotiate to ' - 'enhanced EC image.')) - - # The command would have been dropped however, so verify this... - CheckInputBuffer(self, b'') - CheckInputBufferPosition(self, 0) - # ...and repeat the command. - input_stream = BytesToByteList(test_command) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Since we're enhanced now, we should have sent the entire command as one - # string with no trailing carriage return - expected_calls.append(mock.call(test_command)) - - # Verify all of the calls. - self.console.cmd_pipe.send.assert_has_calls(expected_calls) - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_TransitionFromEnhancedToNonEnhanced(self, mock_check): - """Verify that we transition correctly to non-enhanced mode. - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECImage() - method. - """ - # Set the interrogation mode to always so that we actually interrogate. - self.console.interrogation_mode = b'always' - - # First, assume that the EC interrogations indicate an non-enhanced EC - # image. - mock_check.return_value = False - # But our current knowledge of the EC image (which was actually the - # 'previous' EC) was an enhanced image. - self.console.enhanced_ec = True - - test_command = b'sysinfo' - input_stream = [] - input_stream.extend(BytesToByteList(test_command)) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # But, we will negotiate to non-enhanced however, dropping this command. - # Verify this. - self.assertFalse(self.console.enhanced_ec, msg=('Did not negotiate to' - 'non-enhanced EC image.')) - CheckInputBuffer(self, b'') - CheckInputBufferPosition(self, 0) - - # The carriage return should have passed through though. - expected_calls = [] - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - - # Since the command was dropped, repeat the command. - input_stream = BytesToByteList(test_command) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # Since we're not enhanced now, we should have sent each character in the - # entire command separately and a carriage return. - for char in test_command: - if six.PY3: - expected_calls.append(mock.call(bytes([char]))) - else: - expected_calls.append(mock.call(char)) - expected_calls.append(mock.call( - six.int2byte(console.ControlKey.CARRIAGE_RETURN))) - - # Verify all of the calls. - self.console.cmd_pipe.send.assert_has_calls(expected_calls) - - def test_EnhancedCheckIfTimedOut(self): - """Verify that the check returns false if it times out.""" - # Make the debug pipe "time out". - self.console.dbg_pipe.poll.return_value = False - self.assertFalse(self.console.CheckForEnhancedECImage()) - - def test_EnhancedCheckIfACKReceived(self): - """Verify that the check returns true if the ACK is received.""" - # Make the debug pipe return EC_ACK. - self.console.dbg_pipe.poll.return_value = True - self.console.dbg_pipe.recv.return_value = interpreter.EC_ACK - self.assertTrue(self.console.CheckForEnhancedECImage()) - - def test_EnhancedCheckIfWrong(self): - """Verify that the check returns false if byte received is wrong.""" - # Make the debug pipe return the wrong byte. - self.console.dbg_pipe.poll.return_value = True - self.console.dbg_pipe.recv.return_value = b'\xff' - self.assertFalse(self.console.CheckForEnhancedECImage()) - - def test_EnhancedCheckUsingBuffer(self): - """Verify that given reboot output, enhanced EC images are detected.""" - enhanced_output_stream = b""" +class TestConsoleCompatibility(unittest.TestCase): + """Verify that console can speak to enhanced and non-enhanced EC images.""" + + def setUp(self): + """Setup the test harness.""" + # Setup logging with a timestamp, the module, and the log level. + logging.basicConfig( + level=logging.DEBUG, + format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"), + ) + # Create a temp file and set both the controller and peripheral PTYs to the + # file to create a loopback. + self.tempfile = tempfile.TemporaryFile() + + # Mock out the pipes. + mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock() + self.console = console.Console( + self.tempfile.fileno(), + self.tempfile, + tempfile.TemporaryFile(), + mock_pipe_end_0, + mock_pipe_end_1, + "EC", + ) + + @mock.patch("ec3po.console.Console.CheckForEnhancedECImage") + def test_ActAsPassThruInNonEnhancedMode(self, mock_check): + """Verify we simply pass everything thru to non-enhanced ECs. + + Args: + mock_check: A MagicMock object replacing the CheckForEnhancedECImage() + method. + """ + # Set the interrogation mode to always so that we actually interrogate. + self.console.interrogation_mode = b"always" + + # Assume EC interrogations indicate that the image is non-enhanced. + mock_check.return_value = False + + # Press enter, followed by the command, and another enter. + input_stream = [] + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + test_command = b"version" + input_stream.extend(BytesToByteList(test_command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Expected calls to send down the pipe would be each character of the test + # command. + expected_calls = [] + expected_calls.append( + mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN)) + ) + for char in test_command: + if six.PY3: + expected_calls.append(mock.call(bytes([char]))) + else: + expected_calls.append(mock.call(char)) + expected_calls.append( + mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN)) + ) + + # Verify that the calls happened. + self.console.cmd_pipe.send.assert_has_calls(expected_calls) + + # Since we're acting as a pass-thru, the input buffer should be empty and + # input_buffer_pos is 0. + CheckInputBuffer(self, b"") + CheckInputBufferPosition(self, 0) + + @mock.patch("ec3po.console.Console.CheckForEnhancedECImage") + def test_TransitionFromNonEnhancedToEnhanced(self, mock_check): + """Verify that we transition correctly to enhanced mode. + + Args: + mock_check: A MagicMock object replacing the CheckForEnhancedECImage() + method. + """ + # Set the interrogation mode to always so that we actually interrogate. + self.console.interrogation_mode = b"always" + + # First, assume that the EC interrogations indicate an enhanced EC image. + mock_check.return_value = True + # But our current knowledge of the EC image (which was actually the + # 'previous' EC) was a non-enhanced image. + self.console.enhanced_ec = False + + test_command = b"sysinfo" + input_stream = [] + input_stream.extend(BytesToByteList(test_command)) + + expected_calls = [] + # All keystrokes to the console should be directed straight through to the + # EC until we press the enter key. + for char in test_command: + if six.PY3: + expected_calls.append(mock.call(bytes([char]))) + else: + expected_calls.append(mock.call(char)) + + # Press the enter key. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + # The enter key should not be sent to the pipe since we should negotiate + # to an enhanced EC image. + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # At this point, we should have negotiated to enhanced. + self.assertTrue( + self.console.enhanced_ec, msg=("Did not negotiate to " "enhanced EC image.") + ) + + # The command would have been dropped however, so verify this... + CheckInputBuffer(self, b"") + CheckInputBufferPosition(self, 0) + # ...and repeat the command. + input_stream = BytesToByteList(test_command) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Since we're enhanced now, we should have sent the entire command as one + # string with no trailing carriage return + expected_calls.append(mock.call(test_command)) + + # Verify all of the calls. + self.console.cmd_pipe.send.assert_has_calls(expected_calls) + + @mock.patch("ec3po.console.Console.CheckForEnhancedECImage") + def test_TransitionFromEnhancedToNonEnhanced(self, mock_check): + """Verify that we transition correctly to non-enhanced mode. + + Args: + mock_check: A MagicMock object replacing the CheckForEnhancedECImage() + method. + """ + # Set the interrogation mode to always so that we actually interrogate. + self.console.interrogation_mode = b"always" + + # First, assume that the EC interrogations indicate an non-enhanced EC + # image. + mock_check.return_value = False + # But our current knowledge of the EC image (which was actually the + # 'previous' EC) was an enhanced image. + self.console.enhanced_ec = True + + test_command = b"sysinfo" + input_stream = [] + input_stream.extend(BytesToByteList(test_command)) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # But, we will negotiate to non-enhanced however, dropping this command. + # Verify this. + self.assertFalse( + self.console.enhanced_ec, + msg=("Did not negotiate to" "non-enhanced EC image."), + ) + CheckInputBuffer(self, b"") + CheckInputBufferPosition(self, 0) + + # The carriage return should have passed through though. + expected_calls = [] + expected_calls.append( + mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN)) + ) + + # Since the command was dropped, repeat the command. + input_stream = BytesToByteList(test_command) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # Since we're not enhanced now, we should have sent each character in the + # entire command separately and a carriage return. + for char in test_command: + if six.PY3: + expected_calls.append(mock.call(bytes([char]))) + else: + expected_calls.append(mock.call(char)) + expected_calls.append( + mock.call(six.int2byte(console.ControlKey.CARRIAGE_RETURN)) + ) + + # Verify all of the calls. + self.console.cmd_pipe.send.assert_has_calls(expected_calls) + + def test_EnhancedCheckIfTimedOut(self): + """Verify that the check returns false if it times out.""" + # Make the debug pipe "time out". + self.console.dbg_pipe.poll.return_value = False + self.assertFalse(self.console.CheckForEnhancedECImage()) + + def test_EnhancedCheckIfACKReceived(self): + """Verify that the check returns true if the ACK is received.""" + # Make the debug pipe return EC_ACK. + self.console.dbg_pipe.poll.return_value = True + self.console.dbg_pipe.recv.return_value = interpreter.EC_ACK + self.assertTrue(self.console.CheckForEnhancedECImage()) + + def test_EnhancedCheckIfWrong(self): + """Verify that the check returns false if byte received is wrong.""" + # Make the debug pipe return the wrong byte. + self.console.dbg_pipe.poll.return_value = True + self.console.dbg_pipe.recv.return_value = b"\xff" + self.assertFalse(self.console.CheckForEnhancedECImage()) + + def test_EnhancedCheckUsingBuffer(self): + """Verify that given reboot output, enhanced EC images are detected.""" + enhanced_output_stream = b""" --- UART initialized after reboot --- [Reset cause: reset-pin soft] [Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:26:20 aaboagye@lithium.mtv.corp.google.com] @@ -1295,19 +1343,19 @@ Enhanced Console is enabled (v1.0.0); type HELP for help. [0.224060 hash done 41dac382e3a6e3d2ea5b4d789c1bc46525cae7cc5ff6758f0de8d8369b506f57] [0.375150 POWER_GOOD seen] """ - for line in enhanced_output_stream.split(b'\n'): - self.console.CheckBufferForEnhancedImage(line) + for line in enhanced_output_stream.split(b"\n"): + self.console.CheckBufferForEnhancedImage(line) - # Since the enhanced console string was present in the output, the console - # should have caught it. - self.assertTrue(self.console.enhanced_ec) + # Since the enhanced console string was present in the output, the console + # should have caught it. + self.assertTrue(self.console.enhanced_ec) - # Also should check that the command was sent to the interpreter. - self.console.cmd_pipe.send.assert_called_once_with(b'enhanced True') + # Also should check that the command was sent to the interpreter. + self.console.cmd_pipe.send.assert_called_once_with(b"enhanced True") - # Now test the non-enhanced EC image. - self.console.cmd_pipe.reset_mock() - non_enhanced_output_stream = b""" + # Now test the non-enhanced EC image. + self.console.cmd_pipe.reset_mock() + non_enhanced_output_stream = b""" --- UART initialized after reboot --- [Reset cause: reset-pin soft] [Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:03:15 aaboagye@lithium.mtv.corp.google.com] @@ -1331,239 +1379,253 @@ Console is enabled; type HELP for help. [0.010285 power on 2] [0.010385 power state 5 = S5->S3, in 0x0000] """ - for line in non_enhanced_output_stream.split(b'\n'): - self.console.CheckBufferForEnhancedImage(line) + for line in non_enhanced_output_stream.split(b"\n"): + self.console.CheckBufferForEnhancedImage(line) - # Since the default console string is present in the output, it should be - # determined to be non enhanced now. - self.assertFalse(self.console.enhanced_ec) + # Since the default console string is present in the output, it should be + # determined to be non enhanced now. + self.assertFalse(self.console.enhanced_ec) - # Check that command was also sent to the interpreter. - self.console.cmd_pipe.send.assert_called_once_with(b'enhanced False') + # Check that command was also sent to the interpreter. + self.console.cmd_pipe.send.assert_called_once_with(b"enhanced False") class TestOOBMConsoleCommands(unittest.TestCase): - """Verify that OOBM console commands work correctly.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - # Create a temp file and set both the controller and peripheral PTYs to the - # file to create a loopback. - self.tempfile = tempfile.TemporaryFile() - - # Mock out the pipes. - mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock() - self.console = console.Console(self.tempfile.fileno(), self.tempfile, - tempfile.TemporaryFile(), - mock_pipe_end_0, mock_pipe_end_1, "EC") - self.console.oobm_queue = mock.MagicMock() - - @mock.patch('ec3po.console.Console.CheckForEnhancedECImage') - def test_InterrogateCommand(self, mock_check): - """Verify that 'interrogate' command works as expected. - - Args: - mock_check: A MagicMock object replacing the CheckForEnhancedECIMage() - method. - """ - input_stream = [] - expected_calls = [] - mock_check.side_effect = [False] - - # 'interrogate never' should disable the interrogation from happening at - # all. - cmd = b'interrogate never' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'version')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'flashinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'sysinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The Check function should NOT have been called at all. - mock_check.assert_not_called() - - # The EC image should be assumed to be not enhanced. - self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' - ' be NOT enhanced.') - - # Reset the mocks. - mock_check.reset_mock() - self.console.oobm_queue.reset_mock() - - # 'interrogate auto' should not interrogate at all. It should only be - # scanning the output stream for the 'console is enabled' strings. - cmd = b'interrogate auto' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - expected_calls = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'version')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'flashinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'sysinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The Check function should NOT have been called at all. - mock_check.assert_not_called() - - # The EC image should be assumed to be not enhanced. - self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' - ' be NOT enhanced.') - - # Reset the mocks. - mock_check.reset_mock() - self.console.oobm_queue.reset_mock() - - # 'interrogate always' should, like its name implies, interrogate always - # after each press of the enter key. This was the former way of doing - # interrogation. - cmd = b'interrogate always' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - expected_calls = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # The Check method should be called 3 times here. - mock_check.side_effect = [False, False, False] - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'help list')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'taskinfo')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'hibdelay')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The Check method should have been called 3 times here. - expected_calls = [mock.call(), mock.call(), mock.call()] - mock_check.assert_has_calls(expected_calls) - - # The EC image should be assumed to be not enhanced. - self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' - ' be NOT enhanced.') - - # Now, let's try to assume that the image is enhanced while still disabling - # interrogation. - mock_check.reset_mock() - self.console.oobm_queue.reset_mock() - input_stream = [] - cmd = b'interrogate never enhanced' - # Enter the OOBM prompt. - input_stream.extend(BytesToByteList(b'%')) - # Type the command - input_stream.extend(BytesToByteList(cmd)) - # Press enter. - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - input_stream = [] - expected_calls = [] - - # The OOBM queue should have been called with the command being put. - expected_calls.append(mock.call.put(cmd)) - self.console.oobm_queue.assert_has_calls(expected_calls) - - # Process the OOBM queue. - self.console.oobm_queue.get.side_effect = [cmd] - self.console.ProcessOOBMQueue() - - # Type out a few commands. - input_stream.extend(BytesToByteList(b'chgstate')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'hash')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - input_stream.extend(BytesToByteList(b'sysjump rw')) - input_stream.append(console.ControlKey.CARRIAGE_RETURN) - - # Send the sequence out. - for byte in input_stream: - self.console.HandleChar(byte) - - # The check method should have never been called. - mock_check.assert_not_called() - - # The EC image should be assumed to be enhanced. - self.assertTrue(self.console.enhanced_ec, 'The image should be' - ' assumed to be enhanced.') - - -if __name__ == '__main__': - unittest.main() + """Verify that OOBM console commands work correctly.""" + + def setUp(self): + """Setup the test harness.""" + # Setup logging with a timestamp, the module, and the log level. + logging.basicConfig( + level=logging.DEBUG, + format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"), + ) + # Create a temp file and set both the controller and peripheral PTYs to the + # file to create a loopback. + self.tempfile = tempfile.TemporaryFile() + + # Mock out the pipes. + mock_pipe_end_0, mock_pipe_end_1 = mock.MagicMock(), mock.MagicMock() + self.console = console.Console( + self.tempfile.fileno(), + self.tempfile, + tempfile.TemporaryFile(), + mock_pipe_end_0, + mock_pipe_end_1, + "EC", + ) + self.console.oobm_queue = mock.MagicMock() + + @mock.patch("ec3po.console.Console.CheckForEnhancedECImage") + def test_InterrogateCommand(self, mock_check): + """Verify that 'interrogate' command works as expected. + + Args: + mock_check: A MagicMock object replacing the CheckForEnhancedECIMage() + method. + """ + input_stream = [] + expected_calls = [] + mock_check.side_effect = [False] + + # 'interrogate never' should disable the interrogation from happening at + # all. + cmd = b"interrogate never" + # Enter the OOBM prompt. + input_stream.extend(BytesToByteList(b"%")) + # Type the command + input_stream.extend(BytesToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + + # Type out a few commands. + input_stream.extend(BytesToByteList(b"version")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"flashinfo")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"sysinfo")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The Check function should NOT have been called at all. + mock_check.assert_not_called() + + # The EC image should be assumed to be not enhanced. + self.assertFalse( + self.console.enhanced_ec, + "The image should be assumed to" " be NOT enhanced.", + ) + + # Reset the mocks. + mock_check.reset_mock() + self.console.oobm_queue.reset_mock() + + # 'interrogate auto' should not interrogate at all. It should only be + # scanning the output stream for the 'console is enabled' strings. + cmd = b"interrogate auto" + # Enter the OOBM prompt. + input_stream.extend(BytesToByteList(b"%")) + # Type the command + input_stream.extend(BytesToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + expected_calls = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + + # Type out a few commands. + input_stream.extend(BytesToByteList(b"version")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"flashinfo")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"sysinfo")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The Check function should NOT have been called at all. + mock_check.assert_not_called() + + # The EC image should be assumed to be not enhanced. + self.assertFalse( + self.console.enhanced_ec, + "The image should be assumed to" " be NOT enhanced.", + ) + + # Reset the mocks. + mock_check.reset_mock() + self.console.oobm_queue.reset_mock() + + # 'interrogate always' should, like its name implies, interrogate always + # after each press of the enter key. This was the former way of doing + # interrogation. + cmd = b"interrogate always" + # Enter the OOBM prompt. + input_stream.extend(BytesToByteList(b"%")) + # Type the command + input_stream.extend(BytesToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + expected_calls = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + + # The Check method should be called 3 times here. + mock_check.side_effect = [False, False, False] + + # Type out a few commands. + input_stream.extend(BytesToByteList(b"help list")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"taskinfo")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"hibdelay")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The Check method should have been called 3 times here. + expected_calls = [mock.call(), mock.call(), mock.call()] + mock_check.assert_has_calls(expected_calls) + + # The EC image should be assumed to be not enhanced. + self.assertFalse( + self.console.enhanced_ec, + "The image should be assumed to" " be NOT enhanced.", + ) + + # Now, let's try to assume that the image is enhanced while still disabling + # interrogation. + mock_check.reset_mock() + self.console.oobm_queue.reset_mock() + input_stream = [] + cmd = b"interrogate never enhanced" + # Enter the OOBM prompt. + input_stream.extend(BytesToByteList(b"%")) + # Type the command + input_stream.extend(BytesToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + expected_calls = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + + # Type out a few commands. + input_stream.extend(BytesToByteList(b"chgstate")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"hash")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(BytesToByteList(b"sysjump rw")) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The check method should have never been called. + mock_check.assert_not_called() + + # The EC image should be assumed to be enhanced. + self.assertTrue( + self.console.enhanced_ec, "The image should be" " assumed to be enhanced." + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py index 4e151083bd..591603e038 100644 --- a/util/ec3po/interpreter.py +++ b/util/ec3po/interpreter.py @@ -25,443 +25,447 @@ import traceback import six - COMMAND_RETRIES = 3 # Number of attempts to retry a command. EC_MAX_READ = 1024 # Max bytes to read at a time from the EC. -EC_SYN = b'\xec' # Byte indicating EC interrogation. -EC_ACK = b'\xc0' # Byte representing correct EC response to interrogation. +EC_SYN = b"\xec" # Byte indicating EC interrogation. +EC_ACK = b"\xc0" # Byte representing correct EC response to interrogation. class LoggerAdapter(logging.LoggerAdapter): - """Class which provides a small adapter for the logger.""" + """Class which provides a small adapter for the logger.""" - def process(self, msg, kwargs): - """Prepends the served PTY to the beginning of the log message.""" - return '%s - %s' % (self.extra['pty'], msg), kwargs + def process(self, msg, kwargs): + """Prepends the served PTY to the beginning of the log message.""" + return "%s - %s" % (self.extra["pty"], msg), kwargs class Interpreter(object): - """Class which provides the interpretation layer between the EC and user. - - This class essentially performs all of the intepretation for the EC and the - user. It handles all of the automatic command retrying as well as the - formation of commands for EC images which support that. - - Attributes: - logger: A logger for this module. - ec_uart_pty: An opened file object to the raw EC UART PTY. - ec_uart_pty_name: A string containing the name of the raw EC UART PTY. - cmd_pipe: A socket.socket or multiprocessing.Connection object which - represents the Interpreter side of the command pipe. This must be a - bidirectional pipe. Commands and responses will utilize this pipe. - dbg_pipe: A socket.socket or multiprocessing.Connection object which - represents the Interpreter side of the debug pipe. This must be a - unidirectional pipe with write capabilities. EC debug output will utilize - this pipe. - cmd_retries: An integer representing the number of attempts the console - should retry commands if it receives an error. - log_level: An integer representing the numeric value of the log level. - inputs: A list of objects that the intpreter selects for reading. - Initially, these are the EC UART and the command pipe. - outputs: A list of objects that the interpreter selects for writing. - ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART. - last_cmd: A string that represents the last command sent to the EC. If an - error is encountered, the interpreter will attempt to retry this command - up to COMMAND_RETRIES. - enhanced_ec: A boolean indicating if the EC image that we are currently - communicating with is enhanced or not. Enhanced EC images will support - packed commands and host commands over the UART. This defaults to False - and is changed depending on the result of an interrogation. - interrogating: A boolean indicating if we are in the middle of interrogating - the EC. - connected: A boolean indicating if the interpreter is actually connected to - the UART and listening. - """ - def __init__(self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO, - name=None): - """Intializes an Interpreter object with the provided args. + """Class which provides the interpretation layer between the EC and user. - Args: - ec_uart_pty: A string representing the EC UART to connect to. + This class essentially performs all of the intepretation for the EC and the + user. It handles all of the automatic command retrying as well as the + formation of commands for EC images which support that. + + Attributes: + logger: A logger for this module. + ec_uart_pty: An opened file object to the raw EC UART PTY. + ec_uart_pty_name: A string containing the name of the raw EC UART PTY. cmd_pipe: A socket.socket or multiprocessing.Connection object which represents the Interpreter side of the command pipe. This must be a bidirectional pipe. Commands and responses will utilize this pipe. dbg_pipe: A socket.socket or multiprocessing.Connection object which represents the Interpreter side of the debug pipe. This must be a - unidirectional pipe with write capabilities. EC debug output will - utilize this pipe. + unidirectional pipe with write capabilities. EC debug output will utilize + this pipe. cmd_retries: An integer representing the number of attempts the console should retry commands if it receives an error. - log_level: An optional integer representing the numeric value of the log - level. By default, the log level will be logging.INFO (20). - name: the console source name - """ - # Create a unique logger based on the interpreter name - interpreter_prefix = ('%s - ' % name) if name else '' - logger = logging.getLogger('%sEC3PO.Interpreter' % interpreter_prefix) - self.logger = LoggerAdapter(logger, {'pty': ec_uart_pty}) - # TODO(https://crbug.com/1162189): revist the 2 TODOs below - # TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+ - # TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when - # that gets fixed, or keep two pty: one for reading and one for writing - self.ec_uart_pty = open(ec_uart_pty, 'r+b', buffering=0) - self.ec_uart_pty_name = ec_uart_pty - self.cmd_pipe = cmd_pipe - self.dbg_pipe = dbg_pipe - self.cmd_retries = COMMAND_RETRIES - self.log_level = log_level - self.inputs = [self.ec_uart_pty, self.cmd_pipe] - self.outputs = [] - self.ec_cmd_queue = six.moves.queue.Queue() - self.last_cmd = b'' - self.enhanced_ec = False - self.interrogating = False - self.connected = True - - def __str__(self): - """Show internal state of the Interpreter object. - - Returns: - A string that shows the values of the attributes. - """ - string = [] - string.append('%r' % self) - string.append('ec_uart_pty: %s' % self.ec_uart_pty) - string.append('cmd_pipe: %r' % self.cmd_pipe) - string.append('dbg_pipe: %r' % self.dbg_pipe) - string.append('cmd_retries: %d' % self.cmd_retries) - string.append('log_level: %d' % self.log_level) - string.append('inputs: %r' % self.inputs) - string.append('outputs: %r' % self.outputs) - string.append('ec_cmd_queue: %r' % self.ec_cmd_queue) - string.append('last_cmd: \'%s\'' % self.last_cmd) - string.append('enhanced_ec: %r' % self.enhanced_ec) - string.append('interrogating: %r' % self.interrogating) - return '\n'.join(string) - - def EnqueueCmd(self, command): - """Enqueue a command to be sent to the EC UART. - - Args: - command: A string which contains the command to be sent. + log_level: An integer representing the numeric value of the log level. + inputs: A list of objects that the intpreter selects for reading. + Initially, these are the EC UART and the command pipe. + outputs: A list of objects that the interpreter selects for writing. + ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART. + last_cmd: A string that represents the last command sent to the EC. If an + error is encountered, the interpreter will attempt to retry this command + up to COMMAND_RETRIES. + enhanced_ec: A boolean indicating if the EC image that we are currently + communicating with is enhanced or not. Enhanced EC images will support + packed commands and host commands over the UART. This defaults to False + and is changed depending on the result of an interrogation. + interrogating: A boolean indicating if we are in the middle of interrogating + the EC. + connected: A boolean indicating if the interpreter is actually connected to + the UART and listening. """ - self.ec_cmd_queue.put(command) - self.logger.log(1, 'Commands now in queue: %d', self.ec_cmd_queue.qsize()) - # Add the EC UART as an output to be serviced. - if self.connected and self.ec_uart_pty not in self.outputs: - self.outputs.append(self.ec_uart_pty) - - def PackCommand(self, raw_cmd): - r"""Packs a command for use with error checking. - - For error checking, we pack console commands in a particular format. The - format is as follows: - - &&[x][x][x][x]&{cmd}\n\n - ^ ^ ^^ ^^ ^ ^-- 2 newlines. - | | || || |-- the raw console command. - | | || ||-- 1 ampersand. - | | ||____|--- 2 hex digits representing the CRC8 of cmd. - | |____|-- 2 hex digits reprsenting the length of cmd. - |-- 2 ampersands - - Args: - raw_cmd: A pre-packed string which contains the raw command. - - Returns: - A string which contains the packed command. - """ - # Don't pack a single carriage return. - if raw_cmd != b'\r': - # The command format is as follows. - # &&[x][x][x][x]&{cmd}\n\n - packed_cmd = [] - packed_cmd.append(b'&&') - # The first pair of hex digits are the length of the command. - packed_cmd.append(b'%02x' % len(raw_cmd)) - # Then the CRC8 of cmd. - packed_cmd.append(b'%02x' % Crc8(raw_cmd)) - packed_cmd.append(b'&') - # Now, the raw command followed by 2 newlines. - packed_cmd.append(raw_cmd) - packed_cmd.append(b'\n\n') - return b''.join(packed_cmd) - else: - return raw_cmd - - def ProcessCommand(self, command): - """Captures the input determines what actions to take. - - Args: - command: A string representing the command sent by the user. - """ - if command == b'disconnect': - if self.connected: - self.logger.debug('UART disconnect request.') - # Drop all pending commands if any. - while not self.ec_cmd_queue.empty(): - c = self.ec_cmd_queue.get() - self.logger.debug('dropped: \'%s\'', c) - if self.enhanced_ec: - # Reset retry state. - self.cmd_retries = COMMAND_RETRIES - self.last_cmd = b'' - # Get the UART that the interpreter is attached to. - fileobj = self.ec_uart_pty - self.logger.debug('fileobj: %r', fileobj) - # Remove the descriptor from the inputs and outputs. - self.inputs.remove(fileobj) - if fileobj in self.outputs: - self.outputs.remove(fileobj) - self.logger.debug('Removed fileobj. Remaining inputs: %r', self.inputs) - # Close the file. - fileobj.close() - # Mark the interpreter as disconnected now. - self.connected = False - self.logger.debug('Disconnected from %s.', self.ec_uart_pty_name) - return - - elif command == b'reconnect': - if not self.connected: - self.logger.debug('UART reconnect request.') - # Reopen the PTY. + def __init__( + self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO, name=None + ): + """Intializes an Interpreter object with the provided args. + + Args: + ec_uart_pty: A string representing the EC UART to connect to. + cmd_pipe: A socket.socket or multiprocessing.Connection object which + represents the Interpreter side of the command pipe. This must be a + bidirectional pipe. Commands and responses will utilize this pipe. + dbg_pipe: A socket.socket or multiprocessing.Connection object which + represents the Interpreter side of the debug pipe. This must be a + unidirectional pipe with write capabilities. EC debug output will + utilize this pipe. + cmd_retries: An integer representing the number of attempts the console + should retry commands if it receives an error. + log_level: An optional integer representing the numeric value of the log + level. By default, the log level will be logging.INFO (20). + name: the console source name + """ + # Create a unique logger based on the interpreter name + interpreter_prefix = ("%s - " % name) if name else "" + logger = logging.getLogger("%sEC3PO.Interpreter" % interpreter_prefix) + self.logger = LoggerAdapter(logger, {"pty": ec_uart_pty}) + # TODO(https://crbug.com/1162189): revist the 2 TODOs below # TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+ # TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when # that gets fixed, or keep two pty: one for reading and one for writing - fileobj = open(self.ec_uart_pty_name, 'r+b', buffering=0) - self.logger.debug('fileobj: %r', fileobj) - self.ec_uart_pty = fileobj - # Add the descriptor to the inputs. - self.inputs.append(fileobj) - self.logger.debug('fileobj added. curr inputs: %r', self.inputs) - # Mark the interpreter as connected now. - self.connected = True - self.logger.debug('Connected to %s.', self.ec_uart_pty_name) - return - - elif command.startswith(b'enhanced'): - self.enhanced_ec = command.split(b' ')[1] == b'True' - return - - # Ignore any other commands while in the disconnected state. - self.logger.log(1, 'command: \'%s\'', command) - if not self.connected: - self.logger.debug('Ignoring command because currently disconnected.') - return - - # Remove leading and trailing spaces only if this is an enhanced EC image. - # For non-enhanced EC images, commands will be single characters at a time - # and can be spaces. - if self.enhanced_ec: - command = command.strip(b' ') - - # There's nothing to do if the command is empty. - if len(command) == 0: - return - - # Handle log level change requests. - if command.startswith(b'loglevel'): - self.logger.debug('Log level change request.') - new_log_level = int(command.split(b' ')[1]) - self.logger.logger.setLevel(new_log_level) - self.logger.info('Log level changed to %d.', new_log_level) - return - - # Check for interrogation command. - if command == EC_SYN: - # User is requesting interrogation. Send SYN as is. - self.logger.debug('User requesting interrogation.') - self.interrogating = True - # Assume the EC isn't enhanced until we get a response. - self.enhanced_ec = False - elif self.enhanced_ec: - # Enhanced EC images require the plaintext commands to be packed. - command = self.PackCommand(command) - # TODO(aaboagye): Make a dict of commands and keys and eventually, - # handle partial matching based on unique prefixes. - - self.EnqueueCmd(command) - - def HandleCmdRetries(self): - """Attempts to retry commands if possible.""" - if self.cmd_retries > 0: - # The EC encountered an error. We'll have to retry again. - self.logger.warning('Retrying command...') - self.cmd_retries -= 1 - self.logger.warning('Retries remaining: %d', self.cmd_retries) - # Retry the command and add the EC UART to the writers again. - self.EnqueueCmd(self.last_cmd) - self.outputs.append(self.ec_uart_pty) - else: - # We're out of retries, so just give up. - self.logger.error('Command failed. No retries left.') - # Clear the command in progress. - self.last_cmd = b'' - # Reset the retry count. - self.cmd_retries = COMMAND_RETRIES - - def SendCmdToEC(self): - """Sends a command to the EC.""" - # If we're retrying a command, just try to send it again. - if self.cmd_retries < COMMAND_RETRIES: - cmd = self.last_cmd - else: - # If we're not retrying, we should not be writing to the EC if we have no - # items in our command queue. - assert not self.ec_cmd_queue.empty() - # Get the command to send. - cmd = self.ec_cmd_queue.get() - - # Send the command. - self.ec_uart_pty.write(cmd) - self.ec_uart_pty.flush() - self.logger.log(1, 'Sent command to EC.') - - if self.enhanced_ec and cmd != EC_SYN: - # Now, that we've sent the command, store the current command as the last - # command sent. If we encounter an error string, we will attempt to retry - # this command. - if cmd != self.last_cmd: - self.last_cmd = cmd - # Reset the retry count. + self.ec_uart_pty = open(ec_uart_pty, "r+b", buffering=0) + self.ec_uart_pty_name = ec_uart_pty + self.cmd_pipe = cmd_pipe + self.dbg_pipe = dbg_pipe self.cmd_retries = COMMAND_RETRIES + self.log_level = log_level + self.inputs = [self.ec_uart_pty, self.cmd_pipe] + self.outputs = [] + self.ec_cmd_queue = six.moves.queue.Queue() + self.last_cmd = b"" + self.enhanced_ec = False + self.interrogating = False + self.connected = True - # If no command is pending to be sent, then we can remove the EC UART from - # writers. Might need better checking for command retry logic in here. - if self.ec_cmd_queue.empty(): - # Remove the EC UART from the writers while we wait for a response. - self.logger.debug('Removing EC UART from writers.') - self.outputs.remove(self.ec_uart_pty) - - def HandleECData(self): - """Handle any debug prints from the EC.""" - self.logger.log(1, 'EC has data') - # Read what the EC sent us. - data = os.read(self.ec_uart_pty.fileno(), EC_MAX_READ) - self.logger.log(1, 'got: \'%s\'', binascii.hexlify(data)) - if b'&E' in data and self.enhanced_ec: - # We received an error, so we should retry it if possible. - self.logger.warning('Error string found in data.') - self.HandleCmdRetries() - return - - # If we were interrogating, check the response and update our knowledge - # of the current EC image. - if self.interrogating: - self.enhanced_ec = data == EC_ACK - if self.enhanced_ec: - self.logger.debug('The current EC image seems enhanced.') - else: - self.logger.debug('The current EC image does NOT seem enhanced.') - # Done interrogating. - self.interrogating = False - # For now, just forward everything the EC sends us. - self.logger.log(1, 'Forwarding to user...') - self.dbg_pipe.send(data) - - def HandleUserData(self): - """Handle any incoming commands from the user. - - Raises: - EOFError: Allowed to propagate through from self.cmd_pipe.recv(). - """ - self.logger.log(1, 'Command data available. Begin processing.') - data = self.cmd_pipe.recv() - # Process the command. - self.ProcessCommand(data) + def __str__(self): + """Show internal state of the Interpreter object. + + Returns: + A string that shows the values of the attributes. + """ + string = [] + string.append("%r" % self) + string.append("ec_uart_pty: %s" % self.ec_uart_pty) + string.append("cmd_pipe: %r" % self.cmd_pipe) + string.append("dbg_pipe: %r" % self.dbg_pipe) + string.append("cmd_retries: %d" % self.cmd_retries) + string.append("log_level: %d" % self.log_level) + string.append("inputs: %r" % self.inputs) + string.append("outputs: %r" % self.outputs) + string.append("ec_cmd_queue: %r" % self.ec_cmd_queue) + string.append("last_cmd: '%s'" % self.last_cmd) + string.append("enhanced_ec: %r" % self.enhanced_ec) + string.append("interrogating: %r" % self.interrogating) + return "\n".join(string) + + def EnqueueCmd(self, command): + """Enqueue a command to be sent to the EC UART. + + Args: + command: A string which contains the command to be sent. + """ + self.ec_cmd_queue.put(command) + self.logger.log(1, "Commands now in queue: %d", self.ec_cmd_queue.qsize()) + + # Add the EC UART as an output to be serviced. + if self.connected and self.ec_uart_pty not in self.outputs: + self.outputs.append(self.ec_uart_pty) + + def PackCommand(self, raw_cmd): + r"""Packs a command for use with error checking. + + For error checking, we pack console commands in a particular format. The + format is as follows: + + &&[x][x][x][x]&{cmd}\n\n + ^ ^ ^^ ^^ ^ ^-- 2 newlines. + | | || || |-- the raw console command. + | | || ||-- 1 ampersand. + | | ||____|--- 2 hex digits representing the CRC8 of cmd. + | |____|-- 2 hex digits reprsenting the length of cmd. + |-- 2 ampersands + + Args: + raw_cmd: A pre-packed string which contains the raw command. + + Returns: + A string which contains the packed command. + """ + # Don't pack a single carriage return. + if raw_cmd != b"\r": + # The command format is as follows. + # &&[x][x][x][x]&{cmd}\n\n + packed_cmd = [] + packed_cmd.append(b"&&") + # The first pair of hex digits are the length of the command. + packed_cmd.append(b"%02x" % len(raw_cmd)) + # Then the CRC8 of cmd. + packed_cmd.append(b"%02x" % Crc8(raw_cmd)) + packed_cmd.append(b"&") + # Now, the raw command followed by 2 newlines. + packed_cmd.append(raw_cmd) + packed_cmd.append(b"\n\n") + return b"".join(packed_cmd) + else: + return raw_cmd + + def ProcessCommand(self, command): + """Captures the input determines what actions to take. + + Args: + command: A string representing the command sent by the user. + """ + if command == b"disconnect": + if self.connected: + self.logger.debug("UART disconnect request.") + # Drop all pending commands if any. + while not self.ec_cmd_queue.empty(): + c = self.ec_cmd_queue.get() + self.logger.debug("dropped: '%s'", c) + if self.enhanced_ec: + # Reset retry state. + self.cmd_retries = COMMAND_RETRIES + self.last_cmd = b"" + # Get the UART that the interpreter is attached to. + fileobj = self.ec_uart_pty + self.logger.debug("fileobj: %r", fileobj) + # Remove the descriptor from the inputs and outputs. + self.inputs.remove(fileobj) + if fileobj in self.outputs: + self.outputs.remove(fileobj) + self.logger.debug("Removed fileobj. Remaining inputs: %r", self.inputs) + # Close the file. + fileobj.close() + # Mark the interpreter as disconnected now. + self.connected = False + self.logger.debug("Disconnected from %s.", self.ec_uart_pty_name) + return + + elif command == b"reconnect": + if not self.connected: + self.logger.debug("UART reconnect request.") + # Reopen the PTY. + # TODO(https://bugs.python.org/issue27805, python3.7+): revert to ab+ + # TODO(https://bugs.python.org/issue20074): removing buffering=0 if/when + # that gets fixed, or keep two pty: one for reading and one for writing + fileobj = open(self.ec_uart_pty_name, "r+b", buffering=0) + self.logger.debug("fileobj: %r", fileobj) + self.ec_uart_pty = fileobj + # Add the descriptor to the inputs. + self.inputs.append(fileobj) + self.logger.debug("fileobj added. curr inputs: %r", self.inputs) + # Mark the interpreter as connected now. + self.connected = True + self.logger.debug("Connected to %s.", self.ec_uart_pty_name) + return + + elif command.startswith(b"enhanced"): + self.enhanced_ec = command.split(b" ")[1] == b"True" + return + + # Ignore any other commands while in the disconnected state. + self.logger.log(1, "command: '%s'", command) + if not self.connected: + self.logger.debug("Ignoring command because currently disconnected.") + return + + # Remove leading and trailing spaces only if this is an enhanced EC image. + # For non-enhanced EC images, commands will be single characters at a time + # and can be spaces. + if self.enhanced_ec: + command = command.strip(b" ") + + # There's nothing to do if the command is empty. + if len(command) == 0: + return + + # Handle log level change requests. + if command.startswith(b"loglevel"): + self.logger.debug("Log level change request.") + new_log_level = int(command.split(b" ")[1]) + self.logger.logger.setLevel(new_log_level) + self.logger.info("Log level changed to %d.", new_log_level) + return + + # Check for interrogation command. + if command == EC_SYN: + # User is requesting interrogation. Send SYN as is. + self.logger.debug("User requesting interrogation.") + self.interrogating = True + # Assume the EC isn't enhanced until we get a response. + self.enhanced_ec = False + elif self.enhanced_ec: + # Enhanced EC images require the plaintext commands to be packed. + command = self.PackCommand(command) + # TODO(aaboagye): Make a dict of commands and keys and eventually, + # handle partial matching based on unique prefixes. + + self.EnqueueCmd(command) + + def HandleCmdRetries(self): + """Attempts to retry commands if possible.""" + if self.cmd_retries > 0: + # The EC encountered an error. We'll have to retry again. + self.logger.warning("Retrying command...") + self.cmd_retries -= 1 + self.logger.warning("Retries remaining: %d", self.cmd_retries) + # Retry the command and add the EC UART to the writers again. + self.EnqueueCmd(self.last_cmd) + self.outputs.append(self.ec_uart_pty) + else: + # We're out of retries, so just give up. + self.logger.error("Command failed. No retries left.") + # Clear the command in progress. + self.last_cmd = b"" + # Reset the retry count. + self.cmd_retries = COMMAND_RETRIES + + def SendCmdToEC(self): + """Sends a command to the EC.""" + # If we're retrying a command, just try to send it again. + if self.cmd_retries < COMMAND_RETRIES: + cmd = self.last_cmd + else: + # If we're not retrying, we should not be writing to the EC if we have no + # items in our command queue. + assert not self.ec_cmd_queue.empty() + # Get the command to send. + cmd = self.ec_cmd_queue.get() + + # Send the command. + self.ec_uart_pty.write(cmd) + self.ec_uart_pty.flush() + self.logger.log(1, "Sent command to EC.") + + if self.enhanced_ec and cmd != EC_SYN: + # Now, that we've sent the command, store the current command as the last + # command sent. If we encounter an error string, we will attempt to retry + # this command. + if cmd != self.last_cmd: + self.last_cmd = cmd + # Reset the retry count. + self.cmd_retries = COMMAND_RETRIES + + # If no command is pending to be sent, then we can remove the EC UART from + # writers. Might need better checking for command retry logic in here. + if self.ec_cmd_queue.empty(): + # Remove the EC UART from the writers while we wait for a response. + self.logger.debug("Removing EC UART from writers.") + self.outputs.remove(self.ec_uart_pty) + + def HandleECData(self): + """Handle any debug prints from the EC.""" + self.logger.log(1, "EC has data") + # Read what the EC sent us. + data = os.read(self.ec_uart_pty.fileno(), EC_MAX_READ) + self.logger.log(1, "got: '%s'", binascii.hexlify(data)) + if b"&E" in data and self.enhanced_ec: + # We received an error, so we should retry it if possible. + self.logger.warning("Error string found in data.") + self.HandleCmdRetries() + return + + # If we were interrogating, check the response and update our knowledge + # of the current EC image. + if self.interrogating: + self.enhanced_ec = data == EC_ACK + if self.enhanced_ec: + self.logger.debug("The current EC image seems enhanced.") + else: + self.logger.debug("The current EC image does NOT seem enhanced.") + # Done interrogating. + self.interrogating = False + # For now, just forward everything the EC sends us. + self.logger.log(1, "Forwarding to user...") + self.dbg_pipe.send(data) + + def HandleUserData(self): + """Handle any incoming commands from the user. + + Raises: + EOFError: Allowed to propagate through from self.cmd_pipe.recv(). + """ + self.logger.log(1, "Command data available. Begin processing.") + data = self.cmd_pipe.recv() + # Process the command. + self.ProcessCommand(data) def Crc8(data): - """Calculates the CRC8 of data. + """Calculates the CRC8 of data. - The generator polynomial used is: x^8 + x^2 + x + 1. - This is the same implementation that is used in the EC. + The generator polynomial used is: x^8 + x^2 + x + 1. + This is the same implementation that is used in the EC. - Args: - data: A string of data that we wish to calculate the CRC8 on. + Args: + data: A string of data that we wish to calculate the CRC8 on. - Returns: - crc >> 8: An integer representing the CRC8 value. - """ - crc = 0 - for byte in six.iterbytes(data): - crc ^= (byte << 8) - for _ in range(8): - if crc & 0x8000: - crc ^= (0x1070 << 3) - crc <<= 1 - return crc >> 8 + Returns: + crc >> 8: An integer representing the CRC8 value. + """ + crc = 0 + for byte in six.iterbytes(data): + crc ^= byte << 8 + for _ in range(8): + if crc & 0x8000: + crc ^= 0x1070 << 3 + crc <<= 1 + return crc >> 8 def StartLoop(interp, shutdown_pipe=None): - """Starts an infinite loop of servicing the user and the EC. - - StartLoop checks to see if there are any commands to process, processing them - if any, and forwards EC output to the user. - - When sending a command to the EC, we send the command once and check the - response to see if the EC encountered an error when receiving the command. An - error condition is reported to the interpreter by a string with at least one - '&' and 'E'. The full string is actually '&&EE', however it's possible that - the leading ampersand or trailing 'E' could be dropped. If an error is - encountered, the interpreter will retry up to the amount configured. - - Args: - interp: An Interpreter object that has been properly initialised. - shutdown_pipe: A file object for a pipe or equivalent that becomes readable - (not blocked) to indicate that the loop should exit. Can be None to never - exit the loop. - """ - try: - # This is used instead of "break" to avoid exiting the loop in the middle of - # an iteration. - continue_looping = True - - while continue_looping: - # The inputs list is created anew in each loop iteration because the - # Interpreter class sometimes modifies the interp.inputs list. - if shutdown_pipe is None: - inputs = interp.inputs - else: - inputs = list(interp.inputs) - inputs.append(shutdown_pipe) - - readable, writeable, _ = select.select(inputs, interp.outputs, []) - - for obj in readable: - # Handle any debug prints from the EC. - if obj is interp.ec_uart_pty: - interp.HandleECData() - - # Handle any commands from the user. - elif obj is interp.cmd_pipe: - try: - interp.HandleUserData() - except EOFError: - interp.logger.debug( - 'ec3po interpreter received EOF from cmd_pipe in ' - 'HandleUserData()') - continue_looping = False - - elif obj is shutdown_pipe: - interp.logger.debug( - 'ec3po interpreter received shutdown pipe unblocked notification') - continue_looping = False - - for obj in writeable: - # Send a command to the EC. - if obj is interp.ec_uart_pty: - interp.SendCmdToEC() - - except KeyboardInterrupt: - pass - - finally: - interp.cmd_pipe.close() - interp.dbg_pipe.close() - interp.ec_uart_pty.close() - if shutdown_pipe is not None: - shutdown_pipe.close() - interp.logger.debug('Exit ec3po interpreter loop for %s', - interp.ec_uart_pty_name) + """Starts an infinite loop of servicing the user and the EC. + + StartLoop checks to see if there are any commands to process, processing them + if any, and forwards EC output to the user. + + When sending a command to the EC, we send the command once and check the + response to see if the EC encountered an error when receiving the command. An + error condition is reported to the interpreter by a string with at least one + '&' and 'E'. The full string is actually '&&EE', however it's possible that + the leading ampersand or trailing 'E' could be dropped. If an error is + encountered, the interpreter will retry up to the amount configured. + + Args: + interp: An Interpreter object that has been properly initialised. + shutdown_pipe: A file object for a pipe or equivalent that becomes readable + (not blocked) to indicate that the loop should exit. Can be None to never + exit the loop. + """ + try: + # This is used instead of "break" to avoid exiting the loop in the middle of + # an iteration. + continue_looping = True + + while continue_looping: + # The inputs list is created anew in each loop iteration because the + # Interpreter class sometimes modifies the interp.inputs list. + if shutdown_pipe is None: + inputs = interp.inputs + else: + inputs = list(interp.inputs) + inputs.append(shutdown_pipe) + + readable, writeable, _ = select.select(inputs, interp.outputs, []) + + for obj in readable: + # Handle any debug prints from the EC. + if obj is interp.ec_uart_pty: + interp.HandleECData() + + # Handle any commands from the user. + elif obj is interp.cmd_pipe: + try: + interp.HandleUserData() + except EOFError: + interp.logger.debug( + "ec3po interpreter received EOF from cmd_pipe in " + "HandleUserData()" + ) + continue_looping = False + + elif obj is shutdown_pipe: + interp.logger.debug( + "ec3po interpreter received shutdown pipe unblocked notification" + ) + continue_looping = False + + for obj in writeable: + # Send a command to the EC. + if obj is interp.ec_uart_pty: + interp.SendCmdToEC() + + except KeyboardInterrupt: + pass + + finally: + interp.cmd_pipe.close() + interp.dbg_pipe.close() + interp.ec_uart_pty.close() + if shutdown_pipe is not None: + shutdown_pipe.close() + interp.logger.debug( + "Exit ec3po interpreter loop for %s", interp.ec_uart_pty_name + ) diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py index fe4d43c351..509b90f667 100755 --- a/util/ec3po/interpreter_unittest.py +++ b/util/ec3po/interpreter_unittest.py @@ -10,371 +10,389 @@ from __future__ import print_function import logging -import mock import tempfile import unittest +import mock import six - -from ec3po import interpreter -from ec3po import threadproc_shim +from ec3po import interpreter, threadproc_shim def GetBuiltins(func): - if six.PY2: - return '__builtin__.' + func - return 'builtins.' + func + if six.PY2: + return "__builtin__." + func + return "builtins." + func class TestEnhancedECBehaviour(unittest.TestCase): - """Test case to verify all enhanced EC interpretation tasks.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create a tempfile that would represent the EC UART PTY. - self.tempfile = tempfile.NamedTemporaryFile() - - # Create the pipes that the interpreter will use. - self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() - self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False) - - # Mock the open() function so we can inspect reads/writes to the EC. - self.ec_uart_pty = mock.mock_open() - - with mock.patch(GetBuiltins('open'), self.ec_uart_pty): - # Create an interpreter. - self.itpr = interpreter.Interpreter(self.tempfile.name, - self.cmd_pipe_itpr, - self.dbg_pipe_itpr, - log_level=logging.DEBUG, - name="EC") - - @mock.patch('ec3po.interpreter.os') - def test_HandlingCommandsThatProduceNoOutput(self, mock_os): - """Verify that the Interpreter correctly handles non-output commands. - - Args: - mock_os: MagicMock object replacing the 'os' module for this test - case. - """ - # The interpreter init should open the EC UART PTY. - expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)] - # Have a command come in the command pipe. The first command will be an - # interrogation to determine if the EC is enhanced or not. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - # At this point, the command should be queued up waiting to be sent, so - # let's actually send it to the EC. - self.itpr.SendCmdToEC() - expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN), - mock.call().flush()]) - # Now, assume that the EC sends only 1 response back of EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Now that the interrogation was complete, it's time to send down the real - # command. - test_cmd = b'chan save' - # Send the test command down the pipe. - self.cmd_pipe_user.send(test_cmd) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - # Since the EC image is enhanced, we should have sent a packed command. - expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd))) - expected_ec_calls.append(mock.call().flush()) - - # Now that the first command was sent, we should send another command which - # produces no output. The console would send another interrogation. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN), - mock.call().flush()]) - # Again, assume that the EC sends only 1 response back of EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Now send the second test command. - test_cmd = b'chan 0' - self.cmd_pipe_user.send(test_cmd) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - # Since the EC image is enhanced, we should have sent a packed command. - expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd))) - expected_ec_calls.append(mock.call().flush()) - - # Finally, verify that the appropriate writes were actually sent to the EC. - self.ec_uart_pty.assert_has_calls(expected_ec_calls) - - @mock.patch('ec3po.interpreter.os') - def test_CommandRetryingOnError(self, mock_os): - """Verify that commands are retried if an error is encountered. - - Args: - mock_os: MagicMock object replacing the 'os' module for this test - case. - """ - # The interpreter init should open the EC UART PTY. - expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)] - # Have a command come in the command pipe. The first command will be an - # interrogation to determine if the EC is enhanced or not. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - # At this point, the command should be queued up waiting to be sent, so - # let's actually send it to the EC. - self.itpr.SendCmdToEC() - expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN), - mock.call().flush()]) - # Now, assume that the EC sends only 1 response back of EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Let's send a command that is received on the EC-side with an error. - test_cmd = b'accelinfo' - self.cmd_pipe_user.send(test_cmd) - self.itpr.HandleUserData() - self.itpr.SendCmdToEC() - packed_cmd = self.itpr.PackCommand(test_cmd) - expected_ec_calls.extend([mock.call().write(packed_cmd), - mock.call().flush()]) - # Have the EC return the error string twice. - mock_os.read.side_effect = [b'&&EE', b'&&EE'] - for i in range(2): - # When reading the EC, the interpreter will call file.fileno() to pass to - # os.read(). - expected_ec_calls.append(mock.call().fileno()) - # Simulate the response. - self.itpr.HandleECData() - - # Since an error was received, the EC should attempt to retry the command. - expected_ec_calls.extend([mock.call().write(packed_cmd), - mock.call().flush()]) - # Verify that the retry count was decremented. - self.assertEqual(interpreter.COMMAND_RETRIES-i-1, self.itpr.cmd_retries, - 'Unexpected cmd_remaining count.') - # Actually retry the command. - self.itpr.SendCmdToEC() - - # Now assume that the last one goes through with no trouble. - expected_ec_calls.extend([mock.call().write(packed_cmd), - mock.call().flush()]) - self.itpr.SendCmdToEC() - - # Verify all the calls. - self.ec_uart_pty.assert_has_calls(expected_ec_calls) - - def test_PackCommandsForEnhancedEC(self): - """Verify that the interpreter packs commands for enhanced EC images.""" - # Assume current EC image is enhanced. - self.itpr.enhanced_ec = True - # Receive a command from the user. - test_cmd = b'gettime' - self.cmd_pipe_user.send(test_cmd) - # Mock out PackCommand to see if it was called. - self.itpr.PackCommand = mock.MagicMock() - # Have the interpreter handle the command. - self.itpr.HandleUserData() - # Verify that PackCommand() was called. - self.itpr.PackCommand.assert_called_once_with(test_cmd) - - def test_DontPackCommandsForNonEnhancedEC(self): - """Verify the interpreter doesn't pack commands for non-enhanced images.""" - # Assume current EC image is not enhanced. - self.itpr.enhanced_ec = False - # Receive a command from the user. - test_cmd = b'gettime' - self.cmd_pipe_user.send(test_cmd) - # Mock out PackCommand to see if it was called. - self.itpr.PackCommand = mock.MagicMock() - # Have the interpreter handle the command. - self.itpr.HandleUserData() - # Verify that PackCommand() was called. - self.itpr.PackCommand.assert_not_called() - - @mock.patch('ec3po.interpreter.os') - def test_KeepingTrackOfInterrogation(self, mock_os): - """Verify that the interpreter can track the state of the interrogation. - - Args: - mock_os: MagicMock object replacing the 'os' module. for this test - case. - """ - # Upon init, the interpreter should assume that the current EC image is not - # enhanced. - self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec upon' - ' init is not False.')) - - # Assume an interrogation request comes in from the user. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - - # Verify the state is now within an interrogation. - self.assertTrue(self.itpr.interrogating, 'interrogating should be True') - # The state of enhanced_ec should not be changed yet because we haven't - # received a valid response yet. - self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec is ' - 'not False.')) - - # Assume that the EC responds with an EC_ACK. - mock_os.read.side_effect = [interpreter.EC_ACK] - self.itpr.HandleECData() - - # Now, the interrogation should be complete and we should know that the - # current EC image is enhanced. - self.assertFalse(self.itpr.interrogating, msg=('interrogating should be ' - 'False')) - self.assertTrue(self.itpr.enhanced_ec, msg='enhanced_ec sholud be True') - - # Now let's perform another interrogation, but pretend that the EC ignores - # it. - self.cmd_pipe_user.send(interpreter.EC_SYN) - self.itpr.HandleUserData() - - # Verify interrogating state. - self.assertTrue(self.itpr.interrogating, 'interrogating sholud be True') - # We should assume that the image is not enhanced until we get the valid - # response. - self.assertFalse(self.itpr.enhanced_ec, 'enhanced_ec should be False now.') - - # Let's pretend that we get a random debug print. This should clear the - # interrogating flag. - mock_os.read.side_effect = [b'[1660.593076 HC 0x103]'] - self.itpr.HandleECData() - - # Verify that interrogating flag is cleared and enhanced_ec is still False. - self.assertFalse(self.itpr.interrogating, 'interrogating should be False.') - self.assertFalse(self.itpr.enhanced_ec, - 'enhanced_ec should still be False.') + """Test case to verify all enhanced EC interpretation tasks.""" + + def setUp(self): + """Setup the test harness.""" + # Setup logging with a timestamp, the module, and the log level. + logging.basicConfig( + level=logging.DEBUG, + format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"), + ) + + # Create a tempfile that would represent the EC UART PTY. + self.tempfile = tempfile.NamedTemporaryFile() + + # Create the pipes that the interpreter will use. + self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() + self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False) + + # Mock the open() function so we can inspect reads/writes to the EC. + self.ec_uart_pty = mock.mock_open() + + with mock.patch(GetBuiltins("open"), self.ec_uart_pty): + # Create an interpreter. + self.itpr = interpreter.Interpreter( + self.tempfile.name, + self.cmd_pipe_itpr, + self.dbg_pipe_itpr, + log_level=logging.DEBUG, + name="EC", + ) + + @mock.patch("ec3po.interpreter.os") + def test_HandlingCommandsThatProduceNoOutput(self, mock_os): + """Verify that the Interpreter correctly handles non-output commands. + + Args: + mock_os: MagicMock object replacing the 'os' module for this test + case. + """ + # The interpreter init should open the EC UART PTY. + expected_ec_calls = [mock.call(self.tempfile.name, "r+b", buffering=0)] + # Have a command come in the command pipe. The first command will be an + # interrogation to determine if the EC is enhanced or not. + self.cmd_pipe_user.send(interpreter.EC_SYN) + self.itpr.HandleUserData() + # At this point, the command should be queued up waiting to be sent, so + # let's actually send it to the EC. + self.itpr.SendCmdToEC() + expected_ec_calls.extend( + [mock.call().write(interpreter.EC_SYN), mock.call().flush()] + ) + # Now, assume that the EC sends only 1 response back of EC_ACK. + mock_os.read.side_effect = [interpreter.EC_ACK] + # When reading the EC, the interpreter will call file.fileno() to pass to + # os.read(). + expected_ec_calls.append(mock.call().fileno()) + # Simulate the response. + self.itpr.HandleECData() + + # Now that the interrogation was complete, it's time to send down the real + # command. + test_cmd = b"chan save" + # Send the test command down the pipe. + self.cmd_pipe_user.send(test_cmd) + self.itpr.HandleUserData() + self.itpr.SendCmdToEC() + # Since the EC image is enhanced, we should have sent a packed command. + expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd))) + expected_ec_calls.append(mock.call().flush()) + + # Now that the first command was sent, we should send another command which + # produces no output. The console would send another interrogation. + self.cmd_pipe_user.send(interpreter.EC_SYN) + self.itpr.HandleUserData() + self.itpr.SendCmdToEC() + expected_ec_calls.extend( + [mock.call().write(interpreter.EC_SYN), mock.call().flush()] + ) + # Again, assume that the EC sends only 1 response back of EC_ACK. + mock_os.read.side_effect = [interpreter.EC_ACK] + # When reading the EC, the interpreter will call file.fileno() to pass to + # os.read(). + expected_ec_calls.append(mock.call().fileno()) + # Simulate the response. + self.itpr.HandleECData() + + # Now send the second test command. + test_cmd = b"chan 0" + self.cmd_pipe_user.send(test_cmd) + self.itpr.HandleUserData() + self.itpr.SendCmdToEC() + # Since the EC image is enhanced, we should have sent a packed command. + expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd))) + expected_ec_calls.append(mock.call().flush()) + + # Finally, verify that the appropriate writes were actually sent to the EC. + self.ec_uart_pty.assert_has_calls(expected_ec_calls) + + @mock.patch("ec3po.interpreter.os") + def test_CommandRetryingOnError(self, mock_os): + """Verify that commands are retried if an error is encountered. + + Args: + mock_os: MagicMock object replacing the 'os' module for this test + case. + """ + # The interpreter init should open the EC UART PTY. + expected_ec_calls = [mock.call(self.tempfile.name, "r+b", buffering=0)] + # Have a command come in the command pipe. The first command will be an + # interrogation to determine if the EC is enhanced or not. + self.cmd_pipe_user.send(interpreter.EC_SYN) + self.itpr.HandleUserData() + # At this point, the command should be queued up waiting to be sent, so + # let's actually send it to the EC. + self.itpr.SendCmdToEC() + expected_ec_calls.extend( + [mock.call().write(interpreter.EC_SYN), mock.call().flush()] + ) + # Now, assume that the EC sends only 1 response back of EC_ACK. + mock_os.read.side_effect = [interpreter.EC_ACK] + # When reading the EC, the interpreter will call file.fileno() to pass to + # os.read(). + expected_ec_calls.append(mock.call().fileno()) + # Simulate the response. + self.itpr.HandleECData() + + # Let's send a command that is received on the EC-side with an error. + test_cmd = b"accelinfo" + self.cmd_pipe_user.send(test_cmd) + self.itpr.HandleUserData() + self.itpr.SendCmdToEC() + packed_cmd = self.itpr.PackCommand(test_cmd) + expected_ec_calls.extend([mock.call().write(packed_cmd), mock.call().flush()]) + # Have the EC return the error string twice. + mock_os.read.side_effect = [b"&&EE", b"&&EE"] + for i in range(2): + # When reading the EC, the interpreter will call file.fileno() to pass to + # os.read(). + expected_ec_calls.append(mock.call().fileno()) + # Simulate the response. + self.itpr.HandleECData() + + # Since an error was received, the EC should attempt to retry the command. + expected_ec_calls.extend( + [mock.call().write(packed_cmd), mock.call().flush()] + ) + # Verify that the retry count was decremented. + self.assertEqual( + interpreter.COMMAND_RETRIES - i - 1, + self.itpr.cmd_retries, + "Unexpected cmd_remaining count.", + ) + # Actually retry the command. + self.itpr.SendCmdToEC() + + # Now assume that the last one goes through with no trouble. + expected_ec_calls.extend([mock.call().write(packed_cmd), mock.call().flush()]) + self.itpr.SendCmdToEC() + + # Verify all the calls. + self.ec_uart_pty.assert_has_calls(expected_ec_calls) + + def test_PackCommandsForEnhancedEC(self): + """Verify that the interpreter packs commands for enhanced EC images.""" + # Assume current EC image is enhanced. + self.itpr.enhanced_ec = True + # Receive a command from the user. + test_cmd = b"gettime" + self.cmd_pipe_user.send(test_cmd) + # Mock out PackCommand to see if it was called. + self.itpr.PackCommand = mock.MagicMock() + # Have the interpreter handle the command. + self.itpr.HandleUserData() + # Verify that PackCommand() was called. + self.itpr.PackCommand.assert_called_once_with(test_cmd) + + def test_DontPackCommandsForNonEnhancedEC(self): + """Verify the interpreter doesn't pack commands for non-enhanced images.""" + # Assume current EC image is not enhanced. + self.itpr.enhanced_ec = False + # Receive a command from the user. + test_cmd = b"gettime" + self.cmd_pipe_user.send(test_cmd) + # Mock out PackCommand to see if it was called. + self.itpr.PackCommand = mock.MagicMock() + # Have the interpreter handle the command. + self.itpr.HandleUserData() + # Verify that PackCommand() was called. + self.itpr.PackCommand.assert_not_called() + + @mock.patch("ec3po.interpreter.os") + def test_KeepingTrackOfInterrogation(self, mock_os): + """Verify that the interpreter can track the state of the interrogation. + + Args: + mock_os: MagicMock object replacing the 'os' module. for this test + case. + """ + # Upon init, the interpreter should assume that the current EC image is not + # enhanced. + self.assertFalse( + self.itpr.enhanced_ec, + msg=("State of enhanced_ec upon" " init is not False."), + ) + + # Assume an interrogation request comes in from the user. + self.cmd_pipe_user.send(interpreter.EC_SYN) + self.itpr.HandleUserData() + + # Verify the state is now within an interrogation. + self.assertTrue(self.itpr.interrogating, "interrogating should be True") + # The state of enhanced_ec should not be changed yet because we haven't + # received a valid response yet. + self.assertFalse( + self.itpr.enhanced_ec, msg=("State of enhanced_ec is " "not False.") + ) + + # Assume that the EC responds with an EC_ACK. + mock_os.read.side_effect = [interpreter.EC_ACK] + self.itpr.HandleECData() + + # Now, the interrogation should be complete and we should know that the + # current EC image is enhanced. + self.assertFalse( + self.itpr.interrogating, msg=("interrogating should be " "False") + ) + self.assertTrue(self.itpr.enhanced_ec, msg="enhanced_ec sholud be True") + + # Now let's perform another interrogation, but pretend that the EC ignores + # it. + self.cmd_pipe_user.send(interpreter.EC_SYN) + self.itpr.HandleUserData() + + # Verify interrogating state. + self.assertTrue(self.itpr.interrogating, "interrogating sholud be True") + # We should assume that the image is not enhanced until we get the valid + # response. + self.assertFalse(self.itpr.enhanced_ec, "enhanced_ec should be False now.") + + # Let's pretend that we get a random debug print. This should clear the + # interrogating flag. + mock_os.read.side_effect = [b"[1660.593076 HC 0x103]"] + self.itpr.HandleECData() + + # Verify that interrogating flag is cleared and enhanced_ec is still False. + self.assertFalse(self.itpr.interrogating, "interrogating should be False.") + self.assertFalse(self.itpr.enhanced_ec, "enhanced_ec should still be False.") class TestUARTDisconnection(unittest.TestCase): - """Test case to verify interpreter disconnection/reconnection.""" - def setUp(self): - """Setup the test harness.""" - # Setup logging with a timestamp, the module, and the log level. - logging.basicConfig(level=logging.DEBUG, - format=('%(asctime)s - %(module)s -' - ' %(levelname)s - %(message)s')) - - # Create a tempfile that would represent the EC UART PTY. - self.tempfile = tempfile.NamedTemporaryFile() - - # Create the pipes that the interpreter will use. - self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() - self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False) - - # Mock the open() function so we can inspect reads/writes to the EC. - self.ec_uart_pty = mock.mock_open() - - with mock.patch(GetBuiltins('open'), self.ec_uart_pty): - # Create an interpreter. - self.itpr = interpreter.Interpreter(self.tempfile.name, - self.cmd_pipe_itpr, - self.dbg_pipe_itpr, - log_level=logging.DEBUG, - name="EC") - - # First, check that interpreter is initialized to connected. - self.assertTrue(self.itpr.connected, ('The interpreter should be' - ' initialized in a connected state')) - - def test_DisconnectStopsECTraffic(self): - """Verify that when in disconnected state, no debug prints are sent.""" - # Let's send a disconnect command through the command pipe. - self.cmd_pipe_user.send(b'disconnect') - self.itpr.HandleUserData() - - # Verify interpreter is disconnected from EC. - self.assertFalse(self.itpr.connected, ('The interpreter should be' - 'disconnected.')) - # Verify that the EC UART is no longer a member of the inputs. The - # interpreter will never pull data from the EC if it's not a member of the - # inputs list. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) - - def test_CommandsDroppedWhenDisconnected(self): - """Verify that when in disconnected state, commands are dropped.""" - # Send a command, followed by 'disconnect'. - self.cmd_pipe_user.send(b'taskinfo') - self.itpr.HandleUserData() - self.cmd_pipe_user.send(b'disconnect') - self.itpr.HandleUserData() - - # Verify interpreter is disconnected from EC. - self.assertFalse(self.itpr.connected, ('The interpreter should be' - 'disconnected.')) - # Verify that the EC UART is no longer a member of the inputs nor outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - # Have the user send a few more commands in the disconnected state. - command = 'help\n' - for char in command: - self.cmd_pipe_user.send(char.encode('utf-8')) - self.itpr.HandleUserData() - - # The command queue should be empty. - self.assertEqual(0, self.itpr.ec_cmd_queue.qsize()) - - # Now send the reconnect command. - self.cmd_pipe_user.send(b'reconnect') - - with mock.patch(GetBuiltins('open'), mock.mock_open()): - self.itpr.HandleUserData() - - # Verify interpreter is connected. - self.assertTrue(self.itpr.connected) - # Verify that EC UART is a member of the inputs. - self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) - # Since no command was sent after reconnection, verify that the EC UART is - # not a member of the outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - def test_ReconnectAllowsECTraffic(self): - """Verify that when connected, EC UART traffic is allowed.""" - # Let's send a disconnect command through the command pipe. - self.cmd_pipe_user.send(b'disconnect') - self.itpr.HandleUserData() - - # Verify interpreter is disconnected. - self.assertFalse(self.itpr.connected, ('The interpreter should be' - 'disconnected.')) - # Verify that the EC UART is no longer a member of the inputs nor outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - # Issue reconnect command through the command pipe. - self.cmd_pipe_user.send(b'reconnect') - - with mock.patch(GetBuiltins('open'), mock.mock_open()): - self.itpr.HandleUserData() - - # Verify interpreter is connected. - self.assertTrue(self.itpr.connected, ('The interpreter should be' - 'connected.')) - # Verify that the EC UART is now a member of the inputs. - self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) - # Since we have issued no commands during the disconnected state, no - # commands are pending and therefore the PTY should not be added to the - # outputs. - self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) - - -if __name__ == '__main__': - unittest.main() + """Test case to verify interpreter disconnection/reconnection.""" + + def setUp(self): + """Setup the test harness.""" + # Setup logging with a timestamp, the module, and the log level. + logging.basicConfig( + level=logging.DEBUG, + format=("%(asctime)s - %(module)s -" " %(levelname)s - %(message)s"), + ) + + # Create a tempfile that would represent the EC UART PTY. + self.tempfile = tempfile.NamedTemporaryFile() + + # Create the pipes that the interpreter will use. + self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe() + self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False) + + # Mock the open() function so we can inspect reads/writes to the EC. + self.ec_uart_pty = mock.mock_open() + + with mock.patch(GetBuiltins("open"), self.ec_uart_pty): + # Create an interpreter. + self.itpr = interpreter.Interpreter( + self.tempfile.name, + self.cmd_pipe_itpr, + self.dbg_pipe_itpr, + log_level=logging.DEBUG, + name="EC", + ) + + # First, check that interpreter is initialized to connected. + self.assertTrue( + self.itpr.connected, + ("The interpreter should be" " initialized in a connected state"), + ) + + def test_DisconnectStopsECTraffic(self): + """Verify that when in disconnected state, no debug prints are sent.""" + # Let's send a disconnect command through the command pipe. + self.cmd_pipe_user.send(b"disconnect") + self.itpr.HandleUserData() + + # Verify interpreter is disconnected from EC. + self.assertFalse( + self.itpr.connected, ("The interpreter should be" "disconnected.") + ) + # Verify that the EC UART is no longer a member of the inputs. The + # interpreter will never pull data from the EC if it's not a member of the + # inputs list. + self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) + + def test_CommandsDroppedWhenDisconnected(self): + """Verify that when in disconnected state, commands are dropped.""" + # Send a command, followed by 'disconnect'. + self.cmd_pipe_user.send(b"taskinfo") + self.itpr.HandleUserData() + self.cmd_pipe_user.send(b"disconnect") + self.itpr.HandleUserData() + + # Verify interpreter is disconnected from EC. + self.assertFalse( + self.itpr.connected, ("The interpreter should be" "disconnected.") + ) + # Verify that the EC UART is no longer a member of the inputs nor outputs. + self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) + self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) + + # Have the user send a few more commands in the disconnected state. + command = "help\n" + for char in command: + self.cmd_pipe_user.send(char.encode("utf-8")) + self.itpr.HandleUserData() + + # The command queue should be empty. + self.assertEqual(0, self.itpr.ec_cmd_queue.qsize()) + + # Now send the reconnect command. + self.cmd_pipe_user.send(b"reconnect") + + with mock.patch(GetBuiltins("open"), mock.mock_open()): + self.itpr.HandleUserData() + + # Verify interpreter is connected. + self.assertTrue(self.itpr.connected) + # Verify that EC UART is a member of the inputs. + self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) + # Since no command was sent after reconnection, verify that the EC UART is + # not a member of the outputs. + self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) + + def test_ReconnectAllowsECTraffic(self): + """Verify that when connected, EC UART traffic is allowed.""" + # Let's send a disconnect command through the command pipe. + self.cmd_pipe_user.send(b"disconnect") + self.itpr.HandleUserData() + + # Verify interpreter is disconnected. + self.assertFalse( + self.itpr.connected, ("The interpreter should be" "disconnected.") + ) + # Verify that the EC UART is no longer a member of the inputs nor outputs. + self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs) + self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) + + # Issue reconnect command through the command pipe. + self.cmd_pipe_user.send(b"reconnect") + + with mock.patch(GetBuiltins("open"), mock.mock_open()): + self.itpr.HandleUserData() + + # Verify interpreter is connected. + self.assertTrue(self.itpr.connected, ("The interpreter should be" "connected.")) + # Verify that the EC UART is now a member of the inputs. + self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) + # Since we have issued no commands during the disconnected state, no + # commands are pending and therefore the PTY should not be added to the + # outputs. + self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs) + + +if __name__ == "__main__": + unittest.main() diff --git a/util/ec3po/threadproc_shim.py b/util/ec3po/threadproc_shim.py index da5440b1f3..c0b3ce0bf4 100644 --- a/util/ec3po/threadproc_shim.py +++ b/util/ec3po/threadproc_shim.py @@ -34,33 +34,34 @@ wait until after completing the TODO above to stop using multiprocessing.Pipe! # Imports to bring objects into this namespace for users of this module. from multiprocessing import Pipe -from six.moves.queue import Queue from threading import Thread as ThreadOrProcess +from six.moves.queue import Queue + # True if this module has ec3po using subprocesses, False if using threads. USING_SUBPROCS = False def _DoNothing(): - """Do-nothing function for use as a callback with DoIf().""" + """Do-nothing function for use as a callback with DoIf().""" def DoIf(subprocs=_DoNothing, threads=_DoNothing): - """Return a callback or not based on ec3po use of subprocesses or threads. + """Return a callback or not based on ec3po use of subprocesses or threads. - Args: - subprocs: callback that does not require any args - This will be returned - (not called!) if and only if ec3po is using subprocesses. This is - OPTIONAL, the default value is a do-nothing callback that returns None. - threads: callback that does not require any args - This will be returned - (not called!) if and only if ec3po is using threads. This is OPTIONAL, - the default value is a do-nothing callback that returns None. + Args: + subprocs: callback that does not require any args - This will be returned + (not called!) if and only if ec3po is using subprocesses. This is + OPTIONAL, the default value is a do-nothing callback that returns None. + threads: callback that does not require any args - This will be returned + (not called!) if and only if ec3po is using threads. This is OPTIONAL, + the default value is a do-nothing callback that returns None. - Returns: - Either the subprocs or threads argument will be returned. - """ - return subprocs if USING_SUBPROCS else threads + Returns: + Either the subprocs or threads argument will be returned. + """ + return subprocs if USING_SUBPROCS else threads def Value(ctype, *args): - return ctype(*args) + return ctype(*args) |