diff options
author | Gordon Sim <gsim@apache.org> | 2011-09-15 17:02:50 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-09-15 17:02:50 +0000 |
commit | 852080fb799f0ac807b9553da31a4c5e2f475887 (patch) | |
tree | 78271ce2c7eb057552781378a23b839deb5ab412 /cpp | |
parent | 01fb15aee57a19f4adb4293849f3112e86712861 (diff) | |
download | qpid-python-852080fb799f0ac807b9553da31a4c5e2f475887.tar.gz |
QPID-3488: Ensure that message-stop clears any outstanding credit 'window'
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1171174 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 1 |
2 files changed, 8 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index c2215a99a2..b4f146e699 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -281,6 +281,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, acquire(_acquire), blocked(true), windowing(true), + windowActive(false), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), @@ -531,7 +532,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); - if (windowing) { + if (windowing && windowActive) { if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); } @@ -627,6 +628,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -636,6 +638,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -656,7 +659,8 @@ void SemanticState::ConsumerImpl::flush() { while(haveCredit() && queue->dispatch(shared_from_this())) ; - stop(); + msgCredit = 0; + byteCredit = 0; } void SemanticState::ConsumerImpl::stop() @@ -664,6 +668,7 @@ void SemanticState::ConsumerImpl::stop() assertClusterSafe(); msgCredit = 0; byteCredit = 0; + windowActive = false; } Queue::shared_ptr SemanticState::getQueue(const string& name) const { diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 22bc272c50..69d980947b 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -81,6 +81,7 @@ class SemanticState : private boost::noncopyable { const bool acquire; bool blocked; bool windowing; + bool windowActive; bool exclusive; std::string resumeId; uint64_t resumeTtl; |