Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x  cpp/src/tests/cluster_tests.py  233
1 files changed, 233 insertions, 0 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
new file mode 100755
index 0000000000..178271e977
--- /dev/null
+++ b/cpp/src/tests/cluster_tests.py
@@ -0,0 +1,233 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re
+from qpid import datatypes, messaging
+from qpid.brokertest import *
+from qpid.harness import Skipped
+from qpid.messaging import Message
+from threading import Thread
+from logging import getLogger
+
+log = getLogger("qpid.cluster_tests")
+
+# Import scripts as modules
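+# import_script() presumably loads the qpid-cluster management tool as a module;
+# tests below call its main() exactly as the command line would be invoked
+# (e.g. "-kf <host:port>" for a clean shutdown of the whole cluster).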
+qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
+
+class ShortTests(BrokerTest):
+ """Short cluster functionality tests."""
+
+ def test_message_replication(self):
+ """Test basic cluster message replication."""
+ # Start a cluster, send some messages to member 0.
+ cluster = self.cluster(2)
+ s0 = cluster[0].connect().session()
+ s0.sender("q; {create:always}").send(Message("x"))
+ s0.sender("q; {create:always}").send(Message("y"))
+ s0.connection.close()
+
+ # Verify messages available on member 1.
+ s1 = cluster[1].connect().session()
+ m = s1.receiver("q", capacity=1).fetch(timeout=1)
+ s1.acknowledge()
+ self.assertEqual("x", m.content)
+ s1.connection.close()
+
+ # Start member 2 and verify messages available.
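+        # (The newly started member is expected to receive the existing queue
+        # contents from the running members, so "y" should be readable from it
+        # even though it was sent before this member joined.)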
+        s2 = cluster.start().connect().session()
+        m = s2.receiver("q", capacity=1).fetch(timeout=1)
+        s2.acknowledge()
+        self.assertEqual("y", m.content)
+        s2.connection.close()
+
+    def test_store_direct_update_match(self):
+        """Verify that brokers store an identical message whether they receive it
+        directly from a client or during an update, with no header or other differences."""
+        cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
+        cluster.start(args=["--test-store-dump", "direct.dump"])
+        # Try messages with various headers
+        cluster[0].send_message("q", Message(durable=True, content="foobar",
+                                             subject="subject",
+                                             reply_to="reply_to",
+                                             properties={"n":10}))
+        # Try messages of different sizes
+        for size in range(0, 10000, 100):
+            cluster[0].send_message("q", Message(content="x"*size, durable=True))
+        # Try sending via a named exchange
+        c = cluster[0].connect_old()
+        s = c.session(str(datatypes.uuid4()))
+        s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
+        props = s.delivery_properties(routing_key="foo", delivery_mode=2)
+        s.message_transfer(
+            destination="amq.direct",
+            message=datatypes.Message(props, "content"))
+
+        # Now update a new member and compare their dumps.
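+        # The test-store module loaded above is assumed to dump every message it
+        # persists into the file named by --test-store-dump, so byte-identical
+        # dump files mean the updatee stored exactly what the original broker did.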
+        cluster.start(args=["--test-store-dump", "updatee.dump"])
+        assert file("direct.dump").read() == file("updatee.dump").read()
+        os.remove("direct.dump")
+        os.remove("updatee.dump")
+
+class LongTests(BrokerTest):
+ """Tests that can run for a long time if -DDURATION=<minutes> is set"""
+ def duration(self):
+ d = self.config.defines.get("DURATION")
+ if d: return float(d)*60
+ else: return 3 # Default is to be quick
+
+    def test_failover(self):
+        """Test fail-over during continuous send-receive with errors."""
+
+        # The original cluster members will all be killed, so expect exit with failure.
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+        for b in cluster: ErrorGenerator(b)
+
+        # Start sender and receiver threads.
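+        # NumberedSender presumably sends consecutively numbered messages (with
+        # at most 1000 outstanding, per the "Max queue depth" argument) and
+        # NumberedReceiver checks the sequence, so messages lost or reordered
+        # across fail-overs would be detected.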
+        cluster[0].declare_queue("test-queue")
+        sender = NumberedSender(cluster[1], 1000) # Max queue depth
+        receiver = NumberedReceiver(cluster[2], sender)
+        receiver.start()
+        sender.start()
+
+        # Kill original brokers, start new ones for the duration.
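+        # Each pass kills the oldest surviving member and starts a replacement,
+        # so clients are forced to fail over repeatedly while the cluster keeps
+        # three live members.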
+        endtime = time.time() + self.duration()
+        i = 0
+        while time.time() < endtime:
+            cluster[i].kill()
+            i += 1
+            b = cluster.start(expect=EXPECT_EXIT_FAIL)
+            ErrorGenerator(b)
+            time.sleep(1)
+        sender.stop()
+        receiver.stop(sender.sent)
+        for i in range(i, len(cluster)): cluster[i].kill()
+
+
+class StoreTests(BrokerTest):
+ """
+ Cluster tests that can only be run if there is a store available.
+ """
+ def args(self):
+ assert BrokerTest.store_lib
+ return ["--load-module", BrokerTest.store_lib]
+
+    def test_store_loaded(self):
+        """Ensure we are indeed loading a working store"""
+        broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL)
+        m = Message("x", durable=True)
+        broker.send_message("q", m)
+        broker.kill()
+        broker = self.broker(self.args(), name="recoverme")
+        self.assertEqual("x", broker.get_message("q").content)
+
+    def test_kill_restart(self):
+        """Verify we can kill/restart a broker with a store in a cluster."""
+        cluster = self.cluster(1, self.args())
+        cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill()
+
+        # Send a message, retrieve it from the restarted broker.
+        cluster[0].send_message("q", "x")
+        m = cluster.start("restartme").get_message("q")
+        self.assertEqual("x", m.content)
+
+    def test_persistent_restart(self):
+        """Verify persistent cluster shutdown/restart scenarios"""
+        cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True)
+        a.send_message("q", Message("1", durable=True))
+        # Kill & restart one member.
+        c.kill()
+        self.assertEqual(a.get_message("q").content, "1")
+        a.send_message("q", Message("2", durable=True))
+        c = cluster.start("c", expect=EXPECT_EXIT_OK)
+        self.assertEqual(c.get_message("q").content, "2")
+        # Shut down the entire cluster cleanly and bring it back up.
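+        # A clean shutdown (qpid-cluster -kf below) presumably leaves every
+        # member's store marked clean, so the whole cluster can be restarted
+        # from persisted state and still deliver message "3".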
+ a.send_message("q", Message("3", durable=True))
+ qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()])
+ a = cluster.start("a", wait=False)
+ b = cluster.start("b", wait=False)
+ c = cluster.start("c", wait=True)
+ self.assertEqual(a.get_message("q").content, "3")
+
+    def test_persistent_partial_failure(self):
+        # Kill 2 members, shut down the last cleanly, then restart.
+        # Ensure we use the clean database.
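+        # Only "c" is shut down cleanly below, so only its store should be marked
+        # clean; recovery is expected to use that store, which holds "clean" but
+        # not "4" (already consumed before the shutdown).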
+        cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+        c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
+        a.send_message("q", Message("4", durable=True))
+        a.kill()
+        b.kill()
+        self.assertEqual(c.get_message("q").content, "4")
+        c.send_message("q", Message("clean", durable=True))
+        qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()])
+        a = cluster.start("a", wait=False)
+        b = cluster.start("b", wait=False)
+        c = cluster.start("c", wait=True)
+        self.assertEqual(a.get_message("q").content, "clean")
+
+    def test_wrong_cluster_id(self):
+        # Start a cluster1 broker, then try to restart it in cluster2.
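+        # The store presumably records which cluster the broker belonged to, so
+        # reusing broker "a"'s data directory as a member of a different cluster
+        # must be rejected.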
+        cluster1 = self.cluster(0, args=self.args())
+        a = cluster1.start("a", expect=EXPECT_EXIT_OK)
+        a.terminate()
+        cluster2 = self.cluster(1, args=self.args())
+        try:
+            a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
+            self.fail("Expected exception")
+        except: pass
+
+    def test_wrong_shutdown_id(self):
+        # Start 2 members and shut down.
+        cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]))
+        self.assertEqual(a.wait(), 0)
+        self.assertEqual(b.wait(), 0)
+
+        # Restart with a different member and shut down.
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
+        self.assertEqual(0, qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]))
+        self.assertEqual(a.wait(), 0)
+        self.assertEqual(c.wait(), 0)
+
+        # Mix members from both shutdown events; they should fail.
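+        # Each clean shutdown presumably stamps the participating stores with a
+        # shutdown id, so "a" (from the second shutdown) and "b" (from the first)
+        # carry different ids and must refuse to form a cluster together.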
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+
+    def test_total_failure(self):
+        # Verify we abort with a suitable error message if there are no clean stores.
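+        # Both members are killed rather than shut down cleanly, so neither store
+        # is marked clean and recovery has no authoritative state to start from;
+        # the brokers are expected to log a critical "no clean store" error and exit.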
+        cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
+        a.kill()
+        b.kill()
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+        assert a.wait() != 0
+        assert b.wait() != 0
+        msg = re.compile("critical.*no clean store")
+        assert msg.search(file(a.log).read())
+        assert msg.search(file(b.log).read())