summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_tests.py
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /cpp/src/tests/cluster_tests.py
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-xcpp/src/tests/cluster_tests.py122
1 files changed, 120 insertions, 2 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index cbc3df4a6b..8952f5de7b 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -277,7 +277,7 @@ acl deny all all
QMF-based tools - regression test for BZ615300."""
broker1 = self.cluster(1)[0]
broker2 = self.cluster(1)[0]
- qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE)
+ qs = subprocess.Popen(["qpid-stat", "-e", "-b", broker1.host_port()], stdout=subprocess.PIPE)
out = qs.communicate()[0]
assert out.find("amq.failover") > 0
@@ -767,6 +767,124 @@ acl deny all all
cluster.start()
fetch(cluster[2])
+
+ def test_federation_failover(self):
+ """
+ Verify that federation operates across failures occuring in a cluster.
+ Specifically:
+ 1) Destination cluster learns of membership changes in the source
+ cluster
+ 2) Destination cluster replicates the current state of the source
+ cluster to newly-added members
+ """
+
+ TIMEOUT = 30
+ def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
+ """ Prove that traffic can pass from source fed broker to
+ destination fed broker
+ """
+ tot_time = 0
+ active = False
+ send_session = src_broker.connect().session()
+ sender = send_session.sender(src)
+ receive_session = dst_broker.connect().session()
+ receiver = receive_session.receiver(dst)
+ while not active and tot_time < timeout:
+ sender.send(Message("Hello from Source!"))
+ try:
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ # Get this far without Empty exception, and the link is good!
+ active = True
+ while True:
+ # Keep receiving msgs, as several may have accumulated
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ except Empty:
+ if not active:
+ tot_time += 1
+ receiver.close()
+ receive_session.close()
+ sender.close()
+ send_session.close()
+ self.assertTrue(active, "Bridge failed to become active")
+
+
+ # 2 node cluster source, 2 node cluster destination
+ src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
+ src_cluster.ready();
+ dst_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
+ dst_cluster.ready();
+
+ cmd = self.popen(["qpid-config",
+ "--broker", src_cluster[0].host_port(),
+ "add", "queue", "srcQ"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "exchange", "fanout", "destX"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "destX", "destQ"], EXPECT_EXIT_OK)
+ cmd.wait()
+
+ # federate the srcQ to the destination exchange
+ dst_cluster[0].startQmf()
+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+ result = dst_broker.connect(src_cluster[0].host(), src_cluster[0].port(), False, "PLAIN",
+ "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0, result);
+
+ link = dst_cluster[0].qmf_session.getObjects(_class="link")[0]
+ result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, False, 10)
+ self.assertEqual(result.status, 0, result)
+
+ # check that traffic passes
+ verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+
+ # add src[2] broker to source cluster
+ src_cluster.start(expect=EXPECT_EXIT_FAIL);
+ src_cluster.ready();
+ verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+ # Kill src[0]. dst[0] should fail over to src[1]
+ src_cluster[0].kill()
+ for b in src_cluster[1:]: b.ready()
+ verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+
+ # Kill src[1], dst[0] should fail over to src[2]
+ src_cluster[1].kill()
+ for b in src_cluster[2:]: b.ready()
+ verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+ # Kill dest[0], force failover to dest[1]
+ dst_cluster[0].kill()
+ for b in dst_cluster[1:]: b.ready()
+ verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+ # Add dest[2]
+ # dest[1] syncs dest[2] to current remote state
+ dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+ for b in dst_cluster[1:]: b.ready()
+ verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+ # Kill dest[1], force failover to dest[2]
+ dst_cluster[1].kill()
+ for b in dst_cluster[2:]: b.ready()
+ verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
+
+ for i in range(2, len(src_cluster)): src_cluster[i].kill()
+ for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
+
+
# Some utility code for transaction tests
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
@@ -1160,7 +1278,7 @@ class LongTests(BrokerTest):
def start_mclients(broker):
"""Start management clients that make multiple connections."""
- cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
+ cmd = ["qpid-cluster", "-C", "localhost:%s" %(broker.port())]
mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()