diff options
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 280 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 99 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueFlowLimitTest.cpp | 307 |
7 files changed, 706 insertions, 9 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 739424783a..b197017327 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -588,6 +588,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueRegistry.cpp \ qpid/broker/QueueRegistry.h \ qpid/broker/QueuedMessage.h \ + qpid/broker/QueueFlowLimit.h \ + qpid/broker/QueueFlowLimit.cpp \ qpid/broker/RateFlowcontrol.h \ qpid/broker/RateTracker.cpp \ qpid/broker/RateTracker.h \ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 24469ed0e4..830b4215c7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -166,11 +167,11 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg) { - if (policy.get()) policy->recoverEnqueued(msg); + if (policy.get()) policy->recoverEnqueued(msg); // KAG INC COUNTERS } void Queue::recover(boost::intrusive_ptr<Message>& msg){ - if (policy.get()) policy->recoverEnqueued(msg); + if (policy.get()) policy->recoverEnqueued(msg); // KAG INC COUNTERS push(msg, true); if (store){ @@ -351,7 +352,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) //consumer wants the message c->position = msg.position; m = msg; - if (!lastValueQueueNoBrowse) clearLVQIndex(msg); + if (!lastValueQueueNoBrowse) clearLVQIndex(msg); // prevent this msg from being replaced by LVQ if (lastValueQueue) { boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); if (replacement.get()) m.payload = replacement; @@ -642,8 +643,9 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } if (policy.get()) { - policy->enqueued(qm); + policy->enqueued(qm); // KAG STORE COPY } + if (flowLimit.get()) flowLimit->consume(qm); } copy.notify(); } @@ -746,7 +748,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg Messages dequeues; { Mutex::ScopedLock locker(messageLock); - policy->tryEnqueue(msg); + policy->tryEnqueue(msg); // KAG INC COUNTERS policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock @@ -784,7 +786,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->enqueueAborted(msg); + if (policy.get()) policy->enqueueAborted(msg); // KAG DEC COUNTERS } // return true if store exists, @@ -841,7 +843,8 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { - if (policy.get()) policy->dequeued(msg); + if (policy.get()) policy->dequeued(msg); // KAG REMOVE COPY, DEC COUNTERS + if (flowLimit.get()) flowLimit->replenish(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { eventMgr->dequeued(msg); @@ -903,6 +906,8 @@ void Queue::configure(const FieldTable& _settings, bool recovering) FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); + flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings); + if (mgmtObject != 0) mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); @@ -1176,9 +1181,10 @@ void Queue::enqueued(const QueuedMessage& m) { if (m.payload) { if (policy.get()) { - policy->recoverEnqueued(m.payload); - policy->enqueued(m); + policy->recoverEnqueued(m.payload); // KAG INC COUNTERS + policy->enqueued(m); // KAG STORE COPY } + if (flowLimit.get()) flowLimit->consume(m); mgntEnqStats(m.payload); boost::intrusive_ptr<Message> payload = m.payload; enqueue ( 0, payload, true ); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 96c79d1b92..57ef3dae6b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -56,6 +56,7 @@ class QueueEvents; class QueueRegistry; class TransactionContext; class Exchange; +class QueueFlowLimit; /** * The brokers representation of an amqp queue. Messages are @@ -112,6 +113,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, mutable uint64_t persistenceId; framing::FieldTable settings; std::auto_ptr<QueuePolicy> policy; + std::auto_ptr<QueueFlowLimit> flowLimit; bool policyExceeded; QueueBindings bindings; std::string alternateExchangeName; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp new file mode 100644 index 0000000000..f824b809d3 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -0,0 +1,280 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/Queue.h" +#include "qpid/Exception.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Mutex.h" +#include <sstream> + +using namespace qpid::broker; +using namespace qpid::framing; + +namespace { + /** ensure that the configured flow control stop and resume values are + * valid with respect to the maximum queue capacity, and each other + */ + template <typename T> + void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) + { + if (resume > stop) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type + << "=" << resume + << " must be less than qpid.flow_stop_" << type + << "=" << stop)); + } + if (resume == 0) resume = stop; + if (max != 0 && (max < stop)) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type + << "=" << stop + << " must be less than qpid.max_" << type + << "=" << max)); + } + } + + /** extract a capacity value as passed in an argument map + */ + uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue) + { + FieldTable::ValuePtr v = settings.get(key); + + int64_t result = 0; + + if (!v) return defaultValue; + if (v->getType() == 0x23) { + QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); + } else if (v->getType() == 0x33) { + QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); + } else if (v->convertsTo<int64_t>()) { + result = v->get<int64_t>(); + QPID_LOG(debug, "Got integer value for " << key << ": " << result); + if (result >= 0) return result; + } else if (v->convertsTo<string>()) { + string s(v->get<string>()); + QPID_LOG(debug, "Got string value for " << key << ": " << s); + std::istringstream convert(s); + if (convert >> result && result >= 0) return result; + } + + QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); + return defaultValue; + } +} + + + +QueueFlowLimit::QueueFlowLimit(Queue *_queue, + uint32_t _flowStopCount, uint32_t _flowResumeCount, + uint64_t _flowStopSize, uint64_t _flowResumeSize) + : queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), + flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), + flowStopped(false), count(0), size(0) +{ + uint32_t maxCount(0); + uint64_t maxSize(0); + + if (_queue) { + queueName = _queue->getName(); + if (_queue->getPolicy()) { + maxSize = _queue->getPolicy()->getMaxSize(); + maxCount = _queue->getPolicy()->getMaxCount(); + } + } + validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName ); + validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName ); + QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount + << ", flowResumeCount=" << flowResumeCount + << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize ); +} + + + +void QueueFlowLimit::consume(const QueuedMessage& msg) +{ + if (!msg.payload) return; + + sys::Mutex::ScopedLock l(pendingFlowLock); + + ++count; + size += msg.payload->contentSize(); + + if (flowStopCount && !flowStopped && count > flowStopCount) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); + } + + if (flowStopSize && !flowStopped && size > flowStopSize) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + } + + // KAG: test + if (index.find(msg.payload) != index.end()) { + QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position); + } + + if (flowStopped || !pendingFlow.empty()) { + msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes + pendingFlow.push_back(msg.payload); + index.insert(msg.payload); + } +} + + + +void QueueFlowLimit::replenish(const QueuedMessage& msg) +{ + if (!msg.payload) return; + + sys::Mutex::ScopedLock l(pendingFlowLock); + + if (count > 0) { + --count; + } else { + throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName)); + } + + uint64_t _size = msg.payload->contentSize(); + if (_size <= size) { + size -= _size; + } else { + throw Exception(QPID_MSG("Flow limit size underflow on dequeue. Queue=" << queueName)); + } + + if (flowStopped && + (flowResumeSize == 0 || size < flowResumeSize) && + (flowResumeCount == 0 || count < flowResumeCount)) { + flowStopped = false; + QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." ); + } + + if (!flowStopped && !pendingFlow.empty()) { + // if msg is flow controlled, release it. + std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); + if (itr != index.end()) { + (*itr)->getReceiveCompletion().finishCompleter(); + index.erase(itr); + // stupid: + std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(), + pendingFlow.end(), + msg.payload); + if (itr2 == pendingFlow.end()) { + QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position); + } else { + pendingFlow.erase(itr2); + } + } + + // for now, just release the oldest also + if (!pendingFlow.empty()) { + pendingFlow.front()->getReceiveCompletion().finishCompleter(); + itr = index.find(pendingFlow.front()); + if (itr == index.end()) { + QPID_LOG(error, "Queue \"" << queueName << "\": msg missing in index: " << pendingFlow.front()); + } else { + index.erase(itr); + } + pendingFlow.pop_front(); + } + } +} + + +void QueueFlowLimit::encode(Buffer& buffer) const +{ + buffer.putLong(flowStopCount); + buffer.putLong(flowResumeCount); + buffer.putLongLong(flowStopSize); + buffer.putLongLong(flowResumeSize); + buffer.putLong(count); + buffer.putLongLong(size); +} + + +void QueueFlowLimit::decode ( Buffer& buffer ) +{ + flowStopCount = buffer.getLong(); + flowResumeCount = buffer.getLong(); + flowStopSize = buffer.getLongLong(); + flowResumeSize = buffer.getLongLong(); + count = buffer.getLong(); + size = buffer.getLongLong(); +} + + +uint32_t QueueFlowLimit::encodedSize() const { + return sizeof(uint32_t) + // flowStopCount + sizeof(uint32_t) + // flowResumecount + sizeof(uint64_t) + // flowStopSize + sizeof(uint64_t) + // flowResumeSize + sizeof(uint32_t) + // count + sizeof(uint64_t); // size +} + + +const std::string QueueFlowLimit::flowStopCountKey("qpid.flow_stop_count"); +const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count"); +const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size"); +const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size"); + + +std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings) +{ + uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); + uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); + uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); + uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); + + if (flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) { + return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount, + flowStopSize, flowResumeSize)); + } else { + return std::auto_ptr<QueueFlowLimit>(); + } +} + + +namespace qpid { + namespace broker { + +std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) +{ + out << "; flowStopCount=" << f.flowStopCount << ", flowResumeCount=" << f.flowResumeCount; + out << "; flowStopSize=" << f.flowStopSize << ", flowResumeSize=" << f.flowResumeSize; + return out; +} + + } +} + +/** + * TBD: + * - Is there a direct way to determine if QM is on pendingFlow list? + * - Rate limit the granting of flow. + * - What about LVQ? A newer msg may replace the older one. + * - What about queueing during a recovery? + * - What about queue purge? + * - What about message move? + * - How do we treat orphaned messages? + * -- Xfer a message to an alternate exchange - do we ack? + */ diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h new file mode 100644 index 0000000000..bd54f18a2b --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QueueFlowLimit_ +#define _QueueFlowLimit_ + +#include <list> +#include <set> +#include <iostream> +#include <memory> +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace broker { + +/** + * Producer flow control: when level is > flowStop*, flow control is ON. + * then level is < flowResume*, flow control is OFF. If == 0, flow control + * is not used. If both byte and msg count thresholds are set, then + * passing _either_ level may turn flow control ON, but _both_ must be + * below level before flow control will be turned OFF. + */ +class QueueFlowLimit +{ + std::string queueName; + + uint32_t flowStopCount; + uint32_t flowResumeCount; + uint64_t flowStopSize; + uint64_t flowResumeSize; + bool flowStopped; // true = producers held in flow control + + // current queue utilization + uint32_t count; + uint64_t size; + + public: + static QPID_BROKER_EXTERN const std::string flowStopCountKey; + static QPID_BROKER_EXTERN const std::string flowResumeCountKey; + static QPID_BROKER_EXTERN const std::string flowStopSizeKey; + static QPID_BROKER_EXTERN const std::string flowResumeSizeKey; + + virtual ~QueueFlowLimit() {} + + /** the queue has added QueuedMessage */ + void consume(const QueuedMessage&); + /** the queue has removed QueuedMessage */ + void replenish(const QueuedMessage&); + + uint32_t getFlowStopCount() const { return flowStopCount; } + uint32_t getFlowResumeCount() const { return flowResumeCount; } + uint64_t getFlowStopSize() const { return flowStopSize; } + uint64_t getFlowResumeSize() const { return flowResumeSize; } + bool isFlowControlActive() const { return flowStopped; } + bool monitorFlowControl() const { return flowStopCount || flowStopSize; } + + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedSize() const; + + static QPID_BROKER_EXTERN std::auto_ptr<QueueFlowLimit> createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings); + friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&); + + protected: + // msgs waiting for flow to become available. + std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front + std::set< boost::intrusive_ptr<Message> > index; + qpid::sys::Mutex pendingFlowLock; + + QueueFlowLimit(Queue *queue, + uint32_t flowStopCount, uint32_t flowResumeCount, + uint64_t flowStopSize, uint64_t flowResumeSize); +}; + +}} + + +#endif diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index fc3f7c0854..330b87e277 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -98,6 +98,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ MessageTest.cpp \ QueueRegistryTest.cpp \ QueuePolicyTest.cpp \ + QueueFlowLimitTest.cpp \ FramingTest.cpp \ HeaderTest.cpp \ SequenceNumberTest.cpp \ diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp new file mode 100644 index 0000000000..5899986ee9 --- /dev/null +++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp @@ -0,0 +1,307 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <sstream> +#include <deque> +#include "unit_test.h" +#include "test_tools.h" + +#include "qpid/broker/QueueFlowLimit.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/reply_exceptions.h" +#include "MessageUtils.h" +#include "BrokerFixture.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(QueueFlowLimitTestSuite) + +QueuedMessage createMessage(uint32_t size) +{ + QueuedMessage msg; + msg.payload = MessageUtils::createMessage(); + MessageUtils::addContent(msg.payload, std::string (size, 'x')); + return msg; +} + + +QPID_AUTO_TEST_CASE(testFlowCount) +{ + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 7); + args.setInt(QueueFlowLimit::flowResumeCountKey, 5); + + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint32_t) 7, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 5, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + + std::deque<QueuedMessage> msgs; + for (size_t i = 0; i < 6; i++) { + msgs.push_back(createMessage(10)); + flow->consume(msgs.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue + msgs.push_back(createMessage(10)); + flow->consume(msgs.back()); + BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue + msgs.push_back(createMessage(10)); + flow->consume(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON + msgs.push_back(createMessage(10)); + flow->consume(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue + + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 7 on queue + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue + + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF +} + + +QPID_AUTO_TEST_CASE(testFlowSize) +{ + FieldTable args; + args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50); + + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + + std::deque<QueuedMessage> msgs; + for (size_t i = 0; i < 6; i++) { + msgs.push_back(createMessage(10)); + flow->consume(msgs.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue + QueuedMessage msg_9 = createMessage(9); + flow->consume(msg_9); + BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue + QueuedMessage tinyMsg_1 = createMessage(1); + flow->consume(tinyMsg_1); + BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue + + QueuedMessage tinyMsg_2 = createMessage(1); + flow->consume(tinyMsg_2); + BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON + msgs.push_back(createMessage(10)); + flow->consume(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue + + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue + + flow->replenish(tinyMsg_1); + BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue + flow->replenish(tinyMsg_2); + BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF + + flow->replenish(msg_9); + BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue + flow->replenish(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue +} + +QPID_AUTO_TEST_CASE(testFlowArgs) +{ + FieldTable args; + const uint64_t stop(0x2FFFFFFFF); + const uint64_t resume(0x1FFFFFFFF); + args.setInt(QueueFlowLimit::flowStopCountKey, 30); + args.setInt(QueueFlowLimit::flowResumeCountKey, 21); + args.setUInt64(QueueFlowLimit::flowStopSizeKey, stop); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, resume); + + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + BOOST_CHECK_EQUAL((uint32_t) 30, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 21, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL(stop, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL(resume, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); +} + + +QPID_AUTO_TEST_CASE(testFlowCombo) +{ + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 10); + args.setInt(QueueFlowLimit::flowResumeCountKey, 5); + args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100); + + std::deque<QueuedMessage> msgs_1; + std::deque<QueuedMessage> msgs_10; + std::deque<QueuedMessage> msgs_50; + std::deque<QueuedMessage> msgs_100; + + QueuedMessage msg; + + std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0 + + // verify flow control comes ON when only count passes its stop point. + + for (size_t i = 0; i < 10; i++) { + msgs_10.push_back(createMessage(10)); + flow->consume(msgs_10.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + // count:10 size:100 + + msgs_1.push_back(createMessage(1)); + flow->consume(msgs_1.back()); // count:11 size: 101 ->ON + BOOST_CHECK(flow->isFlowControlActive()); + + for (size_t i = 0; i < 6; i++) { + flow->replenish(msgs_10.front()); + msgs_10.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + } + // count:5 size: 41 + + flow->replenish(msgs_1.front()); // count: 4 size: 40 ->OFF + msgs_1.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); + + for (size_t i = 0; i < 4; i++) { + flow->replenish(msgs_10.front()); + msgs_10.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); + } + // count:0 size:0 + + // verify flow control comes ON when only size passes its stop point. + + msgs_100.push_back(createMessage(100)); + flow->consume(msgs_100.back()); // count:1 size: 100 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_50.push_back(createMessage(50)); + flow->consume(msgs_50.back()); // count:2 size: 150 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_50.push_back(createMessage(50)); + flow->consume(msgs_50.back()); // count:3 size: 200 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_1.push_back(createMessage(1)); + flow->consume(msgs_1.back()); // count:4 size: 201 ->ON + BOOST_CHECK(flow->isFlowControlActive()); + + flow->replenish(msgs_100.front()); // count:3 size:101 + msgs_100.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + + flow->replenish(msgs_1.front()); // count:2 size:100 + msgs_1.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + + flow->replenish(msgs_50.front()); // count:1 size:50 ->OFF + msgs_50.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); + + // verify flow control remains ON until both thresholds drop below their + // resume point. + + for (size_t i = 0; i < 8; i++) { + msgs_10.push_back(createMessage(10)); + flow->consume(msgs_10.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + // count:9 size:130 + + msgs_10.push_back(createMessage(10)); + flow->consume(msgs_10.back()); // count:10 size: 140 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_1.push_back(createMessage(1)); + flow->consume(msgs_1.back()); // count:11 size: 141 ->ON + BOOST_CHECK(flow->isFlowControlActive()); + + msgs_100.push_back(createMessage(100)); + flow->consume(msgs_100.back()); // count:12 size: 241 (both thresholds crossed) + BOOST_CHECK(flow->isFlowControlActive()); + + // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241 + + flow->replenish(msgs_50.front()); // count:11 size:191 + msgs_50.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + + for (size_t i = 0; i < 9; i++) { + flow->replenish(msgs_10.front()); + msgs_10.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + } + // count:2 size:101 + flow->replenish(msgs_1.front()); // count:1 size:100 + msgs_1.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // still active due to size + + flow->replenish(msgs_100.front()); // count:0 size:0 ->OFF + msgs_100.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); +} + + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests |