diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-06-02 14:24:57 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-06-02 14:24:57 +0000 |
commit | 3dc19387981b88d9f5eb9a3e78d2ce4266c42b8c (patch) | |
tree | 40e22443f9cb713fbfb0de83f590b47b44162295 /qpid/python/qpid-python-test | |
parent | 7b4ced5276092feb0a1a5f1f8b8945a683aa017e (diff) | |
download | qpid-python-3dc19387981b88d9f5eb9a3e78d2ce4266c42b8c.tar.gz |
first commit of new messaging API and test harness
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@781044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qpid-python-test')
-rwxr-xr-x | qpid/python/qpid-python-test | 450 |
1 files changed, 450 insertions, 0 deletions
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test new file mode 100755 index 0000000000..287637d7a0 --- /dev/null +++ b/qpid/python/qpid-python-test @@ -0,0 +1,450 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, os, sys, logging, traceback, types +from fnmatch import fnmatchcase as match +from getopt import GetoptError +from logging import getLogger, StreamHandler, Formatter, Filter, \ + WARN, DEBUG, ERROR +from qpid.util import URL + +levels = { + "DEBUG": DEBUG, + "WARN": WARN, + "ERROR": ERROR + } + +sorted_levels = [(v, k) for k, v in levels.items()] +sorted_levels.sort() +sorted_levels = [v for k, v in sorted_levels] + +parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...", + description="Run tests matching the specified PATTERNs.") +parser.add_option("-l", "--list", action="store_true", default=False, + help="list tests instead of executing them") +parser.add_option("-b", "--broker", default="localhost", + help="run tests against BROKER (default %default)") +parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE") +parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN", + help="only display log messages of LEVEL or higher severity: " + "%s (default %%default)" % ", ".join(sorted_levels)) +parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append", + dest="log_categories", default=[], + help="log only categories matching CATEGORY pattern") +parser.add_option("-i", "--ignore", action="append", default=[], + help="ignore tests matching IGNORE pattern") +parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append", + default=[], + help="ignore tests matching patterns in IFILE") + +class Config: + + def __init__(self): + self.broker = URL("localhost") + self.work = None + self.log_file = None + self.log_level = WARN + self.log_categories = [] + +opts, args = parser.parse_args() + +includes = [] +excludes = ["*__*__"] +config = Config() +list_only = opts.list +config.broker = URL(opts.broker) +config.log_file = opts.log_file +config.log_level = levels[opts.log_level.upper()] +config.log_categories = opts.log_categories +excludes.extend([v.strip() for v in opts.ignore]) +for v in opts.ignore_file: + f = open(v) + for line in f: + line = line.strip() + if line.startswith("#"): + continue + excludes.append(line) + f.close() + +for a in args: + includes.append(a.strip()) + +if not includes: + includes.append("*") + +def included(path): + for p in excludes: + if match(path, p): + return False + for p in includes: + if match(path, p): + return True + return False + +def vt100_attrs(*attrs): + return "\x1B[%sm" % ";".join(map(str, attrs)) + +vt100_reset = vt100_attrs(0) + +KEYWORDS = {"pass": (32,), + "fail": (31,), + "start": (34,), + "total": (34,)} + +COLORIZE = sys.stdout.isatty() + +def colorize_word(word, text=None): + if text is None: + text = word + return colorize(text, *KEYWORDS.get(word, ())) + +def colorize(text, *attrs): + if attrs and COLORIZE: + return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset) + else: + return text + +def indent(text): + lines = text.split("\n") + return " %s" % "\n ".join(lines) + +from qpid.testlib import testrunner +testrunner.setBroker(str(config.broker)) + +class Interceptor: + + def __init__(self): + self.newline = False + self.indent = False + self.passthrough = True + self.dirty = False + self.last = None + + def begin(self): + self.newline = True + self.indent = True + self.passthrough = False + self.dirty = False + self.last = None + + def reset(self): + self.newline = False + self.indent = False + self.passthrough = True + +class StreamWrapper: + + def __init__(self, interceptor, stream, prefix=" "): + self.interceptor = interceptor + self.stream = stream + self.prefix = prefix + + def isatty(self): + return self.stream.isatty() + + def write(self, s): + if self.interceptor.passthrough: + self.stream.write(s) + return + + if s: + self.interceptor.dirty = True + + if self.interceptor.newline: + self.interceptor.newline = False + self.stream.write(" %s\n" % colorize_word("start")) + self.interceptor.indent = True + if self.interceptor.indent: + self.stream.write(self.prefix) + if s.endswith("\n"): + s = s.replace("\n", "\n%s" % self.prefix)[:-2] + self.interceptor.indent = True + else: + s = s.replace("\n", "\n%s" % self.prefix) + self.interceptor.indent = False + self.stream.write(s) + + if s: + self.interceptor.last = s[-1] + + def flush(self): + self.stream.flush() + +interceptor = Interceptor() + +out_wrp = StreamWrapper(interceptor, sys.stdout) +err_wrp = StreamWrapper(interceptor, sys.stderr) + +out = sys.stdout +err = sys.stderr +sys.stdout = out_wrp +sys.stderr = err_wrp + +class PatternFilter(Filter): + + def __init__(self, *patterns): + Filter.__init__(self, patterns) + self.patterns = patterns + + def filter(self, record): + if not self.patterns: + return True + for p in self.patterns: + if match(record.name, p): + return True + return False + +root = getLogger() +handler = StreamHandler(sys.stdout) +filter = PatternFilter(*config.log_categories) +handler.addFilter(filter) +handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s")) +root.addHandler(handler) +root.setLevel(WARN) + +log = getLogger("qpid.test") + +class Runner: + + def __init__(self): + self.exceptions = [] + + def passed(self): + return not self.exceptions + + def failed(self): + return self.exceptions + + def run(self, name, phase): + try: + phase() + except KeyboardInterrupt: + raise + except: + self.exceptions.append((name, sys.exc_info())) + + def status(self): + if self.passed(): + return "pass" + else: + return "fail" + + def print_exceptions(self): + for name, info in self.exceptions: + print "Error during %s:" % name + print indent("".join(traceback.format_exception(*info))).rstrip() + +def run_test(name, test, config): + patterns = filter.patterns + level = root.level + filter.patterns = config.log_categories + root.setLevel(config.log_level) + + parts = name.split(".") + line = None + output = "" + for part in parts: + if line: + if len(line) + len(part) >= 71: + output += "%s. \\\n" % line + line = " %s" % part + else: + line = "%s.%s" % (line, part) + else: + line = part + + if line: + output += "%s %s" % (line, ((72 - len(line))*".")) + sys.stdout.write(output) + sys.stdout.flush() + interceptor.begin() + try: + runner = test() + finally: + interceptor.reset() + if interceptor.dirty: + if interceptor.last != "\n": + sys.stdout.write("\n") + sys.stdout.write(output) + print " %s" % colorize_word(runner.status()) + if runner.failed(): + runner.print_exceptions() + root.setLevel(level) + filter.patterns = patterns + return runner.passed() + +class FunctionTest: + + def __init__(self, test): + self.test = test + + def name(self): + return "%s.%s" % (self.test.__module__, self.test.__name__) + + def run(self): + return run_test(self.name(), self._run, config) + + def _run(self): + runner = Runner() + runner.run("test", lambda: self.test(config)) + return runner + + def __repr__(self): + return "FunctionTest(%r)" % self.test + +class MethodTest: + + def __init__(self, cls, method): + self.cls = cls + self.method = method + + def name(self): + return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method) + + def run(self): + return run_test(self.name(), self._run, config) + + def _run(self): + runner = Runner() + inst = self.cls(self.method) + test = getattr(inst, self.method) + + if hasattr(inst, "configure"): + runner.run("configure", lambda: inst.configure(config)) + if runner.failed(): return runner + if hasattr(inst, "setUp"): + runner.run("setup", inst.setUp) + if runner.failed(): return runner + elif hasattr(inst, "setup"): + runner.run("setup", inst.setup) + if runner.failed(): return runner + + runner.run("test", test) + + if hasattr(inst, "tearDown"): + runner.run("teardown", inst.tearDown) + elif hasattr(inst, "teardown"): + runner.run("teardown", inst.teardown) + + return runner + + def __repr__(self): + return "MethodTest(%r, %r)" % (self.cls, self.method) + +class PatternMatcher: + + def __init__(self, *patterns): + self.patterns = patterns + + def matches(self, name): + for p in self.patterns: + if match(name, p): + return True + return False + +class FunctionScanner(PatternMatcher): + + def inspect(self, obj): + return type(obj) == types.FunctionType and self.matches(name) + + def descend(self, func): + return; yield + + def extract(self, func): + yield FunctionTest(func) + +class ClassScanner(PatternMatcher): + + def inspect(self, obj): + return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__) + + def descend(self, cls): + return; yield + + def extract(self, cls): + names = dir(cls) + names.sort() + for name in names: + obj = getattr(cls, name) + t = type(obj) + if t == types.MethodType and name.startswith("test"): + yield MethodTest(cls, name) + +class ModuleScanner: + + def inspect(self, obj): + return type(obj) == types.ModuleType + + def descend(self, obj): + names = dir(obj) + names.sort() + for name in names: + yield getattr(obj, name) + + def extract(self, obj): + return; yield + +class Harness: + + def __init__(self): + self.scanners = [ + ModuleScanner(), + ClassScanner("*Test", "*Tests", "*TestCase"), + FunctionScanner("test_*") + ] + self.tests = [] + self.scanned = [] + + def scan(self, *roots): + objects = list(roots) + + while objects: + obj = objects.pop(0) + for s in self.scanners: + if s.inspect(obj): + self.tests.extend(s.extract(obj)) + for child in s.descend(obj): + if not (child in self.scanned or child in objects): + objects.append(child) + self.scanned.append(obj) + +modules = "qpid.tests", "tests", "tests_0-10" +h = Harness() +for name in modules: + m = __import__(name, fromlist=["dummy"]) + h.scan(m) + +filtered = [t for t in h.tests if included(t.name())] +passed = 0 +failed = 0 +for t in filtered: + if list_only: + print t.name() + else: + if t.run(): + passed += 1 + else: + failed += 1 + +if not list_only: + print colorize("Totals:", 1), \ + colorize_word("total", "%s tests" % len(filtered)) + ",", \ + colorize_word("pass", "%s passed" % passed) + ",", \ + colorize_word("fail" if failed else "pass", "%s failed" % failed) |