diff options
Diffstat (limited to 'cpp/broker/inc/Channel.h')
-rw-r--r-- | cpp/broker/inc/Channel.h | 46 |
1 files changed, 43 insertions, 3 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index aaf2ce569b..b965665772 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -18,6 +18,7 @@ #ifndef _Channel_ #define _Channel_ +#include <algorithm> #include <map> #include "AMQContentBody.h" #include "AMQHeaderBody.h" @@ -35,18 +36,53 @@ namespace qpid { class Channel{ private: class ConsumerImpl : public virtual Consumer{ - ConnectionToken* const connection; Channel* parent; string tag; Queue::shared_ptr queue; + ConnectionToken* const connection; + const bool ackExpected; public: - ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection); + ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); virtual bool deliver(Message::shared_ptr& msg); void cancel(); }; typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + struct AckRecord{ + Message::shared_ptr msg; + Queue::shared_ptr queue; + string consumerTag; + u_int64_t deliveryTag; + + AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, + string _consumerTag, u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag){} + }; + + typedef std::vector<AckRecord>::iterator ack_iterator; + + class MatchAck{ + const u_int64_t tag; + public: + MatchAck(u_int64_t tag); + bool operator()(AckRecord& record) const; + }; + + class Requeue{ + public: + void operator()(AckRecord& record) const; + }; + + class Redeliver{ + Channel* const channel; + public: + Redeliver(Channel* const channel); + void operator()(AckRecord& record) const; + }; + const int id; qpid::framing::OutputHandler* out; u_int64_t deliveryTag; @@ -58,8 +94,10 @@ namespace qpid { u_int32_t framesize; Message::shared_ptr message; NameGenerator tagGenerator; + std::vector<AckRecord> unacknowledged; + qpid::concurrent::MonitorImpl deliveryLock; - void deliver(Message::shared_ptr& msg, string& tag); + void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); void publish(ExchangeRegistry* exchanges); public: @@ -79,6 +117,8 @@ namespace qpid { void close(); void commit(); void rollback(); + void ack(u_int64_t deliveryTag, bool multiple); + void recover(bool requeue); }; } } |