summaryrefslogtreecommitdiff
path: root/util/run_qemu_test
diff options
context:
space:
mode:
Diffstat (limited to 'util/run_qemu_test')
-rwxr-xr-xutil/run_qemu_test265
1 files changed, 0 insertions, 265 deletions
diff --git a/util/run_qemu_test b/util/run_qemu_test
deleted file mode 100755
index 1889eedcf3..0000000000
--- a/util/run_qemu_test
+++ /dev/null
@@ -1,265 +0,0 @@
-#!/usr/bin/python
-# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Python wrapper script for running tests under QEMU
-#
-
-import errno
-import imp
-import json
-import os
-import optparse
-import re
-import signal
-import socket
-import subprocess
-import sys
-import threading
-import time
-
-QEMU_BINARY="qemu-system-arm"
-QEMU_OPTIONS=["-machine","lm4f232h5","-serial","stdio","-display","none"]
-
-def trace(msg):
- sys.stdout.write(msg)
-
-class QEMUError(Exception):
- def __init__(self, value):
- self.value = value
-
- def __str__(self):
- return "QEMU Error:" + repr(self.value)
-
-class QEMUInstance:
- PORT=3456
- QMP_ADDR=("127.0.0.1", PORT)
-
- def __run_qemu(self, cmdline, redirect_stdio=False):
- trace("Starting QEMU binary ...\n")
- if redirect_stdio:
- stdin = subprocess.PIPE
- stdout = subprocess.PIPE
- else:
- stdin = None
- stdout = None
- self.__qemu = subprocess.Popen(cmdline, shell=False, bufsize=16384,
- stdin=stdin, stdout=stdout, close_fds=True)
- trace("QEMU started pid:%d\n" % (self.__qemu.pid))
- self.__qemu.wait()
- trace("QEMU has terminated\n")
-
- def __init__(self, qemu_bin, firmware, romcode = None, testmode = False):
- self.__events = []
- cmdline = [qemu_bin] + QEMU_OPTIONS + ["-kernel",firmware,"-qmp","tcp:%s:%d" % self.QMP_ADDR]
- if romcode:
- cmdline += ["-bios",romcode]
- self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.__sock.bind(self.QMP_ADDR)
- self.__sock.listen(1)
-
- self.__thr = threading.Thread(target=QEMUInstance.__run_qemu,args=(self,cmdline,testmode))
- self.__thr.start()
- try:
- trace("Waiting for QEMU connection ...\n")
- self.__sock, _ = self.__sock.accept()
- self.__sockfd = self.__sock.makefile()
- except socket.error:
- raise QEMUError('Cannot connect to QMP server')
-
- version = self.__json_recv()
- if version is None or not version.has_key('QMP'):
- raise QEMUError('Not QMP support')
- # Test basic communication with QMP
- resp = self.send_qmp('qmp_capabilities')
- if not "return" in resp:
- raise QEMUError('QMP not working properly')
- trace("QMP connected\n")
-
- def __json_recv(self, only_event=False):
- while True:
- data = self.__sockfd.readline()
- if not data:
- return
- return json.loads(data)
-
- def send_qmp(self, name, args=None):
- qmp_cmd = { 'execute': name }
- if args:
- qmp_cmd['arguments'] = args
- try:
- self.__sock.sendall(json.dumps(qmp_cmd))
- except socket.error, err:
- if err[0] == errno.EPIPE:
- return
- raise QEMUError("Error on QMP socket:" + err)
- return self.__json_recv()
-
- def serial_readline(self):
- return self.__qemu.stdout.readline()
-
- def serial_write(self, string):
- self.__qemu.stdin.write(string)
- self.__qemu.stdin.flush()
-
- def get_event(self, blocking=True):
- if not blocking:
- self.__sock.setblocking(0)
- try:
- val = self.__json_recv()
- except socket.error, err:
- if err[0] == errno.EAGAIN:
- # Nothing available
- return None
- if not blocking:
- self.__sock.setblocking(1)
- return val
-
- def close(self):
- # Try to terminate QEMU gracefully
- if self.__qemu.poll() == None:
- self.send_qmp("quit")
- time.sleep(0.1)
- # Force termination if the process is still here :
- if self.__qemu.poll() == None:
- self.__qemu.terminate()
- self.__thr.join()
- self.__sock.close()
- self.__sockfd.close()
-
-class TestFailure(Exception):
- def __init__(self, reason):
- self.value = reason
-
- def __str__(self):
- return "reason:" + repr(self.value)
-
-class EcTest:
- def __init__(self, qemu_bin, firmware, romcode, test):
- self.__qemu_bin = qemu_bin
- self.__firmware = firmware
- self.__romcode = romcode
- self.__test = test
-
- def timeout_handler(self, signum, frame):
- raise TestFailure("Timeout waiting for %s" % self.__timeout_reason)
-
- def wait_output(self, string, use_re = False, timeout = 5):
- self.__timeout_reason = string
- old_handler = signal.signal(signal.SIGALRM, lambda
- s,f:self.timeout_handler(s,f))
- if use_re:
- regexp = re.compile(string)
- signal.alarm(timeout)
- while True:
- ln = self.__qemu.serial_readline()
- trace("[EC]%s" % ln)
- if use_re:
- res = regexp.search(ln)
- if res:
- signal.alarm(0)
- signal.signal(signal.SIGALRM, old_handler)
- return res.groupdict()
- else:
- if string in ln:
- signal.alarm(0)
- signal.signal(signal.SIGALRM, old_handler)
- return
-
- def check_no_output(self, string, use_re = False, timeout = 1):
- success = False
- try:
- self.wait_output(string, use_re=use_re, timeout=timeout)
- except:
- success = True
- return success
-
-
- def wait_prompt(self):
- self.wait_output("> ")
-
- def ec_command(self, cmd):
- self.__qemu.serial_write(cmd + '\r\n')
-
- def trace(self, msg):
- trace(msg)
-
- def report(self, msg):
- sys.stderr.write(" === TEST %s ===\n" % msg)
-
- def fail(self, msg):
- raise TestFailure(msg)
-
- def run_test(self):
- try:
- self.__qemu = QEMUInstance(self.__qemu_bin, self.__firmware,
- self.__romcode, True)
- except QEMUError as e:
- self.report("QEMU FATAL ERROR: " + e.value)
- return 1
-
- # Set up import path so each test can import other modules inside 'test'
- sys.path.insert(0, os.path.dirname(os.path.abspath(self.__test)))
-
- testmod = imp.load_module("testmodule", file(self.__test,"r"),
- self.__test, (".py","r",imp.PY_SOURCE))
- self.report("RUN: %s" % os.path.basename(self.__test))
- try:
- res = testmod.test(self)
- except TestFailure as e:
- res = False
- self.report("FAIL: %s" % e.value)
- self.__qemu.close()
- if res:
- self.report("PASS")
- return 0
- return 1
-
-def run_interactive(qemu_bin, firmware, romcode):
- try:
- qemu = QEMUInstance(qemu_bin, firmware, romcode, False)
- except QEMUError as e:
- sys.stderr.write('FATAL: %s\n' % e.value)
- return 1
-
- # Dummy testing code : TODO remove
- #print qemu.send_qmp("query-commands")
- #print qemu.send_qmp("human-monitor-command",
- # { 'command-line': "sendkey ctrl-alt-f1 50",'cpu-index': 0 })
- while True:
- msg = qemu.get_event()
- trace("[EVENT]%s\n" % msg)
- if msg.has_key("event") and msg["event"] == "RESET":
- break
- qemu.close()
- return 0
-
-def parse_cmdline(basedir):
- parser = optparse.OptionParser("usage: %prog [options] [testname]")
- parser.add_option("-b", "--board", dest="board", default="bds",
- help="board to use")
- parser.add_option("-i", "--image", dest="image",
- help="firmware image filename")
- parser.add_option("-r", "--rom", dest="romcode",
- default=os.path.join(basedir,"util","rom_lm4fs1ge5bb.bin"),
- help="ROM code image filename")
- parser.add_option("-q", "--qemu", dest="qemu_bin",
- default=os.path.join(basedir,"util",QEMU_BINARY),
- help="Qemu binary path")
- (options, args) = parser.parse_args()
- if options.image:
- image = options.image
- else:
- image = os.path.join(basedir,"build",options.board,"ec.bin")
-
- return options.qemu_bin, image,options.romcode, args
-
-if __name__ == '__main__':
- basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),".."))
- qemu_bin, image, romcode, tests = parse_cmdline(basedir)
- if len(tests) > 0:
- res = EcTest(qemu_bin, image, romcode, tests[0]).run_test()
- else:
- res = run_interactive(qemu_bin, image, romcode)
- sys.exit(res)