summaryrefslogtreecommitdiff
path: root/cpp/src/tests/testlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/testlib.py')
-rw-r--r--cpp/src/tests/testlib.py382
1 files changed, 322 insertions, 60 deletions
diff --git a/cpp/src/tests/testlib.py b/cpp/src/tests/testlib.py
index 1c1d1bf407..07c4794767 100644
--- a/cpp/src/tests/testlib.py
+++ b/cpp/src/tests/testlib.py
@@ -21,7 +21,7 @@
# Support library for qpid python tests.
#
-import os, signal, subprocess, unittest
+import os, re, signal, subprocess, unittest
class TestBase(unittest.TestCase):
"""
@@ -32,8 +32,8 @@ class TestBase(unittest.TestCase):
The following environment vars control if and how the test is run, and determine where many of the helper
executables/libs are to be found.
"""
- _storeEnable = os.getenv("STORE_ENABLE") != None # Must be True for durability to be enabled during the test
_storeLib = os.getenv("STORE_LIB")
+ _storeEnable = _storeLib != None # Must be True for durability to be enabled during the test
_qpiddExec = os.getenv("QPIDD_EXEC", "/usr/sbin/qpidd")
_tempStoreDir = os.path.abspath(os.getenv("TMP_STORE_DIR", "/tmp/qpid"))
@@ -55,22 +55,7 @@ class TestBase(unittest.TestCase):
return " --%s yes" % key
else:
return " --%s no" % key
-
- def _paramNum(self, key, val):
- if val != None:
- return " --%s %d" % (key, val)
- return ""
-
- def _paramString(self, key, val):
- if val != None:
- return " --%s %s" % (key, val)
- return ""
-
- def _paramStringList(self, key, valList, val):
- if val in valList:
- return " --%s %s" % (key, val)
- return ""
-
+
# --- Helper functions for message creation ---
def _makeMessage(self, msgSize):
@@ -115,17 +100,37 @@ class TestBase(unittest.TestCase):
#print "started broker: pid=%d, port=%d args: %s" % (pid, port, qpiddArgs)
return (pid, port)
- def killBroker(self, pid):
+ def killBroker(self, nodeTuple, ignoreFailures = False):
"""Kill a broker using kill -9"""
- os.kill(pid, signal.SIGTERM)
- #print "killed broker: pid=%d" % pid
+ try:
+ os.kill(nodeTuple[self.PID], signal.SIGKILL)
+ try:
+ os.waitpid(nodeTuple[self.PID], 0)
+ except:
+ pass
+ #print "killed broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ except:
+ if ignoreFailures:
+ print "WARNING: killBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ else:
+ raise
- def stopBroker(self, port):
+ def stopBroker(self, nodeTuple, ignoreFailures = False):
"""Stop a broker using qpidd -q"""
- ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % port, "-q")
- if ret != 0:
- raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret))
- #print "stopped broker: port=%d" % port
+ try:
+ ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % nodeTuple[self.PORT], "--quit")
+ if ret != 0:
+ raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret))
+ try:
+ os.waitpid(nodeTuple[self.PID], 0)
+ except:
+ pass
+ #print "stopped broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ except:
+ if ignoreFailures:
+ print "WARNING: stopBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ else:
+ raise
@@ -138,8 +143,10 @@ class TestBaseCluster(TestBase):
The following environment vars control if and how the test is run, and determine where many of the helper
executables/libs are to be found.
"""
- _runClusterTests = os.getenv("RUN_CLUSTER_TESTS") != None # Must be True for these cluster tests to run
_clusterLib = os.getenv("CLUSTER_LIB")
+ _clusterTestEnable = _clusterLib != None # Must be True for these cluster tests to run
+ _xmlLib = os.getenv("XML_LIB")
+ _xmlEnable = _xmlLib != None
_qpidConfigExec = os.getenv("QPID_CONFIG_EXEC", "/usr/bin/qpid-config")
_qpidRouteExec = os.getenv("QPID_ROUTE_EXEC", "/usr/bin/qpid-route")
_receiverExec = os.getenv("RECEIVER_EXEC", "/usr/libexec/qpid/test/receiver")
@@ -164,10 +171,19 @@ class TestBaseCluster(TestBase):
def run(self, res):
""" Skip cluster testing if env var RUN_CLUSTER_TESTS is not defined."""
- if not self._runClusterTests:
+ if not self._clusterTestEnable:
return
unittest.TestCase.run(self, res)
+ # --- Private helper / convenience functions ---
+
+ def _checkPids(self, clusterName = None):
+ for pid, port in self.getTupleList():
+ try:
+ os.kill(pid, 0)
+ except:
+ raise Exception("_checkPids(): Broker with pid %d expected but does not exist! (crashed?)" % pid)
+
# --- Starting cluster node(s) ---
@@ -196,22 +212,25 @@ class TestBaseCluster(TestBase):
# --- Cluster and node status ---
- def getTupleList(self):
+ def getTupleList(self, clusterName = None):
"""Get list of (pid, port) tuples of all known cluster brokers"""
tList = []
- for l in self._clusterDict.itervalues():
- for t in l.itervalues():
- tList.append(t)
+ for c, l in self._clusterDict.iteritems():
+ if clusterName == None or c == clusterName:
+ for t in l.itervalues():
+ tList.append(t)
return tList
def getNumBrokers(self):
"""Get total number of brokers in all known clusters"""
return len(self.getTupleList())
- def checkNumBrokers(self, expected):
+ def checkNumBrokers(self, expected = None, checkPids = True):
"""Check that the total number of brokers in all known clusters is the expected value"""
- if self.getNumBrokers() != expected:
+ if expected != None and self.getNumBrokers() != expected:
raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
+ if checkPids:
+ self._checkPids()
def getClusterTupleList(self, clusterName):
"""Get list of (pid, port) tuples of all nodes in named cluster"""
@@ -227,11 +246,13 @@ class TestBaseCluster(TestBase):
"""Get the (pid, port) tuple for the given cluster node"""
return self._clusterDict[clusterName][nodeNumber]
- def checkNumClusterBrokers(self, clusterName, expected):
+ def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True):
"""Check that the total number of brokers in the named cluster is the expected value"""
- if self.getNumClusterBrokers(clusterName) != expected:
+ if expected != None and self.getNumClusterBrokers(clusterName) != expected:
raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
(clusterName, expected, self.getNumClusterBrokers(clusterName)))
+ if checkPids:
+ self._checkPids(clusterName)
def clusterExists(self, clusterName):
""" Return True if clusterName exists, False otherwise"""
@@ -250,16 +271,16 @@ class TestBaseCluster(TestBase):
# --- Kill cluster nodes using signal 9 ---
- def killNode(self, nodeNumber, clusterName, updateDict = True):
+ def killNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False):
"""Kill the given node in the named cluster using kill -9"""
- self.killBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PID])
+ self.killBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName][nodeNumber])
- def killCluster(self, clusterName, updateDict = True):
+ def killCluster(self, clusterName, updateDict = True, ignoreFailures = False):
"""Kill all nodes in the named cluster"""
for n in self._clusterDict[clusterName].iterkeys():
- self.killNode(n, clusterName, False)
+ self.killNode(n, clusterName, False, ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName])
@@ -270,46 +291,46 @@ class TestBaseCluster(TestBase):
raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
(clusterName, self.getNumClusterBrokers(clusterName)))
- def killAllClusters(self):
+ def killAllClusters(self, ignoreFailures = False):
"""Kill all known clusters"""
for n in self._clusterDict.iterkeys():
- self.killCluster(n, False)
+ self.killCluster(n, False, ignoreFailures)
self._clusterDict.clear()
- def killAllClustersCheck(self):
+ def killAllClustersCheck(self, ignoreFailures = False):
"""Kill all known clusters and check that the cluster dictionary is empty"""
- self.killAllClusters()
+ self.killAllClusters(ignoreFailures)
self.checkNumBrokers(0)
# --- Stop cluster nodes using qpidd -q ---
- def stopNode(self, nodeNumber, clusterName, updateDict = True):
+ def stopNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False):
"""Stop the given node in the named cluster using qpidd -q"""
- self.stopBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PORT])
+ self.stopBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName][nodeNumber])
- def stopAllClusters(self):
+ def stopAllClusters(self, ignoreFailures = False):
"""Stop all known clusters"""
for n in self._clusterDict.iterkeys():
- self.stopCluster(n, False)
+ self.stopCluster(n, False, ignoreFailures)
self._clusterDict.clear()
- def stopCluster(self, clusterName, updateDict = True):
+ def stopCluster(self, clusterName, updateDict = True, ignoreFailures = False):
"""Stop all nodes in the named cluster"""
for n in self._clusterDict[clusterName].iterkeys():
- self.stopNode(n, clusterName, False)
+ self.stopNode(n, clusterName, False, ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName])
- def stopCheckCluster(self, clusterName):
+ def stopCheckCluster(self, clusterName, ignoreFailures = False):
"""Stop the named cluster and check that the name is removed from the cluster dictionary"""
- self.stopCluster(clusterName)
+ self.stopCluster(clusterName, True, ignoreFailures)
if self.clusterExists(clusterName):
raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
- def stopCheckAll(self):
+ def stopCheckAll(self, ignoreFailures = False):
"""Kill all known clusters and check that the cluster dictionary is empty"""
self.stopAllClusters()
self.checkNumBrokers(0)
@@ -479,11 +500,252 @@ class TestBaseCluster(TestBase):
if wait:
receiver.wait()
return msgs
-
- def sendReceiveMsgs(self, nodeNumber, clusterName, exchangeName, queueName, numMsgs, wait = True, msgSize = None):
- self.createBindDirectExchangeQueue(nodeNumber, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(nodeNumber, clusterName, exchangeName, queueName, numMsgs, msgSize, wait)
- rxMsgs = self.receiveMsgs(nodeNumber, clusterName, queueName, numMsgs, wait)
- if txMsgs != rxMsgs:
- self.fail("Send - receive message mismatch")
+
+
+ # --- Exchange-specific helper inner classes ---
+
+ class TestHelper:
+ """
+ This is a "virtual" superclass for test helpers, and is not useful on its own, but the
+ per-exchange subclasses are designed to keep track of the messages sent to and received
+ from queues which have bindings to that exchange type.
+ """
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList):
+
+ """Dictionary of queues and lists of messages sent to them."""
+ self._txMsgs = {}
+ """Dictionary of queues and lists of messages received from them."""
+ self._rxMsgs = {}
+ """List of node numbers currently in the cluster"""
+ self._nodes = []
+ """List of node numbers which have been killed and can therefore be recovered"""
+ self._deadNodes = []
+ """Last node to be used"""
+ self._lastNode = None
+
+ self._testBaseCluster = testBaseCluster
+ self._clusterName = clusterName
+ self._exchangeName = exchangeName
+ self._queueNameList = queueNameList
+ self._addQueues(queueNameList)
+ self._testBaseCluster.createCheckCluster(clusterName, numNodes)
+ self._nodes.extend(range(0, numNodes))
+
+ def _addQueues(self, queueNameList):
+ for qn in queueNameList:
+ if not qn in self._txMsgs:
+ self._txMsgs[qn] = []
+ if not qn in self._rxMsgs:
+ self._rxMsgs[qn] = []
+
+ def _bindQueue(self, queueName, bindingKey, nodeNumber = None):
+ """Bind a queue to an exchange using a binding key."""
+ if nodeNumber == None:
+ nodeNumber = self._nodes[0] # first available node
+ self._testBaseCluster.addQueue(nodeNumber, self._clusterName, queueName)
+ self._testBaseCluster.bind(nodeNumber, self._clusterName, self._exchangeName, queueName, bindingKey)
+
+ def _highestNodeNumber(self):
+ """Find the highest node number used so far between the current nodes and those stopped/killed."""
+ highestNode = self._nodes[-1]
+ if len(self._deadNodes) == 0:
+ return highestNode
+ highestDeadNode = self._deadNodes[-1]
+ if highestNode > highestDeadNode:
+ return highestNode
+ return highestDeadNode
+
+ def killCluster(self):
+ """Kill all nodes in the cluster"""
+ self._testBaseCluster.killCluster(self._clusterName)
+ self._testBaseCluster.checkNumClusterBrokers(self._clusterName, 0)
+ self._deadNodes.extend(self._nodes)
+ self._deadNodes.sort()
+ del self._nodes[:]
+
+ def restoreCluster(self, lastNode = None, restoreNodes = True):
+ """Restore a previously killed cluster"""
+ self._testBaseCluster.createCluster(self._clusterName)
+ if restoreNodes:
+ numNodes = len(self._deadNodes)
+ self.restoreNodes(lastNode)
+ self._testBaseCluster.checkNumClusterBrokers(self._clusterName, numNodes)
+
+ def addNodes(self, numberOfNodes = 1):
+ """Add a fixed number of nodes to the cluster."""
+ nodeStart = self._highestNodeNumber() + 1
+ for i in range(0, numberOfNodes):
+ nodeNumber = nodeStart + i
+ self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName)
+ self._nodes.append(nodeNumber)
+ self._testBaseCluster.checkNumClusterBrokers(self._clusterName, len(self._nodes))
+
+ def restoreNode(self, nodeNumber):
+ """Restore a cluster node that has been previously killed"""
+ if nodeNumber not in self._deadNodes:
+ raise Exception("restoreNode(): Node number %d not in dead node list %s" % (nodeNumber, self._deadNodes))
+ self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName)
+ self._deadNodes.remove(nodeNumber)
+ self._nodes.append(nodeNumber)
+ self._nodes.sort()
+
+ def restoreNodes(self, lastNode = None):
+ """Restore all known cluster nodes that have been previously killed starting with a known last-used node"""
+ if len(self._nodes) == 0: # restore last-used node first
+ if lastNode == None:
+ lastNode = self._lastNode
+ self.restoreNode(lastNode)
+ while len(self._deadNodes) > 0:
+ self.restoreNode(self._deadNodes[0])
+
+ def killNode(self, nodeNumber):
+ """Kill a cluster node (if it is in the _nodes list)."""
+ if nodeNumber not in self._nodes:
+ raise Exception("killNode(): Node number %d not in node list %s" % (nodeNumber, self._nodes))
+ self._testBaseCluster.killNode(nodeNumber, self._clusterName)
+ self._nodes.remove(nodeNumber)
+ self._deadNodes.append(nodeNumber)
+ self._deadNodes.sort()
+
+ def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True):
+ """Send a fixed number of messages using the given routing key."""
+ if nodeNumber == None:
+ nodeNumber = self._nodes[0] # Use first available node
+ msgs = self._testBaseCluster._makeMessageList(numMsgs, msgSize)
+ sender = self._testBaseCluster.createSender(nodeNumber, self._clusterName, self._exchangeName, routingKey)
+ sender.stdin.write(msgs)
+ sender.stdin.close()
+ if wait:
+ sender.wait()
+ self._lastNode = nodeNumber
+ return msgs.split()
+
+ # TODO - this i/f is messy: one mumMsgs can be given, but a list of queues
+ # so assuming numMsgs for each queue
+ # A mechanism is needed to specify a different numMsgs per queue
+ def receiveMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, wait = True):
+ """Receive a fixed number of messages from a named queue. If numMsgs == None, get all remaining messages."""
+ if nodeNumber == None:
+ nodeNumber = self._nodes[0] # Use first available node
+ if queueNameList == None:
+ queueNameList = self._txMsgs.iterkeys()
+ for qn in queueNameList:
+ nm = numMsgs
+ if nm == None:
+ nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get all remaining messages
+ if nm > 0:
+ receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm)
+ cnt = 0
+ while cnt < nm:
+ rx = receiver.stdout.readline().strip()
+ if rx == "" and receiver.poll() != None: break
+ self._rxMsgs[qn].append(rx)
+ cnt = cnt + 1
+ if wait:
+ receiver.wait()
+ self._lastNode = nodeNumber
+
+ def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = None, wait = True):
+ """Receive all remaining messages on named queue."""
+ self.receiveMsgs(None, nodeNumber, queueNameList, wait)
+
+ def checkMsgs(self):
+ """Return True if all expected messages have been received (ie the transmit and receive list are identical)."""
+ txMsgTot = 0
+ rxMsgTot = 0
+ for qn, txMsgList in self._txMsgs.iteritems():
+ rxMsgList = self._rxMsgs[qn]
+ txMsgTot = txMsgTot + len(txMsgList)
+ rxMsgTot = rxMsgTot + len(rxMsgList)
+ if len(txMsgList) != len(rxMsgList):
+ return False
+ for i, m in enumerate(txMsgList):
+ if m != rxMsgList[i]:
+ return False
+ if txMsgTot == 0 and rxMsgTot == 0:
+ print "WARNING: No messages were either sent or received"
+ return True
+
+ def finalizeTest(self):
+ """Recover all the remaining messages on all queues, then check that all expected messages were received."""
+ self.receiveRemainingMsgs()
+ self._testBaseCluster.stopCheckAll()
+ if not self.checkMsgs():
+ self._testBaseCluster.fail("Send - receive message mismatch")
+ self.printMsgs()
+
+ def printMsgs(self, txMsgs = True, rxMsgs = True):
+ """Print all messages transmitted and received."""
+ for qn, txMsgList in self._txMsgs.iteritems():
+ print "Queue: %s" % qn
+ if txMsgs:
+ print " txMsgList = %s" % txMsgList
+ if rxMsgs:
+ rxMsgList = self._rxMsgs[qn]
+ print " rxMsgList = %s" % rxMsgList
+
+
+ class DirectExchangeTestHelper(TestHelper):
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList):
+ TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList)
+ self._testBaseCluster.addExchange(0, clusterName, "direct", exchangeName)
+ for qn in queueNameList:
+ self._bindQueue(qn, qn)
+
+ def addQueues(self, queueNameList):
+ self._addQueues(queueNameList)
+ for qn in queueNameList:
+ self._bindQueue(qn, qn)
+
+ def sendMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, msgSize = None, wait = True):
+ if queueNameList == None:
+ queueNameList = self._txMsgs.iterkeys()
+ for qn in queueNameList:
+ self._txMsgs[qn].extend(TestBaseCluster.TestHelper.sendMsgs(self, qn, numMsgs, nodeNumber, msgSize, wait))
+
+
+ class TopicExchangeTestHelper(TestHelper):
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList):
+ self._queueNameKeyList = queueNameKeyList
+ TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList.iterkeys())
+ self._testBaseCluster.addExchange(0, clusterName, "topic", exchangeName)
+ for qn, bk in queueNameKeyList.iteritems():
+ self._bindQueue(qn, bk)
+
+ def addQueues(self, queueNameKeyList):
+ self._addQueues(queueNameKeyList.iterkeys())
+ for qn, bk in queueNameKeyList.iteritems():
+ self._bindQueue(qn, bk)
+
+ def _prepareRegex(self, bk):
+ # This regex conversion is not very complete - there are other chars that should be escaped too
+ return "^%s$" % bk.replace(".", r"\.").replace("*", r"[^.]*").replace("#", ".*")
+
+ def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True):
+ msgList = TestBaseCluster.TestHelper.sendMsgs(self, routingKey, numMsgs, nodeNumber, msgSize, wait)
+ for qn, bk in self._queueNameKeyList.iteritems():
+ if re.match(self._prepareRegex(bk), routingKey):
+ self._txMsgs[qn].extend(msgList)
+
+
+ class FanoutExchangeTestHelper(TestHelper):
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList):
+ TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList)
+ self._testBaseCluster.addExchange(0, clusterName, "fanout", exchangeName)
+ for qn in queueNameList:
+ self._bindQueue(qn, "")
+
+ def addQueues(self, queueNameList):
+ self._addQueues(queueNameList)
+ for qn in queueNameList:
+ self._bindQueue(qn, "")
+
+ def sendMsgs(self, numMsgs, nodeNumber = None, msgSize = None, wait = True):
+ msgList = TestBaseCluster.TestHelper.sendMsgs(self, "", numMsgs, nodeNumber, msgSize, wait)
+ for ml in self._txMsgs.itervalues():
+ ml.extend(msgList)
\ No newline at end of file