summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAseda Aboagye <aaboagye@google.com>2016-01-08 14:48:48 -0800
committerchrome-bot <chrome-bot@chromium.org>2016-01-11 20:05:40 -0800
commit4368dcfb32942740dd11188de6a8658cdd448a5a (patch)
treedc25c50872a550d11fa192654b1de1cbbb13d32e
parent119916761c23c5d2c9c31689e0b1ee445bb57c37 (diff)
downloadchrome-ec-4368dcfb32942740dd11188de6a8658cdd448a5a.tar.gz
util: ec3po: Add support to disconnect from UART.
This commit adds support for the EC-3PO interpreter to disconnect and reconnect to the EC UART. This is handy so that other tools that need to use the raw UART directly can do so without interference from EC-3PO. BUG=chromium:571170 BRANCH=None TEST=For both enhanced and non-enhanced EC images, issue disconnect command and verify that no debug prints were emitted and no commands were sent to the EC. Then issue reconnect and verify that the console works as normal and that no commands were buffered. TEST=./util/ec3po/run_tests.sh Change-Id: Ic572e25d24d5e45fbe2eeb84a534235c4ec98d38 Signed-off-by: Aseda Aboagye <aaboagye@google.com> Reviewed-on: https://chromium-review.googlesource.com/321084 Commit-Ready: Aseda Aboagye <aaboagye@chromium.org> Tested-by: Aseda Aboagye <aaboagye@chromium.org> Reviewed-by: Randall Spangler <rspangler@chromium.org>
-rw-r--r--util/ec3po/interpreter.py68
-rwxr-xr-xutil/ec3po/interpreter_unittest.py108
2 files changed, 171 insertions, 5 deletions
diff --git a/util/ec3po/interpreter.py b/util/ec3po/interpreter.py
index d8ebc6f536..23e896c640 100644
--- a/util/ec3po/interpreter.py
+++ b/util/ec3po/interpreter.py
@@ -46,7 +46,8 @@ class Interpreter(object):
Attributes:
logger: A logger for this module.
- ec_uart_pty: A string representing the EC UART to connect to.
+ ec_uart_pty: An opened file object to the raw EC UART PTY.
+ ec_uart_pty_name: A string containing the name of the raw EC UART PTY.
cmd_pipe: A multiprocessing.Connection object which represents the
Interpreter side of the command pipe. This must be a bidirectional pipe.
Commands and responses will utilize this pipe.
@@ -69,6 +70,8 @@ class Interpreter(object):
and is changed depending on the result of an interrogation.
interrogating: A boolean indicating if we are in the middle of interrogating
the EC.
+ connected: A boolean indicating if the interpreter is actually connected to
+ the UART and listening.
"""
def __init__(self, ec_uart_pty, cmd_pipe, dbg_pipe, log_level=logging.INFO):
"""Intializes an Interpreter object with the provided args.
@@ -89,6 +92,7 @@ class Interpreter(object):
logger = logging.getLogger('EC3PO.Interpreter')
self.logger = LoggerAdapter(logger, {'pty': ec_uart_pty})
self.ec_uart_pty = open(ec_uart_pty, 'a+')
+ self.ec_uart_pty_name = ec_uart_pty
self.cmd_pipe = cmd_pipe
self.dbg_pipe = dbg_pipe
self.cmd_retries = COMMAND_RETRIES
@@ -99,6 +103,7 @@ class Interpreter(object):
self.last_cmd = ''
self.enhanced_ec = False
self.interrogating = False
+ self.connected = True
def __str__(self):
"""Show internal state of the Interpreter object.
@@ -129,8 +134,10 @@ class Interpreter(object):
"""
self.ec_cmd_queue.put(command)
self.logger.debug('Commands now in queue: %d', self.ec_cmd_queue.qsize())
+
# Add the EC UART as an output to be serviced.
- self.outputs.append(self.ec_uart_pty)
+ if self.connected and self.ec_uart_pty not in self.outputs:
+ self.outputs.append(self.ec_uart_pty)
def PackCommand(self, raw_cmd):
r"""Packs a command for use with error checking.
@@ -176,6 +183,53 @@ class Interpreter(object):
Args:
command: A string representing the command sent by the user.
"""
+ if command == "disconnect":
+ if self.connected:
+ self.logger.debug('UART disconnect request.')
+ # Drop all pending commands if any.
+ while not self.ec_cmd_queue.empty():
+ c = self.ec_cmd_queue.get()
+ self.logger.debug('dropped: \'%s\'', c)
+ if self.enhanced_ec:
+ # Reset retry state.
+ self.cmd_retries = COMMAND_RETRIES
+ self.last_cmd = ''
+ # Get the UART that the interpreter is attached to.
+ fd = self.ec_uart_pty
+ self.logger.debug('fd: %r', fd)
+ # Remove the descriptor from the inputs and outputs.
+ self.inputs.remove(fd)
+ if fd in self.outputs:
+ self.outputs.remove(fd)
+ self.logger.debug('Removed fd. Remaining inputs: %r', self.inputs)
+ # Close the file.
+ fd.close()
+ # Mark the interpreter as disconnected now.
+ self.connected = False
+ self.logger.debug('Disconnected from %s.', self.ec_uart_pty_name)
+ return
+
+ elif command == "reconnect":
+ if not self.connected:
+ self.logger.debug('UART reconnect request.')
+ # Reopen the PTY.
+ fd = open(self.ec_uart_pty_name, 'a+')
+ self.logger.debug('fd: %r', fd)
+ self.ec_uart_pty = fd
+ # Add the descriptor to the inputs.
+ self.inputs.append(fd)
+ self.logger.debug('fd added. curr inputs: %r', self.inputs)
+ # Mark the interpreter as connected now.
+ self.connected = True
+ self.logger.debug('Connected to %s.', self.ec_uart_pty_name)
+ return
+
+ # Ignore any other commands while in the disconnected state.
+ self.logger.debug('command: \'%s\'', command)
+ if not self.connected:
+ self.logger.debug('Ignoring command because currently disconnected.')
+ return
+
# Remove leading and trailing spaces only if this is an enhanced EC image.
# For non-enhanced EC images, commands will be single characters at a time
# and can be spaces.
@@ -199,7 +253,6 @@ class Interpreter(object):
# TODO(aaboagye): Make a dict of commands and keys and eventually,
# handle partial matching based on unique prefixes.
- self.logger.debug('command: \'%s\'', command)
self.EnqueueCmd(command)
def HandleCmdRetries(self):
@@ -245,8 +298,13 @@ class Interpreter(object):
self.last_cmd = cmd
# Reset the retry count.
self.cmd_retries = COMMAND_RETRIES
- # Remove the EC UART from the writers while we wait for a response.
- self.outputs.remove(self.ec_uart_pty)
+
+ # If no command is pending to be sent, then we can remove the EC UART from
+ # writers. Might need better checking for command retry logic in here.
+ if self.ec_cmd_queue.empty():
+ # Remove the EC UART from the writers while we wait for a response.
+ self.logger.debug('Removing EC UART from writers.')
+ self.outputs.remove(self.ec_uart_pty)
def HandleECData(self):
"""Handle any debug prints from the EC."""
diff --git a/util/ec3po/interpreter_unittest.py b/util/ec3po/interpreter_unittest.py
index f74bfe051c..c616394f8f 100755
--- a/util/ec3po/interpreter_unittest.py
+++ b/util/ec3po/interpreter_unittest.py
@@ -251,6 +251,114 @@ class TestEnhancedECBehaviour(unittest.TestCase):
'enhanced_ec should still be False.')
+class TestUARTDisconnection(unittest.TestCase):
+ """Test case to verify interpreter disconnection/reconnection."""
+ def setUp(self):
+ """Setup the test harness."""
+ # Setup logging with a timestamp, the module, and the log level.
+ logging.basicConfig(level=logging.DEBUG,
+ format=('%(asctime)s - %(module)s -'
+ ' %(levelname)s - %(message)s'))
+
+ # Create a tempfile that would represent the EC UART PTY.
+ self.tempfile = tempfile.NamedTemporaryFile()
+
+ # Create the pipes that the interpreter will use.
+ self.cmd_pipe_user, self.cmd_pipe_itpr = multiprocessing.Pipe()
+ self.dbg_pipe_user, self.dbg_pipe_itpr = multiprocessing.Pipe(duplex=False)
+
+ # Mock the open() function so we can inspect reads/writes to the EC.
+ self.ec_uart_pty = mock.mock_open()
+ with mock.patch('__builtin__.open', self.ec_uart_pty):
+ # Create an interpreter.
+ self.itpr = interpreter.Interpreter(self.tempfile.name,
+ self.cmd_pipe_itpr,
+ self.dbg_pipe_itpr,
+ log_level=logging.DEBUG)
+
+ # First, check that interpreter is initialized to connected.
+ self.assertTrue(self.itpr.connected, ('The interpreter should be'
+ ' initialized in a connected state'))
+
+ def test_DisconnectStopsECTraffic(self):
+ """Verify that when in disconnected state, no debug prints are sent."""
+ # Let's send a disconnect command through the command pipe.
+ self.cmd_pipe_user.send('disconnect')
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is disconnected from EC.
+ self.assertFalse(self.itpr.connected, ('The interpreter should be'
+ 'disconnected.'))
+ # Verify that the EC UART is no longer a member of the inputs. The
+ # interpreter will never pull data from the EC if it's not a member of the
+ # inputs list.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
+
+ def test_CommandsDroppedWhenDisconnected(self):
+ """Verify that when in disconnected state, commands are dropped."""
+ # Send a command, followed by 'disconnect'.
+ self.cmd_pipe_user.send('taskinfo')
+ self.itpr.HandleUserData()
+ self.cmd_pipe_user.send('disconnect')
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is disconnected from EC.
+ self.assertFalse(self.itpr.connected, ('The interpreter should be'
+ 'disconnected.'))
+ # Verify that the EC UART is no longer a member of the inputs nor outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
+ # Have the user send a few more commands in the disconnected state.
+ command = 'help\n'
+ for char in command:
+ self.cmd_pipe_user.send(char)
+ self.itpr.HandleUserData()
+
+ # The command queue should be empty.
+ self.assertEqual(0, self.itpr.ec_cmd_queue.qsize())
+
+ # Now send the reconnect command.
+ self.cmd_pipe_user.send('reconnect')
+ with mock.patch('__builtin__.open', mock.mock_open()):
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is connected.
+ self.assertTrue(self.itpr.connected)
+ # Verify that EC UART is a member of the inputs.
+ self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
+ # Since no command was sent after reconnection, verify that the EC UART is
+ # not a member of the outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
+ def test_ReconnectAllowsECTraffic(self):
+ """Verify that when connected, EC UART traffic is allowed."""
+ # Let's send a disconnect command through the command pipe.
+ self.cmd_pipe_user.send('disconnect')
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is disconnected.
+ self.assertFalse(self.itpr.connected, ('The interpreter should be'
+ 'disconnected.'))
+ # Verify that the EC UART is no longer a member of the inputs nor outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.inputs)
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
+ # Issue reconnect command through the command pipe.
+ self.cmd_pipe_user.send('reconnect')
+ with mock.patch('__builtin__.open', mock.mock_open()):
+ self.itpr.HandleUserData()
+
+ # Verify interpreter is connected.
+ self.assertTrue(self.itpr.connected, ('The interpreter should be'
+ 'connected.'))
+ # Verify that the EC UART is now a member of the inputs.
+ self.assertTrue(self.itpr.ec_uart_pty in self.itpr.inputs)
+ # Since we have issued no commands during the disconnected state, no
+ # commands are pending and therefore the PTY should not be added to the
+ # outputs.
+ self.assertFalse(self.itpr.ec_uart_pty in self.itpr.outputs)
+
if __name__ == '__main__':
unittest.main()