summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py177
1 files changed, 138 insertions, 39 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 8952f5de7b..09eebc5ec9 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -768,6 +768,35 @@ acl deny all all
fetch(cluster[2])
+ def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30):
+ """ Prove that traffic can pass between two federated brokers.
+ """
+ tot_time = 0
+ active = False
+ send_session = src_broker.connect().session()
+ sender = send_session.sender(src)
+ receive_session = dst_broker.connect().session()
+ receiver = receive_session.receiver(dst)
+ while not active and tot_time < timeout:
+ sender.send(Message("Hello from Source!"))
+ try:
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ # Get this far without Empty exception, and the link is good!
+ active = True
+ while True:
+ # Keep receiving msgs, as several may have accumulated
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ except Empty:
+ if not active:
+ tot_time += 1
+ receiver.close()
+ receive_session.close()
+ sender.close()
+ send_session.close()
+ return active
+
def test_federation_failover(self):
"""
Verify that federation operates across failures occuring in a cluster.
@@ -778,38 +807,6 @@ acl deny all all
cluster to newly-added members
"""
- TIMEOUT = 30
- def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
- """ Prove that traffic can pass from source fed broker to
- destination fed broker
- """
- tot_time = 0
- active = False
- send_session = src_broker.connect().session()
- sender = send_session.sender(src)
- receive_session = dst_broker.connect().session()
- receiver = receive_session.receiver(dst)
- while not active and tot_time < timeout:
- sender.send(Message("Hello from Source!"))
- try:
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- # Get this far without Empty exception, and the link is good!
- active = True
- while True:
- # Keep receiving msgs, as several may have accumulated
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- except Empty:
- if not active:
- tot_time += 1
- receiver.close()
- receive_session.close()
- sender.close()
- send_session.close()
- self.assertTrue(active, "Bridge failed to become active")
-
-
# 2 node cluster source, 2 node cluster destination
src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
src_cluster.ready();
@@ -848,43 +845,145 @@ acl deny all all
self.assertEqual(result.status, 0, result)
# check that traffic passes
- verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
# add src[2] broker to source cluster
src_cluster.start(expect=EXPECT_EXIT_FAIL);
src_cluster.ready();
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill src[0]. dst[0] should fail over to src[1]
src_cluster[0].kill()
for b in src_cluster[1:]: b.ready()
- verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
# Kill src[1], dst[0] should fail over to src[2]
src_cluster[1].kill()
for b in src_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill dest[0], force failover to dest[1]
dst_cluster[0].kill()
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Add dest[2]
# dest[1] syncs dest[2] to current remote state
dst_cluster.start(expect=EXPECT_EXIT_FAIL);
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Kill dest[1], force failover to dest[2]
dst_cluster[1].kill()
for b in dst_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
for i in range(2, len(src_cluster)): src_cluster[i].kill()
for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
+ def test_federation_multilink_failover(self):
+ """
+ Verify that multi-link federation operates across failures occuring in
+ a cluster.
+ """
+
+ # 1 node cluster source, 1 node cluster destination
+ src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ src_cluster.ready();
+ dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ dst_cluster.ready();
+
+ # federate a direct binding across two separate links
+
+ # first, create a direct exchange bound to two queues using different
+ # bindings
+ cmd = self.popen(["qpid-config",
+ "--broker", src_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ1"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ1", "one"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ2"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ2", "two"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ # Create two separate links between the dst and source brokers, bind
+ # each to different keys
+ dst_cluster[0].startQmf()
+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+
+ for _l in [("link1", "bridge1", "one"),
+ ("link2", "bridge2", "two")]:
+ result = dst_broker.create("link", _l[0],
+ {"host":src_cluster[0].host(),
+ "port":src_cluster[0].port()},
+ False)
+ self.assertEqual(result.status, 0, result);
+ result = dst_broker.create("bridge", _l[1],
+ {"link":_l[0],
+ "src":"FedX",
+ "dest":"FedX",
+ "key":_l[2]}, False)
+ self.assertEqual(result.status, 0);
+
+ # check that traffic passes
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ # add new member, verify traffic
+ src_cluster.start(expect=EXPECT_EXIT_FAIL);
+ src_cluster.ready();
+
+ dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+ dst_cluster.ready();
+
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ src_cluster[0].kill()
+ for b in src_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2")
+
+ dst_cluster[0].kill()
+ for b in dst_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2")
+
+ for i in range(1, len(src_cluster)): src_cluster[i].kill()
+ for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
+
# Some utility code for transaction tests
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2