summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueueFlowLimit.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp103
1 files changed, 38 insertions, 65 deletions
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 14fe5f4022..11b9cbae63 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -20,7 +20,9 @@
*/
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/Exception.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -57,34 +59,6 @@ namespace {
<< "=" << max));
}
}
-
- /** extract a capacity value as passed in an argument map
- */
- uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
- {
- FieldTable::ValuePtr v = settings.get(key);
-
- int64_t result = 0;
-
- if (!v) return defaultValue;
- if (v->getType() == 0x23) {
- QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
- } else if (v->getType() == 0x33) {
- QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<int64_t>()) {
- result = v->get<int64_t>();
- QPID_LOG(debug, "Got integer value for " << key << ": " << result);
- if (result >= 0) return result;
- } else if (v->convertsTo<std::string>()) {
- std::string s(v->get<std::string>());
- QPID_LOG(debug, "Got string value for " << key << ": " << s);
- std::istringstream convert(s);
- if (convert >> result && result >= 0) return result;
- }
-
- QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
- return defaultValue;
- }
}
@@ -102,10 +76,8 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
if (queue) {
queueName = _queue->getName();
- if (queue->getPolicy()) {
- maxSize = _queue->getPolicy()->getMaxSize();
- maxCount = _queue->getPolicy()->getMaxCount();
- }
+ if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
+ if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
broker = queue->getBroker();
queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
if (queueMgmtObj) {
@@ -125,23 +97,23 @@ QueueFlowLimit::~QueueFlowLimit()
sys::Mutex::ScopedLock l(indexLock);
if (!index.empty()) {
// we're gone - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
itr != index.end(); ++itr)
if (itr->second)
try {
- itr->second->getIngressCompletion().finishCompleter();
+ itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
} catch (...) {} // ignore - not safe for a destructor to throw.
index.clear();
}
}
-void QueueFlowLimit::enqueued(const QueuedMessage& msg)
+void QueueFlowLimit::enqueued(const Message& msg)
{
sys::Mutex::ScopedLock l(indexLock);
++count;
- size += msg.payload->contentSize();
+ size += msg.getContentSize();
if (!flowStopped) {
if (flowStopCount && count > flowStopCount) {
@@ -160,13 +132,13 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
if (flowStopped || !index.empty()) {
// ignore flow control if we are populating the queue due to cluster replication:
if (broker && broker->isClusterUpdatee()) {
- QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+ QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
return;
}
- QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
- msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
+ QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
+ msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+ unique = index.insert(std::pair<framing::SequenceNumber, Message >(msg.getSequence(), msg)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
if (!unique) assert(unique);
}
@@ -174,7 +146,7 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
-void QueueFlowLimit::dequeued(const QueuedMessage& msg)
+void QueueFlowLimit::dequeued(const Message& msg)
{
sys::Mutex::ScopedLock l(indexLock);
@@ -184,7 +156,7 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
}
- uint64_t _size = msg.payload->contentSize();
+ uint64_t _size = msg.getContentSize();
if (_size <= size) {
size -= _size;
} else {
@@ -203,16 +175,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
if (!index.empty()) {
if (!flowStopped) {
// flow enabled - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin();
itr != index.end(); ++itr)
if (itr->second)
- itr->second->getIngressCompletion().finishCompleter();
+ itr->second.getPersistentContext()->getIngressCompletion().finishCompleter();
index.clear();
} else {
// even if flow controlled, we must release this msg as it is being dequeued
- std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
+ std::map<framing::SequenceNumber, Message >::iterator itr = index.find(msg.getSequence());
if (itr != index.end()) { // this msg is flow controlled, release it:
- msg.payload->getIngressCompletion().finishCompleter();
+ msg.getPersistentContext()->getIngressCompletion().finishCompleter();
index.erase(itr);
}
}
@@ -279,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint
}
-void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings)
+void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
{
QueueFlowLimit *ptr = createLimit( &queue, settings );
if (ptr) {
@@ -289,36 +261,37 @@ void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& sett
}
/** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings)
+QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
{
- std::string type(QueuePolicy::getType(settings));
-
- if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
+ if (settings.dropMessagesAtLimit) {
// The size of a RING queue is limited by design - no need for flow control.
return 0;
}
- if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) ||
- settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) {
+ if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
// user provided (some) flow settings manually...
- uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
- uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
- uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
- uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
- if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
+ if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
+ return new QueueFlowLimit(queue,
+ settings.flowStop.getCount(),
+ settings.flowResume.getCount(),
+ settings.flowStop.getSize(),
+ settings.flowResume.getSize());
+ } else {
+ //don't have a non-zero value for either the count or the
+ //size to stop at, yet at least one of these settings was
+ //provided, i.e it was set to 0 explicitly which we treat
+ //as turning it off
return 0;
}
- return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
if (defaultFlowStopRatio) { // broker has a default ratio setup...
- uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
+ uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
- uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0); // no size by default
+ uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
-
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
return 0;
@@ -346,7 +319,7 @@ void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
framing::SequenceSet ss;
if (!index.empty()) {
/* replicate the set of messages pending flow control */
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+ for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
itr != index.end(); ++itr) {
ss.add(itr->first);
}
@@ -377,10 +350,10 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
++i;
fcmsg.add(first, last);
for (SequenceNumber seq = first; seq <= last; ++seq) {
- QueuedMessage msg;
+ Message msg;
queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked
bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+ unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
// Like this to avoid tripping up unused variable warning when NDEBUG set
if (!unique) assert(unique);
}