summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-01-31 19:35:54 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-01-31 19:35:54 +0000
commit14bb9643b6e62b80452c15802bc28687e717d3e0 (patch)
tree1b508a5fae62fd1bdb67fcd021239490134109e1
parentac2a5631cf47adb96953b3c171ed7f180534c4d2 (diff)
downloadqpid-python-14bb9643b6e62b80452c15802bc28687e717d3e0.tar.gz
remove changes to QueuePolicy in lieu of QueueFlowLimits
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1065724 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp180
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h48
-rw-r--r--qpid/cpp/src/tests/QueuePolicyTest.cpp209
3 files changed, 53 insertions, 384 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index 0cc0c514a7..f311ea8321 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -29,114 +29,33 @@
using namespace qpid::broker;
using namespace qpid::framing;
-namespace {
- /** ensure that the configured flow control stop and resume values are
- * valid with respect to the maximum queue capacity, and each other
- */
- template <typename T>
- void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue)
- {
- if (resume > stop) {
- throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
- << "=" << resume
- << " must be less than qpid.flow_stop_" << type
- << "=" << stop));
- }
- if (resume == 0) resume = stop;
- if (max != 0 && (max < stop)) {
- throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
- << "=" << stop
- << " must be less than qpid.max_" << type
- << "=" << max));
- }
- }
-
- /** extract a capacity value as passing in an argument map
- */
- uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
- {
- FieldTable::ValuePtr v = settings.get(key);
-
- int64_t result = 0;
-
- if (!v) return defaultValue;
- if (v->getType() == 0x23) {
- QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
- } else if (v->getType() == 0x33) {
- QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<int64_t>()) {
- result = v->get<int64_t>();
- QPID_LOG(debug, "Got integer value for " << key << ": " << result);
- if (result >= 0) return result;
- } else if (v->convertsTo<string>()) {
- string s(v->get<string>());
- QPID_LOG(debug, "Got string value for " << key << ": " << s);
- std::istringstream convert(s);
- if (convert >> result && result >= 0) return result;
- }
-
- QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
- return defaultValue;
- }
-
-
-}
-
-QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type,
- uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false),
- flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
- flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
- flowStopped(false), name(_name)
-{
- validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", name );
- validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", name );
- QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize
- << "; flowStopCount=" << flowStopCount << "; flowResumeCount=" << flowResumeCount
- << "; flowStopSize=" << flowStopSize << "; flowResumeSize=" << flowResumeSize );
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {
+ QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize);
}
void QueuePolicy::enqueued(uint64_t _size)
{
- if (maxCount || flowStopCount) {
- ++count;
- if (flowStopCount && !flowStopped && count > flowStopCount) {
- flowStopped = true;
- QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
- }
- }
-
- if (maxSize || flowStopSize) {
- size += _size;
- if (flowStopSize && !flowStopped && size > flowStopSize) {
- flowStopped = true;
- QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
- }
- }
+ if (maxCount) ++count;
+ if (maxSize) size += _size;
}
void QueuePolicy::dequeued(uint64_t _size)
{
- if (maxCount || flowStopCount) {
+ if (maxCount) {
if (count > 0) {
--count;
} else {
throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
}
}
- if (maxSize || flowStopSize) {
+ if (maxSize) {
if (_size > size) {
throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
} else {
size -= _size;
}
}
- if (flowStopped &&
- (flowResumeSize == 0 || size < flowResumeSize) &&
- (flowResumeCount == 0 || count < flowResumeCount)) {
- flowStopped = false;
- QPID_LOG(info, "Queue \"" << name << "\": has drained below the flow control resume level. Producer flow control deactivated." );
- }
}
bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
@@ -197,6 +116,32 @@ void QueuePolicy::update(FieldTable& settings)
settings.setString(typeKey, type);
}
+uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
+{
+ FieldTable::ValuePtr v = settings.get(key);
+
+ int32_t result = 0;
+
+ if (!v) return defaultValue;
+ if (v->getType() == 0x23) {
+ QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
+ } else if (v->getType() == 0x33) {
+ QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
+ } else if (v->convertsTo<int>()) {
+ result = v->get<int>();
+ QPID_LOG(debug, "Got integer value for " << key << ": " << result);
+ if (result >= 0) return result;
+ } else if (v->convertsTo<string>()) {
+ string s(v->get<string>());
+ QPID_LOG(debug, "Got string value for " << key << ": " << s);
+ std::istringstream convert(s);
+ if (convert >> result && result >= 0) return result;
+ }
+
+ QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
+ return defaultValue;
+}
+
std::string QueuePolicy::getType(const FieldTable& settings)
{
FieldTable::ValuePtr v = settings.get(typeKey);
@@ -224,10 +169,6 @@ void QueuePolicy::encode(Buffer& buffer) const
buffer.putLongLong(maxSize);
buffer.putLong(count);
buffer.putLongLong(size);
- buffer.putLong(flowStopCount);
- buffer.putLong(flowResumeCount);
- buffer.putLongLong(flowStopSize);
- buffer.putLongLong(flowResumeSize);
}
void QueuePolicy::decode ( Buffer& buffer )
@@ -236,10 +177,6 @@ void QueuePolicy::decode ( Buffer& buffer )
maxSize = buffer.getLongLong();
count = buffer.getLong();
size = buffer.getLongLong();
- flowStopCount = buffer.getLong();
- flowResumeCount = buffer.getLong();
- flowStopSize = buffer.getLongLong();
- flowResumeSize = buffer.getLongLong();
}
@@ -247,11 +184,7 @@ uint32_t QueuePolicy::encodedSize() const {
return sizeof(uint32_t) + // maxCount
sizeof(uint64_t) + // maxSize
sizeof(uint32_t) + // count
- sizeof(uint64_t) + // size
- sizeof(uint32_t) + // flowStopCount
- sizeof(uint32_t) + // flowResumecount
- sizeof(uint64_t) + // flowStopSize
- sizeof(uint64_t); // flowResumeSize
+ sizeof(uint64_t); // size
}
@@ -259,21 +192,14 @@ uint32_t QueuePolicy::encodedSize() const {
const std::string QueuePolicy::maxCountKey("qpid.max_count");
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
const std::string QueuePolicy::typeKey("qpid.policy_type");
-const std::string QueuePolicy::flowStopCountKey("qpid.flow_stop_count");
-const std::string QueuePolicy::flowResumeCountKey("qpid.flow_resume_count");
-const std::string QueuePolicy::flowStopSizeKey("qpid.flow_stop_size");
-const std::string QueuePolicy::flowResumeSizeKey("qpid.flow_resume_size");
const std::string QueuePolicy::REJECT("reject");
const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk");
const std::string QueuePolicy::RING("ring");
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize,
- uint32_t _flowStopCount, uint32_t _flowResumeCount,
- uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK,
- _flowStopCount, _flowResumeCount, _flowStopSize, _flowResumeSize) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
@@ -282,11 +208,8 @@ bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
}
RingQueuePolicy::RingQueuePolicy(const std::string& _name,
- uint32_t _maxCount, uint64_t _maxSize, const std::string& _type,
- uint32_t _flowStopCount, uint32_t _flowResumeCount,
- uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : QueuePolicy(_name, _maxCount, _maxSize, _type, _flowStopCount, _flowResumeCount,
- _flowStopSize, _flowResumeSize), strict(_type == RING_STRICT) {}
+ uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
@@ -394,34 +317,23 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::F
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
{
uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
- uint64_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
- uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
- uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
- uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
- uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
-
- if (maxCount || maxSize || flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) {
- return createQueuePolicy(name, maxCount, maxSize, getType(settings),
- flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
+ uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
+ if (maxCount || maxSize) {
+ return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
return std::auto_ptr<QueuePolicy>();
}
}
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name,
- uint32_t maxCount, uint64_t maxSize, const std::string& type,
- uint32_t flowStopCount, uint32_t flowResumeCount,
- uint64_t flowStopSize, uint64_t flowResumeSize)
+ uint32_t maxCount, uint64_t maxSize, const std::string& type)
{
if (type == RING || type == RING_STRICT) {
- return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type,
- flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
} else if (type == FLOW_TO_DISK) {
- return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize,
- flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
} else {
- return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type,
- flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
}
}
@@ -437,8 +349,6 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
else out << "count: unlimited";
out << "; type=" << p.type;
- if (p.flowStopCount) out << "; flowStopCount=" << p.flowStopCount << ", flowResumeCount=" << p.flowResumeCount;
- if (p.flowStopSize) out << "; flowStopSize=" << p.flowStopSize << ", flowResumeSize=" << p.flowResumeSize;
return out;
}
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index d7a71bc3ba..3cdd63784d 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -43,20 +43,8 @@ class QueuePolicy
uint32_t count;
uint64_t size;
bool policyExceeded;
-
- /**
- * Producer flow control: when level is > flowStop*, flow control is ON.
- * then level is < flowResume*, flow control is OFF. If == 0, flow control
- * is not used. If both byte and msg count thresholds are set, then
- * passing _either_ level may turn flow control ON, but _both_ must be
- * below level before flow control will be turned OFF.
- */
- uint32_t flowStopCount;
- uint32_t flowResumeCount;
- uint64_t flowStopSize;
- uint64_t flowResumeSize;
- bool flowStopped; // true = producers held in flow control
-
+
+ static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
protected:
uint64_t getCurrentQueueSize() const { return size; }
@@ -66,16 +54,10 @@ class QueuePolicy
static QPID_BROKER_EXTERN const std::string maxCountKey;
static QPID_BROKER_EXTERN const std::string maxSizeKey;
static QPID_BROKER_EXTERN const std::string typeKey;
- static QPID_BROKER_EXTERN const std::string flowStopCountKey;
- static QPID_BROKER_EXTERN const std::string flowResumeCountKey;
- static QPID_BROKER_EXTERN const std::string flowStopSizeKey;
- static QPID_BROKER_EXTERN const std::string flowResumeSizeKey;
-
- // Policy types:
static QPID_BROKER_EXTERN const std::string REJECT;
static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK;
static QPID_BROKER_EXTERN const std::string RING;
- static QPID_BROKER_EXTERN const std::string RING_STRICT;
+ static QPID_BROKER_EXTERN const std::string RING_STRICT;
virtual ~QueuePolicy() {}
QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg);
@@ -86,22 +68,14 @@ class QueuePolicy
virtual bool isEnqueued(const QueuedMessage&);
QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings);
uint32_t getMaxCount() const { return maxCount; }
- uint64_t getMaxSize() const { return maxSize; }
- uint32_t getFlowStopCount() const { return flowStopCount; }
- uint32_t getFlowResumeCount() const { return flowResumeCount; }
- uint64_t getFlowStopSize() const { return flowStopSize; }
- uint64_t getFlowResumeSize() const { return flowResumeSize; }
- bool isFlowControlActive() const { return flowStopped; }
- bool monitorFlowControl() const { return flowStopCount || flowStopSize; }
+ uint64_t getMaxSize() const { return maxSize; }
void encode(framing::Buffer& buffer) const;
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
virtual void getPendingDequeues(Messages& result);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
- static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT,
- uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
- uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
+ static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
static std::string getType(const qpid::framing::FieldTable& settings);
@@ -111,9 +85,7 @@ class QueuePolicy
protected:
const std::string name;
- QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT,
- uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
- uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
+ QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
virtual bool checkLimit(boost::intrusive_ptr<Message> msg);
void enqueued(uint64_t size);
@@ -124,18 +96,14 @@ class QueuePolicy
class FlowToDiskPolicy : public QueuePolicy
{
public:
- FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize,
- uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
- uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
+ FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize);
bool checkLimit(boost::intrusive_ptr<Message> msg);
};
class RingQueuePolicy : public QueuePolicy
{
public:
- RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING,
- uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0,
- uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0);
+ RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
bool isEnqueued(const QueuedMessage&);
diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp
index 641dfd2421..90af9c7dd9 100644
--- a/qpid/cpp/src/tests/QueuePolicyTest.cpp
+++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp
@@ -53,8 +53,6 @@ QPID_AUTO_TEST_CASE(testCount)
BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
- BOOST_CHECK(!policy->monitorFlowControl());
-
QueuedMessage msg = createMessage(10);
for (size_t i = 0; i < 5; i++) {
policy->tryEnqueue(msg.payload);
@@ -398,213 +396,6 @@ QPID_AUTO_TEST_CASE(testCapacityConversion)
} catch (const ResourceLimitExceededException&) {}
}
-
-QPID_AUTO_TEST_CASE(testFlowCount)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT,
- 7, // flowStop
- 5)); // flowResume
- BOOST_CHECK_EQUAL((uint32_t) 7, policy->getFlowStopCount());
- BOOST_CHECK_EQUAL((uint32_t) 5, policy->getFlowResumeCount());
- BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopSize());
- BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeSize());
- BOOST_CHECK(!policy->isFlowControlActive());
- BOOST_CHECK(policy->monitorFlowControl());
-
- QueuedMessage msg = createMessage(10);
- for (size_t i = 0; i < 6; i++) {
- policy->tryEnqueue(msg.payload);
- BOOST_CHECK(!policy->isFlowControlActive());
- }
- BOOST_CHECK(!policy->isFlowControlActive()); // 6 on queue
- policy->tryEnqueue(msg.payload);
- BOOST_CHECK(!policy->isFlowControlActive()); // 7 on queue
-
- policy->tryEnqueue(msg.payload);
- BOOST_CHECK(policy->isFlowControlActive()); // 8 on queue, ON
- policy->tryEnqueue(msg.payload);
- BOOST_CHECK(policy->isFlowControlActive()); // 9 on queue
-
- policy->dequeued(msg);
- BOOST_CHECK(policy->isFlowControlActive()); // 8 on queue
- policy->dequeued(msg);
- BOOST_CHECK(policy->isFlowControlActive()); // 7 on queue
- policy->dequeued(msg);
- BOOST_CHECK(policy->isFlowControlActive()); // 6 on queue
- policy->dequeued(msg);
- BOOST_CHECK(policy->isFlowControlActive()); // 5 on queue
-
- policy->dequeued(msg);
- BOOST_CHECK(!policy->isFlowControlActive()); // 4 on queue, OFF
-}
-
-
-QPID_AUTO_TEST_CASE(testFlowSize)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT,
- 0, 0, // flow-Count
- 70, // flowStopSize
- 50)); // flowResumeSize
- BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopCount());
- BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeCount());
- BOOST_CHECK_EQUAL((uint32_t) 70, policy->getFlowStopSize());
- BOOST_CHECK_EQUAL((uint32_t) 50, policy->getFlowResumeSize());
- BOOST_CHECK(!policy->isFlowControlActive());
- BOOST_CHECK(policy->monitorFlowControl());
-
- QueuedMessage msg = createMessage(10);
- for (size_t i = 0; i < 6; i++) {
- policy->tryEnqueue(msg.payload);
- BOOST_CHECK(!policy->isFlowControlActive());
- }
- BOOST_CHECK(!policy->isFlowControlActive()); // 60 on queue
- policy->tryEnqueue(msg.payload);
- BOOST_CHECK(!policy->isFlowControlActive()); // 70 on queue
-
- QueuedMessage tinyMsg = createMessage(1);
- policy->tryEnqueue(tinyMsg.payload);
- BOOST_CHECK(policy->isFlowControlActive()); // 71 on queue, ON
- policy->tryEnqueue(msg.payload);
- BOOST_CHECK(policy->isFlowControlActive()); // 81 on queue
-
- policy->dequeued(msg);
- BOOST_CHECK(policy->isFlowControlActive()); // 71 on queue
- policy->dequeued(msg);
- BOOST_CHECK(policy->isFlowControlActive()); // 61 on queue
- policy->dequeued(msg);
- BOOST_CHECK(policy->isFlowControlActive()); // 51 on queue
-
- policy->dequeued(tinyMsg);
- BOOST_CHECK(policy->isFlowControlActive()); // 50 on queue
- policy->dequeued(tinyMsg);
- BOOST_CHECK(!policy->isFlowControlActive()); // 49 on queue, OFF
- policy->dequeued(msg);
- BOOST_CHECK(!policy->isFlowControlActive()); // 39 on queue
-}
-
-QPID_AUTO_TEST_CASE(testFlowArgs)
-{
- FieldTable args;
- const uint64_t stop(0x2FFFFFFFF);
- const uint64_t resume(0x1FFFFFFFF);
- args.setInt(QueuePolicy::flowStopCountKey, 30);
- args.setInt(QueuePolicy::flowResumeCountKey, 21);
- args.setUInt64(QueuePolicy::flowStopSizeKey, stop);
- args.setUInt64(QueuePolicy::flowResumeSizeKey, resume);
- args.setUInt64(QueuePolicy::maxSizeKey, stop + 1); // needed to pass stop < max validation
-
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args));
-
- BOOST_CHECK_EQUAL((uint32_t) 30, policy->getFlowStopCount());
- BOOST_CHECK_EQUAL((uint32_t) 21, policy->getFlowResumeCount());
- BOOST_CHECK_EQUAL(stop, policy->getFlowStopSize());
- BOOST_CHECK_EQUAL(resume, policy->getFlowResumeSize());
- BOOST_CHECK(!policy->isFlowControlActive());
- BOOST_CHECK(policy->monitorFlowControl());
-}
-
-
-QPID_AUTO_TEST_CASE(testFlowCombo)
-{
- FieldTable args;
- args.setInt(QueuePolicy::flowStopCountKey, 10);
- args.setInt(QueuePolicy::flowResumeCountKey, 5);
- args.setUInt64(QueuePolicy::flowStopSizeKey, 200);
- args.setUInt64(QueuePolicy::flowResumeSizeKey, 100);
-
- QueuedMessage msg_1 = createMessage(1);
- QueuedMessage msg_10 = createMessage(10);
- QueuedMessage msg_50 = createMessage(50);
- QueuedMessage msg_100 = createMessage(100);
-
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args));
- BOOST_CHECK(!policy->isFlowControlActive()); // count:0 size:0
-
- // verify flow control comes ON when only count passes its stop point.
-
- for (size_t i = 0; i < 10; i++) {
- policy->tryEnqueue(msg_10.payload);
- BOOST_CHECK(!policy->isFlowControlActive());
- }
- // count:10 size:100
-
- policy->tryEnqueue(msg_1.payload); // count:11 size: 101 ->ON
- BOOST_CHECK(policy->isFlowControlActive());
-
- for (size_t i = 0; i < 6; i++) {
- policy->dequeued(msg_10);
- BOOST_CHECK(policy->isFlowControlActive());
- }
- // count:5 size: 41
-
- policy->dequeued(msg_1); // count: 4 size: 40 ->OFF
- BOOST_CHECK(!policy->isFlowControlActive());
-
- for (size_t i = 0; i < 4; i++) {
- policy->dequeued(msg_10);
- BOOST_CHECK(!policy->isFlowControlActive());
- }
- // count:0 size:0
-
- // verify flow control comes ON when only size passes its stop point.
-
- policy->tryEnqueue(msg_100.payload); // count:1 size: 100
- BOOST_CHECK(!policy->isFlowControlActive());
-
- policy->tryEnqueue(msg_50.payload); // count:2 size: 150
- BOOST_CHECK(!policy->isFlowControlActive());
-
- policy->tryEnqueue(msg_50.payload); // count:3 size: 200
- BOOST_CHECK(!policy->isFlowControlActive());
-
- policy->tryEnqueue(msg_1.payload); // count:4 size: 201 ->ON
- BOOST_CHECK(policy->isFlowControlActive());
-
- policy->dequeued(msg_100); // count:3 size:101
- BOOST_CHECK(policy->isFlowControlActive());
-
- policy->dequeued(msg_1); // count:2 size:100
- BOOST_CHECK(policy->isFlowControlActive());
-
- policy->dequeued(msg_50); // count:1 size:50 ->OFF
- BOOST_CHECK(!policy->isFlowControlActive());
-
- // verify flow control remains ON until both thresholds drop below their
- // resume point.
-
- for (size_t i = 0; i < 8; i++) {
- policy->tryEnqueue(msg_10.payload);
- BOOST_CHECK(!policy->isFlowControlActive());
- }
- // count:9 size:130
-
- policy->tryEnqueue(msg_10.payload); // count:10 size: 140
- BOOST_CHECK(!policy->isFlowControlActive());
-
- policy->tryEnqueue(msg_1.payload); // count:11 size: 141 ->ON
- BOOST_CHECK(policy->isFlowControlActive());
-
- policy->tryEnqueue(msg_100.payload); // count:12 size: 241 (both thresholds crossed)
- BOOST_CHECK(policy->isFlowControlActive());
-
- // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
-
- policy->dequeued(msg_50); // count:11 size:191
- BOOST_CHECK(policy->isFlowControlActive());
-
- for (size_t i = 0; i < 9; i++) {
- policy->dequeued(msg_10);
- BOOST_CHECK(policy->isFlowControlActive());
- }
- // count:2 size:101
- policy->dequeued(msg_1); // count:1 size:100
- BOOST_CHECK(policy->isFlowControlActive()); // still active due to size
-
- policy->dequeued(msg_100); // count:0 size:0 ->OFF
- BOOST_CHECK(!policy->isFlowControlActive());
-}
-
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests