summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:58 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:58 +0000
commit9b29a602435ffb85517a2b6989dc0018b86c10b9 (patch)
tree2fa88b0c21c4bf17a77b6e72cfe0ce215050c69d
parentfe86122a0353cbb9369fdaa489498fc513c42dca (diff)
downloadqpid-python-9b29a602435ffb85517a2b6989dc0018b86c10b9.tar.gz
QPID-3963: add testcase for federation and cluster failover
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1332658 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py115
1 files changed, 115 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 79df21aada..28dbe62666 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -767,6 +767,121 @@ acl deny all all
cluster.start()
fetch(cluster[2])
+
+ def test_federation_failover(self):
+ """
+ Verify that federation operates across failures occuring in a cluster.
+ Specifically:
+ 1) Destination cluster learns of membership changes in the source
+ cluster
+ 2) Destination cluster replicates the current state of the source
+ cluster to newly-added members
+ """
+
+ TIMEOUT = 30
+ def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
+ """ Prove that traffic can pass from source fed broker to
+ destination fed broker
+ """
+ tot_time = 0
+ active = False
+ send_session = src_broker.connect().session()
+ sender = send_session.sender(src)
+ receive_session = dst_broker.connect().session()
+ receiver = receive_session.receiver(dst)
+ while not active and tot_time < timeout:
+ sender.send(Message("Hello from Source!"))
+ try:
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ # Get this far without Empty exception, and the link is good!
+ active = True
+ while True:
+ # Keep receiving msgs, as several may have accumulated
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ except Empty:
+ if not active:
+ tot_time += 1
+ receiver.close()
+ receive_session.close()
+ sender.close()
+ send_session.close()
+ self.assertTrue(active, "Bridge failed to become active")
+
+
+ # 1 node cluster source, 1 node cluster destination
+ src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ src_cluster.ready();
+ dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ dst_cluster.ready();
+
+ cmd = self.popen(["qpid-config",
+ "--broker", src_cluster[0].host_port(),
+ "add", "queue", "srcQ"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "exchange", "fanout", "destX"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "destX", "destQ"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ # federate the srcQ to the destination exchange
+ dst_cluster[0].startQmf()
+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+ result = dst_broker.connect(src_cluster[0].host(), src_cluster[0].port(), False, "PLAIN",
+ "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0, result);
+
+ link = dst_cluster[0].qmf_session.getObjects(_class="link")[0]
+ result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, False, 10)
+ self.assertEqual(result.status, 0, result)
+
+ # check that traffic passes
+ verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+
+ # add src[1] and src[2] brokers to source cluster
+ src_cluster.start(expect=EXPECT_EXIT_FAIL);
+ src_cluster.ready();
+ src_cluster.start(expect=EXPECT_EXIT_FAIL);
+ src_cluster.ready();
+ verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+ # Kill src[0]. dst[0] should've learned about src[1,2]
+ src_cluster[0].kill()
+ for b in src_cluster[1:]: b.ready()
+ verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+
+ # Kill src[1], dst[0] should still be connected
+ src_cluster[1].kill()
+ for b in src_cluster[2:]: b.ready()
+ verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+ # Add dest[1]
+ # dest[0] syncs dest[1] to current remote state
+ dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+ dst_cluster.ready();
+ verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+ # Kill dest[0], force failover to dest[1]
+ dst_cluster[0].kill()
+ for b in dst_cluster[1:]: b.ready()
+ verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+ for i in range(2, len(src_cluster)): src_cluster[i].kill()
+ for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
# Some utility code for transaction tests
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2