diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.h')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 98b47e1bc0..6434f763a8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -27,14 +27,15 @@ #include "OutputInterceptor.h" #include "NoOpConnectionOutputHandler.h" #include "EventFrame.h" +#include "McastFrameHandler.h" #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/framing/FrameDecoder.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/FrameDecoder.h" #include <iosfwd> @@ -63,10 +64,10 @@ class Connection : public: typedef sys::PollableQueue<EventFrame> PollableFrameQueue; - /** Local connection, use this in ConnectionId */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); - /** Shadow connection */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); + /** Local connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink); + /** Shadow connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id); ~Connection(); ConnectionId getId() const { return self; } @@ -99,7 +100,7 @@ class Connection : /** Called if the connectors member has left the cluster */ void left(); - // ConnectionCodec methods + // ConnectionCodec methods - called by IO layer with a read buffer. size_t decode(const char* buffer, size_t size); // Called for data delivered from the cluster. @@ -117,9 +118,9 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); + void membership(const framing::FieldTable&, const framing::FieldTable&); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -134,6 +135,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); + void expiryId(uint64_t); void txStart(); void txAccept(const framing::SequenceSet&); @@ -148,8 +150,12 @@ class Connection : void exchange(const std::string& encoded); void giveReadCredit(int credit); - + private: + struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} + }; + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); @@ -174,6 +180,8 @@ class Connection : framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; + McastFrameHandler mcastFrameHandler; + NullFrameHandler nullFrameHandler; static qpid::sys::AtomicValue<uint64_t> catchUpId; |