From 84b0f9330e78bc7bb5d077c10ea403f66617ea72 Mon Sep 17 00:00:00 2001 From: Jack Rosenthal Date: Mon, 16 Mar 2020 18:03:10 -0600 Subject: util: rewrite the host test runner in Python 3, and drop pexpect This is a total-rewrite of the host test runner, in Python 3, and no longer uses the pexpect library (simply because we don't need to open a can of Pringles with a crowbar: our usage can be handled with some simple IPC and the subprocess library). BUG=chromium:1031705,chromium:1061923 BRANCH=none TEST=host tests pass, manually-created failing tests with "Fail!", premature ending, or timeout fail appropriately. Signed-off-by: Jack Rosenthal Change-Id: I4017da877d6a34c1031b261fc41f8334dae26c00 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/2106862 Reviewed-by: Chris McDonald Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/ec/+/3105736 Commit-Queue: Daisuke Nojiri Tested-by: Daisuke Nojiri --- util/run_host_test | 177 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 105 insertions(+), 72 deletions(-) diff --git a/util/run_host_test b/util/run_host_test index 395afd4620..5519c53e29 100755 --- a/util/run_host_test +++ b/util/run_host_test @@ -1,89 +1,122 @@ -#!/usr/bin/env python - -# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright 2020 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. -from cStringIO import StringIO +"""Wrapper that runs a host test. Handles timeout and stopping the emulator.""" + +from __future__ import print_function + +import argparse +import enum +import io import os -import pexpect -import signal +import pathlib +import select import subprocess import sys import time -TIMEOUT=10 -RESULT_ID_TIMEOUT = 0 -RESULT_ID_PASS = 1 -RESULT_ID_FAIL = 2 -RESULT_ID_EOF = 3 +class TestResult(enum.Enum): + """An Enum representing the result of running a test.""" + SUCCESS = enum.auto() + FAIL = enum.auto() + TIMEOUT = enum.auto() + UNEXPECTED_TERMINATION = enum.auto() -EXPECT_LIST = [pexpect.TIMEOUT, 'Pass!', 'Fail!', pexpect.EOF] + @property + def exit_code(self): + if self is TestResult.SUCCESS: + return 0 + return 1 -PS_ARGS = ['ps', '--no-headers', '-o', 'stat', '--pid'] + @property + def reason(self): + return { + TestResult.SUCCESS: 'passed', + TestResult.FAIL: 'failed', + TestResult.TIMEOUT: 'timed out', + TestResult.UNEXPECTED_TERMINATION: 'terminated unexpectedly', + }[self] -class Tee(object): - def __init__(self, target): - self._target = target - def write(self, data): - sys.stdout.write(data) - self._target.write(data) +def run_test(path, timeout=10): + start_time = time.monotonic() + env = dict(os.environ) + env['ASAN_OPTIONS'] = 'log_path=stderr' - def flush(self): - sys.stdout.flush() - self._target.flush() + proc = subprocess.Popen( + [path], + bufsize=0, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + env=env) + + # Put the output pipe in non-blocking mode. We will then select(2) + # on the pipe to know when we have bytes to process. + os.set_blocking(proc.stdout.fileno(), False) -def RunOnce(test_name, log, env): - child = pexpect.spawn('build/host/{0}/{0}.exe'.format(test_name), - timeout=TIMEOUT, env=env) - child.logfile = log try: - return child.expect(EXPECT_LIST) + output_buffer = io.BytesIO() + while True: + select_timeout = timeout - (time.monotonic() - start_time) + if select_timeout <= 0: + return TestResult.TIMEOUT, output_buffer.getvalue() + + readable, _, _ = select.select([proc.stdout], [], [], select_timeout) + + if not readable: + # Indicates that select(2) timed out. + return TestResult.TIMEOUT, output_buffer.getvalue() + + output_buffer.write(proc.stdout.read()) + output_log = output_buffer.getvalue() + + if b'Pass!' in output_log: + return TestResult.SUCCESS, output_log + if b'Fail!' in output_log: + return TestResult.FAIL, output_log + if proc.poll(): + return TestResult.UNEXPECTED_TERMINATION, output_log finally: - if child.isalive(): - ps_cmd = PS_ARGS + ['%d' % (child.pid)] - ps_stat = subprocess.check_output(ps_cmd).strip() - log.write('\n*** test %s process %d in state %s ***\n' % - (test_name, child.pid, ps_stat)) - child.kill(signal.SIGTERM) - child.read() - -# ASAN_OPTIONS environment variable is only required when test is built with -# ASan, but is otherwise harmless. -env = dict(os.environ) -env["ASAN_OPTIONS"] = "log_path=stderr" - -log = StringIO() -tee_log = Tee(log) -test_name = sys.argv[1] -start_time = time.time() - -result_id = RunOnce(test_name, tee_log, env) - -elapsed_time = time.time() - start_time -if result_id == RESULT_ID_TIMEOUT: - sys.stderr.write('Test %s timed out after %d seconds!\n' % - (test_name, TIMEOUT)) - failed = True -elif result_id == RESULT_ID_PASS: - sys.stderr.write('Test %s passed! (%.3f seconds)\n' % - (test_name, elapsed_time)) - failed = False -elif result_id == RESULT_ID_FAIL: - sys.stderr.write('Test %s failed! (%.3f seconds)\n' % - (test_name, elapsed_time)) - failed = True -elif result_id == RESULT_ID_EOF: - sys.stderr.write('Test %s terminated unexpectedly! (%.3f seconds)\n' % - (test_name, elapsed_time)) - failed = True - -if failed: - sys.stderr.write('\n====== Emulator output ======\n') - sys.stderr.write(log.getvalue()) - sys.stderr.write('\n=============================\n') - sys.exit(1) -else: - sys.exit(0) + if not proc.poll(): + proc.kill() + + +def host_test(test_name): + exec_path = pathlib.Path('build', 'host', test_name, f'{test_name}.exe') + if not exec_path.is_file(): + raise argparse.ArgumentTypeError(f'No test named {test_name} exists!') + return exec_path + + +def parse_options(argv): + parser = argparse.ArgumentParser() + parser.add_argument('-t', '--timeout', type=float, default=10, + help='Timeout to kill test after.') + parser.add_argument('test_name', type=host_test) + return parser.parse_args(argv) + + +def main(argv): + opts = parse_options(argv) + + start_time = time.monotonic() + result, output = run_test(opts.test_name, timeout=opts.timeout) + elapsed_time = time.monotonic() - start_time + + print('{} {}! ({:.3f} seconds)'.format( + opts.test_name, result.reason, elapsed_time), + file=sys.stderr) + + if result is not TestResult.SUCCESS: + print('====== Emulator output ======', file=sys.stderr) + print(output.decode('utf-8'), file=sys.stderr) + print('=============================', file=sys.stderr) + return result.exit_code + + +if __name__ == '__main__': + sys.exit(main(sys.argv[1:])) -- cgit v1.2.1