diff options
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 414 |
1 files changed, 27 insertions, 387 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index a3fbeeadab..e61d114713 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -18,19 +18,20 @@ # under the License. # -import os, signal, sys +import os, signal, sys, time +from threading import Thread from brokertest import * from qpid import datatypes, messaging -from testlib import TestBaseCluster # Old framework +from qpid.harness import Skipped -# New framework tests -class NewTests(BrokerTest): +class ClusterTests(BrokerTest): + """Cluster tests with support for testing with a store plugin.""" - def test_basic(self): + def test_message_replication(self): """Test basic cluster message replication.""" # Start a cluster, send some messages to member 0. - cluster = Cluster(self, 2) + cluster = self.cluster(2) s0 = cluster[0].connect().session() s0.sender("q {create:always}").send(messaging.Message("x")) s0.sender("q {create:always}").send(messaging.Message("y")) @@ -50,388 +51,27 @@ class NewTests(BrokerTest): self.assertEqual("y", m.content) s2.connection.close() - self.passed() + def test_failover(self): + """Test fail-over during continuous send-receive""" + # FIXME aconway 2009-11-09: this test is failing, showing lost messages. + # Enable when fixed + return # FIXME should be raise Skipped or negative test? -# Old framework tests -class ShortTests(TestBaseCluster): - """Basic cluster with async store tests""" + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - def test_01_Initialization(self): - """Start a single cluster containing several nodes, and stop it again""" - try: - clusterName = "cluster-01" - self.createCheckCluster(clusterName, 5) - self.stopCheckCluster(clusterName) - except: - self.killAllClusters(True) - raise + # Start sender and receiver threads + cluster[0].declare_queue("test-queue") + self.receiver = Receiver(cluster[1]) + self.receiver.start() + self.sender = Sender(cluster[2]) + self.sender.start() - def test_02_MultipleClusterInitialization(self): - """Start several clusters each with several nodes and stop them again""" - try: - for i in range(0, 5): - clusterName = "cluster-02.%d" % i - self.createCheckCluster(clusterName, 5) - self.checkNumBrokers(25) - self.killCluster("cluster-02.2") - self.checkNumBrokers(20) - self.stopAllCheck() - except: - self.killAllClusters(True) - raise - - def test_03_AddRemoveNodes(self): - """Create a multi-node cluster, then kill some nodes and add some new ones (not those killed)""" - try: - clusterName = "cluster-03" - self.createCheckCluster(clusterName, 3) - for i in range(3,8): - self.createClusterNode(i, clusterName) - self.checkNumClusterBrokers(clusterName, 8) - self.killNode(2, clusterName) - self.killNode(5, clusterName) - self.killNode(6, clusterName) - self.checkNumClusterBrokers(clusterName, 5) - self.createClusterNode(8, clusterName) - self.createClusterNode(9, clusterName) - self.checkNumClusterBrokers(clusterName, 7) - self.stopAllCheck() - except: - self.killAllClusters(True) - raise + # Kill original brokers, start new ones. + for i in range(3): + cluster[i].kill() + cluster.start() + time.sleep(1) - def test_04_RemoveRestoreNodes(self): - """Create a multi-node cluster, then kill some of the nodes and restart them""" - try: - clusterName = "cluster-04" - self.createCheckCluster(clusterName, 6) - self.checkNumBrokers(6) - self.killNode(1, clusterName) - self.killNode(3, clusterName) - self.killNode(4, clusterName) - self.checkNumBrokers(3) - self.createClusterNode(1, clusterName) - self.createClusterNode(3, clusterName) - self.createClusterNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - self.killNode(2, clusterName) - self.killNode(3, clusterName) - self.killNode(4, clusterName) - self.checkNumBrokers(3) - self.createClusterNode(2, clusterName) - self.createClusterNode(3, clusterName) - self.createClusterNode(4, clusterName) - self.checkNumClusterBrokers(clusterName, 6) - self.stopAllCheck() - except: - self.killAllClusters(True) - raise - - def test_05_KillAllNodesThenRecover(self): - """Create a multi-node cluster, then kill *all* nodes, then restart the cluster""" - try: - clusterName = "cluster-05" - self.createCheckCluster(clusterName, 6) - self.killClusterCheck(clusterName) - self.createCheckCluster(clusterName, 6) - self.stopAllCheck() - except: - self.killAllClusters(True) - raise - - def test_06_PublishConsume(self): - """Publish then consume 100 messages from a single cluster""" - try: - dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-06", 3, "test-exchange-06", ["test-queue-06"]) - dh.sendMsgs(100) - dh.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_07_MultiplePublishConsume(self): - """Staggered publish and consume on a single cluster""" - try: - dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-07", 3, "test-exchange-07", ["test-queue-07"]) - # tx rx nodes - # 0 0 0 1 2 - dh.sendMsgs(20) # 20 0 * - dh.receiveMsgs(10, 1) # 20 10 * - dh.sendMsgs(20, 2) # 40 10 * - dh.receiveMsgs(20, 0) # 40 30 * - dh.sendMsgs(20, 1) # 60 30 * - dh.receiveMsgs(20, 2) # 60 50 * - dh.sendMsgs(20, 0) # 80 50 * - dh.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_08_MsgPublishConsumeAddRemoveNodes(self): - """Staggered publish and consume interleaved with adding and removing nodes on a single cluster""" - try: - dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-08", 3, "test-exchange-08", ["test-queue-08"]) - # tx rx nodes - # 0 0 0 1 2 - dh.sendMsgs(20) # 20 0 * - dh.addNodes(2) # 0 1 2 3 4 - dh.sendMsgs(20, 1) # 40 0 * - dh.killNode(0) # . 1 2 3 4 - dh.receiveMsgs(10, 2) # 40 10 * - dh.killNode(2) # . 1 . 3 4 - dh.receiveMsgs(20, 3) # 40 30 * - dh.addNodes() # . 1 . 3 4 5 - dh.sendMsgs(20, 4) # 60 30 * - dh.receiveMsgs(20, 5) # 60 50 * - dh.addNodes() # . 1 . 3 4 5 6 - dh.sendMsgs(20, 6) # 80 50 * - dh.killNode(5) # . 1 . 3 4 . 6 - dh.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_09_MsgPublishConsumeRemoveRestoreNodes(self): - """Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster""" - try: - dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-09", 6, "test-exchange-09", ["test-queue-09"]) - # tx rx nodes - # 0 0 0 1 2 3 4 5 - dh.sendMsgs(20) # 20 0 * - dh.killNode(2) # 0 1 . 3 4 5 - dh.sendMsgs(20, 1) # 40 0 * - dh.killNode(0) # . 1 . 3 4 5 - dh.receiveMsgs(10, 3) # 40 10 * - dh.killNode(4) # . 1 . 3 . 5 - dh.receiveMsgs(20, 5) # 40 30 * - dh.restoreNode(2) # . 1 2 3 . 5 - dh.sendMsgs(20, 1) # 60 30 * - dh.restoreNode(0) # 0 1 2 3 . 5 - dh.receiveMsgs(20, 0) # 60 50 * - dh.killNode(2) # 0 1 . 3 . 5 - dh.restoreNode(2) # 0 1 2 3 . 5 - dh.sendMsgs(20, 2) # 80 50 * - dh.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_10_LinearNodeKillCreateProgression(self): - """Publish and consume messages while linearly killing all original nodes and replacing them with new ones""" - try: - dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-10", 4, "test-exchange-10", ["test-queue-10"]) - # tx rx nodes - # 0 0 0 1 2 3 - dh.sendMsgs(20) # 20 0 * - dh.receiveMsgs(10, 1) # 20 10 * - for i in range(0, 16): # First loop: - dh.killNode(i) # . 1 2 3 - dh.addNodes() # . 1 2 3 4 - dh.sendMsgs(20, i+1) # 40 10 * - dh.receiveMsgs(20, i+2) # 40 30 * - # After loop: - # 340 330 . . . . . . . . . . . . . . . . 16 17 18 19 - dh.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_11_CircularNodeKillRestoreProgression(self): - """Publish and consume messages while circularly killing all original nodes and restoring them again""" - try: - dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-11", 4, "test-exchange-11", ["test-queue-11"]) - # tx rx nodes - # 0 0 0 1 2 3 - dh.sendMsgs(20, 3) # 20 0 * - dh.receiveMsgs(10) # 20 10 * - dh.killNode(0) # . 1 2 3 - dh.killNode(1) # . . 2 3 - for i in range(0, 16): # First loop: - dh.killNode((i + 2) % 4) # . . . 3 - dh.restoreNode(i % 4) # 0 . . 3 - dh.sendMsgs(20, (i + 3) % 4) # 40 10 * - dh.receiveMsgs(20, (i + 4) % 4) # 40 30 * - # After loop: - # 340 330 . . 2 3 - dh.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_12_KillAllNodesRecoverMessages(self): - """Create a cluster, add and delete messages, kill all nodes then recover cluster and messages""" - if not self._storeEnable: - print " No store loaded, skipped" - return - try: - dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-12", 4, "test-exchange-12", ["test-queue-12"]) - # tx rx nodes - # 0 0 0 1 2 3 - dh.sendMsgs(20, 2) # 20 0 * - dh.receiveMsgs(10, 1) # 20 10 * - dh.killNode(1) # 0 . 2 3 - dh.sendMsgs(20, 0) # 40 10 * - dh.receiveMsgs(20, 3) # 40 30 * - dh.killNode(2) # 0 . . 3 - dh.addNodes(2) # 0 . . 3 4 5 - dh.sendMsgs(20, 5) # 60 30 * - dh.receiveMsgs(20, 4) # 60 50 * - dh.killCluster() # cluster does not exist - self.checkNumClusterBrokers("cluster-12", 0) - dh.restoreCluster() # 60 50 . . . . . . - dh.restoreNodes() # 0 1 2 3 4 5 - dh.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_13_TopicExchange(self): - """Create topic exchange in a cluster and make sure it behaves correctly""" - try: - topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"} - th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList) - # Place initial messages - th.sendMsgs("C.hello.A", 10) - th.sendMsgs("hello.world", 10) # matches none of the queues - th.sendMsgs("D.hello.A", 10) - th.sendMsgs("hello.B", 20) - th.sendMsgs("D.hello", 20) - # Kill and add some nodes - th.killNode(0) - th.killNode(2) - th.addNodes(2) - # Pull 10 messages from each queue - th.receiveMsgs(10) - # Kill and add another node - th.killNode(4) - th.addNodes() - # Add two more queues - th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"}) - # Place more messages - th.sendMsgs("C.bye.A", 10) - th.sendMsgs("hello.bye", 20) # matches none of the queues - th.sendMsgs("hello.bye.B", 20) - th.sendMsgs("D.bye", 20) - # Kill all nodes but one - th.killNode(1) - th.killNode(3) - th.killNode(6) - # Pull all remaining messages from each queue and check messages - th.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_14_FanoutExchange(self): - """Create fanout exchange in a cluster and make sure it behaves correctly""" - try: - fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"] - fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList) - # Place initial 20 messages, retrieve 10 - fh.sendMsgs(20) - fh.receiveMsgs(10) - # Kill and add some nodes - fh.killNode(0) - fh.killNode(2) - fh.addNodes(2) - # Place another 20 messages, retrieve 20 - fh.sendMsgs(20) - fh.receiveMsgs(20) - # Kill and add another node - fh.killNode(4) - fh.addNodes() - # Add another 2 queues - fh.addQueues(["test-queue-14-D", "test-queue-14-E"]) - # Place another 20 messages, retrieve 20 - fh.sendMsgs(20) - fh.receiveMsgs(20) - # Kill all nodes but one - fh.killNode(1) - fh.killNode(3) - fh.killNode(6) - # Check messages - fh.finalizeTest() - except: - self.killAllClusters(True) - raise - -class LongTests(TestBaseCluster): - """Basic cluster with async store tests""" - - def test_01_TopicExchange(self): - """Create topic exchange in a cluster and make sure it behaves correctly""" - try: - topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"} - th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList) - # Place initial messages - th.sendMsgs("C.hello.A", 10) - th.sendMsgs("hello.world", 10) # matches none of the queues - th.sendMsgs("D.hello.A", 10) - th.sendMsgs("hello.B", 20) - th.sendMsgs("D.hello", 20) - # Kill and add some nodes - th.killNode(0) - th.killNode(2) - th.addNodes(2) - # Pull 10 messages from each queue - th.receiveMsgs(10) - # Kill and add another node - th.killNode(4) - th.addNodes() - # Add two more queues - th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"}) - # Place more messages - th.sendMsgs("C.bye.A", 10) - th.sendMsgs("hello.bye", 20) # matches none of the queues - th.sendMsgs("hello.bye.B", 20) - th.sendMsgs("D.bye", 20) - # Kill all nodes but one - th.killNode(1) - th.killNode(3) - th.killNode(6) - # Pull all remaining messages from each queue and check messages - th.finalizeTest() - except: - self.killAllClusters(True) - raise - - def test_02_FanoutExchange(self): - """Create fanout exchange in a cluster and make sure it behaves correctly""" - try: - fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"] - fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList) - # Place initial 20 messages, retrieve 10 - fh.sendMsgs(20) - fh.receiveMsgs(10) - # Kill and add some nodes - fh.killNode(0) - fh.killNode(2) - fh.addNodes(2) - # Place another 20 messages, retrieve 20 - fh.sendMsgs(20) - fh.receiveMsgs(20) - # Kill and add another node - fh.killNode(4) - fh.addNodes() - # Add another 2 queues - fh.addQueues(["test-queue-14-D", "test-queue-14-E"]) - # Place another 20 messages, retrieve 20 - fh.sendMsgs(20) - fh.receiveMsgs(20) - # Kill all nodes but one - fh.killNode(1) - fh.killNode(3) - fh.killNode(6) - # Check messages - fh.finalizeTest() - except: - self.killAllClusters(True) - raise - -# Start the test here - -if __name__ == '__main__': - if os.getenv("STORE_LIB") != None: - print "NOTE: Store enabled for the following tests:" - if not test.main(): sys.exit(1) - + self.sender.stop() + self.receiver.stop(self.sender.sent) |