diff options
Diffstat (limited to 'util')
-rwxr-xr-x | util/ec3po/console.py | 130 | ||||
-rwxr-xr-x | util/ec3po/console_unittest.py | 174 | ||||
-rw-r--r-- | util/ec3po/interpreter.py | 4 | ||||
-rwxr-xr-x | util/ec3po/interpreter_unittest.py | 2 |
4 files changed, 292 insertions, 18 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py index e75523e80b..f17ac49fd3 100755 --- a/util/ec3po/console.py +++ b/util/ec3po/console.py @@ -43,6 +43,9 @@ NON_ENHANCED_EC_INTERROGATION_TIMEOUT = 0.3 # Maximum number of seconds to wait ENHANCED_EC_INTERROGATION_TIMEOUT = 1.0 # Maximum number of seconds to wait for # a response to an interrogation of an # enhanced EC image. +INTERROGATION_MODES = ['never', 'always', 'auto'] # List of modes which control + # when interrogations are + # performed with the EC. class EscState(object): @@ -107,6 +110,11 @@ class Console(object): until we perform some handshaking. interrogation_timeout: A float representing the current maximum seconds to wait for a response to an interrogation. + receiving_oobm_cmd: A boolean indicating whether or not the console is in + the middle of receiving an out of band command. + pending_oobm_cmd: A string containing the pending OOBM command. + interrogation_mode: A string containing the current mode of whether + interrogations are performed with the EC or not and how often. """ def __init__(self, master_pty, user_pty, cmd_pipe, dbg_pipe): @@ -140,6 +148,9 @@ class Console(object): self.prompt = PROMPT self.enhanced_ec = False self.interrogation_timeout = NON_ENHANCED_EC_INTERROGATION_TIMEOUT + self.receiving_oobm_cmd = False + self.pending_oobm_cmd = '' + self.interrogation_mode = 'auto' def __str__(self): """Show internal state of Console object as a string.""" @@ -444,12 +455,49 @@ class Console(object): Args: byte: An integer representing the character received from the user. """ - # Interrogate the EC every time we press the enter key. This is necessary - # so that we know whether or not we should provide the console interface or - # simply behave as a pass-thru. + fd = self.master_pty + + # Enter the OOBM prompt mode if the user presses '%'. + if byte == ord('%'): + self.logger.debug('Begin OOBM command.') + self.receiving_oobm_cmd = True + # Print a "prompt". + os.write(self.master_pty, '\r\n% ') + return + + # Add chars to the pending OOBM command if we're currently receiving one. + if self.receiving_oobm_cmd and byte != ControlKey.CARRIAGE_RETURN: + self.pending_oobm_cmd += chr(byte) + self.logger.debug('%s', chr(byte)) + os.write(self.master_pty, chr(byte)) + return + if byte == ControlKey.CARRIAGE_RETURN: - self.enhanced_ec = self.CheckForEnhancedECImage() - self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) + if self.receiving_oobm_cmd: + # Terminate the command and place it in the OOBM queue. + self.logger.debug('End OOBM command.') + if self.pending_oobm_cmd: + self.oobm_queue.put(self.pending_oobm_cmd) + self.logger.debug('Placed \'%s\' into OOBM command queue.', + self.pending_oobm_cmd) + + # Reset the state. + os.write(self.master_pty, '\r\n' + self.prompt) + self.input_buffer = '' + self.input_buffer_pos = 0 + self.receiving_oobm_cmd = False + self.pending_oobm_cmd = '' + return + + if self.interrogation_mode == 'never': + self.logger.debug('Skipping interrogation because interrogation mode' + ' is set to never.') + else: + # Only interrogate the EC if the interrogation mode is NOT set to + # 'never'. + # TODO(aaboagye): Implement the 'auto' mode. + self.enhanced_ec = self.CheckForEnhancedECImage() + self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) if not self.enhanced_ec: # Send everything straight to the EC to handle. @@ -477,7 +525,6 @@ class Console(object): # If the input buffer is full we can't accept new chars. buffer_full = len(self.input_buffer) >= self.line_limit - fd = self.master_pty # Carriage_Return/Enter if byte == ControlKey.CARRIAGE_RETURN: @@ -650,6 +697,64 @@ class Console(object): """Backspace a character on the console.""" os.write(self.master_pty, '\033[1D \033[1D') + def ProcessOOBMQueue(self): + """Retrieve an item from the OOBM queue and process it.""" + item = self.oobm_queue.get() + self.logger.debug('OOBM cmd: %s', item) + cmd = item.split(' ') + + if cmd[0] == 'loglevel': + # An integer is required in order to set the log level. + if len(cmd) < 2: + self.logger.debug('Insufficient args') + self.PrintOOBMHelp() + return + try: + self.logger.debug('Log level change request.') + new_log_level = int(cmd[1]) + self.logger.logger.setLevel(new_log_level) + self.logger.info('Log level changed to %d.', new_log_level) + + # Forward the request to the interpreter as well. + self.cmd_pipe.send(item) + except ValueError: + # Ignoring the request if an integer was not provided. + self.PrintOOBMHelp() + + elif cmd[0] == 'interrogate' and len(cmd) >= 2: + enhanced = False + mode = cmd[1] + if len(cmd) >= 3 and cmd[2] == 'enhanced': + enhanced = True + + # Set the mode if correct. + if mode in INTERROGATION_MODES: + self.interrogation_mode = mode + self.logger.debug('Updated interrogation mode to %s.', mode) + if mode == 'auto': + os.write(self.master_pty, + 'auto not implemented yet; treating it as always.\r\n') + + # Update the assumptions of the EC image. + self.enhanced_ec = enhanced + self.logger.debug('Enhanced EC image is now %r', self.enhanced_ec) + + # Send command to interpreter as well. + self.cmd_pipe.send('enhanced ' + str(self.enhanced_ec)) + else: + self.PrintOOBMHelp() + + else: + self.PrintOOBMHelp() + + def PrintOOBMHelp(self): + """Prints out the OOBM help.""" + # Print help syntax. + os.write(self.master_pty, '\r\n' + 'Known OOBM commands:\r\n') + os.write(self.master_pty, ' interrogate <never | always | auto> ' + '[enhanced]\r\n') + os.write(self.master_pty, ' loglevel <int>\r\n') + def IsPrintable(byte): """Determines if a byte is printable. @@ -700,17 +805,8 @@ def StartLoop(console): os.write(console.master_pty, data) while not console.oobm_queue.empty(): - console.logger.debug('OOBM queue ready for reading.') - cmd = console.oobm_queue.get() - console.logger.debug('cmd: %s', cmd) - if cmd.startswith('loglevel'): - console.logger.debug('Log level change request.') - new_log_level = int(cmd.split(' ')[1]) - console.logger.logger.setLevel(new_log_level) - console.logger.info('Log level changed to %d.', new_log_level) - - # Forward the request to the interpreter as well. - console.cmd_pipe.send(cmd) + console.logger.debug('OOBM queue ready for reading.') + console.ProcessOOBMQueue() finally: # Close pipes. diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py index 5963e9c84f..5e4d67516e 100755 --- a/util/ec3po/console_unittest.py +++ b/util/ec3po/console_unittest.py @@ -1314,5 +1314,179 @@ class TestConsoleCompatibility(unittest.TestCase): self.assertFalse(self.console.CheckForEnhancedECImage()) +class TestOOBMConsoleCommands(unittest.TestCase): + """Verify that OOBM console commands work correctly.""" + def setUp(self): + """Setup the test harness.""" + # Setup logging with a timestamp, the module, and the log level. + logging.basicConfig(level=logging.DEBUG, + format=('%(asctime)s - %(module)s -' + ' %(levelname)s - %(message)s')) + # Create a temp file and set both the master and slave PTYs to the file to + # create a loopback. + self.tempfile = tempfile.TemporaryFile() + + # Mock out the pipes. + dummy_pipe_end_0, dummy_pipe_end_1 = mock.MagicMock(), mock.MagicMock() + self.console = console.Console(self.tempfile.fileno(), self.tempfile, + dummy_pipe_end_0, dummy_pipe_end_1) + self.console.oobm_queue = mock.MagicMock() + + @mock.patch('console.Console.CheckForEnhancedECImage') + def test_InterrogateCommand(self, mock_check): + """Verify that 'interrogate' command works as expected. + + Args: + mock_check: A MagicMock object replacing the CheckForEnhancedECIMage() + method. + """ + input_stream = [] + expected_calls = [] + mock_check.side_effect = [False] + + # 'interrogate never' should disable the interrogation from happening at + # all. + cmd = 'interrogate never' + # Enter the OOBM prompt. + input_stream.extend(StringToByteList('%')) + # Type the command + input_stream.extend(StringToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + + # Type out a few commands. + input_stream.extend(StringToByteList('version')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('flashinfo')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('sysinfo')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The Check function should NOT have been called at all. + mock_check.assert_not_called() + + # The EC image should be assumed to be not enhanced. + self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' + ' be NOT enhanced.') + + # Reset the mocks. + mock_check.reset_mock() + self.console.oobm_queue.reset_mock() + + # 'interrogate auto' should interrogate after each press of the enter key. + # TODO(aaboagye): Fix this test when you update auto to check after each + # reboot. For now, it would behave the same as 'interrogate always'. + cmd = 'interrogate auto' + # Enter the OOBM prompt. + input_stream.extend(StringToByteList('%')) + # Type the command + input_stream.extend(StringToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + expected_calls = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + + # The Check method should be called 3 times here. + mock_check.side_effect = [False, False, False] + + # Type out a few commands. + input_stream.extend(StringToByteList('help list')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('taskinfo')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('hibdelay')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The Check method should have been called 3 times here. + expected_calls = [mock.call(), mock.call(), mock.call()] + mock_check.assert_has_calls(expected_calls) + + # The EC image should be assumed to be not enhanced. + self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' + ' be NOT enhanced.') + + # Now, let's try to assume that the image is enhanced while still disabling + # interrogation. + mock_check.reset_mock() + self.console.oobm_queue.reset_mock() + input_stream = [] + cmd = 'interrogate never enhanced' + # Enter the OOBM prompt. + input_stream.extend(StringToByteList('%')) + # Type the command + input_stream.extend(StringToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + expected_calls = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + + # Type out a few commands. + input_stream.extend(StringToByteList('chgstate')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('hash')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('sysjump rw')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The check method should have never been called. + mock_check.assert_not_called() + + # The EC image should be assumed to be enhanced. + self.assertTrue(self.console.enhanced_ec, 'The image should be' + ' assumed to be enhanced.') + + if __name__ == '__main__': unittest.main() diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py index 4875d66b74..73123b746a 100644 --- a/util/ec3po/interpreter.py +++ b/util/ec3po/interpreter.py @@ -224,6 +224,10 @@ class Interpreter(object): self.logger.debug('Connected to %s.', self.ec_uart_pty_name) return + elif command.startswith('enhanced'): + self.enhanced_ec = command.split(' ')[1] == 'True' + return + # Ignore any other commands while in the disconnected state. self.logger.debug('command: \'%s\'', command) if not self.connected: diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py index c616394f8f..6eb6b3abda 100755 --- a/util/ec3po/interpreter_unittest.py +++ b/util/ec3po/interpreter_unittest.py @@ -351,7 +351,7 @@ class TestUARTDisconnection(unittest.TestCase): # Verify interpreter is connected. self.assertTrue(self.itpr.connected, ('The interpreter should be' - 'connected.')) + 'connected.')) # Verify that the EC UART is now a member of the inputs. self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs) # Since we have issued no commands during the disconnected state, no |