diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 174 |
1 files changed, 66 insertions, 108 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index fcf8d089f9..a99c9de7df 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -92,7 +92,7 @@ namespace { QueueFlowLimit::QueueFlowLimit(Queue *_queue, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), + : queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) @@ -120,24 +120,11 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, } -QueueFlowLimit::~QueueFlowLimit() -{ - sys::Mutex::ScopedLock l(indexLock); - if (!index.empty()) { - // we're gone - release all pending msgs - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - itr != index.end(); ++itr) - if (itr->second) - try { - itr->second->getIngressCompletion().finishCompleter(); - } catch (...) {} // ignore - not safe for a destructor to throw. - index.clear(); - } -} - void QueueFlowLimit::enqueued(const QueuedMessage& msg) { + if (!msg.payload) return; + sys::Mutex::ScopedLock l(indexLock); ++count; @@ -151,10 +138,13 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) flowStopped = true; QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); } - if (flowStopped && queueMgmtObj) { + if (flowStopped && queueMgmtObj) queueMgmtObj->set_flowStopped(true); - queueMgmtObj->inc_flowStoppedCount(); - } + } + + /** @todo KAG: - REMOVE ONCE STABLE */ + if (index.find(msg.payload) != index.end()) { + QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position); } if (flowStopped || !index.empty()) { @@ -164,11 +154,8 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) return; } QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); - msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes - bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second; - // Like this to avoid tripping up unused variable warning when NDEBUG set - if (!unique) assert(unique); + msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes + index.insert(msg.payload); } } @@ -176,6 +163,8 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) void QueueFlowLimit::dequeued(const QueuedMessage& msg) { + if (!msg.payload) return; + sys::Mutex::ScopedLock l(indexLock); if (count > 0) { @@ -203,16 +192,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) if (!index.empty()) { if (!flowStopped) { // flow enabled - release all pending msgs - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - itr != index.end(); ++itr) - if (itr->second) - itr->second->getIngressCompletion().finishCompleter(); - index.clear(); + while (!index.empty()) { + std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); + (*itr)->getIngressCompletion()->finishCompleter(); + index.erase(itr); + } } else { // even if flow controlled, we must release this msg as it is being dequeued - std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position); + std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); if (itr != index.end()) { // this msg is flow controlled, release it: - msg.payload->getIngressCompletion().finishCompleter(); + (*itr)->getIngressCompletion()->finishCompleter(); index.erase(itr); } } @@ -220,6 +209,34 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) } +/** used by clustering: is the given message's completion blocked due to flow + * control? True if message is blocked. (for the clustering updater: done + * after msgs have been replicated to the updatee). + */ +bool QueueFlowLimit::getState(const QueuedMessage& msg) const +{ + sys::Mutex::ScopedLock l(indexLock); + return (index.find(msg.payload) != index.end()); +} + + +/** artificially force the flow control state of a given message + * (for the clustering updatee: done after msgs have been replicated to + * the updatee's queue) + */ +void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked) +{ + if (blocked && msg.payload) { + + sys::Mutex::ScopedLock l(indexLock); + assert(index.find(msg.payload) == index.end()); + + QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC"); + index.insert(msg.payload); + } +} + + void QueueFlowLimit::encode(Buffer& buffer) const { buffer.putLong(flowStopCount); @@ -267,7 +284,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint defaultFlowStopRatio = flowStopRatio; defaultFlowResumeRatio = flowResumeRatio; - /** @todo KAG: Verify valid range on Broker::Options instead of here */ + /** @todo Verify valid range on Broker::Options instead of here */ if (flowStopRatio > 100 || flowResumeRatio > 100) throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:" << " flowStopRatio=" << flowStopRatio @@ -298,9 +315,7 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F return 0; } - if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) || - settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) { - // user provided (some) flow settings manually... + if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) { uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); @@ -308,89 +323,32 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control return 0; } + /** @todo KAG - remove once cluster support for flow control done. */ + // TODO aconway 2011-02-16: is queue==0 only in tests? + // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */ + if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { + QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " + << queue->getName()); + return 0; + } return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } - if (defaultFlowStopRatio) { // broker has a default ratio setup... + if (defaultFlowStopRatio) { uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); - uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0); // no size by default - uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5); - uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0)); - - return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); - } - return 0; -} - -/* Cluster replication */ -namespace { - /** pack a set of sequence number ranges into a framing::Array */ - void buildSeqRangeArray(qpid::framing::Array *seqs, - const qpid::framing::SequenceNumber& first, - const qpid::framing::SequenceNumber& last) - { - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first))); - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last))); - } -} - -/** Runs on UPDATER to snapshot current state */ -void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const -{ - sys::Mutex::ScopedLock l(indexLock); - state.clear(); - - framing::SequenceSet ss; - if (!index.empty()) { - /* replicate the set of messages pending flow control */ - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin(); - itr != index.end(); ++itr) { - ss.add(itr->first); - } - framing::Array seqs(TYPE_CODE_UINT32); - typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder; - ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2)); - state.setArray("pendingMsgSeqs", seqs); - } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); -} - - -/** called on UPDATEE to set state from snapshot */ -void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) -{ - sys::Mutex::ScopedLock l(indexLock); - index.clear(); - - framing::SequenceSet fcmsg; - framing::Array seqArray(TYPE_CODE_UINT32); - if (state.getArray("pendingMsgSeqs", seqArray)) { - assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges - framing::Array::const_iterator i = seqArray.begin(); - while (i != seqArray.end()) { - framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - fcmsg.add(first, last); - for (SequenceNumber seq = first; seq <= last; ++seq) { - QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked - bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; - // Like this to avoid tripping up unused variable warning when NDEBUG set - if (!unique) assert(unique); - } + /** todo KAG - remove once cluster support for flow control done. */ + if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { + QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " + << queue->getName()); + return 0; } - } - flowStopped = index.size() != 0; - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); + return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg) + return 0; } |