summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2013-04-11 21:47:40 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2013-04-11 21:47:40 +0000
commit332410c66c62d5e075e9f9077d29fc4669e11db0 (patch)
treee856ee2846d664f98b21f3cb01b3989ba038241c
parent25eaa7e072a1f4bdfc592ac3c4ca57e265a04d40 (diff)
downloadqpid-python-332410c66c62d5e075e9f9077d29fc4669e11db0.tar.gz
QPID-4728: add 'credit' parameter to Federation Bridge configuration.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1467107 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp127
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp1
-rwxr-xr-xqpid/cpp/src/tests/federation.py135
-rwxr-xr-xqpid/cpp/src/tests/federation_sys.py2
-rw-r--r--qpid/specs/management-schema.xml2
-rwxr-xr-xqpid/tools/src/py/qpid-route12
11 files changed, 213 insertions, 79 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 68bdf6d474..ae83e4415b 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -54,6 +54,8 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace {
const std::string QPID_REPLICATE("qpid.replicate");
const std::string NONE("none");
+const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent
+const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent
}
namespace qpid {
@@ -76,12 +78,19 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
useExistingQueue(!_queueName.empty()),
sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
{
+ // If both acks (i_sync) and limited credit is configured, then we'd
+ // better be able to sync before running out of credit or we
+ // may stall (note: i_credit==0 means "unlimited")
+ if (args.i_credit && args.i_sync && args.i_sync > args.i_credit)
+ throw Exception("The credit value must be greater than configured sync (ack) interval.");
+
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge
(agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync));
+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync,
+ args.i_credit));
mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
@@ -98,8 +107,7 @@ void Bridge::create(Connection& c)
detached = false; // Reset detached in case we are recovering.
connState = &c;
conn = &c;
- FieldTable options;
- if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
+
SessionHandler& sessionHandler = c.getChannel(channel);
sessionHandler.setErrorListener(shared_from_this());
if (args.i_srcIsLocal) {
@@ -121,51 +129,75 @@ void Bridge::create(Connection& c)
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
- if (initialize) initialize(*this, sessionHandler);
- else if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
- peer->getMessage().flow(args.i_dest, 0, args.i_sync ? 2 * args.i_sync : 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
- } else {
- if (!useExistingQueue) {
- FieldTable queueSettings;
- if (args.i_tag.size()) {
- queueSettings.setString("qpid.trace.id", args.i_tag);
- } else {
- const string& peerTag = c.getFederationPeerTag();
- if (peerTag.size())
- queueSettings.setString("qpid.trace.id", peerTag);
+ if (initialize) {
+ initialize(*this, sessionHandler); // custom subscription initializer supplied
+ } else {
+ // will a temp queue be created for this bridge?
+ const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue;
+ // UI convention: user specifies 0 for infinite credit
+ const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit;
+ // use explicit acks only for non-temp queues, useless for temp queues since they are
+ // destroyed when the session drops (can't resend unacked msgs)
+ const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK;
+
+ // configure command.sync frequency
+ FieldTable options;
+ uint32_t freq = 0;
+ if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs
+ freq = uint32_t(args.i_sync);
+ } else if (credit && credit != LinkRegistry::INFINITE_CREDIT) {
+ // force occasional sync to keep from stalling due to lack of credit
+ freq = (credit + 1)/2;
+ }
+ if (freq)
+ options.setInt("qpid.sync_frequency", freq);
+
+ // create a subscription on the remote
+ if (args.i_srcIsQueue) {
+ peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, credit); // message credit
+ peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit
+ QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
+ } else {
+ if (!useExistingQueue) {
+ FieldTable queueSettings;
+
+ if (args.i_tag.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_tag);
+ } else {
+ const string& peerTag = c.getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.id", peerTag);
+ }
+
+ if (args.i_excludes.size()) {
+ queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ } else {
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.exclude", localTag);
+ }
+
+ bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
+ bool exclusive = true; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
}
-
- if (args.i_excludes.size()) {
- queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ if (!args.i_dynamic)
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options);
+ peer->getMessage().flow(args.i_dest, 0, credit);
+ peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT);
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() == 0)
+ throw Exception("Exchange not found for dynamic route");
+ exchange->registerDynamicBridge(this);
+ QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
- const string& localTag = link->getBroker()->getFederationTag();
- if (localTag.size())
- queueSettings.setString("qpid.trace.exclude", localTag);
+ QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
-
- bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
- bool exclusive = true; // only exclusive if the queue is owned by the bridge
- bool autoDelete = exclusive && !durable;//auto delete transient queues?
- peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
- }
- if (!args.i_dynamic)
- peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options);
- peer->getMessage().flow(args.i_dest, 0, (useExistingQueue && args.i_sync) ? 2 * args.i_sync : 0xFFFFFFFF);
- peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- if (args.i_dynamic) {
- Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
- if (exchange.get() == 0)
- throw Exception("Exchange not found for dynamic route");
- exchange->registerDynamicBridge(this);
- QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
- } else {
- QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
@@ -260,6 +292,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
buffer.getShortString(excludes);
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
+ uint32_t credit = buffer.getLong();
if (kind == ENCODED_IDENTIFIER_V1) {
/** previous versions did not provide a name for the bridge, so create one
@@ -268,7 +301,7 @@ Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
}
return links.declare(name, *link, durable, src, dest, key, is_queue,
- is_local, id, excludes, dynamic, sync).first;
+ is_local, id, excludes, dynamic, sync, credit).first;
}
void Bridge::encode(Buffer& buffer) const
@@ -286,6 +319,7 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShortString(args.i_excludes);
buffer.putOctet(args.i_dynamic ? 1 : 0);
buffer.putShort(args.i_sync);
+ buffer.putLong(args.i_credit);
}
uint32_t Bridge::encodedSize() const
@@ -302,7 +336,8 @@ uint32_t Bridge::encodedSize() const
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1
+ 1 // dynamic
- + 2; // sync
+ + 2 // sync
+ + 4; // credit
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 098ffe34c4..ab5398a0e8 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -624,6 +624,7 @@ const std::string SRC_IS_QUEUE("srcIsQueue");
const std::string SRC_IS_LOCAL("srcIsLocal");
const std::string DYNAMIC("dynamic");
const std::string SYNC("sync");
+const std::string CREDIT("credit");
// parameters for deleting a Queue object
const std::string IF_EMPTY("if_empty");
@@ -840,6 +841,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
bool srcIsLocal = false;
bool dynamic = false;
uint16_t sync = 0;
+ uint32_t credit = LinkRegistry::INFINITE_CREDIT;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
@@ -853,6 +855,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
else if (i->first == DYNAMIC) dynamic = bool(i->second);
else if (i->first == SYNC) sync = i->second.asUint16();
+ else if (i->first == CREDIT) credit = i->second.asUint32();
else if (i->first == DURABLE) durable = bool(i->second);
else if (i->first == QUEUE_NAME) queueName = i->second.asString();
else {
@@ -867,7 +870,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
}
std::pair<Bridge::shared_ptr, bool> rc =
links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
- dynamic, sync,
+ dynamic, sync, credit,
0,
queueName);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 5d01a567b5..3380708c0e 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -733,7 +733,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
*this, iargs.i_durable,
iargs.i_src, iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic, iargs.i_sync);
+ iargs.i_dynamic, iargs.i_sync, iargs.i_credit);
if (!rc.first) {
text = "invalid parameters";
return Manageable::STATUS_PARAMETER_INVALID;
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index 5af6053943..8642294d06 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -168,6 +168,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
const std::string& excludes,
bool dynamic,
uint16_t sync,
+ uint32_t credit,
Bridge::InitializeCallback init,
const std::string& queueName,
const std::string& altExchange
@@ -209,6 +210,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
args.i_excludes = excludes;
args.i_dynamic = dynamic;
args.i_sync = sync;
+ args.i_credit = credit;
bridge = Bridge::shared_ptr
(new Bridge (name, &link, link.nextChannel(),
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 21e8ddec81..e5b1c40781 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -108,10 +108,13 @@ namespace broker {
const std::string& excludes,
bool dynamic,
uint16_t sync,
+ uint32_t credit,
Bridge::InitializeCallback=0,
const std::string& queueName="",
const std::string& altExchange=""
);
+ QPID_BROKER_EXTERN static const uint32_t INFINITE_CREDIT = 0xFFFFFFFF;
+
/** determine if Bridge exists */
QPID_BROKER_EXTERN Bridge::shared_ptr
getBridge(const std::string& name);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 42e54a0125..076bcac63f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -326,6 +326,7 @@ void BrokerReplicator::initialize() {
"", // excludes
false, // dynamic
0, // sync?
+ LinkRegistry::INFINITE_CREDIT,
// shared_ptr keeps this in memory until outstanding connected
// calls are run.
boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 6fe49bc1af..dece9dd045 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -137,6 +137,7 @@ void QueueReplicator::activate() {
"", // excludes
false, // dynamic
0, // sync?
+ LinkRegistry::INFINITE_CREDIT,
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 6477c6effd..0da5b47ac2 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -120,7 +120,8 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
+ result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "",
+ "", False, False, False, 0, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -199,7 +200,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -257,7 +258,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -310,7 +311,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0, result)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1, 0)
self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -361,8 +362,8 @@ class FederationTests(TestBase010):
l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
- l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
- r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
+ l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0, 0)
+ r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0, 0)
self.assertEqual(l_res.status, 0)
self.assertEqual(r_res.status, 0)
@@ -416,7 +417,7 @@ class FederationTests(TestBase010):
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
- "exclude-me,also-exclude-me", False, False, False, 0)
+ "exclude-me,also-exclude-me", False, False, False, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -474,7 +475,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -519,7 +520,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -563,7 +564,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -615,9 +616,9 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -687,9 +688,9 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -750,7 +751,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -803,9 +804,9 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -859,7 +860,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -904,7 +905,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -958,10 +959,10 @@ class FederationTests(TestBase010):
queue2 = session.incoming("f2")
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
- result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -1016,7 +1017,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -1064,7 +1065,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -1175,7 +1176,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for the inter-broker links to become operational
@@ -1433,7 +1435,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for the inter-broker links to become operational
@@ -1689,7 +1692,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for the inter-broker links to become operational
@@ -2018,7 +2022,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
# wait for all the inter-broker links to become operational
@@ -2106,7 +2111,8 @@ class FederationTests(TestBase010):
False, # srcIsQueue
False, # srcIsLocal
True, # dynamic
- 0) # sync
+ 0, # sync
+ 0) # credit
self.assertEqual(result.status, 0)
binding_counts = [2, 2]
@@ -2710,3 +2716,76 @@ class FederationTests(TestBase010):
self._teardown_brokers()
self.verify_cleanup()
+
+ def test_credit(self):
+ """ Test a federation link configured to use explict acks and a credit
+ limit
+ """
+ session = self.session
+
+ # setup queue on remote broker and add some messages
+ r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+ r_session = r_conn.session("test_credit")
+ r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+
+ #setup queue to receive messages from local broker
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0, result)
+
+ link = qmf.getObjects(_class="link")[0]
+
+ # now wait for Link to go operational
+ retries = 0
+ operational = False
+ while not operational:
+ link.update()
+ if link.state == "Operational":
+ operational = True;
+ if not operational:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "inter-broker links failed to become operational.")
+ sleep(1)
+
+ # create the subscription
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key",
+ "", "", True, False, False,
+ 3, # explicit ack, with sync every 3 msgs
+ 7) # msg credit
+ self.assertEqual(result.status, 0, result)
+ bridge = qmf.getObjects(_class="bridge")[0]
+
+ # generate enough traffic to trigger flow control and syncs
+ for i in range(1000):
+ dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+ r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+ for i in range(1000):
+ try:
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ except Empty:
+ self.fail("Failed to find expected message containing 'Message %d'" % i)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0, result)
+ result = link.close()
+ self.assertEqual(result.status, 0, result)
+
+ r_session.close()
+ r_conn.close()
+
+ self.verify_cleanup()
+
diff --git a/qpid/cpp/src/tests/federation_sys.py b/qpid/cpp/src/tests/federation_sys.py
index e2553e4cf3..be9613bb9f 100755
--- a/qpid/cpp/src/tests/federation_sys.py
+++ b/qpid/cpp/src/tests/federation_sys.py
@@ -462,7 +462,7 @@ class QmfTestBase010(TestBase010):
if b is not None:
return b
# Does not exist, so create it
- self._check_qmf_return(qmf_link.bridge(bridge_durable_flag, src, dest, key, "", "", queue_route_type_flag, False, False, 1))
+ self._check_qmf_return(qmf_link.bridge(bridge_durable_flag, src, dest, key, "", "", queue_route_type_flag, False, False, 1, 0))
b = self._find_qmf_bridge(qmf_broker_proxy, qmf_link, src, dest, key)
self.assertNotEqual(b, None, "Bridge creation failed: src=%s dest=%s key=%s" % (src, dest, key))
return b
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 128d3b054c..58c6d59716 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -451,6 +451,7 @@
<arg name="srcIsLocal" dir="I" type="bool"/>
<arg name="dynamic" dir="I" type="bool"/>
<arg name="sync" dir="I" type="uint16"/>
+ <arg name="credit" dir="I" type="uint32" default="0xFFFFFFFF" desc="granted to peer, 0 = infinite"/>
</method>
</class>
@@ -474,6 +475,7 @@
<property name="excludes" type="sstr" access="RC"/>
<property name="dynamic" type="bool" access="RC"/>
<property name="sync" type="uint16" access="RC"/>
+ <property name="credit" type="uint32" access="RC"/>
<method name="close"/>
</class>
diff --git a/qpid/tools/src/py/qpid-route b/qpid/tools/src/py/qpid-route
index 7cf52e0a67..1d688de52c 100755
--- a/qpid/tools/src/py/qpid-route
+++ b/qpid/tools/src/py/qpid-route
@@ -60,6 +60,7 @@ class Config:
self._srclocal = False
self._transport = "tcp"
self._ack = 0
+ self._credit = 0xFFFFFFFF # unlimited
self._connTimeout = 10
self._conn_options = {}
@@ -93,6 +94,8 @@ def OptionsAndArguments(argv):
parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)")
parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N")
+ parser.add_option("--credit", action="store", type="int", default=0xFFFFFFFF, metavar="<msgs>",
+ help="Maximum number of messages a sender can have outstanding (0=unlimited)")
parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
@@ -136,6 +139,8 @@ def OptionsAndArguments(argv):
if opts.ack:
config._ack = opts.ack
+ config._credit = opts.credit
+
if opts.client_sasl_mechanism:
config._conn_options['mechanisms'] = opts.client_sasl_mechanism
@@ -327,7 +332,9 @@ class RouteManager:
if config._verbose:
print "Creating inter-broker binding..."
- res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes, False, config._srclocal, dynamic, config._ack)
+ res = link.bridge(config._durable, exchange, exchange, routingKey, tag,
+ excludes, False, config._srclocal, dynamic,
+ config._ack, config._credit)
if res.status != 0:
raise Exception(res.text)
if config._verbose:
@@ -349,7 +356,8 @@ class RouteManager:
if config._verbose:
print "Creating inter-broker binding..."
- res = link.bridge(config._durable, queue, exchange, "", "", "", True, config._srclocal, False, config._ack)
+ res = link.bridge(config._durable, queue, exchange, "", "", "", True,
+ config._srclocal, False, config._ack, config._credit)
if res.status != 0:
raise Exception(res.text)
if config._verbose: