diff options
Diffstat (limited to 'cpp/broker/src/Channel.cpp')
-rw-r--r-- | cpp/broker/src/Channel.cpp | 53 |
1 files changed, 45 insertions, 8 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index f887e080a6..ed1125ee76 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -31,6 +31,7 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out) prefetchCount(0), prefetchSize(0), outstandingSize(0), + outstandingCount(0), framesize(_framesize), transactional(false), deliveryTag(1), @@ -98,6 +99,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar if(ackExpected){ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); outstandingSize += msg->contentSize(); + outstandingCount++; } //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); @@ -162,10 +164,15 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ throw InvalidAckException(); }else if(multiple){ unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?): - outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize(); + //recompute prefetch outstanding (note: messages delivered through get are ignored) + CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); + outstandingSize = calc.getSize(); + outstandingCount = calc.getCount(); }else{ - outstandingSize -= i->msg->contentSize(); + if(!i->pull){ + outstandingSize -= i->msg->contentSize(); + outstandingCount--; + } unacknowledged.erase(i); } @@ -181,6 +188,7 @@ void Channel::recover(bool requeue){ if(requeue){ outstandingSize = 0; + outstandingCount = 0; ack_iterator start(unacknowledged.begin()); ack_iterator end(unacknowledged.end()); for_each(start, end, Requeue()); @@ -190,6 +198,21 @@ void Channel::recover(bool requeue){ } } +bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Message::shared_ptr msg = queue->dequeue(); + if(msg){ + Locker locker(deliveryLock); + u_int64_t myDeliveryTag = deliveryTag++; + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + if(ackExpected){ + unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} bool Channel::MatchAck::operator()(AckRecord& record) const{ @@ -204,15 +227,29 @@ void Channel::Requeue::operator()(AckRecord& record) const{ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} void Channel::Redeliver::operator()(AckRecord& record) const{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + if(record.pull){ + //if message was originally sent as response to get, we must requeue it + record.msg->redeliver(); + record.queue->deliver(record.msg); + }else{ + record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + } } -Channel::AddSize::AddSize() : size(0){} +Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} -void Channel::AddSize::operator()(AckRecord& record){ - size += record.msg->contentSize(); +void Channel::CalculatePrefetch::operator()(AckRecord& record){ + if(!record.pull){ + //ignore messages that were sent in response to get when calculating prefetch + size += record.msg->contentSize(); + count++; + } } -u_int32_t Channel::AddSize::getSize(){ +u_int32_t Channel::CalculatePrefetch::getSize(){ return size; } + +u_int16_t Channel::CalculatePrefetch::getCount(){ + return count; +} |