diff options
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py new file mode 100755 index 0000000000..178271e977 --- /dev/null +++ b/cpp/src/tests/cluster_tests.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re +from qpid import datatypes, messaging +from qpid.brokertest import * +from qpid.harness import Skipped +from qpid.messaging import Message +from threading import Thread +from logging import getLogger + +log = getLogger("qpid.cluster_tests") + +# Import scripts as modules +qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) + +class ShortTests(BrokerTest): + """Short cluster functionality tests.""" + + def test_message_replication(self): + """Test basic cluster message replication.""" + # Start a cluster, send some messages to member 0. + cluster = self.cluster(2) + s0 = cluster[0].connect().session() + s0.sender("q; {create:always}").send(Message("x")) + s0.sender("q; {create:always}").send(Message("y")) + s0.connection.close() + + # Verify messages available on member 1. + s1 = cluster[1].connect().session() + m = s1.receiver("q", capacity=1).fetch(timeout=1) + s1.acknowledge() + self.assertEqual("x", m.content) + s1.connection.close() + + # Start member 2 and verify messages available. + s2 = cluster.start().connect().session() + m = s2.receiver("q", capacity=1).fetch(timeout=1) + s2.acknowledge() + self.assertEqual("y", m.content) + s2.connection.close() + + def test_store_direct_update_match(self): + """Verify that brokers stores an identical message whether they receive it + direct from clients or during an update, no header or other differences""" + cluster = self.cluster(0, args=["--load-module", self.test_store_lib]) + cluster.start(args=["--test-store-dump", "direct.dump"]) + # Try messages with various headers + cluster[0].send_message("q", Message(durable=True, content="foobar", + subject="subject", + reply_to="reply_to", + properties={"n":10})) + # Try messages of different sizes + for size in range(0,10000,100): + cluster[0].send_message("q", Message(content="x"*size, durable=True)) + # Try sending via named exchange + c = cluster[0].connect_old() + s = c.session(str(qpid.datatypes.uuid4())) + s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q") + props = s.delivery_properties(routing_key="foo", delivery_mode=2) + s.message_transfer( + destination="amq.direct", + message=qpid.datatypes.Message(props, "content")) + + # Now update a new member and compare their dumps. + cluster.start(args=["--test-store-dump", "updatee.dump"]) + assert file("direct.dump").read() == file("updatee.dump").read() + os.remove("direct.dump") + os.remove("updatee.dump") + +class LongTests(BrokerTest): + """Tests that can run for a long time if -DDURATION=<minutes> is set""" + def duration(self): + d = self.config.defines.get("DURATION") + if d: return float(d)*60 + else: return 3 # Default is to be quick + + def test_failover(self): + """Test fail-over during continuous send-receive with errors""" + + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + for b in cluster: ErrorGenerator(b) + + # Start sender and receiver threads + cluster[0].declare_queue("test-queue") + sender = NumberedSender(cluster[1], 1000) # Max queue depth + receiver = NumberedReceiver(cluster[2], sender) + receiver.start() + sender.start() + + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration() + i = 0 + while time.time() < endtime: + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + ErrorGenerator(b) + time.sleep(1) + sender.stop() + receiver.stop(sender.sent) + for i in range(i, len(cluster)): cluster[i].kill() + + +class StoreTests(BrokerTest): + """ + Cluster tests that can only be run if there is a store available. + """ + def args(self): + assert BrokerTest.store_lib + return ["--load-module", BrokerTest.store_lib] + + def test_store_loaded(self): + """Ensure we are indeed loading a working store""" + broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL) + m = Message("x", durable=True) + broker.send_message("q", m) + broker.kill() + broker = self.broker(self.args(), name="recoverme") + self.assertEqual("x", broker.get_message("q").content) + + def test_kill_restart(self): + """Verify we can kill/resetart a broker with store in a cluster""" + cluster = self.cluster(1, self.args()) + cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill() + + # Send a message, retrieve from the restarted broker + cluster[0].send_message("q", "x") + m = cluster.start("restartme").get_message("q") + self.assertEqual("x", m.content) + + def test_persistent_restart(self): + """Verify persistent cluster shutdown/restart scenarios""" + cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True) + a.send_message("q", Message("1", durable=True)) + # Kill & restart one member. + c.kill() + self.assertEqual(a.get_message("q").content, "1") + a.send_message("q", Message("2", durable=True)) + c = cluster.start("c", expect=EXPECT_EXIT_OK) + self.assertEqual(c.get_message("q").content, "2") + # Shut down the entire cluster cleanly and bring it back up + a.send_message("q", Message("3", durable=True)) + qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) + self.assertEqual(a.get_message("q").content, "3") + + def test_persistent_partial_failure(self): + # Kill 2 members, shut down the last cleanly then restart + # Ensure we use the clean database + cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) + a.send_message("q", Message("4", durable=True)) + a.kill() + b.kill() + self.assertEqual(c.get_message("q").content, "4") + c.send_message("q", Message("clean", durable=True)) + qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) + self.assertEqual(a.get_message("q").content, "clean") + + def test_wrong_cluster_id(self): + # Start a cluster1 broker, then try to restart in cluster2 + cluster1 = self.cluster(0, args=self.args()) + a = cluster1.start("a", expect=EXPECT_EXIT_OK) + a.terminate() + cluster2 = self.cluster(1, args=self.args()) + try: + a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) + self.fail("Expected exception") + except: pass + + def test_wrong_shutdown_id(self): + # Start 2 members and shut down. + cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.assertEqual(a.wait(), 0) + self.assertEqual(b.wait(), 0) + + # Restart with a different member and shut down. + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) + self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.assertEqual(a.wait(), 0) + self.assertEqual(c.wait(), 0) + + # Mix members from both shutdown events, they should fail + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + + def test_total_failure(self): + # Verify we abort with sutiable error message if no clean stores. + cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True) + a.kill() + b.kill() + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + assert a.wait() != 0 + assert b.wait() != 0 + msg = re.compile("critical.*no clean store") + assert msg.search(file(a.log).read()) + assert msg.search(file(b.log).read()) |