diff options
Diffstat (limited to 'cpp/broker/inc/Channel.h')
-rw-r--r-- | cpp/broker/inc/Channel.h | 48 |
1 files changed, 44 insertions, 4 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index b965665772..e76c8a63e9 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -33,6 +33,10 @@ namespace qpid { namespace broker { + /** + * Maintains state for an AMQP channel. Handles incoming and + * outgoing messages for that channel. + */ class Channel{ private: class ConsumerImpl : public virtual Consumer{ @@ -98,7 +102,15 @@ namespace qpid { qpid::concurrent::MonitorImpl deliveryLock; void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); - void publish(ExchangeRegistry* exchanges); + void checkMessage(const std::string& text); + + template<class Operation> void processMessage(Operation route){ + if(message->isComplete()){ + route(message); + message.reset(); + } + } + public: Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); @@ -107,9 +119,6 @@ namespace qpid { inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; } inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; } - void handlePublish(Message* msg); - void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges); - void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges); bool exists(string& consumerTag); void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); void cancel(string& tag); @@ -119,6 +128,37 @@ namespace qpid { void rollback(); void ack(u_int64_t deliveryTag, bool multiple); void recover(bool requeue); + + /** + * Handles the initial publish request though a + * channel. The header and (if applicable) content will be + * accumulated through calls to handleHeader() and + * handleContent() + */ + void handlePublish(Message* msg); + + /** + * A template method that handles a received header and if + * there is no content routes it using the functor passed + * in. + */ + template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ + checkMessage("Invalid message sequence: got header before publish."); + message->setHeader(header); + processMessage(route); + } + + /** + * A template method that handles a received content and + * if this completes the message, routes it using the + * functor passed in. + */ + template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ + checkMessage("Invalid message sequence: got content before publish."); + message->addContent(content); + processMessage(route); + } + }; } } |