diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index c2215a99a2..b4f146e699 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -281,6 +281,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, acquire(_acquire), blocked(true), windowing(true), + windowActive(false), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), @@ -531,7 +532,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); - if (windowing) { + if (windowing && windowActive) { if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); } @@ -627,6 +628,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -636,6 +638,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -656,7 +659,8 @@ void SemanticState::ConsumerImpl::flush() { while(haveCredit() && queue->dispatch(shared_from_this())) ; - stop(); + msgCredit = 0; + byteCredit = 0; } void SemanticState::ConsumerImpl::stop() @@ -664,6 +668,7 @@ void SemanticState::ConsumerImpl::stop() assertClusterSafe(); msgCredit = 0; byteCredit = 0; + windowActive = false; } Queue::shared_ptr SemanticState::getQueue(const string& name) const { |