diff options
Diffstat (limited to 'cpp/broker/inc/Channel.h')
-rw-r--r-- | cpp/broker/inc/Channel.h | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index e76c8a63e9..4f4d8e2890 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -45,10 +45,12 @@ namespace qpid { Queue::shared_ptr queue; ConnectionToken* const connection; const bool ackExpected; + bool blocked; public: ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); virtual bool deliver(Message::shared_ptr& msg); void cancel(); + void requestDispatch(); }; typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; @@ -87,6 +89,14 @@ namespace qpid { void operator()(AckRecord& record) const; }; + class AddSize{ + u_int32_t size; + public: + AddSize(); + void operator()(AckRecord& record); + u_int32_t getSize(); + }; + const int id; qpid::framing::OutputHandler* out; u_int64_t deliveryTag; @@ -95,6 +105,7 @@ namespace qpid { std::map<string, ConsumerImpl*> consumers; u_int32_t prefetchSize; u_int16_t prefetchCount; + u_int32_t outstandingSize; u_int32_t framesize; Message::shared_ptr message; NameGenerator tagGenerator; @@ -103,12 +114,15 @@ namespace qpid { void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); void checkMessage(const std::string& text); + bool checkPrefetch(Message::shared_ptr& msg); + void cancel(consumer_iterator consumer); - template<class Operation> void processMessage(Operation route){ + template<class Operation> Operation processMessage(Operation route){ if(message->isComplete()){ route(message); message.reset(); } + return route; } @@ -119,9 +133,9 @@ namespace qpid { inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; } inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; } - bool exists(string& consumerTag); + bool exists(const string& consumerTag); void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); - void cancel(string& tag); + void cancel(const string& tag); void begin(); void close(); void commit(); @@ -142,10 +156,10 @@ namespace qpid { * there is no content routes it using the functor passed * in. */ - template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ + template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ checkMessage("Invalid message sequence: got header before publish."); message->setHeader(header); - processMessage(route); + return processMessage(route); } /** @@ -153,13 +167,15 @@ namespace qpid { * if this completes the message, routes it using the * functor passed in. */ - template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ + template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ checkMessage("Invalid message sequence: got content before publish."); message->addContent(content); - processMessage(route); + return processMessage(route); } }; + + struct InvalidAckException{}; } } |