summaryrefslogtreecommitdiff
path: root/cpp/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/src')
-rw-r--r--cpp/broker/src/Channel.cpp53
-rw-r--r--cpp/broker/src/Message.cpp16
-rw-r--r--cpp/broker/src/Queue.cpp8
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp10
4 files changed, 74 insertions, 13 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index f887e080a6..ed1125ee76 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -31,6 +31,7 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out)
prefetchCount(0),
prefetchSize(0),
outstandingSize(0),
+ outstandingCount(0),
framesize(_framesize),
transactional(false),
deliveryTag(1),
@@ -98,6 +99,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar
if(ackExpected){
unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
outstandingSize += msg->contentSize();
+ outstandingCount++;
}
//send deliver method, header and content(s)
msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
@@ -162,10 +164,15 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
throw InvalidAckException();
}else if(multiple){
unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?):
- outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize();
+ //recompute prefetch outstanding (note: messages delivered through get are ignored)
+ CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
+ outstandingSize = calc.getSize();
+ outstandingCount = calc.getCount();
}else{
- outstandingSize -= i->msg->contentSize();
+ if(!i->pull){
+ outstandingSize -= i->msg->contentSize();
+ outstandingCount--;
+ }
unacknowledged.erase(i);
}
@@ -181,6 +188,7 @@ void Channel::recover(bool requeue){
if(requeue){
outstandingSize = 0;
+ outstandingCount = 0;
ack_iterator start(unacknowledged.begin());
ack_iterator end(unacknowledged.end());
for_each(start, end, Requeue());
@@ -190,6 +198,21 @@ void Channel::recover(bool requeue){
}
}
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Locker locker(deliveryLock);
+ u_int64_t myDeliveryTag = deliveryTag++;
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
bool Channel::MatchAck::operator()(AckRecord& record) const{
@@ -204,15 +227,29 @@ void Channel::Requeue::operator()(AckRecord& record) const{
Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
void Channel::Redeliver::operator()(AckRecord& record) const{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+ if(record.pull){
+ //if message was originally sent as response to get, we must requeue it
+ record.msg->redeliver();
+ record.queue->deliver(record.msg);
+ }else{
+ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+ }
}
-Channel::AddSize::AddSize() : size(0){}
+Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-void Channel::AddSize::operator()(AckRecord& record){
- size += record.msg->contentSize();
+void Channel::CalculatePrefetch::operator()(AckRecord& record){
+ if(!record.pull){
+ //ignore messages that were sent in response to get when calculating prefetch
+ size += record.msg->contentSize();
+ count++;
+ }
}
-u_int32_t Channel::AddSize::getSize(){
+u_int32_t Channel::CalculatePrefetch::getSize(){
return size;
}
+
+u_int16_t Channel::CalculatePrefetch::getCount(){
+ return count;
+}
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
index a44eeaab59..b0e5d16b77 100644
--- a/cpp/broker/src/Message.cpp
+++ b/cpp/broker/src/Message.cpp
@@ -59,10 +59,24 @@ void Message::redeliver(){
}
void Message::deliver(OutputHandler* out, int channel,
- string& consumerTag, u_int64_t deliveryTag,
+ const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendGetOk(OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
for(content_iterator i = content.begin(); i != content.end(); i++){
diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp
index f7b8605b03..1db4454235 100644
--- a/cpp/broker/src/Queue.cpp
+++ b/cpp/broker/src/Queue.cpp
@@ -122,7 +122,13 @@ void Queue::cancel(Consumer* c){
}
Message::shared_ptr Queue::dequeue(){
-
+ Locker locker(lock);
+ Message::shared_ptr msg;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ return msg;
}
u_int32_t Queue::purge(){
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index 857730f3f7..ad73c1b23b 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -338,7 +338,6 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){
//TODO: handle global
- //TODO: channel doesn't do anything with these qos parameters yet
parent->getChannel(channel)->setPrefetchSize(prefetchSize);
parent->getChannel(channel)->setPrefetchCount(prefetchCount);
parent->client.getBasic().qosOk(channel);
@@ -349,7 +348,6 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
bool noLocal, bool noAck, bool exclusive,
bool nowait){
- //TODO: implement nolocal
Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
Channel* channel = parent->channels[channelId];
if(!consumerTag.empty() && channel->exists(consumerTag)){
@@ -382,7 +380,13 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
parent->getChannel(channel)->handlePublish(msg);
}
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){}
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t ticket, string& queueName, bool noAck){
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ if(!parent->getChannel(channelId)->get(queue, !noAck)){
+ string clusterId;//not used, part of an imatix hack
+ parent->client.getBasic().getEmpty(channelId, clusterId);
+ }
+}
void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
try{