summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.h
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-17 08:28:48 +0000
committerGordon Sim <gsim@apache.org>2007-07-17 08:28:48 +0000
commitce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d (patch)
treeea71b96a92eb5402b71a4c08312fbe1d8b835bbc /cpp/src/qpid/broker/BrokerChannel.h
parent54b8fe305e87f623bbeb2c50bea20a332f71a983 (diff)
downloadqpid-python-ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d.tar.gz
Some refactoring towards a more decoupled handler chain structure:
* Connection no longer depends on Channel; it contains a map of FrameHandler::Chains. (The construction of the chains still refers to specific handlers). * Channel is no longer tied to ChannelAdapter through inheritance. The former is independent of any particular handler chain or protocol version, the latter is still used by ConnectionAdapter and SemanticHandler in the 0-9 chain. * A DeliveryAdapter interface has been introduced as part of the separation of ChannelAdapter from Channel. This is intended to adapt from a version independent core to version specific mechanisms for sending messages. i.e. it fulfills the same role for outputs that e.g. BrokerAdapter does for inputs. (Its not perfect yet by any means but is a step on the way to the correct model I think). * The connection related methods sent over channel zero are implemented in their own adapter (ConnectionAdapter), and are entirely separate from the semantic layer. The channel control methods are still bundled with the proper semantic layer methods; they too can be separated but would have to share the request id with the semantic method handler due to the nature of the 0-9 WIP. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@556846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.h')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h38
1 files changed, 16 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 9212e8f632..a2b6bd3ef9 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -23,6 +23,7 @@
*/
#include <list>
+#include <memory>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -30,6 +31,7 @@
#include "AccumulatedAck.h"
#include "Consumer.h"
+#include "DeliveryAdapter.h"
#include "DeliveryRecord.h"
#include "DtxBuffer.h"
#include "DtxManager.h"
@@ -37,6 +39,7 @@
#include "NameGenerator.h"
#include "Prefetch.h"
#include "TxBuffer.h"
+#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/framing/ChannelOpenBody.h"
#include "CompletionHandler.h"
@@ -55,12 +58,12 @@ using framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : public framing::ChannelAdapter,
- public CompletionHandler
+class Channel : public CompletionHandler
{
class ConsumerImpl : public Consumer
{
Channel* parent;
+ std::auto_ptr<DeliveryAdapter> adapter;
const string tag;
Queue::shared_ptr queue;
ConnectionToken* const connection;
@@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter,
bool blocked;
public:
- ConsumerImpl(Channel* parent, const string& tag,
- Queue::shared_ptr queue,
+ ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter,
+ const string& tag, Queue::shared_ptr queue,
ConnectionToken* const connection, bool ack);
~ConsumerImpl();
- virtual bool deliver(Message::shared_ptr& msg);
+ bool deliver(Message::shared_ptr& msg);
+ void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag);
void cancel();
void requestDispatch();
};
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+ framing::ChannelId id;
Connection& connection;
uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
@@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter,
MessageBuilder messageBuilder;//builder for in-progress message
bool opened;
bool flowActive;
- boost::scoped_ptr<BrokerAdapter> adapter;
-
- // completion handler for MessageBuilder
- void complete(Message::shared_ptr msg);
-
- void deliver(Message::shared_ptr& msg, const string& tag,
- Queue::shared_ptr& queue, bool ackExpected);
+
+ void complete(Message::shared_ptr msg);// completion handler for MessageBuilder
+ void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
-
void checkDtxTimeout();
public:
@@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter,
~Channel();
bool isOpen() const { return opened; }
- BrokerAdapter& getAdapter() { return *adapter; }
+ framing::ChannelId getId() const { return id; }
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter,
/**
*@param tagInOut - if empty it is updated with the generated token.
*/
- void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+ void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection = 0,
const framing::FieldTable* = 0);
void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
+ bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
void commit();
@@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter,
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ack();
void ack(uint64_t deliveryTag, bool multiple);
void ack(uint64_t deliveryTag, uint64_t endTag);
void recover(bool requeue);
@@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter,
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
void handleInlineTransfer(Message::shared_ptr msg);
-
- // For ChannelAdapter
- void handleMethodInContext(
- boost::shared_ptr<framing::AMQMethodBody> method,
- const framing::MethodContext& context);
};
}} // namespace broker