summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-24 21:37:29 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-24 21:37:29 +0000
commitae9385ec1b22b6ebf68237ef90a61a83e5daceb1 (patch)
treee0b00a021971a01742d41edb113c97cb205cebdb /qpid/cpp
parent2255b3813bbd5d4557596afc83c4113f16b83b9b (diff)
downloadqpid-python-ae9385ec1b22b6ebf68237ef90a61a83e5daceb1.tar.gz
QPID-3767: add additional test case for link sharing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1342442 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rwxr-xr-xqpid/cpp/src/tests/federation.py113
1 files changed, 113 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 5bcf67d152..3b58dce107 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -2325,4 +2325,117 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_multilink_shared_queue(self):
+ """ Verify that two distinct links can be created between federated
+ brokers.
+ """
+ self.startQmf()
+ qmf = self.qmf
+ self._setup_brokers()
+ src_broker = self._brokers[0]
+ dst_broker = self._brokers[1]
+
+ # create a topic exchange on the destination broker
+ dst_broker.client_session.exchange_declare(exchange="fedX.topic", type="topic")
+ self.assertEqual(dst_broker.client_session.exchange_query(name="fedX.topic").type,
+ "topic", "exchange_declare failed!")
+
+ # create a destination queue
+ dst_broker.client_session.queue_declare(queue="destQ", auto_delete=True)
+ dst_broker.client_session.exchange_bind(queue="destQ", exchange="fedX.topic", binding_key="srcQ")
+
+ # create a single source queue
+ src_broker.client_session.queue_declare(queue="srcQ", auto_delete=True)
+
+ # create two connections
+ for _q in ["Link1", "Link2"]:
+ result = dst_broker.qmf_object.create("link", _q,
+ {"host":src_broker.host,
+ "port":src_broker.port},
+ False)
+ self.assertEqual(result.status, 0);
+
+ links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+ self.assertEqual(len(links), 2)
+
+ # now create two "parallel" queue routes from the source queue to the
+ # destination exchange.
+ result = dst_broker.qmf_object.create("bridge", "Bridge1",
+ {"link":"Link1",
+ "src":"srcQ",
+ "dest":"fedX.topic",
+ "srcIsQueue": True},
+ False)
+ self.assertEqual(result.status, 0);
+ result = dst_broker.qmf_object.create("bridge", "Bridge2",
+ {"link":"Link2",
+ "src":"srcQ",
+ "dest":"fedX.topic",
+ "srcIsQueue": True},
+ False)
+ self.assertEqual(result.status, 0);
+
+
+ # now wait for the links to become operational
+ for _l in links:
+ expire_time = time() + 30
+ while _l.state != "Operational" and time() < expire_time:
+ _l.update()
+ self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+ # verify each link uses a different connection
+ self.assertNotEqual(links[0].connectionRef, links[1].connectionRef,
+ "Different links using the same connection")
+
+ conn1 = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=links[0].connectionRef)[0]
+ conn2 = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=links[1].connectionRef)[0]
+
+ # verify messages sent to the queue are pulled by each connection
+
+ r_ssn = dst_broker.connection.session()
+ receiver = r_ssn.receiver("destQ");
+
+ for _c in [conn1, conn2]:
+ _c.update()
+ self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+ s_ssn = src_broker.connection.session()
+ sender = s_ssn.sender("srcQ")
+
+ try:
+ for x in range(5):
+ sender.send(qpid.messaging.Message(content="hello"))
+ for x in range(5):
+ msg = receiver.fetch(timeout=10)
+ self.assertEqual(msg.content, "hello");
+ r_ssn.acknowledge()
+ except:
+ self.fail("Message failure")
+
+ # expect messages to be split over each connection.
+ conn1.update()
+ conn2.update()
+ self.assertNotEqual(conn1.msgsToClient, 0, "No messages sent")
+ self.assertNotEqual(conn2.msgsToClient, 0, "No messages sent")
+ self.assertEqual(conn2.msgsToClient + conn1.msgsToClient, 5,
+ "Expected 5 messages total")
+
+ for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ dst_broker.client_session.exchange_unbind(queue="destQ", exchange="fedX.topic", binding_key="srcQ")
+ dst_broker.client_session.exchange_delete(exchange="fedX.topic")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+