summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp61
1 files changed, 18 insertions, 43 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index fbcb21eab9..aa1face18d 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -285,15 +285,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
ackExpected(ack),
acquire(_acquire),
blocked(true),
- windowing(true),
- windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
- byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
deliveryCount(0),
@@ -338,11 +334,11 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, credit.isWindowMode());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
- if (windowing || ackExpected || !acquire) {
+ if (credit.isWindowMode() || ackExpected || !acquire) {
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
@@ -385,28 +381,19 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) {
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
assertClusterSafe();
- uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
- if (msgCredit != 0xFFFFFFFF) {
- msgCredit--;
- }
- if (byteCredit != 0xFFFFFFFF) {
- byteCredit -= msg->getRequiredCredit();
- }
+ Credit original = credit;
+ credit.consume(1, msg->getRequiredCredit());
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
- << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
- << " now bytes: " << byteCredit << " msgs: " << msgCredit);
+ << ", was " << original << " now " << credit);
}
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
- bool enoughCredit = msgCredit > 0 &&
- (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit());
- QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ")
- << ConsumerName(*this)
- << ", have bytes: " << byteCredit << " msgs: " << msgCredit
- << ", need " << msg->getRequiredCredit() << " bytes");
+ bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+ QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
+ << " credit for message of " << msg->getRequiredCredit() << " bytes: "
+ << credit);
return enoughCredit;
}
@@ -539,9 +526,8 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
if (!delivery.isComplete()) {
delivery.complete();
- if (windowing && windowActive) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+ if (credit.isWindowMode()) {
+ credit.moveWindow(1, delivery.getCredit());
}
}
}
@@ -628,7 +614,7 @@ void SemanticState::stop(const std::string& destination)
void SemanticState::ConsumerImpl::setWindowMode()
{
assertClusterSafe();
- windowing = true;
+ credit.setWindowMode(true);
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
}
@@ -637,7 +623,7 @@ void SemanticState::ConsumerImpl::setWindowMode()
void SemanticState::ConsumerImpl::setCreditMode()
{
assertClusterSafe();
- windowing = false;
+ credit.setWindowMode(false);
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
}
@@ -646,26 +632,18 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
assertClusterSafe();
- if (windowing) windowActive = true;
- if (byteCredit != 0xFFFFFFFF) {
- if (value == 0xFFFFFFFF) byteCredit = value;
- else byteCredit += value;
- }
+ credit.addByteCredit(value);
}
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
assertClusterSafe();
- if (windowing) windowActive = true;
- if (msgCredit != 0xFFFFFFFF) {
- if (value == 0xFFFFFFFF) msgCredit = value;
- else msgCredit += value;
- }
+ credit.addMessageCredit(value);
}
bool SemanticState::ConsumerImpl::haveCredit()
{
- if (msgCredit && byteCredit) {
+ if (credit) {
return true;
} else {
blocked = true;
@@ -677,16 +655,13 @@ void SemanticState::ConsumerImpl::flush()
{
while(haveCredit() && queue->dispatch(shared_from_this()))
;
- msgCredit = 0;
- byteCredit = 0;
+ credit.cancel();
}
void SemanticState::ConsumerImpl::stop()
{
assertClusterSafe();
- msgCredit = 0;
- byteCredit = 0;
- windowActive = false;
+ credit.cancel();
}
Queue::shared_ptr SemanticState::getQueue(const string& name) const {