summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/federation.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-06-18 22:36:51 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-06-18 22:36:51 +0000
commitc0a9cdb63a9e1b58bfed14dd6a3f7d8c6375e5af (patch)
tree8c11f27f49af76c55ff188a3a822302c801b62cf /qpid/cpp/src/tests/federation.py
parent5fbacc774744500e604d58d8904e1c3f8f09578a (diff)
downloadqpid-python-c0a9cdb63a9e1b58bfed14dd6a3f7d8c6375e5af.tar.gz
QPID-4063: allow configuration of source queue for exchange or dynamic routes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351518 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
-rwxr-xr-xqpid/cpp/src/tests/federation.py165
1 files changed, 164 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index d3dfaedaf9..fe037c720d 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -2328,6 +2328,7 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+
def test_multilink_shared_queue(self):
""" Verify that two distinct links can be created between federated
brokers.
@@ -2441,4 +2442,166 @@ class FederationTests(TestBase010):
self.verify_cleanup()
-
+ def test_dynamic_direct_shared_queue(self):
+ """
+ Route Topology:
+
+ +<--- B1
+ B0 <---+<--- B2
+ +<--- B3
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create direct exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers:
+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+ "direct", "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ while my_exchange is None:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX.direct":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "QMF failed to find new exchange!")
+ sleep(1)
+ exchanges.append(my_exchange)
+
+ self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
+
+ # Create 2 links per each source broker (1,2,3) to the downstream
+ # broker 0:
+ for _b in range(1,4):
+ for _l in ["dynamic", "queue"]:
+ result = self._brokers[0].qmf_object.create( "link",
+ "Link-%d-%s" % (_b, _l),
+ {"host":self._brokers[_b].host,
+ "port":self._brokers[_b].port}, False)
+ self.assertEqual(result.status, 0)
+
+ # create queue on source brokers for use by the dynamic route
+ self._brokers[_b].client_session.queue_declare(queue="fedSrcQ", exclusive=False, auto_delete=True)
+
+ for _l in range(1,4):
+ # for each dynamic link, create a dynamic bridge for the "fedX.direct"
+ # exchanges, using the fedSrcQ on each upstream source broker
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-%d-dynamic" % _l,
+ {"link":"Link-%d-dynamic" % _l,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "dynamic":True,
+ "queue":"fedSrcQ"}, False)
+ self.assertEqual(result.status, 0)
+
+ # create a queue route that shares the queue used by the dynamic route
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-%d-queue" % _l,
+ {"link":"Link-%d-queue" % _l,
+ "src":"fedSrcQ",
+ "dest":"fedX.direct",
+ "srcIsQueue":True}, False)
+ self.assertEqual(result.status, 0)
+
+
+ # wait for the inter-broker links to become operational
+ retries = 0
+ operational = False
+ while not operational:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ if not operational:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "inter-broker links failed to become operational.")
+ sleep(1)
+
+ # @todo - There is no way to determine when the bridge objects become
+ # active. Hopefully, this is long enough!
+ sleep(6)
+
+ # create a queue on B0, bound to "spudboy"
+ self._brokers[0].client_session.queue_declare(queue="DestQ", exclusive=True, auto_delete=True)
+ self._brokers[0].client_session.exchange_bind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+
+ # subscribe to messages arriving on B2's queue
+ self.subscribe(self._brokers[0].client_session, queue="DestQ", destination="f1")
+ queue = self._brokers[0].client_session.incoming("f1")
+
+ # wait until the binding key has propagated to each broker
+
+ binding_counts = [1, 1, 1, 1]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(3,-1,-1):
+ retries = 0
+ exchanges[i].update()
+ while exchanges[i].bindingCount < binding_counts[i]:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "binding failed to propagate to broker %d"
+ % i)
+ sleep(3)
+ exchanges[i].update()
+
+ for _b in range(1,4):
+ # send 3 msgs from each source broker
+ for i in range(3):
+ dp = self._brokers[_b].client_session.delivery_properties(routing_key="spudboy")
+ self._brokers[_b].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i))
+
+ # get exactly 9 (3 per broker) on B0
+ for i in range(9):
+ msg = queue.get(timeout=5)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ # verify that messages went across every link
+ for _l in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+ _class="link"):
+ for _c in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+ _objectId=_l.connectionRef):
+ self.assertNotEqual(_c.msgsToClient, 0, "Messages did not pass over link as expected.")
+
+ # cleanup
+
+ self._brokers[0].client_session.exchange_unbind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+ self._brokers[0].client_session.message_cancel(destination="f1")
+ self._brokers[0].client_session.queue_delete(queue="DestQ")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers:
+ _b.client_session.exchange_delete(exchange="fedX.direct")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+