diff options
Diffstat (limited to 'cpp/broker/src')
-rw-r--r-- | cpp/broker/src/Channel.cpp | 53 | ||||
-rw-r--r-- | cpp/broker/src/Message.cpp | 16 | ||||
-rw-r--r-- | cpp/broker/src/Queue.cpp | 8 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 10 |
4 files changed, 74 insertions, 13 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index f887e080a6..ed1125ee76 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -31,6 +31,7 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out) prefetchCount(0), prefetchSize(0), outstandingSize(0), + outstandingCount(0), framesize(_framesize), transactional(false), deliveryTag(1), @@ -98,6 +99,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar if(ackExpected){ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); outstandingSize += msg->contentSize(); + outstandingCount++; } //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); @@ -162,10 +164,15 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ throw InvalidAckException(); }else if(multiple){ unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?): - outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize(); + //recompute prefetch outstanding (note: messages delivered through get are ignored) + CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); + outstandingSize = calc.getSize(); + outstandingCount = calc.getCount(); }else{ - outstandingSize -= i->msg->contentSize(); + if(!i->pull){ + outstandingSize -= i->msg->contentSize(); + outstandingCount--; + } unacknowledged.erase(i); } @@ -181,6 +188,7 @@ void Channel::recover(bool requeue){ if(requeue){ outstandingSize = 0; + outstandingCount = 0; ack_iterator start(unacknowledged.begin()); ack_iterator end(unacknowledged.end()); for_each(start, end, Requeue()); @@ -190,6 +198,21 @@ void Channel::recover(bool requeue){ } } +bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Message::shared_ptr msg = queue->dequeue(); + if(msg){ + Locker locker(deliveryLock); + u_int64_t myDeliveryTag = deliveryTag++; + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + if(ackExpected){ + unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} bool Channel::MatchAck::operator()(AckRecord& record) const{ @@ -204,15 +227,29 @@ void Channel::Requeue::operator()(AckRecord& record) const{ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} void Channel::Redeliver::operator()(AckRecord& record) const{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + if(record.pull){ + //if message was originally sent as response to get, we must requeue it + record.msg->redeliver(); + record.queue->deliver(record.msg); + }else{ + record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + } } -Channel::AddSize::AddSize() : size(0){} +Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} -void Channel::AddSize::operator()(AckRecord& record){ - size += record.msg->contentSize(); +void Channel::CalculatePrefetch::operator()(AckRecord& record){ + if(!record.pull){ + //ignore messages that were sent in response to get when calculating prefetch + size += record.msg->contentSize(); + count++; + } } -u_int32_t Channel::AddSize::getSize(){ +u_int32_t Channel::CalculatePrefetch::getSize(){ return size; } + +u_int16_t Channel::CalculatePrefetch::getCount(){ + return count; +} diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp index a44eeaab59..b0e5d16b77 100644 --- a/cpp/broker/src/Message.cpp +++ b/cpp/broker/src/Message.cpp @@ -59,10 +59,24 @@ void Message::redeliver(){ } void Message::deliver(OutputHandler* out, int channel, - string& consumerTag, u_int64_t deliveryTag, + const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); + sendContent(out, channel, framesize); +} + +void Message::sendGetOk(OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize){ + + out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); + sendContent(out, channel, framesize); +} + +void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(channel, headerBody)); for(content_iterator i = content.begin(); i != content.end(); i++){ diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp index f7b8605b03..1db4454235 100644 --- a/cpp/broker/src/Queue.cpp +++ b/cpp/broker/src/Queue.cpp @@ -122,7 +122,13 @@ void Queue::cancel(Consumer* c){ } Message::shared_ptr Queue::dequeue(){ - + Locker locker(lock); + Message::shared_ptr msg; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + return msg; } u_int32_t Queue::purge(){ diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 857730f3f7..ad73c1b23b 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -338,7 +338,6 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){ //TODO: handle global - //TODO: channel doesn't do anything with these qos parameters yet parent->getChannel(channel)->setPrefetchSize(prefetchSize); parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client.getBasic().qosOk(channel); @@ -349,7 +348,6 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ bool noLocal, bool noAck, bool exclusive, bool nowait){ - //TODO: implement nolocal Queue::shared_ptr queue = parent->getQueue(queueName, channelId); Channel* channel = parent->channels[channelId]; if(!consumerTag.empty() && channel->exists(consumerTag)){ @@ -382,7 +380,13 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t parent->getChannel(channel)->handlePublish(msg); } -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t ticket, string& queueName, bool noAck){ + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + if(!parent->getChannel(channelId)->get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + parent->client.getBasic().getEmpty(channelId, clusterId); + } +} void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ try{ |