diff options
author | Vincent Palatin <vpalatin@chromium.org> | 2011-12-07 18:58:43 +0000 |
---|---|---|
committer | Vincent Palatin <vpalatin@chromium.org> | 2011-12-07 19:10:02 +0000 |
commit | e24fa592d2a215d8ae67917c1d89e68cdf847a03 (patch) | |
tree | 47fbe4c55e7f4089cad7d619eded337da3bae999 /util | |
parent | 6396911897e4cd40f52636d710cee2865acf15e3 (diff) | |
download | chrome-ec-e24fa592d2a215d8ae67917c1d89e68cdf847a03.tar.gz |
Initial sources import 3/3
source files mainly done by Vincent.
Signed-off-by: Vincent Palatin <vpalatin@chromium.org>
Change-Id: Ic2d1becd400c9b4b4a14d4a243af1bdf77d9c1e2
Diffstat (limited to 'util')
-rw-r--r-- | util/build.mk | 5 | ||||
-rwxr-xr-x | util/qemu-system-arm | bin | 0 -> 9269950 bytes | |||
-rwxr-xr-x | util/run_qemu_test | 253 |
3 files changed, 258 insertions, 0 deletions
diff --git a/util/build.mk b/util/build.mk new file mode 100644 index 0000000000..1bf1f7f372 --- /dev/null +++ b/util/build.mk @@ -0,0 +1,5 @@ +# +# Host tools build +# + +util-bin=ectool diff --git a/util/qemu-system-arm b/util/qemu-system-arm Binary files differnew file mode 100755 index 0000000000..aba798e27b --- /dev/null +++ b/util/qemu-system-arm diff --git a/util/run_qemu_test b/util/run_qemu_test new file mode 100755 index 0000000000..ffc82215d2 --- /dev/null +++ b/util/run_qemu_test @@ -0,0 +1,253 @@ +#!/usr/bin/python +# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +# +# Python wrapper script for running tests under QEMU +# + +import errno +import imp +import json +import os +import optparse +import re +import signal +import socket +import subprocess +import sys +import threading +import time + +QEMU_BINARY="qemu-system-arm" +QEMU_OPTIONS=["-machine","lm4f232h5","-serial","stdio","-display","none"] + +def trace(msg): + sys.stdout.write(msg) + +class QEMUError(Exception): + def __init__(self, value): + self.value = value + + def __str__(self): + return "QEMU Error:" + repr(self.value) + +class QEMUInstance: + PORT=3456 + QMP_ADDR=("127.0.0.1", PORT) + + def __run_qemu(self, cmdline, redirect_stdio=False): + trace("Starting QEMU binary ...\n") + if redirect_stdio: + stdin = subprocess.PIPE + stdout = subprocess.PIPE + else: + stdin = None + stdout = None + self.__qemu = subprocess.Popen(cmdline, shell=False, bufsize=16384, + stdin=stdin, stdout=stdout, close_fds=True) + trace("QEMU started pid:%d\n" % (self.__qemu.pid)) + self.__qemu.wait() + trace("QEMU has terminated\n") + + def __init__(self, qemu_bin, firmware, romcode = None, testmode = False): + self.__events = [] + cmdline = [qemu_bin] + QEMU_OPTIONS + ["-kernel",firmware,"-qmp","tcp:%s:%d" % self.QMP_ADDR] + if romcode: + cmdline += ["-bios",romcode] + self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.__sock.bind(self.QMP_ADDR) + self.__sock.listen(1) + + self.__thr = threading.Thread(target=QEMUInstance.__run_qemu,args=(self,cmdline,testmode)) + self.__thr.start() + try: + trace("Waiting for QEMU connection ...\n") + self.__sock, _ = self.__sock.accept() + self.__sockfd = self.__sock.makefile() + except socket.error: + raise QEMUError('Cannot connect to QMP server') + + version = self.__json_recv() + if version is None or not version.has_key('QMP'): + raise QEMUError('Not QMP support') + # Test basic communication with QMP + resp = self.send_qmp('qmp_capabilities') + if not "return" in resp: + raise QEMUError('QMP not working properly') + trace("QMP connected\n") + + def __json_recv(self, only_event=False): + while True: + data = self.__sockfd.readline() + if not data: + return + return json.loads(data) + + def send_qmp(self, name, args=None): + qmp_cmd = { 'execute': name } + if args: + qmp_cmd['arguments'] = args + try: + self.__sock.sendall(json.dumps(qmp_cmd)) + except socket.error, err: + if err[0] == errno.EPIPE: + return + raise QEMUError("Error on QMP socket:" + err) + return self.__json_recv() + + def serial_readline(self): + return self.__qemu.stdout.readline() + + def serial_write(self, string): + self.__qemu.stdin.write(string) + self.__qemu.stdin.flush() + + def get_event(self, blocking=True): + if not blocking: + self.__sock.setblocking(0) + try: + val = self.__json_recv() + except socket.error, err: + if err[0] == errno.EAGAIN: + # Nothing available + return None + if not blocking: + self.__sock.setblocking(1) + return val + + def close(self): + # Try to terminate QEMU gracefully + if self.__qemu.poll() == None: + self.send_qmp("quit") + time.sleep(0.1) + # Force termination if the process is still here : + if self.__qemu.poll() == None: + self.__qemu.terminate() + self.__thr.join() + self.__sock.close() + self.__sockfd.close() + +class TestFailure(Exception): + def __init__(self, reason): + self.value = reason + + def __str__(self): + return "reason:" + repr(self.value) + +class EcTest: + def __init__(self, qemu_bin, firmware, romcode, test): + self.__qemu_bin = qemu_bin + self.__firmware = firmware + self.__romcode = romcode + self.__test = test + + def timeout_handler(self, signum, frame): + raise TestFailure("Timeout waiting for %s" % self.__timeout_reason) + + def wait_output(self, string, use_re = False, timeout = 5): + self.__timeout_reason = string + old_handler = signal.signal(signal.SIGALRM, lambda + s,f:self.timeout_handler(s,f)) + if use_re: + regexp = re.compile(string) + signal.alarm(timeout) + while True: + ln = self.__qemu.serial_readline() + trace("[EC]%s" % ln) + if use_re: + res = regexp.search(ln) + if res: + signal.alarm(0) + signal.signal(signal.SIGALRM, old_handler) + return res.groupdict() + else: + if string in ln: + signal.alarm(0) + signal.signal(signal.SIGALRM, old_handler) + return + + def wait_prompt(self): + self.wait_output("> ") + + def ec_command(self, cmd): + self.__qemu.serial_write(cmd + '\r\n') + + def trace(self, msg): + trace(msg) + + def report(self, msg): + sys.stderr.write(" === TEST %s ===\n" % msg) + + def fail(self, msg): + raise TestFailure(msg) + + def run_test(self): + try: + self.__qemu = QEMUInstance(self.__qemu_bin, self.__firmware, + self.__romcode, True) + except QEMUError as e: + self.report("QEMU FATAL ERROR: " + e.value) + return 1 + + testmod = imp.load_module("testmodule", file(self.__test,"r"), + self.__test, (".py","r",imp.PY_SOURCE)) + self.report("RUN: %s" % os.path.basename(self.__test)) + try: + res = testmod.test(self) + except TestFailure as e: + res = False + self.report("FAIL: %s" % e.value) + self.__qemu.close() + if res: + self.report("PASS") + return 0 + return 1 + +def run_interactive(qemu_bin, firmware, romcode): + try: + qemu = QEMUInstance(qemu_bin, firmware, romcode, False) + except QEMUError as e: + sys.stderr.write('FATAL: %s\n' % e.value) + return 1 + + # Dummy testing code : TODO remove + #print qemu.send_qmp("query-commands") + #print qemu.send_qmp("human-monitor-command", + # { 'command-line': "sendkey ctrl-alt-f1 50",'cpu-index': 0 }) + while True: + msg = qemu.get_event() + trace("[EVENT]%s\n" % msg) + if msg.has_key("event") and msg["event"] == "RESET": + break + qemu.close() + return 0 + +def parse_cmdline(basedir): + parser = optparse.OptionParser("usage: %prog [options] [testname]") + parser.add_option("-b", "--board", dest="board", default="bds", + help="board to use") + parser.add_option("-i", "--image", dest="image", + help="firmware image filename") + parser.add_option("-r", "--rom", dest="romcode", + default=os.path.join(basedir,"util","rom_lm4fs1ge5bb.bin"), + help="ROM code image filename") + parser.add_option("-q", "--qemu", dest="qemu_bin", + default=os.path.join(basedir,"util",QEMU_BINARY), + help="Qemu binary path") + (options, args) = parser.parse_args() + if options.image: + image = options.image + else: + image = os.path.join(basedir,"build",options.board,"ec.bin") + + return options.qemu_bin, image,options.romcode, args + +if __name__ == '__main__': + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"..")) + qemu_bin, image, romcode, tests = parse_cmdline(basedir) + if len(tests) > 0: + res = EcTest(qemu_bin, image, romcode, tests[0]).run_test() + else: + res = run_interactive(qemu_bin, image, romcode) + sys.exit(res) |