summaryrefslogtreecommitdiff
path: root/cpp/broker/src/Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/src/Channel.cpp')
-rw-r--r--cpp/broker/src/Channel.cpp53
1 files changed, 45 insertions, 8 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index f887e080a6..ed1125ee76 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -31,6 +31,7 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out)
prefetchCount(0),
prefetchSize(0),
outstandingSize(0),
+ outstandingCount(0),
framesize(_framesize),
transactional(false),
deliveryTag(1),
@@ -98,6 +99,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar
if(ackExpected){
unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
outstandingSize += msg->contentSize();
+ outstandingCount++;
}
//send deliver method, header and content(s)
msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
@@ -162,10 +164,15 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
throw InvalidAckException();
}else if(multiple){
unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?):
- outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize();
+ //recompute prefetch outstanding (note: messages delivered through get are ignored)
+ CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
+ outstandingSize = calc.getSize();
+ outstandingCount = calc.getCount();
}else{
- outstandingSize -= i->msg->contentSize();
+ if(!i->pull){
+ outstandingSize -= i->msg->contentSize();
+ outstandingCount--;
+ }
unacknowledged.erase(i);
}
@@ -181,6 +188,7 @@ void Channel::recover(bool requeue){
if(requeue){
outstandingSize = 0;
+ outstandingCount = 0;
ack_iterator start(unacknowledged.begin());
ack_iterator end(unacknowledged.end());
for_each(start, end, Requeue());
@@ -190,6 +198,21 @@ void Channel::recover(bool requeue){
}
}
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Locker locker(deliveryLock);
+ u_int64_t myDeliveryTag = deliveryTag++;
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
bool Channel::MatchAck::operator()(AckRecord& record) const{
@@ -204,15 +227,29 @@ void Channel::Requeue::operator()(AckRecord& record) const{
Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
void Channel::Redeliver::operator()(AckRecord& record) const{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+ if(record.pull){
+ //if message was originally sent as response to get, we must requeue it
+ record.msg->redeliver();
+ record.queue->deliver(record.msg);
+ }else{
+ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+ }
}
-Channel::AddSize::AddSize() : size(0){}
+Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-void Channel::AddSize::operator()(AckRecord& record){
- size += record.msg->contentSize();
+void Channel::CalculatePrefetch::operator()(AckRecord& record){
+ if(!record.pull){
+ //ignore messages that were sent in response to get when calculating prefetch
+ size += record.msg->contentSize();
+ count++;
+ }
}
-u_int32_t Channel::AddSize::getSize(){
+u_int32_t Channel::CalculatePrefetch::getSize(){
return size;
}
+
+u_int16_t Channel::CalculatePrefetch::getCount(){
+ return count;
+}