diff options
Diffstat (limited to 'python/qpid/testlib.py')
-rw-r--r-- | python/qpid/testlib.py | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py new file mode 100644 index 0000000000..ff9ecbee8a --- /dev/null +++ b/python/qpid/testlib.py @@ -0,0 +1,221 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Support library for qpid python tests. +# + +import sys, re, unittest, os, random, logging +import qpid.client, qpid.spec +from getopt import getopt, GetoptError + + +def findmodules(root): + """Find potential python modules under directory root""" + found = [] + for dirpath, subdirs, files in os.walk(root): + modpath = dirpath.replace(os.sep, '.') + if not re.match(r'\.svn$', dirpath): # Avoid SVN directories + for f in files: + match = re.match(r'(.+)\.py$', f) + if match and f != '__init__.py': + found.append('.'.join([modpath, match.group(1)])) + return found + +def default(value, default): + if (value == None): return default + else: return value + +class TestRunner: + """Runs unit tests. + + Parses command line arguments, provides utility functions for tests, + runs the selected test suite. + """ + + def _die(self, message = None): + if message: print message + print """ +run-tests [options] [test*] +The name of a test is package.module.ClassName.testMethod +Options: + -?/-h/--help : this message + -s/--spec <spec.xml> : file containing amqp XML spec + -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to + -v/--verbose : verbose - lists tests as they are run. + -d/--debug : enable debug logging. + -i/--ignore <test> : ignore the named test. + -I/--ignore-file : file containing patterns to ignore. + """ + sys.exit(1) + + def setBroker(self, broker): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(broker) + if not match: self._die("'%s' is not a valid broker" % (broker)) + self.user, self.password, self.host, self.port = match.groups() + self.port = int(default(self.port, 5672)) + self.user = default(self.user, "guest") + self.password = default(self.password, "guest") + + def __init__(self): + # Defaults + self.setBroker("localhost") + self.spec = "../specs/amqp-8.0.xml" + self.verbose = 1 + self.ignore = [] + + def ignoreFile(self, filename): + f = file(filename) + for line in f.readlines(): self.ignore.append(line.strip()) + f.close() + + def _parseargs(self, args): + try: + opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"]) + except GetoptError, e: + self._die(str(e)) + for opt, value in opts: + if opt in ("-?", "-h", "--help"): self._die() + if opt in ("-s", "--spec"): self.spec = value + if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-v", "--verbose"): self.verbose = 2 + if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) + if opt in ("-i", "--ignore"): self.ignore.append(value) + if opt in ("-I", "--ignore-file"): self.ignoreFile(value) + + if len(self.tests) == 0: self.tests=findmodules("tests") + + def testSuite(self): + class IgnoringTestSuite(unittest.TestSuite): + def addTest(self, test): + if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore: + return + unittest.TestSuite.addTest(self, test) + + # Use our IgnoringTestSuite in the test loader. + unittest.TestLoader.suiteClass = IgnoringTestSuite + return unittest.defaultTestLoader.loadTestsFromNames(self.tests) + + def run(self, args=sys.argv[1:]): + self._parseargs(args) + runner = unittest.TextTestRunner(descriptions=False, + verbosity=self.verbose) + result = runner.run(self.testSuite()) + if (self.ignore): + print "=======================================" + print "NOTE: the following tests were ignored:" + for t in self.ignore: print t + print "=======================================" + return result.wasSuccessful() + + def connect(self, host=None, port=None, spec=None, user=None, password=None): + """Connect to the broker, returns a qpid.client.Client""" + host = host or self.host + port = port or self.port + spec = spec or self.spec + user = user or self.user + password = password or self.password + client = qpid.client.Client(host, port, qpid.spec.load(spec)) + client.start({"LOGIN": user, "PASSWORD": password}) + return client + + +# Global instance for tests to call connect. +testrunner = TestRunner() + + +class TestBase(unittest.TestCase): + """Base class for Qpid test cases. + + self.client is automatically connected with channel 1 open before + the test methods are run. + + Deletes queues and exchanges after. Tests call + self.queue_declare(channel, ...) and self.exchange_declare(chanel, + ...) which are wrappers for the Channel functions that note + resources to clean up later. + """ + + def setUp(self): + self.queues = [] + self.exchanges = [] + self.client = self.connect() + self.channel = self.client.channel(1) + self.channel.channel_open() + + def tearDown(self): + # TODO aconway 2006-09-05: Wrong behaviour here, we should + # close all open channels (checking for exceptions on the + # channesl) then open a channel to clean up qs and exs, + # finally close that channel. + for ch, q in self.queues: + ch.queue_delete(queue=q) + for ch, ex in self.exchanges: + ch.exchange_delete(exchange=ex) + + def connect(self, *args, **keys): + """Create a new connction, return the Client object""" + return testrunner.connect(*args, **keys) + + def queue_declare(self, channel=None, *args, **keys): + channel = channel or self.channel + reply = channel.queue_declare(*args, **keys) + self.queues.append((channel, reply.queue)) + return reply + + def exchange_declare(self, channel=None, ticket=0, exchange='', + type='', passive=False, durable=False, + auto_delete=False, internal=False, nowait=False, + arguments={}): + channel = channel or self.channel + reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments) + # TODO aconway 2006-09-14: Don't add exchange on failure. + self.exchanges.append((channel,exchange)) + return reply + + def assertPublishConsume(self, queue="", exchange="", routing_key=""): + """ + Publish a message and consume it, assert it comes back intact. + + queue can be a single queue name or a list of queue names. + For a list assert the message appears on all queues. + Crude attempt to make unique messages so we can't consume + a message not really meant for us. + """ + body = "TestMessage("+str(random.randint(999999, 1000000))+")" + self.channel.basic_publish(exchange=exchange, + content=qpid.content.Content(body), + routing_key=routing_key) + if not isinstance(queue, list): queue = [queue] + for q in queue: + reply = self.channel.basic_consume(queue=q, no_ack=True) + msg = self.client.queue(reply.consumer_tag).get(timeout=2) + self.assertEqual(body, msg.content.body) + + + def assertChannelException(self, expectedCode, message): + self.assertEqual(message.method.klass.name, "channel") + self.assertEqual(message.method.name, "close") + self.assertEqual(message.reply_code, expectedCode) + + + def assertConnectionException(self, expectedCode, message): + self.assertEqual(message.method.klass.name, "connection") + self.assertEqual(message.method.name, "close") + self.assertEqual(message.reply_code, expectedCode) + |