#!/usr/bin/python # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. # # Python wrapper script for running tests under QEMU # import errno import imp import json import os import optparse import re import signal import socket import subprocess import sys import threading import time QEMU_BINARY="qemu-system-arm" QEMU_OPTIONS=["-machine","lm4f232h5","-serial","stdio","-display","none"] def trace(msg): sys.stdout.write(msg) class QEMUError(Exception): def __init__(self, value): self.value = value def __str__(self): return "QEMU Error:" + repr(self.value) class QEMUInstance: PORT=3456 QMP_ADDR=("127.0.0.1", PORT) def __run_qemu(self, cmdline, redirect_stdio=False): trace("Starting QEMU binary ...\n") if redirect_stdio: stdin = subprocess.PIPE stdout = subprocess.PIPE else: stdin = None stdout = None self.__qemu = subprocess.Popen(cmdline, shell=False, bufsize=16384, stdin=stdin, stdout=stdout, close_fds=True) trace("QEMU started pid:%d\n" % (self.__qemu.pid)) self.__qemu.wait() trace("QEMU has terminated\n") def __init__(self, qemu_bin, firmware, romcode = None, testmode = False): self.__events = [] cmdline = [qemu_bin] + QEMU_OPTIONS + ["-kernel",firmware,"-qmp","tcp:%s:%d" % self.QMP_ADDR] if romcode: cmdline += ["-bios",romcode] self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.__sock.bind(self.QMP_ADDR) self.__sock.listen(1) self.__thr = threading.Thread(target=QEMUInstance.__run_qemu,args=(self,cmdline,testmode)) self.__thr.start() try: trace("Waiting for QEMU connection ...\n") self.__sock, _ = self.__sock.accept() self.__sockfd = self.__sock.makefile() except socket.error: raise QEMUError('Cannot connect to QMP server') version = self.__json_recv() if version is None or not version.has_key('QMP'): raise QEMUError('Not QMP support') # Test basic communication with QMP resp = self.send_qmp('qmp_capabilities') if not "return" in resp: raise QEMUError('QMP not working properly') trace("QMP connected\n") def __json_recv(self, only_event=False): while True: data = self.__sockfd.readline() if not data: return return json.loads(data) def send_qmp(self, name, args=None): qmp_cmd = { 'execute': name } if args: qmp_cmd['arguments'] = args try: self.__sock.sendall(json.dumps(qmp_cmd)) except socket.error, err: if err[0] == errno.EPIPE: return raise QEMUError("Error on QMP socket:" + err) return self.__json_recv() def serial_readline(self): return self.__qemu.stdout.readline() def serial_write(self, string): self.__qemu.stdin.write(string) self.__qemu.stdin.flush() def get_event(self, blocking=True): if not blocking: self.__sock.setblocking(0) try: val = self.__json_recv() except socket.error, err: if err[0] == errno.EAGAIN: # Nothing available return None if not blocking: self.__sock.setblocking(1) return val def close(self): # Try to terminate QEMU gracefully if self.__qemu.poll() == None: self.send_qmp("quit") time.sleep(0.1) # Force termination if the process is still here : if self.__qemu.poll() == None: self.__qemu.terminate() self.__thr.join() self.__sock.close() self.__sockfd.close() class TestFailure(Exception): def __init__(self, reason): self.value = reason def __str__(self): return "reason:" + repr(self.value) class EcTest: def __init__(self, qemu_bin, firmware, romcode, test): self.__qemu_bin = qemu_bin self.__firmware = firmware self.__romcode = romcode self.__test = test def timeout_handler(self, signum, frame): raise TestFailure("Timeout waiting for %s" % self.__timeout_reason) def wait_output(self, string, use_re = False, timeout = 5): self.__timeout_reason = string old_handler = signal.signal(signal.SIGALRM, lambda s,f:self.timeout_handler(s,f)) if use_re: regexp = re.compile(string) signal.alarm(timeout) while True: ln = self.__qemu.serial_readline() trace("[EC]%s" % ln) if use_re: res = regexp.search(ln) if res: signal.alarm(0) signal.signal(signal.SIGALRM, old_handler) return res.groupdict() else: if string in ln: signal.alarm(0) signal.signal(signal.SIGALRM, old_handler) return def check_no_output(self, string, use_re = False, timeout = 1): success = False try: self.wait_output(string, use_re=use_re, timeout=timeout) except: success = True return success def wait_prompt(self): self.wait_output("> ") def ec_command(self, cmd): self.__qemu.serial_write(cmd + '\r\n') def trace(self, msg): trace(msg) def report(self, msg): sys.stderr.write(" === TEST %s ===\n" % msg) def fail(self, msg): raise TestFailure(msg) def run_test(self): try: self.__qemu = QEMUInstance(self.__qemu_bin, self.__firmware, self.__romcode, True) except QEMUError as e: self.report("QEMU FATAL ERROR: " + e.value) return 1 # Set up import path so each test can import other modules inside 'test' sys.path.insert(0, os.path.dirname(os.path.abspath(self.__test))) testmod = imp.load_module("testmodule", file(self.__test,"r"), self.__test, (".py","r",imp.PY_SOURCE)) self.report("RUN: %s" % os.path.basename(self.__test)) try: res = testmod.test(self) except TestFailure as e: res = False self.report("FAIL: %s" % e.value) self.__qemu.close() if res: self.report("PASS") return 0 return 1 def run_interactive(qemu_bin, firmware, romcode): try: qemu = QEMUInstance(qemu_bin, firmware, romcode, False) except QEMUError as e: sys.stderr.write('FATAL: %s\n' % e.value) return 1 # Dummy testing code : TODO remove #print qemu.send_qmp("query-commands") #print qemu.send_qmp("human-monitor-command", # { 'command-line': "sendkey ctrl-alt-f1 50",'cpu-index': 0 }) while True: msg = qemu.get_event() trace("[EVENT]%s\n" % msg) if msg.has_key("event") and msg["event"] == "RESET": break qemu.close() return 0 def parse_cmdline(basedir): parser = optparse.OptionParser("usage: %prog [options] [testname]") parser.add_option("-b", "--board", dest="board", default="bds", help="board to use") parser.add_option("-i", "--image", dest="image", help="firmware image filename") parser.add_option("-r", "--rom", dest="romcode", default=os.path.join(basedir,"util","rom_lm4fs1ge5bb.bin"), help="ROM code image filename") parser.add_option("-q", "--qemu", dest="qemu_bin", default=os.path.join(basedir,"util",QEMU_BINARY), help="Qemu binary path") (options, args) = parser.parse_args() if options.image: image = options.image else: image = os.path.join(basedir,"build",options.board,"ec.bin") return options.qemu_bin, image,options.romcode, args if __name__ == '__main__': basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"..")) qemu_bin, image, romcode, tests = parse_cmdline(basedir) if len(tests) > 0: res = EcTest(qemu_bin, image, romcode, tests[0]).run_test() else: res = run_interactive(qemu_bin, image, romcode) sys.exit(res)