diff options
Diffstat (limited to 'Final/python/qpid/testlib.py')
-rw-r--r-- | Final/python/qpid/testlib.py | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/Final/python/qpid/testlib.py b/Final/python/qpid/testlib.py new file mode 100644 index 0000000000..39bad75b86 --- /dev/null +++ b/Final/python/qpid/testlib.py @@ -0,0 +1,237 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Support library for qpid python tests. +# + +import sys, re, unittest, os, random, logging +import qpid.client, qpid.spec +import Queue +from getopt import getopt, GetoptError +from qpid.content import Content + +def findmodules(root): + """Find potential python modules under directory root""" + found = [] + for dirpath, subdirs, files in os.walk(root): + modpath = dirpath.replace(os.sep, '.') + if not re.match(r'\.svn$', dirpath): # Avoid SVN directories + for f in files: + match = re.match(r'(.+)\.py$', f) + if match and f != '__init__.py': + found.append('.'.join([modpath, match.group(1)])) + return found + +def default(value, default): + if (value == None): return default + else: return value + +class TestRunner: + """Runs unit tests. + + Parses command line arguments, provides utility functions for tests, + runs the selected test suite. + """ + + def _die(self, message = None): + if message: print message + print """ +run-tests [options] [test*] +The name of a test is package.module.ClassName.testMethod +Options: + -?/-h/--help : this message + -s/--spec <spec.xml> : file containing amqp XML spec + -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to + -v/--verbose : verbose - lists tests as they are run. + -d/--debug : enable debug logging. + -i/--ignore <test> : ignore the named test. + -I/--ignore-file : file containing patterns to ignore. + """ + sys.exit(1) + + def setBroker(self, broker): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(broker) + if not match: self._die("'%s' is not a valid broker" % (broker)) + self.user, self.password, self.host, self.port = match.groups() + self.port = int(default(self.port, 5672)) + self.user = default(self.user, "guest") + self.password = default(self.password, "guest") + + def __init__(self): + # Defaults + self.setBroker("localhost") + self.spec = "../specs/amqp.0-8.xml" + self.verbose = 1 + self.ignore = [] + + def ignoreFile(self, filename): + f = file(filename) + for line in f.readlines(): self.ignore.append(line.strip()) + f.close() + + def _parseargs(self, args): + try: + opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"]) + except GetoptError, e: + self._die(str(e)) + for opt, value in opts: + if opt in ("-?", "-h", "--help"): self._die() + if opt in ("-s", "--spec"): self.spec = value + if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-v", "--verbose"): self.verbose = 2 + if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) + if opt in ("-i", "--ignore"): self.ignore.append(value) + if opt in ("-I", "--ignore-file"): self.ignoreFile(value) + + if len(self.tests) == 0: self.tests=findmodules("tests") + + def testSuite(self): + class IgnoringTestSuite(unittest.TestSuite): + def addTest(self, test): + if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore: + return + unittest.TestSuite.addTest(self, test) + + # Use our IgnoringTestSuite in the test loader. + unittest.TestLoader.suiteClass = IgnoringTestSuite + return unittest.defaultTestLoader.loadTestsFromNames(self.tests) + + def run(self, args=sys.argv[1:]): + self._parseargs(args) + runner = unittest.TextTestRunner(descriptions=False, + verbosity=self.verbose) + result = runner.run(self.testSuite()) + if (self.ignore): + print "=======================================" + print "NOTE: the following tests were ignored:" + for t in self.ignore: print t + print "=======================================" + return result.wasSuccessful() + + def connect(self, host=None, port=None, spec=None, user=None, password=None): + """Connect to the broker, returns a qpid.client.Client""" + host = host or self.host + port = port or self.port + spec = spec or self.spec + user = user or self.user + password = password or self.password + client = qpid.client.Client(host, port, qpid.spec.load(spec)) + client.start({"LOGIN": user, "PASSWORD": password}) + return client + + +# Global instance for tests to call connect. +testrunner = TestRunner() + + +class TestBase(unittest.TestCase): + """Base class for Qpid test cases. + + self.client is automatically connected with channel 1 open before + the test methods are run. + + Deletes queues and exchanges after. Tests call + self.queue_declare(channel, ...) and self.exchange_declare(chanel, + ...) which are wrappers for the Channel functions that note + resources to clean up later. + """ + + def setUp(self): + self.queues = [] + self.exchanges = [] + self.client = self.connect() + self.channel = self.client.channel(1) + self.channel.channel_open() + + def tearDown(self): + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + + def connect(self, *args, **keys): + """Create a new connction, return the Client object""" + return testrunner.connect(*args, **keys) + + def queue_declare(self, channel=None, *args, **keys): + channel = channel or self.channel + reply = channel.queue_declare(*args, **keys) + self.queues.append((channel, reply.queue)) + return reply + + def exchange_declare(self, channel=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, internal=False, nowait=False, + arguments={}): + channel = channel or self.channel + reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) + self.exchanges.append((channel,exchange)) + return reply + + def uniqueString(self): + """Generate a unique string, unique for this TestBase instance""" + if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; + return "Test Message " + str(self.uniqueCounter) + + def consume(self, queueName): + """Consume from named queue returns the Queue object.""" + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) + + def assertEmpty(self, queue): + """Assert that the queue is empty""" + try: + queue.get(timeout=1) + self.fail("Queue is not empty.") + except Queue.Empty: None # Ignore + + def assertPublishGet(self, queue, exchange="", routing_key="", properties=None): + """ + Publish to exchange and assert queue.get() returns the same message. + """ + body = self.uniqueString() + self.channel.basic_publish(exchange=exchange, + content=Content(body, properties=properties), + routing_key=routing_key) + msg = queue.get(timeout=1) + self.assertEqual(body, msg.content.body) + if (properties): self.assertEqual(properties, msg.content.properties) + + def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None): + """ + Publish a message and consume it, assert it comes back intact. + Return the Queue object used to consume. + """ + self.assertPublishGet(self.consume(queue), exchange, routing_key, properties) + + def assertChannelException(self, expectedCode, message): + self.assertEqual("channel", message.method.klass.name) + self.assertEqual("close", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + + + def assertConnectionException(self, expectedCode, message): + self.assertEqual("connection", message.method.klass.name) + self.assertEqual("close", message.method.name) + self.assertEqual(expectedCode, message.reply_code) + |