diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 88 |
1 files changed, 18 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index b00cdd00dc..405482da5d 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,8 +24,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" -#include "qpid/broker/NodeClone.h" -#include "qpid/broker/QueueReplicator.h" +#include "qpid/ha/WiringReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" @@ -59,9 +58,11 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) } Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, - const _qmf::ArgsLinkBridge& _args) : + const _qmf::ArgsLinkBridge& _args, + InitializeCallback init) : link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0) + listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0), + initialize(init) { std::stringstream title; title << id << "_" << link->getBroker()->getFederationTag(); @@ -77,9 +78,9 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); } -Bridge::~Bridge() +Bridge::~Bridge() { - mgmtObject->resourceDestroy(); + mgmtObject->resourceDestroy(); } void Bridge::create(Connection& c) @@ -98,7 +99,7 @@ void Bridge::create(Connection& c) session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - + session->attach(name, false); session->commandPoint(0,0); } else { @@ -108,60 +109,12 @@ void Bridge::create(Connection& c) } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); - if (args.i_srcIsQueue) { - //TODO: something other than this which is nasty... - bool isReplicatingLink = QueueReplicator::initReplicationSettings(args.i_dest, link->getBroker()->getQueues(), options); - - peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, isReplicatingLink ? 1 : 0, false, "", 0, options); + if (initialize) initialize(*this, sessionHandler); + else if (args.i_srcIsQueue) { + peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); - } else if (NodeClone::isNodeCloneDestination(args.i_dest)) { - //declare and bind an event queue - peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable()); - peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable()); - //subscribe to the queue - peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - //issue a query request for queues and another for exchanges using event queue as the reply-to address - for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions - Variant::Map request; - request["_what"] = "OBJECT"; - Variant::Map schema; - schema["_class_name"] = (i == 0 ? "queue" : "exchange"); - schema["_package_name"] = "org.apache.qpid.broker"; - request["_schema_id"] = schema; - - AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0))); - method.setBof(true); - method.setEof(false); - method.setBos(true); - method.setEos(true); - AMQHeaderBody headerBody; - MessageProperties* props = headerBody.get<MessageProperties>(true); - props->setReplyTo(qpid::framing::ReplyTo("", queueName)); - props->setAppId("qmf2"); - props->getApplicationHeaders().setString("qmf.opcode", "_query_request"); - headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker"); - AMQFrame header(headerBody); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - AMQContentBody data; - qpid::amqp_0_10::MapCodec::encode(request, data.getData()); - AMQFrame content(data); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - sessionHandler.out->handle(method); - sessionHandler.out->handle(header); - sessionHandler.out->handle(content); - } - } else { FieldTable queueSettings; @@ -236,11 +189,6 @@ void Bridge::setPersistenceId(uint64_t pId) const persistenceId = pId; } -const string& Bridge::getName() const -{ - return name; -} - Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { string host; @@ -268,7 +216,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) is_queue, is_local, id, excludes, dynamic, sync).first; } -void Bridge::encode(Buffer& buffer) const +void Bridge::encode(Buffer& buffer) const { buffer.putShortString(string("bridge")); buffer.putShortString(link->getHost()); @@ -285,8 +233,8 @@ void Bridge::encode(Buffer& buffer) const buffer.putShort(args.i_sync); } -uint32_t Bridge::encodedSize() const -{ +uint32_t Bridge::encodedSize() const +{ return link->getHost().size() + 1 // short-string (host) + 7 // short-string ("bridge") + 2 // port @@ -311,7 +259,7 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, management::Args& /*args*/, string&) { - if (methodId == _qmf::Bridge::METHOD_CLOSE) { + if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed destroy(); return management::Manageable::STATUS_OK; @@ -358,7 +306,7 @@ void Bridge::sendReorigin() conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, queueName, args.i_src, args.i_key, bindArgs)); } -bool Bridge::resetProxy() +bool Bridge::resetProxy() { SessionHandler& sessionHandler = conn->getChannel(id); if (!sessionHandler.getSession()) peer.reset(); |