summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp54
1 files changed, 32 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 7cd7b99557..6339085e4c 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -25,6 +25,9 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Mutex.h"
+
+#include "qmf/org/apache/qpid/broker/Queue.h"
+
#include <sstream>
using namespace qpid::broker;
@@ -89,7 +92,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
: queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
- flowStopped(false), count(0), size(0)
+ flowStopped(false), count(0), size(0), queueMgmtObj(0)
{
uint32_t maxCount(0);
uint64_t maxSize(0);
@@ -110,25 +113,25 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
-bool QueueFlowLimit::consume(const QueuedMessage& msg)
+void QueueFlowLimit::enqueued(const QueuedMessage& msg)
{
- bool flowChanged(false);
-
- if (!msg.payload) return false;
+ if (!msg.payload) return;
sys::Mutex::ScopedLock l(indexLock);
++count;
size += msg.payload->contentSize();
- if (flowStopCount && !flowStopped && count > flowStopCount) {
- flowChanged = flowStopped = true;
- QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
- }
-
- if (flowStopSize && !flowStopped && size > flowStopSize) {
- flowChanged = flowStopped = true;
- QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
+ if (!flowStopped) {
+ if (flowStopCount && count > flowStopCount) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
+ } else if (flowStopSize && size > flowStopSize) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
+ }
+ if (flowStopped && queueMgmtObj)
+ queueMgmtObj->set_flowStopped(true);
}
// KAG: test - REMOVE ONCE STABLE
@@ -138,20 +141,15 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg)
if (flowStopped || !index.empty()) {
msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes
- //pendingFlow.push_back(msg.payload);
index.insert(msg.payload);
}
-
- return flowChanged;
}
-bool QueueFlowLimit::replenish(const QueuedMessage& msg)
+void QueueFlowLimit::dequeued(const QueuedMessage& msg)
{
- bool flowChanged(false);
-
- if (!msg.payload) return false;
+ if (!msg.payload) return;
sys::Mutex::ScopedLock l(indexLock);
@@ -172,7 +170,8 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg)
(flowResumeSize == 0 || size < flowResumeSize) &&
(flowResumeCount == 0 || count < flowResumeCount)) {
flowStopped = false;
- flowChanged = true;
+ if (queueMgmtObj)
+ queueMgmtObj->set_flowStopped(false);
QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." );
}
@@ -193,8 +192,19 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg)
}
}
}
+}
- return flowChanged;
+
+void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject)
+{
+ queueMgmtObj = mgmtObject;
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopCount(getFlowStopCount());
+ queueMgmtObj->set_flowResumeCount(getFlowResumeCount());
+ queueMgmtObj->set_flowStopSize(getFlowStopSize());
+ queueMgmtObj->set_flowResumeSize(getFlowResumeSize());
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
+ }
}