diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 467 |
1 files changed, 467 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp new file mode 100644 index 0000000000..06d3a0dd52 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -0,0 +1,467 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Bridge.h" + +#include "qpid/broker/Broker.h" +#include "qpid/broker/FedOps.h" +#include "qpid/broker/amqp_0_10/Connection.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/SessionState.h" + +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include <iostream> + +using qpid::framing::FieldTable; +using qpid::framing::Uuid; +using qpid::framing::Buffer; +using qpid::framing::AMQFrame; +using qpid::framing::AMQContentBody; +using qpid::framing::AMQHeaderBody; +using qpid::framing::MessageProperties; +using qpid::framing::MessageTransferBody; +using qpid::types::Variant; +using qpid::management::ManagementAgent; +using std::string; +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace { +const std::string QPID_REPLICATE("qpid.replicate"); +const std::string NONE("none"); +const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent +const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent +} + +namespace qpid { +namespace broker { + +void Bridge::PushHandler::handle(framing::AMQFrame& frame) +{ + conn->received(frame); +} + +Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, + CancellationListener l, const _qmf::ArgsLinkBridge& _args, + InitializeCallback init, const std::string& _queueName, const string& ae) : + link(_link), channel(_id), args(_args), + listener(l), name(_name), + queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() + : _queueName), + altEx(ae), persistenceId(0), + conn(0), initialize(init), detached(false), + useExistingQueue(!_queueName.empty()), + sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag()) +{ + // If both acks (i_sync) and limited credit is configured, then we'd + // better be able to sync before running out of credit or we + // may stall (note: i_credit==0 means "unlimited") + if (args.i_credit && args.i_sync && args.i_sync > args.i_credit) + throw Exception("The credit value must be greater than configured sync (ack) interval."); + + ManagementAgent* agent = link->getBroker()->getManagementAgent(); + if (agent != 0) { + mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge + (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync, + args.i_credit)); + mgmtObject->set_channelId(channel); + agent->addObject(mgmtObject); + } + QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); +} + +Bridge::~Bridge() +{ + mgmtObject->resourceDestroy(); +} + +void Bridge::create(amqp_0_10::Connection& c) +{ + detached = false; // Reset detached in case we are recovering. + conn = &c; + + SessionHandler& sessionHandler = c.getChannel(channel); + sessionHandler.setErrorListener(shared_from_this()); + if (args.i_srcIsLocal) { + if (args.i_dynamic) + throw Exception("Dynamic routing not supported for push routes"); + // Point the bridging commands at the local connection handler + pushHandler.reset(new PushHandler(&c)); + channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get())); + + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); + peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); + + session->attach(sessionName, false); + session->commandPoint(0,0); + } else { + sessionHandler.attachAs(sessionName); + // Point the bridging commands at the remote peer broker + peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); + } + + if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); + + if (initialize) { + initialize(*this, sessionHandler); // custom subscription initializer supplied + } else { + // will a temp queue be created for this bridge? + const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue; + // UI convention: user specifies 0 for infinite credit + const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit; + // use explicit acks only for non-temp queues, useless for temp queues since they are + // destroyed when the session drops (can't resend unacked msgs) + const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK; + + // configure command.sync frequency + FieldTable options; + uint32_t freq = 0; + if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs + freq = uint32_t(args.i_sync); + } else if (credit && credit != LinkRegistry::INFINITE_CREDIT) { + // force occasional sync to keep from stalling due to lack of credit + freq = (credit + 1)/2; + } + if (freq) + options.setInt("qpid.sync_frequency", freq); + + // create a subscription on the remote + if (args.i_srcIsQueue) { + peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options); + peer->getMessage().flow(args.i_dest, 0, credit); // message credit + peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit + QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); + } else { + if (!useExistingQueue) { + FieldTable queueSettings; + + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); + } else { + const string& peerTag = c.getFederationPeerTag(); + if (peerTag.size()) + queueSettings.setString("qpid.trace.id", peerTag); + } + + if (args.i_excludes.size()) { + queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } else { + const string& localTag = link->getBroker()->getFederationTag(); + if (localTag.size()) + queueSettings.setString("qpid.trace.exclude", localTag); + } + + bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? + bool exclusive = true; // only exclusive if the queue is owned by the bridge + bool autoDelete = exclusive && !durable;//auto delete transient queues? + peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); + } + if (!args.i_dynamic) + peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options); + peer->getMessage().flow(args.i_dest, 0, credit); + peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); + if (exchange.get() == 0) + throw Exception("Exchange not found for dynamic route"); + exchange->registerDynamicBridge(this); + QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); + } else { + QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); + } + } + } + if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); +} + +void Bridge::cancel(amqp_0_10::Connection& c) +{ + // If &c != conn then we have failed over so the old connection is closed. + if (&c == conn && resetProxy()) { + peer->getMessage().cancel(args.i_dest); + peer->getSession().detach(sessionName); + } + QPID_LOG(debug, "Cancelled bridge " << name); +} + +/** Notify the bridge that the connection has closed */ +void Bridge::closed() +{ + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src); + if (exchange.get()) exchange->removeDynamicBridge(this); + } + QPID_LOG(debug, "Closed bridge " << name); +} + +/** Shut down the bridge */ +void Bridge::close() +{ + listener(this); // ask the LinkRegistry to destroy us +} + +void Bridge::setPersistenceId(uint64_t pId) const +{ + persistenceId = pId; +} + + +const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2"); +const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge"); + +bool Bridge::isEncodedBridge(const std::string& key) +{ + return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; +} + + +Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) +{ + string kind; + buffer.getShortString(kind); + + string host; + uint16_t port; + string src; + string dest; + string key; + string id; + string excludes; + string name; + + Link::shared_ptr link; + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions identified the bridge by host:port, not by name, and + * transport wasn't provided. Try to find a link using those paramters. + */ + buffer.getShortString(host); + port = buffer.getShort(); + + link = links.getLink(host, port); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port); + return Bridge::shared_ptr(); + } + } else { + string linkName; + + buffer.getShortString(name); + buffer.getShortString(linkName); + link = links.getLink(linkName); + if (!link) { + QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'"); + return Bridge::shared_ptr(); + } + } + + bool durable(buffer.getOctet()); + buffer.getShortString(src); + buffer.getShortString(dest); + buffer.getShortString(key); + bool is_queue(buffer.getOctet()); + bool is_local(buffer.getOctet()); + buffer.getShortString(id); + buffer.getShortString(excludes); + bool dynamic(buffer.getOctet()); + uint16_t sync = buffer.getShort(); + uint32_t credit = buffer.getLong(); + + if (kind == ENCODED_IDENTIFIER_V1) { + /** previous versions did not provide a name for the bridge, so create one + */ + name = createName(link->getName(), src, dest, key); + } + + return links.declare(name, *link, durable, src, dest, key, is_queue, + is_local, id, excludes, dynamic, sync, credit).first; +} + +void Bridge::encode(Buffer& buffer) const +{ + buffer.putShortString(ENCODED_IDENTIFIER); + buffer.putShortString(name); + buffer.putShortString(link->getName()); + buffer.putOctet(args.i_durable ? 1 : 0); + buffer.putShortString(args.i_src); + buffer.putShortString(args.i_dest); + buffer.putShortString(args.i_key); + buffer.putOctet(args.i_srcIsQueue ? 1 : 0); + buffer.putOctet(args.i_srcIsLocal ? 1 : 0); + buffer.putShortString(args.i_tag); + buffer.putShortString(args.i_excludes); + buffer.putOctet(args.i_dynamic ? 1 : 0); + buffer.putShort(args.i_sync); + buffer.putLong(args.i_credit); +} + +uint32_t Bridge::encodedSize() const +{ + return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + + name.size() + 1 + + link->getName().size() + 1 + + 1 // durable + + args.i_src.size() + 1 + + args.i_dest.size() + 1 + + args.i_key.size() + 1 + + 1 // srcIsQueue + + 1 // srcIsLocal + + args.i_tag.size() + 1 + + args.i_excludes.size() + 1 + + 1 // dynamic + + 2 // sync + + 4; // credit +} + +management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const +{ + return mgmtObject; +} + +management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, + management::Args& /*args*/, + string&) +{ + if (methodId == _qmf::Bridge::METHOD_CLOSE) { + //notify that we are closed + QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'"); + close(); + return management::Manageable::STATUS_OK; + } else { + return management::Manageable::STATUS_UNKNOWN_METHOD; + } +} + +void Bridge::propagateBinding(const string& key, const string& tagList, + const string& op, const string& origin, + qpid::framing::FieldTable* extra_args) +{ + const string& localTag = link->getBroker()->getFederationTag(); + const string& peerTag = conn->getFederationPeerTag(); + + if (tagList.find(peerTag) == tagList.npos) { + FieldTable bindArgs; + if (extra_args) { + for (qpid::framing::FieldTable::ValueMap::iterator i=extra_args->begin(); i != extra_args->end(); ++i) { + bindArgs.insert((*i)); + } + } + string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); + + bindArgs.setString(QPID_REPLICATE, NONE); + bindArgs.setString(qpidFedOp, op); + bindArgs.setString(qpidFedTags, newTagList); + if (origin.empty()) + bindArgs.setString(qpidFedOrigin, localTag); + else + bindArgs.setString(qpidFedOrigin, origin); + + conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, + queueName, args.i_src, key, bindArgs)); + } +} + +void Bridge::sendReorigin() +{ + FieldTable bindArgs; + + bindArgs.setString(qpidFedOp, fedOpReorigin); + bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag()); + + conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, + queueName, args.i_src, args.i_key, bindArgs)); +} +bool Bridge::resetProxy() +{ + SessionHandler& sessionHandler = conn->getChannel(channel); + if (!sessionHandler.getSession()) peer.reset(); + else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); + return peer.get(); +} + +void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args) +{ + if (resetProxy()) { + peer->getExchange().bind(queue, exchange, key, args); + } else { + // link's periodic maintenance visit will attempt to recover + } +} + +bool Bridge::containsLocalTag(const string& tagList) const +{ + const string& localTag = link->getBroker()->getFederationTag(); + return (tagList.find(localTag) != tagList.npos); +} + +const string& Bridge::getLocalTag() const +{ + return link->getBroker()->getFederationTag(); +} + +// SessionHandler::ErrorListener methods. +void Bridge::connectionException( + framing::connection::CloseCode code, const std::string& msg) +{ + if (errorListener) errorListener->connectionException(code, msg); +} + +void Bridge::channelException( + framing::session::DetachCode code, const std::string& msg) +{ + if (errorListener) errorListener->channelException(code, msg); +} + +void Bridge::executionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (errorListener) errorListener->executionException(code, msg); +} + +void Bridge::incomingExecutionException( + framing::execution::ErrorCode code, const std::string& msg) +{ + if (errorListener) errorListener->incomingExecutionException(code, msg); +} + +void Bridge::detach() { + detached = true; + if (errorListener) errorListener->detach(); +} + +std::string Bridge::createName(const std::string& linkName, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + std::stringstream keystream; + keystream << linkName << "!" << src << "!" << dest << "!" << key; + return keystream.str(); +} + +}} |