diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r-- | cpp/src/qpid/broker/QueueFlowLimit.cpp | 135 |
1 files changed, 78 insertions, 57 deletions
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index 3494288f7b..20679972ff 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -92,7 +92,7 @@ namespace { QueueFlowLimit::QueueFlowLimit(Queue *_queue, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : queue(_queue), queueName("<unknown>"), + : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) @@ -123,8 +123,6 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, void QueueFlowLimit::enqueued(const QueuedMessage& msg) { - if (!msg.payload) return; - sys::Mutex::ScopedLock l(indexLock); ++count; @@ -152,7 +150,9 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) } QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes - index.insert(msg.payload); + bool unique; + unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second; + assert(unique); } } @@ -160,8 +160,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) void QueueFlowLimit::dequeued(const QueuedMessage& msg) { - if (!msg.payload) return; - sys::Mutex::ScopedLock l(indexLock); if (count > 0) { @@ -189,16 +187,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) if (!index.empty()) { if (!flowStopped) { // flow enabled - release all pending msgs - while (!index.empty()) { - std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - (*itr)->getIngressCompletion().finishCompleter(); - index.erase(itr); - } + for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); + itr != index.end(); ++itr) + if (itr->second) + itr->second->getIngressCompletion().finishCompleter(); + index.clear(); } else { // even if flow controlled, we must release this msg as it is being dequeued - std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); + std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position); if (itr != index.end()) { // this msg is flow controlled, release it: - (*itr)->getIngressCompletion().finishCompleter(); + msg.payload->getIngressCompletion().finishCompleter(); index.erase(itr); } } @@ -206,34 +204,6 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) } -/** used by clustering: is the given message's completion blocked due to flow - * control? True if message is blocked. (for the clustering updater: done - * after msgs have been replicated to the updatee). - */ -bool QueueFlowLimit::getState(const QueuedMessage& msg) const -{ - sys::Mutex::ScopedLock l(indexLock); - return (index.find(msg.payload) != index.end()); -} - - -/** artificially force the flow control state of a given message - * (for the clustering updatee: done after msgs have been replicated to - * the updatee's queue) - */ -void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked) -{ - if (blocked && msg.payload) { - - sys::Mutex::ScopedLock l(indexLock); - assert(index.find(msg.payload) == index.end()); - - QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC"); - index.insert(msg.payload); - } -} - - void QueueFlowLimit::encode(Buffer& buffer) const { buffer.putLong(flowStopCount); @@ -281,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint defaultFlowStopRatio = flowStopRatio; defaultFlowResumeRatio = flowResumeRatio; - /** @todo Verify valid range on Broker::Options instead of here */ + /** @todo KAG: Verify valid range on Broker::Options instead of here */ if (flowStopRatio > 100 || flowResumeRatio > 100) throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:" << " flowStopRatio=" << flowStopRatio @@ -320,14 +290,6 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control return 0; } - /** @todo KAG - remove once cluster support for flow control done. */ - // TODO aconway 2011-02-16: is queue==0 only in tests? - // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */ - if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { - QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " - << queue->getName()); - return 0; - } return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } @@ -335,17 +297,76 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); + } + return 0; +} - /** todo KAG - remove once cluster support for flow control done. */ - if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { - QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " - << queue->getName()); - return 0; +/* Cluster replication */ + +namespace { + /** pack a set of sequence number ranges into a framing::Array */ + void buildSeqRangeArray(qpid::framing::Array *seqs, + const qpid::framing::SequenceNumber first, + const qpid::framing::SequenceNumber last) + { + seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first))); + seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last))); + } +} + +/** Runs on UPDATER to snapshot current state */ +void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const +{ + sys::Mutex::ScopedLock l(indexLock); + state.clear(); + + framing::SequenceSet ss; + if (!index.empty()) { + /* replicate the set of messages pending flow control */ + for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin(); + itr != index.end(); ++itr) { + ss.add(itr->first); } + framing::Array seqs(TYPE_CODE_UINT32); + ss.for_each(boost::bind(&buildSeqRangeArray, &seqs, _1, _2)); + state.setArray("pendingMsgSeqs", seqs); + } + QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); +} - return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); + +/** called on UPDATEE to set state from snapshot */ +void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) +{ + sys::Mutex::ScopedLock l(indexLock); + index.clear(); + + framing::SequenceSet fcmsg; + framing::Array seqArray(TYPE_CODE_UINT32); + if (state.getArray("pendingMsgSeqs", seqArray)) { + assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges + framing::Array::const_iterator i = seqArray.begin(); + while (i != seqArray.end()) { + framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>()); + ++i; + framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>()); + ++i; + fcmsg.add(first, last); + for (SequenceNumber seq = first; seq <= last; ++seq) { + QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked + bool unique; + unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; + assert(unique); + } + } } - return 0; + + flowStopped = index.size() != 0; + if (queueMgmtObj) { + queueMgmtObj->set_flowStopped(isFlowControlActive()); + } + QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg) } |