diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-06-03 15:25:56 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-06-03 15:25:56 +0000 |
commit | 12048dcd9d13f1c2baeffb4b65c4ce65c0155234 (patch) | |
tree | ee4913784a020897e081c210f99eda53f152bf61 /cpp/src | |
parent | 811ef0bba2901e8ff65cd852a7f1d020493b2642 (diff) | |
download | qpid-python-12048dcd9d13f1c2baeffb4b65c4ce65c0155234.tar.gz |
Python cluster test improvements and some additional tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@781431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rwxr-xr-x | cpp/src/tests/cluster.py | 433 | ||||
-rwxr-xr-x | cpp/src/tests/run_cluster_tests | 30 | ||||
-rw-r--r-- | cpp/src/tests/testlib.py | 382 |
3 files changed, 491 insertions, 354 deletions
diff --git a/cpp/src/tests/cluster.py b/cpp/src/tests/cluster.py index 87c9b363a8..ae4cc7aa85 100755 --- a/cpp/src/tests/cluster.py +++ b/cpp/src/tests/cluster.py @@ -29,10 +29,9 @@ class ClusterTests(TestBaseCluster): try: clusterName = "cluster-01" self.createCheckCluster(clusterName, 5) - self.checkNumBrokers(5) self.stopCheckCluster(clusterName) except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_02_MultipleClusterInitialization(self): @@ -40,13 +39,13 @@ class ClusterTests(TestBaseCluster): try: for i in range(0, 5): clusterName = "cluster-02.%d" % i - self.createCluster(clusterName, 5) + self.createCheckCluster(clusterName, 5) self.checkNumBrokers(25) self.killCluster("cluster-02.2") self.checkNumBrokers(20) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_03_AddRemoveNodes(self): @@ -66,7 +65,7 @@ class ClusterTests(TestBaseCluster): self.checkNumClusterBrokers(clusterName, 7) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_04_RemoveRestoreNodes(self): @@ -93,7 +92,7 @@ class ClusterTests(TestBaseCluster): self.checkNumClusterBrokers(clusterName, 6) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_05_KillAllNodesThenRecover(self): @@ -105,170 +104,126 @@ class ClusterTests(TestBaseCluster): self.createCheckCluster(clusterName, 6) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_06_PublishConsume(self): """Publish then consume 100 messages from a single cluster""" try: - clusterName = "cluster-06" - self.createCheckCluster(clusterName, 3) - self.sendReceiveMsgs(0, clusterName, "test-exchange-06", "test-queue-06", 100) - self.stopCheckAll() + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-06", 3, "test-exchange-06", ["test-queue-06"]) + dh.sendMsgs(100) + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_07_MultiplePublishConsume(self): """Staggered publish and consume on a single cluster""" try: - clusterName = "cluster-07" - exchangeName = "test-exchange-07" - queueName = "test-queue-07" - self.createCheckCluster(clusterName, 3) - self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 - rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) # 10, 10 - txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) # 30, 10 - rxMsgs += self.receiveMsgs(0, clusterName, queueName, 20) # 10, 30 - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30 - rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50 - txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50 - rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80 - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-07", 3, "test-exchange-07", ["test-queue-07"]) + # tx rx nodes + # 0 0 0 1 2 + dh.sendMsgs(20) # 20 0 * + dh.receiveMsgs(10, 1) # 20 10 * + dh.sendMsgs(20, 2) # 40 10 * + dh.receiveMsgs(20, 0) # 40 30 * + dh.sendMsgs(20, 1) # 60 30 * + dh.receiveMsgs(20, 2) # 60 50 * + dh.sendMsgs(20, 0) # 80 50 * + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self): """Staggered publish and consume interleaved with adding and removing nodes on a single cluster""" try: - clusterName = "cluster-08" - exchangeName = "test-exchange-08" - queueName = "test-queue-08" - self.createCheckCluster(clusterName, 3) - self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 - for i in range(3,6): - self.createClusterNode(i, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0 - self.killNode(0, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - rxMsgs = self.receiveMsgs(2, clusterName, queueName, 10) # 30, 10 - self.killNode(2, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) # 10, 30 - self.createClusterNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) # 30, 30 - rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 50 - self.createClusterNode(7, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - txMsgs += self.sendMsgs(6, clusterName, exchangeName, queueName, 20) # 30, 50 - rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80 - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-08", 3, "test-exchange-08", ["test-queue-08"]) + # tx rx nodes + # 0 0 0 1 2 + dh.sendMsgs(20) # 20 0 * + dh.addNodes(2) # 0 1 2 3 4 + dh.sendMsgs(20, 1) # 40 0 * + dh.killNode(0) # . 1 2 3 4 + dh.receiveMsgs(10, 2) # 40 10 * + dh.killNode(2) # . 1 . 3 4 + dh.receiveMsgs(20, 3) # 40 30 * + dh.addNodes() # . 1 . 3 4 5 + dh.sendMsgs(20, 4) # 60 30 * + dh.receiveMsgs(20, 5) # 60 50 * + dh.addNodes() # . 1 . 3 4 5 6 + dh.sendMsgs(20, 6) # 80 50 * + dh.killNode(5) # . 1 . 3 4 . 6 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self): """Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster""" try: - clusterName = "cluster-09" - exchangeName = "test-exchange-09" - queueName = "test-queue-09" - self.createCheckCluster(clusterName, 6) - self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 - self.killNode(2, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0 - self.killNode(0, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) # 30, 10 - self.killNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 3) - rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 30 - self.createClusterNode(2, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30 - self.createClusterNode(0, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50 - self.createClusterNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50 - rxMsgs += self.receiveMsgs(4, clusterName, queueName, 30) # 0, 80 - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-09", 6, "test-exchange-09", ["test-queue-09"]) + # tx rx nodes + # 0 0 0 1 2 3 4 5 + dh.sendMsgs(20) # 20 0 * + dh.killNode(2) # 0 1 . 3 4 5 + dh.sendMsgs(20, 1) # 40 0 * + dh.killNode(0) # . 1 . 3 4 5 + dh.receiveMsgs(10, 3) # 40 10 * + dh.killNode(4) # . 1 . 3 . 5 + dh.receiveMsgs(20, 5) # 40 30 * + dh.restoreNode(2) # . 1 2 3 . 5 + dh.sendMsgs(20, 1) # 60 30 * + dh.restoreNode(0) # 0 1 2 3 . 5 + dh.receiveMsgs(20, 0) # 60 50 * + dh.killNode(2) # 0 1 . 3 . 5 + dh.restoreNode(2) # 0 1 2 3 . 5 + dh.sendMsgs(20, 2) # 80 50 * + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_10_LinearNodeKillCreateProgression(self): """Publish and consume messages while linearly killing all original nodes and replacing them with new ones""" try: - clusterName = "cluster-10" - exchangeName = "test-exchange-10" - queueName = "test-queue-10" - self.createCheckCluster(clusterName, 4) - self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(2, clusterName, exchangeName, queueName, 20) - rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) - for i in range(0, 16): - self.killNode(i, clusterName) - self.createClusterNode(i+4, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - txMsgs += self.sendMsgs(i+1, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(i+2, clusterName, queueName, 20) - rxMsgs += self.receiveMsgs(16, clusterName, queueName, 10) - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-10", 4, "test-exchange-10", ["test-queue-10"]) + # tx rx nodes + # 0 0 0 1 2 3 + dh.sendMsgs(20) # 20 0 * + dh.receiveMsgs(10, 1) # 20 10 * + for i in range(0, 16): # First loop: + dh.killNode(i) # . 1 2 3 + dh.addNodes() # . 1 2 3 4 + dh.sendMsgs(20, i+1) # 40 10 * + dh.receiveMsgs(20, i+2) # 40 30 * + # After loop: + # 340 330 . . . . . . . . . . . . . . . . 16 17 18 19 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_11_CircularNodeKillRestoreProgression(self): """Publish and consume messages while circularly killing all original nodes and restoring them again""" try: - clusterName = "cluster-11" - exchangeName = "test-exchange-11" - queueName = "test-queue-11" - self.createCheckCluster(clusterName, 4) - self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(3, clusterName, exchangeName, queueName, 20) - rxMsgs = self.receiveMsgs(0, clusterName, queueName, 10) - self.killNode(0, clusterName) - self.killNode(1, clusterName) - for i in range(0, 16): - self.killNode((i + 2) % 4, clusterName) - self.createClusterNode(i % 4, clusterName) - self.checkNumClusterBrokers(clusterName, 2) - txMsgs += self.sendMsgs((i + 3) % 4, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs((i + 4) % 4, clusterName, queueName, 20) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 10) - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-11", 4, "test-exchange-11", ["test-queue-11"]) + # tx rx nodes + # 0 0 0 1 2 3 + dh.sendMsgs(20, 3) # 20 0 * + dh.receiveMsgs(10) # 20 10 * + dh.killNode(0) # . 1 2 3 + dh.killNode(1) # . . 2 3 + for i in range(0, 16): # First loop: + dh.killNode((i + 2) % 4) # . . . 3 + dh.restoreNode(i % 4) # 0 . . 3 + dh.sendMsgs(20, (i + 3) % 4) # 40 10 * + dh.receiveMsgs(20, (i + 4) % 4) # 40 30 * + # After loop: + # 340 330 . . 2 3 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_12_KillAllNodesRecoverMessages(self): @@ -277,180 +232,94 @@ class ClusterTests(TestBaseCluster): print " No store loaded, skipped" return try: - clusterName = "cluster-12" - exchangeName = "test-exchange-12" - queueName = "test-queue-12" - self.createCheckCluster(clusterName, 4) - self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) - rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) - txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) - self.killNode(0, clusterName) - self.createClusterNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(1, clusterName, queueName, 20) - self.killNode(2, clusterName) - self.createClusterNode(0, clusterName) - self.createClusterNode(5, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) - self.killAllClusters() - self.checkNumClusterBrokers(clusterName, 0) - self.createCluster(clusterName) - self.createClusterNode(3, clusterName) # last node to be used - self.createClusterNode(0, clusterName) - self.createClusterNode(1, clusterName) - self.createClusterNode(2, clusterName) - self.createClusterNode(4, clusterName) - self.createClusterNode(5, clusterName) - rxMsgs += self.receiveMsgs(0, clusterName, queueName, 10) - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-12", 4, "test-exchange-12", ["test-queue-12"]) + # tx rx nodes + # 0 0 0 1 2 3 + dh.sendMsgs(20, 2) # 20 0 * + dh.receiveMsgs(10, 1) # 20 10 * + dh.killNode(1) # 0 . 2 3 + dh.sendMsgs(20, 0) # 40 10 * + dh.receiveMsgs(20, 3) # 40 30 * + dh.killNode(2) # 0 . . 3 + dh.addNodes(2) # 0 . . 3 4 5 + dh.sendMsgs(20, 5) # 60 30 * + dh.receiveMsgs(20, 4) # 60 50 * + dh.killCluster() # cluster does not exist + dh.restoreCluster() # 60 50 . . . . . . + dh.restoreNodes() # 0 1 2 3 4 5 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_13_TopicExchange(self): - """Create topic exchange in a cluster and make sure it replicates correctly""" + """Create topic exchange in a cluster and make sure it behaves correctly""" try: - clusterName = "cluster-13" - self.createCheckCluster(clusterName, 4) - topicExchangeName = "test-exchange-13" - topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "#.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.#"} - self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList) - - # Place initial messages - txMsgsA = txMsgsC = self.sendMsgs(3, clusterName, topicExchangeName, "C.hello.A", 10) # (10, 0, 10, 0) - self.sendMsgs(2, clusterName, topicExchangeName, "hello", 10) # Should not go to any queue - txMsgsD = self.sendMsgs(1, clusterName, topicExchangeName, "D.hello.A", 10) # (20, 0, 10, 10) - txMsgsA += txMsgsD - txMsgsB = self.sendMsgs(0, clusterName, topicExchangeName, "hello.B", 20) # (20, 20, 10, 10) + topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"} + th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList) + # Place initial messages + th.sendMsgs("C.hello.A", 10) + th.sendMsgs("hello.world", 10) # matches none of the queues + th.sendMsgs("D.hello.A", 10) + th.sendMsgs("hello.B", 20) + th.sendMsgs("D.hello", 20) # Kill and add some nodes - self.killNode(0, clusterName) - self.killNode(2, clusterName) - self.createClusterNode(4, clusterName) - self.createClusterNode(5, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + th.killNode(0) + th.killNode(2) + th.addNodes(2) # Pull 10 messages from each queue - rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) # (10, 20, 10, 10) - rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) # (10, 10, 10, 10) - rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-13-C", 10) # (10, 10, 0, 10) - rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) # (10, 10, 0, 0) + th.receiveMsgs(10) # Kill and add another node - self.killNode(4, clusterName) - self.createClusterNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + th.killNode(4) + th.addNodes() # Add two more queues - self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"}) + th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"}) # Place more messages - txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0) - txMsgsA += txMsgs - txMsgsC += txMsgs - txMsgsE = txMsgs - self.sendMsgs(1, clusterName, topicExchangeName, "bye", 20) # Should not go to any queue - txMsgs = self.sendMsgs(5, clusterName, topicExchangeName, "D.bye.B", 20) # (20, 30, 10, 20, 10, 20) - txMsgsB += txMsgs - txMsgsD += txMsgs - txMsgsF = txMsgs + th.sendMsgs("C.bye.A", 10) + th.sendMsgs("hello.bye", 20) # matches none of the queues + th.sendMsgs("hello.bye.B", 20) + th.sendMsgs("D.bye", 20) # Kill all nodes but one - self.killNode(1, clusterName) - self.killNode(3, clusterName) - self.killNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 1) - # Pull all remaining messages from each queue - rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 20) - rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 30) - rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10) - rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 20) - rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-13-E", 10) - rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-13-F", 20) - # Check messages - self.stopCheckAll() - if txMsgsA != rxMsgsA: - self.fail("Send - receive message mismatch for queue A") - if txMsgsB != rxMsgsB: - self.fail("Send - receive message mismatch for queue B") - if txMsgsC != rxMsgsC: - self.fail("Send - receive message mismatch for queue C") - if txMsgsD != rxMsgsD: - self.fail("Send - receive message mismatch for queue D") - if txMsgsE != rxMsgsE: - self.fail("Send - receive message mismatch for queue E") - if txMsgsF != rxMsgsF: - self.fail("Send - receive message mismatch for queue F") + th.killNode(1) + th.killNode(3) + th.killNode(6) + # Pull all remaining messages from each queue and check messages + th.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_14_FanoutExchange(self): - """Create fanout exchange in a cluster and make sure it replicates correctly""" + """Create fanout exchange in a cluster and make sure it behaves correctly""" try: - clusterName = "cluster-14" - self.createCheckCluster(clusterName, 4) - fanoutExchangeName = "test-exchange-14" fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"] - self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList) - + fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList) # Place initial 20 messages, retrieve 10 - txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-14-A", 10) - rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-14-B", 10) - rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-14-C", 10) + fh.sendMsgs(20) + fh.receiveMsgs(10) # Kill and add some nodes - self.killNode(0, clusterName) - self.killNode(2, clusterName) - self.createClusterNode(4, clusterName) - self.createClusterNode(5, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + fh.killNode(0) + fh.killNode(2) + fh.addNodes(2) # Place another 20 messages, retrieve 20 - txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20) - rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20) - rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-14-C", 20) + fh.sendMsgs(20) + fh.receiveMsgs(20) # Kill and add another node - self.killNode(4, clusterName) - self.createClusterNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + fh.killNode(4) + fh.addNodes() # Add another 2 queues - self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-14-D", "test-queue-14-E"]) + fh.addQueues(["test-queue-14-D", "test-queue-14-E"]) # Place another 20 messages, retrieve 20 - tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - txMsg += tmp - rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20) - rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20) - rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-14-C", 20) - rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-14-D", 10) - rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-14-E", 10) + fh.sendMsgs(20) + fh.receiveMsgs(20) # Kill all nodes but one - self.killNode(1, clusterName) - self.killNode(3, clusterName) - self.killNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 1) - # Pull all remaining messages from each queue - rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-14-A", 10) - rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-14-B", 10) - rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-14-C", 10) - rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-14-D", 10) - rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-14-E", 10) + fh.killNode(1) + fh.killNode(3) + fh.killNode(6) # Check messages - self.stopCheckAll() - if txMsg != rxMsgA: - self.fail("Send - receive message mismatch for queue A") - if txMsg != rxMsgB: - self.fail("Send - receive message mismatch for queue B") - if txMsg != rxMsgC: - self.fail("Send - receive message mismatch for queue C") - if tmp != rxMsgD: - self.fail("Send - receive message mismatch for queue D") - if tmp != rxMsgE: - self.fail("Send - receive message mismatch for queue E") + fh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise # Start the test here diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests index d2f7a77865..103896cd3d 100755 --- a/cpp/src/tests/run_cluster_tests +++ b/cpp/src/tests/run_cluster_tests @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash # # Licensed to the Apache Software Foundation (ASF) under one @@ -23,13 +23,13 @@ TEST_DIR=${top_builddir}/src/tests # Check AIS requirements -id -nG | grep '\<ais\>' >/dev/null || NOGROUP="You are not a member of the ais group." -ps -u root | grep 'aisexec\|corosync' >/dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root" +id -nG | grep '\<ais\>' > /dev/null || NOGROUP="You are not a member of the ais group." +ps -u root | grep 'aisexec\|corosync' > /dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root" if test -n "${NOGROUP}" -o -n "${NOAISEXEC}"; then cat <<EOF - ========= WARNING: CLUSTERING TESTS DISABLED ============== + ======== WARNING: PYTHON CLUSTER TESTS DISABLED =========== Tests that depend on the openais library (used for clustering) will not be run because: @@ -43,22 +43,28 @@ EOF exit 0 fi -export PYTHONPATH=$srcdir:$srcdir/../../../python -export RUN_CLUSTER_TESTS=1 +# Check XML exchange requirements +XML_LIB=$srcdir/../.libs/xml.so +if [ -f ${XML_LIB} ]; then + export XML_LIB +fi + +export PYTHONPATH=${srcdir}:${srcdir}/../../../python export QPIDD_EXEC=${top_builddir}/src/qpidd export CLUSTER_LIB=${top_builddir}/src/.libs/cluster.so -export QPID_CONFIG_EXEC=$srcdir/../../../python/commands/qpid-config -export QPID_ROUTE_EXEC=$srcdir/../../../python/commands/qpid-route +export QPID_CONFIG_EXEC=${srcdir}/../../../python/commands/qpid-config +export QPID_ROUTE_EXEC=${srcdir}/../../../python/commands/qpid-route export RECEIVER_EXEC=${top_builddir}/src/tests/receiver export SENDER_EXEC=${top_builddir}/src/tests/sender + #Make sure temp dir exists if this is the first to use it TMP_STORE_DIR=${TEST_DIR}/test_tmp if ! test -d ${TMP_STORE_DIR} ; then - mkdir -p ${TMP_STORE_DIR} mkdir -p ${TMP_STORE_DIR}/cluster else - rm -rf "${TMP_STORE_DIR}/cluster" + # Delete old cluster test dirs + rm -rf "${TMP_STORE_DIR}/cluster" mkdir -p "${TMP_STORE_DIR}/cluster" fi export TMP_STORE_DIR @@ -66,6 +72,6 @@ export TMP_STORE_DIR sg ais -c "${srcdir}/cluster.py -v" RETCODE=$? -if test x$RETCODE != x0; then - echo "FAIL cluster tests"; exit 1; +if test x${RETCODE} != x0; then + exit 1; fi diff --git a/cpp/src/tests/testlib.py b/cpp/src/tests/testlib.py index 1c1d1bf407..07c4794767 100644 --- a/cpp/src/tests/testlib.py +++ b/cpp/src/tests/testlib.py @@ -21,7 +21,7 @@ # Support library for qpid python tests. # -import os, signal, subprocess, unittest +import os, re, signal, subprocess, unittest class TestBase(unittest.TestCase): """ @@ -32,8 +32,8 @@ class TestBase(unittest.TestCase): The following environment vars control if and how the test is run, and determine where many of the helper executables/libs are to be found. """ - _storeEnable = os.getenv("STORE_ENABLE") != None # Must be True for durability to be enabled during the test _storeLib = os.getenv("STORE_LIB") + _storeEnable = _storeLib != None # Must be True for durability to be enabled during the test _qpiddExec = os.getenv("QPIDD_EXEC", "/usr/sbin/qpidd") _tempStoreDir = os.path.abspath(os.getenv("TMP_STORE_DIR", "/tmp/qpid")) @@ -55,22 +55,7 @@ class TestBase(unittest.TestCase): return " --%s yes" % key else: return " --%s no" % key - - def _paramNum(self, key, val): - if val != None: - return " --%s %d" % (key, val) - return "" - - def _paramString(self, key, val): - if val != None: - return " --%s %s" % (key, val) - return "" - - def _paramStringList(self, key, valList, val): - if val in valList: - return " --%s %s" % (key, val) - return "" - + # --- Helper functions for message creation --- def _makeMessage(self, msgSize): @@ -115,17 +100,37 @@ class TestBase(unittest.TestCase): #print "started broker: pid=%d, port=%d args: %s" % (pid, port, qpiddArgs) return (pid, port) - def killBroker(self, pid): + def killBroker(self, nodeTuple, ignoreFailures = False): """Kill a broker using kill -9""" - os.kill(pid, signal.SIGTERM) - #print "killed broker: pid=%d" % pid + try: + os.kill(nodeTuple[self.PID], signal.SIGKILL) + try: + os.waitpid(nodeTuple[self.PID], 0) + except: + pass + #print "killed broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID]) + except: + if ignoreFailures: + print "WARNING: killBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID]) + else: + raise - def stopBroker(self, port): + def stopBroker(self, nodeTuple, ignoreFailures = False): """Stop a broker using qpidd -q""" - ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % port, "-q") - if ret != 0: - raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret)) - #print "stopped broker: port=%d" % port + try: + ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % nodeTuple[self.PORT], "--quit") + if ret != 0: + raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret)) + try: + os.waitpid(nodeTuple[self.PID], 0) + except: + pass + #print "stopped broker: port=%d pid=%d" % (nodeTuple[self.PORT], nodeTuple[self.PID]) + except: + if ignoreFailures: + print "WARNING: stopBroker (port=%d pid=%d) failed - ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID]) + else: + raise @@ -138,8 +143,10 @@ class TestBaseCluster(TestBase): The following environment vars control if and how the test is run, and determine where many of the helper executables/libs are to be found. """ - _runClusterTests = os.getenv("RUN_CLUSTER_TESTS") != None # Must be True for these cluster tests to run _clusterLib = os.getenv("CLUSTER_LIB") + _clusterTestEnable = _clusterLib != None # Must be True for these cluster tests to run + _xmlLib = os.getenv("XML_LIB") + _xmlEnable = _xmlLib != None _qpidConfigExec = os.getenv("QPID_CONFIG_EXEC", "/usr/bin/qpid-config") _qpidRouteExec = os.getenv("QPID_ROUTE_EXEC", "/usr/bin/qpid-route") _receiverExec = os.getenv("RECEIVER_EXEC", "/usr/libexec/qpid/test/receiver") @@ -164,10 +171,19 @@ class TestBaseCluster(TestBase): def run(self, res): """ Skip cluster testing if env var RUN_CLUSTER_TESTS is not defined.""" - if not self._runClusterTests: + if not self._clusterTestEnable: return unittest.TestCase.run(self, res) + # --- Private helper / convenience functions --- + + def _checkPids(self, clusterName = None): + for pid, port in self.getTupleList(): + try: + os.kill(pid, 0) + except: + raise Exception("_checkPids(): Broker with pid %d expected but does not exist! (crashed?)" % pid) + # --- Starting cluster node(s) --- @@ -196,22 +212,25 @@ class TestBaseCluster(TestBase): # --- Cluster and node status --- - def getTupleList(self): + def getTupleList(self, clusterName = None): """Get list of (pid, port) tuples of all known cluster brokers""" tList = [] - for l in self._clusterDict.itervalues(): - for t in l.itervalues(): - tList.append(t) + for c, l in self._clusterDict.iteritems(): + if clusterName == None or c == clusterName: + for t in l.itervalues(): + tList.append(t) return tList def getNumBrokers(self): """Get total number of brokers in all known clusters""" return len(self.getTupleList()) - def checkNumBrokers(self, expected): + def checkNumBrokers(self, expected = None, checkPids = True): """Check that the total number of brokers in all known clusters is the expected value""" - if self.getNumBrokers() != expected: + if expected != None and self.getNumBrokers() != expected: raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers())) + if checkPids: + self._checkPids() def getClusterTupleList(self, clusterName): """Get list of (pid, port) tuples of all nodes in named cluster""" @@ -227,11 +246,13 @@ class TestBaseCluster(TestBase): """Get the (pid, port) tuple for the given cluster node""" return self._clusterDict[clusterName][nodeNumber] - def checkNumClusterBrokers(self, clusterName, expected): + def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True): """Check that the total number of brokers in the named cluster is the expected value""" - if self.getNumClusterBrokers(clusterName) != expected: + if expected != None and self.getNumClusterBrokers(clusterName) != expected: raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \ (clusterName, expected, self.getNumClusterBrokers(clusterName))) + if checkPids: + self._checkPids(clusterName) def clusterExists(self, clusterName): """ Return True if clusterName exists, False otherwise""" @@ -250,16 +271,16 @@ class TestBaseCluster(TestBase): # --- Kill cluster nodes using signal 9 --- - def killNode(self, nodeNumber, clusterName, updateDict = True): + def killNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False): """Kill the given node in the named cluster using kill -9""" - self.killBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PID]) + self.killBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures) if updateDict: del(self._clusterDict[clusterName][nodeNumber]) - def killCluster(self, clusterName, updateDict = True): + def killCluster(self, clusterName, updateDict = True, ignoreFailures = False): """Kill all nodes in the named cluster""" for n in self._clusterDict[clusterName].iterkeys(): - self.killNode(n, clusterName, False) + self.killNode(n, clusterName, False, ignoreFailures) if updateDict: del(self._clusterDict[clusterName]) @@ -270,46 +291,46 @@ class TestBaseCluster(TestBase): raise Exception("Unable to kill cluster %s; %d nodes still exist" % \ (clusterName, self.getNumClusterBrokers(clusterName))) - def killAllClusters(self): + def killAllClusters(self, ignoreFailures = False): """Kill all known clusters""" for n in self._clusterDict.iterkeys(): - self.killCluster(n, False) + self.killCluster(n, False, ignoreFailures) self._clusterDict.clear() - def killAllClustersCheck(self): + def killAllClustersCheck(self, ignoreFailures = False): """Kill all known clusters and check that the cluster dictionary is empty""" - self.killAllClusters() + self.killAllClusters(ignoreFailures) self.checkNumBrokers(0) # --- Stop cluster nodes using qpidd -q --- - def stopNode(self, nodeNumber, clusterName, updateDict = True): + def stopNode(self, nodeNumber, clusterName, updateDict = True, ignoreFailures = False): """Stop the given node in the named cluster using qpidd -q""" - self.stopBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PORT]) + self.stopBroker(self.getNodeTuple(nodeNumber, clusterName), ignoreFailures) if updateDict: del(self._clusterDict[clusterName][nodeNumber]) - def stopAllClusters(self): + def stopAllClusters(self, ignoreFailures = False): """Stop all known clusters""" for n in self._clusterDict.iterkeys(): - self.stopCluster(n, False) + self.stopCluster(n, False, ignoreFailures) self._clusterDict.clear() - def stopCluster(self, clusterName, updateDict = True): + def stopCluster(self, clusterName, updateDict = True, ignoreFailures = False): """Stop all nodes in the named cluster""" for n in self._clusterDict[clusterName].iterkeys(): - self.stopNode(n, clusterName, False) + self.stopNode(n, clusterName, False, ignoreFailures) if updateDict: del(self._clusterDict[clusterName]) - def stopCheckCluster(self, clusterName): + def stopCheckCluster(self, clusterName, ignoreFailures = False): """Stop the named cluster and check that the name is removed from the cluster dictionary""" - self.stopCluster(clusterName) + self.stopCluster(clusterName, True, ignoreFailures) if self.clusterExists(clusterName): raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName))) - def stopCheckAll(self): + def stopCheckAll(self, ignoreFailures = False): """Kill all known clusters and check that the cluster dictionary is empty""" self.stopAllClusters() self.checkNumBrokers(0) @@ -479,11 +500,252 @@ class TestBaseCluster(TestBase): if wait: receiver.wait() return msgs - - def sendReceiveMsgs(self, nodeNumber, clusterName, exchangeName, queueName, numMsgs, wait = True, msgSize = None): - self.createBindDirectExchangeQueue(nodeNumber, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(nodeNumber, clusterName, exchangeName, queueName, numMsgs, msgSize, wait) - rxMsgs = self.receiveMsgs(nodeNumber, clusterName, queueName, numMsgs, wait) - if txMsgs != rxMsgs: - self.fail("Send - receive message mismatch") + + + # --- Exchange-specific helper inner classes --- + + class TestHelper: + """ + This is a "virtual" superclass for test helpers, and is not useful on its own, but the + per-exchange subclasses are designed to keep track of the messages sent to and received + from queues which have bindings to that exchange type. + """ + + def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList): + + """Dictionary of queues and lists of messages sent to them.""" + self._txMsgs = {} + """Dictionary of queues and lists of messages received from them.""" + self._rxMsgs = {} + """List of node numbers currently in the cluster""" + self._nodes = [] + """List of node numbers which have been killed and can therefore be recovered""" + self._deadNodes = [] + """Last node to be used""" + self._lastNode = None + + self._testBaseCluster = testBaseCluster + self._clusterName = clusterName + self._exchangeName = exchangeName + self._queueNameList = queueNameList + self._addQueues(queueNameList) + self._testBaseCluster.createCheckCluster(clusterName, numNodes) + self._nodes.extend(range(0, numNodes)) + + def _addQueues(self, queueNameList): + for qn in queueNameList: + if not qn in self._txMsgs: + self._txMsgs[qn] = [] + if not qn in self._rxMsgs: + self._rxMsgs[qn] = [] + + def _bindQueue(self, queueName, bindingKey, nodeNumber = None): + """Bind a queue to an exchange using a binding key.""" + if nodeNumber == None: + nodeNumber = self._nodes[0] # first available node + self._testBaseCluster.addQueue(nodeNumber, self._clusterName, queueName) + self._testBaseCluster.bind(nodeNumber, self._clusterName, self._exchangeName, queueName, bindingKey) + + def _highestNodeNumber(self): + """Find the highest node number used so far between the current nodes and those stopped/killed.""" + highestNode = self._nodes[-1] + if len(self._deadNodes) == 0: + return highestNode + highestDeadNode = self._deadNodes[-1] + if highestNode > highestDeadNode: + return highestNode + return highestDeadNode + + def killCluster(self): + """Kill all nodes in the cluster""" + self._testBaseCluster.killCluster(self._clusterName) + self._testBaseCluster.checkNumClusterBrokers(self._clusterName, 0) + self._deadNodes.extend(self._nodes) + self._deadNodes.sort() + del self._nodes[:] + + def restoreCluster(self, lastNode = None, restoreNodes = True): + """Restore a previously killed cluster""" + self._testBaseCluster.createCluster(self._clusterName) + if restoreNodes: + numNodes = len(self._deadNodes) + self.restoreNodes(lastNode) + self._testBaseCluster.checkNumClusterBrokers(self._clusterName, numNodes) + + def addNodes(self, numberOfNodes = 1): + """Add a fixed number of nodes to the cluster.""" + nodeStart = self._highestNodeNumber() + 1 + for i in range(0, numberOfNodes): + nodeNumber = nodeStart + i + self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName) + self._nodes.append(nodeNumber) + self._testBaseCluster.checkNumClusterBrokers(self._clusterName, len(self._nodes)) + + def restoreNode(self, nodeNumber): + """Restore a cluster node that has been previously killed""" + if nodeNumber not in self._deadNodes: + raise Exception("restoreNode(): Node number %d not in dead node list %s" % (nodeNumber, self._deadNodes)) + self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName) + self._deadNodes.remove(nodeNumber) + self._nodes.append(nodeNumber) + self._nodes.sort() + + def restoreNodes(self, lastNode = None): + """Restore all known cluster nodes that have been previously killed starting with a known last-used node""" + if len(self._nodes) == 0: # restore last-used node first + if lastNode == None: + lastNode = self._lastNode + self.restoreNode(lastNode) + while len(self._deadNodes) > 0: + self.restoreNode(self._deadNodes[0]) + + def killNode(self, nodeNumber): + """Kill a cluster node (if it is in the _nodes list).""" + if nodeNumber not in self._nodes: + raise Exception("killNode(): Node number %d not in node list %s" % (nodeNumber, self._nodes)) + self._testBaseCluster.killNode(nodeNumber, self._clusterName) + self._nodes.remove(nodeNumber) + self._deadNodes.append(nodeNumber) + self._deadNodes.sort() + + def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True): + """Send a fixed number of messages using the given routing key.""" + if nodeNumber == None: + nodeNumber = self._nodes[0] # Use first available node + msgs = self._testBaseCluster._makeMessageList(numMsgs, msgSize) + sender = self._testBaseCluster.createSender(nodeNumber, self._clusterName, self._exchangeName, routingKey) + sender.stdin.write(msgs) + sender.stdin.close() + if wait: + sender.wait() + self._lastNode = nodeNumber + return msgs.split() + + # TODO - this i/f is messy: one mumMsgs can be given, but a list of queues + # so assuming numMsgs for each queue + # A mechanism is needed to specify a different numMsgs per queue + def receiveMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, wait = True): + """Receive a fixed number of messages from a named queue. If numMsgs == None, get all remaining messages.""" + if nodeNumber == None: + nodeNumber = self._nodes[0] # Use first available node + if queueNameList == None: + queueNameList = self._txMsgs.iterkeys() + for qn in queueNameList: + nm = numMsgs + if nm == None: + nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get all remaining messages + if nm > 0: + receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm) + cnt = 0 + while cnt < nm: + rx = receiver.stdout.readline().strip() + if rx == "" and receiver.poll() != None: break + self._rxMsgs[qn].append(rx) + cnt = cnt + 1 + if wait: + receiver.wait() + self._lastNode = nodeNumber + + def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = None, wait = True): + """Receive all remaining messages on named queue.""" + self.receiveMsgs(None, nodeNumber, queueNameList, wait) + + def checkMsgs(self): + """Return True if all expected messages have been received (ie the transmit and receive list are identical).""" + txMsgTot = 0 + rxMsgTot = 0 + for qn, txMsgList in self._txMsgs.iteritems(): + rxMsgList = self._rxMsgs[qn] + txMsgTot = txMsgTot + len(txMsgList) + rxMsgTot = rxMsgTot + len(rxMsgList) + if len(txMsgList) != len(rxMsgList): + return False + for i, m in enumerate(txMsgList): + if m != rxMsgList[i]: + return False + if txMsgTot == 0 and rxMsgTot == 0: + print "WARNING: No messages were either sent or received" + return True + + def finalizeTest(self): + """Recover all the remaining messages on all queues, then check that all expected messages were received.""" + self.receiveRemainingMsgs() + self._testBaseCluster.stopCheckAll() + if not self.checkMsgs(): + self._testBaseCluster.fail("Send - receive message mismatch") + self.printMsgs() + + def printMsgs(self, txMsgs = True, rxMsgs = True): + """Print all messages transmitted and received.""" + for qn, txMsgList in self._txMsgs.iteritems(): + print "Queue: %s" % qn + if txMsgs: + print " txMsgList = %s" % txMsgList + if rxMsgs: + rxMsgList = self._rxMsgs[qn] + print " rxMsgList = %s" % rxMsgList + + + class DirectExchangeTestHelper(TestHelper): + + def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList): + TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList) + self._testBaseCluster.addExchange(0, clusterName, "direct", exchangeName) + for qn in queueNameList: + self._bindQueue(qn, qn) + + def addQueues(self, queueNameList): + self._addQueues(queueNameList) + for qn in queueNameList: + self._bindQueue(qn, qn) + + def sendMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, msgSize = None, wait = True): + if queueNameList == None: + queueNameList = self._txMsgs.iterkeys() + for qn in queueNameList: + self._txMsgs[qn].extend(TestBaseCluster.TestHelper.sendMsgs(self, qn, numMsgs, nodeNumber, msgSize, wait)) + + + class TopicExchangeTestHelper(TestHelper): + + def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList): + self._queueNameKeyList = queueNameKeyList + TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameKeyList.iterkeys()) + self._testBaseCluster.addExchange(0, clusterName, "topic", exchangeName) + for qn, bk in queueNameKeyList.iteritems(): + self._bindQueue(qn, bk) + + def addQueues(self, queueNameKeyList): + self._addQueues(queueNameKeyList.iterkeys()) + for qn, bk in queueNameKeyList.iteritems(): + self._bindQueue(qn, bk) + + def _prepareRegex(self, bk): + # This regex conversion is not very complete - there are other chars that should be escaped too + return "^%s$" % bk.replace(".", r"\.").replace("*", r"[^.]*").replace("#", ".*") + + def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = None, wait = True): + msgList = TestBaseCluster.TestHelper.sendMsgs(self, routingKey, numMsgs, nodeNumber, msgSize, wait) + for qn, bk in self._queueNameKeyList.iteritems(): + if re.match(self._prepareRegex(bk), routingKey): + self._txMsgs[qn].extend(msgList) + + + class FanoutExchangeTestHelper(TestHelper): + + def __init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList): + TestBaseCluster.TestHelper.__init__(self, testBaseCluster, clusterName, numNodes, exchangeName, queueNameList) + self._testBaseCluster.addExchange(0, clusterName, "fanout", exchangeName) + for qn in queueNameList: + self._bindQueue(qn, "") + + def addQueues(self, queueNameList): + self._addQueues(queueNameList) + for qn in queueNameList: + self._bindQueue(qn, "") + + def sendMsgs(self, numMsgs, nodeNumber = None, msgSize = None, wait = True): + msgList = TestBaseCluster.TestHelper.sendMsgs(self, "", numMsgs, nodeNumber, msgSize, wait) + for ml in self._txMsgs.itervalues(): + ml.extend(msgList)
\ No newline at end of file |