summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-09-27 13:21:43 +0000
committerGordon Sim <gsim@apache.org>2006-09-27 13:21:43 +0000
commit72aff4cc22932bd98b542ac9dd6393e23985e312 (patch)
treed3159aa7eeba203fea961b7397809ad805c74e1a /cpp
parentf1745fde1a2164878794f0bcf62fa62aacbe5cae (diff)
downloadqpid-python-72aff4cc22932bd98b542ac9dd6393e23985e312.tar.gz
Initial implementation of basic_ack & basic_recover
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450434 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/broker/inc/Channel.h46
-rw-r--r--cpp/broker/src/Channel.cpp61
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp8
-rw-r--r--cpp/client/test/topic_listener.cpp2
4 files changed, 103 insertions, 14 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
index aaf2ce569b..b965665772 100644
--- a/cpp/broker/inc/Channel.h
+++ b/cpp/broker/inc/Channel.h
@@ -18,6 +18,7 @@
#ifndef _Channel_
#define _Channel_
+#include <algorithm>
#include <map>
#include "AMQContentBody.h"
#include "AMQHeaderBody.h"
@@ -35,18 +36,53 @@ namespace qpid {
class Channel{
private:
class ConsumerImpl : public virtual Consumer{
- ConnectionToken* const connection;
Channel* parent;
string tag;
Queue::shared_ptr queue;
+ ConnectionToken* const connection;
+ const bool ackExpected;
public:
- ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection);
+ ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+ struct AckRecord{
+ Message::shared_ptr msg;
+ Queue::shared_ptr queue;
+ string consumerTag;
+ u_int64_t deliveryTag;
+
+ AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue,
+ string _consumerTag, u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag){}
+ };
+
+ typedef std::vector<AckRecord>::iterator ack_iterator;
+
+ class MatchAck{
+ const u_int64_t tag;
+ public:
+ MatchAck(u_int64_t tag);
+ bool operator()(AckRecord& record) const;
+ };
+
+ class Requeue{
+ public:
+ void operator()(AckRecord& record) const;
+ };
+
+ class Redeliver{
+ Channel* const channel;
+ public:
+ Redeliver(Channel* const channel);
+ void operator()(AckRecord& record) const;
+ };
+
const int id;
qpid::framing::OutputHandler* out;
u_int64_t deliveryTag;
@@ -58,8 +94,10 @@ namespace qpid {
u_int32_t framesize;
Message::shared_ptr message;
NameGenerator tagGenerator;
+ std::vector<AckRecord> unacknowledged;
+ qpid::concurrent::MonitorImpl deliveryLock;
- void deliver(Message::shared_ptr& msg, string& tag);
+ void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
void publish(ExchangeRegistry* exchanges);
public:
@@ -79,6 +117,8 @@ namespace qpid {
void close();
void commit();
void rollback();
+ void ack(u_int64_t deliveryTag, bool multiple);
+ void recover(bool requeue);
};
}
}
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index f1f7d63a39..b49635e026 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -25,6 +25,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
+
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out),
id(_id),
framesize(_framesize),
@@ -46,7 +47,7 @@ bool Channel::exists(string& consumerTag){
void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
if(tag.empty()) tag = tagGenerator.generate();
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection));
+ ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
@@ -92,22 +93,29 @@ void Channel::rollback(){
}
-void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){
+void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+ Locker locker(deliveryLock);
+
+ u_int64_t myDeliveryTag = deliveryTag++;
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
+ }
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, deliveryTag++, framesize);
+ msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
}
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
Queue::shared_ptr _queue,
- ConnectionToken* const _connection) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection){
+ ConnectionToken* const _connection, bool ack) : parent(_parent),
+ tag(_tag),
+ queue(_queue),
+ connection(_connection),
+ ackExpected(ack){
}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(connection != msg->getPublisher()){
- parent->deliver(msg, tag);
+ parent->deliver(msg, tag, queue, ackExpected);
return true;
}else{
return false;
@@ -151,3 +159,40 @@ void Channel::publish(ExchangeRegistry* exchanges){
}
message.reset();
}
+
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag));
+ if(i == unacknowledged.end()){
+ //error: how should this be signalled?
+ }else if(multiple){
+ unacknowledged.erase(unacknowledged.begin(), ++i);
+ }else{
+ unacknowledged.erase(i);
+ }
+}
+
+void Channel::recover(bool requeue){
+ if(requeue){
+ //TODO: need to set redelivered flag
+ for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
+ unacknowledged.clear();
+ }else{
+ for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
+ }
+}
+
+Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
+
+bool Channel::MatchAck::operator()(AckRecord& record) const{
+ return tag == record.deliveryTag;
+}
+
+void Channel::Requeue::operator()(AckRecord& record) const{
+ record.queue->deliver(record.msg);
+}
+
+Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
+
+void Channel::Redeliver::operator()(AckRecord& record) const{
+ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index c72ad45d7c..2ce1c4b298 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -377,9 +377,13 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){}
-void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){}
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+ parent->getChannel(channel)->ack(deliveryTag, multiple);
+}
void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){}
-void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){}
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+ parent->getChannel(channel)->recover(requeue);
+}
diff --git a/cpp/client/test/topic_listener.cpp b/cpp/client/test/topic_listener.cpp
index 707b3443a1..d5b6d77c1a 100644
--- a/cpp/client/test/topic_listener.cpp
+++ b/cpp/client/test/topic_listener.cpp
@@ -129,7 +129,7 @@ void Listener::report(){
report << "Received " << count << " messages in " << time << " ms.";
Message msg;
msg.setData(report.str());
- channel->publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, responseQueue);
+ channel->publish(msg, string(""), responseQueue);
if(transactional){
channel->commit();
}