diff options
Diffstat (limited to 'cpp/src/tests/cluster.py')
-rwxr-xr-x | cpp/src/tests/cluster.py | 433 |
1 files changed, 151 insertions, 282 deletions
diff --git a/cpp/src/tests/cluster.py b/cpp/src/tests/cluster.py index 87c9b363a8..ae4cc7aa85 100755 --- a/cpp/src/tests/cluster.py +++ b/cpp/src/tests/cluster.py @@ -29,10 +29,9 @@ class ClusterTests(TestBaseCluster): try: clusterName = "cluster-01" self.createCheckCluster(clusterName, 5) - self.checkNumBrokers(5) self.stopCheckCluster(clusterName) except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_02_MultipleClusterInitialization(self): @@ -40,13 +39,13 @@ class ClusterTests(TestBaseCluster): try: for i in range(0, 5): clusterName = "cluster-02.%d" % i - self.createCluster(clusterName, 5) + self.createCheckCluster(clusterName, 5) self.checkNumBrokers(25) self.killCluster("cluster-02.2") self.checkNumBrokers(20) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_03_AddRemoveNodes(self): @@ -66,7 +65,7 @@ class ClusterTests(TestBaseCluster): self.checkNumClusterBrokers(clusterName, 7) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_04_RemoveRestoreNodes(self): @@ -93,7 +92,7 @@ class ClusterTests(TestBaseCluster): self.checkNumClusterBrokers(clusterName, 6) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_05_KillAllNodesThenRecover(self): @@ -105,170 +104,126 @@ class ClusterTests(TestBaseCluster): self.createCheckCluster(clusterName, 6) self.stopCheckAll() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_06_PublishConsume(self): """Publish then consume 100 messages from a single cluster""" try: - clusterName = "cluster-06" - self.createCheckCluster(clusterName, 3) - self.sendReceiveMsgs(0, clusterName, "test-exchange-06", "test-queue-06", 100) - self.stopCheckAll() + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-06", 3, "test-exchange-06", ["test-queue-06"]) + dh.sendMsgs(100) + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_07_MultiplePublishConsume(self): """Staggered publish and consume on a single cluster""" try: - clusterName = "cluster-07" - exchangeName = "test-exchange-07" - queueName = "test-queue-07" - self.createCheckCluster(clusterName, 3) - self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 - rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) # 10, 10 - txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) # 30, 10 - rxMsgs += self.receiveMsgs(0, clusterName, queueName, 20) # 10, 30 - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30 - rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50 - txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50 - rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80 - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-07", 3, "test-exchange-07", ["test-queue-07"]) + # tx rx nodes + # 0 0 0 1 2 + dh.sendMsgs(20) # 20 0 * + dh.receiveMsgs(10, 1) # 20 10 * + dh.sendMsgs(20, 2) # 40 10 * + dh.receiveMsgs(20, 0) # 40 30 * + dh.sendMsgs(20, 1) # 60 30 * + dh.receiveMsgs(20, 2) # 60 50 * + dh.sendMsgs(20, 0) # 80 50 * + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self): """Staggered publish and consume interleaved with adding and removing nodes on a single cluster""" try: - clusterName = "cluster-08" - exchangeName = "test-exchange-08" - queueName = "test-queue-08" - self.createCheckCluster(clusterName, 3) - self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 - for i in range(3,6): - self.createClusterNode(i, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0 - self.killNode(0, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - rxMsgs = self.receiveMsgs(2, clusterName, queueName, 10) # 30, 10 - self.killNode(2, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) # 10, 30 - self.createClusterNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) # 30, 30 - rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 50 - self.createClusterNode(7, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - txMsgs += self.sendMsgs(6, clusterName, exchangeName, queueName, 20) # 30, 50 - rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80 - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-08", 3, "test-exchange-08", ["test-queue-08"]) + # tx rx nodes + # 0 0 0 1 2 + dh.sendMsgs(20) # 20 0 * + dh.addNodes(2) # 0 1 2 3 4 + dh.sendMsgs(20, 1) # 40 0 * + dh.killNode(0) # . 1 2 3 4 + dh.receiveMsgs(10, 2) # 40 10 * + dh.killNode(2) # . 1 . 3 4 + dh.receiveMsgs(20, 3) # 40 30 * + dh.addNodes() # . 1 . 3 4 5 + dh.sendMsgs(20, 4) # 60 30 * + dh.receiveMsgs(20, 5) # 60 50 * + dh.addNodes() # . 1 . 3 4 5 6 + dh.sendMsgs(20, 6) # 80 50 * + dh.killNode(5) # . 1 . 3 4 . 6 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self): """Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster""" try: - clusterName = "cluster-09" - exchangeName = "test-exchange-09" - queueName = "test-queue-09" - self.createCheckCluster(clusterName, 6) - self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 - self.killNode(2, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0 - self.killNode(0, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) # 30, 10 - self.killNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 3) - rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 30 - self.createClusterNode(2, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30 - self.createClusterNode(0, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50 - self.createClusterNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50 - rxMsgs += self.receiveMsgs(4, clusterName, queueName, 30) # 0, 80 - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-09", 6, "test-exchange-09", ["test-queue-09"]) + # tx rx nodes + # 0 0 0 1 2 3 4 5 + dh.sendMsgs(20) # 20 0 * + dh.killNode(2) # 0 1 . 3 4 5 + dh.sendMsgs(20, 1) # 40 0 * + dh.killNode(0) # . 1 . 3 4 5 + dh.receiveMsgs(10, 3) # 40 10 * + dh.killNode(4) # . 1 . 3 . 5 + dh.receiveMsgs(20, 5) # 40 30 * + dh.restoreNode(2) # . 1 2 3 . 5 + dh.sendMsgs(20, 1) # 60 30 * + dh.restoreNode(0) # 0 1 2 3 . 5 + dh.receiveMsgs(20, 0) # 60 50 * + dh.killNode(2) # 0 1 . 3 . 5 + dh.restoreNode(2) # 0 1 2 3 . 5 + dh.sendMsgs(20, 2) # 80 50 * + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_10_LinearNodeKillCreateProgression(self): """Publish and consume messages while linearly killing all original nodes and replacing them with new ones""" try: - clusterName = "cluster-10" - exchangeName = "test-exchange-10" - queueName = "test-queue-10" - self.createCheckCluster(clusterName, 4) - self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(2, clusterName, exchangeName, queueName, 20) - rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) - for i in range(0, 16): - self.killNode(i, clusterName) - self.createClusterNode(i+4, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - txMsgs += self.sendMsgs(i+1, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(i+2, clusterName, queueName, 20) - rxMsgs += self.receiveMsgs(16, clusterName, queueName, 10) - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-10", 4, "test-exchange-10", ["test-queue-10"]) + # tx rx nodes + # 0 0 0 1 2 3 + dh.sendMsgs(20) # 20 0 * + dh.receiveMsgs(10, 1) # 20 10 * + for i in range(0, 16): # First loop: + dh.killNode(i) # . 1 2 3 + dh.addNodes() # . 1 2 3 4 + dh.sendMsgs(20, i+1) # 40 10 * + dh.receiveMsgs(20, i+2) # 40 30 * + # After loop: + # 340 330 . . . . . . . . . . . . . . . . 16 17 18 19 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_11_CircularNodeKillRestoreProgression(self): """Publish and consume messages while circularly killing all original nodes and restoring them again""" try: - clusterName = "cluster-11" - exchangeName = "test-exchange-11" - queueName = "test-queue-11" - self.createCheckCluster(clusterName, 4) - self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(3, clusterName, exchangeName, queueName, 20) - rxMsgs = self.receiveMsgs(0, clusterName, queueName, 10) - self.killNode(0, clusterName) - self.killNode(1, clusterName) - for i in range(0, 16): - self.killNode((i + 2) % 4, clusterName) - self.createClusterNode(i % 4, clusterName) - self.checkNumClusterBrokers(clusterName, 2) - txMsgs += self.sendMsgs((i + 3) % 4, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs((i + 4) % 4, clusterName, queueName, 20) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 10) - self.stopCheckAll() - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-11", 4, "test-exchange-11", ["test-queue-11"]) + # tx rx nodes + # 0 0 0 1 2 3 + dh.sendMsgs(20, 3) # 20 0 * + dh.receiveMsgs(10) # 20 10 * + dh.killNode(0) # . 1 2 3 + dh.killNode(1) # . . 2 3 + for i in range(0, 16): # First loop: + dh.killNode((i + 2) % 4) # . . . 3 + dh.restoreNode(i % 4) # 0 . . 3 + dh.sendMsgs(20, (i + 3) % 4) # 40 10 * + dh.receiveMsgs(20, (i + 4) % 4) # 40 30 * + # After loop: + # 340 330 . . 2 3 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_12_KillAllNodesRecoverMessages(self): @@ -277,180 +232,94 @@ class ClusterTests(TestBaseCluster): print " No store loaded, skipped" return try: - clusterName = "cluster-12" - exchangeName = "test-exchange-12" - queueName = "test-queue-12" - self.createCheckCluster(clusterName, 4) - self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) - txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) - rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) - txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) - self.killNode(0, clusterName) - self.createClusterNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 4) - txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(1, clusterName, queueName, 20) - self.killNode(2, clusterName) - self.createClusterNode(0, clusterName) - self.createClusterNode(5, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) - rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) - self.killAllClusters() - self.checkNumClusterBrokers(clusterName, 0) - self.createCluster(clusterName) - self.createClusterNode(3, clusterName) # last node to be used - self.createClusterNode(0, clusterName) - self.createClusterNode(1, clusterName) - self.createClusterNode(2, clusterName) - self.createClusterNode(4, clusterName) - self.createClusterNode(5, clusterName) - rxMsgs += self.receiveMsgs(0, clusterName, queueName, 10) - if txMsgs != rxMsgs: - print "txMsgs=%s" % txMsgs - print "rxMsgs=%s" % rxMsgs - self.fail("Send - receive message mismatch") + dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-12", 4, "test-exchange-12", ["test-queue-12"]) + # tx rx nodes + # 0 0 0 1 2 3 + dh.sendMsgs(20, 2) # 20 0 * + dh.receiveMsgs(10, 1) # 20 10 * + dh.killNode(1) # 0 . 2 3 + dh.sendMsgs(20, 0) # 40 10 * + dh.receiveMsgs(20, 3) # 40 30 * + dh.killNode(2) # 0 . . 3 + dh.addNodes(2) # 0 . . 3 4 5 + dh.sendMsgs(20, 5) # 60 30 * + dh.receiveMsgs(20, 4) # 60 50 * + dh.killCluster() # cluster does not exist + dh.restoreCluster() # 60 50 . . . . . . + dh.restoreNodes() # 0 1 2 3 4 5 + dh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_13_TopicExchange(self): - """Create topic exchange in a cluster and make sure it replicates correctly""" + """Create topic exchange in a cluster and make sure it behaves correctly""" try: - clusterName = "cluster-13" - self.createCheckCluster(clusterName, 4) - topicExchangeName = "test-exchange-13" - topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "#.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.#"} - self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList) - - # Place initial messages - txMsgsA = txMsgsC = self.sendMsgs(3, clusterName, topicExchangeName, "C.hello.A", 10) # (10, 0, 10, 0) - self.sendMsgs(2, clusterName, topicExchangeName, "hello", 10) # Should not go to any queue - txMsgsD = self.sendMsgs(1, clusterName, topicExchangeName, "D.hello.A", 10) # (20, 0, 10, 10) - txMsgsA += txMsgsD - txMsgsB = self.sendMsgs(0, clusterName, topicExchangeName, "hello.B", 20) # (20, 20, 10, 10) + topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"} + th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList) + # Place initial messages + th.sendMsgs("C.hello.A", 10) + th.sendMsgs("hello.world", 10) # matches none of the queues + th.sendMsgs("D.hello.A", 10) + th.sendMsgs("hello.B", 20) + th.sendMsgs("D.hello", 20) # Kill and add some nodes - self.killNode(0, clusterName) - self.killNode(2, clusterName) - self.createClusterNode(4, clusterName) - self.createClusterNode(5, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + th.killNode(0) + th.killNode(2) + th.addNodes(2) # Pull 10 messages from each queue - rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) # (10, 20, 10, 10) - rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) # (10, 10, 10, 10) - rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-13-C", 10) # (10, 10, 0, 10) - rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) # (10, 10, 0, 0) + th.receiveMsgs(10) # Kill and add another node - self.killNode(4, clusterName) - self.createClusterNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + th.killNode(4) + th.addNodes() # Add two more queues - self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"}) + th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"}) # Place more messages - txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0) - txMsgsA += txMsgs - txMsgsC += txMsgs - txMsgsE = txMsgs - self.sendMsgs(1, clusterName, topicExchangeName, "bye", 20) # Should not go to any queue - txMsgs = self.sendMsgs(5, clusterName, topicExchangeName, "D.bye.B", 20) # (20, 30, 10, 20, 10, 20) - txMsgsB += txMsgs - txMsgsD += txMsgs - txMsgsF = txMsgs + th.sendMsgs("C.bye.A", 10) + th.sendMsgs("hello.bye", 20) # matches none of the queues + th.sendMsgs("hello.bye.B", 20) + th.sendMsgs("D.bye", 20) # Kill all nodes but one - self.killNode(1, clusterName) - self.killNode(3, clusterName) - self.killNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 1) - # Pull all remaining messages from each queue - rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 20) - rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 30) - rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10) - rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 20) - rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-13-E", 10) - rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-13-F", 20) - # Check messages - self.stopCheckAll() - if txMsgsA != rxMsgsA: - self.fail("Send - receive message mismatch for queue A") - if txMsgsB != rxMsgsB: - self.fail("Send - receive message mismatch for queue B") - if txMsgsC != rxMsgsC: - self.fail("Send - receive message mismatch for queue C") - if txMsgsD != rxMsgsD: - self.fail("Send - receive message mismatch for queue D") - if txMsgsE != rxMsgsE: - self.fail("Send - receive message mismatch for queue E") - if txMsgsF != rxMsgsF: - self.fail("Send - receive message mismatch for queue F") + th.killNode(1) + th.killNode(3) + th.killNode(6) + # Pull all remaining messages from each queue and check messages + th.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise def test_Cluster_14_FanoutExchange(self): - """Create fanout exchange in a cluster and make sure it replicates correctly""" + """Create fanout exchange in a cluster and make sure it behaves correctly""" try: - clusterName = "cluster-14" - self.createCheckCluster(clusterName, 4) - fanoutExchangeName = "test-exchange-14" fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"] - self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList) - + fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList) # Place initial 20 messages, retrieve 10 - txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-14-A", 10) - rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-14-B", 10) - rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-14-C", 10) + fh.sendMsgs(20) + fh.receiveMsgs(10) # Kill and add some nodes - self.killNode(0, clusterName) - self.killNode(2, clusterName) - self.createClusterNode(4, clusterName) - self.createClusterNode(5, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + fh.killNode(0) + fh.killNode(2) + fh.addNodes(2) # Place another 20 messages, retrieve 20 - txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20) - rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20) - rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-14-C", 20) + fh.sendMsgs(20) + fh.receiveMsgs(20) # Kill and add another node - self.killNode(4, clusterName) - self.createClusterNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 4) + fh.killNode(4) + fh.addNodes() # Add another 2 queues - self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-14-D", "test-queue-14-E"]) + fh.addQueues(["test-queue-14-D", "test-queue-14-E"]) # Place another 20 messages, retrieve 20 - tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - txMsg += tmp - rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20) - rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20) - rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-14-C", 20) - rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-14-D", 10) - rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-14-E", 10) + fh.sendMsgs(20) + fh.receiveMsgs(20) # Kill all nodes but one - self.killNode(1, clusterName) - self.killNode(3, clusterName) - self.killNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 1) - # Pull all remaining messages from each queue - rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-14-A", 10) - rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-14-B", 10) - rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-14-C", 10) - rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-14-D", 10) - rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-14-E", 10) + fh.killNode(1) + fh.killNode(3) + fh.killNode(6) # Check messages - self.stopCheckAll() - if txMsg != rxMsgA: - self.fail("Send - receive message mismatch for queue A") - if txMsg != rxMsgB: - self.fail("Send - receive message mismatch for queue B") - if txMsg != rxMsgC: - self.fail("Send - receive message mismatch for queue C") - if tmp != rxMsgD: - self.fail("Send - receive message mismatch for queue D") - if tmp != rxMsgE: - self.fail("Send - receive message mismatch for queue E") + fh.finalizeTest() except: - self.killAllClusters() + self.killAllClusters(True) raise # Start the test here |