diff options
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 84 |
1 files changed, 76 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 5064320efb..cc76cf7f21 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -34,6 +34,18 @@ using qpid::framing::Buffer; using qpid::management::ManagementAgent; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidFedOp("qpid.fed.op"); +const std::string qpidFedTags("qpid.fed.tags"); +const std::string qpidFedOrigin("qpid.fed.origin"); + +const std::string fedOpBind("B"); +const std::string fedOpUnbind("U"); +const std::string fedOpReorigin("R"); +const std::string fedOpHello("H"); +} + namespace qpid { namespace broker { @@ -45,8 +57,9 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), mgmtObject(0), - listener(l), name(Uuid(true).str()), persistenceId(0) + listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0) { + queueName += name; ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0) { mgmtObject = new _qmf::Bridge @@ -65,7 +78,10 @@ Bridge::~Bridge() void Bridge::create(ConnectionState& c) { + connState = &c; if (args.i_srcIsLocal) { + if (args.i_dynamic) + throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler Connection* conn = dynamic_cast<Connection*>(&c); if (conn == 0) @@ -74,7 +90,7 @@ void Bridge::create(ConnectionState& c) channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); } else { // Point the bridging commands at the remote peer broker - channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput()))); + channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput()))); } session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); @@ -88,8 +104,6 @@ void Bridge::create(ConnectionState& c) peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { - string queue = "bridge_queue_"; - queue += Uuid(true).str(); FieldTable queueSettings; if (args.i_tag.size()) { @@ -103,19 +117,26 @@ void Bridge::create(ConnectionState& c) if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); } else { - const string& peerTag = c.getFederationPeerTag(); + const string& peerTag = connState->getFederationPeerTag(); if (peerTag.size()) queueSettings.setString("qpid.trace.exclude", peerTag); } bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? bool autoDelete = !durable;//auto delete transient queues? - peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); + peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings); if (!args.i_dynamic) - peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); - peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); + if (exchange.get() == 0) + throw Exception("Exchange not found for dynamic route"); + exchange->registerDynamicBridge(this); + } } } @@ -123,6 +144,11 @@ void Bridge::cancel() { peer->getMessage().cancel(args.i_dest); peer->getSession().detach(name); + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); + if (exchange.get() != 0) + exchange->removeDynamicBridge(this); + } } void Bridge::destroy() @@ -220,4 +246,46 @@ management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, } } +void Bridge::propagateBinding(const string& key, const string& tagList, + const string& op, const string& origin) +{ + const string& localTag = link->getBroker()->getFederationTag(); + const string& peerTag = connState->getFederationPeerTag(); + + if (tagList.find(peerTag) == tagList.npos) { + FieldTable bindArgs; + string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); + + bindArgs.setString(qpidFedOp, op); + bindArgs.setString(qpidFedTags, newTagList); + if (origin.empty()) + bindArgs.setString(qpidFedOrigin, localTag); + else + bindArgs.setString(qpidFedOrigin, origin); + + peer->getExchange().bind(queueName, args.i_src, key, bindArgs); + } +} + +void Bridge::sendReorigin() +{ + FieldTable bindArgs; + + bindArgs.setString(qpidFedOp, fedOpReorigin); + bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag()); + + peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs); +} + +bool Bridge::containsLocalTag(const string& tagList) const +{ + const string& localTag = link->getBroker()->getFederationTag(); + return (tagList.find(localTag) != tagList.npos); +} + +const string& Bridge::getLocalTag() const +{ + return link->getBroker()->getFederationTag(); +} + }} |