summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-03 11:37:02 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-03 11:37:02 +0000
commit5ca9d80194815638fac351164802d08386a519cb (patch)
tree0b80a8b745b6855702ab0b7f049976577402ac41
parent91d7819eb5d0e9657195ebabf89e6454f7eb89b5 (diff)
downloadqpid-python-5ca9d80194815638fac351164802d08386a519cb.tar.gz
release all pending msgs when flow enabled
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1066782 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp49
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h3
2 files changed, 25 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 31dd5987b1..ad7e8f9bb3 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -136,9 +136,9 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg)
QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
}
- if (flowStopped || !pendingFlow.empty()) {
+ if (flowStopped || !index.empty()) {
msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes
- pendingFlow.push_back(msg.payload);
+ //pendingFlow.push_back(msg.payload);
index.insert(msg.payload);
}
@@ -176,33 +176,30 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg)
QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." );
}
- if (!flowStopped && !pendingFlow.empty()) {
- // if msg is flow controlled, release it.
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
- if (itr != index.end()) {
- (*itr)->getReceiveCompletion().finishCompleter();
- index.erase(itr);
- // stupid:
- std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
- pendingFlow.end(),
- msg.payload);
- if (itr2 == pendingFlow.end()) {
- QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
- } else {
- pendingFlow.erase(itr2);
+ if (!index.empty()) {
+ if (!flowStopped) {
+ // flow enabled - release all pending msgs
+ while (!index.empty()) {
+ std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ (*itr)->getReceiveCompletion().finishCompleter();
+ index.erase(itr);
}
- }
-
- // for now, just release the oldest also
- if (!pendingFlow.empty()) {
- pendingFlow.front()->getReceiveCompletion().finishCompleter();
- itr = index.find(pendingFlow.front());
- if (itr == index.end()) {
- QPID_LOG(error, "Queue \"" << queueName << "\": msg missing in index: " << pendingFlow.front());
- } else {
+ } else {
+ // even if flow controlled, we must release this msg as it is being dequeued
+ std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+ if (itr != index.end()) { // this msg is flow controlled, release it:
+ (*itr)->getReceiveCompletion().finishCompleter();
index.erase(itr);
+ //// stupid: (hopefully this is the next pending msg)
+ //std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
+ // pendingFlow.end(),
+ // msg.payload);
+ //if (itr2 == pendingFlow.end()) {
+ // QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
+ //} else {
+ // pendingFlow.erase(itr2);
+ //}
}
- pendingFlow.pop_front();
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 48c8095470..57d06c6bdc 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -85,8 +85,9 @@ class QueueFlowLimit
protected:
// msgs waiting for flow to become available.
- std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front
std::set< boost::intrusive_ptr<Message> > index;
+ // KAG: is this necessary? Not if we release all pending when level < low (?)
+ // std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front
qpid::sys::Mutex pendingFlowLock;
QueueFlowLimit(Queue *queue,