diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r-- | cpp/src/qpid/broker/QueueFlowLimit.cpp | 103 |
1 files changed, 38 insertions, 65 deletions
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index 14fe5f4022..11b9cbae63 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -20,7 +20,9 @@ */ #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/Exception.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" @@ -57,34 +59,6 @@ namespace { << "=" << max)); } } - - /** extract a capacity value as passed in an argument map - */ - uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue) - { - FieldTable::ValuePtr v = settings.get(key); - - int64_t result = 0; - - if (!v) return defaultValue; - if (v->getType() == 0x23) { - QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); - } else if (v->getType() == 0x33) { - QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); - } else if (v->convertsTo<int64_t>()) { - result = v->get<int64_t>(); - QPID_LOG(debug, "Got integer value for " << key << ": " << result); - if (result >= 0) return result; - } else if (v->convertsTo<std::string>()) { - std::string s(v->get<std::string>()); - QPID_LOG(debug, "Got string value for " << key << ": " << s); - std::istringstream convert(s); - if (convert >> result && result >= 0) return result; - } - - QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); - return defaultValue; - } } @@ -102,10 +76,8 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, if (queue) { queueName = _queue->getName(); - if (queue->getPolicy()) { - maxSize = _queue->getPolicy()->getMaxSize(); - maxCount = _queue->getPolicy()->getMaxCount(); - } + if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount(); + if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize(); broker = queue->getBroker(); queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject()); if (queueMgmtObj) { @@ -125,23 +97,23 @@ QueueFlowLimit::~QueueFlowLimit() sys::Mutex::ScopedLock l(indexLock); if (!index.empty()) { // we're gone - release all pending msgs - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); + for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin(); itr != index.end(); ++itr) if (itr->second) try { - itr->second->getIngressCompletion().finishCompleter(); + itr->second.getPersistentContext()->getIngressCompletion().finishCompleter(); } catch (...) {} // ignore - not safe for a destructor to throw. index.clear(); } } -void QueueFlowLimit::enqueued(const QueuedMessage& msg) +void QueueFlowLimit::enqueued(const Message& msg) { sys::Mutex::ScopedLock l(indexLock); ++count; - size += msg.payload->contentSize(); + size += msg.getContentSize(); if (!flowStopped) { if (flowStopCount && count > flowStopCount) { @@ -160,13 +132,13 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) if (flowStopped || !index.empty()) { // ignore flow control if we are populating the queue due to cluster replication: if (broker && broker->isClusterUpdatee()) { - QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position); + QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence()); return; } - QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); - msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes + QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence()); + msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second; + unique = index.insert(std::pair<framing::SequenceNumber, Message >(msg.getSequence(), msg)).second; // Like this to avoid tripping up unused variable warning when NDEBUG set if (!unique) assert(unique); } @@ -174,7 +146,7 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) -void QueueFlowLimit::dequeued(const QueuedMessage& msg) +void QueueFlowLimit::dequeued(const Message& msg) { sys::Mutex::ScopedLock l(indexLock); @@ -184,7 +156,7 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName)); } - uint64_t _size = msg.payload->contentSize(); + uint64_t _size = msg.getContentSize(); if (_size <= size) { size -= _size; } else { @@ -203,16 +175,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) if (!index.empty()) { if (!flowStopped) { // flow enabled - release all pending msgs - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); + for (std::map<framing::SequenceNumber, Message >::iterator itr = index.begin(); itr != index.end(); ++itr) if (itr->second) - itr->second->getIngressCompletion().finishCompleter(); + itr->second.getPersistentContext()->getIngressCompletion().finishCompleter(); index.clear(); } else { // even if flow controlled, we must release this msg as it is being dequeued - std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position); + std::map<framing::SequenceNumber, Message >::iterator itr = index.find(msg.getSequence()); if (itr != index.end()) { // this msg is flow controlled, release it: - msg.payload->getIngressCompletion().finishCompleter(); + msg.getPersistentContext()->getIngressCompletion().finishCompleter(); index.erase(itr); } } @@ -279,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint } -void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings) +void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings) { QueueFlowLimit *ptr = createLimit( &queue, settings ); if (ptr) { @@ -289,36 +261,37 @@ void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& sett } /** returns ptr to a QueueFlowLimit, else 0 if no limit */ -QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings) +QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings) { - std::string type(QueuePolicy::getType(settings)); - - if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) { + if (settings.dropMessagesAtLimit) { // The size of a RING queue is limited by design - no need for flow control. return 0; } - if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) || - settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) { + if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) { // user provided (some) flow settings manually... - uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); - uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); - uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); - uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); - if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control + if (settings.flowStop.getCount() || settings.flowStop.getSize()) { + return new QueueFlowLimit(queue, + settings.flowStop.getCount(), + settings.flowResume.getCount(), + settings.flowStop.getSize(), + settings.flowResume.getSize()); + } else { + //don't have a non-zero value for either the count or the + //size to stop at, yet at least one of these settings was + //provided, i.e it was set to 0 explicitly which we treat + //as turning it off return 0; } - return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } if (defaultFlowStopRatio) { // broker has a default ratio setup... - uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); + uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize; uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); - uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0); // no size by default + uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0; uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5); uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0)); - return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } return 0; @@ -346,7 +319,7 @@ void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const framing::SequenceSet ss; if (!index.empty()) { /* replicate the set of messages pending flow control */ - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin(); + for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin(); itr != index.end(); ++itr) { ss.add(itr->first); } @@ -377,10 +350,10 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) ++i; fcmsg.add(first, last); for (SequenceNumber seq = first; seq <= last; ++seq) { - QueuedMessage msg; + Message msg; queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; + unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second; // Like this to avoid tripping up unused variable warning when NDEBUG set if (!unique) assert(unique); } |