diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-24 21:20:35 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-24 21:20:35 +0000 |
commit | 80321993fe0b59dbe33a29a81a6e40d3dc943544 (patch) | |
tree | 155f24d3119de983492fc99fd76d3e1c2594f86a | |
parent | e65c4741c7e80a3e2b0cf3f9ccff080312191785 (diff) | |
download | qpid-python-80321993fe0b59dbe33a29a81a6e40d3dc943544.tar.gz |
QPID-2935: provide producer flow control api to QueuePolicy
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1062986 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 180 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.h | 48 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueuePolicyTest.cpp | 209 |
3 files changed, 384 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index f311ea8321..0cc0c514a7 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -29,33 +29,114 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) { - QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize); +namespace { + /** ensure that the configured flow control stop and resume values are + * valid with respect to the maximum queue capacity, and each other + */ + template <typename T> + void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) + { + if (resume > stop) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type + << "=" << resume + << " must be less than qpid.flow_stop_" << type + << "=" << stop)); + } + if (resume == 0) resume = stop; + if (max != 0 && (max < stop)) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type + << "=" << stop + << " must be less than qpid.max_" << type + << "=" << max)); + } + } + + /** extract a capacity value as passing in an argument map + */ + uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue) + { + FieldTable::ValuePtr v = settings.get(key); + + int64_t result = 0; + + if (!v) return defaultValue; + if (v->getType() == 0x23) { + QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); + } else if (v->getType() == 0x33) { + QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); + } else if (v->convertsTo<int64_t>()) { + result = v->get<int64_t>(); + QPID_LOG(debug, "Got integer value for " << key << ": " << result); + if (result >= 0) return result; + } else if (v->convertsTo<string>()) { + string s(v->get<string>()); + QPID_LOG(debug, "Got string value for " << key << ": " << s); + std::istringstream convert(s); + if (convert >> result && result >= 0) return result; + } + + QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); + return defaultValue; + } + + +} + +QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type, + uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) + : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), + flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), + flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), + flowStopped(false), name(_name) +{ + validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", name ); + validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", name ); + QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize + << "; flowStopCount=" << flowStopCount << "; flowResumeCount=" << flowResumeCount + << "; flowStopSize=" << flowStopSize << "; flowResumeSize=" << flowResumeSize ); } void QueuePolicy::enqueued(uint64_t _size) { - if (maxCount) ++count; - if (maxSize) size += _size; + if (maxCount || flowStopCount) { + ++count; + if (flowStopCount && !flowStopped && count > flowStopCount) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); + } + } + + if (maxSize || flowStopSize) { + size += _size; + if (flowStopSize && !flowStopped && size > flowStopSize) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + } + } } void QueuePolicy::dequeued(uint64_t _size) { - if (maxCount) { + if (maxCount || flowStopCount) { if (count > 0) { --count; } else { throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); } } - if (maxSize) { + if (maxSize || flowStopSize) { if (_size > size) { throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); } else { size -= _size; } } + if (flowStopped && + (flowResumeSize == 0 || size < flowResumeSize) && + (flowResumeCount == 0 || count < flowResumeCount)) { + flowStopped = false; + QPID_LOG(info, "Queue \"" << name << "\": has drained below the flow control resume level. Producer flow control deactivated." ); + } } bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) @@ -116,32 +197,6 @@ void QueuePolicy::update(FieldTable& settings) settings.setString(typeKey, type); } -uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue) -{ - FieldTable::ValuePtr v = settings.get(key); - - int32_t result = 0; - - if (!v) return defaultValue; - if (v->getType() == 0x23) { - QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); - } else if (v->getType() == 0x33) { - QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); - } else if (v->convertsTo<int>()) { - result = v->get<int>(); - QPID_LOG(debug, "Got integer value for " << key << ": " << result); - if (result >= 0) return result; - } else if (v->convertsTo<string>()) { - string s(v->get<string>()); - QPID_LOG(debug, "Got string value for " << key << ": " << s); - std::istringstream convert(s); - if (convert >> result && result >= 0) return result; - } - - QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); - return defaultValue; -} - std::string QueuePolicy::getType(const FieldTable& settings) { FieldTable::ValuePtr v = settings.get(typeKey); @@ -169,6 +224,10 @@ void QueuePolicy::encode(Buffer& buffer) const buffer.putLongLong(maxSize); buffer.putLong(count); buffer.putLongLong(size); + buffer.putLong(flowStopCount); + buffer.putLong(flowResumeCount); + buffer.putLongLong(flowStopSize); + buffer.putLongLong(flowResumeSize); } void QueuePolicy::decode ( Buffer& buffer ) @@ -177,6 +236,10 @@ void QueuePolicy::decode ( Buffer& buffer ) maxSize = buffer.getLongLong(); count = buffer.getLong(); size = buffer.getLongLong(); + flowStopCount = buffer.getLong(); + flowResumeCount = buffer.getLong(); + flowStopSize = buffer.getLongLong(); + flowResumeSize = buffer.getLongLong(); } @@ -184,7 +247,11 @@ uint32_t QueuePolicy::encodedSize() const { return sizeof(uint32_t) + // maxCount sizeof(uint64_t) + // maxSize sizeof(uint32_t) + // count - sizeof(uint64_t); // size + sizeof(uint64_t) + // size + sizeof(uint32_t) + // flowStopCount + sizeof(uint32_t) + // flowResumecount + sizeof(uint64_t) + // flowStopSize + sizeof(uint64_t); // flowResumeSize } @@ -192,14 +259,21 @@ uint32_t QueuePolicy::encodedSize() const { const std::string QueuePolicy::maxCountKey("qpid.max_count"); const std::string QueuePolicy::maxSizeKey("qpid.max_size"); const std::string QueuePolicy::typeKey("qpid.policy_type"); +const std::string QueuePolicy::flowStopCountKey("qpid.flow_stop_count"); +const std::string QueuePolicy::flowResumeCountKey("qpid.flow_resume_count"); +const std::string QueuePolicy::flowStopSizeKey("qpid.flow_stop_size"); +const std::string QueuePolicy::flowResumeSizeKey("qpid.flow_resume_size"); const std::string QueuePolicy::REJECT("reject"); const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk"); const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : - QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {} +FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, + uint32_t _flowStopCount, uint32_t _flowResumeCount, + uint64_t _flowStopSize, uint64_t _flowResumeSize) + : QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK, + _flowStopCount, _flowResumeCount, _flowStopSize, _flowResumeSize) {} bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { @@ -208,8 +282,11 @@ bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) } RingQueuePolicy::RingQueuePolicy(const std::string& _name, - uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} + uint32_t _maxCount, uint64_t _maxSize, const std::string& _type, + uint32_t _flowStopCount, uint32_t _flowResumeCount, + uint64_t _flowStopSize, uint64_t _flowResumeSize) + : QueuePolicy(_name, _maxCount, _maxSize, _type, _flowStopCount, _flowResumeCount, + _flowStopSize, _flowResumeSize), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) { @@ -317,23 +394,34 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::F std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) { uint32_t maxCount = getCapacity(settings, maxCountKey, 0); - uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize); - if (maxCount || maxSize) { - return createQueuePolicy(name, maxCount, maxSize, getType(settings)); + uint64_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize); + uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); + uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); + uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); + uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); + + if (maxCount || maxSize || flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) { + return createQueuePolicy(name, maxCount, maxSize, getType(settings), + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } else { return std::auto_ptr<QueuePolicy>(); } } std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, - uint32_t maxCount, uint64_t maxSize, const std::string& type) + uint32_t maxCount, uint64_t maxSize, const std::string& type, + uint32_t flowStopCount, uint32_t flowResumeCount, + uint64_t flowStopSize, uint64_t flowResumeSize) { if (type == RING || type == RING_STRICT) { - return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type, + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } else if (type == FLOW_TO_DISK) { - return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize)); + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize, + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } else { - return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type, + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } } @@ -349,6 +437,8 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; else out << "count: unlimited"; out << "; type=" << p.type; + if (p.flowStopCount) out << "; flowStopCount=" << p.flowStopCount << ", flowResumeCount=" << p.flowResumeCount; + if (p.flowStopSize) out << "; flowStopSize=" << p.flowStopSize << ", flowResumeSize=" << p.flowResumeSize; return out; } diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index 3cdd63784d..d7a71bc3ba 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -43,8 +43,20 @@ class QueuePolicy uint32_t count; uint64_t size; bool policyExceeded; - - static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue); + + /** + * Producer flow control: when level is > flowStop*, flow control is ON. + * then level is < flowResume*, flow control is OFF. If == 0, flow control + * is not used. If both byte and msg count thresholds are set, then + * passing _either_ level may turn flow control ON, but _both_ must be + * below level before flow control will be turned OFF. + */ + uint32_t flowStopCount; + uint32_t flowResumeCount; + uint64_t flowStopSize; + uint64_t flowResumeSize; + bool flowStopped; // true = producers held in flow control + protected: uint64_t getCurrentQueueSize() const { return size; } @@ -54,10 +66,16 @@ class QueuePolicy static QPID_BROKER_EXTERN const std::string maxCountKey; static QPID_BROKER_EXTERN const std::string maxSizeKey; static QPID_BROKER_EXTERN const std::string typeKey; + static QPID_BROKER_EXTERN const std::string flowStopCountKey; + static QPID_BROKER_EXTERN const std::string flowResumeCountKey; + static QPID_BROKER_EXTERN const std::string flowStopSizeKey; + static QPID_BROKER_EXTERN const std::string flowResumeSizeKey; + + // Policy types: static QPID_BROKER_EXTERN const std::string REJECT; static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK; static QPID_BROKER_EXTERN const std::string RING; - static QPID_BROKER_EXTERN const std::string RING_STRICT; + static QPID_BROKER_EXTERN const std::string RING_STRICT; virtual ~QueuePolicy() {} QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg); @@ -68,14 +86,22 @@ class QueuePolicy virtual bool isEnqueued(const QueuedMessage&); QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings); uint32_t getMaxCount() const { return maxCount; } - uint64_t getMaxSize() const { return maxSize; } + uint64_t getMaxSize() const { return maxSize; } + uint32_t getFlowStopCount() const { return flowStopCount; } + uint32_t getFlowResumeCount() const { return flowResumeCount; } + uint64_t getFlowStopSize() const { return flowStopSize; } + uint64_t getFlowResumeSize() const { return flowResumeSize; } + bool isFlowControlActive() const { return flowStopped; } + bool monitorFlowControl() const { return flowStopCount || flowStopSize; } void encode(framing::Buffer& buffer) const; void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; virtual void getPendingDequeues(Messages& result); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); - static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); static std::string getType(const qpid::framing::FieldTable& settings); @@ -85,7 +111,9 @@ class QueuePolicy protected: const std::string name; - QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); virtual bool checkLimit(boost::intrusive_ptr<Message> msg); void enqueued(uint64_t size); @@ -96,14 +124,18 @@ class QueuePolicy class FlowToDiskPolicy : public QueuePolicy { public: - FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize); + FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); bool checkLimit(boost::intrusive_ptr<Message> msg); }; class RingQueuePolicy : public QueuePolicy { public: - RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING, + uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, + uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); bool isEnqueued(const QueuedMessage&); diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index 90af9c7dd9..641dfd2421 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -53,6 +53,8 @@ QPID_AUTO_TEST_CASE(testCount) BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize()); BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount()); + BOOST_CHECK(!policy->monitorFlowControl()); + QueuedMessage msg = createMessage(10); for (size_t i = 0; i < 5; i++) { policy->tryEnqueue(msg.payload); @@ -396,6 +398,213 @@ QPID_AUTO_TEST_CASE(testCapacityConversion) } catch (const ResourceLimitExceededException&) {} } + +QPID_AUTO_TEST_CASE(testFlowCount) +{ + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT, + 7, // flowStop + 5)); // flowResume + BOOST_CHECK_EQUAL((uint32_t) 7, policy->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 5, policy->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeSize()); + BOOST_CHECK(!policy->isFlowControlActive()); + BOOST_CHECK(policy->monitorFlowControl()); + + QueuedMessage msg = createMessage(10); + for (size_t i = 0; i < 6; i++) { + policy->tryEnqueue(msg.payload); + BOOST_CHECK(!policy->isFlowControlActive()); + } + BOOST_CHECK(!policy->isFlowControlActive()); // 6 on queue + policy->tryEnqueue(msg.payload); + BOOST_CHECK(!policy->isFlowControlActive()); // 7 on queue + + policy->tryEnqueue(msg.payload); + BOOST_CHECK(policy->isFlowControlActive()); // 8 on queue, ON + policy->tryEnqueue(msg.payload); + BOOST_CHECK(policy->isFlowControlActive()); // 9 on queue + + policy->dequeued(msg); + BOOST_CHECK(policy->isFlowControlActive()); // 8 on queue + policy->dequeued(msg); + BOOST_CHECK(policy->isFlowControlActive()); // 7 on queue + policy->dequeued(msg); + BOOST_CHECK(policy->isFlowControlActive()); // 6 on queue + policy->dequeued(msg); + BOOST_CHECK(policy->isFlowControlActive()); // 5 on queue + + policy->dequeued(msg); + BOOST_CHECK(!policy->isFlowControlActive()); // 4 on queue, OFF +} + + +QPID_AUTO_TEST_CASE(testFlowSize) +{ + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT, + 0, 0, // flow-Count + 70, // flowStopSize + 50)); // flowResumeSize + BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint32_t) 70, policy->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 50, policy->getFlowResumeSize()); + BOOST_CHECK(!policy->isFlowControlActive()); + BOOST_CHECK(policy->monitorFlowControl()); + + QueuedMessage msg = createMessage(10); + for (size_t i = 0; i < 6; i++) { + policy->tryEnqueue(msg.payload); + BOOST_CHECK(!policy->isFlowControlActive()); + } + BOOST_CHECK(!policy->isFlowControlActive()); // 60 on queue + policy->tryEnqueue(msg.payload); + BOOST_CHECK(!policy->isFlowControlActive()); // 70 on queue + + QueuedMessage tinyMsg = createMessage(1); + policy->tryEnqueue(tinyMsg.payload); + BOOST_CHECK(policy->isFlowControlActive()); // 71 on queue, ON + policy->tryEnqueue(msg.payload); + BOOST_CHECK(policy->isFlowControlActive()); // 81 on queue + + policy->dequeued(msg); + BOOST_CHECK(policy->isFlowControlActive()); // 71 on queue + policy->dequeued(msg); + BOOST_CHECK(policy->isFlowControlActive()); // 61 on queue + policy->dequeued(msg); + BOOST_CHECK(policy->isFlowControlActive()); // 51 on queue + + policy->dequeued(tinyMsg); + BOOST_CHECK(policy->isFlowControlActive()); // 50 on queue + policy->dequeued(tinyMsg); + BOOST_CHECK(!policy->isFlowControlActive()); // 49 on queue, OFF + policy->dequeued(msg); + BOOST_CHECK(!policy->isFlowControlActive()); // 39 on queue +} + +QPID_AUTO_TEST_CASE(testFlowArgs) +{ + FieldTable args; + const uint64_t stop(0x2FFFFFFFF); + const uint64_t resume(0x1FFFFFFFF); + args.setInt(QueuePolicy::flowStopCountKey, 30); + args.setInt(QueuePolicy::flowResumeCountKey, 21); + args.setUInt64(QueuePolicy::flowStopSizeKey, stop); + args.setUInt64(QueuePolicy::flowResumeSizeKey, resume); + args.setUInt64(QueuePolicy::maxSizeKey, stop + 1); // needed to pass stop < max validation + + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args)); + + BOOST_CHECK_EQUAL((uint32_t) 30, policy->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 21, policy->getFlowResumeCount()); + BOOST_CHECK_EQUAL(stop, policy->getFlowStopSize()); + BOOST_CHECK_EQUAL(resume, policy->getFlowResumeSize()); + BOOST_CHECK(!policy->isFlowControlActive()); + BOOST_CHECK(policy->monitorFlowControl()); +} + + +QPID_AUTO_TEST_CASE(testFlowCombo) +{ + FieldTable args; + args.setInt(QueuePolicy::flowStopCountKey, 10); + args.setInt(QueuePolicy::flowResumeCountKey, 5); + args.setUInt64(QueuePolicy::flowStopSizeKey, 200); + args.setUInt64(QueuePolicy::flowResumeSizeKey, 100); + + QueuedMessage msg_1 = createMessage(1); + QueuedMessage msg_10 = createMessage(10); + QueuedMessage msg_50 = createMessage(50); + QueuedMessage msg_100 = createMessage(100); + + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args)); + BOOST_CHECK(!policy->isFlowControlActive()); // count:0 size:0 + + // verify flow control comes ON when only count passes its stop point. + + for (size_t i = 0; i < 10; i++) { + policy->tryEnqueue(msg_10.payload); + BOOST_CHECK(!policy->isFlowControlActive()); + } + // count:10 size:100 + + policy->tryEnqueue(msg_1.payload); // count:11 size: 101 ->ON + BOOST_CHECK(policy->isFlowControlActive()); + + for (size_t i = 0; i < 6; i++) { + policy->dequeued(msg_10); + BOOST_CHECK(policy->isFlowControlActive()); + } + // count:5 size: 41 + + policy->dequeued(msg_1); // count: 4 size: 40 ->OFF + BOOST_CHECK(!policy->isFlowControlActive()); + + for (size_t i = 0; i < 4; i++) { + policy->dequeued(msg_10); + BOOST_CHECK(!policy->isFlowControlActive()); + } + // count:0 size:0 + + // verify flow control comes ON when only size passes its stop point. + + policy->tryEnqueue(msg_100.payload); // count:1 size: 100 + BOOST_CHECK(!policy->isFlowControlActive()); + + policy->tryEnqueue(msg_50.payload); // count:2 size: 150 + BOOST_CHECK(!policy->isFlowControlActive()); + + policy->tryEnqueue(msg_50.payload); // count:3 size: 200 + BOOST_CHECK(!policy->isFlowControlActive()); + + policy->tryEnqueue(msg_1.payload); // count:4 size: 201 ->ON + BOOST_CHECK(policy->isFlowControlActive()); + + policy->dequeued(msg_100); // count:3 size:101 + BOOST_CHECK(policy->isFlowControlActive()); + + policy->dequeued(msg_1); // count:2 size:100 + BOOST_CHECK(policy->isFlowControlActive()); + + policy->dequeued(msg_50); // count:1 size:50 ->OFF + BOOST_CHECK(!policy->isFlowControlActive()); + + // verify flow control remains ON until both thresholds drop below their + // resume point. + + for (size_t i = 0; i < 8; i++) { + policy->tryEnqueue(msg_10.payload); + BOOST_CHECK(!policy->isFlowControlActive()); + } + // count:9 size:130 + + policy->tryEnqueue(msg_10.payload); // count:10 size: 140 + BOOST_CHECK(!policy->isFlowControlActive()); + + policy->tryEnqueue(msg_1.payload); // count:11 size: 141 ->ON + BOOST_CHECK(policy->isFlowControlActive()); + + policy->tryEnqueue(msg_100.payload); // count:12 size: 241 (both thresholds crossed) + BOOST_CHECK(policy->isFlowControlActive()); + + // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241 + + policy->dequeued(msg_50); // count:11 size:191 + BOOST_CHECK(policy->isFlowControlActive()); + + for (size_t i = 0; i < 9; i++) { + policy->dequeued(msg_10); + BOOST_CHECK(policy->isFlowControlActive()); + } + // count:2 size:101 + policy->dequeued(msg_1); // count:1 size:100 + BOOST_CHECK(policy->isFlowControlActive()); // still active due to size + + policy->dequeued(msg_100); // count:0 size:0 ->OFF + BOOST_CHECK(!policy->isFlowControlActive()); +} + + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |