diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.h')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 38 |
1 files changed, 16 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 9212e8f632..a2b6bd3ef9 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -23,6 +23,7 @@ */ #include <list> +#include <memory> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -30,6 +31,7 @@ #include "AccumulatedAck.h" #include "Consumer.h" +#include "DeliveryAdapter.h" #include "DeliveryRecord.h" #include "DtxBuffer.h" #include "DtxManager.h" @@ -37,6 +39,7 @@ #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" +#include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelOpenBody.h" #include "CompletionHandler.h" @@ -55,12 +58,12 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : public framing::ChannelAdapter, - public CompletionHandler +class Channel : public CompletionHandler { class ConsumerImpl : public Consumer { Channel* parent; + std::auto_ptr<DeliveryAdapter> adapter; const string tag; Queue::shared_ptr queue; ConnectionToken* const connection; @@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter, bool blocked; public: - ConsumerImpl(Channel* parent, const string& tag, - Queue::shared_ptr queue, + ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter, + const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); ~ConsumerImpl(); - virtual bool deliver(Message::shared_ptr& msg); + bool deliver(Message::shared_ptr& msg); + void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag); void cancel(); void requestDispatch(); }; typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; + framing::ChannelId id; Connection& connection; uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; @@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter, MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - boost::scoped_ptr<BrokerAdapter> adapter; - - // completion handler for MessageBuilder - void complete(Message::shared_ptr msg); - - void deliver(Message::shared_ptr& msg, const string& tag, - Queue::shared_ptr& queue, bool ackExpected); + + void complete(Message::shared_ptr msg);// completion handler for MessageBuilder + void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); - void checkDtxTimeout(); public: @@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter, ~Channel(); bool isOpen() const { return opened; } - BrokerAdapter& getAdapter() { return *adapter; } + framing::ChannelId getId() const { return id; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter, /** *@param tagInOut - if empty it is updated with the generated token. */ - void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, + void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); - bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected); + bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); void commit(); @@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter, void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ack(); void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); void recover(bool requeue); @@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter, void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); void handleInlineTransfer(Message::shared_ptr msg); - - // For ChannelAdapter - void handleMethodInContext( - boost::shared_ptr<framing::AMQMethodBody> method, - const framing::MethodContext& context); }; }} // namespace broker |