diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 1a3f7c4ef7..9c2b4f1638 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -62,7 +62,8 @@ NoOpConnectionOutputHandler Connection::discardHandler; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false) + connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false), + mcastFrameHandler(cluster.getMulticast(), self) { init(); } // Local connections @@ -70,15 +71,20 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink) + expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) { init(); } void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocalClient()) { + if (isLocalClient()) { + connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node cluster.addLocalConnection(this); giveReadCredit(cluster.getReadMax()); } + else { // Shadow or catch-up connection + connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames + connection.setClientThrottling(false); // Disable client throttling, done by active node. + } } void Connection::giveReadCredit(int credit) { @@ -143,7 +149,15 @@ void Connection::deliveredFrame(const EventFrame& f) { if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection. + // FIXME aconway 2009-02-24: Using the DATA/CONTROL + // distinction to distinguish incoming vs. outgoing frames is + // very unclear. + if (f.type == DATA) // incoming data frames to broker::Connection + connection.received(const_cast<AMQFrame&>(f.frame)); + else { // outgoing data frame, send via SessionState + broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); + } } giveReadCredit(f.readCredit); } |