summaryrefslogtreecommitdiff
path: root/qpid/python/qpid-python-test
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-06-02 14:24:57 +0000
committerRafael H. Schloming <rhs@apache.org>2009-06-02 14:24:57 +0000
commit3dc19387981b88d9f5eb9a3e78d2ce4266c42b8c (patch)
tree40e22443f9cb713fbfb0de83f590b47b44162295 /qpid/python/qpid-python-test
parent7b4ced5276092feb0a1a5f1f8b8945a683aa017e (diff)
downloadqpid-python-3dc19387981b88d9f5eb9a3e78d2ce4266c42b8c.tar.gz
first commit of new messaging API and test harness
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@781044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qpid-python-test')
-rwxr-xr-xqpid/python/qpid-python-test450
1 files changed, 450 insertions, 0 deletions
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
new file mode 100755
index 0000000000..287637d7a0
--- /dev/null
+++ b/qpid/python/qpid-python-test
@@ -0,0 +1,450 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, sys, logging, traceback, types
+from fnmatch import fnmatchcase as match
+from getopt import GetoptError
+from logging import getLogger, StreamHandler, Formatter, Filter, \
+ WARN, DEBUG, ERROR
+from qpid.util import URL
+
+levels = {
+ "DEBUG": DEBUG,
+ "WARN": WARN,
+ "ERROR": ERROR
+ }
+
+sorted_levels = [(v, k) for k, v in levels.items()]
+sorted_levels.sort()
+sorted_levels = [v for k, v in sorted_levels]
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+ description="Run tests matching the specified PATTERNs.")
+parser.add_option("-l", "--list", action="store_true", default=False,
+ help="list tests instead of executing them")
+parser.add_option("-b", "--broker", default="localhost",
+ help="run tests against BROKER (default %default)")
+parser.add_option("-f", "--log-file", metavar="FILE", help="log output to FILE")
+parser.add_option("-v", "--log-level", metavar="LEVEL", default="WARN",
+ help="only display log messages of LEVEL or higher severity: "
+ "%s (default %%default)" % ", ".join(sorted_levels))
+parser.add_option("-c", "--log-category", metavar="CATEGORY", action="append",
+ dest="log_categories", default=[],
+ help="log only categories matching CATEGORY pattern")
+parser.add_option("-i", "--ignore", action="append", default=[],
+ help="ignore tests matching IGNORE pattern")
+parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append",
+ default=[],
+ help="ignore tests matching patterns in IFILE")
+
+class Config:
+
+ def __init__(self):
+ self.broker = URL("localhost")
+ self.work = None
+ self.log_file = None
+ self.log_level = WARN
+ self.log_categories = []
+
+opts, args = parser.parse_args()
+
+includes = []
+excludes = ["*__*__"]
+config = Config()
+list_only = opts.list
+config.broker = URL(opts.broker)
+config.log_file = opts.log_file
+config.log_level = levels[opts.log_level.upper()]
+config.log_categories = opts.log_categories
+excludes.extend([v.strip() for v in opts.ignore])
+for v in opts.ignore_file:
+ f = open(v)
+ for line in f:
+ line = line.strip()
+ if line.startswith("#"):
+ continue
+ excludes.append(line)
+ f.close()
+
+for a in args:
+ includes.append(a.strip())
+
+if not includes:
+ includes.append("*")
+
+def included(path):
+ for p in excludes:
+ if match(path, p):
+ return False
+ for p in includes:
+ if match(path, p):
+ return True
+ return False
+
+def vt100_attrs(*attrs):
+ return "\x1B[%sm" % ";".join(map(str, attrs))
+
+vt100_reset = vt100_attrs(0)
+
+KEYWORDS = {"pass": (32,),
+ "fail": (31,),
+ "start": (34,),
+ "total": (34,)}
+
+COLORIZE = sys.stdout.isatty()
+
+def colorize_word(word, text=None):
+ if text is None:
+ text = word
+ return colorize(text, *KEYWORDS.get(word, ()))
+
+def colorize(text, *attrs):
+ if attrs and COLORIZE:
+ return "%s%s%s" % (vt100_attrs(*attrs), text, vt100_reset)
+ else:
+ return text
+
+def indent(text):
+ lines = text.split("\n")
+ return " %s" % "\n ".join(lines)
+
+from qpid.testlib import testrunner
+testrunner.setBroker(str(config.broker))
+
+class Interceptor:
+
+ def __init__(self):
+ self.newline = False
+ self.indent = False
+ self.passthrough = True
+ self.dirty = False
+ self.last = None
+
+ def begin(self):
+ self.newline = True
+ self.indent = True
+ self.passthrough = False
+ self.dirty = False
+ self.last = None
+
+ def reset(self):
+ self.newline = False
+ self.indent = False
+ self.passthrough = True
+
+class StreamWrapper:
+
+ def __init__(self, interceptor, stream, prefix=" "):
+ self.interceptor = interceptor
+ self.stream = stream
+ self.prefix = prefix
+
+ def isatty(self):
+ return self.stream.isatty()
+
+ def write(self, s):
+ if self.interceptor.passthrough:
+ self.stream.write(s)
+ return
+
+ if s:
+ self.interceptor.dirty = True
+
+ if self.interceptor.newline:
+ self.interceptor.newline = False
+ self.stream.write(" %s\n" % colorize_word("start"))
+ self.interceptor.indent = True
+ if self.interceptor.indent:
+ self.stream.write(self.prefix)
+ if s.endswith("\n"):
+ s = s.replace("\n", "\n%s" % self.prefix)[:-2]
+ self.interceptor.indent = True
+ else:
+ s = s.replace("\n", "\n%s" % self.prefix)
+ self.interceptor.indent = False
+ self.stream.write(s)
+
+ if s:
+ self.interceptor.last = s[-1]
+
+ def flush(self):
+ self.stream.flush()
+
+interceptor = Interceptor()
+
+out_wrp = StreamWrapper(interceptor, sys.stdout)
+err_wrp = StreamWrapper(interceptor, sys.stderr)
+
+out = sys.stdout
+err = sys.stderr
+sys.stdout = out_wrp
+sys.stderr = err_wrp
+
+class PatternFilter(Filter):
+
+ def __init__(self, *patterns):
+ Filter.__init__(self, patterns)
+ self.patterns = patterns
+
+ def filter(self, record):
+ if not self.patterns:
+ return True
+ for p in self.patterns:
+ if match(record.name, p):
+ return True
+ return False
+
+root = getLogger()
+handler = StreamHandler(sys.stdout)
+filter = PatternFilter(*config.log_categories)
+handler.addFilter(filter)
+handler.setFormatter(Formatter("%(asctime)s %(levelname)s %(message)s"))
+root.addHandler(handler)
+root.setLevel(WARN)
+
+log = getLogger("qpid.test")
+
+class Runner:
+
+ def __init__(self):
+ self.exceptions = []
+
+ def passed(self):
+ return not self.exceptions
+
+ def failed(self):
+ return self.exceptions
+
+ def run(self, name, phase):
+ try:
+ phase()
+ except KeyboardInterrupt:
+ raise
+ except:
+ self.exceptions.append((name, sys.exc_info()))
+
+ def status(self):
+ if self.passed():
+ return "pass"
+ else:
+ return "fail"
+
+ def print_exceptions(self):
+ for name, info in self.exceptions:
+ print "Error during %s:" % name
+ print indent("".join(traceback.format_exception(*info))).rstrip()
+
+def run_test(name, test, config):
+ patterns = filter.patterns
+ level = root.level
+ filter.patterns = config.log_categories
+ root.setLevel(config.log_level)
+
+ parts = name.split(".")
+ line = None
+ output = ""
+ for part in parts:
+ if line:
+ if len(line) + len(part) >= 71:
+ output += "%s. \\\n" % line
+ line = " %s" % part
+ else:
+ line = "%s.%s" % (line, part)
+ else:
+ line = part
+
+ if line:
+ output += "%s %s" % (line, ((72 - len(line))*"."))
+ sys.stdout.write(output)
+ sys.stdout.flush()
+ interceptor.begin()
+ try:
+ runner = test()
+ finally:
+ interceptor.reset()
+ if interceptor.dirty:
+ if interceptor.last != "\n":
+ sys.stdout.write("\n")
+ sys.stdout.write(output)
+ print " %s" % colorize_word(runner.status())
+ if runner.failed():
+ runner.print_exceptions()
+ root.setLevel(level)
+ filter.patterns = patterns
+ return runner.passed()
+
+class FunctionTest:
+
+ def __init__(self, test):
+ self.test = test
+
+ def name(self):
+ return "%s.%s" % (self.test.__module__, self.test.__name__)
+
+ def run(self):
+ return run_test(self.name(), self._run, config)
+
+ def _run(self):
+ runner = Runner()
+ runner.run("test", lambda: self.test(config))
+ return runner
+
+ def __repr__(self):
+ return "FunctionTest(%r)" % self.test
+
+class MethodTest:
+
+ def __init__(self, cls, method):
+ self.cls = cls
+ self.method = method
+
+ def name(self):
+ return "%s.%s.%s" % (self.cls.__module__, self.cls.__name__, self.method)
+
+ def run(self):
+ return run_test(self.name(), self._run, config)
+
+ def _run(self):
+ runner = Runner()
+ inst = self.cls(self.method)
+ test = getattr(inst, self.method)
+
+ if hasattr(inst, "configure"):
+ runner.run("configure", lambda: inst.configure(config))
+ if runner.failed(): return runner
+ if hasattr(inst, "setUp"):
+ runner.run("setup", inst.setUp)
+ if runner.failed(): return runner
+ elif hasattr(inst, "setup"):
+ runner.run("setup", inst.setup)
+ if runner.failed(): return runner
+
+ runner.run("test", test)
+
+ if hasattr(inst, "tearDown"):
+ runner.run("teardown", inst.tearDown)
+ elif hasattr(inst, "teardown"):
+ runner.run("teardown", inst.teardown)
+
+ return runner
+
+ def __repr__(self):
+ return "MethodTest(%r, %r)" % (self.cls, self.method)
+
+class PatternMatcher:
+
+ def __init__(self, *patterns):
+ self.patterns = patterns
+
+ def matches(self, name):
+ for p in self.patterns:
+ if match(name, p):
+ return True
+ return False
+
+class FunctionScanner(PatternMatcher):
+
+ def inspect(self, obj):
+ return type(obj) == types.FunctionType and self.matches(name)
+
+ def descend(self, func):
+ return; yield
+
+ def extract(self, func):
+ yield FunctionTest(func)
+
+class ClassScanner(PatternMatcher):
+
+ def inspect(self, obj):
+ return type(obj) in (types.ClassType, types.TypeType) and self.matches(obj.__name__)
+
+ def descend(self, cls):
+ return; yield
+
+ def extract(self, cls):
+ names = dir(cls)
+ names.sort()
+ for name in names:
+ obj = getattr(cls, name)
+ t = type(obj)
+ if t == types.MethodType and name.startswith("test"):
+ yield MethodTest(cls, name)
+
+class ModuleScanner:
+
+ def inspect(self, obj):
+ return type(obj) == types.ModuleType
+
+ def descend(self, obj):
+ names = dir(obj)
+ names.sort()
+ for name in names:
+ yield getattr(obj, name)
+
+ def extract(self, obj):
+ return; yield
+
+class Harness:
+
+ def __init__(self):
+ self.scanners = [
+ ModuleScanner(),
+ ClassScanner("*Test", "*Tests", "*TestCase"),
+ FunctionScanner("test_*")
+ ]
+ self.tests = []
+ self.scanned = []
+
+ def scan(self, *roots):
+ objects = list(roots)
+
+ while objects:
+ obj = objects.pop(0)
+ for s in self.scanners:
+ if s.inspect(obj):
+ self.tests.extend(s.extract(obj))
+ for child in s.descend(obj):
+ if not (child in self.scanned or child in objects):
+ objects.append(child)
+ self.scanned.append(obj)
+
+modules = "qpid.tests", "tests", "tests_0-10"
+h = Harness()
+for name in modules:
+ m = __import__(name, fromlist=["dummy"])
+ h.scan(m)
+
+filtered = [t for t in h.tests if included(t.name())]
+passed = 0
+failed = 0
+for t in filtered:
+ if list_only:
+ print t.name()
+ else:
+ if t.run():
+ passed += 1
+ else:
+ failed += 1
+
+if not list_only:
+ print colorize("Totals:", 1), \
+ colorize_word("total", "%s tests" % len(filtered)) + ",", \
+ colorize_word("pass", "%s passed" % passed) + ",", \
+ colorize_word("fail" if failed else "pass", "%s failed" % failed)