summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2013-04-11 21:47:40 +0000
committerKen Giusti <kgiusti@apache.org>2013-04-11 21:47:40 +0000
commit8b47505e4e783167cd666b7e6f8aca547c9a96e1 (patch)
tree1e42ea111b6cf729d046b7650eaad73f579fd91b /cpp/src/qpid/broker/Bridge.cpp
parentedde6c21a5749a54964b6fa69e8f9bbfb940fc21 (diff)
downloadqpid-python-8b47505e4e783167cd666b7e6f8aca547c9a96e1.tar.gz
QPID-4728: add 'credit' parameter to Federation Bridge configuration.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1467107 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp127
1 files changed, 81 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 68bdf6d474..ae83e4415b 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -54,6 +54,8 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace {
const std::string QPID_REPLICATE("qpid.replicate");
const std::string NONE("none");
+const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent
+const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent
}
namespace qpid {
@@ -76,12 +78,19 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
useExistingQueue(!_queueName.empty()),
sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
{
+ // If both acks (i_sync) and limited credit is configured, then we'd
+ // better be able to sync before running out of credit or we
+ // may stall (note: i_credit==0 means "unlimited")
+ if (args.i_credit && args.i_sync && args.i_sync > args.i_credit)
+ throw Exception("The credit value must be greater than configured sync (ack) interval.");
+
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge
(agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync));
+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync,
+ args.i_credit));
mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
@@ -98,8 +107,7 @@ void Bridge::create(Connection& c)
detached = false; // Reset detached in case we are recovering.
connState = &c;
conn = &c;
- FieldTable options;
- if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
+
SessionHandler& sessionHandler = c.getChannel(channel);
sessionHandler.setErrorListener(shared_from_this());
if (args.i_srcIsLocal) {
@@ -121,51 +129,75 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (initialize) initialize(*this, sessionHandler);
- else if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
- peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
- } else {
- if (!useExistingQueue) {
- FieldTable queueSettings;
- if (args.i_tag.size()) {
- queueSettings.setString("qpid.trace.id", args.i_tag);
- } else {
- const string& peerTag = c.getFederationPeerTag();
- if (peerTag.size())
- queueSettings.setString("qpid.trace.id", peerTag);
+ if (initialize) {
+ initialize(*this, sessionHandler); // custom subscription initializer supplied
+ } else {
+ // will a temp queue be created for this bridge?
+ const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue;
+ // UI convention: user specifies 0 for infinite credit
+ const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit;
+ // use explicit acks only for non-temp queues, useless for temp queues since they are
+ // destroyed when the session drops (can't resend unacked msgs)
+ const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK;
+
+ // configure command.sync frequency
+ FieldTable options;
+ uint32_t freq = 0;
+ if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs
+ freq = uint32_t(args.i_sync);
+ } else if (credit && credit != LinkRegistry::INFINITE_CREDIT) {
+ // force occasional sync to keep from stalling due to lack of credit
+ freq = (credit + 1)/2;
+ }
+ if (freq)
+ options.setInt("qpid.sync_frequency", freq);
+
+ // create a subscription on the remote
+ if (args.i_srcIsQueue) {
+ peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, credit); // message credit
+ peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit
+ QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
+ } else {
+ if (!useExistingQueue) {
+ FieldTable queueSettings;
+
+ if (args.i_tag.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_tag);
+ } else {
+ const string& peerTag = c.getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.id", peerTag);
+ }
+
+ if (args.i_excludes.size()) {
+ queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ } else {
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.exclude", localTag);
+ }
+
+ bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
+ bool exclusive = true; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
}
-
- if (args.i_excludes.size()) {
- queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ if (!args.i_dynamic)
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, credit);
+ peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT);
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() == 0)
+ throw Exception("Exchange not found for dynamic route");
+ exchange->registerDynamicBridge(this);
+ QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
- const string& localTag = link->getBroker()->getFederationTag();
- if (localTag.size())
- queueSettings.setString("qpid.trace.exclude", localTag);
+ QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
-
- bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
- bool exclusive = true; // only exclusive if the queue is owned by the bridge
- bool autoDelete = exclusive && !durable;//auto delete transient queues?
- peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
- }
- if (!args.i_dynamic)
- peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options);
- peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- if (args.i_dynamic) {
- Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
- if (exchange.get() == 0)
- throw Exception("Exchange not found for dynamic route");
- exchange->registerDynamicBridge(this);
- QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
- } else {
- QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
@@ -260,6 +292,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(excludes);
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
+ uint32_t credit = buffer.getLong();
if (kind == ENCODED_IDENTIFIER_V1) {
/** previous versions did not provide a name for the bridge, so create one
@@ -268,7 +301,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
}
return links.declare(name, *link, durable, src, dest, key, is_queue,
- is_local, id, excludes, dynamic, sync).first;
+ is_local, id, excludes, dynamic, sync, credit).first;
}
void Bridge::encode(Buffer& buffer) const
@@ -286,6 +319,7 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShortString(args.i_excludes);
buffer.putOctet(args.i_dynamic ? 1 : 0);
buffer.putShort(args.i_sync);
+ buffer.putLong(args.i_credit);
}
uint32_t Bridge::encodedSize() const
@@ -302,7 +336,8 @@ uint32_t Bridge::encodedSize() const
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1
+ 1 // dynamic
- + 2; // sync
+ + 2 // sync
+ + 4; // credit
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const