summaryrefslogtreecommitdiff
path: root/cpp/broker/inc/Channel.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/inc/Channel.h')
-rw-r--r--cpp/broker/inc/Channel.h30
1 files changed, 23 insertions, 7 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
index e76c8a63e9..4f4d8e2890 100644
--- a/cpp/broker/inc/Channel.h
+++ b/cpp/broker/inc/Channel.h
@@ -45,10 +45,12 @@ namespace qpid {
Queue::shared_ptr queue;
ConnectionToken* const connection;
const bool ackExpected;
+ bool blocked;
public:
ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
+ void requestDispatch();
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
@@ -87,6 +89,14 @@ namespace qpid {
void operator()(AckRecord& record) const;
};
+ class AddSize{
+ u_int32_t size;
+ public:
+ AddSize();
+ void operator()(AckRecord& record);
+ u_int32_t getSize();
+ };
+
const int id;
qpid::framing::OutputHandler* out;
u_int64_t deliveryTag;
@@ -95,6 +105,7 @@ namespace qpid {
std::map<string, ConsumerImpl*> consumers;
u_int32_t prefetchSize;
u_int16_t prefetchCount;
+ u_int32_t outstandingSize;
u_int32_t framesize;
Message::shared_ptr message;
NameGenerator tagGenerator;
@@ -103,12 +114,15 @@ namespace qpid {
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
void checkMessage(const std::string& text);
+ bool checkPrefetch(Message::shared_ptr& msg);
+ void cancel(consumer_iterator consumer);
- template<class Operation> void processMessage(Operation route){
+ template<class Operation> Operation processMessage(Operation route){
if(message->isComplete()){
route(message);
message.reset();
}
+ return route;
}
@@ -119,9 +133,9 @@ namespace qpid {
inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; }
inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; }
- bool exists(string& consumerTag);
+ bool exists(const string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
- void cancel(string& tag);
+ void cancel(const string& tag);
void begin();
void close();
void commit();
@@ -142,10 +156,10 @@ namespace qpid {
* there is no content routes it using the functor passed
* in.
*/
- template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
+ template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
checkMessage("Invalid message sequence: got header before publish.");
message->setHeader(header);
- processMessage(route);
+ return processMessage(route);
}
/**
@@ -153,13 +167,15 @@ namespace qpid {
* if this completes the message, routes it using the
* functor passed in.
*/
- template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
+ template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
checkMessage("Invalid message sequence: got content before publish.");
message->addContent(content);
- processMessage(route);
+ return processMessage(route);
}
};
+
+ struct InvalidAckException{};
}
}