summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.h')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h38
1 files changed, 16 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 9212e8f632..a2b6bd3ef9 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -23,6 +23,7 @@
*/
#include <list>
+#include <memory>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -30,6 +31,7 @@
#include "AccumulatedAck.h"
#include "Consumer.h"
+#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
@@ -37,6 +39,7 @@
#include "NameGenerator.h"
#include "Prefetch.h"
#include "TxBuffer.h"
+#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelOpenBody.h"
#include "CompletionHandler.h"
@@ -55,12 +58,12 @@ using framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : public framing::ChannelAdapter,
- public CompletionHandler
+class Channel : public CompletionHandler
{
class ConsumerImpl : public Consumer
{
Channel* parent;
+ std::auto_ptr<DeliveryAdapter> adapter;
const string tag;
Queue::shared_ptr queue;
ConnectionToken* const connection;
@@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter,
bool blocked;
public:
- ConsumerImpl(Channel* parent, const string& tag,
- Queue::shared_ptr queue,
+ ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter,
+ const string& tag, Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
~ConsumerImpl();
- virtual bool deliver(Message::shared_ptr& msg);
+ bool deliver(Message::shared_ptr& msg);
+ void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag);
void cancel();
void requestDispatch();
};
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+ framing::ChannelId id;
Connection& connection;
uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
@@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter,
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
- boost::scoped_ptr<BrokerAdapter> adapter;
-
- // completion handler for MessageBuilder
- void complete(Message::shared_ptr msg);
-
- void deliver(Message::shared_ptr& msg, const string& tag,
- Queue::shared_ptr& queue, bool ackExpected);
+
+ void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
+ void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
-
void checkDtxTimeout();
public:
@@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter,
~Channel();
bool isOpen() const { return opened; }
- BrokerAdapter& getAdapter() { return *adapter; }
+ framing::ChannelId getId() const { return id; }
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter,
/**
*@param tagInOut - if empty it is updated with the generated token.
*/
- void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+ void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
+ bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
void commit();
@@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter,
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ack();
void ack(uint64_t deliveryTag, bool multiple);
void ack(uint64_t deliveryTag, uint64_t endTag);
void recover(bool requeue);
@@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter,
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
void handleInlineTransfer(Message::shared_ptr msg);
-
- // For ChannelAdapter
- void handleMethodInContext(
- boost::shared_ptr<framing::AMQMethodBody> method,
- const framing::MethodContext& context);
};
}} // namespace broker