summaryrefslogtreecommitdiff
path: root/python/qpid/testlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/testlib.py')
-rw-r--r--python/qpid/testlib.py221
1 files changed, 221 insertions, 0 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
new file mode 100644
index 0000000000..ff9ecbee8a
--- /dev/null
+++ b/python/qpid/testlib.py
@@ -0,0 +1,221 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Support library for qpid python tests.
+#
+
+import sys, re, unittest, os, random, logging
+import qpid.client, qpid.spec
+from getopt import getopt, GetoptError
+
+
+def findmodules(root):
+ """Find potential python modules under directory root"""
+ found = []
+ for dirpath, subdirs, files in os.walk(root):
+ modpath = dirpath.replace(os.sep, '.')
+ if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
+ for f in files:
+ match = re.match(r'(.+)\.py$', f)
+ if match and f != '__init__.py':
+ found.append('.'.join([modpath, match.group(1)]))
+ return found
+
+def default(value, default):
+ if (value == None): return default
+ else: return value
+
+class TestRunner:
+ """Runs unit tests.
+
+ Parses command line arguments, provides utility functions for tests,
+ runs the selected test suite.
+ """
+
+ def _die(self, message = None):
+ if message: print message
+ print """
+run-tests [options] [test*]
+The name of a test is package.module.ClassName.testMethod
+Options:
+ -?/-h/--help : this message
+ -s/--spec <spec.xml> : file containing amqp XML spec
+ -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
+ -v/--verbose : verbose - lists tests as they are run.
+ -d/--debug : enable debug logging.
+ -i/--ignore <test> : ignore the named test.
+ -I/--ignore-file : file containing patterns to ignore.
+ """
+ sys.exit(1)
+
+ def setBroker(self, broker):
+ rex = re.compile(r"""
+ # [ <user> [ / <password> ] @] <host> [ :<port> ]
+ ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
+ match = rex.match(broker)
+ if not match: self._die("'%s' is not a valid broker" % (broker))
+ self.user, self.password, self.host, self.port = match.groups()
+ self.port = int(default(self.port, 5672))
+ self.user = default(self.user, "guest")
+ self.password = default(self.password, "guest")
+
+ def __init__(self):
+ # Defaults
+ self.setBroker("localhost")
+ self.spec = "../specs/amqp-8.0.xml"
+ self.verbose = 1
+ self.ignore = []
+
+ def ignoreFile(self, filename):
+ f = file(filename)
+ for line in f.readlines(): self.ignore.append(line.strip())
+ f.close()
+
+ def _parseargs(self, args):
+ try:
+ opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"])
+ except GetoptError, e:
+ self._die(str(e))
+ for opt, value in opts:
+ if opt in ("-?", "-h", "--help"): self._die()
+ if opt in ("-s", "--spec"): self.spec = value
+ if opt in ("-b", "--broker"): self.setBroker(value)
+ if opt in ("-v", "--verbose"): self.verbose = 2
+ if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
+ if opt in ("-i", "--ignore"): self.ignore.append(value)
+ if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+
+ if len(self.tests) == 0: self.tests=findmodules("tests")
+
+ def testSuite(self):
+ class IgnoringTestSuite(unittest.TestSuite):
+ def addTest(self, test):
+ if isinstance(test, unittest.TestCase) and test.id() in testrunner.ignore:
+ return
+ unittest.TestSuite.addTest(self, test)
+
+ # Use our IgnoringTestSuite in the test loader.
+ unittest.TestLoader.suiteClass = IgnoringTestSuite
+ return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
+
+ def run(self, args=sys.argv[1:]):
+ self._parseargs(args)
+ runner = unittest.TextTestRunner(descriptions=False,
+ verbosity=self.verbose)
+ result = runner.run(self.testSuite())
+ if (self.ignore):
+ print "======================================="
+ print "NOTE: the following tests were ignored:"
+ for t in self.ignore: print t
+ print "======================================="
+ return result.wasSuccessful()
+
+ def connect(self, host=None, port=None, spec=None, user=None, password=None):
+ """Connect to the broker, returns a qpid.client.Client"""
+ host = host or self.host
+ port = port or self.port
+ spec = spec or self.spec
+ user = user or self.user
+ password = password or self.password
+ client = qpid.client.Client(host, port, qpid.spec.load(spec))
+ client.start({"LOGIN": user, "PASSWORD": password})
+ return client
+
+
+# Global instance for tests to call connect.
+testrunner = TestRunner()
+
+
+class TestBase(unittest.TestCase):
+ """Base class for Qpid test cases.
+
+ self.client is automatically connected with channel 1 open before
+ the test methods are run.
+
+ Deletes queues and exchanges after. Tests call
+ self.queue_declare(channel, ...) and self.exchange_declare(chanel,
+ ...) which are wrappers for the Channel functions that note
+ resources to clean up later.
+ """
+
+ def setUp(self):
+ self.queues = []
+ self.exchanges = []
+ self.client = self.connect()
+ self.channel = self.client.channel(1)
+ self.channel.channel_open()
+
+ def tearDown(self):
+ # TODO aconway 2006-09-05: Wrong behaviour here, we should
+ # close all open channels (checking for exceptions on the
+ # channesl) then open a channel to clean up qs and exs,
+ # finally close that channel.
+ for ch, q in self.queues:
+ ch.queue_delete(queue=q)
+ for ch, ex in self.exchanges:
+ ch.exchange_delete(exchange=ex)
+
+ def connect(self, *args, **keys):
+ """Create a new connction, return the Client object"""
+ return testrunner.connect(*args, **keys)
+
+ def queue_declare(self, channel=None, *args, **keys):
+ channel = channel or self.channel
+ reply = channel.queue_declare(*args, **keys)
+ self.queues.append((channel, reply.queue))
+ return reply
+
+ def exchange_declare(self, channel=None, ticket=0, exchange='',
+ type='', passive=False, durable=False,
+ auto_delete=False, internal=False, nowait=False,
+ arguments={}):
+ channel = channel or self.channel
+ reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
+ # TODO aconway 2006-09-14: Don't add exchange on failure.
+ self.exchanges.append((channel,exchange))
+ return reply
+
+ def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ """
+ Publish a message and consume it, assert it comes back intact.
+
+ queue can be a single queue name or a list of queue names.
+ For a list assert the message appears on all queues.
+ Crude attempt to make unique messages so we can't consume
+ a message not really meant for us.
+ """
+ body = "TestMessage("+str(random.randint(999999, 1000000))+")"
+ self.channel.basic_publish(exchange=exchange,
+ content=qpid.content.Content(body),
+ routing_key=routing_key)
+ if not isinstance(queue, list): queue = [queue]
+ for q in queue:
+ reply = self.channel.basic_consume(queue=q, no_ack=True)
+ msg = self.client.queue(reply.consumer_tag).get(timeout=2)
+ self.assertEqual(body, msg.content.body)
+
+
+ def assertChannelException(self, expectedCode, message):
+ self.assertEqual(message.method.klass.name, "channel")
+ self.assertEqual(message.method.name, "close")
+ self.assertEqual(message.reply_code, expectedCode)
+
+
+ def assertConnectionException(self, expectedCode, message):
+ self.assertEqual(message.method.klass.name, "connection")
+ self.assertEqual(message.method.name, "close")
+ self.assertEqual(message.reply_code, expectedCode)
+