summaryrefslogtreecommitdiff
path: root/cpp/broker/inc/Channel.h
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-09-27 13:21:43 +0000
committerGordon Sim <gsim@apache.org>2006-09-27 13:21:43 +0000
commit72aff4cc22932bd98b542ac9dd6393e23985e312 (patch)
treed3159aa7eeba203fea961b7397809ad805c74e1a /cpp/broker/inc/Channel.h
parentf1745fde1a2164878794f0bcf62fa62aacbe5cae (diff)
downloadqpid-python-72aff4cc22932bd98b542ac9dd6393e23985e312.tar.gz
Initial implementation of basic_ack & basic_recover
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450434 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/inc/Channel.h')
-rw-r--r--cpp/broker/inc/Channel.h46
1 files changed, 43 insertions, 3 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
index aaf2ce569b..b965665772 100644
--- a/cpp/broker/inc/Channel.h
+++ b/cpp/broker/inc/Channel.h
@@ -18,6 +18,7 @@
#ifndef _Channel_
#define _Channel_
+#include <algorithm>
#include <map>
#include "AMQContentBody.h"
#include "AMQHeaderBody.h"
@@ -35,18 +36,53 @@ namespace qpid {
class Channel{
private:
class ConsumerImpl : public virtual Consumer{
- ConnectionToken* const connection;
Channel* parent;
string tag;
Queue::shared_ptr queue;
+ ConnectionToken* const connection;
+ const bool ackExpected;
public:
- ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection);
+ ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+ struct AckRecord{
+ Message::shared_ptr msg;
+ Queue::shared_ptr queue;
+ string consumerTag;
+ u_int64_t deliveryTag;
+
+ AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue,
+ string _consumerTag, u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag){}
+ };
+
+ typedef std::vector<AckRecord>::iterator ack_iterator;
+
+ class MatchAck{
+ const u_int64_t tag;
+ public:
+ MatchAck(u_int64_t tag);
+ bool operator()(AckRecord& record) const;
+ };
+
+ class Requeue{
+ public:
+ void operator()(AckRecord& record) const;
+ };
+
+ class Redeliver{
+ Channel* const channel;
+ public:
+ Redeliver(Channel* const channel);
+ void operator()(AckRecord& record) const;
+ };
+
const int id;
qpid::framing::OutputHandler* out;
u_int64_t deliveryTag;
@@ -58,8 +94,10 @@ namespace qpid {
u_int32_t framesize;
Message::shared_ptr message;
NameGenerator tagGenerator;
+ std::vector<AckRecord> unacknowledged;
+ qpid::concurrent::MonitorImpl deliveryLock;
- void deliver(Message::shared_ptr& msg, string& tag);
+ void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
void publish(ExchangeRegistry* exchanges);
public:
@@ -79,6 +117,8 @@ namespace qpid {
void close();
void commit();
void rollback();
+ void ack(u_int64_t deliveryTag, bool multiple);
+ void recover(bool requeue);
};
}
}