diff options
-rwxr-xr-x | util/ec3po/console.py | 61 | ||||
-rwxr-xr-x | util/ec3po/console_unittest.py | 140 |
2 files changed, 191 insertions, 10 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py index f17ac49fd3..e5f888af0e 100755 --- a/util/ec3po/console.py +++ b/util/ec3po/console.py @@ -19,6 +19,7 @@ import logging import multiprocessing import os import pty +import re import select import stat import sys @@ -29,6 +30,16 @@ import interpreter PROMPT = '> ' CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name. CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user. +LOOK_BUFFER_SIZE = 256 # Size of search window when looking for the enhanced EC + # image string. + +# In console_init(), the EC will print a string saying that the EC console is +# enabled. Enhanced images will print a slightly different string. These +# regular expressions are used to determine at reboot whether the EC image is +# enhanced or not. +ENHANCED_IMAGE_RE = re.compile(r'Enhanced Console is enabled ' + r'\(v([0-9]+\.[0-9]+\.[0-9]+)\)') +NON_ENHANCED_IMAGE_RE = re.compile(r'Console is enabled; ') # The timeouts are really only useful for enhanced EC images, but otherwise just # serve as a delay for non-enhanced EC images. Therefore, we can keep this @@ -151,6 +162,7 @@ class Console(object): self.receiving_oobm_cmd = False self.pending_oobm_cmd = '' self.interrogation_mode = 'auto' + self.look_buffer = '' def __str__(self): """Show internal state of Console object as a string.""" @@ -168,6 +180,8 @@ class Console(object): string.append('history_pos: %d' % self.history_pos) string.append('prompt: \'%s\'' % self.prompt) string.append('partial_cmd: \'%s\''% self.partial_cmd) + string.append('interrogation_mode: \'%s\'' % self.interrogation_mode) + string.append('look_buffer: \'%s\'' % self.look_buffer) return '\n'.join(string) def PrintHistory(self): @@ -492,10 +506,8 @@ class Console(object): if self.interrogation_mode == 'never': self.logger.debug('Skipping interrogation because interrogation mode' ' is set to never.') - else: - # Only interrogate the EC if the interrogation mode is NOT set to - # 'never'. - # TODO(aaboagye): Implement the 'auto' mode. + elif self.interrogation_mode == 'always': + # Only interrogate the EC if the interrogation mode is set to 'always'. self.enhanced_ec = self.CheckForEnhancedECImage() self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) @@ -731,9 +743,6 @@ class Console(object): if mode in INTERROGATION_MODES: self.interrogation_mode = mode self.logger.debug('Updated interrogation mode to %s.', mode) - if mode == 'auto': - os.write(self.master_pty, - 'auto not implemented yet; treating it as always.\r\n') # Update the assumptions of the EC image. self.enhanced_ec = enhanced @@ -755,6 +764,41 @@ class Console(object): '[enhanced]\r\n') os.write(self.master_pty, ' loglevel <int>\r\n') + def CheckBufferForEnhancedImage(self, data): + """Adds data to a look buffer and checks to see for enhanced EC image. + + The EC's console task prints a string upon initialization which says that + "Console is enabled; type HELP for help.". The enhanced EC images print a + different string as a part of their init. This function searches through a + "look" buffer, scanning for the presence of either of those strings and + updating the enhanced_ec state accordingly. + + Args: + data: A string containing the data sent from the interpreter. + """ + self.look_buffer += data + + # Search the buffer for any of the EC image strings. + enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer) + non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer) + + # Update the state if any matches were found. + if enhanced_match or non_enhanced_match: + if enhanced_match: + self.enhanced_ec = True + elif non_enhanced_match: + self.enhanced_ec = False + + # Inform the interpreter of the result. + self.cmd_pipe.send('enhanced ' + str(self.enhanced_ec)) + self.logger.debug('Enhanced EC image? %r', self.enhanced_ec) + + # Clear look buffer since a match was found. + self.look_buffer = '' + + # Move the sliding window. + self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:] + def IsPrintable(byte): """Determines if a byte is printable. @@ -800,6 +844,9 @@ def StartLoop(console): elif obj is console.dbg_pipe: data = console.dbg_pipe.recv() + if console.interrogation_mode == 'auto': + # Search look buffer for enhanced EC image string. + console.CheckBufferForEnhancedImage(data) # Write it to the user console. console.logger.debug('|DBG|->\'%s\'', data) os.write(console.master_pty, data) diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py index 5e4d67516e..82bf4ba212 100755 --- a/util/ec3po/console_unittest.py +++ b/util/ec3po/console_unittest.py @@ -1156,6 +1156,9 @@ class TestConsoleCompatibility(unittest.TestCase): mock_check: A MagicMock object replacing the CheckForEnhancedECImage() method. """ + # Set the interrogation mode to always so that we actually interrogate. + self.console.interrogation_mode = 'always' + # Assume EC interrogations indicate that the image is non-enhanced. mock_check.return_value = False @@ -1194,6 +1197,9 @@ class TestConsoleCompatibility(unittest.TestCase): mock_check: A MagicMock object replacing the CheckForEnhancedECImage() method. """ + # Set the interrogation mode to always so that we actually interrogate. + self.console.interrogation_mode = 'always' + # First, assume that the EC interrogations indicate an enhanced EC image. mock_check.return_value = True # But our current knowledge of the EC image (which was actually the @@ -1249,6 +1255,9 @@ class TestConsoleCompatibility(unittest.TestCase): mock_check: A MagicMock object replacing the CheckForEnhancedECImage() method. """ + # Set the interrogation mode to always so that we actually interrogate. + self.console.interrogation_mode = 'always' + # First, assume that the EC interrogations indicate an non-enhanced EC # image. mock_check.return_value = False @@ -1313,6 +1322,83 @@ class TestConsoleCompatibility(unittest.TestCase): self.console.dbg_pipe.recv.return_value = '\xff' self.assertFalse(self.console.CheckForEnhancedECImage()) + def test_EnhancedCheckUsingBuffer(self): + """Verify that given reboot output, enhanced EC images are detected.""" + enhanced_output_stream = """ +--- UART initialized after reboot --- +[Reset cause: reset-pin soft] +[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:26:20 aaboagye@lithium.mtv.corp.google.com] +[0.001695 KB boot key 0] +[0.001790 Inits done] +[0.001923 not sysjump; forcing AP shutdown] +[0.002047 EC triggered warm reboot] +[0.002155 assert GPIO_PMIC_WARM_RESET_L for 4 ms] +[0.006326 auto_power_on set due to reset_flag 0x22] +[0.006477 Wait for battery stabilized during 1000000] +[0.007368 battery responded with status c0] +[0.009099 hash start 0x00010000 0x0000eb7c] +[0.009307 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --] +[0.009531 KB wait] +Enhanced Console is enabled (v1.0.0); type HELP for help. +> [0.009782 event set 0x00002000] +[0.009903 hostcmd init 0x2000] +[0.010031 power state 0 = G3, in 0x0000] +[0.010173 power state 4 = G3->S5, in 0x0000] +[0.010324 power state 1 = S5, in 0x0000] +[0.010466 power on 2] +[0.010566 power state 5 = S5->S3, in 0x0000] +[0.037713 event set 0x00000080] +[0.037836 event set 0x00400000] +[0.038675 Battery 89% / 1092h:15 to empty] +[0.224060 hash done 41dac382e3a6e3d2ea5b4d789c1bc46525cae7cc5ff6758f0de8d8369b506f57] +[0.375150 POWER_GOOD seen] +""" + for line in enhanced_output_stream.split('\n'): + self.console.CheckBufferForEnhancedImage(line) + + # Since the enhanced console string was present in the output, the console + # should have caught it. + self.assertTrue(self.console.enhanced_ec) + + # Also should check that the command was sent to the interpreter. + self.console.cmd_pipe.send.assert_called_once_with('enhanced True') + + # Now test the non-enhanced EC image. + self.console.cmd_pipe.reset_mock() + non_enhanced_output_stream = """ +--- UART initialized after reboot --- +[Reset cause: reset-pin soft] +[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:03:15 aaboagye@lithium.mtv.corp.google.com] +[0.001695 KB boot key 0] +[0.001790 Inits done] +[0.001923 not sysjump; forcing AP shutdown] +[0.002047 EC triggered warm reboot] +[0.002156 assert GPIO_PMIC_WARM_RESET_L for 4 ms] +[0.006326 auto_power_on set due to reset_flag 0x22] +[0.006477 Wait for battery stabilized during 1000000] +[0.007368 battery responded with status c0] +[0.008951 hash start 0x00010000 0x0000ed78] +[0.009159 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --] +[0.009383 KB wait] +Console is enabled; type HELP for help. +> [0.009602 event set 0x00002000] +[0.009722 hostcmd init 0x2000] +[0.009851 power state 0 = G3, in 0x0000] +[0.009993 power state 4 = G3->S5, in 0x0000] +[0.010144 power state 1 = S5, in 0x0000] +[0.010285 power on 2] +[0.010385 power state 5 = S5->S3, in 0x0000] +""" + for line in non_enhanced_output_stream.split('\n'): + self.console.CheckBufferForEnhancedImage(line) + + # Since the default console string is present in the output, it should be + # determined to be non enhanced now. + self.assertFalse(self.console.enhanced_ec) + + # Check that command was also sent to the interpreter. + self.console.cmd_pipe.send.assert_called_once_with('enhanced False') + class TestOOBMConsoleCommands(unittest.TestCase): """Verify that OOBM console commands work correctly.""" @@ -1391,9 +1477,8 @@ class TestOOBMConsoleCommands(unittest.TestCase): mock_check.reset_mock() self.console.oobm_queue.reset_mock() - # 'interrogate auto' should interrogate after each press of the enter key. - # TODO(aaboagye): Fix this test when you update auto to check after each - # reboot. For now, it would behave the same as 'interrogate always'. + # 'interrogate auto' should not interrogate at all. It should only be + # scanning the output stream for the 'console is enabled' strings. cmd = 'interrogate auto' # Enter the OOBM prompt. input_stream.extend(StringToByteList('%')) @@ -1417,6 +1502,55 @@ class TestOOBMConsoleCommands(unittest.TestCase): self.console.oobm_queue.get.side_effect = [cmd] self.console.ProcessOOBMQueue() + # Type out a few commands. + input_stream.extend(StringToByteList('version')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('flashinfo')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + input_stream.extend(StringToByteList('sysinfo')) + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + # The Check function should NOT have been called at all. + mock_check.assert_not_called() + + # The EC image should be assumed to be not enhanced. + self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to' + ' be NOT enhanced.') + + # Reset the mocks. + mock_check.reset_mock() + self.console.oobm_queue.reset_mock() + + # 'interrogate always' should, like its name implies, interrogate always + # after each press of the enter key. This was the former way of doing + # interrogation. + cmd = 'interrogate always' + # Enter the OOBM prompt. + input_stream.extend(StringToByteList('%')) + # Type the command + input_stream.extend(StringToByteList(cmd)) + # Press enter. + input_stream.append(console.ControlKey.CARRIAGE_RETURN) + + # Send the sequence out. + for byte in input_stream: + self.console.HandleChar(byte) + + input_stream = [] + expected_calls = [] + + # The OOBM queue should have been called with the command being put. + expected_calls.append(mock.call.put(cmd)) + self.console.oobm_queue.assert_has_calls(expected_calls) + + # Process the OOBM queue. + self.console.oobm_queue.get.side_effect = [cmd] + self.console.ProcessOOBMQueue() + # The Check method should be called 3 times here. mock_check.side_effect = [False, False, False] |