summaryrefslogtreecommitdiff
path: root/util/run_host_test
blob: 5519c53e29b5d079e8d33ceace128002dba29f6b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper that runs a host test. Handles timeout and stopping the emulator."""

from __future__ import print_function

import argparse
import enum
import io
import os
import pathlib
import select
import subprocess
import sys
import time


class TestResult(enum.Enum):
  """An Enum representing the result of running a test."""
  SUCCESS = enum.auto()
  FAIL = enum.auto()
  TIMEOUT = enum.auto()
  UNEXPECTED_TERMINATION = enum.auto()

  @property
  def exit_code(self):
    if self is TestResult.SUCCESS:
      return 0
    return 1

  @property
  def reason(self):
    return {
        TestResult.SUCCESS: 'passed',
        TestResult.FAIL: 'failed',
        TestResult.TIMEOUT: 'timed out',
        TestResult.UNEXPECTED_TERMINATION: 'terminated unexpectedly',
    }[self]


def run_test(path, timeout=10):
  start_time = time.monotonic()
  env = dict(os.environ)
  env['ASAN_OPTIONS'] = 'log_path=stderr'

  proc = subprocess.Popen(
      [path],
      bufsize=0,
      stdin=subprocess.PIPE,
      stdout=subprocess.PIPE,
      env=env)

  # Put the output pipe in non-blocking mode. We will then select(2)
  # on the pipe to know when we have bytes to process.
  os.set_blocking(proc.stdout.fileno(), False)

  try:
    output_buffer = io.BytesIO()
    while True:
      select_timeout = timeout - (time.monotonic() - start_time)
      if select_timeout <= 0:
        return TestResult.TIMEOUT, output_buffer.getvalue()

      readable, _, _ = select.select([proc.stdout], [], [], select_timeout)

      if not readable:
        # Indicates that select(2) timed out.
        return TestResult.TIMEOUT, output_buffer.getvalue()

      output_buffer.write(proc.stdout.read())
      output_log = output_buffer.getvalue()

      if b'Pass!' in output_log:
        return TestResult.SUCCESS, output_log
      if b'Fail!' in output_log:
        return TestResult.FAIL, output_log
      if proc.poll():
        return TestResult.UNEXPECTED_TERMINATION, output_log
  finally:
    if not proc.poll():
      proc.kill()


def host_test(test_name):
  exec_path = pathlib.Path('build', 'host', test_name, f'{test_name}.exe')
  if not exec_path.is_file():
    raise argparse.ArgumentTypeError(f'No test named {test_name} exists!')
  return exec_path


def parse_options(argv):
  parser = argparse.ArgumentParser()
  parser.add_argument('-t', '--timeout', type=float, default=10,
                      help='Timeout to kill test after.')
  parser.add_argument('test_name', type=host_test)
  return parser.parse_args(argv)


def main(argv):
  opts = parse_options(argv)

  start_time = time.monotonic()
  result, output = run_test(opts.test_name, timeout=opts.timeout)
  elapsed_time = time.monotonic() - start_time

  print('{} {}! ({:.3f} seconds)'.format(
      opts.test_name, result.reason, elapsed_time),
        file=sys.stderr)

  if result is not TestResult.SUCCESS:
    print('====== Emulator output ======', file=sys.stderr)
    print(output.decode('utf-8'), file=sys.stderr)
    print('=============================', file=sys.stderr)
  return result.exit_code


if __name__ == '__main__':
  sys.exit(main(sys.argv[1:]))