diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.h')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 150c53807e..df3c035c8a 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -47,7 +47,6 @@ class Cluster; class Connection : public RefCounted, public sys::ConnectionInputHandler, - public sys::ConnectionOutputHandler, public framing::AMQP_AllOperations::ClusterConnectionHandler { @@ -60,20 +59,18 @@ class Connection : ConnectionId getId() const { return self; } broker::Connection& getBrokerConnection() { return connection; } + + /** True for connections from direct clients of local broker */ bool isLocal() const; + + /** True for connections that are shadowing remote broker connections */ bool isShadow() const { return !isLocal(); } - /** True if the connection is in "catch-up" mode: building initial state */ + /** True if the connection is in "catch-up" mode: building initial broker state. */ bool isCatchUp() const { return catchUp; } Cluster& getCluster() { return cluster; } - // ConnectionOutputHandler methods - void close() {} - void send(framing::AMQFrame&) {} - void activateOutput() {} - virtual size_t getBuffered() const { assert(0); return 0; } - // ConnectionInputHandler methods void received(framing::AMQFrame&); void closed(); @@ -85,18 +82,19 @@ class Connection : // ConnectionCodec methods size_t decode(const char* buffer, size_t size); - // Called by cluster to deliver a buffer from CPG. + // Called for data delivered from the cluster. void deliverBuffer(framing::Buffer&); - + void delivered(framing::AMQFrame&); // ==== Used in catch-up mode to build initial state. // // State dump methods. void sessionState(const SequenceNumber& replayStart, - const SequenceSet& sentIncomplete, - const SequenceNumber& expected, - const SequenceNumber& received, - const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); + const SequenceNumber& sendCommandPoint, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); void shadowReady(uint64_t memberId, uint64_t connectionId); @@ -108,17 +106,20 @@ class Connection : void deliverDoOutput(uint32_t requested); void sendDoOutput(); + static NoOpConnectionOutputHandler discardHandler; + Cluster& cluster; ConnectionId self; bool catchUp; - NoOpConnectionOutputHandler discardHandler; WriteEstimate writeEstimate; OutputInterceptor output; - framing::FrameDecoder decoder; + framing::FrameDecoder localDecoder; + framing::FrameDecoder mcastDecoder; broker::Connection connection; framing::SequenceNumber mcastSeq; framing::SequenceNumber deliverSeq; - + framing::ChannelId currentChannel; + friend std::ostream& operator<<(std::ostream&, const Connection&); }; |