diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 54 |
1 files changed, 32 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 7cd7b99557..6339085e4c 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -25,6 +25,9 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include "qpid/sys/Mutex.h" + +#include "qmf/org/apache/qpid/broker/Queue.h" + #include <sstream> using namespace qpid::broker; @@ -89,7 +92,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, : queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), - flowStopped(false), count(0), size(0) + flowStopped(false), count(0), size(0), queueMgmtObj(0) { uint32_t maxCount(0); uint64_t maxSize(0); @@ -110,25 +113,25 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, -bool QueueFlowLimit::consume(const QueuedMessage& msg) +void QueueFlowLimit::enqueued(const QueuedMessage& msg) { - bool flowChanged(false); - - if (!msg.payload) return false; + if (!msg.payload) return; sys::Mutex::ScopedLock l(indexLock); ++count; size += msg.payload->contentSize(); - if (flowStopCount && !flowStopped && count > flowStopCount) { - flowChanged = flowStopped = true; - QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); - } - - if (flowStopSize && !flowStopped && size > flowStopSize) { - flowChanged = flowStopped = true; - QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + if (!flowStopped) { + if (flowStopCount && count > flowStopCount) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); + } else if (flowStopSize && size > flowStopSize) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + } + if (flowStopped && queueMgmtObj) + queueMgmtObj->set_flowStopped(true); } // KAG: test - REMOVE ONCE STABLE @@ -138,20 +141,15 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg) if (flowStopped || !index.empty()) { msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes - //pendingFlow.push_back(msg.payload); index.insert(msg.payload); } - - return flowChanged; } -bool QueueFlowLimit::replenish(const QueuedMessage& msg) +void QueueFlowLimit::dequeued(const QueuedMessage& msg) { - bool flowChanged(false); - - if (!msg.payload) return false; + if (!msg.payload) return; sys::Mutex::ScopedLock l(indexLock); @@ -172,7 +170,8 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) (flowResumeSize == 0 || size < flowResumeSize) && (flowResumeCount == 0 || count < flowResumeCount)) { flowStopped = false; - flowChanged = true; + if (queueMgmtObj) + queueMgmtObj->set_flowStopped(false); QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." ); } @@ -193,8 +192,19 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) } } } +} - return flowChanged; + +void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject) +{ + queueMgmtObj = mgmtObject; + if (queueMgmtObj) { + queueMgmtObj->set_flowStopCount(getFlowStopCount()); + queueMgmtObj->set_flowResumeCount(getFlowResumeCount()); + queueMgmtObj->set_flowStopSize(getFlowStopSize()); + queueMgmtObj->set_flowResumeSize(getFlowResumeSize()); + queueMgmtObj->set_flowStopped(isFlowControlActive()); + } } |