diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-17 08:28:48 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-17 08:28:48 +0000 |
commit | ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d (patch) | |
tree | ea71b96a92eb5402b71a4c08312fbe1d8b835bbc /cpp/src/qpid/broker/BrokerChannel.h | |
parent | 54b8fe305e87f623bbeb2c50bea20a332f71a983 (diff) | |
download | qpid-python-ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d.tar.gz |
Some refactoring towards a more decoupled handler chain structure:
* Connection no longer depends on Channel; it contains a map of
FrameHandler::Chains. (The construction of the chains still refers
to specific handlers).
* Channel is no longer tied to ChannelAdapter through inheritance. The
former is independent of any particular handler chain or protocol
version, the latter is still used by ConnectionAdapter and
SemanticHandler in the 0-9 chain.
* A DeliveryAdapter interface has been introduced as part of the
separation of ChannelAdapter from Channel. This is intended to adapt
from a version independent core to version specific mechanisms for
sending messages. i.e. it fulfills the same role for outputs that
e.g. BrokerAdapter does for inputs. (Its not perfect yet by any
means but is a step on the way to the correct model I think).
* The connection related methods sent over channel zero are
implemented in their own adapter (ConnectionAdapter), and are
entirely separate from the semantic layer. The channel control
methods are still bundled with the proper semantic layer methods;
they too can be separated but would have to share the request id
with the semantic method handler due to the nature of the 0-9 WIP.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@556846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.h')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 38 |
1 files changed, 16 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 9212e8f632..a2b6bd3ef9 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -23,6 +23,7 @@ */ #include <list> +#include <memory> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -30,6 +31,7 @@ #include "AccumulatedAck.h" #include "Consumer.h" +#include "DeliveryAdapter.h" #include "DeliveryRecord.h" #include "DtxBuffer.h" #include "DtxManager.h" @@ -37,6 +39,7 @@ #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" +#include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelOpenBody.h" #include "CompletionHandler.h" @@ -55,12 +58,12 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : public framing::ChannelAdapter, - public CompletionHandler +class Channel : public CompletionHandler { class ConsumerImpl : public Consumer { Channel* parent; + std::auto_ptr<DeliveryAdapter> adapter; const string tag; Queue::shared_ptr queue; ConnectionToken* const connection; @@ -68,17 +71,19 @@ class Channel : public framing::ChannelAdapter, bool blocked; public: - ConsumerImpl(Channel* parent, const string& tag, - Queue::shared_ptr queue, + ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter, + const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); ~ConsumerImpl(); - virtual bool deliver(Message::shared_ptr& msg); + bool deliver(Message::shared_ptr& msg); + void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag); void cancel(); void requestDispatch(); }; typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; + framing::ChannelId id; Connection& connection; uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; @@ -97,15 +102,10 @@ class Channel : public framing::ChannelAdapter, MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - boost::scoped_ptr<BrokerAdapter> adapter; - - // completion handler for MessageBuilder - void complete(Message::shared_ptr msg); - - void deliver(Message::shared_ptr& msg, const string& tag, - Queue::shared_ptr& queue, bool ackExpected); + + void complete(Message::shared_ptr msg);// completion handler for MessageBuilder + void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); - void checkDtxTimeout(); public: @@ -113,7 +113,7 @@ class Channel : public framing::ChannelAdapter, ~Channel(); bool isOpen() const { return opened; } - BrokerAdapter& getAdapter() { return *adapter; } + framing::ChannelId getId() const { return id; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -126,11 +126,11 @@ class Channel : public framing::ChannelAdapter, /** *@param tagInOut - if empty it is updated with the generated token. */ - void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, + void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); - bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected); + bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); void commit(); @@ -140,7 +140,6 @@ class Channel : public framing::ChannelAdapter, void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ack(); void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); void recover(bool requeue); @@ -152,11 +151,6 @@ class Channel : public framing::ChannelAdapter, void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); void handleInlineTransfer(Message::shared_ptr msg); - - // For ChannelAdapter - void handleMethodInContext( - boost::shared_ptr<framing::AMQMethodBody> method, - const framing::MethodContext& context); }; }} // namespace broker |