summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/broker/inc/Message.h2
-rw-r--r--cpp/broker/src/Channel.cpp2
-rw-r--r--cpp/broker/src/Message.cpp9
3 files changed, 10 insertions, 3 deletions
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
index 37a0c9b2c8..7a239adace 100644
--- a/cpp/broker/inc/Message.h
+++ b/cpp/broker/inc/Message.h
@@ -39,6 +39,7 @@ namespace qpid {
string routingKey;
const bool mandatory;
const bool immediate;
+ bool redelivered;
qpid::framing::AMQHeaderBody::shared_ptr header;
content_list content;
@@ -61,6 +62,7 @@ namespace qpid {
void deliver(qpid::framing::OutputHandler* out, int channel,
string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize);
+ void redeliver();
friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index b49635e026..4fb6a52b99 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -173,7 +173,6 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
void Channel::recover(bool requeue){
if(requeue){
- //TODO: need to set redelivered flag
for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
unacknowledged.clear();
}else{
@@ -188,6 +187,7 @@ bool Channel::MatchAck::operator()(AckRecord& record) const{
}
void Channel::Requeue::operator()(AckRecord& record) const{
+ record.msg->redeliver();
record.queue->deliver(record.msg);
}
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
index 7afcd97934..8ebe40410a 100644
--- a/cpp/broker/src/Message.cpp
+++ b/cpp/broker/src/Message.cpp
@@ -32,7 +32,8 @@ Message::Message(const ConnectionToken* const _publisher,
exchange(_exchange),
routingKey(_routingKey),
mandatory(_mandatory),
- immediate(_immediate){
+ immediate(_immediate),
+ redelivered(false){
}
@@ -51,11 +52,15 @@ bool Message::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
+void Message::redeliver(){
+ redelivered = true;
+}
+
void Message::deliver(OutputHandler* out, int channel,
string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
- out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false, exchange, routingKey)));
+ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
for(content_iterator i = content.begin(); i != content.end(); i++){