diff options
| author | Ken Giusti <kgiusti@apache.org> | 2013-04-11 21:47:40 +0000 |
|---|---|---|
| committer | Ken Giusti <kgiusti@apache.org> | 2013-04-11 21:47:40 +0000 |
| commit | 8b47505e4e783167cd666b7e6f8aca547c9a96e1 (patch) | |
| tree | 1e42ea111b6cf729d046b7650eaad73f579fd91b /cpp/src/qpid/broker/Bridge.cpp | |
| parent | edde6c21a5749a54964b6fa69e8f9bbfb940fc21 (diff) | |
| download | qpid-python-8b47505e4e783167cd666b7e6f8aca547c9a96e1.tar.gz | |
QPID-4728: add 'credit' parameter to Federation Bridge configuration.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1467107 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 127 |
1 files changed, 81 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 68bdf6d474..ae83e4415b 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -54,6 +54,8 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string QPID_REPLICATE("qpid.replicate"); const std::string NONE("none"); +const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent +const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent } namespace qpid { @@ -76,12 +78,19 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, useExistingQueue(!_queueName.empty()), sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag()) { + // If both acks (i_sync) and limited credit is configured, then we'd + // better be able to sync before running out of credit or we + // may stall (note: i_credit==0 means "unlimited") + if (args.i_credit && args.i_sync && args.i_sync > args.i_credit) + throw Exception("The credit value must be greater than configured sync (ack) interval."); + ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync)); + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync, + args.i_credit)); mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } @@ -98,8 +107,7 @@ void Bridge::create(Connection& c) detached = false; // Reset detached in case we are recovering. connState = &c; conn = &c; - FieldTable options; - if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); + SessionHandler& sessionHandler = c.getChannel(channel); sessionHandler.setErrorListener(shared_from_this()); if (args.i_srcIsLocal) { @@ -121,51 +129,75 @@ void Bridge::create(Connection& c) } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); - if (initialize) initialize(*this, sessionHandler); - else if (args.i_srcIsQueue) { - peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); - peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); - } else { - if (!useExistingQueue) { - FieldTable queueSettings; - if (args.i_tag.size()) { - queueSettings.setString("qpid.trace.id", args.i_tag); - } else { - const string& peerTag = c.getFederationPeerTag(); - if (peerTag.size()) - queueSettings.setString("qpid.trace.id", peerTag); + if (initialize) { + initialize(*this, sessionHandler); // custom subscription initializer supplied + } else { + // will a temp queue be created for this bridge? + const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue; + // UI convention: user specifies 0 for infinite credit + const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit; + // use explicit acks only for non-temp queues, useless for temp queues since they are + // destroyed when the session drops (can't resend unacked msgs) + const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK; + + // configure command.sync frequency + FieldTable options; + uint32_t freq = 0; + if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs + freq = uint32_t(args.i_sync); + } else if (credit && credit != LinkRegistry::INFINITE_CREDIT) { + // force occasional sync to keep from stalling due to lack of credit + freq = (credit + 1)/2; + } + if (freq) + options.setInt("qpid.sync_frequency", freq); + + // create a subscription on the remote + if (args.i_srcIsQueue) { + peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options); + peer->getMessage().flow(args.i_dest, 0, credit); // message credit + peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit + QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); + } else { + if (!useExistingQueue) { + FieldTable queueSettings; + + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); + } else { + const string& peerTag = c.getFederationPeerTag(); + if (peerTag.size()) + queueSettings.setString("qpid.trace.id", peerTag); + } + + if (args.i_excludes.size()) { + queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } else { + const string& localTag = link->getBroker()->getFederationTag(); + if (localTag.size()) + queueSettings.setString("qpid.trace.exclude", localTag); + } + + bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? + bool exclusive = true; // only exclusive if the queue is owned by the bridge + bool autoDelete = exclusive && !durable;//auto delete transient queues? + peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); } - - if (args.i_excludes.size()) { - queueSettings.setString("qpid.trace.exclude", args.i_excludes); + if (!args.i_dynamic) + peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options); + peer->getMessage().flow(args.i_dest, 0, credit); + peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); + if (exchange.get() == 0) + throw Exception("Exchange not found for dynamic route"); + exchange->registerDynamicBridge(this); + QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); } else { - const string& localTag = link->getBroker()->getFederationTag(); - if (localTag.size()) - queueSettings.setString("qpid.trace.exclude", localTag); + QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); } - - bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? - bool exclusive = true; // only exclusive if the queue is owned by the bridge - bool autoDelete = exclusive && !durable;//auto delete transient queues? - peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); - } - if (!args.i_dynamic) - peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); - peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options); - peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - if (args.i_dynamic) { - Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); - if (exchange.get() == 0) - throw Exception("Exchange not found for dynamic route"); - exchange->registerDynamicBridge(this); - QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); - } else { - QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); } } if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); @@ -260,6 +292,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(excludes); bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); + uint32_t credit = buffer.getLong(); if (kind == ENCODED_IDENTIFIER_V1) { /** previous versions did not provide a name for the bridge, so create one @@ -268,7 +301,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) } return links.declare(name, *link, durable, src, dest, key, is_queue, - is_local, id, excludes, dynamic, sync).first; + is_local, id, excludes, dynamic, sync, credit).first; } void Bridge::encode(Buffer& buffer) const @@ -286,6 +319,7 @@ void Bridge::encode(Buffer& buffer) const buffer.putShortString(args.i_excludes); buffer.putOctet(args.i_dynamic ? 1 : 0); buffer.putShort(args.i_sync); + buffer.putLong(args.i_credit); } uint32_t Bridge::encodedSize() const @@ -302,7 +336,8 @@ uint32_t Bridge::encodedSize() const + args.i_tag.size() + 1 + args.i_excludes.size() + 1 + 1 // dynamic - + 2; // sync + + 2 // sync + + 4; // credit } management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const |
