summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-06-03 15:25:56 +0000
committerKim van der Riet <kpvdr@apache.org>2009-06-03 15:25:56 +0000
commit12048dcd9d13f1c2baeffb4b65c4ce65c0155234 (patch)
treeee4913784a020897e081c210f99eda53f152bf61 /cpp/src
parent811ef0bba2901e8ff65cd852a7f1d020493b2642 (diff)
downloadqpid-python-12048dcd9d13f1c2baeffb4b65c4ce65c0155234.tar.gz
Python cluster test improvements and some additional tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@781431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rwxr-xr-xcpp/src/tests/cluster.py433
-rwxr-xr-xcpp/src/tests/run_cluster_tests30
-rw-r--r--cpp/src/tests/testlib.py382
3 files changed, 491 insertions, 354 deletions
diff --git a/cpp/src/tests/cluster.py b/cpp/src/tests/cluster.py
index 87c9b363a8..ae4cc7aa85 100755
--- a/cpp/src/tests/cluster.py
+++ b/cpp/src/tests/cluster.py
@@ -29,10 +29,9 @@ class ClusterTests(TestBaseCluster):
try:
clusterName = "cluster-01"
self.createCheckCluster(clusterName, 5)
- self.checkNumBrokers(5)
self.stopCheckCluster(clusterName)
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_02_MultipleClusterInitialization(self):
@@ -40,13 +39,13 @@ class ClusterTests(TestBaseCluster):
try:
for i in range(0, 5):
clusterName = "cluster-02.%d" % i
- self.createCluster(clusterName, 5)
+ self.createCheckCluster(clusterName, 5)
self.checkNumBrokers(25)
self.killCluster("cluster-02.2")
self.checkNumBrokers(20)
self.stopCheckAll()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_03_AddRemoveNodes(self):
@@ -66,7 +65,7 @@ class ClusterTests(TestBaseCluster):
self.checkNumClusterBrokers(clusterName, 7)
self.stopCheckAll()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_04_RemoveRestoreNodes(self):
@@ -93,7 +92,7 @@ class ClusterTests(TestBaseCluster):
self.checkNumClusterBrokers(clusterName, 6)
self.stopCheckAll()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_05_KillAllNodesThenRecover(self):
@@ -105,170 +104,126 @@ class ClusterTests(TestBaseCluster):
self.createCheckCluster(clusterName, 6)
self.stopCheckAll()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_06_PublishConsume(self):
"""Publish then consume 100 messages from a single cluster"""
try:
- clusterName = "cluster-06"
- self.createCheckCluster(clusterName, 3)
- self.sendReceiveMsgs(0, clusterName, "test-exchange-06", "test-queue-06", 100)
- self.stopCheckAll()
+ dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-06", 3, "test-exchange-06", ["test-queue-06"])
+ dh.sendMsgs(100)
+ dh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_07_MultiplePublishConsume(self):
"""Staggered publish and consume on a single cluster"""
try:
- clusterName = "cluster-07"
- exchangeName = "test-exchange-07"
- queueName = "test-queue-07"
- self.createCheckCluster(clusterName, 3)
- self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
- rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) # 10, 10
- txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) # 30, 10
- rxMsgs += self.receiveMsgs(0, clusterName, queueName, 20) # 10, 30
- txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30
- rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
- txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50
- rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
- self.stopCheckAll()
- if txMsgs != rxMsgs:
- print "txMsgs=%s" % txMsgs
- print "rxMsgs=%s" % rxMsgs
- self.fail("Send - receive message mismatch")
+ dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-07", 3, "test-exchange-07", ["test-queue-07"])
+ # tx rx nodes
+ # 0 0 0 1 2
+ dh.sendMsgs(20) # 20 0 *
+ dh.receiveMsgs(10, 1) # 20 10 *
+ dh.sendMsgs(20, 2) # 40 10 *
+ dh.receiveMsgs(20, 0) # 40 30 *
+ dh.sendMsgs(20, 1) # 60 30 *
+ dh.receiveMsgs(20, 2) # 60 50 *
+ dh.sendMsgs(20, 0) # 80 50 *
+ dh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self):
"""Staggered publish and consume interleaved with adding and removing nodes on a single cluster"""
try:
- clusterName = "cluster-08"
- exchangeName = "test-exchange-08"
- queueName = "test-queue-08"
- self.createCheckCluster(clusterName, 3)
- self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
- for i in range(3,6):
- self.createClusterNode(i, clusterName)
- self.checkNumClusterBrokers(clusterName, 6)
- txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0
- self.killNode(0, clusterName)
- self.checkNumClusterBrokers(clusterName, 5)
- rxMsgs = self.receiveMsgs(2, clusterName, queueName, 10) # 30, 10
- self.killNode(2, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
- rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) # 10, 30
- self.createClusterNode(6, clusterName)
- self.checkNumClusterBrokers(clusterName, 5)
- txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) # 30, 30
- rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 50
- self.createClusterNode(7, clusterName)
- self.checkNumClusterBrokers(clusterName, 6)
- txMsgs += self.sendMsgs(6, clusterName, exchangeName, queueName, 20) # 30, 50
- rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
- self.stopCheckAll()
- if txMsgs != rxMsgs:
- print "txMsgs=%s" % txMsgs
- print "rxMsgs=%s" % rxMsgs
- self.fail("Send - receive message mismatch")
+ dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-08", 3, "test-exchange-08", ["test-queue-08"])
+ # tx rx nodes
+ # 0 0 0 1 2
+ dh.sendMsgs(20) # 20 0 *
+ dh.addNodes(2) # 0 1 2 3 4
+ dh.sendMsgs(20, 1) # 40 0 *
+ dh.killNode(0) # . 1 2 3 4
+ dh.receiveMsgs(10, 2) # 40 10 *
+ dh.killNode(2) # . 1 . 3 4
+ dh.receiveMsgs(20, 3) # 40 30 *
+ dh.addNodes() # . 1 . 3 4 5
+ dh.sendMsgs(20, 4) # 60 30 *
+ dh.receiveMsgs(20, 5) # 60 50 *
+ dh.addNodes() # . 1 . 3 4 5 6
+ dh.sendMsgs(20, 6) # 80 50 *
+ dh.killNode(5) # . 1 . 3 4 . 6
+ dh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self):
"""Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster"""
try:
- clusterName = "cluster-09"
- exchangeName = "test-exchange-09"
- queueName = "test-queue-09"
- self.createCheckCluster(clusterName, 6)
- self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0
- self.killNode(2, clusterName)
- self.checkNumClusterBrokers(clusterName, 5)
- txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0
- self.killNode(0, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
- rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) # 30, 10
- self.killNode(4, clusterName)
- self.checkNumClusterBrokers(clusterName, 3)
- rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 30
- self.createClusterNode(2, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
- txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30
- self.createClusterNode(0, clusterName)
- self.checkNumClusterBrokers(clusterName, 5)
- rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
- self.createClusterNode(4, clusterName)
- self.checkNumClusterBrokers(clusterName, 6)
- txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50
- rxMsgs += self.receiveMsgs(4, clusterName, queueName, 30) # 0, 80
- self.stopCheckAll()
- if txMsgs != rxMsgs:
- print "txMsgs=%s" % txMsgs
- print "rxMsgs=%s" % rxMsgs
- self.fail("Send - receive message mismatch")
+ dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-09", 6, "test-exchange-09", ["test-queue-09"])
+ # tx rx nodes
+ # 0 0 0 1 2 3 4 5
+ dh.sendMsgs(20) # 20 0 *
+ dh.killNode(2) # 0 1 . 3 4 5
+ dh.sendMsgs(20, 1) # 40 0 *
+ dh.killNode(0) # . 1 . 3 4 5
+ dh.receiveMsgs(10, 3) # 40 10 *
+ dh.killNode(4) # . 1 . 3 . 5
+ dh.receiveMsgs(20, 5) # 40 30 *
+ dh.restoreNode(2) # . 1 2 3 . 5
+ dh.sendMsgs(20, 1) # 60 30 *
+ dh.restoreNode(0) # 0 1 2 3 . 5
+ dh.receiveMsgs(20, 0) # 60 50 *
+ dh.killNode(2) # 0 1 . 3 . 5
+ dh.restoreNode(2) # 0 1 2 3 . 5
+ dh.sendMsgs(20, 2) # 80 50 *
+ dh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_10_LinearNodeKillCreateProgression(self):
"""Publish and consume messages while linearly killing all original nodes and replacing them with new ones"""
try:
- clusterName = "cluster-10"
- exchangeName = "test-exchange-10"
- queueName = "test-queue-10"
- self.createCheckCluster(clusterName, 4)
- self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(2, clusterName, exchangeName, queueName, 20)
- rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10)
- for i in range(0, 16):
- self.killNode(i, clusterName)
- self.createClusterNode(i+4, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
- txMsgs += self.sendMsgs(i+1, clusterName, exchangeName, queueName, 20)
- rxMsgs += self.receiveMsgs(i+2, clusterName, queueName, 20)
- rxMsgs += self.receiveMsgs(16, clusterName, queueName, 10)
- self.stopCheckAll()
- if txMsgs != rxMsgs:
- print "txMsgs=%s" % txMsgs
- print "rxMsgs=%s" % rxMsgs
- self.fail("Send - receive message mismatch")
+ dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-10", 4, "test-exchange-10", ["test-queue-10"])
+ # tx rx nodes
+ # 0 0 0 1 2 3
+ dh.sendMsgs(20) # 20 0 *
+ dh.receiveMsgs(10, 1) # 20 10 *
+ for i in range(0, 16): # First loop:
+ dh.killNode(i) # . 1 2 3
+ dh.addNodes() # . 1 2 3 4
+ dh.sendMsgs(20, i+1) # 40 10 *
+ dh.receiveMsgs(20, i+2) # 40 30 *
+ # After loop:
+ # 340 330 . . . . . . . . . . . . . . . . 16 17 18 19
+ dh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_11_CircularNodeKillRestoreProgression(self):
"""Publish and consume messages while circularly killing all original nodes and restoring them again"""
try:
- clusterName = "cluster-11"
- exchangeName = "test-exchange-11"
- queueName = "test-queue-11"
- self.createCheckCluster(clusterName, 4)
- self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(3, clusterName, exchangeName, queueName, 20)
- rxMsgs = self.receiveMsgs(0, clusterName, queueName, 10)
- self.killNode(0, clusterName)
- self.killNode(1, clusterName)
- for i in range(0, 16):
- self.killNode((i + 2) % 4, clusterName)
- self.createClusterNode(i % 4, clusterName)
- self.checkNumClusterBrokers(clusterName, 2)
- txMsgs += self.sendMsgs((i + 3) % 4, clusterName, exchangeName, queueName, 20)
- rxMsgs += self.receiveMsgs((i + 4) % 4, clusterName, queueName, 20)
- rxMsgs += self.receiveMsgs(3, clusterName, queueName, 10)
- self.stopCheckAll()
- if txMsgs != rxMsgs:
- print "txMsgs=%s" % txMsgs
- print "rxMsgs=%s" % rxMsgs
- self.fail("Send - receive message mismatch")
+ dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-11", 4, "test-exchange-11", ["test-queue-11"])
+ # tx rx nodes
+ # 0 0 0 1 2 3
+ dh.sendMsgs(20, 3) # 20 0 *
+ dh.receiveMsgs(10) # 20 10 *
+ dh.killNode(0) # . 1 2 3
+ dh.killNode(1) # . . 2 3
+ for i in range(0, 16): # First loop:
+ dh.killNode((i + 2) % 4) # . . . 3
+ dh.restoreNode(i % 4) # 0 . . 3
+ dh.sendMsgs(20, (i + 3) % 4) # 40 10 *
+ dh.receiveMsgs(20, (i + 4) % 4) # 40 30 *
+ # After loop:
+ # 340 330 . . 2 3
+ dh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_12_KillAllNodesRecoverMessages(self):
@@ -277,180 +232,94 @@ class ClusterTests(TestBaseCluster):
print " No store loaded, skipped"
return
try:
- clusterName = "cluster-12"
- exchangeName = "test-exchange-12"
- queueName = "test-queue-12"
- self.createCheckCluster(clusterName, 4)
- self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20)
- rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10)
- txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20)
- rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20)
- self.killNode(0, clusterName)
- self.createClusterNode(4, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
- txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20)
- rxMsgs += self.receiveMsgs(1, clusterName, queueName, 20)
- self.killNode(2, clusterName)
- self.createClusterNode(0, clusterName)
- self.createClusterNode(5, clusterName)
- self.checkNumClusterBrokers(clusterName, 5)
- txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20)
- rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20)
- self.killAllClusters()
- self.checkNumClusterBrokers(clusterName, 0)
- self.createCluster(clusterName)
- self.createClusterNode(3, clusterName) # last node to be used
- self.createClusterNode(0, clusterName)
- self.createClusterNode(1, clusterName)
- self.createClusterNode(2, clusterName)
- self.createClusterNode(4, clusterName)
- self.createClusterNode(5, clusterName)
- rxMsgs += self.receiveMsgs(0, clusterName, queueName, 10)
- if txMsgs != rxMsgs:
- print "txMsgs=%s" % txMsgs
- print "rxMsgs=%s" % rxMsgs
- self.fail("Send - receive message mismatch")
+ dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-12", 4, "test-exchange-12", ["test-queue-12"])
+ # tx rx nodes
+ # 0 0 0 1 2 3
+ dh.sendMsgs(20, 2) # 20 0 *
+ dh.receiveMsgs(10, 1) # 20 10 *
+ dh.killNode(1) # 0 . 2 3
+ dh.sendMsgs(20, 0) # 40 10 *
+ dh.receiveMsgs(20, 3) # 40 30 *
+ dh.killNode(2) # 0 . . 3
+ dh.addNodes(2) # 0 . . 3 4 5
+ dh.sendMsgs(20, 5) # 60 30 *
+ dh.receiveMsgs(20, 4) # 60 50 *
+ dh.killCluster() # cluster does not exist
+ dh.restoreCluster() # 60 50 . . . . . .
+ dh.restoreNodes() # 0 1 2 3 4 5
+ dh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_13_TopicExchange(self):
- """Create topic exchange in a cluster and make sure it replicates correctly"""
+ """Create topic exchange in a cluster and make sure it behaves correctly"""
try:
- clusterName = "cluster-13"
- self.createCheckCluster(clusterName, 4)
- topicExchangeName = "test-exchange-13"
- topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "#.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.#"}
- self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList)
-
- # Place initial messages
- txMsgsA = txMsgsC = self.sendMsgs(3, clusterName, topicExchangeName, "C.hello.A", 10) # (10, 0, 10, 0)
- self.sendMsgs(2, clusterName, topicExchangeName, "hello", 10) # Should not go to any queue
- txMsgsD = self.sendMsgs(1, clusterName, topicExchangeName, "D.hello.A", 10) # (20, 0, 10, 10)
- txMsgsA += txMsgsD
- txMsgsB = self.sendMsgs(0, clusterName, topicExchangeName, "hello.B", 20) # (20, 20, 10, 10)
+ topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"}
+ th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList)
+ # Place initial messages
+ th.sendMsgs("C.hello.A", 10)
+ th.sendMsgs("hello.world", 10) # matches none of the queues
+ th.sendMsgs("D.hello.A", 10)
+ th.sendMsgs("hello.B", 20)
+ th.sendMsgs("D.hello", 20)
# Kill and add some nodes
- self.killNode(0, clusterName)
- self.killNode(2, clusterName)
- self.createClusterNode(4, clusterName)
- self.createClusterNode(5, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
+ th.killNode(0)
+ th.killNode(2)
+ th.addNodes(2)
# Pull 10 messages from each queue
- rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) # (10, 20, 10, 10)
- rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) # (10, 10, 10, 10)
- rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-13-C", 10) # (10, 10, 0, 10)
- rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) # (10, 10, 0, 0)
+ th.receiveMsgs(10)
# Kill and add another node
- self.killNode(4, clusterName)
- self.createClusterNode(6, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
+ th.killNode(4)
+ th.addNodes()
# Add two more queues
- self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"})
+ th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"})
# Place more messages
- txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0)
- txMsgsA += txMsgs
- txMsgsC += txMsgs
- txMsgsE = txMsgs
- self.sendMsgs(1, clusterName, topicExchangeName, "bye", 20) # Should not go to any queue
- txMsgs = self.sendMsgs(5, clusterName, topicExchangeName, "D.bye.B", 20) # (20, 30, 10, 20, 10, 20)
- txMsgsB += txMsgs
- txMsgsD += txMsgs
- txMsgsF = txMsgs
+ th.sendMsgs("C.bye.A", 10)
+ th.sendMsgs("hello.bye", 20) # matches none of the queues
+ th.sendMsgs("hello.bye.B", 20)
+ th.sendMsgs("D.bye", 20)
# Kill all nodes but one
- self.killNode(1, clusterName)
- self.killNode(3, clusterName)
- self.killNode(6, clusterName)
- self.checkNumClusterBrokers(clusterName, 1)
- # Pull all remaining messages from each queue
- rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 20)
- rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 30)
- rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10)
- rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 20)
- rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-13-E", 10)
- rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-13-F", 20)
- # Check messages
- self.stopCheckAll()
- if txMsgsA != rxMsgsA:
- self.fail("Send - receive message mismatch for queue A")
- if txMsgsB != rxMsgsB:
- self.fail("Send - receive message mismatch for queue B")
- if txMsgsC != rxMsgsC:
- self.fail("Send - receive message mismatch for queue C")
- if txMsgsD != rxMsgsD:
- self.fail("Send - receive message mismatch for queue D")
- if txMsgsE != rxMsgsE:
- self.fail("Send - receive message mismatch for queue E")
- if txMsgsF != rxMsgsF:
- self.fail("Send - receive message mismatch for queue F")
+ th.killNode(1)
+ th.killNode(3)
+ th.killNode(6)
+ # Pull all remaining messages from each queue and check messages
+ th.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
def test_Cluster_14_FanoutExchange(self):
- """Create fanout exchange in a cluster and make sure it replicates correctly"""
+ """Create fanout exchange in a cluster and make sure it behaves correctly"""
try:
- clusterName = "cluster-14"
- self.createCheckCluster(clusterName, 4)
- fanoutExchangeName = "test-exchange-14"
fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"]
- self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList)
-
+ fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList)
# Place initial 20 messages, retrieve 10
- txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
- rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-14-A", 10)
- rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-14-B", 10)
- rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-14-C", 10)
+ fh.sendMsgs(20)
+ fh.receiveMsgs(10)
# Kill and add some nodes
- self.killNode(0, clusterName)
- self.killNode(2, clusterName)
- self.createClusterNode(4, clusterName)
- self.createClusterNode(5, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
+ fh.killNode(0)
+ fh.killNode(2)
+ fh.addNodes(2)
# Place another 20 messages, retrieve 20
- txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
- rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20)
- rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20)
- rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-14-C", 20)
+ fh.sendMsgs(20)
+ fh.receiveMsgs(20)
# Kill and add another node
- self.killNode(4, clusterName)
- self.createClusterNode(6, clusterName)
- self.checkNumClusterBrokers(clusterName, 4)
+ fh.killNode(4)
+ fh.addNodes()
# Add another 2 queues
- self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-14-D", "test-queue-14-E"])
+ fh.addQueues(["test-queue-14-D", "test-queue-14-E"])
# Place another 20 messages, retrieve 20
- tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
- txMsg += tmp
- rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20)
- rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20)
- rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-14-C", 20)
- rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-14-D", 10)
- rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-14-E", 10)
+ fh.sendMsgs(20)
+ fh.receiveMsgs(20)
# Kill all nodes but one
- self.killNode(1, clusterName)
- self.killNode(3, clusterName)
- self.killNode(6, clusterName)
- self.checkNumClusterBrokers(clusterName, 1)
- # Pull all remaining messages from each queue
- rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-14-A", 10)
- rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-14-B", 10)
- rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-14-C", 10)
- rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-14-D", 10)
- rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-14-E", 10)
+ fh.killNode(1)
+ fh.killNode(3)
+ fh.killNode(6)
# Check messages
- self.stopCheckAll()
- if txMsg != rxMsgA:
- self.fail("Send - receive message mismatch for queue A")
- if txMsg != rxMsgB:
- self.fail("Send - receive message mismatch for queue B")
- if txMsg != rxMsgC:
- self.fail("Send - receive message mismatch for queue C")
- if tmp != rxMsgD:
- self.fail("Send - receive message mismatch for queue D")
- if tmp != rxMsgE:
- self.fail("Send - receive message mismatch for queue E")
+ fh.finalizeTest()
except:
- self.killAllClusters()
+ self.killAllClusters(True)
raise
# Start the test here
diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests
index d2f7a77865..103896cd3d 100755
--- a/cpp/src/tests/run_cluster_tests
+++ b/cpp/src/tests/run_cluster_tests
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
@@ -23,13 +23,13 @@
TEST_DIR=${top_builddir}/src/tests
# Check AIS requirements
-id -nG | grep '\<ais\>' >/dev/null || NOGROUP="You are not a member of the ais group."
-ps -u root | grep 'aisexec\|corosync' >/dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root"
+id -nG | grep '\<ais\>' > /dev/null || NOGROUP="You are not a member of the ais group."
+ps -u root | grep 'aisexec\|corosync' > /dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root"
if test -n "${NOGROUP}" -o -n "${NOAISEXEC}"; then
cat <<EOF
- ========= WARNING: CLUSTERING TESTS DISABLED ==============
+ ======== WARNING: PYTHON CLUSTER TESTS DISABLED ===========
Tests that depend on the openais library (used for clustering)
will not be run because:
@@ -43,22 +43,28 @@ EOF
exit 0
fi
-export PYTHONPATH=$srcdir:$srcdir/../../../python
-export RUN_CLUSTER_TESTS=1
+# Check XML exchange requirements
+XML_LIB=$srcdir/../.libs/xml.so
+if [ -f ${XML_LIB} ]; then
+ export XML_LIB
+fi
+
+export PYTHONPATH=${srcdir}:${srcdir}/../../../python
export QPIDD_EXEC=${top_builddir}/src/qpidd
export CLUSTER_LIB=${top_builddir}/src/.libs/cluster.so
-export QPID_CONFIG_EXEC=$srcdir/../../../python/commands/qpid-config
-export QPID_ROUTE_EXEC=$srcdir/../../../python/commands/qpid-route
+export QPID_CONFIG_EXEC=${srcdir}/../../../python/commands/qpid-config
+export QPID_ROUTE_EXEC=${srcdir}/../../../python/commands/qpid-route
export RECEIVER_EXEC=${top_builddir}/src/tests/receiver
export SENDER_EXEC=${top_builddir}/src/tests/sender
+
#Make sure temp dir exists if this is the first to use it
TMP_STORE_DIR=${TEST_DIR}/test_tmp
if ! test -d ${TMP_STORE_DIR} ; then
- mkdir -p ${TMP_STORE_DIR}
mkdir -p ${TMP_STORE_DIR}/cluster
else
- rm -rf "${TMP_STORE_DIR}/cluster"
+ # Delete old cluster test dirs
+ rm -rf "${TMP_STORE_DIR}/cluster"
mkdir -p "${TMP_STORE_DIR}/cluster"
fi
export TMP_STORE_DIR
@@ -66,6 +72,6 @@ export TMP_STORE_DIR
sg ais -c "${srcdir}/cluster.py -v"
RETCODE=$?
-if test x$RETCODE != x0; then
- echo "FAIL cluster tests"; exit 1;
+if test x${RETCODE} != x0; then
+ exit 1;
fi
diff --git a/cpp/src/tests/testlib.py b/cpp/src/tests/testlib.py
index 1c1d1bf407..07c4794767 100644
--- a/cpp/src/tests/testlib.py
+++ b/cpp/src/tests/testlib.py
@@ -21,7 +21,7 @@
# Support library for qpid python tests.
#
-import os, signal, subprocess, unittest
+import os, re, signal, subprocess, unittest
class TestBase(unittest.TestCase):
"""
@@ -32,8 +32,8 @@ class TestBase(unittest.TestCase):
The following environment vars control if and how the test is run, and determine where many of the helper
executables/libs are to be found.
"""
- _storeEnable = os.getenv("STORE_ENABLE") != None # Must be True for durability to be enabled during the test
_storeLib = os.getenv("STORE_LIB")
+ _storeEnable = _storeLib != None # Must be True for durability to be enabled during the test
_qpiddExec = os.getenv("QPIDD_EXEC", "/usr/sbin/qpidd")
_tempStoreDir = os.path.abspath(os.getenv("TMP_STORE_DIR", "/tmp/qpid"))
@@ -55,22 +55,7 @@ class TestBase(unittest.TestCase):
return " --%s yes" % key
else:
return " --%s no" % key
-
- def _paramNum(self, key, val):
- if val != None:
- return " --%s %d" % (key, val)
- return ""
-
- def _paramString(self, key, val):
- if val != None:
- return " --%s %s" % (key, val)
- return ""
-
- def _paramStringList(self, key, valList, val):
- if val in valList:
- return " --%s %s" % (key, val)
- return ""
-
+
# --- Helper functions for message creation ---
def _makeMessage(self, msgSize):
@@ -115,17 +100,37 @@ class TestBase(unittest.TestCase):
#print "started broker: pid=%d, port=%d args: %s" % (pid, port, qpiddArgs)
return (pid, port)
- def killBroker(self, pid):
+ def killBroker(self, nodeTuple, ignoreFailures = False):
"""Kill a broker using kill -9"""
- os.kill(pid, signal.SIGTERM)
- #print "killed broker: pid=%d" % pid
+ try:
+ os.kill(nodeTuple[self.PID], signal.SIGKILL)
+ try:
+ os.waitpid(nodeTuple[self.PID], 0)
+ except:
+ pass
+ #print "killed broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ except:
+ if ignoreFailures:
+ print "WARNING: killBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ else:
+ raise
- def stopBroker(self, port):
+ def stopBroker(self, nodeTuple, ignoreFailures = False):
"""Stop a broker using qpidd -q"""
- ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % port, "-q")
- if ret != 0:
- raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret))
- #print "stopped broker: port=%d" % port
+ try:
+ ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % nodeTuple[self.PORT], "--quit")
+ if ret != 0:
+ raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret))
+ try:
+ os.waitpid(nodeTuple[self.PID], 0)
+ except:
+ pass
+ #print "stopped broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ except:
+ if ignoreFailures:
+ print "WARNING: stopBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID])
+ else:
+ raise
@@ -138,8 +143,10 @@ class TestBaseCluster(TestBase):
The following environment vars control if and how the test is run, and determine where many of the helper
executables/libs are to be found.
"""
- _runClusterTests = os.getenv("RUN_CLUSTER_TESTS") != None # Must be True for these cluster tests to run
_clusterLib = os.getenv("CLUSTER_LIB")
+ _clusterTestEnable = _clusterLib != None # Must be True for these cluster tests to run
+ _xmlLib = os.getenv("XML_LIB")
+ _xmlEnable = _xmlLib != None
_qpidConfigExec = os.getenv("QPID_CONFIG_EXEC", "/usr/bin/qpid-config")
_qpidRouteExec = os.getenv("QPID_ROUTE_EXEC", "/usr/bin/qpid-route")
_receiverExec = os.getenv("RECEIVER_EXEC", "/usr/libexec/qpid/test/receiver")
@@ -164,10 +171,19 @@ class TestBaseCluster(TestBase):
def run(self, res):
""" Skip cluster testing if env var RUN_CLUSTER_TESTS is not defined."""
- if not self._runClusterTests:
+ if not self._clusterTestEnable:
return
unittest.TestCase.run(self, res)
+ # --- Private helper / convenience functions ---
+
+ def _checkPids(self, clusterName = None):
+ for pid, port in self.getTupleList():
+ try:
+ os.kill(pid, 0)
+ except:
+ raise Exception("_checkPids(): Broker with pid %d expected but does not exist! (crashed?)" % pid)
+
# --- Starting cluster node(s) ---
@@ -196,22 +212,25 @@ class TestBaseCluster(TestBase):
# --- Cluster and node status ---
- def getTupleList(self):
+ def getTupleList(self, clusterName = None):
"""Get list of (pid, port) tuples of all known cluster brokers"""
tList = []
- for l in self._clusterDict.itervalues():
- for t in l.itervalues():
- tList.append(t)
+ for c, l in self._clusterDict.iteritems():
+ if clusterName == None or c == clusterName:
+ for t in l.itervalues():
+ tList.append(t)
return tList
def getNumBrokers(self):
"""Get total number of brokers in all known clusters"""
return len(self.getTupleList())
- def checkNumBrokers(self, expected):
+ def checkNumBrokers(self, expected = None, checkPids = True):
"""Check that the total number of brokers in all known clusters is the expected value"""
- if self.getNumBrokers() != expected:
+ if expected != None and self.getNumBrokers() != expected:
raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers()))
+ if checkPids:
+ self._checkPids()
def getClusterTupleList(self, clusterName):
"""Get list of (pid, port) tuples of all nodes in named cluster"""
@@ -227,11 +246,13 @@ class TestBaseCluster(TestBase):
"""Get the (pid, port) tuple for the given cluster node"""
return self._clusterDict[clusterName][nodeNumber]
- def checkNumClusterBrokers(self, clusterName, expected):
+ def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True):
"""Check that the total number of brokers in the named cluster is the expected value"""
- if self.getNumClusterBrokers(clusterName) != expected:
+ if expected != None and self.getNumClusterBrokers(clusterName) != expected:
raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
(clusterName, expected, self.getNumClusterBrokers(clusterName)))
+ if checkPids:
+ self._checkPids(clusterName)
def clusterExists(self, clusterName):
""" Return True if clusterName exists, False otherwise"""
@@ -250,16 +271,16 @@ class TestBaseCluster(TestBase):
# --- Kill cluster nodes using signal 9 ---
- def killNode(self, nodeNumber, clusterName, updateDict = True):
+ def killNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False):
"""Kill the given node in the named cluster using kill -9"""
- self.killBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PID])
+ self.killBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName][nodeNumber])
- def killCluster(self, clusterName, updateDict = True):
+ def killCluster(self, clusterName, updateDict = True, ignoreFailures = False):
"""Kill all nodes in the named cluster"""
for n in self._clusterDict[clusterName].iterkeys():
- self.killNode(n, clusterName, False)
+ self.killNode(n, clusterName, False, ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName])
@@ -270,46 +291,46 @@ class TestBaseCluster(TestBase):
raise Exception("Unable to kill cluster %s; %d nodes still exist" % \
(clusterName, self.getNumClusterBrokers(clusterName)))
- def killAllClusters(self):
+ def killAllClusters(self, ignoreFailures = False):
"""Kill all known clusters"""
for n in self._clusterDict.iterkeys():
- self.killCluster(n, False)
+ self.killCluster(n, False, ignoreFailures)
self._clusterDict.clear()
- def killAllClustersCheck(self):
+ def killAllClustersCheck(self, ignoreFailures = False):
"""Kill all known clusters and check that the cluster dictionary is empty"""
- self.killAllClusters()
+ self.killAllClusters(ignoreFailures)
self.checkNumBrokers(0)
# --- Stop cluster nodes using qpidd -q ---
- def stopNode(self, nodeNumber, clusterName, updateDict = True):
+ def stopNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False):
"""Stop the given node in the named cluster using qpidd -q"""
- self.stopBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PORT])
+ self.stopBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName][nodeNumber])
- def stopAllClusters(self):
+ def stopAllClusters(self, ignoreFailures = False):
"""Stop all known clusters"""
for n in self._clusterDict.iterkeys():
- self.stopCluster(n, False)
+ self.stopCluster(n, False, ignoreFailures)
self._clusterDict.clear()
- def stopCluster(self, clusterName, updateDict = True):
+ def stopCluster(self, clusterName, updateDict = True, ignoreFailures = False):
"""Stop all nodes in the named cluster"""
for n in self._clusterDict[clusterName].iterkeys():
- self.stopNode(n, clusterName, False)
+ self.stopNode(n, clusterName, False, ignoreFailures)
if updateDict:
del(self._clusterDict[clusterName])
- def stopCheckCluster(self, clusterName):
+ def stopCheckCluster(self, clusterName, ignoreFailures = False):
"""Stop the named cluster and check that the name is removed from the cluster dictionary"""
- self.stopCluster(clusterName)
+ self.stopCluster(clusterName, True, ignoreFailures)
if self.clusterExists(clusterName):
raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
- def stopCheckAll(self):
+ def stopCheckAll(self, ignoreFailures = False):
"""Kill all known clusters and check that the cluster dictionary is empty"""
self.stopAllClusters()
self.checkNumBrokers(0)
@@ -479,11 +500,252 @@ class TestBaseCluster(TestBase):
if wait:
receiver.wait()
return msgs
-
- def sendReceiveMsgs(self, nodeNumber, clusterName, exchangeName, queueName, numMsgs, wait = True, msgSize = None):
- self.createBindDirectExchangeQueue(nodeNumber, clusterName, exchangeName, queueName)
- txMsgs = self.sendMsgs(nodeNumber, clusterName, exchangeName, queueName, numMsgs, msgSize, wait)
- rxMsgs = self.receiveMsgs(nodeNumber, clusterName, queueName, numMsgs, wait)
- if txMsgs != rxMsgs:
- self.fail("Send - receive message mismatch")
+
+
+ # --- Exchange-specific helper inner classes ---
+
+ class TestHelper:
+ """
+ This is a "virtual" superclass for test helpers, and is not useful on its own, but the
+ per-exchange subclasses are designed to keep track of the messages sent to and received
+ from queues which have bindings to that exchange type.
+ """
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList):
+
+ """Dictionary of queues and lists of messages sent to them."""
+ self._txMsgs = {}
+ """Dictionary of queues and lists of messages received from them."""
+ self._rxMsgs = {}
+ """List of node numbers currently in the cluster"""
+ self._nodes = []
+ """List of node numbers which have been killed and can therefore be recovered"""
+ self._deadNodes = []
+ """Last node to be used"""
+ self._lastNode = None
+
+ self._testBaseCluster = testBaseCluster
+ self._clusterName = clusterName
+ self._exchangeName = exchangeName
+ self._queueNameList = queueNameList
+ self._addQueues(queueNameList)
+ self._testBaseCluster.createCheckCluster(clusterName, numNodes)
+ self._nodes.extend(range(0, numNodes))
+
+ def _addQueues(self, queueNameList):
+ for qn in queueNameList:
+ if not qn in self._txMsgs:
+ self._txMsgs[qn] = []
+ if not qn in self._rxMsgs:
+ self._rxMsgs[qn] = []
+
+ def _bindQueue(self, queueName, bindingKey, nodeNumber = None):
+ """Bind a queue to an exchange using a binding key."""
+ if nodeNumber == None:
+ nodeNumber = self._nodes[0] # first available node
+ self._testBaseCluster.addQueue(nodeNumber, self._clusterName, queueName)
+ self._testBaseCluster.bind(nodeNumber, self._clusterName, self._exchangeName, queueName, bindingKey)
+
+ def _highestNodeNumber(self):
+ """Find the highest node number used so far between the current nodes and those stopped/killed."""
+ highestNode = self._nodes[-1]
+ if len(self._deadNodes) == 0:
+ return highestNode
+ highestDeadNode = self._deadNodes[-1]
+ if highestNode > highestDeadNode:
+ return highestNode
+ return highestDeadNode
+
+ def killCluster(self):
+ """Kill all nodes in the cluster"""
+ self._testBaseCluster.killCluster(self._clusterName)
+ self._testBaseCluster.checkNumClusterBrokers(self._clusterName, 0)
+ self._deadNodes.extend(self._nodes)
+ self._deadNodes.sort()
+ del self._nodes[:]
+
+ def restoreCluster(self, lastNode = None, restoreNodes = True):
+ """Restore a previously killed cluster"""
+ self._testBaseCluster.createCluster(self._clusterName)
+ if restoreNodes:
+ numNodes = len(self._deadNodes)
+ self.restoreNodes(lastNode)
+ self._testBaseCluster.checkNumClusterBrokers(self._clusterName, numNodes)
+
+ def addNodes(self, numberOfNodes = 1):
+ """Add a fixed number of nodes to the cluster."""
+ nodeStart = self._highestNodeNumber() + 1
+ for i in range(0, numberOfNodes):
+ nodeNumber = nodeStart + i
+ self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName)
+ self._nodes.append(nodeNumber)
+ self._testBaseCluster.checkNumClusterBrokers(self._clusterName, len(self._nodes))
+
+ def restoreNode(self, nodeNumber):
+ """Restore a cluster node that has been previously killed"""
+ if nodeNumber not in self._deadNodes:
+ raise Exception("restoreNode(): Node number %d not in dead node list %s" % (nodeNumber, self._deadNodes))
+ self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName)
+ self._deadNodes.remove(nodeNumber)
+ self._nodes.append(nodeNumber)
+ self._nodes.sort()
+
+ def restoreNodes(self, lastNode = None):
+ """Restore all known cluster nodes that have been previously killed starting with a known last-used node"""
+ if len(self._nodes) == 0: # restore last-used node first
+ if lastNode == None:
+ lastNode = self._lastNode
+ self.restoreNode(lastNode)
+ while len(self._deadNodes) > 0:
+ self.restoreNode(self._deadNodes[0])
+
+ def killNode(self, nodeNumber):
+ """Kill a cluster node (if it is in the _nodes list)."""
+ if nodeNumber not in self._nodes:
+ raise Exception("killNode(): Node number %d not in node list %s" % (nodeNumber, self._nodes))
+ self._testBaseCluster.killNode(nodeNumber, self._clusterName)
+ self._nodes.remove(nodeNumber)
+ self._deadNodes.append(nodeNumber)
+ self._deadNodes.sort()
+
+ def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True):
+ """Send a fixed number of messages using the given routing key."""
+ if nodeNumber == None:
+ nodeNumber = self._nodes[0] # Use first available node
+ msgs = self._testBaseCluster._makeMessageList(numMsgs, msgSize)
+ sender = self._testBaseCluster.createSender(nodeNumber, self._clusterName, self._exchangeName, routingKey)
+ sender.stdin.write(msgs)
+ sender.stdin.close()
+ if wait:
+ sender.wait()
+ self._lastNode = nodeNumber
+ return msgs.split()
+
+ # TODO - this i/f is messy: one mumMsgs can be given, but a list of queues
+ # so assuming numMsgs for each queue
+ # A mechanism is needed to specify a different numMsgs per queue
+ def receiveMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, wait = True):
+ """Receive a fixed number of messages from a named queue. If numMsgs == None, get all remaining messages."""
+ if nodeNumber == None:
+ nodeNumber = self._nodes[0] # Use first available node
+ if queueNameList == None:
+ queueNameList = self._txMsgs.iterkeys()
+ for qn in queueNameList:
+ nm = numMsgs
+ if nm == None:
+ nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get all remaining messages
+ if nm > 0:
+ receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm)
+ cnt = 0
+ while cnt < nm:
+ rx = receiver.stdout.readline().strip()
+ if rx == "" and receiver.poll() != None: break
+ self._rxMsgs[qn].append(rx)
+ cnt = cnt + 1
+ if wait:
+ receiver.wait()
+ self._lastNode = nodeNumber
+
+ def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = None, wait = True):
+ """Receive all remaining messages on named queue."""
+ self.receiveMsgs(None, nodeNumber, queueNameList, wait)
+
+ def checkMsgs(self):
+ """Return True if all expected messages have been received (ie the transmit and receive list are identical)."""
+ txMsgTot = 0
+ rxMsgTot = 0
+ for qn, txMsgList in self._txMsgs.iteritems():
+ rxMsgList = self._rxMsgs[qn]
+ txMsgTot = txMsgTot + len(txMsgList)
+ rxMsgTot = rxMsgTot + len(rxMsgList)
+ if len(txMsgList) != len(rxMsgList):
+ return False
+ for i, m in enumerate(txMsgList):
+ if m != rxMsgList[i]:
+ return False
+ if txMsgTot == 0 and rxMsgTot == 0:
+ print "WARNING: No messages were either sent or received"
+ return True
+
+ def finalizeTest(self):
+ """Recover all the remaining messages on all queues, then check that all expected messages were received."""
+ self.receiveRemainingMsgs()
+ self._testBaseCluster.stopCheckAll()
+ if not self.checkMsgs():
+ self._testBaseCluster.fail("Send - receive message mismatch")
+ self.printMsgs()
+
+ def printMsgs(self, txMsgs = True, rxMsgs = True):
+ """Print all messages transmitted and received."""
+ for qn, txMsgList in self._txMsgs.iteritems():
+ print "Queue: %s" % qn
+ if txMsgs:
+ print " txMsgList = %s" % txMsgList
+ if rxMsgs:
+ rxMsgList = self._rxMsgs[qn]
+ print " rxMsgList = %s" % rxMsgList
+
+
+ class DirectExchangeTestHelper(TestHelper):
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList):
+ TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList)
+ self._testBaseCluster.addExchange(0, clusterName, "direct", exchangeName)
+ for qn in queueNameList:
+ self._bindQueue(qn, qn)
+
+ def addQueues(self, queueNameList):
+ self._addQueues(queueNameList)
+ for qn in queueNameList:
+ self._bindQueue(qn, qn)
+
+ def sendMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, msgSize = None, wait = True):
+ if queueNameList == None:
+ queueNameList = self._txMsgs.iterkeys()
+ for qn in queueNameList:
+ self._txMsgs[qn].extend(TestBaseCluster.TestHelper.sendMsgs(self, qn, numMsgs, nodeNumber, msgSize, wait))
+
+
+ class TopicExchangeTestHelper(TestHelper):
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList):
+ self._queueNameKeyList = queueNameKeyList
+ TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList.iterkeys())
+ self._testBaseCluster.addExchange(0, clusterName, "topic", exchangeName)
+ for qn, bk in queueNameKeyList.iteritems():
+ self._bindQueue(qn, bk)
+
+ def addQueues(self, queueNameKeyList):
+ self._addQueues(queueNameKeyList.iterkeys())
+ for qn, bk in queueNameKeyList.iteritems():
+ self._bindQueue(qn, bk)
+
+ def _prepareRegex(self, bk):
+ # This regex conversion is not very complete - there are other chars that should be escaped too
+ return "^%s$" % bk.replace(".", r"\.").replace("*", r"[^.]*").replace("#", ".*")
+
+ def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True):
+ msgList = TestBaseCluster.TestHelper.sendMsgs(self, routingKey, numMsgs, nodeNumber, msgSize, wait)
+ for qn, bk in self._queueNameKeyList.iteritems():
+ if re.match(self._prepareRegex(bk), routingKey):
+ self._txMsgs[qn].extend(msgList)
+
+
+ class FanoutExchangeTestHelper(TestHelper):
+
+ def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList):
+ TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList)
+ self._testBaseCluster.addExchange(0, clusterName, "fanout", exchangeName)
+ for qn in queueNameList:
+ self._bindQueue(qn, "")
+
+ def addQueues(self, queueNameList):
+ self._addQueues(queueNameList)
+ for qn in queueNameList:
+ self._bindQueue(qn, "")
+
+ def sendMsgs(self, numMsgs, nodeNumber = None, msgSize = None, wait = True):
+ msgList = TestBaseCluster.TestHelper.sendMsgs(self, "", numMsgs, nodeNumber, msgSize, wait)
+ for ml in self._txMsgs.itervalues():
+ ml.extend(msgList)
\ No newline at end of file