summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xutil/ec3po/console.py61
-rwxr-xr-xutil/ec3po/console_unittest.py140
2 files changed, 191 insertions, 10 deletions
diff --git a/util/ec3po/console.py b/util/ec3po/console.py
index f17ac49fd3..e5f888af0e 100755
--- a/util/ec3po/console.py
+++ b/util/ec3po/console.py
@@ -19,6 +19,7 @@ import logging
import multiprocessing
import os
import pty
+import re
import select
import stat
import sys
@@ -29,6 +30,16 @@ import interpreter
PROMPT = '> '
CONSOLE_INPUT_LINE_SIZE = 80 # Taken from the CONFIG_* with the same name.
CONSOLE_MAX_READ = 100 # Max bytes to read at a time from the user.
+LOOK_BUFFER_SIZE = 256 # Size of search window when looking for the enhanced EC
+ # image string.
+
+# In console_init(), the EC will print a string saying that the EC console is
+# enabled. Enhanced images will print a slightly different string. These
+# regular expressions are used to determine at reboot whether the EC image is
+# enhanced or not.
+ENHANCED_IMAGE_RE = re.compile(r'Enhanced Console is enabled '
+ r'\(v([0-9]+\.[0-9]+\.[0-9]+)\)')
+NON_ENHANCED_IMAGE_RE = re.compile(r'Console is enabled; ')
# The timeouts are really only useful for enhanced EC images, but otherwise just
# serve as a delay for non-enhanced EC images. Therefore, we can keep this
@@ -151,6 +162,7 @@ class Console(object):
self.receiving_oobm_cmd = False
self.pending_oobm_cmd = ''
self.interrogation_mode = 'auto'
+ self.look_buffer = ''
def __str__(self):
"""Show internal state of Console object as a string."""
@@ -168,6 +180,8 @@ class Console(object):
string.append('history_pos: %d' % self.history_pos)
string.append('prompt: \'%s\'' % self.prompt)
string.append('partial_cmd: \'%s\''% self.partial_cmd)
+ string.append('interrogation_mode: \'%s\'' % self.interrogation_mode)
+ string.append('look_buffer: \'%s\'' % self.look_buffer)
return '\n'.join(string)
def PrintHistory(self):
@@ -492,10 +506,8 @@ class Console(object):
if self.interrogation_mode == 'never':
self.logger.debug('Skipping interrogation because interrogation mode'
' is set to never.')
- else:
- # Only interrogate the EC if the interrogation mode is NOT set to
- # 'never'.
- # TODO(aaboagye): Implement the 'auto' mode.
+ elif self.interrogation_mode == 'always':
+ # Only interrogate the EC if the interrogation mode is set to 'always'.
self.enhanced_ec = self.CheckForEnhancedECImage()
self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
@@ -731,9 +743,6 @@ class Console(object):
if mode in INTERROGATION_MODES:
self.interrogation_mode = mode
self.logger.debug('Updated interrogation mode to %s.', mode)
- if mode == 'auto':
- os.write(self.master_pty,
- 'auto not implemented yet; treating it as always.\r\n')
# Update the assumptions of the EC image.
self.enhanced_ec = enhanced
@@ -755,6 +764,41 @@ class Console(object):
'[enhanced]\r\n')
os.write(self.master_pty, ' loglevel <int>\r\n')
+ def CheckBufferForEnhancedImage(self, data):
+ """Adds data to a look buffer and checks to see for enhanced EC image.
+
+ The EC's console task prints a string upon initialization which says that
+ "Console is enabled; type HELP for help.". The enhanced EC images print a
+ different string as a part of their init. This function searches through a
+ "look" buffer, scanning for the presence of either of those strings and
+ updating the enhanced_ec state accordingly.
+
+ Args:
+ data: A string containing the data sent from the interpreter.
+ """
+ self.look_buffer += data
+
+ # Search the buffer for any of the EC image strings.
+ enhanced_match = re.search(ENHANCED_IMAGE_RE, self.look_buffer)
+ non_enhanced_match = re.search(NON_ENHANCED_IMAGE_RE, self.look_buffer)
+
+ # Update the state if any matches were found.
+ if enhanced_match or non_enhanced_match:
+ if enhanced_match:
+ self.enhanced_ec = True
+ elif non_enhanced_match:
+ self.enhanced_ec = False
+
+ # Inform the interpreter of the result.
+ self.cmd_pipe.send('enhanced ' + str(self.enhanced_ec))
+ self.logger.debug('Enhanced EC image? %r', self.enhanced_ec)
+
+ # Clear look buffer since a match was found.
+ self.look_buffer = ''
+
+ # Move the sliding window.
+ self.look_buffer = self.look_buffer[-LOOK_BUFFER_SIZE:]
+
def IsPrintable(byte):
"""Determines if a byte is printable.
@@ -800,6 +844,9 @@ def StartLoop(console):
elif obj is console.dbg_pipe:
data = console.dbg_pipe.recv()
+ if console.interrogation_mode == 'auto':
+ # Search look buffer for enhanced EC image string.
+ console.CheckBufferForEnhancedImage(data)
# Write it to the user console.
console.logger.debug('|DBG|->\'%s\'', data)
os.write(console.master_pty, data)
diff --git a/util/ec3po/console_unittest.py b/util/ec3po/console_unittest.py
index 5e4d67516e..82bf4ba212 100755
--- a/util/ec3po/console_unittest.py
+++ b/util/ec3po/console_unittest.py
@@ -1156,6 +1156,9 @@ class TestConsoleCompatibility(unittest.TestCase):
mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
method.
"""
+ # Set the interrogation mode to always so that we actually interrogate.
+ self.console.interrogation_mode = 'always'
+
# Assume EC interrogations indicate that the image is non-enhanced.
mock_check.return_value = False
@@ -1194,6 +1197,9 @@ class TestConsoleCompatibility(unittest.TestCase):
mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
method.
"""
+ # Set the interrogation mode to always so that we actually interrogate.
+ self.console.interrogation_mode = 'always'
+
# First, assume that the EC interrogations indicate an enhanced EC image.
mock_check.return_value = True
# But our current knowledge of the EC image (which was actually the
@@ -1249,6 +1255,9 @@ class TestConsoleCompatibility(unittest.TestCase):
mock_check: A MagicMock object replacing the CheckForEnhancedECImage()
method.
"""
+ # Set the interrogation mode to always so that we actually interrogate.
+ self.console.interrogation_mode = 'always'
+
# First, assume that the EC interrogations indicate an non-enhanced EC
# image.
mock_check.return_value = False
@@ -1313,6 +1322,83 @@ class TestConsoleCompatibility(unittest.TestCase):
self.console.dbg_pipe.recv.return_value = '\xff'
self.assertFalse(self.console.CheckForEnhancedECImage())
+ def test_EnhancedCheckUsingBuffer(self):
+ """Verify that given reboot output, enhanced EC images are detected."""
+ enhanced_output_stream = """
+--- UART initialized after reboot ---
+[Reset cause: reset-pin soft]
+[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:26:20 aaboagye@lithium.mtv.corp.google.com]
+[0.001695 KB boot key 0]
+[0.001790 Inits done]
+[0.001923 not sysjump; forcing AP shutdown]
+[0.002047 EC triggered warm reboot]
+[0.002155 assert GPIO_PMIC_WARM_RESET_L for 4 ms]
+[0.006326 auto_power_on set due to reset_flag 0x22]
+[0.006477 Wait for battery stabilized during 1000000]
+[0.007368 battery responded with status c0]
+[0.009099 hash start 0x00010000 0x0000eb7c]
+[0.009307 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --]
+[0.009531 KB wait]
+Enhanced Console is enabled (v1.0.0); type HELP for help.
+> [0.009782 event set 0x00002000]
+[0.009903 hostcmd init 0x2000]
+[0.010031 power state 0 = G3, in 0x0000]
+[0.010173 power state 4 = G3->S5, in 0x0000]
+[0.010324 power state 1 = S5, in 0x0000]
+[0.010466 power on 2]
+[0.010566 power state 5 = S5->S3, in 0x0000]
+[0.037713 event set 0x00000080]
+[0.037836 event set 0x00400000]
+[0.038675 Battery 89% / 1092h:15 to empty]
+[0.224060 hash done 41dac382e3a6e3d2ea5b4d789c1bc46525cae7cc5ff6758f0de8d8369b506f57]
+[0.375150 POWER_GOOD seen]
+"""
+ for line in enhanced_output_stream.split('\n'):
+ self.console.CheckBufferForEnhancedImage(line)
+
+ # Since the enhanced console string was present in the output, the console
+ # should have caught it.
+ self.assertTrue(self.console.enhanced_ec)
+
+ # Also should check that the command was sent to the interpreter.
+ self.console.cmd_pipe.send.assert_called_once_with('enhanced True')
+
+ # Now test the non-enhanced EC image.
+ self.console.cmd_pipe.reset_mock()
+ non_enhanced_output_stream = """
+--- UART initialized after reboot ---
+[Reset cause: reset-pin soft]
+[Image: RO, jerry_v1.1.4363-2af8572-dirty 2016-02-23 13:03:15 aaboagye@lithium.mtv.corp.google.com]
+[0.001695 KB boot key 0]
+[0.001790 Inits done]
+[0.001923 not sysjump; forcing AP shutdown]
+[0.002047 EC triggered warm reboot]
+[0.002156 assert GPIO_PMIC_WARM_RESET_L for 4 ms]
+[0.006326 auto_power_on set due to reset_flag 0x22]
+[0.006477 Wait for battery stabilized during 1000000]
+[0.007368 battery responded with status c0]
+[0.008951 hash start 0x00010000 0x0000ed78]
+[0.009159 KB init state: -- -- -- -- -- -- -- -- -- -- -- -- --]
+[0.009383 KB wait]
+Console is enabled; type HELP for help.
+> [0.009602 event set 0x00002000]
+[0.009722 hostcmd init 0x2000]
+[0.009851 power state 0 = G3, in 0x0000]
+[0.009993 power state 4 = G3->S5, in 0x0000]
+[0.010144 power state 1 = S5, in 0x0000]
+[0.010285 power on 2]
+[0.010385 power state 5 = S5->S3, in 0x0000]
+"""
+ for line in non_enhanced_output_stream.split('\n'):
+ self.console.CheckBufferForEnhancedImage(line)
+
+ # Since the default console string is present in the output, it should be
+ # determined to be non enhanced now.
+ self.assertFalse(self.console.enhanced_ec)
+
+ # Check that command was also sent to the interpreter.
+ self.console.cmd_pipe.send.assert_called_once_with('enhanced False')
+
class TestOOBMConsoleCommands(unittest.TestCase):
"""Verify that OOBM console commands work correctly."""
@@ -1391,9 +1477,8 @@ class TestOOBMConsoleCommands(unittest.TestCase):
mock_check.reset_mock()
self.console.oobm_queue.reset_mock()
- # 'interrogate auto' should interrogate after each press of the enter key.
- # TODO(aaboagye): Fix this test when you update auto to check after each
- # reboot. For now, it would behave the same as 'interrogate always'.
+ # 'interrogate auto' should not interrogate at all. It should only be
+ # scanning the output stream for the 'console is enabled' strings.
cmd = 'interrogate auto'
# Enter the OOBM prompt.
input_stream.extend(StringToByteList('%'))
@@ -1417,6 +1502,55 @@ class TestOOBMConsoleCommands(unittest.TestCase):
self.console.oobm_queue.get.side_effect = [cmd]
self.console.ProcessOOBMQueue()
+ # Type out a few commands.
+ input_stream.extend(StringToByteList('version'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('flashinfo'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+ input_stream.extend(StringToByteList('sysinfo'))
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ # The Check function should NOT have been called at all.
+ mock_check.assert_not_called()
+
+ # The EC image should be assumed to be not enhanced.
+ self.assertFalse(self.console.enhanced_ec, 'The image should be assumed to'
+ ' be NOT enhanced.')
+
+ # Reset the mocks.
+ mock_check.reset_mock()
+ self.console.oobm_queue.reset_mock()
+
+ # 'interrogate always' should, like its name implies, interrogate always
+ # after each press of the enter key. This was the former way of doing
+ # interrogation.
+ cmd = 'interrogate always'
+ # Enter the OOBM prompt.
+ input_stream.extend(StringToByteList('%'))
+ # Type the command
+ input_stream.extend(StringToByteList(cmd))
+ # Press enter.
+ input_stream.append(console.ControlKey.CARRIAGE_RETURN)
+
+ # Send the sequence out.
+ for byte in input_stream:
+ self.console.HandleChar(byte)
+
+ input_stream = []
+ expected_calls = []
+
+ # The OOBM queue should have been called with the command being put.
+ expected_calls.append(mock.call.put(cmd))
+ self.console.oobm_queue.assert_has_calls(expected_calls)
+
+ # Process the OOBM queue.
+ self.console.oobm_queue.get.side_effect = [cmd]
+ self.console.ProcessOOBMQueue()
+
# The Check method should be called 3 times here.
mock_check.side_effect = [False, False, False]