diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 165 |
4 files changed, 187 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 60d8263a76..63325a1e91 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -59,13 +59,16 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args, - InitializeCallback init, const string& qn, const string& ae) : + InitializeCallback init, const std::string& _queueName, const string& ae) : link(_link), channel(_id), args(_args), mgmtObject(0), - listener(l), name(_name), queueName(qn), altEx(ae), persistenceId(0), - connState(0), conn(0), initialize(init), detached(false) + listener(l), name(_name), + queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() + : _queueName), + altEx(ae), persistenceId(0), + connState(0), conn(0), initialize(init), detached(false), + useExistingQueue(!_queueName.empty()), + sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag()) { - if (queueName.empty()) - queueName = "qpid.bridge_queue_" + Uuid(true).str(); ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge @@ -102,10 +105,10 @@ void Bridge::create(Connection& c) session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - session->attach(queueName, false); + session->attach(sessionName, false); session->commandPoint(0,0); } else { - sessionHandler.attachAs(queueName); + sessionHandler.attachAs(sessionName); // Point the bridging commands at the remote peer broker peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } @@ -137,8 +140,9 @@ void Bridge::create(Connection& c) } bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? - bool autoDelete = !durable;//auto delete transient queues? - peer->getQueue().declare(queueName, altEx, false, durable, true, autoDelete, queueSettings); + bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge + bool autoDelete = exclusive && !durable;//auto delete transient queues? + peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); if (!args.i_dynamic) peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); @@ -162,7 +166,7 @@ void Bridge::cancel(Connection&) { if (resetProxy()) { peer->getMessage().cancel(args.i_dest); - peer->getSession().detach(queueName); + peer->getSession().detach(sessionName); } QPID_LOG(debug, "Cancelled bridge " << name); } diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 8d6ed03faf..662590aa45 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -58,6 +58,7 @@ class Bridge : public PersistableConfig, Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l, const qmf::org::apache::qpid::broker::ArgsLinkBridge& args, + InitializeCallback init, const std::string& queueName="", const std::string& altExchange="" ); @@ -146,6 +147,9 @@ class Bridge : public PersistableConfig, void closed(); friend class Link; // to call create, cancel, closed() boost::shared_ptr<ErrorListener> errorListener; + + const bool useExistingQueue; + const std::string sessionName; }; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 77c578bd37..c202d9c4e8 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -797,6 +797,7 @@ void Broker::createObject(const std::string& type, const std::string& name, std::string key; std::string id; std::string excludes; + std::string queueName; bool durable = false; bool srcIsQueue = false; bool srcIsLocal = false; @@ -816,6 +817,7 @@ void Broker::createObject(const std::string& type, const std::string& name, else if (i->first == DYNAMIC) dynamic = bool(i->second); else if (i->first == SYNC) sync = i->second.asUint16(); else if (i->first == DURABLE) durable = bool(i->second); + else if (i->first == QUEUE_NAME) queueName = i->second.asString(); else { // TODO: strict checking here } @@ -828,7 +830,9 @@ void Broker::createObject(const std::string& type, const std::string& name, } std::pair<Bridge::shared_ptr, bool> rc = links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes, - dynamic, sync); + dynamic, sync, + 0, + queueName); if (!rc.first) { QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName << diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index d3dfaedaf9..fe037c720d 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -2328,6 +2328,7 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_multilink_shared_queue(self): """ Verify that two distinct links can be created between federated brokers. @@ -2441,4 +2442,166 @@ class FederationTests(TestBase010): self.verify_cleanup() - + def test_dynamic_direct_shared_queue(self): + """ + Route Topology: + + +<--- B1 + B0 <---+<--- B2 + +<--- B3 + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create direct exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + while my_exchange is None: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.direct": + my_exchange = ooo + break + if my_exchange is None: + retries += 1 + self.failIfEqual(retries, 10, + "QMF failed to find new exchange!") + sleep(1) + exchanges.append(my_exchange) + + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") + + # Create 2 links per each source broker (1,2,3) to the downstream + # broker 0: + for _b in range(1,4): + for _l in ["dynamic", "queue"]: + result = self._brokers[0].qmf_object.create( "link", + "Link-%d-%s" % (_b, _l), + {"host":self._brokers[_b].host, + "port":self._brokers[_b].port}, False) + self.assertEqual(result.status, 0) + + # create queue on source brokers for use by the dynamic route + self._brokers[_b].client_session.queue_declare(queue="fedSrcQ", exclusive=False, auto_delete=True) + + for _l in range(1,4): + # for each dynamic link, create a dynamic bridge for the "fedX.direct" + # exchanges, using the fedSrcQ on each upstream source broker + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-%d-dynamic" % _l, + {"link":"Link-%d-dynamic" % _l, + "src":"fedX.direct", + "dest":"fedX.direct", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # create a queue route that shares the queue used by the dynamic route + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-%d-queue" % _l, + {"link":"Link-%d-queue" % _l, + "src":"fedSrcQ", + "dest":"fedX.direct", + "srcIsQueue":True}, False) + self.assertEqual(result.status, 0) + + + # wait for the inter-broker links to become operational + retries = 0 + operational = False + while not operational: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + if not operational: + retries += 1 + self.failIfEqual(retries, 10, + "inter-broker links failed to become operational.") + sleep(1) + + # @todo - There is no way to determine when the bridge objects become + # active. Hopefully, this is long enough! + sleep(6) + + # create a queue on B0, bound to "spudboy" + self._brokers[0].client_session.queue_declare(queue="DestQ", exclusive=True, auto_delete=True) + self._brokers[0].client_session.exchange_bind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[0].client_session, queue="DestQ", destination="f1") + queue = self._brokers[0].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker + + binding_counts = [1, 1, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(3,-1,-1): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(3) + exchanges[i].update() + + for _b in range(1,4): + # send 3 msgs from each source broker + for i in range(3): + dp = self._brokers[_b].client_session.delivery_properties(routing_key="spudboy") + self._brokers[_b].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i)) + + # get exactly 9 (3 per broker) on B0 + for i in range(9): + msg = queue.get(timeout=5) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + # verify that messages went across every link + for _l in qmf.getObjects(_broker=self._brokers[0].qmf_broker, + _class="link"): + for _c in qmf.getObjects(_broker=self._brokers[0].qmf_broker, + _objectId=_l.connectionRef): + self.assertNotEqual(_c.msgsToClient, 0, "Messages did not pass over link as expected.") + + # cleanup + + self._brokers[0].client_session.exchange_unbind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy") + self._brokers[0].client_session.message_cancel(destination="f1") + self._brokers[0].client_session.queue_delete(queue="DestQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + |