diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 3ce2b3f376..a375a65851 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -23,6 +23,7 @@ #include "Cluster.h" #include "UpdateReceiver.h" +#include "qpid/assert.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/TxBuffer.h" @@ -74,28 +75,30 @@ const std::string shadowPrefix("[shadow]"); // Shadow connection -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, + const std::string& mgmtId, const ConnectionId& id, unsigned int ssf) : cluster(c), self(id), catchUp(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf), + connectionCtor(&output, cluster.getBroker(), mgmtId, ssf, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), - consumerNumbering(c.getUpdateReceiver().consumerNumbering) + updateIn(c.getUpdateReceiver()) {} // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& logId, MemberId member, + const std::string& mgmtId, MemberId member, bool isCatchUp, bool isLink, unsigned int ssf ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), connectionCtor(&output, cluster.getBroker(), - isCatchUp ? shadowPrefix+logId : logId, + mgmtId, ssf, isLink, - isCatchUp ? ++catchUpId : 0), + isCatchUp ? ++catchUpId : 0, + isCatchUp), // isCatchUp => shadow expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), - consumerNumbering(c.getUpdateReceiver().consumerNumbering) + updateIn(c.getUpdateReceiver()) { cluster.addLocalConnection(this); if (isLocalClient()) { @@ -104,12 +107,14 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, QPID_LOG(info, "new client connection " << *this); giveReadCredit(cluster.getSettings().readMax); cluster.getMulticast().mcastControl( - ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId()); + ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, getSsf()), getId()); } else { - // Catch-up connections initialized immediately. + // Catch-up shadow connections initialized using nextShadow id. assert(catchUp); QPID_LOG(info, "new catch-up connection " << *this); + connectionCtor.mgmtId = updateIn.nextShadowMgmtId; + updateIn.nextShadowMgmtId.clear(); init(); } } @@ -127,7 +132,6 @@ void Connection::init() { connection->setClusterOrderOutput(nullFrameHandler); // Disable client throttling, done by active node. connection->setClientThrottling(false); - connection->setShadow(); // Mark the connection as a shadow. } if (!isCatchUp()) connection->setErrorListener(this); @@ -138,8 +142,9 @@ void Connection::giveReadCredit(int credit) { output.giveReadCredit(credit); } -void Connection::announce(uint32_t ssf) { - assert(ssf == connectionCtor.ssf); +void Connection::announce(const std::string& mgmtId, uint32_t ssf) { + QPID_ASSERT(mgmtId == connectionCtor.mgmtId); + QPID_ASSERT(ssf == connectionCtor.ssf); init(); } @@ -296,13 +301,17 @@ broker::SemanticState& Connection::semanticState() { return sessionState().getSemanticState(); } +void Connection::shadowPrepare(const std::string& mgmtId) { + updateIn.nextShadowMgmtId = mgmtId; +} + void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.position = position; c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - consumerNumbering.add(c.shared_from_this()); + updateIn.consumerNumbering.add(c.shared_from_this()); } @@ -337,10 +346,15 @@ void Connection::outputTask(uint16_t channel, const std::string& name) { OutputTask* task = &session->getSemanticState().find(name); connection->getOutputTasks().addOutputTask(task); } - -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { + +void Connection::shadowReady( + uint64_t memberId, uint64_t connectionId, const string& mgmtId, + const string& username, const string& fragment, uint32_t sendMax) +{ + QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId()); ConnectionId shadowId = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); + QPID_LOG(debug, cluster << " catch-up connection " << *this + << " becomes shadow " << shadowId); self = shadowId; connection->setUserId(username); // OK to use decoder here because cluster is stalled for update. @@ -355,7 +369,7 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq)); - consumerNumbering.clear(); + updateIn.consumerNumbering.clear(); self.second = 0; // Mark this as completed update connection. } @@ -503,9 +517,9 @@ void Connection::connectionError(const std::string& msg) { } void Connection::addQueueListener(const std::string& q, uint32_t listener) { - if (listener >= consumerNumbering.size()) + if (listener >= updateIn.consumerNumbering.size()) throw Exception(QPID_MSG("Invalid listener ID: " << listener)); - findQueue(q)->getListeners().addListener(consumerNumbering[listener]); + findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]); } void Connection::managementSchema(const std::string& data) { |