summaryrefslogtreecommitdiff
path: root/cpp/broker/inc/Channel.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker/inc/Channel.h')
-rw-r--r--cpp/broker/inc/Channel.h48
1 files changed, 44 insertions, 4 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
index b965665772..e76c8a63e9 100644
--- a/cpp/broker/inc/Channel.h
+++ b/cpp/broker/inc/Channel.h
@@ -33,6 +33,10 @@
namespace qpid {
namespace broker {
+ /**
+ * Maintains state for an AMQP channel. Handles incoming and
+ * outgoing messages for that channel.
+ */
class Channel{
private:
class ConsumerImpl : public virtual Consumer{
@@ -98,7 +102,15 @@ namespace qpid {
qpid::concurrent::MonitorImpl deliveryLock;
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void publish(ExchangeRegistry* exchanges);
+ void checkMessage(const std::string& text);
+
+ template<class Operation> void processMessage(Operation route){
+ if(message->isComplete()){
+ route(message);
+ message.reset();
+ }
+ }
+
public:
Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
@@ -107,9 +119,6 @@ namespace qpid {
inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; }
inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; }
- void handlePublish(Message* msg);
- void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges);
- void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges);
bool exists(string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
void cancel(string& tag);
@@ -119,6 +128,37 @@ namespace qpid {
void rollback();
void ack(u_int64_t deliveryTag, bool multiple);
void recover(bool requeue);
+
+ /**
+ * Handles the initial publish request though a
+ * channel. The header and (if applicable) content will be
+ * accumulated through calls to handleHeader() and
+ * handleContent()
+ */
+ void handlePublish(Message* msg);
+
+ /**
+ * A template method that handles a received header and if
+ * there is no content routes it using the functor passed
+ * in.
+ */
+ template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
+ checkMessage("Invalid message sequence: got header before publish.");
+ message->setHeader(header);
+ processMessage(route);
+ }
+
+ /**
+ * A template method that handles a received content and
+ * if this completes the message, routes it using the
+ * functor passed in.
+ */
+ template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
+ checkMessage("Invalid message sequence: got content before publish.");
+ message->addContent(content);
+ processMessage(route);
+ }
+
};
}
}