summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/testlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/testlib.py')
-rw-r--r--qpid/python/qpid/testlib.py327
1 files changed, 61 insertions, 266 deletions
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 12b2781561..1439b892ea 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -21,239 +21,13 @@
# Support library for qpid python tests.
#
-import sys, re, unittest, os, random, logging, traceback
-import qpid.client, qpid.spec, qmf.console
+import unittest, traceback, socket
+import qpid.client, qmf.console
import Queue
-from fnmatch import fnmatch
-from getopt import getopt, GetoptError
from qpid.content import Content
from qpid.message import Message
-
-#0-10 support
-from qpid.connection import Connection
-from qpid.spec010 import load
-from qpid.util import connect, ssl, URL
-
-def findmodules(root):
- """Find potential python modules under directory root"""
- found = []
- for dirpath, subdirs, files in os.walk(root):
- modpath = dirpath.replace(os.sep, '.')
- if not re.match(r'\.svn$', dirpath): # Avoid SVN directories
- for f in files:
- match = re.match(r'(.+)\.py$', f)
- if match and f != '__init__.py':
- found.append('.'.join([modpath, match.group(1)]))
- return found
-
-def default(value, default):
- if (value == None): return default
- else: return value
-
-class TestRunner:
-
- SPEC_FOLDER = "../specs"
- qpidd = os.getenv("QPIDD")
-
- """Runs unit tests.
-
- Parses command line arguments, provides utility functions for tests,
- runs the selected test suite.
- """
-
- def _die(self, message = None):
- if message: print message
- print """
-run-tests [options] [test*]
-The name of a test is package.module.ClassName.testMethod
-Options:
- -?/-h/--help : this message
- -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
- 0-8 - use the default 0-8 specification.
- 0-9 - use the default 0-9 specification.
- 0-10-errata - use the 0-10 specification with qpid errata.
- -e/--errata <errata.xml> : file containing amqp XML errata
- -b/--broker [amqps://][<user>[/<password>]@]<host>[:<port>] : broker to connect to
- -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD
- env to point to broker executable. broker-args will be
- prepended with "--daemon --port=0"
- -v/--verbose : verbose - lists tests as they are run.
- -d/--debug : enable debug logging.
- -i/--ignore <test> : ignore the named test.
- -I/--ignore-file : file containing patterns to ignore.
- -S/--skip-self-test : skips the client self tests in the 'tests folder'
- -F/--spec-folder : folder that contains the specs to be loaded
- """
- sys.exit(1)
-
- def startBroker(self, brokerArgs):
- """Start a single broker daemon"""
- if TestRunner.qpidd == None:
- self._die("QPIDD environment var must point to qpidd when using -B/--start-broker")
- cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs)
- portStr = os.popen(cmd).read()
- if len(portStr) == 0:
- self._die("%s failed to start" % TestRunner.qpidd)
- port = int(portStr)
- pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read())
- print "Started broker: pid=%d, port=%d" % (pid, port)
- self.brokerTuple = (pid, port)
- self.setBroker("localhost:%d" % port)
-
- def stopBroker(self):
- """Stop the broker using qpidd -q"""
- if self.brokerTuple:
- ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q")
- if ret != 0:
- self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret))
- print "Stopped broker: pid=%d, port=%d" % self.brokerTuple
-
- def killBroker(self):
- """Kill the broker using kill -9 (SIGTERM)"""
- if self.brokerTuple:
- os.kill(self.brokerTuple[0], signal.SIGTERM)
- print "Killed broker: pid=%d, port=%d" % self.brokerTuple
-
- def setBroker(self, broker):
- try:
- self.url = URL(broker)
- except ValueError:
- self._die("'%s' is not a valid broker" % (broker))
- self.user = default(self.url.user, "guest")
- self.password = default(self.url.password, "guest")
- self.host = self.url.host
- if self.url.scheme == URL.AMQPS:
- self.ssl = True
- default_port = 5671
- else:
- self.ssl = False
- default_port = 5672
- self.port = default(self.url.port, default_port)
-
- def ignoreFile(self, filename):
- f = file(filename)
- for line in f.readlines(): self.ignore.append(line.strip())
- f.close()
-
- def use08spec(self):
- "True if we are running with the old 0-8 spec."
- # NB: AMQP 0-8 identifies itself as 8-0 for historical reasons.
- return self.spec.major==8 and self.spec.minor==0
-
- def use09spec(self):
- "True if we are running with the 0-9 (non-wip) spec."
- return self.spec.major==0 and self.spec.minor==9
-
- def _parseargs(self, args):
- # Defaults
- self.setBroker("localhost")
- self.verbose = 1
- self.ignore = []
- self.specfile = "0-8"
- self.errata = []
- self.skip_self_test = False
-
- try:
- opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:",
- ["help", "spec", "errata=", "broker=",
- "start-broker=", "verbose", "skip-self-test", "ignore",
- "ignore-file", "spec-folder"])
- except GetoptError, e:
- self._die(str(e))
- # check for mutually exclusive options
- if "-B" in opts or "--start-broker" in opts:
- if "-b" in opts or "--broker" in opts:
- self._die("Cannot use -B/--start-broker and -b/broker options together")
- for opt, value in opts:
- if opt in ("-?", "-h", "--help"): self._die()
- if opt in ("-s", "--spec"): self.specfile = value
- if opt in ("-e", "--errata"): self.errata.append(value)
- if opt in ("-b", "--broker"): self.setBroker(value)
- if opt in ("-B", "--start-broker"): self.startBroker(value)
- if opt in ("-v", "--verbose"): self.verbose = 2
- if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
- if opt in ("-i", "--ignore"): self.ignore.append(value)
- if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
- if opt in ("-S", "--skip-self-test"): self.skip_self_test = True
- if opt in ("-F", "--spec-folder"): TestRunner.SPEC_FOLDER = value
-
- # Abbreviations for default settings.
- if (self.specfile == "0-10"):
- self.spec = load(self.get_spec_file("amqp.0-10.xml"))
- elif (self.specfile == "0-10-errata"):
- self.spec = load(self.get_spec_file("amqp.0-10-qpid-errata.xml"))
- else:
- if (self.specfile == "0-8"):
- self.specfile = self.get_spec_file("amqp.0-8.xml")
- elif (self.specfile == "0-9"):
- self.specfile = self.get_spec_file("amqp.0-9.xml")
- self.errata.append(self.get_spec_file("amqp-errata.0-9.xml"))
-
- if (self.specfile == None):
- self._die("No XML specification provided")
- print "Using specification from:", self.specfile
-
- self.spec = qpid.spec.load(self.specfile, *self.errata)
-
- if len(self.tests) == 0:
- if not self.skip_self_test:
- self.tests=findmodules("tests")
- if self.use08spec() or self.use09spec():
- self.tests+=findmodules("tests_0-8")
- elif (self.spec.major == 99 and self.spec.minor == 0):
- self.tests+=findmodules("tests_0-10_preview")
- elif (self.spec.major == 0 and self.spec.minor == 10):
- self.tests+=findmodules("tests_0-10")
-
- def testSuite(self):
- class IgnoringTestSuite(unittest.TestSuite):
- def addTest(self, test):
- if isinstance(test, unittest.TestCase):
- for pattern in testrunner.ignore:
- if fnmatch(test.id(), pattern):
- return
- unittest.TestSuite.addTest(self, test)
-
- # Use our IgnoringTestSuite in the test loader.
- unittest.TestLoader.suiteClass = IgnoringTestSuite
- return unittest.defaultTestLoader.loadTestsFromNames(self.tests)
-
- def run(self, args=sys.argv[1:]):
- self.brokerTuple = None
- self._parseargs(args)
- runner = unittest.TextTestRunner(descriptions=False,
- verbosity=self.verbose)
- result = runner.run(self.testSuite())
-
- if (self.ignore):
- print "======================================="
- print "NOTE: the following tests were ignored:"
- for t in self.ignore: print t
- print "======================================="
-
- self.stopBroker()
- return result.wasSuccessful()
-
- def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None):
- """Connect to the broker, returns a qpid.client.Client"""
- host = host or self.host
- port = port or self.port
- spec = spec or self.spec
- user = user or self.user
- password = password or self.password
- client = qpid.client.Client(host, port, spec)
- if self.use08spec():
- client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
- else:
- client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
- return client
-
- def get_spec_file(self, fname):
- return TestRunner.SPEC_FOLDER + os.sep + fname
-
-# Global instance for tests to call connect.
-testrunner = TestRunner()
-
+from qpid.harness import Skipped
+from qpid.exceptions import VersionError
class TestBase(unittest.TestCase):
"""Base class for Qpid test cases.
@@ -267,6 +41,9 @@ class TestBase(unittest.TestCase):
resources to clean up later.
"""
+ def configure(self, config):
+ self.config = config
+
def setUp(self):
self.queues = []
self.exchanges = []
@@ -293,9 +70,26 @@ class TestBase(unittest.TestCase):
else:
self.client.close()
- def connect(self, *args, **keys):
+ def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
"""Create a new connction, return the Client object"""
- return testrunner.connect(*args, **keys)
+ host = host or self.config.broker.host
+ port = port or self.config.broker.port or 5672
+ user = user or "guest"
+ password = password or "guest"
+ client = qpid.client.Client(host, port)
+ try:
+ if client.spec.major == 8 and client.spec.minor == 0:
+ client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+ else:
+ client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+ except qpid.client.Closed, e:
+ if isinstance(e.args[0], VersionError):
+ raise Skipped(e.args[0])
+ else:
+ raise e
+ except socket.error, e:
+ raise Skipped(e)
+ return client
def queue_declare(self, channel=None, *args, **keys):
channel = channel or self.channel
@@ -319,17 +113,8 @@ class TestBase(unittest.TestCase):
def consume(self, queueName):
"""Consume from named queue returns the Queue object."""
- if testrunner.use08spec() or testrunner.use09spec():
- reply = self.channel.basic_consume(queue=queueName, no_ack=True)
- return self.client.queue(reply.consumer_tag)
- else:
- if not "uniqueTag" in dir(self): self.uniqueTag = 1
- else: self.uniqueTag += 1
- consumer_tag = "tag" + str(self.uniqueTag)
- self.channel.message_subscribe(queue=queueName, destination=consumer_tag)
- self.channel.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL)
- self.channel.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL)
- return self.client.queue(consumer_tag)
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
def subscribe(self, channel=None, **keys):
channel = channel or self.channel
@@ -350,24 +135,14 @@ class TestBase(unittest.TestCase):
Publish to exchange and assert queue.get() returns the same message.
"""
body = self.uniqueString()
- if testrunner.use08spec() or testrunner.use09spec():
- self.channel.basic_publish(
- exchange=exchange,
- content=Content(body, properties=properties),
- routing_key=routing_key)
- else:
- self.channel.message_transfer(
- destination=exchange,
- content=Content(body, properties={'application_headers':properties,'routing_key':routing_key}))
+ self.channel.basic_publish(
+ exchange=exchange,
+ content=Content(body, properties=properties),
+ routing_key=routing_key)
msg = queue.get(timeout=1)
- if testrunner.use08spec() or testrunner.use09spec():
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content.properties)
- else:
- self.assertEqual(body, msg.content.body)
- if (properties):
- self.assertEqual(properties, msg.content['application_headers'])
+ self.assertEqual(body, msg.content.body)
+ if (properties):
+ self.assertEqual(properties, msg.content.properties)
def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
@@ -394,11 +169,20 @@ class TestBase(unittest.TestCase):
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
+#0-10 support
+from qpid.connection import Connection
+from qpid.util import connect, ssl, URL
+
class TestBase010(unittest.TestCase):
"""
Base class for Qpid test cases. using the final 0-10 spec
"""
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
def setUp(self):
self.conn = self.connect()
self.session = self.conn.session("test-session", timeout=10)
@@ -406,15 +190,26 @@ class TestBase010(unittest.TestCase):
def startQmf(self, handler=None):
self.qmf = qmf.console.Session(handler)
- self.qmf_broker = self.qmf.addBroker(str(testrunner.url))
+ self.qmf_broker = self.qmf.addBroker(str(self.broker))
def connect(self, host=None, port=None):
- sock = connect(host or testrunner.host, port or testrunner.port)
- if testrunner.url.scheme == URL.AMQPS:
+ url = self.broker
+ if url.scheme == URL.AMQPS:
+ default_port = 5671
+ else:
+ default_port = 5672
+ try:
+ sock = connect(host or url.host, port or url.port or default_port)
+ except socket.error, e:
+ raise Skipped(e)
+ if url.scheme == URL.AMQPS:
sock = ssl(sock)
- conn = Connection(sock, username=testrunner.user,
- password=testrunner.password)
- conn.start(timeout=10)
+ conn = Connection(sock, username=url.user or "guest",
+ password=url.password or "guest")
+ try:
+ conn.start(timeout=10)
+ except VersionError, e:
+ raise Skipped(e)
return conn
def tearDown(self):