diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 61 |
1 files changed, 18 insertions, 43 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index fbcb21eab9..aa1face18d 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -285,15 +285,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, ackExpected(ack), acquire(_acquire), blocked(true), - windowing(true), - windowActive(false), exclusive(_exclusive), resumeId(_resumeId), tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), - byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), deliveryCount(0), @@ -338,11 +334,11 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode()); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); - if (windowing || ackExpected || !acquire) { + if (credit.isWindowMode() || ackExpected || !acquire) { parent->record(record); } if (acquire && !ackExpected) { // auto acquire && auto accept @@ -385,28 +381,19 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { assertClusterSafe(); - uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; - if (msgCredit != 0xFFFFFFFF) { - msgCredit--; - } - if (byteCredit != 0xFFFFFFFF) { - byteCredit -= msg->getRequiredCredit(); - } + Credit original = credit; + credit.consume(1, msg->getRequiredCredit()); QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) - << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit - << " now bytes: " << byteCredit << " msgs: " << msgCredit); + << ", was " << original << " now " << credit); } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - bool enoughCredit = msgCredit > 0 && - (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); - QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") - << ConsumerName(*this) - << ", have bytes: " << byteCredit << " msgs: " << msgCredit - << ", need " << msg->getRequiredCredit() << " bytes"); + bool enoughCredit = credit.check(1, msg->getRequiredCredit()); + QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient") + << " credit for message of " << msg->getRequiredCredit() << " bytes: " + << credit); return enoughCredit; } @@ -539,9 +526,8 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); - if (windowing && windowActive) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); + if (credit.isWindowMode()) { + credit.moveWindow(1, delivery.getCredit()); } } } @@ -628,7 +614,7 @@ void SemanticState::stop(const std::string& destination) void SemanticState::ConsumerImpl::setWindowMode() { assertClusterSafe(); - windowing = true; + credit.setWindowMode(true); if (mgmtObject){ mgmtObject->set_creditMode("WINDOW"); } @@ -637,7 +623,7 @@ void SemanticState::ConsumerImpl::setWindowMode() void SemanticState::ConsumerImpl::setCreditMode() { assertClusterSafe(); - windowing = false; + credit.setWindowMode(false); if (mgmtObject){ mgmtObject->set_creditMode("CREDIT"); } @@ -646,26 +632,18 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); - if (windowing) windowActive = true; - if (byteCredit != 0xFFFFFFFF) { - if (value == 0xFFFFFFFF) byteCredit = value; - else byteCredit += value; - } + credit.addByteCredit(value); } void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); - if (windowing) windowActive = true; - if (msgCredit != 0xFFFFFFFF) { - if (value == 0xFFFFFFFF) msgCredit = value; - else msgCredit += value; - } + credit.addMessageCredit(value); } bool SemanticState::ConsumerImpl::haveCredit() { - if (msgCredit && byteCredit) { + if (credit) { return true; } else { blocked = true; @@ -677,16 +655,13 @@ void SemanticState::ConsumerImpl::flush() { while(haveCredit() && queue->dispatch(shared_from_this())) ; - msgCredit = 0; - byteCredit = 0; + credit.cancel(); } void SemanticState::ConsumerImpl::stop() { assertClusterSafe(); - msgCredit = 0; - byteCredit = 0; - windowActive = false; + credit.cancel(); } Queue::shared_ptr SemanticState::getQueue(const string& name) const { |