diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-24 21:37:29 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-24 21:37:29 +0000 |
commit | ae9385ec1b22b6ebf68237ef90a61a83e5daceb1 (patch) | |
tree | e0b00a021971a01742d41edb113c97cb205cebdb /qpid/cpp/src/tests/federation.py | |
parent | 2255b3813bbd5d4557596afc83c4113f16b83b9b (diff) | |
download | qpid-python-ae9385ec1b22b6ebf68237ef90a61a83e5daceb1.tar.gz |
QPID-3767: add additional test case for link sharing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1342442 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 5bcf67d152..3b58dce107 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -2325,4 +2325,117 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_multilink_shared_queue(self): + """ Verify that two distinct links can be created between federated + brokers. + """ + self.startQmf() + qmf = self.qmf + self._setup_brokers() + src_broker = self._brokers[0] + dst_broker = self._brokers[1] + + # create a topic exchange on the destination broker + dst_broker.client_session.exchange_declare(exchange="fedX.topic", type="topic") + self.assertEqual(dst_broker.client_session.exchange_query(name="fedX.topic").type, + "topic", "exchange_declare failed!") + + # create a destination queue + dst_broker.client_session.queue_declare(queue="destQ", auto_delete=True) + dst_broker.client_session.exchange_bind(queue="destQ", exchange="fedX.topic", binding_key="srcQ") + + # create a single source queue + src_broker.client_session.queue_declare(queue="srcQ", auto_delete=True) + + # create two connections + for _q in ["Link1", "Link2"]: + result = dst_broker.qmf_object.create("link", _q, + {"host":src_broker.host, + "port":src_broker.port}, + False) + self.assertEqual(result.status, 0); + + links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link") + self.assertEqual(len(links), 2) + + # now create two "parallel" queue routes from the source queue to the + # destination exchange. + result = dst_broker.qmf_object.create("bridge", "Bridge1", + {"link":"Link1", + "src":"srcQ", + "dest":"fedX.topic", + "srcIsQueue": True}, + False) + self.assertEqual(result.status, 0); + result = dst_broker.qmf_object.create("bridge", "Bridge2", + {"link":"Link2", + "src":"srcQ", + "dest":"fedX.topic", + "srcIsQueue": True}, + False) + self.assertEqual(result.status, 0); + + + # now wait for the links to become operational + for _l in links: + expire_time = time() + 30 + while _l.state != "Operational" and time() < expire_time: + _l.update() + self.assertEqual(_l.state, "Operational", "Link failed to become operational") + + # verify each link uses a different connection + self.assertNotEqual(links[0].connectionRef, links[1].connectionRef, + "Different links using the same connection") + + conn1 = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=links[0].connectionRef)[0] + conn2 = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=links[1].connectionRef)[0] + + # verify messages sent to the queue are pulled by each connection + + r_ssn = dst_broker.connection.session() + receiver = r_ssn.receiver("destQ"); + + for _c in [conn1, conn2]: + _c.update() + self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received") + + s_ssn = src_broker.connection.session() + sender = s_ssn.sender("srcQ") + + try: + for x in range(5): + sender.send(qpid.messaging.Message(content="hello")) + for x in range(5): + msg = receiver.fetch(timeout=10) + self.assertEqual(msg.content, "hello"); + r_ssn.acknowledge() + except: + self.fail("Message failure") + + # expect messages to be split over each connection. + conn1.update() + conn2.update() + self.assertNotEqual(conn1.msgsToClient, 0, "No messages sent") + self.assertNotEqual(conn2.msgsToClient, 0, "No messages sent") + self.assertEqual(conn2.msgsToClient + conn1.msgsToClient, 5, + "Expected 5 messages total") + + for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + dst_broker.client_session.exchange_unbind(queue="destQ", exchange="fedX.topic", binding_key="srcQ") + dst_broker.client_session.exchange_delete(exchange="fedX.topic") + + self._teardown_brokers() + + self.verify_cleanup() + + |