diff options
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 1 |
2 files changed, 8 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index c2215a99a2..b4f146e699 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -281,6 +281,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, acquire(_acquire), blocked(true), windowing(true), + windowActive(false), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), @@ -531,7 +532,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); - if (windowing) { + if (windowing && windowActive) { if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); } @@ -627,6 +628,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -636,6 +638,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -656,7 +659,8 @@ void SemanticState::ConsumerImpl::flush() { while(haveCredit() && queue->dispatch(shared_from_this())) ; - stop(); + msgCredit = 0; + byteCredit = 0; } void SemanticState::ConsumerImpl::stop() @@ -664,6 +668,7 @@ void SemanticState::ConsumerImpl::stop() assertClusterSafe(); msgCredit = 0; byteCredit = 0; + windowActive = false; } Queue::shared_ptr SemanticState::getQueue(const string& name) const { diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 22bc272c50..69d980947b 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -81,6 +81,7 @@ class SemanticState : private boost::noncopyable { const bool acquire; bool blocked; bool windowing; + bool windowActive; bool exclusive; std::string resumeId; uint64_t resumeTtl; |