diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-06-18 22:36:51 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-06-18 22:36:51 +0000 |
commit | c0a9cdb63a9e1b58bfed14dd6a3f7d8c6375e5af (patch) | |
tree | 8c11f27f49af76c55ff188a3a822302c801b62cf /qpid/cpp/src/tests/federation.py | |
parent | 5fbacc774744500e604d58d8904e1c3f8f09578a (diff) | |
download | qpid-python-c0a9cdb63a9e1b58bfed14dd6a3f7d8c6375e5af.tar.gz |
QPID-4063: allow configuration of source queue for exchange or dynamic routes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351518 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 165 |
1 files changed, 164 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index d3dfaedaf9..fe037c720d 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -2328,6 +2328,7 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_multilink_shared_queue(self): """ Verify that two distinct links can be created between federated brokers. @@ -2441,4 +2442,166 @@ class FederationTests(TestBase010): self.verify_cleanup() - + def test_dynamic_direct_shared_queue(self): + """ + Route Topology: + + +<--- B1 + B0 <---+<--- B2 + +<--- B3 + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create direct exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + while my_exchange is None: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.direct": + my_exchange = ooo + break + if my_exchange is None: + retries += 1 + self.failIfEqual(retries, 10, + "QMF failed to find new exchange!") + sleep(1) + exchanges.append(my_exchange) + + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") + + # Create 2 links per each source broker (1,2,3) to the downstream + # broker 0: + for _b in range(1,4): + for _l in ["dynamic", "queue"]: + result = self._brokers[0].qmf_object.create( "link", + "Link-%d-%s" % (_b, _l), + {"host":self._brokers[_b].host, + "port":self._brokers[_b].port}, False) + self.assertEqual(result.status, 0) + + # create queue on source brokers for use by the dynamic route + self._brokers[_b].client_session.queue_declare(queue="fedSrcQ", exclusive=False, auto_delete=True) + + for _l in range(1,4): + # for each dynamic link, create a dynamic bridge for the "fedX.direct" + # exchanges, using the fedSrcQ on each upstream source broker + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-%d-dynamic" % _l, + {"link":"Link-%d-dynamic" % _l, + "src":"fedX.direct", + "dest":"fedX.direct", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # create a queue route that shares the queue used by the dynamic route + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-%d-queue" % _l, + {"link":"Link-%d-queue" % _l, + "src":"fedSrcQ", + "dest":"fedX.direct", + "srcIsQueue":True}, False) + self.assertEqual(result.status, 0) + + + # wait for the inter-broker links to become operational + retries = 0 + operational = False + while not operational: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + if not operational: + retries += 1 + self.failIfEqual(retries, 10, + "inter-broker links failed to become operational.") + sleep(1) + + # @todo - There is no way to determine when the bridge objects become + # active. Hopefully, this is long enough! + sleep(6) + + # create a queue on B0, bound to "spudboy" + self._brokers[0].client_session.queue_declare(queue="DestQ", exclusive=True, auto_delete=True) + self._brokers[0].client_session.exchange_bind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[0].client_session, queue="DestQ", destination="f1") + queue = self._brokers[0].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker + + binding_counts = [1, 1, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(3,-1,-1): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(3) + exchanges[i].update() + + for _b in range(1,4): + # send 3 msgs from each source broker + for i in range(3): + dp = self._brokers[_b].client_session.delivery_properties(routing_key="spudboy") + self._brokers[_b].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i)) + + # get exactly 9 (3 per broker) on B0 + for i in range(9): + msg = queue.get(timeout=5) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + # verify that messages went across every link + for _l in qmf.getObjects(_broker=self._brokers[0].qmf_broker, + _class="link"): + for _c in qmf.getObjects(_broker=self._brokers[0].qmf_broker, + _objectId=_l.connectionRef): + self.assertNotEqual(_c.msgsToClient, 0, "Messages did not pass over link as expected.") + + # cleanup + + self._brokers[0].client_session.exchange_unbind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy") + self._brokers[0].client_session.message_cancel(destination="f1") + self._brokers[0].client_session.queue_delete(queue="DestQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + |