summaryrefslogtreecommitdiff
path: root/util
diff options
context:
space:
mode:
Diffstat (limited to 'util')
-rwxr-xr-xutil/ec3po/console.py130
-rwxr-xr-xutil/ec3po/console_unittest.py174
-rw-r--r--util/ec3po/interpreter.py4
-rwxr-xr-xutil/ec3po/interpreter_unittest.py2
4 files changed, 292 insertions, 18 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py
index e75523e80b..f17ac49fd3 100755
--- a/util/ec3po/console.py
+++ b/util/ec3po/console.py
@@ -43,6 +43,9 @@ NON_ENHANCED_EC_INTERROGATION_TIMEOUT = 0.3 # Maximum number of seconds to wait
ENHANCED_EC_INTERROGATION_TIMEOUT = 1.0 # Maximum number of seconds to wait for
# a response to an interrogation of an
# enhanced EC image.
+INTERROGATION_MODES = ['never', 'always', 'auto'] # List of modes which control
+ # when interrogations are
+ # performed with the EC.
class EscState(object):
@@ -107,6 +110,11 @@ class Console(object):
until we perform some handshaking.
interrogation_timeout: A float representing the current maximum seconds to
wait for a response to an interrogation.
+ receiving_oobm_cmd: A boolean indicating whether or not the console is in
+ the middle of receiving an out of band command.
+ pending_oobm_cmd: A string containing the pending OOBM command.
+ interrogation_mode: A string containing the current mode of whether
+ interrogations are performed with the EC or not and how often.
"""
def __init__(self, master_pty, user_pty, cmd_pipe, dbg_pipe):
@@ -140,6 +148,9 @@ class Console(object):
self.prompt = PROMPT
self.enhanced_ec = False
self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT
+ self.receiving_oobm_cmd = False
+ self.pending_oobm_cmd = ''
+ self.interrogation_mode = 'auto'
def __str__(self):
"""Show internal state of Console object as a string."""
@@ -444,12 +455,49 @@ class Console(object):
Args:
byte: An integer representing the character received from the user.
"""
- # Interrogate the EC every time we press the enter key. This is necessary
- # so that we know whether or not we should provide the console interface or
- # simply behave as a pass-thru.
+ fd = self.master_pty
+
+ # Enter the OOBM prompt mode if the user presses '%'.
+ if byte == ord('%'):
+ self.logger.debug('Begin OOBM command.')
+ self.receiving_oobm_cmd = True
+ # Print a "prompt".
+ os.write(self.master_pty, '\r\n% ')
+ return
+
+ # Add chars to the pending OOBM command if we're currently receiving one.
+ if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN:
+ self.pending_oobm_cmd += chr(byte)
+ self.logger.debug('%s', chr(byte))
+ os.write(self.master_pty, chr(byte))
+ return
+
if byte == ControlKey.CARRIAGE_RETURN:
- self.enhanced_ec = self.CheckForEnhancedECImage()
- self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
+ if self.receiving_oobm_cmd:
+ # Terminate the command and place it in the OOBM queue.
+ self.logger.debug('End OOBM command.')
+ if self.pending_oobm_cmd:
+ self.oobm_queue.put(self.pending_oobm_cmd)
+ self.logger.debug('Placed \'%s\' into OOBM command queue.',
+ self.pending_oobm_cmd)
+
+ # Reset the state.
+ os.write(self.master_pty, '\r\n' + self.prompt)
+ self.input_buffer = ''
+ self.input_buffer_pos = 0
+ self.receiving_oobm_cmd = False
+ self.pending_oobm_cmd = ''
+ return
+
+ if self.interrogation_mode == 'never':
+ self.logger.debug('Skipping interrogation because interrogation mode'
+ ' is set to never.')
+ else:
+ # Only interrogate the EC if the interrogation mode is NOT set to
+ # 'never'.
+ # TODO(aaboagye): Implement the 'auto' mode.
+ self.enhanced_ec = self.CheckForEnhancedECImage()
+ self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
if not self.enhanced_ec:
# Send everything straight to the EC to handle.
@@ -477,7 +525,6 @@ class Console(object):
# If the input buffer is full we can't accept new chars.
buffer_full = len(self.input_buffer) >= self.line_limit
- fd = self.master_pty
# Carriage_Return/Enter
if byte == ControlKey.CARRIAGE_RETURN:
@@ -650,6 +697,64 @@ class Console(object):
"""Backspace a character on the console."""
os.write(self.master_pty, '\033[1D \033[1D')
+ def ProcessOOBMQueue(self):
+ """Retrieve an item from the OOBM queue and process it."""
+ item = self.oobm_queue.get()
+ self.logger.debug('OOBM cmd: %s', item)
+ cmd = item.split(' ')
+
+ if cmd[0] == 'loglevel':
+ # An integer is required in order to set the log level.
+ if len(cmd) < 2:
+ self.logger.debug('Insufficient args')
+ self.PrintOOBMHelp()
+ return
+ try:
+ self.logger.debug('Log level change request.')
+ new_log_level = int(cmd[1])
+ self.logger.logger.setLevel(new_log_level)
+ self.logger.info('Log level changed to %d.', new_log_level)
+
+ # Forward the request to the interpreter as well.
+ self.cmd_pipe.send(item)
+ except ValueError:
+ # Ignoring the request if an integer was not provided.
+ self.PrintOOBMHelp()
+
+ elif cmd[0] == 'interrogate' and len(cmd) >= 2:
+ enhanced = False
+ mode = cmd[1]
+ if len(cmd) >= 3 and cmd[2] == 'enhanced':
+ enhanced = True
+
+ # Set the mode if correct.
+ if mode in INTERROGATION_MODES:
+ self.interrogation_mode = mode
+ self.logger.debug('Updated interrogation mode to %s.', mode)
+ if mode == 'auto':
+ os.write(self.master_pty,
+ 'auto not implemented yet; treating it as always.\r\n')
+
+ # Update the assumptions of the EC image.
+ self.enhanced_ec = enhanced
+ self.logger.debug('Enhanced EC image is now %r', self.enhanced_ec)
+
+ # Send command to interpreter as well.
+ self.cmd_pipe.send('enhanced ' + str(self.enhanced_ec))
+ else:
+ self.PrintOOBMHelp()
+
+ else:
+ self.PrintOOBMHelp()
+
+ def PrintOOBMHelp(self):
+ """Prints out the OOBM help."""
+ # Print help syntax.
+ os.write(self.master_pty, '\r\n' + 'Known OOBM commands:\r\n')
+ os.write(self.master_pty, ' interrogate <never | always | auto> '
+ '[enhanced]\r\n')
+ os.write(self.master_pty, ' loglevel <int>\r\n')
+
def IsPrintable(byte):
"""Determines if a byte is printable.
@@ -700,17 +805,8 @@ def StartLoop(console):
os.write(console.master_pty, data)
while not console.oobm_queue.empty():
- console.logger.debug('OOBM queue ready for reading.')
- cmd = console.oobm_queue.get()
- console.logger.debug('cmd: %s', cmd)
- if cmd.startswith('loglevel'):
- console.logger.debug('Log level change request.')
- new_log_level = int(cmd.split(' ')[1])
- console.logger.logger.setLevel(new_log_level)
- console.logger.info('Log level changed to %d.', new_log_level)
-
- # Forward the request to the interpreter as well.
- console.cmd_pipe.send(cmd)
+ console.logger.debug('OOBM queue ready for reading.')
+ console.ProcessOOBMQueue()
finally:
# Close pipes.
diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py
index 5963e9c84f..5e4d67516e 100755
--- a/util/ec3po/console_unittest.py
+++ b/util/ec3po/console_unittest.py
@@ -1314,5 +1314,179 @@ class TestConsoleCompatibility(unittest.TestCase):
self.assertFalse(self.console.CheckForEnhancedECImage())
+class TestOOBMConsoleCommands(unittest.TestCase):
+ """Verify that OOBM console commands work correctly."""
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(level=logging.DEBUG,
+ format=('%(asctime)s - %(module)s -'
+ ' %(levelname)s - %(message)s'))
+ # Create a temp file and set both the master and slave PTYs to the file to
+ # create a loopback.
+ self.tempfile = tempfile.TemporaryFile()
+
+ # Mock out the pipes.
+ dummy_pipe_end_0, dummy_pipe_end_1 = mock.MagicMock(), mock.MagicMock()
+ self.console = console.Console(self.tempfile.fileno(), self.tempfile,
+ dummy_pipe_end_0, dummy_pipe_end_1)
+ self.console.oobm_queue = mock.MagicMock()
+
+ @mock.patch('console.Console.CheckForEnhancedECImage')
+ def test_InterrogateCommand(self, mock_check):
+ """Verify that 'interrogate' command works as expected.
+
+ Args:
+ mock_check: A MagicMock object replacing the CheckForEnhancedECIMage()
+ method.
+ """
+ input_stream = []
+ expected_calls = []
+ mock_check.side_effect = [False]
+
+ # 'interrogate never' should disable the interrogation from happening at
+ # all.
+ cmd = 'interrogate never'
+ # Enter the OOBM prompt.
+ input_stream.extend(StringToByteList('%'))
+ # Type the command
+ input_stream.extend(StringToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
+ # Type out a few commands.
+ input_stream.extend(StringToByteList('version'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('flashinfo'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('sysinfo'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The Check function should NOT have been called at all.
+ mock_check.assert_not_called()
+
+ # The EC image should be assumed to be not enhanced.
+ self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to'
+ ' be NOT enhanced.')
+
+ # Reset the mocks.
+ mock_check.reset_mock()
+ self.console.oobm_queue.reset_mock()
+
+ # 'interrogate auto' should interrogate after each press of the enter key.
+ # TODO(aaboagye): Fix this test when you update auto to check after each
+ # reboot. For now, it would behave the same as 'interrogate always'.
+ cmd = 'interrogate auto'
+ # Enter the OOBM prompt.
+ input_stream.extend(StringToByteList('%'))
+ # Type the command
+ input_stream.extend(StringToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+ expected_calls = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
+ # The Check method should be called 3 times here.
+ mock_check.side_effect = [False, False, False]
+
+ # Type out a few commands.
+ input_stream.extend(StringToByteList('help list'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('taskinfo'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('hibdelay'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The Check method should have been called 3 times here.
+ expected_calls = [mock.call(), mock.call(), mock.call()]
+ mock_check.assert_has_calls(expected_calls)
+
+ # The EC image should be assumed to be not enhanced.
+ self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to'
+ ' be NOT enhanced.')
+
+ # Now, let's try to assume that the image is enhanced while still disabling
+ # interrogation.
+ mock_check.reset_mock()
+ self.console.oobm_queue.reset_mock()
+ input_stream = []
+ cmd = 'interrogate never enhanced'
+ # Enter the OOBM prompt.
+ input_stream.extend(StringToByteList('%'))
+ # Type the command
+ input_stream.extend(StringToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+ expected_calls = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
+ # Type out a few commands.
+ input_stream.extend(StringToByteList('chgstate'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('hash'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('sysjump rw'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The check method should have never been called.
+ mock_check.assert_not_called()
+
+ # The EC image should be assumed to be enhanced.
+ self.assertTrue(self.console.enhanced_ec, 'The image should be'
+ ' assumed to be enhanced.')
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py
index 4875d66b74..73123b746a 100644
--- a/util/ec3po/interpreter.py
+++ b/util/ec3po/interpreter.py
@@ -224,6 +224,10 @@ class Interpreter(object):
self.logger.debug('Connected to %s.', self.ec_uart_pty_name)
return
+ elif command.startswith('enhanced'):
+ self.enhanced_ec = command.split(' ')[1] == 'True'
+ return
+
# Ignore any other commands while in the disconnected state.
self.logger.debug('command: \'%s\'', command)
if not self.connected:
diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py
index c616394f8f..6eb6b3abda 100755
--- a/util/ec3po/interpreter_unittest.py
+++ b/util/ec3po/interpreter_unittest.py
@@ -351,7 +351,7 @@ class TestUARTDisconnection(unittest.TestCase):
# Verify interpreter is connected.
self.assertTrue(self.itpr.connected, ('The interpreter should be'
- 'connected.'))
+ 'connected.'))
# Verify that the EC UART is now a member of the inputs.
self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
# Since we have issued no commands during the disconnected state, no