diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-31 19:35:54 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-31 19:35:54 +0000 |
commit | 14bb9643b6e62b80452c15802bc28687e717d3e0 (patch) | |
tree | 1b508a5fae62fd1bdb67fcd021239490134109e1 | |
parent | ac2a5631cf47adb96953b3c171ed7f180534c4d2 (diff) | |
download | qpid-python-14bb9643b6e62b80452c15802bc28687e717d3e0.tar.gz |
remove changes to QueuePolicy in lieu of QueueFlowLimits
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1065724 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 180 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.h | 48 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueuePolicyTest.cpp | 209 |
3 files changed, 53 insertions, 384 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 0cc0c514a7..f311ea8321 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -29,114 +29,33 @@ using namespace qpid::broker; using namespace qpid::framing; -namespace { - /** ensure that the configured flow control stop and resume values are - * valid with respect to the maximum queue capacity, and each other - */ - template <typename T> - void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) - { - if (resume > stop) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type - << "=" << resume - << " must be less than qpid.flow_stop_" << type - << "=" << stop)); - } - if (resume == 0) resume = stop; - if (max != 0 && (max < stop)) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type - << "=" << stop - << " must be less than qpid.max_" << type - << "=" << max)); - } - } - - /** extract a capacity value as passing in an argument map - */ - uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue) - { - FieldTable::ValuePtr v = settings.get(key); - - int64_t result = 0; - - if (!v) return defaultValue; - if (v->getType() == 0x23) { - QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); - } else if (v->getType() == 0x33) { - QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); - } else if (v->convertsTo<int64_t>()) { - result = v->get<int64_t>(); - QPID_LOG(debug, "Got integer value for " << key << ": " << result); - if (result >= 0) return result; - } else if (v->convertsTo<string>()) { - string s(v->get<string>()); - QPID_LOG(debug, "Got string value for " << key << ": " << s); - std::istringstream convert(s); - if (convert >> result && result >= 0) return result; - } - - QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); - return defaultValue; - } - - -} - -QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type, - uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), - flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), - flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), - flowStopped(false), name(_name) -{ - validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", name ); - validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", name ); - QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize - << "; flowStopCount=" << flowStopCount << "; flowResumeCount=" << flowResumeCount - << "; flowStopSize=" << flowStopSize << "; flowResumeSize=" << flowResumeSize ); +QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) { + QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize); } void QueuePolicy::enqueued(uint64_t _size) { - if (maxCount || flowStopCount) { - ++count; - if (flowStopCount && !flowStopped && count > flowStopCount) { - flowStopped = true; - QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); - } - } - - if (maxSize || flowStopSize) { - size += _size; - if (flowStopSize && !flowStopped && size > flowStopSize) { - flowStopped = true; - QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); - } - } + if (maxCount) ++count; + if (maxSize) size += _size; } void QueuePolicy::dequeued(uint64_t _size) { - if (maxCount || flowStopCount) { + if (maxCount) { if (count > 0) { --count; } else { throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); } } - if (maxSize || flowStopSize) { + if (maxSize) { if (_size > size) { throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); } else { size -= _size; } } - if (flowStopped && - (flowResumeSize == 0 || size < flowResumeSize) && - (flowResumeCount == 0 || count < flowResumeCount)) { - flowStopped = false; - QPID_LOG(info, "Queue \"" << name << "\": has drained below the flow control resume level. Producer flow control deactivated." ); - } } bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) @@ -197,6 +116,32 @@ void QueuePolicy::update(FieldTable& settings) settings.setString(typeKey, type); } +uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue) +{ + FieldTable::ValuePtr v = settings.get(key); + + int32_t result = 0; + + if (!v) return defaultValue; + if (v->getType() == 0x23) { + QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); + } else if (v->getType() == 0x33) { + QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); + } else if (v->convertsTo<int>()) { + result = v->get<int>(); + QPID_LOG(debug, "Got integer value for " << key << ": " << result); + if (result >= 0) return result; + } else if (v->convertsTo<string>()) { + string s(v->get<string>()); + QPID_LOG(debug, "Got string value for " << key << ": " << s); + std::istringstream convert(s); + if (convert >> result && result >= 0) return result; + } + + QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); + return defaultValue; +} + std::string QueuePolicy::getType(const FieldTable& settings) { FieldTable::ValuePtr v = settings.get(typeKey); @@ -224,10 +169,6 @@ void QueuePolicy::encode(Buffer& buffer) const buffer.putLongLong(maxSize); buffer.putLong(count); buffer.putLongLong(size); - buffer.putLong(flowStopCount); - buffer.putLong(flowResumeCount); - buffer.putLongLong(flowStopSize); - buffer.putLongLong(flowResumeSize); } void QueuePolicy::decode ( Buffer& buffer ) @@ -236,10 +177,6 @@ void QueuePolicy::decode ( Buffer& buffer ) maxSize = buffer.getLongLong(); count = buffer.getLong(); size = buffer.getLongLong(); - flowStopCount = buffer.getLong(); - flowResumeCount = buffer.getLong(); - flowStopSize = buffer.getLongLong(); - flowResumeSize = buffer.getLongLong(); } @@ -247,11 +184,7 @@ uint32_t QueuePolicy::encodedSize() const { return sizeof(uint32_t) + // maxCount sizeof(uint64_t) + // maxSize sizeof(uint32_t) + // count - sizeof(uint64_t) + // size - sizeof(uint32_t) + // flowStopCount - sizeof(uint32_t) + // flowResumecount - sizeof(uint64_t) + // flowStopSize - sizeof(uint64_t); // flowResumeSize + sizeof(uint64_t); // size } @@ -259,21 +192,14 @@ uint32_t QueuePolicy::encodedSize() const { const std::string QueuePolicy::maxCountKey("qpid.max_count"); const std::string QueuePolicy::maxSizeKey("qpid.max_size"); const std::string QueuePolicy::typeKey("qpid.policy_type"); -const std::string QueuePolicy::flowStopCountKey("qpid.flow_stop_count"); -const std::string QueuePolicy::flowResumeCountKey("qpid.flow_resume_count"); -const std::string QueuePolicy::flowStopSizeKey("qpid.flow_stop_size"); -const std::string QueuePolicy::flowResumeSizeKey("qpid.flow_resume_size"); const std::string QueuePolicy::REJECT("reject"); const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk"); const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, - uint32_t _flowStopCount, uint32_t _flowResumeCount, - uint64_t _flowStopSize, uint64_t _flowResumeSize) - : QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK, - _flowStopCount, _flowResumeCount, _flowStopSize, _flowResumeSize) {} +FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : + QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {} bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { @@ -282,11 +208,8 @@ bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) } RingQueuePolicy::RingQueuePolicy(const std::string& _name, - uint32_t _maxCount, uint64_t _maxSize, const std::string& _type, - uint32_t _flowStopCount, uint32_t _flowResumeCount, - uint64_t _flowStopSize, uint64_t _flowResumeSize) - : QueuePolicy(_name, _maxCount, _maxSize, _type, _flowStopCount, _flowResumeCount, - _flowStopSize, _flowResumeSize), strict(_type == RING_STRICT) {} + uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) { @@ -394,34 +317,23 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::F std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) { uint32_t maxCount = getCapacity(settings, maxCountKey, 0); - uint64_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize); - uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); - uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); - uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); - uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); - - if (maxCount || maxSize || flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) { - return createQueuePolicy(name, maxCount, maxSize, getType(settings), - flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); + uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize); + if (maxCount || maxSize) { + return createQueuePolicy(name, maxCount, maxSize, getType(settings)); } else { return std::auto_ptr<QueuePolicy>(); } } std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, - uint32_t maxCount, uint64_t maxSize, const std::string& type, - uint32_t flowStopCount, uint32_t flowResumeCount, - uint64_t flowStopSize, uint64_t flowResumeSize) + uint32_t maxCount, uint64_t maxSize, const std::string& type) { if (type == RING || type == RING_STRICT) { - return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type, - flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type)); } else if (type == FLOW_TO_DISK) { - return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize, - flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize)); } else { - return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type, - flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); + return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type)); } } @@ -437,8 +349,6 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; else out << "count: unlimited"; out << "; type=" << p.type; - if (p.flowStopCount) out << "; flowStopCount=" << p.flowStopCount << ", flowResumeCount=" << p.flowResumeCount; - if (p.flowStopSize) out << "; flowStopSize=" << p.flowStopSize << ", flowResumeSize=" << p.flowResumeSize; return out; } diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index d7a71bc3ba..3cdd63784d 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -43,20 +43,8 @@ class QueuePolicy uint32_t count; uint64_t size; bool policyExceeded; - - /** - * Producer flow control: when level is > flowStop*, flow control is ON. - * then level is < flowResume*, flow control is OFF. If == 0, flow control - * is not used. If both byte and msg count thresholds are set, then - * passing _either_ level may turn flow control ON, but _both_ must be - * below level before flow control will be turned OFF. - */ - uint32_t flowStopCount; - uint32_t flowResumeCount; - uint64_t flowStopSize; - uint64_t flowResumeSize; - bool flowStopped; // true = producers held in flow control - + + static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue); protected: uint64_t getCurrentQueueSize() const { return size; } @@ -66,16 +54,10 @@ class QueuePolicy static QPID_BROKER_EXTERN const std::string maxCountKey; static QPID_BROKER_EXTERN const std::string maxSizeKey; static QPID_BROKER_EXTERN const std::string typeKey; - static QPID_BROKER_EXTERN const std::string flowStopCountKey; - static QPID_BROKER_EXTERN const std::string flowResumeCountKey; - static QPID_BROKER_EXTERN const std::string flowStopSizeKey; - static QPID_BROKER_EXTERN const std::string flowResumeSizeKey; - - // Policy types: static QPID_BROKER_EXTERN const std::string REJECT; static QPID_BROKER_EXTERN const std::string FLOW_TO_DISK; static QPID_BROKER_EXTERN const std::string RING; - static QPID_BROKER_EXTERN const std::string RING_STRICT; + static QPID_BROKER_EXTERN const std::string RING_STRICT; virtual ~QueuePolicy() {} QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg); @@ -86,22 +68,14 @@ class QueuePolicy virtual bool isEnqueued(const QueuedMessage&); QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings); uint32_t getMaxCount() const { return maxCount; } - uint64_t getMaxSize() const { return maxSize; } - uint32_t getFlowStopCount() const { return flowStopCount; } - uint32_t getFlowResumeCount() const { return flowResumeCount; } - uint64_t getFlowStopSize() const { return flowStopSize; } - uint64_t getFlowResumeSize() const { return flowResumeSize; } - bool isFlowControlActive() const { return flowStopped; } - bool monitorFlowControl() const { return flowStopCount || flowStopSize; } + uint64_t getMaxSize() const { return maxSize; } void encode(framing::Buffer& buffer) const; void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; virtual void getPendingDequeues(Messages& result); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); - static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT, - uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, - uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); static std::string getType(const qpid::framing::FieldTable& settings); @@ -111,9 +85,7 @@ class QueuePolicy protected: const std::string name; - QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT, - uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, - uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); + QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); virtual bool checkLimit(boost::intrusive_ptr<Message> msg); void enqueued(uint64_t size); @@ -124,18 +96,14 @@ class QueuePolicy class FlowToDiskPolicy : public QueuePolicy { public: - FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, - uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, - uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); + FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize); bool checkLimit(boost::intrusive_ptr<Message> msg); }; class RingQueuePolicy : public QueuePolicy { public: - RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING, - uint32_t flowStopCount = 0, uint32_t flowResumeCount = 0, - uint64_t flowStopSize = 0, uint64_t flowResumeSize = 0); + RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); bool isEnqueued(const QueuedMessage&); diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index 641dfd2421..90af9c7dd9 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -53,8 +53,6 @@ QPID_AUTO_TEST_CASE(testCount) BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize()); BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount()); - BOOST_CHECK(!policy->monitorFlowControl()); - QueuedMessage msg = createMessage(10); for (size_t i = 0; i < 5; i++) { policy->tryEnqueue(msg.payload); @@ -398,213 +396,6 @@ QPID_AUTO_TEST_CASE(testCapacityConversion) } catch (const ResourceLimitExceededException&) {} } - -QPID_AUTO_TEST_CASE(testFlowCount) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT, - 7, // flowStop - 5)); // flowResume - BOOST_CHECK_EQUAL((uint32_t) 7, policy->getFlowStopCount()); - BOOST_CHECK_EQUAL((uint32_t) 5, policy->getFlowResumeCount()); - BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopSize()); - BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeSize()); - BOOST_CHECK(!policy->isFlowControlActive()); - BOOST_CHECK(policy->monitorFlowControl()); - - QueuedMessage msg = createMessage(10); - for (size_t i = 0; i < 6; i++) { - policy->tryEnqueue(msg.payload); - BOOST_CHECK(!policy->isFlowControlActive()); - } - BOOST_CHECK(!policy->isFlowControlActive()); // 6 on queue - policy->tryEnqueue(msg.payload); - BOOST_CHECK(!policy->isFlowControlActive()); // 7 on queue - - policy->tryEnqueue(msg.payload); - BOOST_CHECK(policy->isFlowControlActive()); // 8 on queue, ON - policy->tryEnqueue(msg.payload); - BOOST_CHECK(policy->isFlowControlActive()); // 9 on queue - - policy->dequeued(msg); - BOOST_CHECK(policy->isFlowControlActive()); // 8 on queue - policy->dequeued(msg); - BOOST_CHECK(policy->isFlowControlActive()); // 7 on queue - policy->dequeued(msg); - BOOST_CHECK(policy->isFlowControlActive()); // 6 on queue - policy->dequeued(msg); - BOOST_CHECK(policy->isFlowControlActive()); // 5 on queue - - policy->dequeued(msg); - BOOST_CHECK(!policy->isFlowControlActive()); // 4 on queue, OFF -} - - -QPID_AUTO_TEST_CASE(testFlowSize) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 10, 0, QueuePolicy::REJECT, - 0, 0, // flow-Count - 70, // flowStopSize - 50)); // flowResumeSize - BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowStopCount()); - BOOST_CHECK_EQUAL((uint32_t) 0, policy->getFlowResumeCount()); - BOOST_CHECK_EQUAL((uint32_t) 70, policy->getFlowStopSize()); - BOOST_CHECK_EQUAL((uint32_t) 50, policy->getFlowResumeSize()); - BOOST_CHECK(!policy->isFlowControlActive()); - BOOST_CHECK(policy->monitorFlowControl()); - - QueuedMessage msg = createMessage(10); - for (size_t i = 0; i < 6; i++) { - policy->tryEnqueue(msg.payload); - BOOST_CHECK(!policy->isFlowControlActive()); - } - BOOST_CHECK(!policy->isFlowControlActive()); // 60 on queue - policy->tryEnqueue(msg.payload); - BOOST_CHECK(!policy->isFlowControlActive()); // 70 on queue - - QueuedMessage tinyMsg = createMessage(1); - policy->tryEnqueue(tinyMsg.payload); - BOOST_CHECK(policy->isFlowControlActive()); // 71 on queue, ON - policy->tryEnqueue(msg.payload); - BOOST_CHECK(policy->isFlowControlActive()); // 81 on queue - - policy->dequeued(msg); - BOOST_CHECK(policy->isFlowControlActive()); // 71 on queue - policy->dequeued(msg); - BOOST_CHECK(policy->isFlowControlActive()); // 61 on queue - policy->dequeued(msg); - BOOST_CHECK(policy->isFlowControlActive()); // 51 on queue - - policy->dequeued(tinyMsg); - BOOST_CHECK(policy->isFlowControlActive()); // 50 on queue - policy->dequeued(tinyMsg); - BOOST_CHECK(!policy->isFlowControlActive()); // 49 on queue, OFF - policy->dequeued(msg); - BOOST_CHECK(!policy->isFlowControlActive()); // 39 on queue -} - -QPID_AUTO_TEST_CASE(testFlowArgs) -{ - FieldTable args; - const uint64_t stop(0x2FFFFFFFF); - const uint64_t resume(0x1FFFFFFFF); - args.setInt(QueuePolicy::flowStopCountKey, 30); - args.setInt(QueuePolicy::flowResumeCountKey, 21); - args.setUInt64(QueuePolicy::flowStopSizeKey, stop); - args.setUInt64(QueuePolicy::flowResumeSizeKey, resume); - args.setUInt64(QueuePolicy::maxSizeKey, stop + 1); // needed to pass stop < max validation - - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args)); - - BOOST_CHECK_EQUAL((uint32_t) 30, policy->getFlowStopCount()); - BOOST_CHECK_EQUAL((uint32_t) 21, policy->getFlowResumeCount()); - BOOST_CHECK_EQUAL(stop, policy->getFlowStopSize()); - BOOST_CHECK_EQUAL(resume, policy->getFlowResumeSize()); - BOOST_CHECK(!policy->isFlowControlActive()); - BOOST_CHECK(policy->monitorFlowControl()); -} - - -QPID_AUTO_TEST_CASE(testFlowCombo) -{ - FieldTable args; - args.setInt(QueuePolicy::flowStopCountKey, 10); - args.setInt(QueuePolicy::flowResumeCountKey, 5); - args.setUInt64(QueuePolicy::flowStopSizeKey, 200); - args.setUInt64(QueuePolicy::flowResumeSizeKey, 100); - - QueuedMessage msg_1 = createMessage(1); - QueuedMessage msg_10 = createMessage(10); - QueuedMessage msg_50 = createMessage(50); - QueuedMessage msg_100 = createMessage(100); - - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", args)); - BOOST_CHECK(!policy->isFlowControlActive()); // count:0 size:0 - - // verify flow control comes ON when only count passes its stop point. - - for (size_t i = 0; i < 10; i++) { - policy->tryEnqueue(msg_10.payload); - BOOST_CHECK(!policy->isFlowControlActive()); - } - // count:10 size:100 - - policy->tryEnqueue(msg_1.payload); // count:11 size: 101 ->ON - BOOST_CHECK(policy->isFlowControlActive()); - - for (size_t i = 0; i < 6; i++) { - policy->dequeued(msg_10); - BOOST_CHECK(policy->isFlowControlActive()); - } - // count:5 size: 41 - - policy->dequeued(msg_1); // count: 4 size: 40 ->OFF - BOOST_CHECK(!policy->isFlowControlActive()); - - for (size_t i = 0; i < 4; i++) { - policy->dequeued(msg_10); - BOOST_CHECK(!policy->isFlowControlActive()); - } - // count:0 size:0 - - // verify flow control comes ON when only size passes its stop point. - - policy->tryEnqueue(msg_100.payload); // count:1 size: 100 - BOOST_CHECK(!policy->isFlowControlActive()); - - policy->tryEnqueue(msg_50.payload); // count:2 size: 150 - BOOST_CHECK(!policy->isFlowControlActive()); - - policy->tryEnqueue(msg_50.payload); // count:3 size: 200 - BOOST_CHECK(!policy->isFlowControlActive()); - - policy->tryEnqueue(msg_1.payload); // count:4 size: 201 ->ON - BOOST_CHECK(policy->isFlowControlActive()); - - policy->dequeued(msg_100); // count:3 size:101 - BOOST_CHECK(policy->isFlowControlActive()); - - policy->dequeued(msg_1); // count:2 size:100 - BOOST_CHECK(policy->isFlowControlActive()); - - policy->dequeued(msg_50); // count:1 size:50 ->OFF - BOOST_CHECK(!policy->isFlowControlActive()); - - // verify flow control remains ON until both thresholds drop below their - // resume point. - - for (size_t i = 0; i < 8; i++) { - policy->tryEnqueue(msg_10.payload); - BOOST_CHECK(!policy->isFlowControlActive()); - } - // count:9 size:130 - - policy->tryEnqueue(msg_10.payload); // count:10 size: 140 - BOOST_CHECK(!policy->isFlowControlActive()); - - policy->tryEnqueue(msg_1.payload); // count:11 size: 141 ->ON - BOOST_CHECK(policy->isFlowControlActive()); - - policy->tryEnqueue(msg_100.payload); // count:12 size: 241 (both thresholds crossed) - BOOST_CHECK(policy->isFlowControlActive()); - - // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241 - - policy->dequeued(msg_50); // count:11 size:191 - BOOST_CHECK(policy->isFlowControlActive()); - - for (size_t i = 0; i < 9; i++) { - policy->dequeued(msg_10); - BOOST_CHECK(policy->isFlowControlActive()); - } - // count:2 size:101 - policy->dequeued(msg_1); // count:1 size:100 - BOOST_CHECK(policy->isFlowControlActive()); // still active due to size - - policy->dequeued(msg_100); // count:0 size:0 ->OFF - BOOST_CHECK(!policy->isFlowControlActive()); -} - - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |