summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-14 15:05:35 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-14 15:05:35 +0000
commitfd96d1ca556ffdcec5fa78bb155130786ed06612 (patch)
tree1123561a984bdd80ec40c646dcb41bb08c3c8221
parentff1eed41b972f15ffc6c1a9121360a29e1947b1f (diff)
downloadqpid-python-fd96d1ca556ffdcec5fa78bb155130786ed06612.tar.gz
QPID-2935: make flow control enabled by default
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1070515 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp53
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h6
-rw-r--r--qpid/cpp/src/tests/QueueFlowLimitTest.cpp101
-rw-r--r--qpid/cpp/src/tests/QueuePolicyTest.cpp3
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp5
7 files changed, 170 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 0cc150419f..1cf81dfcf6 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
@@ -102,7 +103,9 @@ Broker::Options::Options(const std::string& name) :
requireEncrypted(false),
maxSessionRate(0),
asyncQueueEvents(false), // Must be false in a cluster.
- qmf2Support(false)
+ qmf2Support(false),
+ queueFlowStopRatio(80),
+ queueFlowResumeRatio(70)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -134,7 +137,9 @@ Broker::Options::Options(const std::string& name) :
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location")
("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
- ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
+ ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
+ ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "%MESSAGES"), "Queue capacity level at which flow control is activated.")
+ ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "%MESSAGES"), "Queue capacity level at which flow control is de-activated.");
}
const std::string empty;
@@ -219,6 +224,7 @@ Broker::Broker(const Broker::Options& conf) :
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
+ QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
queues.setQueueEvents(&queueEvents);
// Early-Initialize plugins
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 32e2c8ab6b..263cee51f5 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -115,6 +115,8 @@ public:
uint32_t maxSessionRate;
bool asyncQueueEvents;
bool qmf2Support;
+ uint queueFlowStopRatio; // producer flow control: on
+ uint queueFlowResumeRatio; // producer flow control: off
private:
std::string getHome();
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 6339085e4c..02a776fd70 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -219,7 +219,7 @@ void QueueFlowLimit::encode(Buffer& buffer) const
}
-void QueueFlowLimit::decode ( Buffer& buffer )
+void QueueFlowLimit::decode ( Buffer& buffer )
{
flowStopCount = buffer.getLong();
flowResumeCount = buffer.getLong();
@@ -244,21 +244,58 @@ const std::string QueueFlowLimit::flowStopCountKey("qpid.flow_stop_count");
const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count");
const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size");
const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size");
+uint64_t QueueFlowLimit::defaultMaxSize;
+uint QueueFlowLimit::defaultFlowStopRatio;
+uint QueueFlowLimit::defaultFlowResumeRatio;
+
+
+void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint flowResumeRatio)
+{
+ defaultMaxSize = maxQueueSize;
+ defaultFlowStopRatio = flowStopRatio;
+ defaultFlowResumeRatio = flowResumeRatio;
+
+ /** @todo Verify valid range on Broker::Options instead of here */
+ if (flowStopRatio > 100 || flowResumeRatio > 100)
+ throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:"
+ << " flowStopRatio=" << flowStopRatio
+ << " flowResumeRatio=" << flowResumeRatio));
+ if (flowResumeRatio > flowStopRatio)
+ throw InvalidArgumentException(QPID_MSG("Default queue flow stop ratio must be >= flow resume ratio:"
+ << " flowStopRatio=" << flowStopRatio
+ << " flowResumeRatio=" << flowResumeRatio));
+}
std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings)
{
- uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
- uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
- uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
- uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
+ std::string type(QueuePolicy::getType(settings));
+
+ if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
+ // The size of a RING queue is limited by design - no need for flow control.
+ return std::auto_ptr<QueueFlowLimit>();
+ }
- if (flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) {
+ if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) {
+ uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
+ uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
+ uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
+ uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
+ if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
+ return std::auto_ptr<QueueFlowLimit>();
+ }
return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
flowStopSize, flowResumeSize));
- } else {
- return std::auto_ptr<QueueFlowLimit>();
}
+
+ if (defaultFlowStopRatio) {
+ uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
+ uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
+ uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+
+ return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize));
+ }
+ return std::auto_ptr<QueueFlowLimit>();
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 32031cc0b1..8533854dee 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -52,6 +52,10 @@ namespace broker {
*/
class QueueFlowLimit
{
+ static uint64_t defaultMaxSize;
+ static uint defaultFlowStopRatio;
+ static uint defaultFlowResumeRatio;
+
Queue *queue;
std::string queueName;
@@ -92,6 +96,8 @@ class QueueFlowLimit
uint32_t encodedSize() const;
static QPID_BROKER_EXTERN std::auto_ptr<QueueFlowLimit> createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
+
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
protected:
diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
index 99fa98cfbf..ab314f0b04 100644
--- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
+++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -23,6 +23,7 @@
#include "unit_test.h"
#include "test_tools.h"
+#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/reply_exceptions.h"
@@ -302,6 +303,106 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
}
+QPID_AUTO_TEST_CASE(testFlowDefaultArgs)
+{
+ QueueFlowLimit::setDefaults(2950001, // max queue byte count
+ 80, // 80% stop threshold
+ 70); // 70% resume threshold
+ FieldTable args;
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize());
+ BOOST_CHECK_EQUAL( 0, flow->getFlowStopCount());
+ BOOST_CHECK_EQUAL( 0, flow->getFlowResumeCount());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowOverrideArgs)
+{
+ QueueFlowLimit::setDefaults(2950001, // max queue byte count
+ 80, // 80% stop threshold
+ 70); // 70% resume threshold
+ {
+ FieldTable args;
+ args.setInt(QueueFlowLimit::flowStopCountKey, 35000);
+ args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
+ BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
+ BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowResumeSize());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+ }
+ {
+ FieldTable args;
+ args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
+ args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
+ BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
+ BOOST_CHECK_EQUAL((uint64_t) 350000, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+ }
+ {
+ FieldTable args;
+ args.setInt(QueueFlowLimit::flowStopCountKey, 35000);
+ args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
+ args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
+ args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
+ BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
+ BOOST_CHECK_EQUAL((uint64_t) 350000, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+ }
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowOverrideDefaults)
+{
+ QueueFlowLimit::setDefaults(2950001, // max queue byte count
+ 97, // stop threshold
+ 73); // resume threshold
+ FieldTable args;
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowDisable)
+{
+ {
+ FieldTable args;
+ args.setInt(QueueFlowLimit::flowStopCountKey, 0);
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK(!flow.get());
+ }
+ {
+ FieldTable args;
+ args.setInt(QueueFlowLimit::flowStopSizeKey, 0);
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK(!flow.get());
+ }
+}
+
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp
index 90af9c7dd9..73fc494019 100644
--- a/qpid/cpp/src/tests/QueuePolicyTest.cpp
+++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp
@@ -23,6 +23,7 @@
#include "test_tools.h"
#include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/reply_exceptions.h"
@@ -340,6 +341,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore)
//fallback to rejecting messages
QueueOptions args;
args.setSizePolicy(FLOW_TO_DISK, 0, 5);
+ // Disable flow control, or else we'll never hit the max limit
+ args.setInt(QueueFlowLimit::flowStopCountKey, 0);
ProxySessionFixture f;
std::string q("my-queue");
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 80c69ac386..9b6e423b02 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -36,6 +36,9 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
+
#include <iostream>
#include "boost/format.hpp"
@@ -508,6 +511,8 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
client::QueueOptions args;
// set queue mode
args.setOrdering(client::LVQ);
+ // disable flow control, as this test violates the enqueue/dequeue sequence.
+ args.setInt(QueueFlowLimit::flowStopCountKey, 0);
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);