diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 67 |
1 files changed, 45 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 1a3f7c4ef7..aa7d082720 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -58,27 +59,36 @@ using namespace framing; NoOpConnectionOutputHandler Connection::discardHandler; -// Shadow connections -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false) +namespace { +sys::AtomicValue<uint64_t> idCounter; +} + +// Shadow connection +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) + : cluster(c), self(id), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false), + mcastFrameHandler(cluster.getMulticast(), self) { init(); } -// Local connections +// Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) - : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink) + const std::string& logId, MemberId member, bool isCatchUp, bool isLink) + : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0), + expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) { init(); } void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocalClient()) { + if (isLocalClient()) { + connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node cluster.addLocalConnection(this); giveReadCredit(cluster.getReadMax()); } + else { // Shadow or catch-up connection + connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames + connection.setClientThrottling(false); // Disable client throttling, done by active node. + } } void Connection::giveReadCredit(int credit) { @@ -140,10 +150,16 @@ bool Connection::checkUnsupported(const AMQBody& body) { void Connection::deliveredFrame(const EventFrame& f) { assert(!catchUp); currentChannel = f.frame.getChannel(); - if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. + if (f.frame.getBody() // frame can be emtpy with just readCredit + && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection. + if (f.type == DATA) // incoming data frames to broker::Connection + connection.received(const_cast<AMQFrame&>(f.frame)); + else { // frame control, send frame via SessionState + broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); + } } giveReadCredit(f.readCredit); } @@ -186,12 +202,12 @@ void Connection::left() { connection.closed(); } -// Decode data from local clients. +// ConnectoinCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) - received(localDecoder.frame); + received(localDecoder.getFrame()); } else { // Multicast local connections. assert(isLocal()); @@ -242,6 +258,7 @@ void Connection::sessionState( const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete) { + sessionState().setState( replayStart, sendCommandPoint, @@ -253,21 +270,23 @@ void Connection::sessionState( QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) { - ConnectionId shadow = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); - self = shadow; +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) { + ConnectionId shadowId = ConnectionId(memberId, connectionId); + QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); + self = shadowId; connection.setUserId(username); + // OK to use decoder here because we are stalled for update. + cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members), frameId); + cluster.updateInDone(ClusterMap(joiners, members)); self.second = 0; // Mark this as completed update connection. } bool Connection::isLocal() const { - return self.first == cluster.getId() && self.second == this; + return self.first == cluster.getId() && self.second; } bool Connection::isShadow() const { @@ -333,6 +352,10 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi q->setPosition(position); } +void Connection::expiryId(uint64_t id) { + cluster.getExpiryPolicy().setId(id); +} + std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; |