diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-08 22:33:04 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-08 22:33:04 +0000 |
commit | 38f268837e1488b4ea63101d669f1117861a0012 (patch) | |
tree | 1a871c0d30f1609e6f42e0f0295b1781204fcb7a | |
parent | be7946e8b700018bf204e11f650f77d188ca10e3 (diff) | |
download | qpid-python-38f268837e1488b4ea63101d669f1117861a0012.tar.gz |
QPID-2935: modify flow accounting api per review input.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1068645 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueFlowLimitTest.cpp | 93 |
4 files changed, 101 insertions, 93 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index ae3f84008a..8fc4a8ec39 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -645,11 +645,8 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (policy.get()) { policy->enqueued(qm); } - if (flowLimit.get()) { - bool fc = flowLimit->consume(qm); - if (fc && mgmtObject) - mgmtObject->set_flowStopped(true); - } + if (flowLimit.get()) + flowLimit->enqueued(qm); } copy.notify(); } @@ -848,11 +845,8 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); - if (flowLimit.get()) { - bool fc = flowLimit->replenish(msg); - if (fc && mgmtObject) - mgmtObject->set_flowStopped(false); - } + if (flowLimit.get()) + flowLimit->dequeued(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { eventMgr->dequeued(msg); @@ -918,13 +912,8 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); - if (flowLimit.get()) { - mgmtObject->set_flowStopCount(flowLimit->getFlowStopCount()); - mgmtObject->set_flowResumeCount(flowLimit->getFlowResumeCount()); - mgmtObject->set_flowStopSize(flowLimit->getFlowStopSize()); - mgmtObject->set_flowResumeSize(flowLimit->getFlowResumeSize()); - mgmtObject->set_flowStopped(flowLimit->isFlowControlActive()); - } + if (flowLimit.get()) + flowLimit->setManagementObject( mgmtObject ); } if ( isDurable() && ! getPersistenceId() && ! recovering ) @@ -1200,11 +1189,8 @@ void Queue::enqueued(const QueuedMessage& m) policy->recoverEnqueued(m.payload); policy->enqueued(m); } - if (flowLimit.get()) { - bool fc = flowLimit->consume(m); - if (fc && mgmtObject) - mgmtObject->set_flowStopped(true); - } + if (flowLimit.get()) + flowLimit->enqueued(m); mgntEnqStats(m.payload); boost::intrusive_ptr<Message> payload = m.payload; enqueue ( 0, payload, true ); diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 7cd7b99557..6339085e4c 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -25,6 +25,9 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include "qpid/sys/Mutex.h" + +#include "qmf/org/apache/qpid/broker/Queue.h" + #include <sstream> using namespace qpid::broker; @@ -89,7 +92,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, : queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), - flowStopped(false), count(0), size(0) + flowStopped(false), count(0), size(0), queueMgmtObj(0) { uint32_t maxCount(0); uint64_t maxSize(0); @@ -110,25 +113,25 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, -bool QueueFlowLimit::consume(const QueuedMessage& msg) +void QueueFlowLimit::enqueued(const QueuedMessage& msg) { - bool flowChanged(false); - - if (!msg.payload) return false; + if (!msg.payload) return; sys::Mutex::ScopedLock l(indexLock); ++count; size += msg.payload->contentSize(); - if (flowStopCount && !flowStopped && count > flowStopCount) { - flowChanged = flowStopped = true; - QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); - } - - if (flowStopSize && !flowStopped && size > flowStopSize) { - flowChanged = flowStopped = true; - QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + if (!flowStopped) { + if (flowStopCount && count > flowStopCount) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); + } else if (flowStopSize && size > flowStopSize) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + } + if (flowStopped && queueMgmtObj) + queueMgmtObj->set_flowStopped(true); } // KAG: test - REMOVE ONCE STABLE @@ -138,20 +141,15 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg) if (flowStopped || !index.empty()) { msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes - //pendingFlow.push_back(msg.payload); index.insert(msg.payload); } - - return flowChanged; } -bool QueueFlowLimit::replenish(const QueuedMessage& msg) +void QueueFlowLimit::dequeued(const QueuedMessage& msg) { - bool flowChanged(false); - - if (!msg.payload) return false; + if (!msg.payload) return; sys::Mutex::ScopedLock l(indexLock); @@ -172,7 +170,8 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) (flowResumeSize == 0 || size < flowResumeSize) && (flowResumeCount == 0 || count < flowResumeCount)) { flowStopped = false; - flowChanged = true; + if (queueMgmtObj) + queueMgmtObj->set_flowStopped(false); QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." ); } @@ -193,8 +192,19 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) } } } +} - return flowChanged; + +void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject) +{ + queueMgmtObj = mgmtObject; + if (queueMgmtObj) { + queueMgmtObj->set_flowStopCount(getFlowStopCount()); + queueMgmtObj->set_flowResumeCount(getFlowResumeCount()); + queueMgmtObj->set_flowStopSize(getFlowStopSize()); + queueMgmtObj->set_flowResumeSize(getFlowResumeSize()); + queueMgmtObj->set_flowStopped(isFlowControlActive()); + } } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 2de214801f..32031cc0b1 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -31,6 +31,15 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" +namespace qmf { +namespace org { +namespace apache { +namespace qpid { +namespace broker { + class Queue; +}}}}} +namespace _qmfBroker = qmf::org::apache::qpid::broker; + namespace qpid { namespace broker { @@ -65,9 +74,9 @@ class QueueFlowLimit virtual ~QueueFlowLimit() {} /** the queue has added QueuedMessage. Returns true if flow state changes */ - bool consume(const QueuedMessage&); + void enqueued(const QueuedMessage&); /** the queue has removed QueuedMessage. Returns true if flow state changes */ - bool replenish(const QueuedMessage&); + void dequeued(const QueuedMessage&); uint32_t getFlowStopCount() const { return flowStopCount; } uint32_t getFlowResumeCount() const { return flowResumeCount; } @@ -76,6 +85,8 @@ class QueueFlowLimit bool isFlowControlActive() const { return flowStopped; } bool monitorFlowControl() const { return flowStopCount || flowStopSize; } + void setManagementObject(_qmfBroker::Queue *q); + void encode(framing::Buffer& buffer) const; void decode(framing::Buffer& buffer); uint32_t encodedSize() const; @@ -88,6 +99,8 @@ class QueueFlowLimit std::set< boost::intrusive_ptr<Message> > index; qpid::sys::Mutex indexLock; + _qmfBroker::Queue *queueMgmtObj; + QueueFlowLimit(Queue *queue, uint32_t flowStopCount, uint32_t flowResumeCount, uint64_t flowStopSize, uint64_t flowResumeSize); diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp index a01907f4ba..99fa98cfbf 100644 --- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp +++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp @@ -61,40 +61,39 @@ QPID_AUTO_TEST_CASE(testFlowCount) BOOST_CHECK(!flow->isFlowControlActive()); BOOST_CHECK(flow->monitorFlowControl()); - bool fc; std::deque<QueuedMessage> msgs; for (size_t i = 0; i < 6; i++) { msgs.push_back(createMessage(10)); - flow->consume(msgs.back()); + flow->enqueued(msgs.back()); BOOST_CHECK(!flow->isFlowControlActive()); } BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue msgs.push_back(createMessage(10)); - flow->consume(msgs.back()); + flow->enqueued(msgs.back()); BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue msgs.push_back(createMessage(10)); - fc = flow->consume(msgs.back()); - BOOST_CHECK(fc && flow->isFlowControlActive()); // 8 on queue, ON + flow->enqueued(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON msgs.push_back(createMessage(10)); - fc = flow->consume(msgs.back()); - BOOST_CHECK(!fc && flow->isFlowControlActive()); // 9 on queue, no change to flow control + flow->enqueued(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue, no change to flow control - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // 7 on queue - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue - fc = flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); - BOOST_CHECK(!fc && flow->isFlowControlActive()); // 5 on queue, no change + BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue, no change - fc = flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); - BOOST_CHECK(fc && !flow->isFlowControlActive()); // 4 on queue, OFF + BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF } @@ -116,45 +115,45 @@ QPID_AUTO_TEST_CASE(testFlowSize) std::deque<QueuedMessage> msgs; for (size_t i = 0; i < 6; i++) { msgs.push_back(createMessage(10)); - flow->consume(msgs.back()); + flow->enqueued(msgs.back()); BOOST_CHECK(!flow->isFlowControlActive()); } BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue QueuedMessage msg_9 = createMessage(9); - flow->consume(msg_9); + flow->enqueued(msg_9); BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue QueuedMessage tinyMsg_1 = createMessage(1); - flow->consume(tinyMsg_1); + flow->enqueued(tinyMsg_1); BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue QueuedMessage tinyMsg_2 = createMessage(1); - flow->consume(tinyMsg_2); + flow->enqueued(tinyMsg_2); BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON msgs.push_back(createMessage(10)); - flow->consume(msgs.back()); + flow->enqueued(msgs.back()); BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue - flow->replenish(tinyMsg_1); + flow->dequeued(tinyMsg_1); BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue - flow->replenish(tinyMsg_2); + flow->dequeued(tinyMsg_2); BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF - flow->replenish(msg_9); + flow->dequeued(msg_9); BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue - flow->replenish(msgs.front()); + flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue } @@ -202,28 +201,28 @@ QPID_AUTO_TEST_CASE(testFlowCombo) for (size_t i = 0; i < 10; i++) { msgs_10.push_back(createMessage(10)); - flow->consume(msgs_10.back()); + flow->enqueued(msgs_10.back()); BOOST_CHECK(!flow->isFlowControlActive()); } // count:10 size:100 msgs_1.push_back(createMessage(1)); - flow->consume(msgs_1.back()); // count:11 size: 101 ->ON + flow->enqueued(msgs_1.back()); // count:11 size: 101 ->ON BOOST_CHECK(flow->isFlowControlActive()); for (size_t i = 0; i < 6; i++) { - flow->replenish(msgs_10.front()); + flow->dequeued(msgs_10.front()); msgs_10.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); } // count:5 size: 41 - flow->replenish(msgs_1.front()); // count: 4 size: 40 ->OFF + flow->dequeued(msgs_1.front()); // count: 4 size: 40 ->OFF msgs_1.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); for (size_t i = 0; i < 4; i++) { - flow->replenish(msgs_10.front()); + flow->dequeued(msgs_10.front()); msgs_10.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); } @@ -232,30 +231,30 @@ QPID_AUTO_TEST_CASE(testFlowCombo) // verify flow control comes ON when only size passes its stop point. msgs_100.push_back(createMessage(100)); - flow->consume(msgs_100.back()); // count:1 size: 100 + flow->enqueued(msgs_100.back()); // count:1 size: 100 BOOST_CHECK(!flow->isFlowControlActive()); msgs_50.push_back(createMessage(50)); - flow->consume(msgs_50.back()); // count:2 size: 150 + flow->enqueued(msgs_50.back()); // count:2 size: 150 BOOST_CHECK(!flow->isFlowControlActive()); msgs_50.push_back(createMessage(50)); - flow->consume(msgs_50.back()); // count:3 size: 200 + flow->enqueued(msgs_50.back()); // count:3 size: 200 BOOST_CHECK(!flow->isFlowControlActive()); msgs_1.push_back(createMessage(1)); - flow->consume(msgs_1.back()); // count:4 size: 201 ->ON + flow->enqueued(msgs_1.back()); // count:4 size: 201 ->ON BOOST_CHECK(flow->isFlowControlActive()); - flow->replenish(msgs_100.front()); // count:3 size:101 + flow->dequeued(msgs_100.front()); // count:3 size:101 msgs_100.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); - flow->replenish(msgs_1.front()); // count:2 size:100 + flow->dequeued(msgs_1.front()); // count:2 size:100 msgs_1.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); - flow->replenish(msgs_50.front()); // count:1 size:50 ->OFF + flow->dequeued(msgs_50.front()); // count:1 size:50 ->OFF msgs_50.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); @@ -264,40 +263,40 @@ QPID_AUTO_TEST_CASE(testFlowCombo) for (size_t i = 0; i < 8; i++) { msgs_10.push_back(createMessage(10)); - flow->consume(msgs_10.back()); + flow->enqueued(msgs_10.back()); BOOST_CHECK(!flow->isFlowControlActive()); } // count:9 size:130 msgs_10.push_back(createMessage(10)); - flow->consume(msgs_10.back()); // count:10 size: 140 + flow->enqueued(msgs_10.back()); // count:10 size: 140 BOOST_CHECK(!flow->isFlowControlActive()); msgs_1.push_back(createMessage(1)); - flow->consume(msgs_1.back()); // count:11 size: 141 ->ON + flow->enqueued(msgs_1.back()); // count:11 size: 141 ->ON BOOST_CHECK(flow->isFlowControlActive()); msgs_100.push_back(createMessage(100)); - flow->consume(msgs_100.back()); // count:12 size: 241 (both thresholds crossed) + flow->enqueued(msgs_100.back()); // count:12 size: 241 (both thresholds crossed) BOOST_CHECK(flow->isFlowControlActive()); // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241 - flow->replenish(msgs_50.front()); // count:11 size:191 + flow->dequeued(msgs_50.front()); // count:11 size:191 msgs_50.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); for (size_t i = 0; i < 9; i++) { - flow->replenish(msgs_10.front()); + flow->dequeued(msgs_10.front()); msgs_10.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); } // count:2 size:101 - flow->replenish(msgs_1.front()); // count:1 size:100 + flow->dequeued(msgs_1.front()); // count:1 size:100 msgs_1.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // still active due to size - flow->replenish(msgs_100.front()); // count:0 size:0 ->OFF + flow->dequeued(msgs_100.front()); // count:0 size:0 ->OFF msgs_100.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); } |