diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-04 13:36:42 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-04 13:36:42 +0000 |
commit | 673733dc03adb3ee4497d7a841342b5c380a65b2 (patch) | |
tree | b70f77c094b1e981e096908a827726cdaf45d319 | |
parent | 27fb41307ef6593f09a6aea10e58ce0c736bebf0 (diff) | |
download | qpid-python-qpid-3767.tar.gz |
QPID-3767: add multilink federation testsqpid-3767
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3767@1333954 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 177 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 163 |
2 files changed, 300 insertions, 40 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 8952f5de7b..09eebc5ec9 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -768,6 +768,35 @@ acl deny all all fetch(cluster[2]) + def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): + """ Prove that traffic can pass between two federated brokers. + """ + tot_time = 0 + active = False + send_session = src_broker.connect().session() + sender = send_session.sender(src) + receive_session = dst_broker.connect().session() + receiver = receive_session.receiver(dst) + while not active and tot_time < timeout: + sender.send(Message("Hello from Source!")) + try: + receiver.fetch(timeout = 1) + receive_session.acknowledge() + # Get this far without Empty exception, and the link is good! + active = True + while True: + # Keep receiving msgs, as several may have accumulated + receiver.fetch(timeout = 1) + receive_session.acknowledge() + except Empty: + if not active: + tot_time += 1 + receiver.close() + receive_session.close() + sender.close() + send_session.close() + return active + def test_federation_failover(self): """ Verify that federation operates across failures occuring in a cluster. @@ -778,38 +807,6 @@ acl deny all all cluster to newly-added members """ - TIMEOUT = 30 - def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT): - """ Prove that traffic can pass from source fed broker to - destination fed broker - """ - tot_time = 0 - active = False - send_session = src_broker.connect().session() - sender = send_session.sender(src) - receive_session = dst_broker.connect().session() - receiver = receive_session.receiver(dst) - while not active and tot_time < timeout: - sender.send(Message("Hello from Source!")) - try: - receiver.fetch(timeout = 1) - receive_session.acknowledge() - # Get this far without Empty exception, and the link is good! - active = True - while True: - # Keep receiving msgs, as several may have accumulated - receiver.fetch(timeout = 1) - receive_session.acknowledge() - except Empty: - if not active: - tot_time += 1 - receiver.close() - receive_session.close() - sender.close() - send_session.close() - self.assertTrue(active, "Bridge failed to become active") - - # 2 node cluster source, 2 node cluster destination src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) src_cluster.ready(); @@ -848,43 +845,145 @@ acl deny all all self.assertEqual(result.status, 0, result) # check that traffic passes - verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") # add src[2] broker to source cluster src_cluster.start(expect=EXPECT_EXIT_FAIL); src_cluster.ready(); - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill src[0]. dst[0] should fail over to src[1] src_cluster[0].kill() for b in src_cluster[1:]: b.ready() - verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ") # Kill src[1], dst[0] should fail over to src[2] src_cluster[1].kill() for b in src_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill dest[0], force failover to dest[1] dst_cluster[0].kill() for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Add dest[2] # dest[1] syncs dest[2] to current remote state dst_cluster.start(expect=EXPECT_EXIT_FAIL); for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Kill dest[1], force failover to dest[2] dst_cluster[1].kill() for b in dst_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ") for i in range(2, len(src_cluster)): src_cluster[i].kill() for i in range(2, len(dst_cluster)): dst_cluster[i].kill() + def test_federation_multilink_failover(self): + """ + Verify that multi-link federation operates across failures occuring in + a cluster. + """ + + # 1 node cluster source, 1 node cluster destination + src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + src_cluster.ready(); + dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + dst_cluster.ready(); + + # federate a direct binding across two separate links + + # first, create a direct exchange bound to two queues using different + # bindings + cmd = self.popen(["qpid-config", + "--broker", src_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ1"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ1", "one"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ2"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ2", "two"], + EXPECT_EXIT_OK) + cmd.wait() + + # Create two separate links between the dst and source brokers, bind + # each to different keys + dst_cluster[0].startQmf() + dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] + + for _l in [("link1", "bridge1", "one"), + ("link2", "bridge2", "two")]: + result = dst_broker.create("link", _l[0], + {"host":src_cluster[0].host(), + "port":src_cluster[0].port()}, + False) + self.assertEqual(result.status, 0, result); + result = dst_broker.create("bridge", _l[1], + {"link":_l[0], + "src":"FedX", + "dest":"FedX", + "key":_l[2]}, False) + self.assertEqual(result.status, 0); + + # check that traffic passes + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + # add new member, verify traffic + src_cluster.start(expect=EXPECT_EXIT_FAIL); + src_cluster.ready(); + + dst_cluster.start(expect=EXPECT_EXIT_FAIL); + dst_cluster.ready(); + + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + src_cluster[0].kill() + for b in src_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2") + + dst_cluster[0].kill() + for b in dst_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2") + + for i in range(1, len(src_cluster)): src_cluster[i].kill() + for i in range(1, len(dst_cluster)): dst_cluster[i].kill() + + + # Some utility code for transaction tests XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 5f483ad258..5bcf67d152 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -23,6 +23,7 @@ from qpid.testlib import TestBase010 from qpid.datatypes import Message from qpid.queue import Empty from qpid.util import URL +import qpid.messaging from time import sleep, time @@ -94,6 +95,11 @@ class FederationTests(TestBase010): break self._brokers.append(_b) + # add a new-style messaging connection to each broker + for _b in self._brokers: + _b.connection = qpid.messaging.Connection(_b.url) + _b.connection.open() + def _teardown_brokers(self): """ Un-does _setup_brokers() """ @@ -103,7 +109,7 @@ class FederationTests(TestBase010): if not _b.client_session.error(): _b.client_session.close(timeout=10) _b.client_conn.close(timeout=10) - + _b.connection.close() def test_bridge_create_and_close(self): self.startQmf(); @@ -2165,3 +2171,158 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_multilink_direct(self): + """ Verify that two distinct links can be created between federated + brokers. + """ + self.startQmf() + qmf = self.qmf + self._setup_brokers() + src_broker = self._brokers[0] + dst_broker = self._brokers[1] + + # create a direct exchange on each broker + for _b in [src_broker, dst_broker]: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, + "direct", "exchange_declare failed!") + + # create destination queues + for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: + dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True) + dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) + + # create two connections, one for high priority traffic + for _q in ["HiPri", "Traffic"]: + result = dst_broker.qmf_object.create("link", _q, + {"host":src_broker.host, + "port":src_broker.port}, + False) + self.assertEqual(result.status, 0); + + links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link") + for _l in links: + if _l.name == "HiPri": + hi_link = _l + elif _l.name == "Traffic": + data_link = _l + else: + self.fail("Unexpected Link found: " + _l.name) + + # now create a route for messages sent with key "high" to use the + # hi_link + result = dst_broker.qmf_object.create("bridge", "HiPriBridge", + {"link":hi_link.name, + "src":"fedX.direct", + "dest":"fedX.direct", + "key":"high"}, False) + self.assertEqual(result.status, 0); + + + # create routes for the "medium" and "low" links to use the normal + # data_link + for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]: + result = dst_broker.qmf_object.create("bridge", _b[0], + {"link":data_link.name, + "src":"fedX.direct", + "dest":"fedX.direct", + "key":_b[1]}, False) + self.assertEqual(result.status, 0); + + # now wait for the links to become operational + for _l in [hi_link, data_link]: + expire_time = time() + 30 + while _l.state != "Operational" and time() < expire_time: + _l.update() + self.assertEqual(_l.state, "Operational", "Link failed to become operational") + + # verify each link uses a different connection + self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef, + "Different links using the same connection") + + hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=hi_link.connectionRef)[0] + data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=data_link.connectionRef)[0] + + + # send hi data, verify only goes over hi link + + r_ssn = dst_broker.connection.session() + hi_receiver = r_ssn.receiver("HiQ"); + med_receiver = r_ssn.receiver("MedQ"); + low_receiver = r_ssn.receiver("LoQ"); + + for _c in [hi_conn, data_conn]: + _c.update() + self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received") + + s_ssn = src_broker.connection.session() + hi_sender = s_ssn.sender("fedX.direct/high") + med_sender = s_ssn.sender("fedX.direct/medium") + low_sender = s_ssn.sender("fedX.direct/low") + + try: + hi_sender.send(qpid.messaging.Message(content="hi priority")) + msg = hi_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "hi priority"); + except: + self.fail("Hi Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages") + + # send low and medium, verify it does not go over hi link + + try: + med_sender.send(qpid.messaging.Message(content="medium priority")) + msg = med_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "medium priority"); + except: + self.fail("Medium Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message") + + try: + low_sender.send(qpid.messaging.Message(content="low priority")) + msg = low_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "low priority"); + except: + self.fail("Low Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message") + + # cleanup + + for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: + dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) + dst_broker.client_session.queue_delete(queue=_q[0]) + + for _b in [src_broker, dst_broker]: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + + + |