summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp9
-rw-r--r--cpp/src/qpid/broker/SemanticState.h1
2 files changed, 8 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index c2215a99a2..b4f146e699 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -281,6 +281,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
acquire(_acquire),
blocked(true),
windowing(true),
+ windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
resumeTtl(_resumeTtl),
@@ -531,7 +532,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
if (!delivery.isComplete()) {
delivery.complete();
- if (windowing) {
+ if (windowing && windowActive) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
}
@@ -627,6 +628,7 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
assertClusterSafe();
+ if (windowing) windowActive = true;
if (byteCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) byteCredit = value;
else byteCredit += value;
@@ -636,6 +638,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
assertClusterSafe();
+ if (windowing) windowActive = true;
if (msgCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) msgCredit = value;
else msgCredit += value;
@@ -656,7 +659,8 @@ void SemanticState::ConsumerImpl::flush()
{
while(haveCredit() && queue->dispatch(shared_from_this()))
;
- stop();
+ msgCredit = 0;
+ byteCredit = 0;
}
void SemanticState::ConsumerImpl::stop()
@@ -664,6 +668,7 @@ void SemanticState::ConsumerImpl::stop()
assertClusterSafe();
msgCredit = 0;
byteCredit = 0;
+ windowActive = false;
}
Queue::shared_ptr SemanticState::getQueue(const string& name) const {
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 22bc272c50..69d980947b 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -81,6 +81,7 @@ class SemanticState : private boost::noncopyable {
const bool acquire;
bool blocked;
bool windowing;
+ bool windowActive;
bool exclusive;
std::string resumeId;
uint64_t resumeTtl;