summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp467
1 files changed, 467 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
new file mode 100644
index 0000000000..06d3a0dd52
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -0,0 +1,467 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Bridge.h"
+
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/FedOps.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/SessionState.h"
+
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include <iostream>
+
+using qpid::framing::FieldTable;
+using qpid::framing::Uuid;
+using qpid::framing::Buffer;
+using qpid::framing::AMQFrame;
+using qpid::framing::AMQContentBody;
+using qpid::framing::AMQHeaderBody;
+using qpid::framing::MessageProperties;
+using qpid::framing::MessageTransferBody;
+using qpid::types::Variant;
+using qpid::management::ManagementAgent;
+using std::string;
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace {
+const std::string QPID_REPLICATE("qpid.replicate");
+const std::string NONE("none");
+const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent
+const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent
+}
+
+namespace qpid {
+namespace broker {
+
+void Bridge::PushHandler::handle(framing::AMQFrame& frame)
+{
+ conn->received(frame);
+}
+
+Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
+ CancellationListener l, const _qmf::ArgsLinkBridge& _args,
+ InitializeCallback init, const std::string& _queueName, const string& ae) :
+ link(_link), channel(_id), args(_args),
+ listener(l), name(_name),
+ queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
+ : _queueName),
+ altEx(ae), persistenceId(0),
+ conn(0), initialize(init), detached(false),
+ useExistingQueue(!_queueName.empty()),
+ sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
+{
+ // If both acks (i_sync) and limited credit is configured, then we'd
+ // better be able to sync before running out of credit or we
+ // may stall (note: i_credit==0 means "unlimited")
+ if (args.i_credit && args.i_sync && args.i_sync > args.i_credit)
+ throw Exception("The credit value must be greater than configured sync (ack) interval.");
+
+ ManagementAgent* agent = link->getBroker()->getManagementAgent();
+ if (agent != 0) {
+ mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge
+ (agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
+ args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync,
+ args.i_credit));
+ mgmtObject->set_channelId(channel);
+ agent->addObject(mgmtObject);
+ }
+ QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
+}
+
+Bridge::~Bridge()
+{
+ mgmtObject->resourceDestroy();
+}
+
+void Bridge::create(amqp_0_10::Connection& c)
+{
+ detached = false; // Reset detached in case we are recovering.
+ conn = &c;
+
+ SessionHandler& sessionHandler = c.getChannel(channel);
+ sessionHandler.setErrorListener(shared_from_this());
+ if (args.i_srcIsLocal) {
+ if (args.i_dynamic)
+ throw Exception("Dynamic routing not supported for push routes");
+ // Point the bridging commands at the local connection handler
+ pushHandler.reset(new PushHandler(&c));
+ channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
+
+ session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+ peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+ session->attach(sessionName, false);
+ session->commandPoint(0,0);
+ } else {
+ sessionHandler.attachAs(sessionName);
+ // Point the bridging commands at the remote peer broker
+ peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
+ }
+
+ if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
+
+ if (initialize) {
+ initialize(*this, sessionHandler); // custom subscription initializer supplied
+ } else {
+ // will a temp queue be created for this bridge?
+ const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue;
+ // UI convention: user specifies 0 for infinite credit
+ const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit;
+ // use explicit acks only for non-temp queues, useless for temp queues since they are
+ // destroyed when the session drops (can't resend unacked msgs)
+ const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK;
+
+ // configure command.sync frequency
+ FieldTable options;
+ uint32_t freq = 0;
+ if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs
+ freq = uint32_t(args.i_sync);
+ } else if (credit && credit != LinkRegistry::INFINITE_CREDIT) {
+ // force occasional sync to keep from stalling due to lack of credit
+ freq = (credit + 1)/2;
+ }
+ if (freq)
+ options.setInt("qpid.sync_frequency", freq);
+
+ // create a subscription on the remote
+ if (args.i_srcIsQueue) {
+ peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, credit); // message credit
+ peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit
+ QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
+ } else {
+ if (!useExistingQueue) {
+ FieldTable queueSettings;
+
+ if (args.i_tag.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_tag);
+ } else {
+ const string& peerTag = c.getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.id", peerTag);
+ }
+
+ if (args.i_excludes.size()) {
+ queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ } else {
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.exclude", localTag);
+ }
+
+ bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
+ bool exclusive = true; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
+ }
+ if (!args.i_dynamic)
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, credit);
+ peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT);
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() == 0)
+ throw Exception("Exchange not found for dynamic route");
+ exchange->registerDynamicBridge(this);
+ QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
+ } else {
+ QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
+ }
+ }
+ }
+ if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
+}
+
+void Bridge::cancel(amqp_0_10::Connection& c)
+{
+ // If &c != conn then we have failed over so the old connection is closed.
+ if (&c == conn && resetProxy()) {
+ peer->getMessage().cancel(args.i_dest);
+ peer->getSession().detach(sessionName);
+ }
+ QPID_LOG(debug, "Cancelled bridge " << name);
+}
+
+/** Notify the bridge that the connection has closed */
+void Bridge::closed()
+{
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src);
+ if (exchange.get()) exchange->removeDynamicBridge(this);
+ }
+ QPID_LOG(debug, "Closed bridge " << name);
+}
+
+/** Shut down the bridge */
+void Bridge::close()
+{
+ listener(this); // ask the LinkRegistry to destroy us
+}
+
+void Bridge::setPersistenceId(uint64_t pId) const
+{
+ persistenceId = pId;
+}
+
+
+const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
+const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
+
+bool Bridge::isEncodedBridge(const std::string& key)
+{
+ return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
+}
+
+
+Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
+{
+ string kind;
+ buffer.getShortString(kind);
+
+ string host;
+ uint16_t port;
+ string src;
+ string dest;
+ string key;
+ string id;
+ string excludes;
+ string name;
+
+ Link::shared_ptr link;
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions identified the bridge by host:port, not by name, and
+ * transport wasn't provided. Try to find a link using those paramters.
+ */
+ buffer.getShortString(host);
+ port = buffer.getShort();
+
+ link = links.getLink(host, port);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
+ return Bridge::shared_ptr();
+ }
+ } else {
+ string linkName;
+
+ buffer.getShortString(name);
+ buffer.getShortString(linkName);
+ link = links.getLink(linkName);
+ if (!link) {
+ QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
+ return Bridge::shared_ptr();
+ }
+ }
+
+ bool durable(buffer.getOctet());
+ buffer.getShortString(src);
+ buffer.getShortString(dest);
+ buffer.getShortString(key);
+ bool is_queue(buffer.getOctet());
+ bool is_local(buffer.getOctet());
+ buffer.getShortString(id);
+ buffer.getShortString(excludes);
+ bool dynamic(buffer.getOctet());
+ uint16_t sync = buffer.getShort();
+ uint32_t credit = buffer.getLong();
+
+ if (kind == ENCODED_IDENTIFIER_V1) {
+ /** previous versions did not provide a name for the bridge, so create one
+ */
+ name = createName(link->getName(), src, dest, key);
+ }
+
+ return links.declare(name, *link, durable, src, dest, key, is_queue,
+ is_local, id, excludes, dynamic, sync, credit).first;
+}
+
+void Bridge::encode(Buffer& buffer) const
+{
+ buffer.putShortString(ENCODED_IDENTIFIER);
+ buffer.putShortString(name);
+ buffer.putShortString(link->getName());
+ buffer.putOctet(args.i_durable ? 1 : 0);
+ buffer.putShortString(args.i_src);
+ buffer.putShortString(args.i_dest);
+ buffer.putShortString(args.i_key);
+ buffer.putOctet(args.i_srcIsQueue ? 1 : 0);
+ buffer.putOctet(args.i_srcIsLocal ? 1 : 0);
+ buffer.putShortString(args.i_tag);
+ buffer.putShortString(args.i_excludes);
+ buffer.putOctet(args.i_dynamic ? 1 : 0);
+ buffer.putShort(args.i_sync);
+ buffer.putLong(args.i_credit);
+}
+
+uint32_t Bridge::encodedSize() const
+{
+ return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ + name.size() + 1
+ + link->getName().size() + 1
+ + 1 // durable
+ + args.i_src.size() + 1
+ + args.i_dest.size() + 1
+ + args.i_key.size() + 1
+ + 1 // srcIsQueue
+ + 1 // srcIsLocal
+ + args.i_tag.size() + 1
+ + args.i_excludes.size() + 1
+ + 1 // dynamic
+ + 2 // sync
+ + 4; // credit
+}
+
+management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const
+{
+ return mgmtObject;
+}
+
+management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
+ management::Args& /*args*/,
+ string&)
+{
+ if (methodId == _qmf::Bridge::METHOD_CLOSE) {
+ //notify that we are closed
+ QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
+ close();
+ return management::Manageable::STATUS_OK;
+ } else {
+ return management::Manageable::STATUS_UNKNOWN_METHOD;
+ }
+}
+
+void Bridge::propagateBinding(const string& key, const string& tagList,
+ const string& op, const string& origin,
+ qpid::framing::FieldTable* extra_args)
+{
+ const string& localTag = link->getBroker()->getFederationTag();
+ const string& peerTag = conn->getFederationPeerTag();
+
+ if (tagList.find(peerTag) == tagList.npos) {
+ FieldTable bindArgs;
+ if (extra_args) {
+ for (qpid::framing::FieldTable::ValueMap::iterator i=extra_args->begin(); i != extra_args->end(); ++i) {
+ bindArgs.insert((*i));
+ }
+ }
+ string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag);
+
+ bindArgs.setString(QPID_REPLICATE, NONE);
+ bindArgs.setString(qpidFedOp, op);
+ bindArgs.setString(qpidFedTags, newTagList);
+ if (origin.empty())
+ bindArgs.setString(qpidFedOrigin, localTag);
+ else
+ bindArgs.setString(qpidFedOrigin, origin);
+
+ conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+ queueName, args.i_src, key, bindArgs));
+ }
+}
+
+void Bridge::sendReorigin()
+{
+ FieldTable bindArgs;
+
+ bindArgs.setString(qpidFedOp, fedOpReorigin);
+ bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
+
+ conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+ queueName, args.i_src, args.i_key, bindArgs));
+}
+bool Bridge::resetProxy()
+{
+ SessionHandler& sessionHandler = conn->getChannel(channel);
+ if (!sessionHandler.getSession()) peer.reset();
+ else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
+ return peer.get();
+}
+
+void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args)
+{
+ if (resetProxy()) {
+ peer->getExchange().bind(queue, exchange, key, args);
+ } else {
+ // link's periodic maintenance visit will attempt to recover
+ }
+}
+
+bool Bridge::containsLocalTag(const string& tagList) const
+{
+ const string& localTag = link->getBroker()->getFederationTag();
+ return (tagList.find(localTag) != tagList.npos);
+}
+
+const string& Bridge::getLocalTag() const
+{
+ return link->getBroker()->getFederationTag();
+}
+
+// SessionHandler::ErrorListener methods.
+void Bridge::connectionException(
+ framing::connection::CloseCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->connectionException(code, msg);
+}
+
+void Bridge::channelException(
+ framing::session::DetachCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->channelException(code, msg);
+}
+
+void Bridge::executionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->executionException(code, msg);
+}
+
+void Bridge::incomingExecutionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (errorListener) errorListener->incomingExecutionException(code, msg);
+}
+
+void Bridge::detach() {
+ detached = true;
+ if (errorListener) errorListener->detach();
+}
+
+std::string Bridge::createName(const std::string& linkName,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ std::stringstream keystream;
+ keystream << linkName << "!" << src << "!" << dest << "!" << key;
+ return keystream.str();
+}
+
+}}