diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-03 11:37:02 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-03 11:37:02 +0000 |
commit | 5ca9d80194815638fac351164802d08386a519cb (patch) | |
tree | 0b80a8b745b6855702ab0b7f049976577402ac41 | |
parent | 91d7819eb5d0e9657195ebabf89e6454f7eb89b5 (diff) | |
download | qpid-python-5ca9d80194815638fac351164802d08386a519cb.tar.gz |
release all pending msgs when flow enabled
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1066782 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 49 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 3 |
2 files changed, 25 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 31dd5987b1..ad7e8f9bb3 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -136,9 +136,9 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg) QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position); } - if (flowStopped || !pendingFlow.empty()) { + if (flowStopped || !index.empty()) { msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes - pendingFlow.push_back(msg.payload); + //pendingFlow.push_back(msg.payload); index.insert(msg.payload); } @@ -176,33 +176,30 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg) QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." ); } - if (!flowStopped && !pendingFlow.empty()) { - // if msg is flow controlled, release it. - std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); - if (itr != index.end()) { - (*itr)->getReceiveCompletion().finishCompleter(); - index.erase(itr); - // stupid: - std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(), - pendingFlow.end(), - msg.payload); - if (itr2 == pendingFlow.end()) { - QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position); - } else { - pendingFlow.erase(itr2); + if (!index.empty()) { + if (!flowStopped) { + // flow enabled - release all pending msgs + while (!index.empty()) { + std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); + (*itr)->getReceiveCompletion().finishCompleter(); + index.erase(itr); } - } - - // for now, just release the oldest also - if (!pendingFlow.empty()) { - pendingFlow.front()->getReceiveCompletion().finishCompleter(); - itr = index.find(pendingFlow.front()); - if (itr == index.end()) { - QPID_LOG(error, "Queue \"" << queueName << "\": msg missing in index: " << pendingFlow.front()); - } else { + } else { + // even if flow controlled, we must release this msg as it is being dequeued + std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); + if (itr != index.end()) { // this msg is flow controlled, release it: + (*itr)->getReceiveCompletion().finishCompleter(); index.erase(itr); + //// stupid: (hopefully this is the next pending msg) + //std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(), + // pendingFlow.end(), + // msg.payload); + //if (itr2 == pendingFlow.end()) { + // QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position); + //} else { + // pendingFlow.erase(itr2); + //} } - pendingFlow.pop_front(); } } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 48c8095470..57d06c6bdc 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -85,8 +85,9 @@ class QueueFlowLimit protected: // msgs waiting for flow to become available. - std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front std::set< boost::intrusive_ptr<Message> > index; + // KAG: is this necessary? Not if we release all pending when level < low (?) + // std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front qpid::sys::Mutex pendingFlowLock; QueueFlowLimit(Queue *queue, |