# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # # Support library for qpid python tests. # import sys, re, unittest, os, random, logging, traceback import qpid.client, qpid.spec, qmf.console import Queue from fnmatch import fnmatch from getopt import getopt, GetoptError from qpid.content import Content from qpid.message import Message #0-10 support from qpid.connection import Connection from qpid.spec010 import load from qpid.util import connect, ssl, URL def findmodules(root): """Find potential python modules under directory root""" found = [] for dirpath, subdirs, files in os.walk(root): modpath = dirpath.replace(os.sep, '.') if not re.match(r'\.svn$', dirpath): # Avoid SVN directories for f in files: match = re.match(r'(.+)\.py$', f) if match and f != '__init__.py': found.append('.'.join([modpath, match.group(1)])) return found def default(value, default): if (value == None): return default else: return value class TestRunner: SPEC_FOLDER = "../specs" qpidd = os.getenv("QPIDD") """Runs unit tests. Parses command line arguments, provides utility functions for tests, runs the selected test suite. """ def _die(self, message = None): if message: print message print """ run-tests [options] [test*] The name of a test is package.module.ClassName.testMethod Options: -?/-h/--help : this message -s/--spec : URL of AMQP XML specification or one of these abbreviations: 0-8 - use the default 0-8 specification. 0-9 - use the default 0-9 specification. 0-10-errata - use the 0-10 specification with qpid errata. -e/--errata : file containing amqp XML errata -b/--broker [amqps://][[/]@][:] : broker to connect to -B/--start-broker : start a local broker using broker-args; set QPIDD env to point to broker executable. broker-args will be prepended with "--daemon --port=0" -v/--verbose : verbose - lists tests as they are run. -d/--debug : enable debug logging. -i/--ignore : ignore the named test. -I/--ignore-file : file containing patterns to ignore. -S/--skip-self-test : skips the client self tests in the 'tests folder' -F/--spec-folder : folder that contains the specs to be loaded """ sys.exit(1) def startBroker(self, brokerArgs): """Start a single broker daemon""" if TestRunner.qpidd == None: self._die("QPIDD environment var must point to qpidd when using -B/--start-broker") cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs) portStr = os.popen(cmd).read() if len(portStr) == 0: self._die("%s failed to start" % TestRunner.qpidd) port = int(portStr) pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read()) print "Started broker: pid=%d, port=%d" % (pid, port) self.brokerTuple = (pid, port) self.setBroker("localhost:%d" % port) def stopBroker(self): """Stop the broker using qpidd -q""" if self.brokerTuple: ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q") if ret != 0: self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret)) print "Stopped broker: pid=%d, port=%d" % self.brokerTuple def killBroker(self): """Kill the broker using kill -9 (SIGTERM)""" if self.brokerTuple: os.kill(self.brokerTuple[0], signal.SIGTERM) print "Killed broker: pid=%d, port=%d" % self.brokerTuple def setBroker(self, broker): try: self.url = URL(broker) except ValueError: self._die("'%s' is not a valid broker" % (broker)) self.user = default(self.url.user, "guest") self.password = default(self.url.password, "guest") self.host = self.url.host if self.url.scheme == URL.AMQPS: self.ssl = True default_port = 5671 else: self.ssl = False default_port = 5672 self.port = default(self.url.port, default_port) def ignoreFile(self, filename): f = file(filename) for line in f.readlines(): self.ignore.append(line.strip()) f.close() def use08spec(self): "True if we are running with the old 0-8 spec." # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons. return self.spec.major==8 and self.spec.minor==0 def use09spec(self): "True if we are running with the 0-9 (non-wip) spec." return self.spec.major==0 and self.spec.minor==9 def _parseargs(self, args): # Defaults self.setBroker("localhost") self.verbose = 1 self.ignore = [] self.specfile = "0-8" self.errata = [] self.skip_self_test = False try: opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:", ["help", "spec", "errata=", "broker=", "start-broker=", "verbose", "skip-self-test", "ignore", "ignore-file", "spec-folder"]) except GetoptError, e: self._die(str(e)) # check for mutually exclusive options if "-B" in opts or "--start-broker" in opts: if "-b" in opts or "--broker" in opts: self._die("Cannot use -B/--start-broker and -b/broker options together") for opt, value in opts: if opt in ("-?", "-h", "--help"): self._die() if opt in ("-s", "--spec"): self.specfile = value if opt in ("-e", "--errata"): self.errata.append(value) if opt in ("-b", "--broker"): self.setBroker(value) if opt in ("-B", "--start-broker"): self.startBroker(value) if opt in ("-v", "--verbose"): self.verbose = 2 if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) if opt in ("-i", "--ignore"): self.ignore.append(value) if opt in ("-I", "--ignore-file"): self.ignoreFile(value) if opt in ("-S", "--skip-self-test"): self.skip_self_test = True if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value # Abbreviations for default settings. if (self.specfile == "0-10"): self.spec = load(self.get_spec_file("amqp.0-10.xml")) elif (self.specfile == "0-10-errata"): self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml")) else: if (self.specfile == "0-8"): self.specfile = self.get_spec_file("amqp.0-8.xml") elif (self.specfile == "0-9"): self.specfile = self.get_spec_file("amqp.0-9.xml") self.errata.append(self.get_spec_file("amqp-errata.0-9.xml")) if (self.specfile == None): self._die("No XML specification provided") print "Using specification from:", self.specfile self.spec = qpid.spec.load(self.specfile, *self.errata) if len(self.tests) == 0: if not self.skip_self_test: self.tests=findmodules("tests") if self.use08spec() or self.use09spec(): self.tests+=findmodules("tests_0-8") elif (self.spec.major == 99 and self.spec.minor == 0): self.tests+=findmodules("tests_0-10_preview") elif (self.spec.major == 0 and self.spec.minor == 10): self.tests+=findmodules("tests_0-10") def testSuite(self): class IgnoringTestSuite(unittest.TestSuite): def addTest(self, test): if isinstance(test, unittest.TestCase): for pattern in testrunner.ignore: if fnmatch(test.id(), pattern): return unittest.TestSuite.addTest(self, test) # Use our IgnoringTestSuite in the test loader. unittest.TestLoader.suiteClass = IgnoringTestSuite return unittest.defaultTestLoader.loadTestsFromNames(self.tests) def run(self, args=sys.argv[1:]): self.brokerTuple = None self._parseargs(args) runner = unittest.TextTestRunner(descriptions=False, verbosity=self.verbose) result = runner.run(self.testSuite()) if (self.ignore): print "=======================================" print "NOTE: the following tests were ignored:" for t in self.ignore: print t print "=======================================" self.stopBroker() return result.wasSuccessful() def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): """Connect to the broker, returns a qpid.client.Client""" host = host or self.host port = port or self.port spec = spec or self.spec user = user or self.user password = password or self.password client = qpid.client.Client(host, port, spec) if self.use08spec(): client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) else: client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params) return client def get_spec_file(self, fname): return TestRunner.SPEC_FOLDER + os.sep + fname # Global instance for tests to call connect. testrunner = TestRunner() class TestBase(unittest.TestCase): """Base class for Qpid test cases. self.client is automatically connected with channel 1 open before the test methods are run. Deletes queues and exchanges after. Tests call self.queue_declare(channel, ...) and self.exchange_declare(chanel, ...) which are wrappers for the Channel functions that note resources to clean up later. """ def setUp(self): self.queues = [] self.exchanges = [] self.client = self.connect() self.channel = self.client.channel(1) self.version = (self.client.spec.major, self.client.spec.minor) if self.version == (8, 0) or self.version == (0, 9): self.channel.channel_open() else: self.channel.session_open() def tearDown(self): try: for ch, q in self.queues: ch.queue_delete(queue=q) for ch, ex in self.exchanges: ch.exchange_delete(exchange=ex) except: print "Error on tearDown:" print traceback.print_exc() if not self.client.closed: self.client.channel(0).connection_close(reply_code=200) else: self.client.close() def connect(self, *args, **keys): """Create a new connction, return the Client object""" return testrunner.connect(*args, **keys) def queue_declare(self, channel=None, *args, **keys): channel = channel or self.channel reply = channel.queue_declare(*args, **keys) self.queues.append((channel, keys["queue"])) return reply def exchange_declare(self, channel=None, ticket=0, exchange='', type='', passive=False, durable=False, auto_delete=False, arguments={}): channel = channel or self.channel reply = channel.exchange_declare(ticket=ticket, exchange=exchange, type=type, passive=passive,durable=durable, auto_delete=auto_delete, arguments=arguments) self.exchanges.append((channel,exchange)) return reply def uniqueString(self): """Generate a unique string, unique for this TestBase instance""" if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; return "Test Message " + str(self.uniqueCounter) def consume(self, queueName): """Consume from named queue returns the Queue object.""" if testrunner.use08spec() or testrunner.use09spec(): reply = self.channel.basic_consume(queue=queueName, no_ack=True) return self.client.queue(reply.consumer_tag) else: if not "uniqueTag" in dir(self): self.uniqueTag = 1 else: self.uniqueTag += 1 consumer_tag = "tag" + str(self.uniqueTag) self.channel.message_subscribe(queue=queueName, destination=consumer_tag) self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) return self.client.queue(consumer_tag) def subscribe(self, channel=None, **keys): channel = channel or self.channel consumer_tag = keys["destination"] channel.message_subscribe(**keys) channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) def assertEmpty(self, queue): """Assert that the queue is empty""" try: queue.get(timeout=1) self.fail("Queue is not empty.") except Queue.Empty: None # Ignore def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): """ Publish to exchange and assert queue.get() returns the same message. """ body = self.uniqueString() if testrunner.use08spec() or testrunner.use09spec(): self.channel.basic_publish( exchange=exchange, content=Content(body, properties=properties), routing_key=routing_key) else: self.channel.message_transfer( destination=exchange, content=Content(body, properties={'application_headers':properties,'routing_key':routing_key})) msg = queue.get(timeout=1) if testrunner.use08spec() or testrunner.use09spec(): self.assertEqual(body, msg.content.body) if (properties): self.assertEqual(properties, msg.content.properties) else: self.assertEqual(body, msg.content.body) if (properties): self.assertEqual(properties, msg.content['application_headers']) def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): """ Publish a message and consume it, assert it comes back intact. Return the Queue object used to consume. """ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) def assertChannelException(self, expectedCode, message): if self.version == (8, 0) or self.version == (0, 9): if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message)) self.assertEqual("channel", message.method.klass.name) self.assertEqual("close", message.method.name) else: if not isinstance(message, Message): self.fail("expected session_closed method, got %s" % (message)) self.assertEqual("session", message.method.klass.name) self.assertEqual("closed", message.method.name) self.assertEqual(expectedCode, message.reply_code) def assertConnectionException(self, expectedCode, message): if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message)) self.assertEqual("connection", message.method.klass.name) self.assertEqual("close", message.method.name) self.assertEqual(expectedCode, message.reply_code) class TestBase010(unittest.TestCase): """ Base class for Qpid test cases. using the final 0-10 spec """ def setUp(self): self.conn = self.connect() self.session = self.conn.session("test-session", timeout=10) self.qmf = None def startQmf(self, handler=None): self.qmf = qmf.console.Session(handler) self.qmf_broker = self.qmf.addBroker(str(testrunner.url)) def connect(self, host=None, port=None): sock = connect(host or testrunner.host, port or testrunner.port) if testrunner.url.scheme == URL.AMQPS: sock = ssl(sock) conn = Connection(sock, username=testrunner.user, password=testrunner.password) conn.start(timeout=10) return conn def tearDown(self): if not self.session.error(): self.session.close(timeout=10) self.conn.close(timeout=10) if self.qmf: self.qmf.delBroker(self.qmf_broker) def subscribe(self, session=None, **keys): session = session or self.session consumer_tag = keys["destination"] session.message_subscribe(**keys) session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)