diff options
author | Gordon Sim <gsim@apache.org> | 2006-09-27 13:21:43 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-09-27 13:21:43 +0000 |
commit | 72aff4cc22932bd98b542ac9dd6393e23985e312 (patch) | |
tree | d3159aa7eeba203fea961b7397809ad805c74e1a /cpp | |
parent | f1745fde1a2164878794f0bcf62fa62aacbe5cae (diff) | |
download | qpid-python-72aff4cc22932bd98b542ac9dd6393e23985e312.tar.gz |
Initial implementation of basic_ack & basic_recover
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450434 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/broker/inc/Channel.h | 46 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 61 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 8 | ||||
-rw-r--r-- | cpp/client/test/topic_listener.cpp | 2 |
4 files changed, 103 insertions, 14 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index aaf2ce569b..b965665772 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -18,6 +18,7 @@ #ifndef _Channel_ #define _Channel_ +#include <algorithm> #include <map> #include "AMQContentBody.h" #include "AMQHeaderBody.h" @@ -35,18 +36,53 @@ namespace qpid { class Channel{ private: class ConsumerImpl : public virtual Consumer{ - ConnectionToken* const connection; Channel* parent; string tag; Queue::shared_ptr queue; + ConnectionToken* const connection; + const bool ackExpected; public: - ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection); + ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); virtual bool deliver(Message::shared_ptr& msg); void cancel(); }; typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + struct AckRecord{ + Message::shared_ptr msg; + Queue::shared_ptr queue; + string consumerTag; + u_int64_t deliveryTag; + + AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, + string _consumerTag, u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag){} + }; + + typedef std::vector<AckRecord>::iterator ack_iterator; + + class MatchAck{ + const u_int64_t tag; + public: + MatchAck(u_int64_t tag); + bool operator()(AckRecord& record) const; + }; + + class Requeue{ + public: + void operator()(AckRecord& record) const; + }; + + class Redeliver{ + Channel* const channel; + public: + Redeliver(Channel* const channel); + void operator()(AckRecord& record) const; + }; + const int id; qpid::framing::OutputHandler* out; u_int64_t deliveryTag; @@ -58,8 +94,10 @@ namespace qpid { u_int32_t framesize; Message::shared_ptr message; NameGenerator tagGenerator; + std::vector<AckRecord> unacknowledged; + qpid::concurrent::MonitorImpl deliveryLock; - void deliver(Message::shared_ptr& msg, string& tag); + void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); void publish(ExchangeRegistry* exchanges); public: @@ -79,6 +117,8 @@ namespace qpid { void close(); void commit(); void rollback(); + void ack(u_int64_t deliveryTag, bool multiple); + void recover(bool requeue); }; } } diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index f1f7d63a39..b49635e026 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -25,6 +25,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; + Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out), id(_id), framesize(_framesize), @@ -46,7 +47,7 @@ bool Channel::exists(string& consumerTag){ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ if(tag.empty()) tag = tagGenerator.generate(); - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection)); + ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception consumers[tag] = c; @@ -92,22 +93,29 @@ void Channel::rollback(){ } -void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){ +void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ + Locker locker(deliveryLock); + + u_int64_t myDeliveryTag = deliveryTag++; + if(ackExpected){ + unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); + } //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, deliveryTag++, framesize); + msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, Queue::shared_ptr _queue, - ConnectionToken* const _connection) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection){ + ConnectionToken* const _connection, bool ack) : parent(_parent), + tag(_tag), + queue(_queue), + connection(_connection), + ackExpected(ack){ } bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(connection != msg->getPublisher()){ - parent->deliver(msg, tag); + parent->deliver(msg, tag, queue, ackExpected); return true; }else{ return false; @@ -151,3 +159,40 @@ void Channel::publish(ExchangeRegistry* exchanges){ } message.reset(); } + +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag)); + if(i == unacknowledged.end()){ + //error: how should this be signalled? + }else if(multiple){ + unacknowledged.erase(unacknowledged.begin(), ++i); + }else{ + unacknowledged.erase(i); + } +} + +void Channel::recover(bool requeue){ + if(requeue){ + //TODO: need to set redelivered flag + for_each(unacknowledged.begin(), unacknowledged.end(), Requeue()); + unacknowledged.clear(); + }else{ + for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); + } +} + +Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} + +bool Channel::MatchAck::operator()(AckRecord& record) const{ + return tag == record.deliveryTag; +} + +void Channel::Requeue::operator()(AckRecord& record) const{ + record.queue->deliver(record.msg); +} + +Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} + +void Channel::Redeliver::operator()(AckRecord& record) const{ + record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); +} diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index c72ad45d7c..2ce1c4b298 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -377,9 +377,13 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} -void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){} +void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ + parent->getChannel(channel)->ack(deliveryTag, multiple); +} void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){} -void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){} +void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ + parent->getChannel(channel)->recover(requeue); +} diff --git a/cpp/client/test/topic_listener.cpp b/cpp/client/test/topic_listener.cpp index 707b3443a1..d5b6d77c1a 100644 --- a/cpp/client/test/topic_listener.cpp +++ b/cpp/client/test/topic_listener.cpp @@ -129,7 +129,7 @@ void Listener::report(){ report << "Received " << count << " messages in " << time << " ms."; Message msg; msg.setData(report.str()); - channel->publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, responseQueue); + channel->publish(msg, string(""), responseQueue); if(transactional){ channel->commit(); } |