summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-01-31 18:09:51 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-01-31 18:09:51 +0000
commitd8b24fb9a4f5fa773663f58f040ea97baa7dea83 (patch)
treefa0e9a2396929f97b845bcb55ec612007828cf88
parent80321993fe0b59dbe33a29a81a6e40d3dc943544 (diff)
downloadqpid-python-d8b24fb9a4f5fa773663f58f040ea97baa7dea83.tar.gz
QPID-2935: add per-queue flow limits
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1065700 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp24
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp280
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h99
-rw-r--r--qpid/cpp/src/tests/Makefile.am1
-rw-r--r--qpid/cpp/src/tests/QueueFlowLimitTest.cpp307
7 files changed, 706 insertions, 9 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 739424783a..b197017327 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -588,6 +588,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueueRegistry.cpp \
qpid/broker/QueueRegistry.h \
qpid/broker/QueuedMessage.h \
+ qpid/broker/QueueFlowLimit.h \
+ qpid/broker/QueueFlowLimit.cpp \
qpid/broker/RateFlowcontrol.h \
qpid/broker/RateTracker.cpp \
qpid/broker/RateTracker.h \
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 24469ed0e4..830b4215c7 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -27,6 +27,7 @@
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -166,11 +167,11 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg)
{
- if (policy.get()) policy->recoverEnqueued(msg);
+ if (policy.get()) policy->recoverEnqueued(msg); // KAG INC COUNTERS
}
void Queue::recover(boost::intrusive_ptr<Message>& msg){
- if (policy.get()) policy->recoverEnqueued(msg);
+ if (policy.get()) policy->recoverEnqueued(msg); // KAG INC COUNTERS
push(msg, true);
if (store){
@@ -351,7 +352,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
//consumer wants the message
c->position = msg.position;
m = msg;
- if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
+ if (!lastValueQueueNoBrowse) clearLVQIndex(msg); // prevent this msg from being replaced by LVQ
if (lastValueQueue) {
boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
if (replacement.get()) m.payload = replacement;
@@ -642,8 +643,9 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
if (policy.get()) {
- policy->enqueued(qm);
+ policy->enqueued(qm); // KAG STORE COPY
}
+ if (flowLimit.get()) flowLimit->consume(qm);
}
copy.notify();
}
@@ -746,7 +748,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
Messages dequeues;
{
Mutex::ScopedLock locker(messageLock);
- policy->tryEnqueue(msg);
+ policy->tryEnqueue(msg); // KAG INC COUNTERS
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
@@ -784,7 +786,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ if (policy.get()) policy->enqueueAborted(msg); // KAG DEC COUNTERS
}
// return true if store exists,
@@ -841,7 +843,8 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
- if (policy.get()) policy->dequeued(msg);
+ if (policy.get()) policy->dequeued(msg); // KAG REMOVE COPY, DEC COUNTERS
+ if (flowLimit.get()) flowLimit->replenish(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
eventMgr->dequeued(msg);
@@ -903,6 +906,8 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
+ flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
+
if (mgmtObject != 0)
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -1176,9 +1181,10 @@ void Queue::enqueued(const QueuedMessage& m)
{
if (m.payload) {
if (policy.get()) {
- policy->recoverEnqueued(m.payload);
- policy->enqueued(m);
+ policy->recoverEnqueued(m.payload); // KAG INC COUNTERS
+ policy->enqueued(m); // KAG STORE COPY
}
+ if (flowLimit.get()) flowLimit->consume(m);
mgntEnqStats(m.payload);
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 96c79d1b92..57ef3dae6b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -56,6 +56,7 @@ class QueueEvents;
class QueueRegistry;
class TransactionContext;
class Exchange;
+class QueueFlowLimit;
/**
* The brokers representation of an amqp queue. Messages are
@@ -112,6 +113,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
+ std::auto_ptr<QueueFlowLimit> flowLimit;
bool policyExceeded;
QueueBindings bindings;
std::string alternateExchangeName;
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
new file mode 100644
index 0000000000..f824b809d3
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
+#include <sstream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace {
+ /** ensure that the configured flow control stop and resume values are
+ * valid with respect to the maximum queue capacity, and each other
+ */
+ template <typename T>
+ void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue)
+ {
+ if (resume > stop) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
+ << "=" << resume
+ << " must be less than qpid.flow_stop_" << type
+ << "=" << stop));
+ }
+ if (resume == 0) resume = stop;
+ if (max != 0 && (max < stop)) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
+ << "=" << stop
+ << " must be less than qpid.max_" << type
+ << "=" << max));
+ }
+ }
+
+ /** extract a capacity value as passed in an argument map
+ */
+ uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
+ {
+ FieldTable::ValuePtr v = settings.get(key);
+
+ int64_t result = 0;
+
+ if (!v) return defaultValue;
+ if (v->getType() == 0x23) {
+ QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
+ } else if (v->getType() == 0x33) {
+ QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
+ } else if (v->convertsTo<int64_t>()) {
+ result = v->get<int64_t>();
+ QPID_LOG(debug, "Got integer value for " << key << ": " << result);
+ if (result >= 0) return result;
+ } else if (v->convertsTo<string>()) {
+ string s(v->get<string>());
+ QPID_LOG(debug, "Got string value for " << key << ": " << s);
+ std::istringstream convert(s);
+ if (convert >> result && result >= 0) return result;
+ }
+
+ QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
+ return defaultValue;
+ }
+}
+
+
+
+QueueFlowLimit::QueueFlowLimit(Queue *_queue,
+ uint32_t _flowStopCount, uint32_t _flowResumeCount,
+ uint64_t _flowStopSize, uint64_t _flowResumeSize)
+ : queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
+ flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
+ flowStopped(false), count(0), size(0)
+{
+ uint32_t maxCount(0);
+ uint64_t maxSize(0);
+
+ if (_queue) {
+ queueName = _queue->getName();
+ if (_queue->getPolicy()) {
+ maxSize = _queue->getPolicy()->getMaxSize();
+ maxCount = _queue->getPolicy()->getMaxCount();
+ }
+ }
+ validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
+ validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
+ QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
+ << ", flowResumeCount=" << flowResumeCount
+ << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize );
+}
+
+
+
+void QueueFlowLimit::consume(const QueuedMessage& msg)
+{
+ if (!msg.payload) return;
+
+ sys::Mutex::ScopedLock l(pendingFlowLock);
+
+ ++count;
+ size += msg.payload->contentSize();
+
+ if (flowStopCount && !flowStopped && count > flowStopCount) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
+ }
+
+ if (flowStopSize && !flowStopped && size > flowStopSize) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
+ }
+
+ // KAG: test
+ if (index.find(msg.payload) != index.end()) {
+ QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
+ }
+
+ if (flowStopped || !pendingFlow.empty()) {
+ msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes
+ pendingFlow.push_back(msg.payload);
+ index.insert(msg.payload);
+ }
+}
+
+
+
+void QueueFlowLimit::replenish(const QueuedMessage& msg)
+{
+ if (!msg.payload) return;
+
+ sys::Mutex::ScopedLock l(pendingFlowLock);
+
+ if (count > 0) {
+ --count;
+ } else {
+ throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
+ }
+
+ uint64_t _size = msg.payload->contentSize();
+ if (_size <= size) {
+ size -= _size;
+ } else {
+ throw Exception(QPID_MSG("Flow limit size underflow on dequeue. Queue=" << queueName));
+ }
+
+ if (flowStopped &&
+ (flowResumeSize == 0 || size < flowResumeSize) &&
+ (flowResumeCount == 0 || count < flowResumeCount)) {
+ flowStopped = false;
+ QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." );
+ }
+
+ if (!flowStopped && !pendingFlow.empty()) {
+ // if msg is flow controlled, release it.
+ std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+ if (itr != index.end()) {
+ (*itr)->getReceiveCompletion().finishCompleter();
+ index.erase(itr);
+ // stupid:
+ std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
+ pendingFlow.end(),
+ msg.payload);
+ if (itr2 == pendingFlow.end()) {
+ QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
+ } else {
+ pendingFlow.erase(itr2);
+ }
+ }
+
+ // for now, just release the oldest also
+ if (!pendingFlow.empty()) {
+ pendingFlow.front()->getReceiveCompletion().finishCompleter();
+ itr = index.find(pendingFlow.front());
+ if (itr == index.end()) {
+ QPID_LOG(error, "Queue \"" << queueName << "\": msg missing in index: " << pendingFlow.front());
+ } else {
+ index.erase(itr);
+ }
+ pendingFlow.pop_front();
+ }
+ }
+}
+
+
+void QueueFlowLimit::encode(Buffer& buffer) const
+{
+ buffer.putLong(flowStopCount);
+ buffer.putLong(flowResumeCount);
+ buffer.putLongLong(flowStopSize);
+ buffer.putLongLong(flowResumeSize);
+ buffer.putLong(count);
+ buffer.putLongLong(size);
+}
+
+
+void QueueFlowLimit::decode ( Buffer& buffer )
+{
+ flowStopCount = buffer.getLong();
+ flowResumeCount = buffer.getLong();
+ flowStopSize = buffer.getLongLong();
+ flowResumeSize = buffer.getLongLong();
+ count = buffer.getLong();
+ size = buffer.getLongLong();
+}
+
+
+uint32_t QueueFlowLimit::encodedSize() const {
+ return sizeof(uint32_t) + // flowStopCount
+ sizeof(uint32_t) + // flowResumecount
+ sizeof(uint64_t) + // flowStopSize
+ sizeof(uint64_t) + // flowResumeSize
+ sizeof(uint32_t) + // count
+ sizeof(uint64_t); // size
+}
+
+
+const std::string QueueFlowLimit::flowStopCountKey("qpid.flow_stop_count");
+const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count");
+const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size");
+const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size");
+
+
+std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings)
+{
+ uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
+ uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
+ uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
+ uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
+
+ if (flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) {
+ return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
+ flowStopSize, flowResumeSize));
+ } else {
+ return std::auto_ptr<QueueFlowLimit>();
+ }
+}
+
+
+namespace qpid {
+ namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
+{
+ out << "; flowStopCount=" << f.flowStopCount << ", flowResumeCount=" << f.flowResumeCount;
+ out << "; flowStopSize=" << f.flowStopSize << ", flowResumeSize=" << f.flowResumeSize;
+ return out;
+}
+
+ }
+}
+
+/**
+ * TBD:
+ * - Is there a direct way to determine if QM is on pendingFlow list?
+ * - Rate limit the granting of flow.
+ * - What about LVQ? A newer msg may replace the older one.
+ * - What about queueing during a recovery?
+ * - What about queue purge?
+ * - What about message move?
+ * - How do we treat orphaned messages?
+ * -- Xfer a message to an alternate exchange - do we ack?
+ */
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
new file mode 100644
index 0000000000..bd54f18a2b
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueueFlowLimit_
+#define _QueueFlowLimit_
+
+#include <list>
+#include <set>
+#include <iostream>
+#include <memory>
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Producer flow control: when level is > flowStop*, flow control is ON.
+ * then level is < flowResume*, flow control is OFF. If == 0, flow control
+ * is not used. If both byte and msg count thresholds are set, then
+ * passing _either_ level may turn flow control ON, but _both_ must be
+ * below level before flow control will be turned OFF.
+ */
+class QueueFlowLimit
+{
+ std::string queueName;
+
+ uint32_t flowStopCount;
+ uint32_t flowResumeCount;
+ uint64_t flowStopSize;
+ uint64_t flowResumeSize;
+ bool flowStopped; // true = producers held in flow control
+
+ // current queue utilization
+ uint32_t count;
+ uint64_t size;
+
+ public:
+ static QPID_BROKER_EXTERN const std::string flowStopCountKey;
+ static QPID_BROKER_EXTERN const std::string flowResumeCountKey;
+ static QPID_BROKER_EXTERN const std::string flowStopSizeKey;
+ static QPID_BROKER_EXTERN const std::string flowResumeSizeKey;
+
+ virtual ~QueueFlowLimit() {}
+
+ /** the queue has added QueuedMessage */
+ void consume(const QueuedMessage&);
+ /** the queue has removed QueuedMessage */
+ void replenish(const QueuedMessage&);
+
+ uint32_t getFlowStopCount() const { return flowStopCount; }
+ uint32_t getFlowResumeCount() const { return flowResumeCount; }
+ uint64_t getFlowStopSize() const { return flowStopSize; }
+ uint64_t getFlowResumeSize() const { return flowResumeSize; }
+ bool isFlowControlActive() const { return flowStopped; }
+ bool monitorFlowControl() const { return flowStopCount || flowStopSize; }
+
+ void encode(framing::Buffer& buffer) const;
+ void decode(framing::Buffer& buffer);
+ uint32_t encodedSize() const;
+
+ static QPID_BROKER_EXTERN std::auto_ptr<QueueFlowLimit> createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+ friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
+
+ protected:
+ // msgs waiting for flow to become available.
+ std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front
+ std::set< boost::intrusive_ptr<Message> > index;
+ qpid::sys::Mutex pendingFlowLock;
+
+ QueueFlowLimit(Queue *queue,
+ uint32_t flowStopCount, uint32_t flowResumeCount,
+ uint64_t flowStopSize, uint64_t flowResumeSize);
+};
+
+}}
+
+
+#endif
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index fc3f7c0854..330b87e277 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -98,6 +98,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
MessageTest.cpp \
QueueRegistryTest.cpp \
QueuePolicyTest.cpp \
+ QueueFlowLimitTest.cpp \
FramingTest.cpp \
HeaderTest.cpp \
SequenceNumberTest.cpp \
diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
new file mode 100644
index 0000000000..5899986ee9
--- /dev/null
+++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -0,0 +1,307 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <sstream>
+#include <deque>
+#include "unit_test.h"
+#include "test_tools.h"
+
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "MessageUtils.h"
+#include "BrokerFixture.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(QueueFlowLimitTestSuite)
+
+QueuedMessage createMessage(uint32_t size)
+{
+ QueuedMessage msg;
+ msg.payload = MessageUtils::createMessage();
+ MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+ return msg;
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowCount)
+{
+ FieldTable args;
+ args.setInt(QueueFlowLimit::flowStopCountKey, 7);
+ args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
+
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint32_t) 7, flow->getFlowStopCount());
+ BOOST_CHECK_EQUAL((uint32_t) 5, flow->getFlowResumeCount());
+ BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeSize());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+
+ std::deque<QueuedMessage> msgs;
+ for (size_t i = 0; i < 6; i++) {
+ msgs.push_back(createMessage(10));
+ flow->consume(msgs.back());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ }
+ BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue
+ msgs.push_back(createMessage(10));
+ flow->consume(msgs.back());
+ BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue
+ msgs.push_back(createMessage(10));
+ flow->consume(msgs.back());
+ BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON
+ msgs.push_back(createMessage(10));
+ flow->consume(msgs.back());
+ BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue
+
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // 7 on queue
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue
+
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowSize)
+{
+ FieldTable args;
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50);
+
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
+ BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
+ BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+
+ std::deque<QueuedMessage> msgs;
+ for (size_t i = 0; i < 6; i++) {
+ msgs.push_back(createMessage(10));
+ flow->consume(msgs.back());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ }
+ BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue
+ QueuedMessage msg_9 = createMessage(9);
+ flow->consume(msg_9);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
+ QueuedMessage tinyMsg_1 = createMessage(1);
+ flow->consume(tinyMsg_1);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue
+
+ QueuedMessage tinyMsg_2 = createMessage(1);
+ flow->consume(tinyMsg_2);
+ BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON
+ msgs.push_back(createMessage(10));
+ flow->consume(msgs.back());
+ BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue
+
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue
+
+ flow->replenish(tinyMsg_1);
+ BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue
+ flow->replenish(tinyMsg_2);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF
+
+ flow->replenish(msg_9);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue
+ flow->replenish(msgs.front());
+ msgs.pop_front();
+ BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue
+}
+
+QPID_AUTO_TEST_CASE(testFlowArgs)
+{
+ FieldTable args;
+ const uint64_t stop(0x2FFFFFFFF);
+ const uint64_t resume(0x1FFFFFFFF);
+ args.setInt(QueueFlowLimit::flowStopCountKey, 30);
+ args.setInt(QueueFlowLimit::flowResumeCountKey, 21);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, stop);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, resume);
+
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ BOOST_CHECK_EQUAL((uint32_t) 30, flow->getFlowStopCount());
+ BOOST_CHECK_EQUAL((uint32_t) 21, flow->getFlowResumeCount());
+ BOOST_CHECK_EQUAL(stop, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL(resume, flow->getFlowResumeSize());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ BOOST_CHECK(flow->monitorFlowControl());
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowCombo)
+{
+ FieldTable args;
+ args.setInt(QueueFlowLimit::flowStopCountKey, 10);
+ args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
+
+ std::deque<QueuedMessage> msgs_1;
+ std::deque<QueuedMessage> msgs_10;
+ std::deque<QueuedMessage> msgs_50;
+ std::deque<QueuedMessage> msgs_100;
+
+ QueuedMessage msg;
+
+ std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+ BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0
+
+ // verify flow control comes ON when only count passes its stop point.
+
+ for (size_t i = 0; i < 10; i++) {
+ msgs_10.push_back(createMessage(10));
+ flow->consume(msgs_10.back());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ }
+ // count:10 size:100
+
+ msgs_1.push_back(createMessage(1));
+ flow->consume(msgs_1.back()); // count:11 size: 101 ->ON
+ BOOST_CHECK(flow->isFlowControlActive());
+
+ for (size_t i = 0; i < 6; i++) {
+ flow->replenish(msgs_10.front());
+ msgs_10.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive());
+ }
+ // count:5 size: 41
+
+ flow->replenish(msgs_1.front()); // count: 4 size: 40 ->OFF
+ msgs_1.pop_front();
+ BOOST_CHECK(!flow->isFlowControlActive());
+
+ for (size_t i = 0; i < 4; i++) {
+ flow->replenish(msgs_10.front());
+ msgs_10.pop_front();
+ BOOST_CHECK(!flow->isFlowControlActive());
+ }
+ // count:0 size:0
+
+ // verify flow control comes ON when only size passes its stop point.
+
+ msgs_100.push_back(createMessage(100));
+ flow->consume(msgs_100.back()); // count:1 size: 100
+ BOOST_CHECK(!flow->isFlowControlActive());
+
+ msgs_50.push_back(createMessage(50));
+ flow->consume(msgs_50.back()); // count:2 size: 150
+ BOOST_CHECK(!flow->isFlowControlActive());
+
+ msgs_50.push_back(createMessage(50));
+ flow->consume(msgs_50.back()); // count:3 size: 200
+ BOOST_CHECK(!flow->isFlowControlActive());
+
+ msgs_1.push_back(createMessage(1));
+ flow->consume(msgs_1.back()); // count:4 size: 201 ->ON
+ BOOST_CHECK(flow->isFlowControlActive());
+
+ flow->replenish(msgs_100.front()); // count:3 size:101
+ msgs_100.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive());
+
+ flow->replenish(msgs_1.front()); // count:2 size:100
+ msgs_1.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive());
+
+ flow->replenish(msgs_50.front()); // count:1 size:50 ->OFF
+ msgs_50.pop_front();
+ BOOST_CHECK(!flow->isFlowControlActive());
+
+ // verify flow control remains ON until both thresholds drop below their
+ // resume point.
+
+ for (size_t i = 0; i < 8; i++) {
+ msgs_10.push_back(createMessage(10));
+ flow->consume(msgs_10.back());
+ BOOST_CHECK(!flow->isFlowControlActive());
+ }
+ // count:9 size:130
+
+ msgs_10.push_back(createMessage(10));
+ flow->consume(msgs_10.back()); // count:10 size: 140
+ BOOST_CHECK(!flow->isFlowControlActive());
+
+ msgs_1.push_back(createMessage(1));
+ flow->consume(msgs_1.back()); // count:11 size: 141 ->ON
+ BOOST_CHECK(flow->isFlowControlActive());
+
+ msgs_100.push_back(createMessage(100));
+ flow->consume(msgs_100.back()); // count:12 size: 241 (both thresholds crossed)
+ BOOST_CHECK(flow->isFlowControlActive());
+
+ // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
+
+ flow->replenish(msgs_50.front()); // count:11 size:191
+ msgs_50.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive());
+
+ for (size_t i = 0; i < 9; i++) {
+ flow->replenish(msgs_10.front());
+ msgs_10.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive());
+ }
+ // count:2 size:101
+ flow->replenish(msgs_1.front()); // count:1 size:100
+ msgs_1.pop_front();
+ BOOST_CHECK(flow->isFlowControlActive()); // still active due to size
+
+ flow->replenish(msgs_100.front()); // count:0 size:0 ->OFF
+ msgs_100.pop_front();
+ BOOST_CHECK(!flow->isFlowControlActive());
+}
+
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests