diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2013-04-11 21:47:40 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2013-04-11 21:47:40 +0000 |
commit | 332410c66c62d5e075e9f9077d29fc4669e11db0 (patch) | |
tree | e856ee2846d664f98b21f3cb01b3989ba038241c | |
parent | 25eaa7e072a1f4bdfc592ac3c4ca57e265a04d40 (diff) | |
download | qpid-python-332410c66c62d5e075e9f9077d29fc4669e11db0.tar.gz |
QPID-4728: add 'credit' parameter to Federation Bridge configuration.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1467107 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 127 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 1 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 135 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation_sys.py | 2 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 2 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-route | 12 |
11 files changed, 213 insertions, 79 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 68bdf6d474..ae83e4415b 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -54,6 +54,8 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string QPID_REPLICATE("qpid.replicate"); const std::string NONE("none"); +const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent +const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent } namespace qpid { @@ -76,12 +78,19 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, useExistingQueue(!_queueName.empty()), sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag()) { + // If both acks (i_sync) and limited credit is configured, then we'd + // better be able to sync before running out of credit or we + // may stall (note: i_credit==0 means "unlimited") + if (args.i_credit && args.i_sync && args.i_sync > args.i_credit) + throw Exception("The credit value must be greater than configured sync (ack) interval."); + ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync)); + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync, + args.i_credit)); mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } @@ -98,8 +107,7 @@ void Bridge::create(Connection& c) detached = false; // Reset detached in case we are recovering. connState = &c; conn = &c; - FieldTable options; - if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); + SessionHandler& sessionHandler = c.getChannel(channel); sessionHandler.setErrorListener(shared_from_this()); if (args.i_srcIsLocal) { @@ -121,51 +129,75 @@ void Bridge::create(Connection& c) } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); - if (initialize) initialize(*this, sessionHandler); - else if (args.i_srcIsQueue) { - peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); - peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); - } else { - if (!useExistingQueue) { - FieldTable queueSettings; - if (args.i_tag.size()) { - queueSettings.setString("qpid.trace.id", args.i_tag); - } else { - const string& peerTag = c.getFederationPeerTag(); - if (peerTag.size()) - queueSettings.setString("qpid.trace.id", peerTag); + if (initialize) { + initialize(*this, sessionHandler); // custom subscription initializer supplied + } else { + // will a temp queue be created for this bridge? + const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue; + // UI convention: user specifies 0 for infinite credit + const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit; + // use explicit acks only for non-temp queues, useless for temp queues since they are + // destroyed when the session drops (can't resend unacked msgs) + const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK; + + // configure command.sync frequency + FieldTable options; + uint32_t freq = 0; + if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs + freq = uint32_t(args.i_sync); + } else if (credit && credit != LinkRegistry::INFINITE_CREDIT) { + // force occasional sync to keep from stalling due to lack of credit + freq = (credit + 1)/2; + } + if (freq) + options.setInt("qpid.sync_frequency", freq); + + // create a subscription on the remote + if (args.i_srcIsQueue) { + peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options); + peer->getMessage().flow(args.i_dest, 0, credit); // message credit + peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit + QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); + } else { + if (!useExistingQueue) { + FieldTable queueSettings; + + if (args.i_tag.size()) { + queueSettings.setString("qpid.trace.id", args.i_tag); + } else { + const string& peerTag = c.getFederationPeerTag(); + if (peerTag.size()) + queueSettings.setString("qpid.trace.id", peerTag); + } + + if (args.i_excludes.size()) { + queueSettings.setString("qpid.trace.exclude", args.i_excludes); + } else { + const string& localTag = link->getBroker()->getFederationTag(); + if (localTag.size()) + queueSettings.setString("qpid.trace.exclude", localTag); + } + + bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? + bool exclusive = true; // only exclusive if the queue is owned by the bridge + bool autoDelete = exclusive && !durable;//auto delete transient queues? + peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); } - - if (args.i_excludes.size()) { - queueSettings.setString("qpid.trace.exclude", args.i_excludes); + if (!args.i_dynamic) + peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); + peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options); + peer->getMessage().flow(args.i_dest, 0, credit); + peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); + if (args.i_dynamic) { + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); + if (exchange.get() == 0) + throw Exception("Exchange not found for dynamic route"); + exchange->registerDynamicBridge(this); + QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); } else { - const string& localTag = link->getBroker()->getFederationTag(); - if (localTag.size()) - queueSettings.setString("qpid.trace.exclude", localTag); + QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); } - - bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? - bool exclusive = true; // only exclusive if the queue is owned by the bridge - bool autoDelete = exclusive && !durable;//auto delete transient queues? - peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); - } - if (!args.i_dynamic) - peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); - peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options); - peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF); - peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - if (args.i_dynamic) { - Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); - if (exchange.get() == 0) - throw Exception("Exchange not found for dynamic route"); - exchange->registerDynamicBridge(this); - QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); - } else { - QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); } } if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); @@ -260,6 +292,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) buffer.getShortString(excludes); bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); + uint32_t credit = buffer.getLong(); if (kind == ENCODED_IDENTIFIER_V1) { /** previous versions did not provide a name for the bridge, so create one @@ -268,7 +301,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) } return links.declare(name, *link, durable, src, dest, key, is_queue, - is_local, id, excludes, dynamic, sync).first; + is_local, id, excludes, dynamic, sync, credit).first; } void Bridge::encode(Buffer& buffer) const @@ -286,6 +319,7 @@ void Bridge::encode(Buffer& buffer) const buffer.putShortString(args.i_excludes); buffer.putOctet(args.i_dynamic ? 1 : 0); buffer.putShort(args.i_sync); + buffer.putLong(args.i_credit); } uint32_t Bridge::encodedSize() const @@ -302,7 +336,8 @@ uint32_t Bridge::encodedSize() const + args.i_tag.size() + 1 + args.i_excludes.size() + 1 + 1 // dynamic - + 2; // sync + + 2 // sync + + 4; // credit } management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 098ffe34c4..ab5398a0e8 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -624,6 +624,7 @@ const std::string SRC_IS_QUEUE("srcIsQueue"); const std::string SRC_IS_LOCAL("srcIsLocal"); const std::string DYNAMIC("dynamic"); const std::string SYNC("sync"); +const std::string CREDIT("credit"); // parameters for deleting a Queue object const std::string IF_EMPTY("if_empty"); @@ -840,6 +841,7 @@ void Broker::createObject(const std::string& type, const std::string& name, bool srcIsLocal = false; bool dynamic = false; uint16_t sync = 0; + uint32_t credit = LinkRegistry::INFINITE_CREDIT; for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { @@ -853,6 +855,7 @@ void Broker::createObject(const std::string& type, const std::string& name, else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second); else if (i->first == DYNAMIC) dynamic = bool(i->second); else if (i->first == SYNC) sync = i->second.asUint16(); + else if (i->first == CREDIT) credit = i->second.asUint32(); else if (i->first == DURABLE) durable = bool(i->second); else if (i->first == QUEUE_NAME) queueName = i->second.asString(); else { @@ -867,7 +870,7 @@ void Broker::createObject(const std::string& type, const std::string& name, } std::pair<Bridge::shared_ptr, bool> rc = links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes, - dynamic, sync, + dynamic, sync, credit, 0, queueName); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 5d01a567b5..3380708c0e 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -733,7 +733,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te *this, iargs.i_durable, iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes, - iargs.i_dynamic, iargs.i_sync); + iargs.i_dynamic, iargs.i_sync, iargs.i_credit); if (!rc.first) { text = "invalid parameters"; return Manageable::STATUS_PARAMETER_INVALID; diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 5af6053943..8642294d06 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -168,6 +168,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, const std::string& excludes, bool dynamic, uint16_t sync, + uint32_t credit, Bridge::InitializeCallback init, const std::string& queueName, const std::string& altExchange @@ -209,6 +210,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name, args.i_excludes = excludes; args.i_dynamic = dynamic; args.i_sync = sync; + args.i_credit = credit; bridge = Bridge::shared_ptr (new Bridge (name, &link, link.nextChannel(), diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 21e8ddec81..e5b1c40781 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -108,10 +108,13 @@ namespace broker { const std::string& excludes, bool dynamic, uint16_t sync, + uint32_t credit, Bridge::InitializeCallback=0, const std::string& queueName="", const std::string& altExchange="" ); + QPID_BROKER_EXTERN static const uint32_t INFINITE_CREDIT = 0xFFFFFFFF; + /** determine if Bridge exists */ QPID_BROKER_EXTERN Bridge::shared_ptr getBridge(const std::string& name); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 42e54a0125..076bcac63f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -326,6 +326,7 @@ void BrokerReplicator::initialize() { "", // excludes false, // dynamic 0, // sync? + LinkRegistry::INFINITE_CREDIT, // shared_ptr keeps this in memory until outstanding connected // calls are run. boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2) diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 6fe49bc1af..dece9dd045 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -137,6 +137,7 @@ void QueueReplicator::activate() { "", // excludes false, // dynamic 0, // sync? + LinkRegistry::INFINITE_CREDIT, // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 6477c6effd..0da5b47ac2 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -120,7 +120,8 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0) + result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", + "", False, False, False, 0, 0) self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -199,7 +200,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0) + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0, 0) self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -257,7 +258,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1, 0) self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -310,7 +311,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1, 0) self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -361,8 +362,8 @@ class FederationTests(TestBase010): l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0] r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0] - l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) - r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) + l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0, 0) + r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0, 0) self.assertEqual(l_res.status, 0) self.assertEqual(r_res.status, 0) @@ -416,7 +417,7 @@ class FederationTests(TestBase010): link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id", - "exclude-me,also-exclude-me", False, False, False, 0) + "exclude-me,also-exclude-me", False, False, False, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -474,7 +475,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -519,7 +520,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -563,7 +564,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -615,9 +616,9 @@ class FederationTests(TestBase010): queue2 = session.incoming("f2") link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) - result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -687,9 +688,9 @@ class FederationTests(TestBase010): queue2 = session.incoming("f2") link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) - result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -750,7 +751,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -803,9 +804,9 @@ class FederationTests(TestBase010): queue2 = session.incoming("f2") link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) - result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -859,7 +860,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -904,7 +905,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -958,10 +959,10 @@ class FederationTests(TestBase010): queue2 = session.incoming("f2") link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) - result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -1016,7 +1017,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -1064,7 +1065,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -1175,7 +1176,8 @@ class FederationTests(TestBase010): False, # srcIsQueue False, # srcIsLocal True, # dynamic - 0) # sync + 0, # sync + 0) # credit self.assertEqual(result.status, 0) # wait for the inter-broker links to become operational @@ -1433,7 +1435,8 @@ class FederationTests(TestBase010): False, # srcIsQueue False, # srcIsLocal True, # dynamic - 0) # sync + 0, # sync + 0) # credit self.assertEqual(result.status, 0) # wait for the inter-broker links to become operational @@ -1689,7 +1692,8 @@ class FederationTests(TestBase010): False, # srcIsQueue False, # srcIsLocal True, # dynamic - 0) # sync + 0, # sync + 0) # credit self.assertEqual(result.status, 0) # wait for the inter-broker links to become operational @@ -2018,7 +2022,8 @@ class FederationTests(TestBase010): False, # srcIsQueue False, # srcIsLocal True, # dynamic - 0) # sync + 0, # sync + 0) # credit self.assertEqual(result.status, 0) # wait for all the inter-broker links to become operational @@ -2106,7 +2111,8 @@ class FederationTests(TestBase010): False, # srcIsQueue False, # srcIsLocal True, # dynamic - 0) # sync + 0, # sync + 0) # credit self.assertEqual(result.status, 0) binding_counts = [2, 2] @@ -2710,3 +2716,76 @@ class FederationTests(TestBase010): self._teardown_brokers() self.verify_cleanup() + + def test_credit(self): + """ Test a federation link configured to use explict acks and a credit + limit + """ + session = self.session + + # setup queue on remote broker and add some messages + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_credit") + r_session.queue_declare(queue="my-bridge-queue", auto_delete=True) + + #setup queue to receive messages from local broker + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0, result) + + link = qmf.getObjects(_class="link")[0] + + # now wait for Link to go operational + retries = 0 + operational = False + while not operational: + link.update() + if link.state == "Operational": + operational = True; + if not operational: + retries += 1 + self.failIfEqual(retries, 10, + "inter-broker links failed to become operational.") + sleep(1) + + # create the subscription + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", + "", "", True, False, False, + 3, # explicit ack, with sync every 3 msgs + 7) # msg credit + self.assertEqual(result.status, 0, result) + bridge = qmf.getObjects(_class="bridge")[0] + + # generate enough traffic to trigger flow control and syncs + for i in range(1000): + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) + + for i in range(1000): + try: + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + except Empty: + self.fail("Failed to find expected message containing 'Message %d'" % i) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0, result) + result = link.close() + self.assertEqual(result.status, 0, result) + + r_session.close() + r_conn.close() + + self.verify_cleanup() + diff --git a/qpid/cpp/src/tests/federation_sys.py b/qpid/cpp/src/tests/federation_sys.py index e2553e4cf3..be9613bb9f 100755 --- a/qpid/cpp/src/tests/federation_sys.py +++ b/qpid/cpp/src/tests/federation_sys.py @@ -462,7 +462,7 @@ class QmfTestBase010(TestBase010): if b is not None: return b # Does not exist, so create it - self._check_qmf_return(qmf_link.bridge(bridge_durable_flag, src, dest, key, "", "", queue_route_type_flag, False, False, 1)) + self._check_qmf_return(qmf_link.bridge(bridge_durable_flag, src, dest, key, "", "", queue_route_type_flag, False, False, 1, 0)) b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key) self.assertNotEqual(b, None, "Bridge creation failed: src=%s dest=%s key=%s" % (src, dest, key)) return b diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 128d3b054c..58c6d59716 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -451,6 +451,7 @@ <arg name="srcIsLocal" dir="I" type="bool"/> <arg name="dynamic" dir="I" type="bool"/> <arg name="sync" dir="I" type="uint16"/> + <arg name="credit" dir="I" type="uint32" default="0xFFFFFFFF" desc="granted to peer, 0 = infinite"/> </method> </class> @@ -474,6 +475,7 @@ <property name="excludes" type="sstr" access="RC"/> <property name="dynamic" type="bool" access="RC"/> <property name="sync" type="uint16" access="RC"/> + <property name="credit" type="uint32" access="RC"/> <method name="close"/> </class> diff --git a/qpid/tools/src/py/qpid-route b/qpid/tools/src/py/qpid-route index 7cf52e0a67..1d688de52c 100755 --- a/qpid/tools/src/py/qpid-route +++ b/qpid/tools/src/py/qpid-route @@ -60,6 +60,7 @@ class Config: self._srclocal = False self._transport = "tcp" self._ack = 0 + self._credit = 0xFFFFFFFF # unlimited self._connTimeout = 10 self._conn_options = {} @@ -93,6 +94,8 @@ def OptionsAndArguments(argv): parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)") parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N") + parser.add_option("--credit", action="store", type="int", default=0xFFFFFFFF, metavar="<msgs>", + help="Maximum number of messages a sender can have outstanding (0=unlimited)") parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.") @@ -136,6 +139,8 @@ def OptionsAndArguments(argv): if opts.ack: config._ack = opts.ack + config._credit = opts.credit + if opts.client_sasl_mechanism: config._conn_options['mechanisms'] = opts.client_sasl_mechanism @@ -327,7 +332,9 @@ class RouteManager: if config._verbose: print "Creating inter-broker binding..." - res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes, False, config._srclocal, dynamic, config._ack) + res = link.bridge(config._durable, exchange, exchange, routingKey, tag, + excludes, False, config._srclocal, dynamic, + config._ack, config._credit) if res.status != 0: raise Exception(res.text) if config._verbose: @@ -349,7 +356,8 @@ class RouteManager: if config._verbose: print "Creating inter-broker binding..." - res = link.bridge(config._durable, queue, exchange, "", "", "", True, config._srclocal, False, config._ack) + res = link.bridge(config._durable, queue, exchange, "", "", "", True, + config._srclocal, False, config._ack, config._credit) if res.status != 0: raise Exception(res.text) if config._verbose: |