summaryrefslogtreecommitdiff
path: root/util/ec3po/interpreter_unittest.py
blob: fe4d43c351265d8ef5b3a3e9039244a8de32e3d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
#!/usr/bin/env python
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for the EC-3PO interpreter."""

# Note: This is a py2/3 compatible file.

from __future__ import print_function

import logging
import mock
import tempfile
import unittest

import six

from ec3po import interpreter
from ec3po import threadproc_shim


def GetBuiltins(func):
  """Build the dotted path of a builtin, suitable as a mock.patch() target.

  Args:
    func: Name of the builtin, e.g. 'open'.

  Returns:
    '__builtin__.<func>' when running under Python 2, 'builtins.<func>'
    otherwise.
  """
  # The module holding the builtins was renamed between Python 2 and 3.
  module_name = '__builtin__' if six.PY2 else 'builtins'
  return module_name + '.' + func


class TestEnhancedECBehaviour(unittest.TestCase):
  """Test case to verify all enhanced EC interpretation tasks."""

  def setUp(self):
    """Set up the test harness.

    Creates a tempfile standing in for the EC UART PTY, the command and debug
    pipes, and an Interpreter whose open() call is mocked so that the traffic
    towards the EC can be inspected through self.ec_uart_pty.
    """
    # Setup logging with a timestamp, the module, and the log level.
    logging.basicConfig(level=logging.DEBUG,
                        format=('%(asctime)s - %(module)s -'
                                ' %(levelname)s - %(message)s'))

    # Create a tempfile that would represent the EC UART PTY.
    self.tempfile = tempfile.NamedTemporaryFile()
    # Close the tempfile when the test ends (even if it fails) so that file
    # handles are not leaked from one test to the next.
    self.addCleanup(self.tempfile.close)

    # Create the pipes that the interpreter will use.
    self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
    self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)

    # Mock the open() function so we can inspect reads/writes to the EC.
    self.ec_uart_pty = mock.mock_open()

    with mock.patch(GetBuiltins('open'), self.ec_uart_pty):
      # Create an interpreter.
      self.itpr = interpreter.Interpreter(self.tempfile.name,
                                          self.cmd_pipe_itpr,
                                          self.dbg_pipe_itpr,
                                          log_level=logging.DEBUG,
                                          name="EC")

  def _InterrogateEC(self, mock_os, expected_ec_calls):
    """Run one interrogation round trip which the EC answers with EC_ACK.

    Args:
      mock_os: MagicMock object replacing the 'os' module of the interpreter.
      expected_ec_calls: List of the mock calls expected on the EC UART.  The
        calls caused by the interrogation are appended to it in place.
    """
    # Have the interrogation come in the command pipe.
    self.cmd_pipe_user.send(interpreter.EC_SYN)
    self.itpr.HandleUserData()
    # At this point, the command should be queued up waiting to be sent, so
    # let's actually send it to the EC.
    self.itpr.SendCmdToEC()
    expected_ec_calls.extend([mock.call().write(interpreter.EC_SYN),
                              mock.call().flush()])
    # Now, assume that the EC sends only 1 response back of EC_ACK.
    mock_os.read.side_effect = [interpreter.EC_ACK]
    # When reading the EC, the interpreter will call file.fileno() to pass to
    # os.read().
    expected_ec_calls.append(mock.call().fileno())
    # Simulate the response.
    self.itpr.HandleECData()

  @mock.patch('ec3po.interpreter.os')
  def test_HandlingCommandsThatProduceNoOutput(self, mock_os):
    """Verify that the Interpreter correctly handles non-output commands.

    Args:
      mock_os: MagicMock object replacing the 'os' module for this test
        case.
    """
    # The interpreter init should open the EC UART PTY.
    expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)]

    # Each command is preceded by an interrogation to determine if the EC is
    # enhanced or not.  As the commands produce no output, the next thing the
    # console sends after a command is simply another interrogation.
    for test_cmd in (b'chan save', b'chan 0'):
      self._InterrogateEC(mock_os, expected_ec_calls)

      # Now that the interrogation is complete, send the real command down
      # the pipe.
      self.cmd_pipe_user.send(test_cmd)
      self.itpr.HandleUserData()
      self.itpr.SendCmdToEC()
      # Since the EC image is enhanced, we should have sent a packed command.
      expected_ec_calls.extend([
          mock.call().write(self.itpr.PackCommand(test_cmd)),
          mock.call().flush()])

    # Finally, verify that the appropriate writes were actually sent to the EC.
    self.ec_uart_pty.assert_has_calls(expected_ec_calls)

  @mock.patch('ec3po.interpreter.os')
  def test_CommandRetryingOnError(self, mock_os):
    """Verify that commands are retried if an error is encountered.

    Args:
      mock_os: MagicMock object replacing the 'os' module for this test
        case.
    """
    # The interpreter init should open the EC UART PTY.
    expected_ec_calls = [mock.call(self.tempfile.name, 'r+b', buffering=0)]
    # The first command will be an interrogation to determine if the EC is
    # enhanced or not.
    self._InterrogateEC(mock_os, expected_ec_calls)

    # Let's send a command that is received on the EC-side with an error.
    test_cmd = b'accelinfo'
    self.cmd_pipe_user.send(test_cmd)
    self.itpr.HandleUserData()
    self.itpr.SendCmdToEC()
    packed_cmd = self.itpr.PackCommand(test_cmd)
    expected_ec_calls.extend([mock.call().write(packed_cmd),
                              mock.call().flush()])

    # Have the EC return the error string twice.
    error_count = 2
    mock_os.read.side_effect = [b'&&EE'] * error_count
    for i in range(error_count):
      # When reading the EC, the interpreter will call file.fileno() to pass to
      # os.read().
      expected_ec_calls.append(mock.call().fileno())
      # Simulate the response.
      self.itpr.HandleECData()

      # Since an error was received, the interpreter should attempt to retry
      # the command.
      expected_ec_calls.extend([mock.call().write(packed_cmd),
                                mock.call().flush()])
      # Verify that the retry count was decremented.
      self.assertEqual(interpreter.COMMAND_RETRIES-i-1, self.itpr.cmd_retries,
                       'Unexpected cmd_retries count.')
      # Actually retry the command.
      self.itpr.SendCmdToEC()

    # Now assume that the last one goes through with no trouble.
    expected_ec_calls.extend([mock.call().write(packed_cmd),
                              mock.call().flush()])
    self.itpr.SendCmdToEC()

    # Verify all the calls.
    self.ec_uart_pty.assert_has_calls(expected_ec_calls)

  def test_PackCommandsForEnhancedEC(self):
    """Verify that the interpreter packs commands for enhanced EC images."""
    # Assume current EC image is enhanced.
    self.itpr.enhanced_ec = True
    # Receive a command from the user.
    test_cmd = b'gettime'
    self.cmd_pipe_user.send(test_cmd)
    # Mock out PackCommand to see if it was called.
    self.itpr.PackCommand = mock.MagicMock()
    # Have the interpreter handle the command.
    self.itpr.HandleUserData()
    # Verify that PackCommand() was called.
    self.itpr.PackCommand.assert_called_once_with(test_cmd)

  def test_DontPackCommandsForNonEnhancedEC(self):
    """Verify the interpreter doesn't pack commands for non-enhanced images."""
    # Assume current EC image is not enhanced.
    self.itpr.enhanced_ec = False
    # Receive a command from the user.
    test_cmd = b'gettime'
    self.cmd_pipe_user.send(test_cmd)
    # Mock out PackCommand to see if it was called.
    self.itpr.PackCommand = mock.MagicMock()
    # Have the interpreter handle the command.
    self.itpr.HandleUserData()
    # Verify that PackCommand() was not called.
    self.itpr.PackCommand.assert_not_called()

  @mock.patch('ec3po.interpreter.os')
  def test_KeepingTrackOfInterrogation(self, mock_os):
    """Verify that the interpreter can track the state of the interrogation.

    Args:
      mock_os: MagicMock object replacing the 'os' module for this test
        case.
    """
    # Upon init, the interpreter should assume that the current EC image is not
    # enhanced.
    self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec upon'
                                                 ' init is not False.'))

    # Assume an interrogation request comes in from the user.
    self.cmd_pipe_user.send(interpreter.EC_SYN)
    self.itpr.HandleUserData()

    # Verify the state is now within an interrogation.
    self.assertTrue(self.itpr.interrogating, 'interrogating should be True')
    # The state of enhanced_ec should not be changed yet because we haven't
    # received a valid response yet.
    self.assertFalse(self.itpr.enhanced_ec, msg=('State of enhanced_ec is '
                                                 'not False.'))

    # Assume that the EC responds with an EC_ACK.
    mock_os.read.side_effect = [interpreter.EC_ACK]
    self.itpr.HandleECData()

    # Now, the interrogation should be complete and we should know that the
    # current EC image is enhanced.
    self.assertFalse(self.itpr.interrogating, msg=('interrogating should be '
                                                   'False'))
    self.assertTrue(self.itpr.enhanced_ec, msg='enhanced_ec should be True')

    # Now let's perform another interrogation, but pretend that the EC ignores
    # it.
    self.cmd_pipe_user.send(interpreter.EC_SYN)
    self.itpr.HandleUserData()

    # Verify interrogating state.
    self.assertTrue(self.itpr.interrogating, 'interrogating should be True')
    # We should assume that the image is not enhanced until we get the valid
    # response.
    self.assertFalse(self.itpr.enhanced_ec, 'enhanced_ec should be False now.')

    # Let's pretend that we get a random debug print.  This should clear the
    # interrogating flag.
    mock_os.read.side_effect = [b'[1660.593076 HC 0x103]']
    self.itpr.HandleECData()

    # Verify that interrogating flag is cleared and enhanced_ec is still False.
    self.assertFalse(self.itpr.interrogating, 'interrogating should be False.')
    self.assertFalse(self.itpr.enhanced_ec,
                     'enhanced_ec should still be False.')


class TestUARTDisconnection(unittest.TestCase):
  """Test case to verify interpreter disconnection/reconnection."""

  def setUp(self):
    """Set up the test harness.

    Creates a tempfile standing in for the EC UART PTY, the command and debug
    pipes, and an Interpreter whose open() call is mocked, then checks that
    the interpreter starts out in the connected state.
    """
    # Setup logging with a timestamp, the module, and the log level.
    logging.basicConfig(level=logging.DEBUG,
                        format=('%(asctime)s - %(module)s -'
                                ' %(levelname)s - %(message)s'))

    # Create a tempfile that would represent the EC UART PTY.
    self.tempfile = tempfile.NamedTemporaryFile()
    # Close the tempfile when the test ends (even if it fails) so that file
    # handles are not leaked from one test to the next.
    self.addCleanup(self.tempfile.close)

    # Create the pipes that the interpreter will use.
    self.cmd_pipe_user, self.cmd_pipe_itpr = threadproc_shim.Pipe()
    self.dbg_pipe_user, self.dbg_pipe_itpr = threadproc_shim.Pipe(duplex=False)

    # Mock the open() function so we can inspect reads/writes to the EC.
    self.ec_uart_pty = mock.mock_open()

    with mock.patch(GetBuiltins('open'), self.ec_uart_pty):
      # Create an interpreter.
      self.itpr = interpreter.Interpreter(self.tempfile.name,
                                          self.cmd_pipe_itpr,
                                          self.dbg_pipe_itpr,
                                          log_level=logging.DEBUG,
                                          name="EC")

    # First, check that interpreter is initialized to connected.
    self.assertTrue(self.itpr.connected, ('The interpreter should be'
                                          ' initialized in a connected state'))

  def test_DisconnectStopsECTraffic(self):
    """Verify that when in disconnected state, no debug prints are sent."""
    # Let's send a disconnect command through the command pipe.
    self.cmd_pipe_user.send(b'disconnect')
    self.itpr.HandleUserData()

    # Verify interpreter is disconnected from EC.
    self.assertFalse(self.itpr.connected, ('The interpreter should be'
                                           ' disconnected.'))
    # Verify that the EC UART is no longer a member of the inputs.  The
    # interpreter will never pull data from the EC if it's not a member of the
    # inputs list.
    self.assertNotIn(self.itpr.ec_uart_pty, self.itpr.inputs)

  def test_CommandsDroppedWhenDisconnected(self):
    """Verify that when in disconnected state, commands are dropped."""
    # Send a command, followed by 'disconnect'.
    self.cmd_pipe_user.send(b'taskinfo')
    self.itpr.HandleUserData()
    self.cmd_pipe_user.send(b'disconnect')
    self.itpr.HandleUserData()

    # Verify interpreter is disconnected from EC.
    self.assertFalse(self.itpr.connected, ('The interpreter should be'
                                           ' disconnected.'))
    # Verify that the EC UART is no longer a member of the inputs nor outputs.
    self.assertNotIn(self.itpr.ec_uart_pty, self.itpr.inputs)
    self.assertNotIn(self.itpr.ec_uart_pty, self.itpr.outputs)

    # Have the user send a few more commands in the disconnected state.  The
    # command is fed to the interpreter one character at a time.
    command = 'help\n'
    for char in command:
      self.cmd_pipe_user.send(char.encode('utf-8'))
      self.itpr.HandleUserData()

    # The command queue should be empty.
    self.assertEqual(0, self.itpr.ec_cmd_queue.qsize())

    # Now send the reconnect command.
    self.cmd_pipe_user.send(b'reconnect')

    # open() is mocked again while the reconnect command is handled.
    with mock.patch(GetBuiltins('open'), mock.mock_open()):
      self.itpr.HandleUserData()

    # Verify interpreter is connected.
    self.assertTrue(self.itpr.connected)
    # Verify that EC UART is a member of the inputs.
    self.assertIn(self.itpr.ec_uart_pty, self.itpr.inputs)
    # Since no command was sent after reconnection, verify that the EC UART is
    # not a member of the outputs.
    self.assertNotIn(self.itpr.ec_uart_pty, self.itpr.outputs)

  def test_ReconnectAllowsECTraffic(self):
    """Verify that when connected, EC UART traffic is allowed."""
    # Let's send a disconnect command through the command pipe.
    self.cmd_pipe_user.send(b'disconnect')
    self.itpr.HandleUserData()

    # Verify interpreter is disconnected.
    self.assertFalse(self.itpr.connected, ('The interpreter should be'
                                           ' disconnected.'))
    # Verify that the EC UART is no longer a member of the inputs nor outputs.
    self.assertNotIn(self.itpr.ec_uart_pty, self.itpr.inputs)
    self.assertNotIn(self.itpr.ec_uart_pty, self.itpr.outputs)

    # Issue reconnect command through the command pipe.
    self.cmd_pipe_user.send(b'reconnect')

    # open() is mocked again while the reconnect command is handled.
    with mock.patch(GetBuiltins('open'), mock.mock_open()):
      self.itpr.HandleUserData()

    # Verify interpreter is connected.
    self.assertTrue(self.itpr.connected, ('The interpreter should be'
                                          ' connected.'))
    # Verify that the EC UART is now a member of the inputs.
    self.assertIn(self.itpr.ec_uart_pty, self.itpr.inputs)
    # Since we have issued no commands during the disconnected state, no
    # commands are pending and therefore the PTY should not be added to the
    # outputs.
    self.assertNotIn(self.itpr.ec_uart_pty, self.itpr.outputs)


# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()