diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 177 |
1 files changed, 138 insertions, 39 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 8952f5de7b..09eebc5ec9 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -768,6 +768,35 @@ acl deny all all fetch(cluster[2]) + def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): + """ Prove that traffic can pass between two federated brokers. + """ + tot_time = 0 + active = False + send_session = src_broker.connect().session() + sender = send_session.sender(src) + receive_session = dst_broker.connect().session() + receiver = receive_session.receiver(dst) + while not active and tot_time < timeout: + sender.send(Message("Hello from Source!")) + try: + receiver.fetch(timeout = 1) + receive_session.acknowledge() + # Get this far without Empty exception, and the link is good! + active = True + while True: + # Keep receiving msgs, as several may have accumulated + receiver.fetch(timeout = 1) + receive_session.acknowledge() + except Empty: + if not active: + tot_time += 1 + receiver.close() + receive_session.close() + sender.close() + send_session.close() + return active + def test_federation_failover(self): """ Verify that federation operates across failures occuring in a cluster. @@ -778,38 +807,6 @@ acl deny all all cluster to newly-added members """ - TIMEOUT = 30 - def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT): - """ Prove that traffic can pass from source fed broker to - destination fed broker - """ - tot_time = 0 - active = False - send_session = src_broker.connect().session() - sender = send_session.sender(src) - receive_session = dst_broker.connect().session() - receiver = receive_session.receiver(dst) - while not active and tot_time < timeout: - sender.send(Message("Hello from Source!")) - try: - receiver.fetch(timeout = 1) - receive_session.acknowledge() - # Get this far without Empty exception, and the link is good! - active = True - while True: - # Keep receiving msgs, as several may have accumulated - receiver.fetch(timeout = 1) - receive_session.acknowledge() - except Empty: - if not active: - tot_time += 1 - receiver.close() - receive_session.close() - sender.close() - send_session.close() - self.assertTrue(active, "Bridge failed to become active") - - # 2 node cluster source, 2 node cluster destination src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) src_cluster.ready(); @@ -848,43 +845,145 @@ acl deny all all self.assertEqual(result.status, 0, result) # check that traffic passes - verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") # add src[2] broker to source cluster src_cluster.start(expect=EXPECT_EXIT_FAIL); src_cluster.ready(); - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill src[0]. dst[0] should fail over to src[1] src_cluster[0].kill() for b in src_cluster[1:]: b.ready() - verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ") # Kill src[1], dst[0] should fail over to src[2] src_cluster[1].kill() for b in src_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill dest[0], force failover to dest[1] dst_cluster[0].kill() for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Add dest[2] # dest[1] syncs dest[2] to current remote state dst_cluster.start(expect=EXPECT_EXIT_FAIL); for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Kill dest[1], force failover to dest[2] dst_cluster[1].kill() for b in dst_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ") for i in range(2, len(src_cluster)): src_cluster[i].kill() for i in range(2, len(dst_cluster)): dst_cluster[i].kill() + def test_federation_multilink_failover(self): + """ + Verify that multi-link federation operates across failures occuring in + a cluster. + """ + + # 1 node cluster source, 1 node cluster destination + src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + src_cluster.ready(); + dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + dst_cluster.ready(); + + # federate a direct binding across two separate links + + # first, create a direct exchange bound to two queues using different + # bindings + cmd = self.popen(["qpid-config", + "--broker", src_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ1"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ1", "one"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ2"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ2", "two"], + EXPECT_EXIT_OK) + cmd.wait() + + # Create two separate links between the dst and source brokers, bind + # each to different keys + dst_cluster[0].startQmf() + dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] + + for _l in [("link1", "bridge1", "one"), + ("link2", "bridge2", "two")]: + result = dst_broker.create("link", _l[0], + {"host":src_cluster[0].host(), + "port":src_cluster[0].port()}, + False) + self.assertEqual(result.status, 0, result); + result = dst_broker.create("bridge", _l[1], + {"link":_l[0], + "src":"FedX", + "dest":"FedX", + "key":_l[2]}, False) + self.assertEqual(result.status, 0); + + # check that traffic passes + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + # add new member, verify traffic + src_cluster.start(expect=EXPECT_EXIT_FAIL); + src_cluster.ready(); + + dst_cluster.start(expect=EXPECT_EXIT_FAIL); + dst_cluster.ready(); + + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + src_cluster[0].kill() + for b in src_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2") + + dst_cluster[0].kill() + for b in dst_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2") + + for i in range(1, len(src_cluster)): src_cluster[i].kill() + for i in range(1, len(dst_cluster)): dst_cluster[i].kill() + + + # Some utility code for transaction tests XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 |