summaryrefslogtreecommitdiff
path: root/util/run_qemu_test
diff options
context:
space:
mode:
Diffstat (limited to 'util/run_qemu_test')
-rwxr-xr-xutil/run_qemu_test253
1 files changed, 253 insertions, 0 deletions
diff --git a/util/run_qemu_test b/util/run_qemu_test
new file mode 100755
index 0000000000..ffc82215d2
--- /dev/null
+++ b/util/run_qemu_test
@@ -0,0 +1,253 @@
+#!/usr/bin/python
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+#
+# Python wrapper script for running tests under QEMU
+#
+
+import errno
+import imp
+import json
+import os
+import optparse
+import re
+import signal
+import socket
+import subprocess
+import sys
+import threading
+import time
+
+QEMU_BINARY="qemu-system-arm"
+QEMU_OPTIONS=["-machine","lm4f232h5","-serial","stdio","-display","none"]
+
+def trace(msg):
+ sys.stdout.write(msg)
+
+class QEMUError(Exception):
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return "QEMU Error:" + repr(self.value)
+
+class QEMUInstance:
+ PORT=3456
+ QMP_ADDR=("127.0.0.1", PORT)
+
+ def __run_qemu(self, cmdline, redirect_stdio=False):
+ trace("Starting QEMU binary ...\n")
+ if redirect_stdio:
+ stdin = subprocess.PIPE
+ stdout = subprocess.PIPE
+ else:
+ stdin = None
+ stdout = None
+ self.__qemu = subprocess.Popen(cmdline, shell=False, bufsize=16384,
+ stdin=stdin, stdout=stdout, close_fds=True)
+ trace("QEMU started pid:%d\n" % (self.__qemu.pid))
+ self.__qemu.wait()
+ trace("QEMU has terminated\n")
+
+ def __init__(self, qemu_bin, firmware, romcode = None, testmode = False):
+ self.__events = []
+ cmdline = [qemu_bin] + QEMU_OPTIONS + ["-kernel",firmware,"-qmp","tcp:%s:%d" % self.QMP_ADDR]
+ if romcode:
+ cmdline += ["-bios",romcode]
+ self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.__sock.bind(self.QMP_ADDR)
+ self.__sock.listen(1)
+
+ self.__thr = threading.Thread(target=QEMUInstance.__run_qemu,args=(self,cmdline,testmode))
+ self.__thr.start()
+ try:
+ trace("Waiting for QEMU connection ...\n")
+ self.__sock, _ = self.__sock.accept()
+ self.__sockfd = self.__sock.makefile()
+ except socket.error:
+ raise QEMUError('Cannot connect to QMP server')
+
+ version = self.__json_recv()
+ if version is None or not version.has_key('QMP'):
+ raise QEMUError('Not QMP support')
+ # Test basic communication with QMP
+ resp = self.send_qmp('qmp_capabilities')
+ if not "return" in resp:
+ raise QEMUError('QMP not working properly')
+ trace("QMP connected\n")
+
+ def __json_recv(self, only_event=False):
+ while True:
+ data = self.__sockfd.readline()
+ if not data:
+ return
+ return json.loads(data)
+
+ def send_qmp(self, name, args=None):
+ qmp_cmd = { 'execute': name }
+ if args:
+ qmp_cmd['arguments'] = args
+ try:
+ self.__sock.sendall(json.dumps(qmp_cmd))
+ except socket.error, err:
+ if err[0] == errno.EPIPE:
+ return
+ raise QEMUError("Error on QMP socket:" + err)
+ return self.__json_recv()
+
+ def serial_readline(self):
+ return self.__qemu.stdout.readline()
+
+ def serial_write(self, string):
+ self.__qemu.stdin.write(string)
+ self.__qemu.stdin.flush()
+
+ def get_event(self, blocking=True):
+ if not blocking:
+ self.__sock.setblocking(0)
+ try:
+ val = self.__json_recv()
+ except socket.error, err:
+ if err[0] == errno.EAGAIN:
+ # Nothing available
+ return None
+ if not blocking:
+ self.__sock.setblocking(1)
+ return val
+
+ def close(self):
+ # Try to terminate QEMU gracefully
+ if self.__qemu.poll() == None:
+ self.send_qmp("quit")
+ time.sleep(0.1)
+ # Force termination if the process is still here :
+ if self.__qemu.poll() == None:
+ self.__qemu.terminate()
+ self.__thr.join()
+ self.__sock.close()
+ self.__sockfd.close()
+
+class TestFailure(Exception):
+ def __init__(self, reason):
+ self.value = reason
+
+ def __str__(self):
+ return "reason:" + repr(self.value)
+
+class EcTest:
+ def __init__(self, qemu_bin, firmware, romcode, test):
+ self.__qemu_bin = qemu_bin
+ self.__firmware = firmware
+ self.__romcode = romcode
+ self.__test = test
+
+ def timeout_handler(self, signum, frame):
+ raise TestFailure("Timeout waiting for %s" % self.__timeout_reason)
+
+ def wait_output(self, string, use_re = False, timeout = 5):
+ self.__timeout_reason = string
+ old_handler = signal.signal(signal.SIGALRM, lambda
+ s,f:self.timeout_handler(s,f))
+ if use_re:
+ regexp = re.compile(string)
+ signal.alarm(timeout)
+ while True:
+ ln = self.__qemu.serial_readline()
+ trace("[EC]%s" % ln)
+ if use_re:
+ res = regexp.search(ln)
+ if res:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, old_handler)
+ return res.groupdict()
+ else:
+ if string in ln:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, old_handler)
+ return
+
+ def wait_prompt(self):
+ self.wait_output("> ")
+
+ def ec_command(self, cmd):
+ self.__qemu.serial_write(cmd + '\r\n')
+
+ def trace(self, msg):
+ trace(msg)
+
+ def report(self, msg):
+ sys.stderr.write(" === TEST %s ===\n" % msg)
+
+ def fail(self, msg):
+ raise TestFailure(msg)
+
+ def run_test(self):
+ try:
+ self.__qemu = QEMUInstance(self.__qemu_bin, self.__firmware,
+ self.__romcode, True)
+ except QEMUError as e:
+ self.report("QEMU FATAL ERROR: " + e.value)
+ return 1
+
+ testmod = imp.load_module("testmodule", file(self.__test,"r"),
+ self.__test, (".py","r",imp.PY_SOURCE))
+ self.report("RUN: %s" % os.path.basename(self.__test))
+ try:
+ res = testmod.test(self)
+ except TestFailure as e:
+ res = False
+ self.report("FAIL: %s" % e.value)
+ self.__qemu.close()
+ if res:
+ self.report("PASS")
+ return 0
+ return 1
+
+def run_interactive(qemu_bin, firmware, romcode):
+ try:
+ qemu = QEMUInstance(qemu_bin, firmware, romcode, False)
+ except QEMUError as e:
+ sys.stderr.write('FATAL: %s\n' % e.value)
+ return 1
+
+ # Dummy testing code : TODO remove
+ #print qemu.send_qmp("query-commands")
+ #print qemu.send_qmp("human-monitor-command",
+ # { 'command-line': "sendkey ctrl-alt-f1 50",'cpu-index': 0 })
+ while True:
+ msg = qemu.get_event()
+ trace("[EVENT]%s\n" % msg)
+ if msg.has_key("event") and msg["event"] == "RESET":
+ break
+ qemu.close()
+ return 0
+
+def parse_cmdline(basedir):
+ parser = optparse.OptionParser("usage: %prog [options] [testname]")
+ parser.add_option("-b", "--board", dest="board", default="bds",
+ help="board to use")
+ parser.add_option("-i", "--image", dest="image",
+ help="firmware image filename")
+ parser.add_option("-r", "--rom", dest="romcode",
+ default=os.path.join(basedir,"util","rom_lm4fs1ge5bb.bin"),
+ help="ROM code image filename")
+ parser.add_option("-q", "--qemu", dest="qemu_bin",
+ default=os.path.join(basedir,"util",QEMU_BINARY),
+ help="Qemu binary path")
+ (options, args) = parser.parse_args()
+ if options.image:
+ image = options.image
+ else:
+ image = os.path.join(basedir,"build",options.board,"ec.bin")
+
+ return options.qemu_bin, image,options.romcode, args
+
+if __name__ == '__main__':
+ basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),".."))
+ qemu_bin, image, romcode, tests = parse_cmdline(basedir)
+ if len(tests) > 0:
+ res = EcTest(qemu_bin, image, romcode, tests[0]).run_test()
+ else:
+ res = run_interactive(qemu_bin, image, romcode)
+ sys.exit(res)