summaryrefslogtreecommitdiff
path: root/util/run_host_test
blob: 15013d91b69b0543e9d087c357a446ab9259757f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper that runs a host test. Handles timeout and stopping the emulator."""

import argparse
import enum
import io
import os
import pathlib
import select
import subprocess
import sys
import time


class TestResult(enum.Enum):
  """Possible outcomes of a single host test run.

  Each member carries a small integer value; main() returns that value,
  which the script then passes to sys.exit().
  """
  SUCCESS = 0
  FAIL = 1
  TIMEOUT = 2
  UNEXPECTED_TERMINATION = 3

  @property
  def reason(self):
    """Return a short human-readable phrase describing this outcome."""
    if self is TestResult.SUCCESS:
      return 'passed'
    if self is TestResult.FAIL:
      return 'failed'
    if self is TestResult.TIMEOUT:
      return 'timed out'
    # Only one member remains once the three above are ruled out.
    return 'terminated unexpectedly'


def run_test(path, timeout=10):
  """Run the test executable at `path` and classify its outcome.

  The child's stdout is read as it becomes available and the accumulated
  text is searched for the markers 'Pass!' and 'Fail!'. The child's stderr
  is not captured; ASAN_OPTIONS is set so that ASan reports go to stderr.

  Args:
    path: Path of the executable to launch. No arguments are passed to it.
    timeout: Number of seconds the test has to print a verdict. The same
      value is reused as the grace period between SIGTERM and SIGKILL when
      the child has to be stopped.

  Returns:
    A (TestResult, str) tuple holding the outcome and all stdout text that
    was collected before the outcome was decided.
  """
  start_time = time.monotonic()
  env = dict(os.environ)
  env['ASAN_OPTIONS'] = 'log_path=stderr'

  proc = subprocess.Popen(
      [path],
      bufsize=0,
      stdin=subprocess.PIPE,
      stdout=subprocess.PIPE,
      env=env,
      encoding='utf-8',
      errors='replace',
  )

  # Put the output pipe in non-blocking mode. We will then select(2)
  # on the pipe to know when we have bytes to process.
  os.set_blocking(proc.stdout.fileno(), False)

  try:
    output_buffer = io.StringIO()
    while True:
      select_timeout = timeout - (time.monotonic() - start_time)
      if select_timeout <= 0:
        return TestResult.TIMEOUT, output_buffer.getvalue()

      readable, _, _ = select.select([proc.stdout], [], [], select_timeout)

      if not readable:
        # Indicates that select(2) timed out.
        return TestResult.TIMEOUT, output_buffer.getvalue()

      # Sample the exit status BEFORE draining the pipe: if the child is
      # already gone at this point, everything it wrote is in the pipe and
      # the read below collects all of it, so a verdict printed right before
      # exit cannot be missed. poll() returns None while the child is
      # running and the exit status (possibly 0) afterwards, hence the
      # explicit comparison against None.
      exited = proc.poll() is not None

      output_buffer.write(proc.stdout.read())
      output_log = output_buffer.getvalue()

      if 'Pass!' in output_log:
        return TestResult.SUCCESS, output_log
      if 'Fail!' in output_log:
        return TestResult.FAIL, output_log
      if exited:
        # The child exited (with any status) without printing a verdict.
        return TestResult.UNEXPECTED_TERMINATION, output_log
  finally:
    # If the process is still running, send it a SIGTERM and wait for it to
    # exit; if that times out, kill it directly and reap it so that no
    # zombie is left behind.
    if proc.poll() is None:
      proc.terminate()
      try:
        proc.wait(timeout)
      except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
    # Release the pipe file descriptors held by this process.
    proc.stdout.close()
    proc.stdin.close()

def parse_options(argv):
  """Parse the wrapper's command-line arguments.

  Args:
    argv: List of argument strings, without the program name.

  Returns:
    The argparse namespace with the attributes `timeout`, `test_target`
    ('host', or 'coverage' when --coverage is given), `verbose` and
    `test_name`.
  """
  # Each entry is (option strings / positional name, add_argument keywords).
  # The entries are registered in the order listed here.
  argument_specs = (
      (('-t', '--timeout'),
       dict(type=float, default=60, help='Timeout to kill test after.')),
      (('--coverage',),
       dict(action='store_const', const='coverage', default='host',
            dest='test_target',
            help='Flag if this is a code coverage test.')),
      (('--verbose', '-v'),
       dict(action='store_true',
            help='Dump emulator output always, even if successful.')),
      (('test_name',), dict(type=str)),
  )

  arg_parser = argparse.ArgumentParser()
  for names, settings in argument_specs:
    arg_parser.add_argument(*names, **settings)
  return arg_parser.parse_args(argv)


def main(argv):
  """Run the named host test and report its result on stderr.

  Args:
    argv: Command-line arguments, without the program name.

  Returns:
    1 if the test executable does not exist; otherwise the integer value of
    the TestResult produced by run_test().
  """
  opts = parse_options(argv)

  # Tests live under build/host, or under build/coverage when the --coverage
  # flag was given.
  binary = (pathlib.Path('build') / opts.test_target / opts.test_name /
            f'{opts.test_name}.exe')
  if not binary.is_file():
    print(f'No test named {opts.test_name} exists!')
    return 1

  began = time.monotonic()
  result, output = run_test(binary, timeout=opts.timeout)
  duration = time.monotonic() - began

  summary = '{} {}! ({:.3f} seconds)'.format(
      opts.test_name, result.reason, duration)
  print(summary, file=sys.stderr)

  # The captured output is shown for every non-successful run, and for
  # successful runs only when --verbose was requested.
  show_output = opts.verbose or result is not TestResult.SUCCESS
  if show_output:
    report = (
        '====== Emulator output ======',
        output,
        '=============================',
    )
    for line in report:
      print(line, file=sys.stderr)
  return result.value


# Script entry point: pass the command-line arguments (without the program
# name) to main() and use its return value as the process exit status.
if __name__ == '__main__':
  sys.exit(main(sys.argv[1:]))