diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 41 |
1 files changed, 19 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index cdeb89188b..320111c2e1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -186,10 +186,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void initialStatus(uint32_t version, bool active, const Uuid& clusterId, - uint8_t storeState, const Uuid& shutdownId) + uint8_t storeState, const Uuid& shutdownId, + const framing::SequenceNumber& configSeq) { - cluster.initialStatus(member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, l); + cluster.initialStatus( + member, version, active, clusterId, + framing::cluster::StoreState(storeState), shutdownId, configSeq, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -521,28 +523,17 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { struct AddrList { const cpg_address* addrs; int count; - const char *prefix, *suffix; - AddrList(const cpg_address* a, int n, const char* p="", const char* s="") - : addrs(a), count(n), prefix(p), suffix(s) {} + const char *prefix; + AddrList(const cpg_address* a, int n, const char* p="") + : addrs(a), count(n), prefix(p) {} }; ostream& operator<<(ostream& o, const AddrList& a) { if (!a.count) return o; o << a.prefix; - for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { - const char* reasonString; - switch (p->reason) { - case CPG_REASON_JOIN: reasonString = "(joined) "; break; - case CPG_REASON_LEAVE: reasonString = "(left) "; break; - case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break; - case CPG_REASON_NODEUP: reasonString = "(node-up) "; break; - case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break; - default: reasonString = " "; - } - qpid::cluster::MemberId member(*p); - o << member << reasonString; - } - return o << a.suffix; + for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) + o << qpid::cluster::MemberId(*p) << " "; + return o; } void Cluster::configChange ( @@ -599,6 +590,8 @@ void Cluster::initMapCompleted(Lock& l) { } else { // I can go ready. discarding = false; + map.resetConfigSeq(); // Start from config-seq = 0 + store.setConfigSeq(map.getConfigSeq()); setReady(l); memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -618,6 +611,8 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& return; } bool memberChange = map.configChange(config); + QPID_LOG(debug, "Config sequence " << map.getConfigSeq()); + store.setConfigSeq(map.getConfigSeq()); // Update initital status for new members joining. initMap.configChange(config); @@ -625,7 +620,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId() + store.getState(), store.getShutdownId(), store.getConfigSeq() ), self); } @@ -671,6 +666,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, + const framing::SequenceNumber& configSeq, Lock& l) { if (version != CLUSTER_VERSION) { @@ -681,7 +677,8 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ } initMap.received( member, - ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId) + ClusterInitialStatusBody(ProtocolVersion(), version, active, id, + store, shutdownId, configSeq) ); if (initMap.transitionToComplete()) initMapCompleted(l); } |