summaryrefslogtreecommitdiff
path: root/qpid/python/qpid-python-test
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid-python-test')
-rwxr-xr-xqpid/python/qpid-python-test639
1 files changed, 639 insertions, 0 deletions
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
new file mode 100755
index 0000000000..dfe6a6fc7a
--- /dev/null
+++ b/qpid/python/qpid-python-test
@@ -0,0 +1,639 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# TODO: summarize, test harness preconditions (e.g. broker is alive)
+
+import logging, optparse, os, struct, sys, time, traceback, types
+from fnmatch import fnmatchcase as match
+from getopt import GetoptError
+from logging import getLogger, StreamHandler, Formatter, Filter, \
+ WARN, DEBUG, ERROR
+from qpid.harness import Skipped
+from qpid.util import URL
+
+levels = {
+ "DEBUG": DEBUG,
+ "WARN": WARN,
+ "ERROR": ERROR
+ }
+
+sorted_levels = [(v, k) for k, v in levels.items()]
+sorted_levels.sort()
+sorted_levels = [v for k, v in sorted_levels]
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+ description="Run tests matching the specified PATTERNs.")
+parser.add_option("-l", "--list", action="store_true", default=False,
+ help="list tests instead of executing them")
+parser.add_option("-b", "--broker", default="localhost",
+ help="run tests against BROKER (default %default)")
+parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE")
+parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN",
+ help="only display log messages of LEVEL or higher severity: "
+ "%s (default %%default)" % ", ".join(sorted_levels))
+parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append",
+ dest="log_categories", default=[],
+ help="log only categories matching CATEGORY pattern")
+parser.add_option("-m", "--module", action="append", default=[],
+ dest="modules", help="add module to test search path")
+parser.add_option("-i", "--ignore", action="append", default=[],
+ help="ignore tests matching IGNORE pattern")
+parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append",
+ default=[],
+ help="ignore tests matching patterns in IFILE")
+parser.add_option("-H", "--halt-on-error", action="store_true", default=False,
+ dest="hoe", help="halt if an error is encountered")
+parser.add_option("-t", "--time", action="store_true", default=False,
+ help="report timing information on test run")
+parser.add_option("-D", "--define", metavar="DEFINE", dest="defines",
+ action="append", default=[], help="define test parameters")
+parser.add_option("-x", "--xml", metavar="XML", dest="xml",
+ help="write test results in Junit style xml suitable for use by CI tools etc")
+
+class Config:
+
+ def __init__(self):
+ self.broker = URL("localhost")
+ self.defines = {}
+ self.log_file = None
+ self.log_level = WARN
+ self.log_categories = []
+
+opts, args = parser.parse_args()
+
+includes = []
+excludes = ["*__*__"]
+config = Config()
+list_only = opts.list
+config.broker = URL(opts.broker)
+for d in opts.defines:
+ try:
+ idx = d.index("=")
+ name = d[:idx]
+ value = d[idx+1:]
+ config.defines[name] = value
+ except ValueError:
+ config.defines[d] = None
+config.log_file = opts.log_file
+config.log_level = levels[opts.log_level.upper()]
+config.log_categories = opts.log_categories
+excludes.extend([v.strip() for v in opts.ignore])
+for v in opts.ignore_file:
+ f = open(v)
+ for line in f:
+ line = line.strip()
+ if line.startswith("#"):
+ continue
+ excludes.append(line)
+ f.close()
+
+for a in args:
+ includes.append(a.strip())
+
+if not includes:
+ if opts.modules:
+ includes.append("*")
+ else:
+ includes.extend(["qpid.tests.*"])
+
+def is_ignored(path):
+ for p in excludes:
+ if match(path, p):
+ return True
+ return False
+
+def is_included(path):
+ if is_ignored(path):
+ return False
+ for p in includes:
+ if match(path, p):
+ return True
+ return False
+
+def is_smart():
+ return sys.stdout.isatty() and os.environ.get("TERM", "dumb") != "dumb"
+
+try:
+ import fcntl, termios
+
+ def width():
+ if is_smart():
+ s = struct.pack("HHHH", 0, 0, 0, 0)
+ fd_stdout = sys.stdout.fileno()
+ x = fcntl.ioctl(fd_stdout, termios.TIOCGWINSZ, s)
+ rows, cols, xpx, ypx = struct.unpack("HHHH", x)
+ return cols
+ else:
+ try:
+ return int(os.environ.get("COLUMNS", "80"))
+ except ValueError:
+ return 80
+
+ WIDTH = width()
+
+ def resize(sig, frm):
+ global WIDTH
+ WIDTH = width()
+
+ import signal
+ signal.signal(signal.SIGWINCH, resize)
+
+except ImportError:
+ WIDTH = 80
+
+def vt100_attrs(*attrs):
+ return "\x1B[%sm" % ";".join(map(str, attrs))
+
+vt100_reset = vt100_attrs(0)
+
+KEYWORDS = {"pass": (32,),
+ "skip": (33,),
+ "fail": (31,),
+ "start": (34,),
+ "total": (34,),
+ "ignored": (33,),
+ "selected": (34,),
+ "elapsed": (34,),
+ "average": (34,)}
+
+COLORIZE = is_smart()
+
+def colorize_word(word, text=None):
+ if text is None:
+ text = word
+ return colorize(text, *KEYWORDS.get(word, ()))
+
+def colorize(text, *attrs):
+ if attrs and COLORIZE:
+ return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset)
+ else:
+ return text
+
+def indent(text):
+ lines = text.split("\n")
+ return " %s" % "\n ".join(lines)
+
+# Write a 'minimal' Junit xml style report file suitable for use by CI tools such as Jenkins.
+class JunitXmlStyleReporter:
+
+ def __init__(self, file):
+ self.f = open(file, "w");
+
+ def begin(self):
+ self.f.write('<?xml version="1.0" encoding="UTF-8" ?>\n')
+ self.f.write('<testsuite>\n')
+
+ def report(self, name, result):
+ parts = name.split(".")
+ method = parts[-1]
+ module = '.'.join(parts[0:-1])
+ self.f.write('<testcase classname="%s" name="%s" time="%f">\n' % (module, method, result.time))
+ if result.failed:
+ self.f.write('<failure>\n')
+ self.f.write('<![CDATA[\n')
+ self.f.write(result.exceptions)
+ self.f.write(']]>\n')
+ self.f.write('</failure>\n')
+ self.f.write('</testcase>\n')
+
+ def end(self):
+ self.f.write('</testsuite>\n')
+ self.f.close()
+
+class Interceptor:
+
+ def __init__(self):
+ self.newline = False
+ self.indent = False
+ self.passthrough = True
+ self.dirty = False
+ self.last = None
+
+ def begin(self):
+ self.newline = True
+ self.indent = True
+ self.passthrough = False
+ self.dirty = False
+ self.last = None
+
+ def reset(self):
+ self.newline = False
+ self.indent = False
+ self.passthrough = True
+
+class StreamWrapper:
+
+ def __init__(self, interceptor, stream, prefix=" "):
+ self.interceptor = interceptor
+ self.stream = stream
+ self.prefix = prefix
+
+ def fileno(self):
+ return self.stream.fileno()
+
+ def isatty(self):
+ return self.stream.isatty()
+
+ def write(self, s):
+ if self.interceptor.passthrough:
+ self.stream.write(s)
+ return
+
+ if s:
+ self.interceptor.dirty = True
+
+ if self.interceptor.newline:
+ self.interceptor.newline = False
+ self.stream.write(" %s\n" % colorize_word("start"))
+ self.interceptor.indent = True
+ if self.interceptor.indent:
+ self.stream.write(self.prefix)
+ if s.endswith("\n"):
+ s = s.replace("\n", "\n%s" % self.prefix)[:-2]
+ self.interceptor.indent = True
+ else:
+ s = s.replace("\n", "\n%s" % self.prefix)
+ self.interceptor.indent = False
+ self.stream.write(s)
+
+ if s:
+ self.interceptor.last = s[-1]
+
+ def flush(self):
+ self.stream.flush()
+
+interceptor = Interceptor()
+
+out_wrp = StreamWrapper(interceptor, sys.stdout)
+err_wrp = StreamWrapper(interceptor, sys.stderr)
+
+out = sys.stdout
+err = sys.stderr
+sys.stdout = out_wrp
+sys.stderr = err_wrp
+
+class PatternFilter(Filter):
+
+ def __init__(self, *patterns):
+ Filter.__init__(self, patterns)
+ self.patterns = patterns
+
+ def filter(self, record):
+ if not self.patterns:
+ return True
+ for p in self.patterns:
+ if match(record.name, p):
+ return True
+ return False
+
+root = getLogger()
+handler = StreamHandler(sys.stdout)
+filter = PatternFilter(*config.log_categories)
+handler.addFilter(filter)
+handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s"))
+root.addHandler(handler)
+root.setLevel(WARN)
+
+log = getLogger("qpid.test")
+
+PASS = "pass"
+SKIP = "skip"
+FAIL = "fail"
+
+class Runner:
+
+ def __init__(self):
+ self.exceptions = []
+ self.skip = False
+
+ def passed(self):
+ return not self.exceptions
+
+ def skipped(self):
+ return self.skip
+
+ def failed(self):
+ return self.exceptions and not self.skip
+
+ def halt(self):
+ return self.exceptions or self.skip
+
+ def run(self, name, phase):
+ try:
+ phase()
+ except KeyboardInterrupt:
+ raise
+ except:
+ exi = sys.exc_info()
+ if issubclass(exi[0], Skipped):
+ self.skip = True
+ self.exceptions.append((name, exi))
+
+ def status(self):
+ if self.passed():
+ return PASS
+ elif self.skipped():
+ return SKIP
+ elif self.failed():
+ return FAIL
+ else:
+ return None
+
+ def get_formatted_exceptions(self):
+ for name, info in self.exceptions:
+ if issubclass(info[0], Skipped):
+ output = indent("".join(traceback.format_exception_only(*info[:2]))).rstrip()
+ else:
+ output = "Error during %s:" % name
+ output += indent("".join(traceback.format_exception(*info))).rstrip()
+ return output
+
+ST_WIDTH = 8
+
+def run_test(name, test, config):
+ patterns = filter.patterns
+ level = root.level
+ filter.patterns = config.log_categories
+ root.setLevel(config.log_level)
+
+ parts = name.split(".")
+ line = None
+ output = ""
+ for part in parts:
+ if line:
+ if len(line) + len(part) >= (WIDTH - ST_WIDTH - 1):
+ output += "%s. \\\n" % line
+ line = " %s" % part
+ else:
+ line = "%s.%s" % (line, part)
+ else:
+ line = part
+
+ if line:
+ output += "%s %s" % (line, (((WIDTH - ST_WIDTH) - len(line))*"."))
+ sys.stdout.write(output)
+ sys.stdout.flush()
+ interceptor.begin()
+ start = time.time()
+ try:
+ runner = test()
+ finally:
+ interceptor.reset()
+ end = time.time()
+ if interceptor.dirty:
+ if interceptor.last != "\n":
+ sys.stdout.write("\n")
+ sys.stdout.write(output)
+ print " %s" % colorize_word(runner.status())
+ if runner.failed() or runner.skipped():
+ print runner.get_formatted_exceptions()
+ root.setLevel(level)
+ filter.patterns = patterns
+ return TestResult(end - start, runner.passed(), runner.skipped(), runner.failed(), runner.get_formatted_exceptions())
+
+class TestResult:
+
+ def __init__(self, time, passed, skipped, failed, exceptions):
+ self.time = time
+ self.passed = passed
+ self.skipped = skipped
+ self.failed = failed
+ self.exceptions = exceptions
+
+class FunctionTest:
+
+ def __init__(self, test):
+ self.test = test
+
+ def name(self):
+ return "%s.%s" % (self.test.__module__, self.test.__name__)
+
+ def run(self):
+ return run_test(self.name(), self._run, config)
+
+ def _run(self):
+ runner = Runner()
+ runner.run("test", lambda: self.test(config))
+ return runner
+
+ def __repr__(self):
+ return "FunctionTest(%r)" % self.test
+
+class MethodTest:
+
+ def __init__(self, cls, method):
+ self.cls = cls
+ self.method = method
+
+ def name(self):
+ return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method)
+
+ def run(self):
+ return run_test(self.name(), self._run, config)
+
+ def _run(self):
+ runner = Runner()
+ inst = self.cls(self.method)
+ test = getattr(inst, self.method)
+
+ if hasattr(inst, "configure"):
+ runner.run("configure", lambda: inst.configure(config))
+ if runner.halt(): return runner
+ if hasattr(inst, "setUp"):
+ runner.run("setup", inst.setUp)
+ if runner.halt(): return runner
+ elif hasattr(inst, "setup"):
+ runner.run("setup", inst.setup)
+ if runner.halt(): return runner
+
+ runner.run("test", test)
+
+ if hasattr(inst, "tearDown"):
+ runner.run("teardown", inst.tearDown)
+ elif hasattr(inst, "teardown"):
+ runner.run("teardown", inst.teardown)
+
+ return runner
+
+ def __repr__(self):
+ return "MethodTest(%r, %r)" % (self.cls, self.method)
+
+class PatternMatcher:
+
+ def __init__(self, *patterns):
+ self.patterns = patterns
+
+ def matches(self, name):
+ for p in self.patterns:
+ if match(name, p):
+ return True
+ return False
+
+class FunctionScanner(PatternMatcher):
+
+ def inspect(self, obj):
+ return type(obj) == types.FunctionType and self.matches(name)
+
+ def descend(self, func):
+ # the None is required for older versions of python
+ return; yield None
+
+ def extract(self, func):
+ yield FunctionTest(func)
+
+class ClassScanner(PatternMatcher):
+
+ def inspect(self, obj):
+ return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__)
+
+ def descend(self, cls):
+ # the None is required for older versions of python
+ return; yield None
+
+ def extract(self, cls):
+ names = dir(cls)
+ names.sort()
+ for name in names:
+ obj = getattr(cls, name)
+ t = type(obj)
+ if t == types.MethodType and name.startswith("test"):
+ yield MethodTest(cls, name)
+
+class ModuleScanner:
+
+ def inspect(self, obj):
+ return type(obj) == types.ModuleType
+
+ def descend(self, obj):
+ names = dir(obj)
+ names.sort()
+ for name in names:
+ yield getattr(obj, name)
+
+ def extract(self, obj):
+ # the None is required for older versions of python
+ return; yield None
+
+class Harness:
+
+ def __init__(self):
+ self.scanners = [
+ ModuleScanner(),
+ ClassScanner("*Test", "*Tests", "*TestCase"),
+ FunctionScanner("test_*")
+ ]
+ self.tests = []
+ self.scanned = []
+
+ def scan(self, *roots):
+ objects = list(roots)
+
+ while objects:
+ obj = objects.pop(0)
+ for s in self.scanners:
+ if s.inspect(obj):
+ self.tests.extend(s.extract(obj))
+ for child in s.descend(obj):
+ if not (child in self.scanned or child in objects):
+ objects.append(child)
+ self.scanned.append(obj)
+
+modules = opts.modules
+if not modules:
+ modules.extend(["qpid.tests"])
+h = Harness()
+for name in modules:
+ m = __import__(name, None, None, ["dummy"])
+ h.scan(m)
+
+filtered = [t for t in h.tests if is_included(t.name())]
+ignored = [t for t in h.tests if is_ignored(t.name())]
+total = len(filtered) + len(ignored)
+
+if opts.xml and not list_only:
+ xmlr = JunitXmlStyleReporter(opts.xml);
+ xmlr.begin();
+else:
+ xmlr = None
+
+passed = 0
+failed = 0
+skipped = 0
+start = time.time()
+for t in filtered:
+ if list_only:
+ print t.name()
+ else:
+ st = t.run()
+ if xmlr:
+ xmlr.report(t.name(), st)
+ if st.passed:
+ passed += 1
+ elif st.skipped:
+ skipped += 1
+ elif st.failed:
+ failed += 1
+ if opts.hoe:
+ break
+end = time.time()
+
+run = passed + failed
+
+if not list_only:
+ if passed:
+ _pass = "pass"
+ else:
+ _pass = "fail"
+ if failed:
+ outcome = "fail"
+ else:
+ outcome = "pass"
+ if ignored:
+ ign = "ignored"
+ else:
+ ign = "pass"
+ if skipped:
+ skip = "skip"
+ else:
+ skip = "pass"
+ print colorize("Totals:", 1),
+ totals = [colorize_word("total", "%s tests" % total),
+ colorize_word(_pass, "%s passed" % passed),
+ colorize_word(skip, "%s skipped" % skipped),
+ colorize_word(ign, "%s ignored" % len(ignored)),
+ colorize_word(outcome, "%s failed" % failed)]
+ print ", ".join(totals),
+ if opts.hoe and failed > 0:
+ print " -- (halted after %s)" % run
+ else:
+ print
+ if opts.time and run > 0:
+ print colorize("Timing:", 1),
+ timing = [colorize_word("elapsed", "%.2fs elapsed" % (end - start)),
+ colorize_word("average", "%.2fs average" % ((end - start)/run))]
+ print ", ".join(timing)
+
+if xmlr:
+ xmlr.end()
+
+if failed:
+ sys.exit(1)
+else:
+ sys.exit(0)