summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp174
1 files changed, 66 insertions, 108 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index fcf8d089f9..a99c9de7df 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -92,7 +92,7 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
+ : queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
@@ -120,24 +120,11 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
}
-QueueFlowLimit::~QueueFlowLimit()
-{
- sys::Mutex::ScopedLock l(indexLock);
- if (!index.empty()) {
- // we're gone - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
- itr != index.end(); ++itr)
- if (itr->second)
- try {
- itr->second->getIngressCompletion().finishCompleter();
- } catch (...) {} // ignore - not safe for a destructor to throw.
- index.clear();
- }
-}
-
void QueueFlowLimit::enqueued(const QueuedMessage& msg)
{
+ if (!msg.payload) return;
+
sys::Mutex::ScopedLock l(indexLock);
++count;
@@ -151,10 +138,13 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
flowStopped = true;
QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
}
- if (flowStopped && queueMgmtObj) {
+ if (flowStopped && queueMgmtObj)
queueMgmtObj->set_flowStopped(true);
- queueMgmtObj->inc_flowStoppedCount();
- }
+ }
+
+ /** @todo KAG: - REMOVE ONCE STABLE */
+ if (index.find(msg.payload) != index.end()) {
+ QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
}
if (flowStopped || !index.empty()) {
@@ -164,11 +154,8 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
return;
}
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
- msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
- bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!unique) assert(unique);
+ msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes
+ index.insert(msg.payload);
}
}
@@ -176,6 +163,8 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
void QueueFlowLimit::dequeued(const QueuedMessage& msg)
{
+ if (!msg.payload) return;
+
sys::Mutex::ScopedLock l(indexLock);
if (count > 0) {
@@ -203,16 +192,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
if (!index.empty()) {
if (!flowStopped) {
// flow enabled - release all pending msgs
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
- itr != index.end(); ++itr)
- if (itr->second)
- itr->second->getIngressCompletion().finishCompleter();
- index.clear();
+ while (!index.empty()) {
+ std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ (*itr)->getIngressCompletion()->finishCompleter();
+ index.erase(itr);
+ }
} else {
// even if flow controlled, we must release this msg as it is being dequeued
- std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
+ std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
if (itr != index.end()) { // this msg is flow controlled, release it:
- msg.payload->getIngressCompletion().finishCompleter();
+ (*itr)->getIngressCompletion()->finishCompleter();
index.erase(itr);
}
}
@@ -220,6 +209,34 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
}
+/** used by clustering: is the given message's completion blocked due to flow
+ * control? True if message is blocked. (for the clustering updater: done
+ * after msgs have been replicated to the updatee).
+ */
+bool QueueFlowLimit::getState(const QueuedMessage& msg) const
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ return (index.find(msg.payload) != index.end());
+}
+
+
+/** artificially force the flow control state of a given message
+ * (for the clustering updatee: done after msgs have been replicated to
+ * the updatee's queue)
+ */
+void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
+{
+ if (blocked && msg.payload) {
+
+ sys::Mutex::ScopedLock l(indexLock);
+ assert(index.find(msg.payload) == index.end());
+
+ QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
+ index.insert(msg.payload);
+ }
+}
+
+
void QueueFlowLimit::encode(Buffer& buffer) const
{
buffer.putLong(flowStopCount);
@@ -267,7 +284,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint
defaultFlowStopRatio = flowStopRatio;
defaultFlowResumeRatio = flowResumeRatio;
- /** @todo KAG: Verify valid range on Broker::Options instead of here */
+ /** @todo Verify valid range on Broker::Options instead of here */
if (flowStopRatio > 100 || flowResumeRatio > 100)
throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:"
<< " flowStopRatio=" << flowStopRatio
@@ -298,9 +315,7 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
return 0;
}
- if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) ||
- settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) {
- // user provided (some) flow settings manually...
+ if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) {
uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
@@ -308,89 +323,32 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
return 0;
}
+ /** @todo KAG - remove once cluster support for flow control done. */
+ // TODO aconway 2011-02-16: is queue==0 only in tests?
+ // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */
+ if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
+ QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+ << queue->getName());
+ return 0;
+ }
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
- if (defaultFlowStopRatio) { // broker has a default ratio setup...
+ if (defaultFlowStopRatio) {
uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
- uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0); // no size by default
- uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
- uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
-
- return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
- }
- return 0;
-}
-
-/* Cluster replication */
-namespace {
- /** pack a set of sequence number ranges into a framing::Array */
- void buildSeqRangeArray(qpid::framing::Array *seqs,
- const qpid::framing::SequenceNumber& first,
- const qpid::framing::SequenceNumber& last)
- {
- seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first)));
- seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last)));
- }
-}
-
-/** Runs on UPDATER to snapshot current state */
-void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
-{
- sys::Mutex::ScopedLock l(indexLock);
- state.clear();
-
- framing::SequenceSet ss;
- if (!index.empty()) {
- /* replicate the set of messages pending flow control */
- for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
- itr != index.end(); ++itr) {
- ss.add(itr->first);
- }
- framing::Array seqs(TYPE_CODE_UINT32);
- typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder;
- ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
- state.setArray("pendingMsgSeqs", seqs);
- }
- QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
-{
- sys::Mutex::ScopedLock l(indexLock);
- index.clear();
-
- framing::SequenceSet fcmsg;
- framing::Array seqArray(TYPE_CODE_UINT32);
- if (state.getArray("pendingMsgSeqs", seqArray)) {
- assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges
- framing::Array::const_iterator i = seqArray.begin();
- while (i != seqArray.end()) {
- framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>());
- ++i;
- framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
- ++i;
- fcmsg.add(first, last);
- for (SequenceNumber seq = first; seq <= last; ++seq) {
- QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
- bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!unique) assert(unique);
- }
+ /** todo KAG - remove once cluster support for flow control done. */
+ if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
+ QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+ << queue->getName());
+ return 0;
}
- }
- flowStopped = index.size() != 0;
- if (queueMgmtObj) {
- queueMgmtObj->set_flowStopped(isFlowControlActive());
+ return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
}
- QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg)
+ return 0;
}