diff options
Diffstat (limited to 'qpid/python/qpid/testlib.py')
-rw-r--r-- | qpid/python/qpid/testlib.py | 327 |
1 files changed, 61 insertions, 266 deletions
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 12b2781561..1439b892ea 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -21,239 +21,13 @@ # Support library for qpid python tests. # -import sys, re, unittest, os, random, logging, traceback -import qpid.client, qpid.spec, qmf.console +import unittest, traceback, socket +import qpid.client, qmf.console import Queue -from fnmatch import fnmatch -from getopt import getopt, GetoptError from qpid.content import Content from qpid.message import Message - -#0-10 support -from qpid.connection import Connection -from qpid.spec010 import load -from qpid.util import connect, ssl, URL - -def findmodules(root): - """Find potential python modules under directory root""" - found = [] - for dirpath, subdirs, files in os.walk(root): - modpath = dirpath.replace(os.sep, '.') - if not re.match(r'\.svn$', dirpath): # Avoid SVN directories - for f in files: - match = re.match(r'(.+)\.py$', f) - if match and f != '__init__.py': - found.append('.'.join([modpath, match.group(1)])) - return found - -def default(value, default): - if (value == None): return default - else: return value - -class TestRunner: - - SPEC_FOLDER = "../specs" - qpidd = os.getenv("QPIDD") - - """Runs unit tests. - - Parses command line arguments, provides utility functions for tests, - runs the selected test suite. - """ - - def _die(self, message = None): - if message: print message - print """ -run-tests [options] [test*] -The name of a test is package.module.ClassName.testMethod -Options: - -?/-h/--help : this message - -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations: - 0-8 - use the default 0-8 specification. - 0-9 - use the default 0-9 specification. - 0-10-errata - use the 0-10 specification with qpid errata. - -e/--errata <errata.xml> : file containing amqp XML errata - -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to - -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD - env to point to broker executable. broker-args will be - prepended with "--daemon --port=0" - -v/--verbose : verbose - lists tests as they are run. - -d/--debug : enable debug logging. - -i/--ignore <test> : ignore the named test. - -I/--ignore-file : file containing patterns to ignore. - -S/--skip-self-test : skips the client self tests in the 'tests folder' - -F/--spec-folder : folder that contains the specs to be loaded - """ - sys.exit(1) - - def startBroker(self, brokerArgs): - """Start a single broker daemon""" - if TestRunner.qpidd == None: - self._die("QPIDD environment var must point to qpidd when using -B/--start-broker") - cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs) - portStr = os.popen(cmd).read() - if len(portStr) == 0: - self._die("%s failed to start" % TestRunner.qpidd) - port = int(portStr) - pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read()) - print "Started broker: pid=%d, port=%d" % (pid, port) - self.brokerTuple = (pid, port) - self.setBroker("localhost:%d" % port) - - def stopBroker(self): - """Stop the broker using qpidd -q""" - if self.brokerTuple: - ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q") - if ret != 0: - self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret)) - print "Stopped broker: pid=%d, port=%d" % self.brokerTuple - - def killBroker(self): - """Kill the broker using kill -9 (SIGTERM)""" - if self.brokerTuple: - os.kill(self.brokerTuple[0], signal.SIGTERM) - print "Killed broker: pid=%d, port=%d" % self.brokerTuple - - def setBroker(self, broker): - try: - self.url = URL(broker) - except ValueError: - self._die("'%s' is not a valid broker" % (broker)) - self.user = default(self.url.user, "guest") - self.password = default(self.url.password, "guest") - self.host = self.url.host - if self.url.scheme == URL.AMQPS: - self.ssl = True - default_port = 5671 - else: - self.ssl = False - default_port = 5672 - self.port = default(self.url.port, default_port) - - def ignoreFile(self, filename): - f = file(filename) - for line in f.readlines(): self.ignore.append(line.strip()) - f.close() - - def use08spec(self): - "True if we are running with the old 0-8 spec." - # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons. - return self.spec.major==8 and self.spec.minor==0 - - def use09spec(self): - "True if we are running with the 0-9 (non-wip) spec." - return self.spec.major==0 and self.spec.minor==9 - - def _parseargs(self, args): - # Defaults - self.setBroker("localhost") - self.verbose = 1 - self.ignore = [] - self.specfile = "0-8" - self.errata = [] - self.skip_self_test = False - - try: - opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:", - ["help", "spec", "errata=", "broker=", - "start-broker=", "verbose", "skip-self-test", "ignore", - "ignore-file", "spec-folder"]) - except GetoptError, e: - self._die(str(e)) - # check for mutually exclusive options - if "-B" in opts or "--start-broker" in opts: - if "-b" in opts or "--broker" in opts: - self._die("Cannot use -B/--start-broker and -b/broker options together") - for opt, value in opts: - if opt in ("-?", "-h", "--help"): self._die() - if opt in ("-s", "--spec"): self.specfile = value - if opt in ("-e", "--errata"): self.errata.append(value) - if opt in ("-b", "--broker"): self.setBroker(value) - if opt in ("-B", "--start-broker"): self.startBroker(value) - if opt in ("-v", "--verbose"): self.verbose = 2 - if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) - if opt in ("-i", "--ignore"): self.ignore.append(value) - if opt in ("-I", "--ignore-file"): self.ignoreFile(value) - if opt in ("-S", "--skip-self-test"): self.skip_self_test = True - if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value - - # Abbreviations for default settings. - if (self.specfile == "0-10"): - self.spec = load(self.get_spec_file("amqp.0-10.xml")) - elif (self.specfile == "0-10-errata"): - self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml")) - else: - if (self.specfile == "0-8"): - self.specfile = self.get_spec_file("amqp.0-8.xml") - elif (self.specfile == "0-9"): - self.specfile = self.get_spec_file("amqp.0-9.xml") - self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) - - if (self.specfile == None): - self._die("No XML specification provided") - print "Using specification from:", self.specfile - - self.spec = qpid.spec.load(self.specfile, *self.errata) - - if len(self.tests) == 0: - if not self.skip_self_test: - self.tests=findmodules("tests") - if self.use08spec() or self.use09spec(): - self.tests+=findmodules("tests_0-8") - elif (self.spec.major == 99 and self.spec.minor == 0): - self.tests+=findmodules("tests_0-10_preview") - elif (self.spec.major == 0 and self.spec.minor == 10): - self.tests+=findmodules("tests_0-10") - - def testSuite(self): - class IgnoringTestSuite(unittest.TestSuite): - def addTest(self, test): - if isinstance(test, unittest.TestCase): - for pattern in testrunner.ignore: - if fnmatch(test.id(), pattern): - return - unittest.TestSuite.addTest(self, test) - - # Use our IgnoringTestSuite in the test loader. - unittest.TestLoader.suiteClass = IgnoringTestSuite - return unittest.defaultTestLoader.loadTestsFromNames(self.tests) - - def run(self, args=sys.argv[1:]): - self.brokerTuple = None - self._parseargs(args) - runner = unittest.TextTestRunner(descriptions=False, - verbosity=self.verbose) - result = runner.run(self.testSuite()) - - if (self.ignore): - print "=======================================" - print "NOTE: the following tests were ignored:" - for t in self.ignore: print t - print "=======================================" - - self.stopBroker() - return result.wasSuccessful() - - def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): - """Connect to the broker, returns a qpid.client.Client""" - host = host or self.host - port = port or self.port - spec = spec or self.spec - user = user or self.user - password = password or self.password - client = qpid.client.Client(host, port, spec) - if self.use08spec(): - client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) - else: - client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) - return client - - def get_spec_file(self, fname): - return TestRunner.SPEC_FOLDER + os.sep + fname - -# Global instance for tests to call connect. -testrunner = TestRunner() - +from qpid.harness import Skipped +from qpid.exceptions import VersionError class TestBase(unittest.TestCase): """Base class for Qpid test cases. @@ -267,6 +41,9 @@ class TestBase(unittest.TestCase): resources to clean up later. """ + def configure(self, config): + self.config = config + def setUp(self): self.queues = [] self.exchanges = [] @@ -293,9 +70,26 @@ class TestBase(unittest.TestCase): else: self.client.close() - def connect(self, *args, **keys): + def connect(self, host=None, port=None, user=None, password=None, tune_params=None): """Create a new connction, return the Client object""" - return testrunner.connect(*args, **keys) + host = host or self.config.broker.host + port = port or self.config.broker.port or 5672 + user = user or "guest" + password = password or "guest" + client = qpid.client.Client(host, port) + try: + if client.spec.major == 8 and client.spec.minor == 0: + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) + else: + client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) + except qpid.client.Closed, e: + if isinstance(e.args[0], VersionError): + raise Skipped(e.args[0]) + else: + raise e + except socket.error, e: + raise Skipped(e) + return client def queue_declare(self, channel=None, *args, **keys): channel = channel or self.channel @@ -319,17 +113,8 @@ class TestBase(unittest.TestCase): def consume(self, queueName): """Consume from named queue returns the Queue object.""" - if testrunner.use08spec() or testrunner.use09spec(): - reply = self.channel.basic_consume(queue=queueName, no_ack=True) - return self.client.queue(reply.consumer_tag) - else: - if not "uniqueTag" in dir(self): self.uniqueTag = 1 - else: self.uniqueTag += 1 - consumer_tag = "tag" + str(self.uniqueTag) - self.channel.message_subscribe(queue=queueName, destination=consumer_tag) - self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) - self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) - return self.client.queue(consumer_tag) + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) def subscribe(self, channel=None, **keys): channel = channel or self.channel @@ -350,24 +135,14 @@ class TestBase(unittest.TestCase): Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() - if testrunner.use08spec() or testrunner.use09spec(): - self.channel.basic_publish( - exchange=exchange, - content=Content(body, properties=properties), - routing_key=routing_key) - else: - self.channel.message_transfer( - destination=exchange, - content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) + self.channel.basic_publish( + exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) msg = queue.get(timeout=1) - if testrunner.use08spec() or testrunner.use09spec(): - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content.properties) - else: - self.assertEqual(body, msg.content.body) - if (properties): - self.assertEqual(properties, msg.content['application_headers']) + self.assertEqual(body, msg.content.body) + if (properties): + self.assertEqual(properties, msg.content.properties) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ @@ -394,11 +169,20 @@ class TestBase(unittest.TestCase): self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) +#0-10 support +from qpid.connection import Connection +from qpid.util import connect, ssl, URL + class TestBase010(unittest.TestCase): """ Base class for Qpid test cases. using the final 0-10 spec """ + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + def setUp(self): self.conn = self.connect() self.session = self.conn.session("test-session", timeout=10) @@ -406,15 +190,26 @@ class TestBase010(unittest.TestCase): def startQmf(self, handler=None): self.qmf = qmf.console.Session(handler) - self.qmf_broker = self.qmf.addBroker(str(testrunner.url)) + self.qmf_broker = self.qmf.addBroker(str(self.broker)) def connect(self, host=None, port=None): - sock = connect(host or testrunner.host, port or testrunner.port) - if testrunner.url.scheme == URL.AMQPS: + url = self.broker + if url.scheme == URL.AMQPS: + default_port = 5671 + else: + default_port = 5672 + try: + sock = connect(host or url.host, port or url.port or default_port) + except socket.error, e: + raise Skipped(e) + if url.scheme == URL.AMQPS: sock = ssl(sock) - conn = Connection(sock, username=testrunner.user, - password=testrunner.password) - conn.start(timeout=10) + conn = Connection(sock, username=url.user or "guest", + password=url.password or "guest") + try: + conn.start(timeout=10) + except VersionError, e: + raise Skipped(e) return conn def tearDown(self): |