summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
authorAseda Aboagye <aaboagye@google.com>2015-11-12 12:02:32 -0800
committerchrome-bot <chrome-bot@chromium.org>2015-11-26 01:39:36 -0800
commitf14dd412ce2ae44ac3ad1648a447a25077dbd82f (patch)
tree6087c71a230379a515ec92ab2056a0c839f196cc /util
parent1d0785da90f97bd2a58e40a62e5280fd0a574f08 (diff)
downloadchrome-ec-f14dd412ce2ae44ac3ad1648a447a25077dbd82f.tar.gz
ec3po: Add compatibility for older EC images.
The EC-3PO console and interpreter could be used to talk to EC images which do not have the necessary changes to support the new enhancements. If this was the case, the interpreter would be very confused and the user wouldn't be able to use the console. This commit adds compatibility support for talking to both non-enhanced and enhanced EC images. When the console and interpreter are instantiated, they assume by default that the EC image they are talking to is non-enhanced. When the user presses the carriage return key, the console initiates an interrogation with the EC image. The interrogation is a simple EC_SYN(0xEC) and waits EC_INTERROGATION_TIMEOUT for the correct EC_ACK(0xC0). Enhanced EC images will try to reply immediately to a EC_SYN. Non-enhanced EC images will just ignore the EC_SYN as it's not a printable character. Once the interrogation is complete, the console will either simply pass everything forwards to the EC or provide the console interface itself. BUG=chrome-os-partner:46063 BRANCH=None TEST=Enabled CONFIG_EXPERIMENTAL_CONSOLE on GLaDOS. Entered some commands and verified console was working. Disabled CONFIG_EXPERIMENTAL_CONSOLE on GLaDOS, reflashed, and verified console was still working without restarting the EC-3PO console. TEST=./util/ec3po/console_unittest.py -b TEST=./util/ec3po/interpreter_unittest.py -b TEST=cros lint --debug util/ec3po/console.py TEST=cros lint --debug util/ec3po/console_unittest.py TEST=cros lint --debug util/ec3po/interpreter.py TEST=cros lint --debug util/ec3po/interpreter_unittest.py Change-Id: I4f472afbdd7e898bee308c239b68ace0f4049842 Signed-off-by: Aseda Aboagye <aaboagye@google.com> Reviewed-on: https://chromium-review.googlesource.com/313002 Commit-Ready: Aseda Aboagye <aaboagye@chromium.org> Tested-by: Aseda Aboagye <aaboagye@chromium.org> Reviewed-by: Randall Spangler <rspangler@chromium.org>
Diffstat (limited to 'util')
-rwxr-xr-xutil/ec3po/console.py46
-rwxr-xr-xutil/ec3po/console_unittest.py698
-rwxr-xr-xutil/ec3po/interpreter.py204
-rwxr-xr-xutil/ec3po/interpreter_unittest.py252
4 files changed, 883 insertions, 317 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py
index de657b7ae0..b2e179671e 100755
--- a/util/ec3po/console.py
+++ b/util/ec3po/console.py
@@ -10,6 +10,7 @@ session-persistent command history.
"""
from __future__ import print_function
import argparse
+import binascii
from chromite.lib import cros_logging as logging
import multiprocessing
import os
@@ -83,6 +84,10 @@ class Console(object):
history_pos: An integer representing the current history buffer position.
This index is used to show previous commands.
prompt: A string representing the console prompt displayed to the user.
+ enhanced_ec: A boolean indicating if the EC image that we are currently
+ communicating with is enhanced or not. Enhanced EC images will support
+ packed commands and host commands over the UART. This defaults to False
+ until we perform some handshaking.
"""
def __init__(self, master_pty, user_pty, cmd_pipe, dbg_pipe):
@@ -111,6 +116,7 @@ class Console(object):
self.history = []
self.history_pos = 0
self.prompt = PROMPT
+ self.enhanced_ec = False
def __str__(self):
"""Show internal state of Console object as a string."""
@@ -371,12 +377,52 @@ class Console(object):
logging.debug('Sending command to interpreter.')
self.cmd_pipe.send(self.input_buffer)
+ def CheckForEnhancedECImage(self):
+ """Performs an interrogation of the EC image.
+
+ Send a SYN and expect an ACK. If no ACK or the response is incorrect, then
+ assume that the current EC image that we are talking to is not enhanced.
+
+ Returns:
+ A boolean indicating whether the EC responded to the interrogation
+ correctly.
+ """
+ # Send interrogation byte and wait for the response.
+ logging.debug('Performing interrogation.')
+ self.cmd_pipe.send(interpreter.EC_SYN)
+
+ response = ''
+ if self.dbg_pipe.poll(interpreter.EC_INTERROGATION_TIMEOUT):
+ response = self.dbg_pipe.recv()
+ logging.debug('response: \'%s\'', binascii.hexlify(response))
+ else:
+ logging.debug('Timed out waiting for EC_ACK')
+
+ # Verify the acknowledgment.
+ return response == interpreter.EC_ACK
+
def HandleChar(self, byte):
"""HandleChar does a certain action when it receives a character.
Args:
byte: An integer representing the character received from the user.
"""
+ # Interrogate the EC every time we press the enter key. This is necessary
+ # so that we know whether or not we should provide the console interface or
+ # simply behave as a pass-thru.
+ if byte == ControlKey.CARRIAGE_RETURN:
+ self.enhanced_ec = self.CheckForEnhancedECImage()
+ logging.debug('Enhanced EC image? %r', self.enhanced_ec)
+
+ if not self.enhanced_ec:
+ # Send everything straight to the EC to handle.
+ self.cmd_pipe.send(chr(byte))
+ # Reset the input buffer.
+ self.input_buffer = ''
+ self.input_buffer_pos = 0
+ logging.debug('Reset input buffer.')
+ return
+
# Keep handling the ESC sequence if we're in the middle of it.
if self.esc_state != 0:
self.HandleEsc(byte)
diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py
index 7e9e4ab9f1..67c686c85b 100755
--- a/util/ec3po/console_unittest.py
+++ b/util/ec3po/console_unittest.py
@@ -5,12 +5,14 @@
"""Unit tests for the EC-3PO Console interface."""
from __future__ import print_function
import binascii
+from chromite.lib import cros_logging as logging
import mock
import multiprocessing
import tempfile
import unittest
import console
+import interpreter
ESC_STRING = chr(console.ControlKey.ESC)
@@ -65,11 +67,167 @@ BACKSPACE_STRING += ' '
# Move cursor left 1 column.
BACKSPACE_STRING += OutputStream.MoveCursorLeft(1)
+def StringToByteList(string):
+ """Converts a string to list of bytes.
+
+ Args:
+ string: A literal string to turn into a list of bytes.
+
+ Returns:
+ A list of integers representing the byte value of each character in the
+ string.
+ """
+ return [ord(c) for c in string]
+
+def BadConsoleOutput(expected, got):
+ """Format the console output into readable text.
+
+ Args:
+ expected: A list of bytes representing the expected output console
+ stream.
+ got: A list of byte representing the actual output console stream.
+
+ Returns:
+ string: A formatted string which shows the expected console output stream
+ and the actual console output stream.
+ """
+ esc_state = 0
+ string = 'Incorrect console output stream.\n'
+ string += 'exp: |'
+ count = 0
+ for char in expected:
+ if esc_state != 0:
+ if esc_state == console.EscState.ESC_START:
+ if char == '[':
+ esc_state = console.EscState.ESC_BRACKET
+ elif esc_state == console.EscState.ESC_BRACKET:
+ if char == 'D':
+ string += '[cursor left ' + str(count) + ' cols]'
+ esc_state = 0
+ elif char == 'C':
+ string += '[cursor right ' + str(count) + ' cols]'
+ esc_state = 0
+ else:
+ count = int(char)
+ # Print if it's printable.
+ elif console.IsPrintable(ord(char)):
+ string += char
+ else:
+ # It might be a sequence of some type.
+ if ord(char) == console.ControlKey.ESC:
+ # Need to look at the following sequence.
+ esc_state = console.EscState.ESC_START
+ else:
+ string += '{' + binascii.hexlify(char) + '}'
+
+ string += '|\n\ngot: |'
+ for char in got:
+ if esc_state != 0:
+ if esc_state == console.EscState.ESC_START:
+ if char == '[':
+ esc_state = console.EscState.ESC_BRACKET
+ elif esc_state == console.EscState.ESC_BRACKET:
+ if char == 'D':
+ string += '[cursor left ' + str(count) + ' cols]'
+ esc_state = 0
+ elif char == 'C':
+ string += '[cursor right ' + str(count) + ' cols]'
+ esc_state = 0
+ else:
+ count = int(char)
+ # Print if it's printable.
+ elif console.IsPrintable(ord(char)):
+ string += char
+ else:
+ # It might be a sequence of some type.
+ if ord(char) == console.ControlKey.ESC:
+ # Need to look at the following sequence.
+ esc_state = console.EscState.ESC_START
+ else:
+ string += '{' + binascii.hexlify(char) + '}'
+ string += '|\n\n'
+
+ # TODO(aaboagye): It would be nice to replace all those move left 1, ' ',
+ # move left 1, with backspace.
+
+ return string
+
+def CheckConsoleOutput(test_case, exp_console_out):
+ """Verify what was sent out the console matches what we expect.
+
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_console_out: A string representing the console output stream.
+ """
+ # Read what was sent out the console.
+ test_case.tempfile.seek(0)
+ console_out = test_case.tempfile.read()
+
+ test_case.assertEqual(exp_console_out,
+ console_out,
+ (BadConsoleOutput(exp_console_out, console_out)
+ + str(test_case.console)))
+
+def CheckInputBuffer(test_case, exp_input_buffer):
+ """Verify that the input buffer contains what we expect.
+
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_input_buffer: A string containing the contents of the current input
+ buffer.
+ """
+ test_case.assertEqual(exp_input_buffer, test_case.console.input_buffer,
+ ('input buffer does not match expected.\n'
+ 'expected: |' + exp_input_buffer + '|\n'
+ 'got: |' + test_case.console.input_buffer +
+ '|\n' + str(test_case.console)))
+
+def CheckInputBufferPosition(test_case, exp_pos):
+ """Verify the input buffer position.
+
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_pos: An integer representing the expected input buffer position.
+ """
+ test_case.assertEqual(exp_pos, test_case.console.input_buffer_pos,
+ 'input buffer position is incorrect.\ngot: ' +
+ str(test_case.console.input_buffer_pos) + '\nexp: ' +
+ str(exp_pos) + '\n' + str(test_case.console))
+
+def CheckHistoryBuffer(test_case, exp_history):
+ """Verify that the items in the history buffer are what we expect.
+
+ Args:
+ test_case: A unittest.TestCase object representing the current unit test.
+ exp_history: A list of strings representing the expected contents of the
+ history buffer.
+ """
+ # First, check to see if the length is what we expect.
+ test_case.assertEqual(len(exp_history), len(test_case.console.history),
+ ('The number of items in the history is unexpected.\n'
+ 'exp: ' + str(len(exp_history)) + '\n'
+ 'got: ' + str(len(test_case.console.history)) + '\n'
+ 'internal state:\n' + str(test_case.console)))
+
+ # Next, check the actual contents of the history buffer.
+ for i in range(len(exp_history)):
+ test_case.assertEqual(exp_history[i], test_case.console.history[i],
+ ('history buffer contents are incorrect.\n'
+ 'exp: ' + exp_history[i] + '\n'
+ 'got: ' + test_case.console.history[i] + '\n'
+ 'internal state:\n' + str(test_case.console)))
+
+
class TestConsoleEditingMethods(unittest.TestCase):
"""Test case to verify all console editing methods."""
def setUp(self):
"""Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(level=logging.DEBUG,
+ format=('%(asctime)s - %(module)s -'
+ ' %(levelname)s - %(message)s'))
+
# Create a temp file and set both the master and slave PTYs to the file to
# create a loopback.
self.tempfile = tempfile.TemporaryFile()
@@ -80,159 +238,20 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console = console.Console(self.tempfile.fileno(), self.tempfile,
dummy_pipe_end_0, dummy_pipe_end_1)
+ # Console editing methods are only valid for enhanced EC images, therefore
+ # we have to assume that the "EC" we're talking to is enhanced. By default,
+ # the console believes that the EC it's communicating with is NOT enhanced
+ # which is why we have to override it here.
+ self.console.enhanced_ec = True
+ self.console.CheckForEnhancedECImage = mock.MagicMock(return_value=True)
+
# Mock out sends to the interpreter.
multiprocessing.Pipe.send = mock.MagicMock()
- def StringToByteList(self, string):
- """Converts a string to list of bytes.
-
- Args:
- string: A literal string to turn into a list of bytes.
-
- Returns:
- A list of integers representing the byte value of each character in the
- string.
- """
- return [ord(c) for c in string]
-
- def BadConsoleOutput(self, expected, got):
- """Format the console output into readable text.
-
- Args:
- expected: A list of bytes representing the expected output console
- stream.
- got: A list of byte representing the actual output console stream.
-
- Returns:
- string: A formatted string which shows the expected console output stream
- and the actual console output stream.
- """
- esc_state = 0
- string = 'Incorrect console output stream.\n'
- string += 'exp: |'
- count = 0
- for char in expected:
- if esc_state != 0:
- if esc_state == console.EscState.ESC_START:
- if char == '[':
- esc_state = console.EscState.ESC_BRACKET
- elif esc_state == console.EscState.ESC_BRACKET:
- if char == 'D':
- string += '[cursor left ' + str(count) + ' cols]'
- esc_state = 0
- elif char == 'C':
- string += '[cursor right ' + str(count) + ' cols]'
- esc_state = 0
- else:
- count = int(char)
- # Print if it's printable.
- elif console.IsPrintable(ord(char)):
- string += char
- else:
- # It might be a sequence of some type.
- if ord(char) == console.ControlKey.ESC:
- # Need to look at the following sequence.
- esc_state = console.EscState.ESC_START
- else:
- string += '{' + binascii.hexlify(char) + '}'
-
- string += '|\n\ngot: |'
- for char in got:
- if esc_state != 0:
- if esc_state == console.EscState.ESC_START:
- if char == '[':
- esc_state = console.EscState.ESC_BRACKET
- elif esc_state == console.EscState.ESC_BRACKET:
- if char == 'D':
- string += '[cursor left ' + str(count) + ' cols]'
- esc_state = 0
- elif char == 'C':
- string += '[cursor right ' + str(count) + ' cols]'
- esc_state = 0
- else:
- count = int(char)
- # Print if it's printable.
- elif console.IsPrintable(ord(char)):
- string += char
- else:
- # It might be a sequence of some type.
- if ord(char) == console.ControlKey.ESC:
- # Need to look at the following sequence.
- esc_state = console.EscState.ESC_START
- else:
- string += '{' + binascii.hexlify(char) + '}'
- string += '|\n\n'
-
- # TODO(aaboagye): It would be nice to replace all those move left 1, ' ',
- # move left 1, with backspace.
-
- return string
-
- def CheckConsoleOutput(self, exp_console_out):
- """Verify what was sent out the console matches what we expect.
-
- Args:
- exp_console_out: A string representing the console output stream.
- """
- # Read what was sent out the console.
- self.tempfile.seek(0)
- console_out = self.tempfile.read()
-
- self.assertEqual(exp_console_out,
- console_out,
- (self.BadConsoleOutput(exp_console_out, console_out)
- + str(self.console)))
-
- def CheckInputBuffer(self, exp_input_buffer):
- """Verify that the input buffer contains what we expect.
-
- Args:
- exp_input_buffer: A string containing the contents of the current input
- buffer.
- """
- self.assertEqual(exp_input_buffer, self.console.input_buffer,
- ('input buffer does not match expected.\n'
- 'expected: |' + exp_input_buffer + '|\n'
- 'got: |' + self.console.input_buffer + '|\n' +
- str(self.console)))
-
- def CheckInputBufferPosition(self, exp_pos):
- """Verify the input buffer position.
-
- Args:
- exp_pos: An integer representing the expected input buffer position.
- """
- self.assertEqual(exp_pos, self.console.input_buffer_pos,
- 'input buffer position is incorrect.\ngot: ' +
- str(self.console.input_buffer_pos) + '\nexp: ' +
- str(exp_pos) + '\n' + str(self.console))
-
- def CheckHistoryBuffer(self, exp_history):
- """Verify that the items in the history buffer are what we expect.
-
- Args:
- exp_history: A list of strings representing the expected contents of the
- history buffer.
- """
- # First, check to see if the length is what we expect.
- self.assertEqual(len(exp_history), len(self.console.history),
- ('The number of items in the history is unexpected.\n'
- 'exp: ' + str(len(exp_history)) + '\n'
- 'got: ' + str(len(self.console.history)) + '\n'
- 'internal state:\n' + str(self.console)))
-
- # Next, check the actual contents of the history buffer.
- for i in range(len(exp_history)):
- self.assertEqual(exp_history[i], self.console.history[i],
- ('history buffer contents are incorrect.\n'
- 'exp: ' + exp_history[i] + '\n'
- 'got: ' + self.console.history[i] + '\n'
- 'internal state:\n' + str(self.console)))
-
def test_EnteringChars(self):
"""Verify that characters are echoed onto the console."""
test_str = 'abc'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# Send the characters in.
for byte in input_stream:
@@ -240,20 +259,20 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Check the input position.
exp_pos = len(test_str)
- self.CheckInputBufferPosition(exp_pos)
+ CheckInputBufferPosition(self, exp_pos)
# Verify that the input buffer is correct.
expected_buffer = test_str
- self.CheckInputBuffer(expected_buffer)
+ CheckInputBuffer(self, expected_buffer)
# Check console output
exp_console_out = test_str
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_EnteringDeletingMoreCharsThanEntered(self):
"""Verify that we can press backspace more than we have entered chars."""
test_str = 'spamspam'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# Send the characters in.
for byte in input_stream:
@@ -269,7 +288,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# First, verify that input buffer position is 0.
- self.CheckInputBufferPosition(0)
+ CheckInputBufferPosition(self, 0)
# Next, examine the output stream for the correct sequence.
exp_console_out = test_str
@@ -277,13 +296,13 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += BACKSPACE_STRING
# Now, verify that we got what we expected.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_EnteringMoreThanCharLimit(self):
"""Verify that we drop characters when the line is too long."""
test_str = self.console.line_limit * 'o' # All allowed.
test_str += 5 * 'x' # All should be dropped.
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# Send the characters in.
for byte in input_stream:
@@ -292,15 +311,15 @@ class TestConsoleEditingMethods(unittest.TestCase):
# First, we expect that input buffer position should be equal to the line
# limit.
exp_pos = self.console.line_limit
- self.CheckInputBufferPosition(exp_pos)
+ CheckInputBufferPosition(self, exp_pos)
# The input buffer should only hold until the line limit.
exp_buffer = test_str[0:self.console.line_limit]
- self.CheckInputBuffer(exp_buffer)
+ CheckInputBuffer(self, exp_buffer)
# Lastly, check that the extra characters are not printed.
exp_console_out = exp_buffer
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_ValidKeysOnLongLine(self):
"""Verify that we can still press valid keys if the line is too long."""
@@ -309,7 +328,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out = test_str
# Try to fill it even more; these should all be dropped.
test_str += 5 * 'x'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# We should be able to press the following keys:
# - Backspace
@@ -323,7 +342,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
input_stream.append(console.ControlKey.BACKSPACE)
exp_console_out += BACKSPACE_STRING
# Refill the line.
- input_stream.extend(self.StringToByteList('o'))
+ input_stream.extend(StringToByteList('o'))
exp_console_out += 'o'
# Left arrow key.
@@ -399,7 +418,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# Verify everything happened correctly.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_BackspaceOnEmptyLine(self):
"""Verify that we can backspace on an empty line with no bad effects."""
@@ -412,21 +431,21 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Check the input position.
exp_pos = 0
- self.CheckInputBufferPosition(exp_pos)
+ CheckInputBufferPosition(self, exp_pos)
# Check that buffer is empty.
exp_input_buffer = ''
- self.CheckInputBuffer(exp_input_buffer)
+ CheckInputBuffer(self, exp_input_buffer)
# Check that the console output is empty.
exp_console_out = ''
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_BackspaceWithinLine(self):
"""Verify that we shift the chars over when backspacing within a line."""
# Misspell 'help'
test_str = 'heelp'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# Use the arrow key to go back to fix it.
# Move cursor left 1 column.
input_stream.extend(2*Keys.LEFT_ARROW)
@@ -439,10 +458,10 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Verify the input buffer
exp_input_buffer = 'help'
- self.CheckInputBuffer(exp_input_buffer)
+ CheckInputBuffer(self, exp_input_buffer)
# Verify the input buffer position. It should be at 2 (cursor over the 'l')
- self.CheckInputBufferPosition(2)
+ CheckInputBufferPosition(self, 2)
# We expect the console output to be the test string, with two moves to the
# left, another move left, and then the rest of the line followed by a
@@ -458,13 +477,13 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorLeft(3)
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_JumpToBeginningOfLineViaCtrlA(self):
"""Verify that we can jump to the beginning of a line with Ctrl+A."""
# Enter some chars and press CTRL+A
test_str = 'abc'
- input_stream = self.StringToByteList(test_str) + [console.ControlKey.CTRL_A]
+ input_stream = StringToByteList(test_str) + [console.ControlKey.CTRL_A]
# Send the characters in.
for byte in input_stream:
@@ -475,18 +494,18 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
# Check to see what whas printed on the console.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
# Check that the input buffer position is now 0.
- self.CheckInputBufferPosition(0)
+ CheckInputBufferPosition(self, 0)
# Check input buffer still contains our test string.
- self.CheckInputBuffer(test_str)
+ CheckInputBuffer(self, test_str)
def test_JumpToBeginningOfLineViaHomeKey(self):
"""Jump to beginning of line via HOME key."""
test_str = 'version'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream.extend(Keys.HOME)
# Send out the stream.
@@ -494,20 +513,20 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# First, verify that input buffer position is now 0.
- self.CheckInputBufferPosition(0)
+ CheckInputBufferPosition(self, 0)
# Next, verify that the input buffer did not change.
- self.CheckInputBuffer(test_str)
+ CheckInputBuffer(self, test_str)
# Lastly, check that the cursor moved correctly.
exp_console_out = test_str
exp_console_out += OutputStream.MoveCursorLeft(len(test_str))
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_JumpToEndOfLineViaEndKey(self):
"""Jump to the end of the line using the END key."""
test_str = 'version'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream += [console.ControlKey.CTRL_A]
# Now, jump to the end of the line.
input_stream.extend(Keys.END)
@@ -518,7 +537,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Verify that the input buffer position is correct. This should be at the
# end of the test string.
- self.CheckInputBufferPosition(len(test_str))
+ CheckInputBufferPosition(self, len(test_str))
# The expected output should be the test string, followed by a jump to the
# beginning of the line, and lastly a jump to the end of the line.
@@ -528,12 +547,12 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorRight(len(test_str))
# Verify console output stream.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_JumpToEndOfLineViaCtrlE(self):
"""Enter some chars and then try to jump to the end. (Should be a no-op)"""
test_str = 'sysinfo'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream.append(console.ControlKey.CTRL_E)
# Send out the stream
@@ -542,7 +561,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Verify that the input buffer position isn't any further than we expect.
# At this point, the position should be at the end of the test string.
- self.CheckInputBufferPosition(len(test_str))
+ CheckInputBufferPosition(self, len(test_str))
# Now, let's try to jump to the beginning and then jump back to the end.
input_stream = [console.ControlKey.CTRL_A, console.ControlKey.CTRL_E]
@@ -552,7 +571,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# Perform the same verification.
- self.CheckInputBufferPosition(len(test_str))
+ CheckInputBufferPosition(self, len(test_str))
# Lastly try to jump again, beyond the end.
input_stream = [console.ControlKey.CTRL_E]
@@ -562,7 +581,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# Perform the same verification.
- self.CheckInputBufferPosition(len(test_str))
+ CheckInputBufferPosition(self, len(test_str))
# We expect to see the test string, a jump to the begining of the line, and
# one jump to the end of the line.
@@ -573,12 +592,12 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorRight(len(test_str))
# Verify the console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_MoveLeftWithArrowKey(self):
"""Move cursor left one column with arrow key."""
test_str = 'tastyspam'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream.extend(Keys.LEFT_ARROW)
# Send the sequence out.
@@ -586,21 +605,21 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# Verify that the input buffer position is 1 less than the length.
- self.CheckInputBufferPosition(len(test_str) - 1)
+ CheckInputBufferPosition(self, len(test_str) - 1)
# Also, verify that the input buffer is not modified.
- self.CheckInputBuffer(test_str)
+ CheckInputBuffer(self, test_str)
# We expect the test string, followed by a one column move left.
exp_console_out = test_str + OutputStream.MoveCursorLeft(1)
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_MoveLeftWithCtrlB(self):
"""Move cursor back one column with Ctrl+B."""
test_str = 'tastyspam'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream.append(console.ControlKey.CTRL_B)
# Send the sequence out.
@@ -608,21 +627,21 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# Verify that the input buffer position is 1 less than the length.
- self.CheckInputBufferPosition(len(test_str) - 1)
+ CheckInputBufferPosition(self, len(test_str) - 1)
# Also, verify that the input buffer is not modified.
- self.CheckInputBuffer(test_str)
+ CheckInputBuffer(self, test_str)
# We expect the test string, followed by a one column move left.
exp_console_out = test_str + OutputStream.MoveCursorLeft(1)
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_MoveRightWithArrowKey(self):
"""Move cursor one column to the right with the arrow key."""
test_str = 'version'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# Jump to beginning of line.
input_stream.append(console.ControlKey.CTRL_A)
# Press right arrow key.
@@ -633,10 +652,10 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# Verify that the input buffer position is 1.
- self.CheckInputBufferPosition(1)
+ CheckInputBufferPosition(self, 1)
# Also, verify that the input buffer is not modified.
- self.CheckInputBuffer(test_str)
+ CheckInputBuffer(self, test_str)
# We expect the test string, followed by a jump to the beginning of the
# line, and finally a move right 1.
@@ -646,12 +665,12 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorRight(1)
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_MoveRightWithCtrlF(self):
"""Move cursor forward one column with Ctrl+F."""
test_str = 'panicinfo'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream.append(console.ControlKey.CTRL_A)
# Now, move right one column.
input_stream.append(console.ControlKey.CTRL_F)
@@ -661,10 +680,10 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# Verify that the input buffer position is 1.
- self.CheckInputBufferPosition(1)
+ CheckInputBufferPosition(self, 1)
# Also, verify that the input buffer is not modified.
- self.CheckInputBuffer(test_str)
+ CheckInputBuffer(self, test_str)
# We expect the test string, followed by a jump to the beginning of the
# line, and finally a move right 1.
@@ -674,7 +693,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorRight(1)
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_ImpossibleMoveLeftWithArrowKey(self):
"""Verify that we can't move left at the beginning of the line."""
@@ -687,13 +706,13 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Nothing should have been output.
exp_console_output = ''
- self.CheckConsoleOutput(exp_console_output)
+ CheckConsoleOutput(self, exp_console_output)
# The input buffer position should still be 0.
- self.CheckInputBufferPosition(0)
+ CheckInputBufferPosition(self, 0)
# The input buffer itself should be empty.
- self.CheckInputBuffer('')
+ CheckInputBuffer(self, '')
def test_ImpossibleMoveRightWithArrowKey(self):
"""Verify that we can't move right at the end of the line."""
@@ -706,18 +725,18 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Nothing should have been output.
exp_console_output = ''
- self.CheckConsoleOutput(exp_console_output)
+ CheckConsoleOutput(self, exp_console_output)
# The input buffer position should still be 0.
- self.CheckInputBufferPosition(0)
+ CheckInputBufferPosition(self, 0)
# The input buffer itself should be empty.
- self.CheckInputBuffer('')
+ CheckInputBuffer(self, '')
def test_KillEntireLine(self):
"""Verify that we can kill an entire line with Ctrl+K."""
test_str = 'accelinfo on'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# Jump to beginning of line and then kill it with Ctrl+K.
input_stream.extend([console.ControlKey.CTRL_A, console.ControlKey.CTRL_K])
@@ -726,10 +745,10 @@ class TestConsoleEditingMethods(unittest.TestCase):
self.console.HandleChar(byte)
# First, we expect that the input buffer is empty.
- self.CheckInputBuffer('')
+ CheckInputBuffer(self, '')
# The buffer position should be 0.
- self.CheckInputBufferPosition(0)
+ CheckInputBufferPosition(self, 0)
# What we expect to see on the console stream should be the following. The
# test string, a jump to the beginning of the line, then jump back to the
@@ -744,12 +763,12 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += BACKSPACE_STRING
# Verify the console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_KillPartialLine(self):
"""Verify that we can kill a portion of a line."""
test_str = 'accelread 0 1'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
len_to_kill = 5
for _ in range(len_to_kill):
# Move cursor left
@@ -763,10 +782,10 @@ class TestConsoleEditingMethods(unittest.TestCase):
# First, check that the input buffer was truncated.
exp_input_buffer = test_str[:-len_to_kill]
- self.CheckInputBuffer(exp_input_buffer)
+ CheckInputBuffer(self, exp_input_buffer)
# Verify the input buffer position.
- self.CheckInputBufferPosition(len(test_str) - len_to_kill)
+ CheckInputBufferPosition(self, len(test_str) - len_to_kill)
# The console output stream that we expect is the test string followed by a
# move left of len_to_kill, then a jump to the end of the line and backspace
@@ -782,12 +801,12 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += BACKSPACE_STRING
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_InsertingCharacters(self):
"""Verify that we can insert charcters within the line."""
test_str = 'accel 0 1' # Here we forgot the 'read' part in 'accelread'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# We need to move over to the 'l' and add read.
insertion_point = test_str.find('l') + 1
for i in range(len(test_str) - insertion_point):
@@ -795,7 +814,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
input_stream.extend(Keys.LEFT_ARROW)
# Now, add in 'read'
added_str = 'read'
- input_stream.extend(self.StringToByteList(added_str))
+ input_stream.extend(StringToByteList(added_str))
# Send the sequence out.
for byte in input_stream:
@@ -804,11 +823,11 @@ class TestConsoleEditingMethods(unittest.TestCase):
# First, verify that the input buffer is correct.
exp_input_buffer = test_str[:insertion_point] + added_str
exp_input_buffer += test_str[insertion_point:]
- self.CheckInputBuffer(exp_input_buffer)
+ CheckInputBuffer(self, exp_input_buffer)
# Verify that the input buffer position is correct.
exp_input_buffer_pos = insertion_point + len(added_str)
- self.CheckInputBufferPosition(exp_input_buffer_pos)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
# The console output stream that we expect is the test string, followed by
# move cursor left until the 'l' was found, the added test string while
@@ -830,7 +849,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorLeft(reset_dist)
# Verify the console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_StoreCommandHistory(self):
"""Verify that entered commands are stored in the history."""
@@ -840,7 +859,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
test_commands.append('accelread 0 1')
input_stream = []
for c in test_commands:
- input_stream.extend(self.StringToByteList(c))
+ input_stream.extend(StringToByteList(c))
input_stream.append(console.ControlKey.CARRIAGE_RETURN)
# Send the sequence out.
@@ -849,7 +868,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
# We expect to have the test commands in the history buffer.
exp_history_buf = test_commands
- self.CheckHistoryBuffer(exp_history_buf)
+ CheckHistoryBuffer(self, exp_history_buf)
def test_CycleUpThruCommandHistory(self):
"""Verify that the UP arrow key will print itmes in the history buffer."""
@@ -857,7 +876,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
test_commands = ['version', 'accelrange 0', 'battery', 'gettime']
input_stream = []
for command in test_commands:
- input_stream.extend(self.StringToByteList(command))
+ input_stream.extend(StringToByteList(command))
input_stream.append(console.ControlKey.CARRIAGE_RETURN)
# Now, hit the UP arrow key to print the previous entries.
@@ -886,7 +905,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += test_commands[0]
# Now, verify.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_UpArrowOnEmptyHistory(self):
"""Ensure nothing happens if the history is empty."""
@@ -904,16 +923,16 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_history_buf = []
# Verify.
- self.CheckConsoleOutput(exp_console_out)
- self.CheckInputBufferPosition(exp_input_buffer_pos)
- self.CheckInputBuffer(exp_input_buffer)
- self.CheckHistoryBuffer(exp_history_buf)
+ CheckConsoleOutput(self, exp_console_out)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckHistoryBuffer(self, exp_history_buf)
def test_UpArrowDoesNotGoOutOfBounds(self):
"""Verify that pressing the up arrow many times won't go out of bounds."""
# Enter one command.
test_str = 'help version'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream.append(console.ControlKey.CARRIAGE_RETURN)
# Then press the up arrow key twice.
input_stream.extend(2 * Keys.UP_ARROW)
@@ -924,7 +943,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Verify that the history buffer is correct.
exp_history_buf = [test_str]
- self.CheckHistoryBuffer(exp_history_buf)
+ CheckHistoryBuffer(self, exp_history_buf)
# We expect that the console output should only contain our entered command,
# a new prompt, and then our command aggain.
@@ -933,7 +952,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += test_str
# Verify.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
def test_CycleDownThruCommandHistory(self):
"""Verify that we can select entries by hitting the down arrow."""
@@ -941,7 +960,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
test_commands = ['version', 'accelrange 0', 'battery', 'gettime']
input_stream = []
for command in test_commands:
- input_stream.extend(self.StringToByteList(command))
+ input_stream.extend(StringToByteList(command))
input_stream.append(console.ControlKey.CARRIAGE_RETURN)
# Now, hit the UP arrow key twice to print the previous two entries.
@@ -978,24 +997,24 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += BACKSPACE_STRING
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
# Verify input buffer.
exp_input_buffer = '' # Empty because our partial command was empty.
exp_input_buffer_pos = len(exp_input_buffer)
- self.CheckInputBuffer(exp_input_buffer)
- self.CheckInputBufferPosition(exp_input_buffer_pos)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
def test_SavingPartialCommandWhenNavigatingHistory(self):
"""Verify that partial commands are saved when navigating history."""
# Enter a command.
test_str = 'accelinfo'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
input_stream.append(console.ControlKey.CARRIAGE_RETURN)
# Enter a partial command.
partial_cmd = 'ver'
- input_stream.extend(self.StringToByteList(partial_cmd))
+ input_stream.extend(StringToByteList(partial_cmd))
# Hit the UP arrow key.
input_stream.extend(Keys.UP_ARROW)
@@ -1019,13 +1038,13 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += partial_cmd
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
# Verify input buffer.
exp_input_buffer = partial_cmd
exp_input_buffer_pos = len(exp_input_buffer)
- self.CheckInputBuffer(exp_input_buffer)
- self.CheckInputBufferPosition(exp_input_buffer_pos)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
def test_DownArrowOnEmptyHistory(self):
"""Ensure nothing happens if the history is empty."""
@@ -1043,15 +1062,15 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_history_buf = []
# Verify.
- self.CheckConsoleOutput(exp_console_out)
- self.CheckInputBufferPosition(exp_input_buffer_pos)
- self.CheckInputBuffer(exp_input_buffer)
- self.CheckHistoryBuffer(exp_history_buf)
+ CheckConsoleOutput(self, exp_console_out)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckHistoryBuffer(self, exp_history_buf)
def test_DeleteCharsUsingDELKey(self):
"""Verify that we can delete characters using the DEL key."""
test_str = 'version'
- input_stream = self.StringToByteList(test_str)
+ input_stream = StringToByteList(test_str)
# Hit the left arrow key 2 times.
input_stream.extend(2 * Keys.LEFT_ARROW)
@@ -1077,14 +1096,14 @@ class TestConsoleEditingMethods(unittest.TestCase):
exp_console_out += OutputStream.MoveCursorLeft(2)
# Verify console output.
- self.CheckConsoleOutput(exp_console_out)
+ CheckConsoleOutput(self, exp_console_out)
# Verify input buffer. The input buffer should have the char sliced out and
# be positioned where the char was removed.
exp_input_buffer = test_str[:-2] + test_str[-1:]
exp_input_buffer_pos = len(exp_input_buffer) - 1
- self.CheckInputBuffer(exp_input_buffer)
- self.CheckInputBufferPosition(exp_input_buffer_pos)
+ CheckInputBuffer(self, exp_input_buffer)
+ CheckInputBufferPosition(self, exp_input_buffer_pos)
def test_RepeatedCommandInHistory(self):
"""Verify that we don't store 2 consecutive identical commands in history"""
@@ -1095,7 +1114,7 @@ class TestConsoleEditingMethods(unittest.TestCase):
input_stream = []
for command in test_commands:
- input_stream.extend(self.StringToByteList(command))
+ input_stream.extend(StringToByteList(command))
input_stream.append(console.ControlKey.CARRIAGE_RETURN)
# Send the sequence out.
@@ -1105,7 +1124,190 @@ class TestConsoleEditingMethods(unittest.TestCase):
# Verify that the history buffer is correct. The last command, since
# it was repeated, should not have been added to the history.
exp_history_buf = test_commands[0:len(test_commands)-1]
- self.CheckHistoryBuffer(exp_history_buf)
+ CheckHistoryBuffer(self, exp_history_buf)
+
+
+class TestConsoleCompatibility(unittest.TestCase):
+ """Verify that console can speak to enhanced and non-enhanced EC images."""
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(level=logging.DEBUG,
+ format=('%(asctime)s - %(module)s -'
+ ' %(levelname)s - %(message)s'))
+ # Create a temp file and set both the master and slave PTYs to the file to
+ # create a loopback.
+ self.tempfile = tempfile.TemporaryFile()
+
+ # Mock out the pipes.
+ dummy_pipe_end_0, dummy_pipe_end_1 = mock.MagicMock(), mock.MagicMock()
+ self.console = console.Console(self.tempfile.fileno(), self.tempfile,
+ dummy_pipe_end_0, dummy_pipe_end_1)
+
+ @mock.patch('console.Console.CheckForEnhancedECImage')
+ def test_ActAsPassThruInNonEnhancedMode(self, mock_check):
+ """Verify we simply pass everything thru to non-enhanced ECs.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
+ method.
+ """
+ # Assume EC interrogations indicate that the image is non-enhanced.
+ mock_check.return_value = False
+
+ # Press enter, followed by the command, and another enter.
+ input_stream = []
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ test_command = 'version'
+ input_stream.extend(StringToByteList(test_command))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Expected calls to send down the pipe would be each character of the test
+ # command.
+ expected_calls = []
+ expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN)))
+ for char in test_command:
+ expected_calls.append(mock.call(char))
+ expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN)))
+
+ # Verify that the calls happened.
+ self.console.cmd_pipe.send.assert_has_calls(expected_calls)
+
+ # Since we're acting as a pass-thru, the input buffer should be empty and
+ # input_buffer_pos is 0.
+ CheckInputBuffer(self, '')
+ CheckInputBufferPosition(self, 0)
+
+ @mock.patch('console.Console.CheckForEnhancedECImage')
+ def test_TransitionFromNonEnhancedToEnhanced(self, mock_check):
+ """Verify that we transition correctly to enhanced mode.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
+ method.
+ """
+ # First, assume that the EC interrogations indicate an enhanced EC image.
+ mock_check.return_value = True
+ # But our current knowledge of the EC image (which was actually the
+ # 'previous' EC) was a non-enhanced image.
+ self.console.enhanced_ec = False
+
+ test_command = 'sysinfo'
+ input_stream = []
+ input_stream.extend(StringToByteList(test_command))
+
+ expected_calls = []
+ # All keystrokes to the console should be directed straight through to the
+ # EC until we press the enter key.
+ for char in test_command:
+ expected_calls.append(mock.call(char))
+
+ # Press the enter key.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ # The enter key should not be sent to the pipe since we should negotiate
+ # to an enhanced EC image.
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # At this point, we should have negotiated to enhanced.
+ self.assertTrue(self.console.enhanced_ec, msg=('Did not negotiate to '
+ 'enhanced EC image.'))
+
+ # The command would have been dropped however, so verify this...
+ CheckInputBuffer(self, '')
+ CheckInputBufferPosition(self, 0)
+ # ...and repeat the command.
+ input_stream = StringToByteList(test_command)
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Since we're enhanced now, we should have sent the entire command as one
+ # string with no trailing carriage return
+ expected_calls.append(mock.call(test_command))
+
+ # Verify all of the calls.
+ self.console.cmd_pipe.send.assert_has_calls(expected_calls)
+
+ @mock.patch('console.Console.CheckForEnhancedECImage')
+ def test_TransitionFromEnhancedToNonEnhanced(self, mock_check):
+ """Verify that we transition correctly to non-enhanced mode.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
+ method.
+ """
+ # First, assume that the EC interrogations indicate an non-enhanced EC
+ # image.
+ mock_check.return_value = False
+ # But our current knowledge of the EC image (which was actually the
+ # 'previous' EC) was an enhanced image.
+ self.console.enhanced_ec = True
+
+ test_command = 'sysinfo'
+ input_stream = []
+ input_stream.extend(StringToByteList(test_command))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # But, we will negotiate to non-enhanced however, dropping this command.
+ # Verify this.
+ self.assertFalse(self.console.enhanced_ec, msg=('Did not negotiate to'
+ 'non-enhanced EC image.'))
+ CheckInputBuffer(self, '')
+ CheckInputBufferPosition(self, 0)
+
+ # The carriage return should have passed through though.
+ expected_calls = []
+ expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN)))
+
+ # Since the command was dropped, repeat the command.
+ input_stream = StringToByteList(test_command)
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # Since we're not enhanced now, we should have sent each character in the
+ # entire command separately and a carriage return.
+ for char in test_command:
+ expected_calls.append(mock.call(char))
+ expected_calls.append(mock.call(chr(console.ControlKey.CARRIAGE_RETURN)))
+
+ # Verify all of the calls.
+ self.console.cmd_pipe.send.assert_has_calls(expected_calls)
+
+ def test_EnhancedCheckIfTimedOut(self):
+ """Verify that the check returns false if it times out."""
+ # Make the debug pipe "time out".
+ self.console.dbg_pipe.poll.return_value = False
+ self.assertFalse(self.console.CheckForEnhancedECImage())
+
+ def test_EnhancedCheckIfACKReceived(self):
+ """Verify that the check returns true if the ACK is received."""
+ # Make the debug pipe return EC_ACK.
+ self.console.dbg_pipe.poll.return_value = True
+ self.console.dbg_pipe.recv.return_value = interpreter.EC_ACK
+ self.assertTrue(self.console.CheckForEnhancedECImage())
+
+ def test_EnhancedCheckIfWrong(self):
+ """Verify that the check returns false if byte received is wrong."""
+ # Make the debug pipe return the wrong byte.
+ self.console.dbg_pipe.poll.return_value = True
+ self.console.dbg_pipe.recv.return_value = '\xff'
+ self.assertFalse(self.console.CheckForEnhancedECImage())
if __name__ == '__main__':
diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py
index 4c31037584..999c7b96e0 100755
--- a/util/ec3po/interpreter.py
+++ b/util/ec3po/interpreter.py
@@ -12,6 +12,7 @@ additionally supports automatic command retrying if the EC drops a character in
a command.
"""
from __future__ import print_function
+import binascii
from chromite.lib import cros_logging as logging
import os
import Queue
@@ -20,6 +21,10 @@ import select
COMMAND_RETRIES = 3 # Number of attempts to retry a command.
EC_MAX_READ = 1024 # Max bytes to read at a time from the EC.
+EC_SYN = '\xec' # Byte indicating EC interrogation.
+EC_ACK = '\xc0' # Byte representing correct EC response to interrogation.
+EC_INTERROGATION_TIMEOUT = 0.1 # Maximum number of seconds to wait for a
+ # response to an interrogation.
class Interpreter(object):
@@ -27,7 +32,7 @@ class Interpreter(object):
This class essentially performs all of the intepretation for the EC and the
user. It handles all of the automatic command retrying as well as the
- formation of commands.
+ formation of commands for EC images which support that.
Attributes:
ec_uart_pty: A string representing the EC UART to connect to.
@@ -44,8 +49,15 @@ class Interpreter(object):
Initially, these are the EC UART and the command pipe.
outputs: A list of objects that the interpreter selects for writing.
ec_cmd_queue: A FIFO queue used for sending commands down to the EC UART.
- cmd_in_progress: A string that represents the current command sent to the
- EC that is pending reception verification.
+ last_cmd: A string that represents the last command sent to the EC. If an
+ error is encountered, the interpreter will attempt to retry this command
+ up to COMMAND_RETRIES.
+ enhanced_ec: A boolean indicating if the EC image that we are currently
+ communicating with is enhanced or not. Enhanced EC images will support
+ packed commands and host commands over the UART. This defaults to False
+ and is changed depending on the result of an interrogation.
+ interrogating: A boolean indicating if we are in the middle of interrogating
+ the EC.
"""
def __init__(self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO):
"""Intializes an Interpreter object with the provided args.
@@ -71,16 +83,38 @@ class Interpreter(object):
self.inputs = [self.ec_uart_pty, self.cmd_pipe]
self.outputs = []
self.ec_cmd_queue = Queue.Queue()
- self.cmd_in_progress = ''
+ self.last_cmd = ''
+ self.enhanced_ec = False
+ self.interrogating = False
- def EnqueueCmd(self, packed_cmd):
- """Enqueue a packed console command to be sent to the EC UART.
+ def __str__(self):
+ """Show internal state of the Interpreter object.
+
+ Returns:
+ A string that shows the values of the attributes.
+ """
+ string = []
+ string.append('%r' % self)
+ string.append('ec_uart_pty: %s' % self.ec_uart_pty)
+ string.append('cmd_pipe: %r' % self.cmd_pipe)
+ string.append('dbg_pipe: %r' % self.dbg_pipe)
+ string.append('cmd_retries: %d' % self.cmd_retries)
+ string.append('log_level: %d' % self.log_level)
+ string.append('inputs: %r' % self.inputs)
+ string.append('outputs: %r' % self.outputs)
+ string.append('ec_cmd_queue: %r' % self.ec_cmd_queue)
+ string.append('last_cmd: \'%s\'' % self.last_cmd)
+ string.append('enhanced_ec: %r' % self.enhanced_ec)
+ string.append('interrogating: %r' % self.interrogating)
+ return '\n'.join(string)
+
+ def EnqueueCmd(self, command):
+ """Enqueue a command to be sent to the EC UART.
Args:
- packed_cmd: A string which contains the packed command to be sent.
+ command: A string which contains the command to be sent.
"""
- # Enqueue a packed command to be sent to the EC.
- self.ec_cmd_queue.put(packed_cmd)
+ self.ec_cmd_queue.put(command)
logging.debug('Commands now in queue: %d', self.ec_cmd_queue.qsize())
# Add the EC UART as an output to be serviced.
self.outputs.append(self.ec_uart_pty)
@@ -105,19 +139,23 @@ class Interpreter(object):
Returns:
A string which contains the packed command.
"""
- # The command format is as follows.
- # &&[x][x][x][x]&{cmd}\n\n
- packed_cmd = []
- packed_cmd.append('&&')
- # The first pair of hex digits are the length of the command.
- packed_cmd.append('%02x' % len(raw_cmd))
- # Then the CRC8 of cmd.
- packed_cmd.append('%02x' % Crc8(raw_cmd))
- packed_cmd.append('&')
- # Now, the raw command followed by 2 newlines.
- packed_cmd.append(raw_cmd)
- packed_cmd.append('\n\n')
- return ''.join(packed_cmd)
+ # Don't pack a single carriage return.
+ if raw_cmd != '\r':
+ # The command format is as follows.
+ # &&[x][x][x][x]&{cmd}\n\n
+ packed_cmd = []
+ packed_cmd.append('&&')
+ # The first pair of hex digits are the length of the command.
+ packed_cmd.append('%02x' % len(raw_cmd))
+ # Then the CRC8 of cmd.
+ packed_cmd.append('%02x' % Crc8(raw_cmd))
+ packed_cmd.append('&')
+ # Now, the raw command followed by 2 newlines.
+ packed_cmd.append(raw_cmd)
+ packed_cmd.append('\n\n')
+ return ''.join(packed_cmd)
+ else:
+ return raw_cmd
def ProcessCommand(self, command):
"""Captures the input determines what actions to take.
@@ -125,41 +163,47 @@ class Interpreter(object):
Args:
command: A string representing the command sent by the user.
"""
- command = command.strip()
+ # Remove leading and trailing spaces only if this is an enhanced EC image.
+ # For non-enhanced EC images, commands will be single characters at a time
+ # and can be spaces.
+ if self.enhanced_ec:
+ command = command.strip(' ')
+
# There's nothing to do if the command is empty.
if len(command) == 0:
return
- # All other commands need to be packed first before they go to the EC.
- packed_cmd = self.PackCommand(command)
- logging.debug('packed cmd: ' + packed_cmd)
- self.EnqueueCmd(packed_cmd)
- # TODO(aaboagye): Make a dict of commands and keys and eventually, handle
- # partial matching based on unique prefixes.
-
- def CheckECResponse(self):
- """Checks the response from the EC for any errors."""
- # An invalid response is at most 4 bytes.
- data = os.read(self.ec_uart_pty.fileno(), 4)
- if '&E' not in data:
- # No error received. Clear the command in progress.
- self.cmd_in_progress = ''
- # Reset the retry count.
- self.cmd_retries = COMMAND_RETRIES
- # Forward the data to the user.
- self.dbg_pipe.send(data)
- elif self.cmd_retries > 0:
+ # Check for interrogation command.
+ if command == EC_SYN:
+ # User is requesting interrogation. Send SYN as is.
+ logging.debug('User requesting interrogation.')
+ self.interrogating = True
+ # Assume the EC isn't enhanced until we get a response.
+ self.enhanced_ec = False
+ elif self.enhanced_ec:
+ # Enhanced EC images require the plaintext commands to be packed.
+ command = self.PackCommand(command)
+ # TODO(aaboagye): Make a dict of commands and keys and eventually,
+ # handle partial matching based on unique prefixes.
+
+ logging.debug('command: \'%s\'', command)
+ self.EnqueueCmd(command)
+
+ def HandleCmdRetries(self):
+ """Attempts to retry commands if possible."""
+ if self.cmd_retries > 0:
# The EC encountered an error. We'll have to retry again.
- logging.warning('EC replied with error. Retrying.')
+ logging.warning('Retrying command...')
self.cmd_retries -= 1
logging.warning('Retries remaining: %d', self.cmd_retries)
- # Add the EC UART to the writers again.
+ # Retry the command and add the EC UART to the writers again.
+ self.EnqueueCmd(self.last_cmd)
self.outputs.append(self.ec_uart_pty)
else:
# We're out of retries, so just give up.
logging.error('Command failed. No retries left.')
# Clear the command in progress.
- self.cmd_in_progress = ''
+ self.last_cmd = ''
# Reset the retry count.
self.cmd_retries = COMMAND_RETRIES
@@ -167,7 +211,7 @@ class Interpreter(object):
"""Sends a command to the EC."""
# If we're retrying a command, just try to send it again.
if self.cmd_retries < COMMAND_RETRIES:
- cmd = self.cmd_in_progress
+ cmd = self.last_cmd
else:
# If we're not retrying, we should not be writing to the EC if we have no
# items in our command queue.
@@ -176,17 +220,54 @@ class Interpreter(object):
cmd = self.ec_cmd_queue.get()
# Send the command.
- logging.debug('Sending command to EC.')
self.ec_uart_pty.write(cmd)
self.ec_uart_pty.flush()
-
- # Now, that we've sent the command we will need to make sure the EC
- # received it without an error. Store the current command as in
- # progress. We will clear this if the EC responds with a non-error.
- self.cmd_in_progress = cmd
+ logging.debug('Sent command to EC.')
+
+ if self.enhanced_ec and cmd != EC_SYN:
+ # Now, that we've sent the command, store the current command as the last
+ # command sent. If we encounter an error string, we will attempt to retry
+ # this command.
+ if cmd != self.last_cmd:
+ self.last_cmd = cmd
+ # Reset the retry count.
+ self.cmd_retries = COMMAND_RETRIES
# Remove the EC UART from the writers while we wait for a response.
self.outputs.remove(self.ec_uart_pty)
+ def HandleECData(self):
+ """Handle any debug prints from the EC."""
+ logging.debug('EC has data')
+ # Read what the EC sent us.
+ data = os.read(self.ec_uart_pty.fileno(), EC_MAX_READ)
+ logging.debug('got: \'%s\'', binascii.hexlify(data))
+ if '&E' in data and self.enhanced_ec:
+ # We received an error, so we should retry it if possible.
+ logging.warning('Error string found in data.')
+ self.HandleCmdRetries()
+ return
+
+ # If we were interrogating, check the response and update our knowledge
+ # of the current EC image.
+ if self.interrogating:
+ self.enhanced_ec = data == EC_ACK
+ if self.enhanced_ec:
+ logging.debug('The current EC image seems enhanced.')
+ else:
+ logging.debug('The current EC image does NOT seem enhanced.')
+ # Done interrogating.
+ self.interrogating = False
+ # For now, just forward everything the EC sends us.
+ logging.debug('Forwarding to user...')
+ self.dbg_pipe.send(data)
+
+ def HandleUserData(self):
+ """Handle any incoming commands from the user."""
+ logging.debug('Command data available. Begin processing.')
+ data = self.cmd_pipe.recv()
+ # Process the command.
+ self.ProcessCommand(data)
+
def Crc8(data):
"""Calculates the CRC8 of data.
@@ -232,26 +313,11 @@ def StartLoop(interp):
for obj in readable:
# Handle any debug prints from the EC.
if obj is interp.ec_uart_pty:
- logging.debug('EC has data')
- if interp.cmd_in_progress:
- # A command was just sent to the EC. We need to check to see if the
- # EC is telling us that it received a corrupted command.
- logging.debug('Command in progress so checking response...')
- interp.CheckECResponse()
-
- # Read what the EC sent us.
- data = os.read(obj.fileno(), EC_MAX_READ)
- logging.debug('got: \'%s\'', data)
- # For now, just forward everything the EC sends us.
- logging.debug('Forwarding to user...')
- interp.dbg_pipe.send(data)
+ interp.HandleECData()
# Handle any commands from the user.
elif obj is interp.cmd_pipe:
- logging.debug('Command data available. Begin processing.')
- data = interp.cmd_pipe.recv()
- # Process the command.
- interp.ProcessCommand(data)
+ interp.HandleUserData()
for obj in writeable:
# Send a command to the EC.
diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py
new file mode 100755
index 0000000000..46bbcf8e93
--- /dev/null
+++ b/util/ec3po/interpreter_unittest.py
@@ -0,0 +1,252 @@
+#!/usr/bin/python2
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""Unit tests for the EC-3PO interpreter."""
+from __future__ import print_function
+from chromite.lib import cros_logging as logging
+import mock
+import multiprocessing
+import tempfile
+import unittest
+
+import interpreter
+
+class TestEnhancedECBehaviour(unittest.TestCase):
+ """Test case to verify all enhanced EC interpretation tasks."""
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(level=logging.DEBUG,
+ format=('%(asctime)s - %(module)s -'
+ ' %(levelname)s - %(message)s'))
+
+ # Create a tempfile that would represent the EC UART PTY.
+ self.tempfile = tempfile.NamedTemporaryFile()
+
+ # Create the pipes that the interpreter will use.
+ self.cmd_pipe_user, self.cmd_pipe_itpr = multiprocessing.Pipe()
+ self.dbg_pipe_user, self.dbg_pipe_itpr = multiprocessing.Pipe(duplex=False)
+
+ # Mock the open() function so we can inspect reads/writes to the EC.
+ self.ec_uart_pty = mock.mock_open()
+ with mock.patch('__builtin__.open', self.ec_uart_pty):
+ # Create an interpreter.
+ self.itpr = interpreter.Interpreter(self.tempfile.name,
+ self.cmd_pipe_itpr,
+ self.dbg_pipe_itpr,
+ log_level=logging.DEBUG)
+
+ @mock.patch('interpreter.os')
+ def test_HandlingCommandsThatProduceNoOutput(self, mock_os):
+ """Verify that the Interpreter correctly handles non-output commands.
+
+ Args:
+ mock_os: MagicMock object replacing the 'os' module for this test
+ case.
+ """
+ # The interpreter init should open the EC UART PTY.
+ expected_ec_calls = [mock.call(self.tempfile.name, 'a+')]
+ # Have a command come in the command pipe. The first command will be an
+ # interrogation to determine if the EC is enhanced or not.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+ # At this point, the command should be queued up waiting to be sent, so
+ # let's actually send it to the EC.
+ self.itpr.SendCmdToEC()
+ expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
+ mock.call().flush()])
+ # Now, assume that the EC sends only 1 response back of EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Now that the interrogation was complete, it's time to send down the real
+ # command.
+ test_cmd = 'chan save'
+ # Send the test command down the pipe.
+ self.cmd_pipe_user.send(test_cmd)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ # Since the EC image is enhanced, we should have sent a packed command.
+ expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
+ expected_ec_calls.append(mock.call().flush())
+
+ # Now that the first command was sent, we should send another command which
+ # produces no output. The console would send another interrogation.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
+ mock.call().flush()])
+ # Again, assume that the EC sends only 1 response back of EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Now send the second test command.
+ test_cmd = 'chan 0'
+ self.cmd_pipe_user.send(test_cmd)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ # Since the EC image is enhanced, we should have sent a packed command.
+ expected_ec_calls.append(mock.call().write(self.itpr.PackCommand(test_cmd)))
+ expected_ec_calls.append(mock.call().flush())
+
+ # Finally, verify that the appropriate writes were actually sent to the EC.
+ self.ec_uart_pty.assert_has_calls(expected_ec_calls)
+
+ @mock.patch('interpreter.os')
+ def test_CommandRetryingOnError(self, mock_os):
+ """Verify that commands are retried if an error is encountered.
+
+ Args:
+ mock_os: MagicMock object replacing the 'os' module for this test
+ case.
+ """
+ # The interpreter init should open the EC UART PTY.
+ expected_ec_calls = [mock.call(self.tempfile.name, 'a+')]
+ # Have a command come in the command pipe. The first command will be an
+ # interrogation to determine if the EC is enhanced or not.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+ # At this point, the command should be queued up waiting to be sent, so
+ # let's actually send it to the EC.
+ self.itpr.SendCmdToEC()
+ expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
+ mock.call().flush()])
+ # Now, assume that the EC sends only 1 response back of EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Let's send a command that is received on the EC-side with an error.
+ test_cmd = 'accelinfo'
+ self.cmd_pipe_user.send(test_cmd)
+ self.itpr.HandleUserData()
+ self.itpr.SendCmdToEC()
+ packed_cmd = self.itpr.PackCommand(test_cmd)
+ expected_ec_calls.extend([mock.call().write(packed_cmd),
+ mock.call().flush()])
+ # Have the EC return the error string twice.
+ mock_os.read.side_effect = ['&&EE', '&&EE']
+ for i in range(2):
+ # When reading the EC, the interpreter will call file.fileno() to pass to
+ # os.read().
+ expected_ec_calls.append(mock.call().fileno())
+ # Simulate the response.
+ self.itpr.HandleECData()
+
+ # Since an error was received, the EC should attempt to retry the command.
+ expected_ec_calls.extend([mock.call().write(packed_cmd),
+ mock.call().flush()])
+ # Verify that the retry count was decremented.
+ self.assertEqual(interpreter.COMMAND_RETRIES-i-1, self.itpr.cmd_retries,
+ 'Unexpected cmd_remaining count.')
+ # Actually retry the command.
+ self.itpr.SendCmdToEC()
+
+ # Now assume that the last one goes through with no trouble.
+ expected_ec_calls.extend([mock.call().write(packed_cmd),
+ mock.call().flush()])
+ self.itpr.SendCmdToEC()
+
+ # Verify all the calls.
+ self.ec_uart_pty.assert_has_calls(expected_ec_calls)
+
+ def test_PackCommandsForEnhancedEC(self):
+ """Verify that the interpreter packs commands for enhanced EC images."""
+ # Assume current EC image is enhanced.
+ self.itpr.enhanced_ec = True
+ # Receive a command from the user.
+ test_cmd = 'gettime'
+ self.cmd_pipe_user.send(test_cmd)
+ # Mock out PackCommand to see if it was called.
+ self.itpr.PackCommand = mock.MagicMock()
+ # Have the interpreter handle the command.
+ self.itpr.HandleUserData()
+ # Verify that PackCommand() was called.
+ self.itpr.PackCommand.assert_called_once_with(test_cmd)
+
+ def test_DontPackCommandsForNonEnhancedEC(self):
+ """Verify the interpreter doesn't pack commands for non-enhanced images."""
+ # Assume current EC image is not enhanced.
+ self.itpr.enhanced_ec = False
+ # Receive a command from the user.
+ test_cmd = 'gettime'
+ self.cmd_pipe_user.send(test_cmd)
+ # Mock out PackCommand to see if it was called.
+ self.itpr.PackCommand = mock.MagicMock()
+ # Have the interpreter handle the command.
+ self.itpr.HandleUserData()
+ # Verify that PackCommand() was called.
+ self.itpr.PackCommand.assert_not_called()
+
+ @mock.patch('interpreter.os')
+ def test_KeepingTrackOfInterrogation(self, mock_os):
+ """Verify that the interpreter can track the state of the interrogation.
+
+ Args:
+ mock_os: MagicMock object replacing the 'os' module. for this test
+ case.
+ """
+ # Upon init, the interpreter should assume that the current EC image is not
+ # enhanced.
+ self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec upon'
+ ' init is not False.'))
+
+ # Assume an interrogation request comes in from the user.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+
+ # Verify the state is now within an interrogation.
+ self.assertTrue(self.itpr.interrogating, 'interrogating should be True')
+ # The state of enhanced_ec should not be changed yet because we haven't
+ # received a valid response yet.
+ self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec is '
+ 'not False.'))
+
+ # Assume that the EC responds with an EC_ACK.
+ mock_os.read.side_effect = [interpreter.EC_ACK]
+ self.itpr.HandleECData()
+
+ # Now, the interrogation should be complete and we should know that the
+ # current EC image is enhanced.
+ self.assertFalse(self.itpr.interrogating, msg=('interrogating should be '
+ 'False'))
+ self.assertTrue(self.itpr.enhanced_ec, msg='enhanced_ec sholud be True')
+
+ # Now let's perform another interrogation, but pretend that the EC ignores
+ # it.
+ self.cmd_pipe_user.send(interpreter.EC_SYN)
+ self.itpr.HandleUserData()
+
+ # Verify interrogating state.
+ self.assertTrue(self.itpr.interrogating, 'interrogating sholud be True')
+ # We should assume that the image is not enhanced until we get the valid
+ # response.
+ self.assertFalse(self.itpr.enhanced_ec, 'enhanced_ec should be False now.')
+
+ # Let's pretend that we get a random debug print. This should clear the
+ # interrogating flag.
+ mock_os.read.side_effect = '[1660.593076 HC 0x103]'
+ self.itpr.HandleECData()
+
+ # Verify that interrogating flag is cleared and enhanced_ec is still False.
+ self.assertFalse(self.itpr.interrogating, 'interrogating should be False.')
+ self.assertFalse(self.itpr.enhanced_ec,
+ 'enhanced_ec should still be False.')
+
+
+
+if __name__ == '__main__':
+ unittest.main()