summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-09-15 17:02:50 +0000
committerGordon Sim <gsim@apache.org>2011-09-15 17:02:50 +0000
commit852080fb799f0ac807b9553da31a4c5e2f475887 (patch)
tree78271ce2c7eb057552781378a23b839deb5ab412 /cpp
parent01fb15aee57a19f4adb4293849f3112e86712861 (diff)
downloadqpid-python-852080fb799f0ac807b9553da31a4c5e2f475887.tar.gz
QPID-3488: Ensure that message-stop clears any outstanding credit 'window'
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1171174 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp9
-rw-r--r--cpp/src/qpid/broker/SemanticState.h1
2 files changed, 8 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index c2215a99a2..b4f146e699 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -281,6 +281,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
acquire(_acquire),
blocked(true),
windowing(true),
+ windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
resumeTtl(_resumeTtl),
@@ -531,7 +532,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
if (!delivery.isComplete()) {
delivery.complete();
- if (windowing) {
+ if (windowing && windowActive) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
}
@@ -627,6 +628,7 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
assertClusterSafe();
+ if (windowing) windowActive = true;
if (byteCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) byteCredit = value;
else byteCredit += value;
@@ -636,6 +638,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
assertClusterSafe();
+ if (windowing) windowActive = true;
if (msgCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) msgCredit = value;
else msgCredit += value;
@@ -656,7 +659,8 @@ void SemanticState::ConsumerImpl::flush()
{
while(haveCredit() && queue->dispatch(shared_from_this()))
;
- stop();
+ msgCredit = 0;
+ byteCredit = 0;
}
void SemanticState::ConsumerImpl::stop()
@@ -664,6 +668,7 @@ void SemanticState::ConsumerImpl::stop()
assertClusterSafe();
msgCredit = 0;
byteCredit = 0;
+ windowActive = false;
}
Queue::shared_ptr SemanticState::getQueue(const string& name) const {
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 22bc272c50..69d980947b 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -81,6 +81,7 @@ class SemanticState : private boost::noncopyable {
const bool acquire;
bool blocked;
bool windowing;
+ bool windowActive;
bool exclusive;
std::string resumeId;
uint64_t resumeTtl;