summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueueFlowLimit.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp135
1 files changed, 78 insertions, 57 deletions
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 3494288f7b..20679972ff 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -92,7 +92,7 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : queue(_queue), queueName("<unknown>"),
+ : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
@@ -123,8 +123,6 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
void QueueFlowLimit::enqueued(const QueuedMessage& msg)
{
- if (!msg.payload) return;
-
sys::Mutex::ScopedLock l(indexLock);
++count;
@@ -152,7 +150,9 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
}
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
- index.insert(msg.payload);
+ bool unique;
+ unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+ assert(unique);
}
}
@@ -160,8 +160,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
void QueueFlowLimit::dequeued(const QueuedMessage& msg)
{
- if (!msg.payload) return;
-
sys::Mutex::ScopedLock l(indexLock);
if (count > 0) {
@@ -189,16 +187,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
if (!index.empty()) {
if (!flowStopped) {
// flow enabled - release all pending msgs
- while (!index.empty()) {
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
- (*itr)->getIngressCompletion().finishCompleter();
- index.erase(itr);
- }
+ for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ itr != index.end(); ++itr)
+ if (itr->second)
+ itr->second->getIngressCompletion().finishCompleter();
+ index.clear();
} else {
// even if flow controlled, we must release this msg as it is being dequeued
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+ std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
if (itr != index.end()) { // this msg is flow controlled, release it:
- (*itr)->getIngressCompletion().finishCompleter();
+ msg.payload->getIngressCompletion().finishCompleter();
index.erase(itr);
}
}
@@ -206,34 +204,6 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
}
-/** used by clustering: is the given message's completion blocked due to flow
- * control? True if message is blocked. (for the clustering updater: done
- * after msgs have been replicated to the updatee).
- */
-bool QueueFlowLimit::getState(const QueuedMessage& msg) const
-{
- sys::Mutex::ScopedLock l(indexLock);
- return (index.find(msg.payload) != index.end());
-}
-
-
-/** artificially force the flow control state of a given message
- * (for the clustering updatee: done after msgs have been replicated to
- * the updatee's queue)
- */
-void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
-{
- if (blocked && msg.payload) {
-
- sys::Mutex::ScopedLock l(indexLock);
- assert(index.find(msg.payload) == index.end());
-
- QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
- index.insert(msg.payload);
- }
-}
-
-
void QueueFlowLimit::encode(Buffer& buffer) const
{
buffer.putLong(flowStopCount);
@@ -281,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint
defaultFlowStopRatio = flowStopRatio;
defaultFlowResumeRatio = flowResumeRatio;
- /** @todo Verify valid range on Broker::Options instead of here */
+ /** @todo KAG: Verify valid range on Broker::Options instead of here */
if (flowStopRatio > 100 || flowResumeRatio > 100)
throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:"
<< " flowStopRatio=" << flowStopRatio
@@ -320,14 +290,6 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
return 0;
}
- /** @todo KAG - remove once cluster support for flow control done. */
- // TODO aconway 2011-02-16: is queue==0 only in tests?
- // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */
- if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- return 0;
- }
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
@@ -335,17 +297,76 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+ return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+ }
+ return 0;
+}
- /** todo KAG - remove once cluster support for flow control done. */
- if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- return 0;
+/* Cluster replication */
+
+namespace {
+ /** pack a set of sequence number ranges into a framing::Array */
+ void buildSeqRangeArray(qpid::framing::Array *seqs,
+ const qpid::framing::SequenceNumber first,
+ const qpid::framing::SequenceNumber last)
+ {
+ seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first)));
+ seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last)));
+ }
+}
+
+/** Runs on UPDATER to snapshot current state */
+void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ state.clear();
+
+ framing::SequenceSet ss;
+ if (!index.empty()) {
+ /* replicate the set of messages pending flow control */
+ for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+ itr != index.end(); ++itr) {
+ ss.add(itr->first);
}
+ framing::Array seqs(TYPE_CODE_UINT32);
+ ss.for_each(boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
+ state.setArray("pendingMsgSeqs", seqs);
+ }
+ QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss);
+}
- return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+
+/** called on UPDATEE to set state from snapshot */
+void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ index.clear();
+
+ framing::SequenceSet fcmsg;
+ framing::Array seqArray(TYPE_CODE_UINT32);
+ if (state.getArray("pendingMsgSeqs", seqArray)) {
+ assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges
+ framing::Array::const_iterator i = seqArray.begin();
+ while (i != seqArray.end()) {
+ framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>());
+ ++i;
+ framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
+ ++i;
+ fcmsg.add(first, last);
+ for (SequenceNumber seq = first; seq <= last; ++seq) {
+ QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
+ bool unique;
+ unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+ assert(unique);
+ }
+ }
}
- return 0;
+
+ flowStopped = index.size() != 0;
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
+ }
+ QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg)
}