summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-08 22:33:04 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-08 22:33:04 +0000
commit38f268837e1488b4ea63101d669f1117861a0012 (patch)
tree1a871c0d30f1609e6f42e0f0295b1781204fcb7a
parentbe7946e8b700018bf204e11f650f77d188ca10e3 (diff)
downloadqpid-python-38f268837e1488b4ea63101d669f1117861a0012.tar.gz
QPID-2935: modify flow accounting api per review input.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1068645 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp30
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp54
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h17
-rw-r--r--qpid/cpp/src/tests/QueueFlowLimitTest.cpp93
4 files changed, 101 insertions, 93 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index ae3f84008a..8fc4a8ec39 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -645,11 +645,8 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (policy.get()) {
policy->enqueued(qm);
}
- if (flowLimit.get()) {
- bool fc = flowLimit->consume(qm);
- if (fc && mgmtObject)
- mgmtObject->set_flowStopped(true);
- }
+ if (flowLimit.get())
+ flowLimit->enqueued(qm);
}
copy.notify();
}
@@ -848,11 +845,8 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
- if (flowLimit.get()) {
- bool fc = flowLimit->replenish(msg);
- if (fc && mgmtObject)
- mgmtObject->set_flowStopped(false);
- }
+ if (flowLimit.get())
+ flowLimit->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
eventMgr->dequeued(msg);
@@ -918,13 +912,8 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
- if (flowLimit.get()) {
- mgmtObject->set_flowStopCount(flowLimit->getFlowStopCount());
- mgmtObject->set_flowResumeCount(flowLimit->getFlowResumeCount());
- mgmtObject->set_flowStopSize(flowLimit->getFlowStopSize());
- mgmtObject->set_flowResumeSize(flowLimit->getFlowResumeSize());
- mgmtObject->set_flowStopped(flowLimit->isFlowControlActive());
- }
+ if (flowLimit.get())
+ flowLimit->setManagementObject( mgmtObject );
}
if ( isDurable() && ! getPersistenceId() && ! recovering )
@@ -1200,11 +1189,8 @@ void Queue::enqueued(const QueuedMessage& m)
policy->recoverEnqueued(m.payload);
policy->enqueued(m);
}
- if (flowLimit.get()) {
- bool fc = flowLimit->consume(m);
- if (fc && mgmtObject)
- mgmtObject->set_flowStopped(true);
- }
+ if (flowLimit.get())
+ flowLimit->enqueued(m);
mgntEnqStats(m.payload);
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 7cd7b99557..6339085e4c 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -25,6 +25,9 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Mutex.h"
+
+#include "qmf/org/apache/qpid/broker/Queue.h"
+
#include <sstream>
using namespace qpid::broker;
@@ -89,7 +92,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
: queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
- flowStopped(false), count(0), size(0)
+ flowStopped(false), count(0), size(0), queueMgmtObj(0)
{
uint32_t maxCount(0);
uint64_t maxSize(0);
@@ -110,25 +113,25 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
-bool QueueFlowLimit::consume(const QueuedMessage& msg)
+void QueueFlowLimit::enqueued(const QueuedMessage& msg)
{
- bool flowChanged(false);
-
- if (!msg.payload) return false;
+ if (!msg.payload) return;
sys::Mutex::ScopedLock l(indexLock);
++count;
size += msg.payload->contentSize();
- if (flowStopCount && !flowStopped && count > flowStopCount) {
- flowChanged = flowStopped = true;
- QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
- }
-
- if (flowStopSize && !flowStopped && size > flowStopSize) {
- flowChanged = flowStopped = true;
- QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
+ if (!flowStopped) {
+ if (flowStopCount && count > flowStopCount) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
+ } else if (flowStopSize && size > flowStopSize) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
+ }
+ if (flowStopped && queueMgmtObj)
+ queueMgmtObj->set_flowStopped(true);
}
// KAG: test - REMOVE ONCE STABLE
@@ -138,20 +141,15 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg)
if (flowStopped || !index.empty()) {
msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes
- //pendingFlow.push_back(msg.payload);
index.insert(msg.payload);
}
-
- return flowChanged;
}
-bool QueueFlowLimit::replenish(const QueuedMessage& msg)
+void QueueFlowLimit::dequeued(const QueuedMessage& msg)
{
- bool flowChanged(false);
-
- if (!msg.payload) return false;
+ if (!msg.payload) return;
sys::Mutex::ScopedLock l(indexLock);
@@ -172,7 +170,8 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg)
(flowResumeSize == 0 || size < flowResumeSize) &&
(flowResumeCount == 0 || count < flowResumeCount)) {
flowStopped = false;
- flowChanged = true;
+ if (queueMgmtObj)
+ queueMgmtObj->set_flowStopped(false);
QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." );
}
@@ -193,8 +192,19 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg)
}
}
}
+}
- return flowChanged;
+
+void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject)
+{
+ queueMgmtObj = mgmtObject;
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopCount(getFlowStopCount());
+ queueMgmtObj->set_flowResumeCount(getFlowResumeCount());
+ queueMgmtObj->set_flowStopSize(getFlowStopSize());
+ queueMgmtObj->set_flowResumeSize(getFlowResumeSize());
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
+ }
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 2de214801f..32031cc0b1 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -31,6 +31,15 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
+namespace qmf {
+namespace org {
+namespace apache {
+namespace qpid {
+namespace broker {
+ class Queue;
+}}}}}
+namespace _qmfBroker = qmf::org::apache::qpid::broker;
+
namespace qpid {
namespace broker {
@@ -65,9 +74,9 @@ class QueueFlowLimit
virtual ~QueueFlowLimit() {}
/** the queue has added QueuedMessage. Returns true if flow state changes */
- bool consume(const QueuedMessage&);
+ void enqueued(const QueuedMessage&);
/** the queue has removed QueuedMessage. Returns true if flow state changes */
- bool replenish(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
uint32_t getFlowStopCount() const { return flowStopCount; }
uint32_t getFlowResumeCount() const { return flowResumeCount; }
@@ -76,6 +85,8 @@ class QueueFlowLimit
bool isFlowControlActive() const { return flowStopped; }
bool monitorFlowControl() const { return flowStopCount || flowStopSize; }
+ void setManagementObject(_qmfBroker::Queue *q);
+
void encode(framing::Buffer& buffer) const;
void decode(framing::Buffer& buffer);
uint32_t encodedSize() const;
@@ -88,6 +99,8 @@ class QueueFlowLimit
std::set< boost::intrusive_ptr<Message> > index;
qpid::sys::Mutex indexLock;
+ _qmfBroker::Queue *queueMgmtObj;
+
QueueFlowLimit(Queue *queue,
uint32_t flowStopCount, uint32_t flowResumeCount,
uint64_t flowStopSize, uint64_t flowResumeSize);
diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
index a01907f4ba..99fa98cfbf 100644
--- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
+++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -61,40 +61,39 @@ QPID_AUTO_TEST_CASE(testFlowCount)
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
- bool fc;
std::deque<QueuedMessage> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
+ flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
+ flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue
msgs.push_back(createMessage(10));
- fc = flow->consume(msgs.back());
- BOOST_CHECK(fc && flow->isFlowControlActive()); // 8 on queue, ON
+ flow->enqueued(msgs.back());
+ BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON
msgs.push_back(createMessage(10));
- fc = flow->consume(msgs.back());
- BOOST_CHECK(!fc && flow->isFlowControlActive()); // 9 on queue, no change to flow control
+ flow->enqueued(msgs.back());
+ BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue, no change to flow control
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 7 on queue
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue
- fc = flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!fc && flow->isFlowControlActive()); // 5 on queue, no change
+ BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue, no change
- fc = flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(fc && !flow->isFlowControlActive()); // 4 on queue, OFF
+ BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF
}
@@ -116,45 +115,45 @@ QPID_AUTO_TEST_CASE(testFlowSize)
std::deque<QueuedMessage> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
+ flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue
QueuedMessage msg_9 = createMessage(9);
- flow->consume(msg_9);
+ flow->enqueued(msg_9);
BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
QueuedMessage tinyMsg_1 = createMessage(1);
- flow->consume(tinyMsg_1);
+ flow->enqueued(tinyMsg_1);
BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue
QueuedMessage tinyMsg_2 = createMessage(1);
- flow->consume(tinyMsg_2);
+ flow->enqueued(tinyMsg_2);
BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
+ flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue
- flow->replenish(tinyMsg_1);
+ flow->dequeued(tinyMsg_1);
BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue
- flow->replenish(tinyMsg_2);
+ flow->dequeued(tinyMsg_2);
BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF
- flow->replenish(msg_9);
+ flow->dequeued(msg_9);
BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue
- flow->replenish(msgs.front());
+ flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue
}
@@ -202,28 +201,28 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
for (size_t i = 0; i < 10; i++) {
msgs_10.push_back(createMessage(10));
- flow->consume(msgs_10.back());
+ flow->enqueued(msgs_10.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
// count:10 size:100
msgs_1.push_back(createMessage(1));
- flow->consume(msgs_1.back()); // count:11 size: 101 ->ON
+ flow->enqueued(msgs_1.back()); // count:11 size: 101 ->ON
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 6; i++) {
- flow->replenish(msgs_10.front());
+ flow->dequeued(msgs_10.front());
msgs_10.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
// count:5 size: 41
- flow->replenish(msgs_1.front()); // count: 4 size: 40 ->OFF
+ flow->dequeued(msgs_1.front()); // count: 4 size: 40 ->OFF
msgs_1.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
for (size_t i = 0; i < 4; i++) {
- flow->replenish(msgs_10.front());
+ flow->dequeued(msgs_10.front());
msgs_10.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}
@@ -232,30 +231,30 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
// verify flow control comes ON when only size passes its stop point.
msgs_100.push_back(createMessage(100));
- flow->consume(msgs_100.back()); // count:1 size: 100
+ flow->enqueued(msgs_100.back()); // count:1 size: 100
BOOST_CHECK(!flow->isFlowControlActive());
msgs_50.push_back(createMessage(50));
- flow->consume(msgs_50.back()); // count:2 size: 150
+ flow->enqueued(msgs_50.back()); // count:2 size: 150
BOOST_CHECK(!flow->isFlowControlActive());
msgs_50.push_back(createMessage(50));
- flow->consume(msgs_50.back()); // count:3 size: 200
+ flow->enqueued(msgs_50.back()); // count:3 size: 200
BOOST_CHECK(!flow->isFlowControlActive());
msgs_1.push_back(createMessage(1));
- flow->consume(msgs_1.back()); // count:4 size: 201 ->ON
+ flow->enqueued(msgs_1.back()); // count:4 size: 201 ->ON
BOOST_CHECK(flow->isFlowControlActive());
- flow->replenish(msgs_100.front()); // count:3 size:101
+ flow->dequeued(msgs_100.front()); // count:3 size:101
msgs_100.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->replenish(msgs_1.front()); // count:2 size:100
+ flow->dequeued(msgs_1.front()); // count:2 size:100
msgs_1.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->replenish(msgs_50.front()); // count:1 size:50 ->OFF
+ flow->dequeued(msgs_50.front()); // count:1 size:50 ->OFF
msgs_50.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
@@ -264,40 +263,40 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
for (size_t i = 0; i < 8; i++) {
msgs_10.push_back(createMessage(10));
- flow->consume(msgs_10.back());
+ flow->enqueued(msgs_10.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
// count:9 size:130
msgs_10.push_back(createMessage(10));
- flow->consume(msgs_10.back()); // count:10 size: 140
+ flow->enqueued(msgs_10.back()); // count:10 size: 140
BOOST_CHECK(!flow->isFlowControlActive());
msgs_1.push_back(createMessage(1));
- flow->consume(msgs_1.back()); // count:11 size: 141 ->ON
+ flow->enqueued(msgs_1.back()); // count:11 size: 141 ->ON
BOOST_CHECK(flow->isFlowControlActive());
msgs_100.push_back(createMessage(100));
- flow->consume(msgs_100.back()); // count:12 size: 241 (both thresholds crossed)
+ flow->enqueued(msgs_100.back()); // count:12 size: 241 (both thresholds crossed)
BOOST_CHECK(flow->isFlowControlActive());
// at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
- flow->replenish(msgs_50.front()); // count:11 size:191
+ flow->dequeued(msgs_50.front()); // count:11 size:191
msgs_50.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 9; i++) {
- flow->replenish(msgs_10.front());
+ flow->dequeued(msgs_10.front());
msgs_10.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
// count:2 size:101
- flow->replenish(msgs_1.front()); // count:1 size:100
+ flow->dequeued(msgs_1.front()); // count:1 size:100
msgs_1.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // still active due to size
- flow->replenish(msgs_100.front()); // count:0 size:0 ->OFF
+ flow->dequeued(msgs_100.front()); // count:0 size:0 ->OFF
msgs_100.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}