diff options
author | Alan Conway <aconway@apache.org> | 2010-01-06 17:01:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-01-06 17:01:50 +0000 |
commit | e7c824871a7238697e5c534aafffee99078975cd (patch) | |
tree | 8fe2f397beb8f3b2e2fe94386dfee7aa168b2376 /cpp/src/qpid/cluster/Cluster.cpp | |
parent | de23cb5942844463a237cff18c30e5f43ea5d5d0 (diff) | |
download | qpid-python-e7c824871a7238697e5c534aafffee99078975cd.tar.gz |
Added config-seq counter to track config changes since cluster init.
Config-seq is recorded persitently to help identify best store when
recovering from total failure.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@896538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 41 |
1 files changed, 19 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index cdeb89188b..320111c2e1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -186,10 +186,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void initialStatus(uint32_t version, bool active, const Uuid& clusterId, - uint8_t storeState, const Uuid& shutdownId) + uint8_t storeState, const Uuid& shutdownId, + const framing::SequenceNumber& configSeq) { - cluster.initialStatus(member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, l); + cluster.initialStatus( + member, version, active, clusterId, + framing::cluster::StoreState(storeState), shutdownId, configSeq, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -521,28 +523,17 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { struct AddrList { const cpg_address* addrs; int count; - const char *prefix, *suffix; - AddrList(const cpg_address* a, int n, const char* p="", const char* s="") - : addrs(a), count(n), prefix(p), suffix(s) {} + const char *prefix; + AddrList(const cpg_address* a, int n, const char* p="") + : addrs(a), count(n), prefix(p) {} }; ostream& operator<<(ostream& o, const AddrList& a) { if (!a.count) return o; o << a.prefix; - for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { - const char* reasonString; - switch (p->reason) { - case CPG_REASON_JOIN: reasonString = "(joined) "; break; - case CPG_REASON_LEAVE: reasonString = "(left) "; break; - case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break; - case CPG_REASON_NODEUP: reasonString = "(node-up) "; break; - case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break; - default: reasonString = " "; - } - qpid::cluster::MemberId member(*p); - o << member << reasonString; - } - return o << a.suffix; + for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) + o << qpid::cluster::MemberId(*p) << " "; + return o; } void Cluster::configChange ( @@ -599,6 +590,8 @@ void Cluster::initMapCompleted(Lock& l) { } else { // I can go ready. discarding = false; + map.resetConfigSeq(); // Start from config-seq = 0 + store.setConfigSeq(map.getConfigSeq()); setReady(l); memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -618,6 +611,8 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& return; } bool memberChange = map.configChange(config); + QPID_LOG(debug, "Config sequence " << map.getConfigSeq()); + store.setConfigSeq(map.getConfigSeq()); // Update initital status for new members joining. initMap.configChange(config); @@ -625,7 +620,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId() + store.getState(), store.getShutdownId(), store.getConfigSeq() ), self); } @@ -671,6 +666,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, + const framing::SequenceNumber& configSeq, Lock& l) { if (version != CLUSTER_VERSION) { @@ -681,7 +677,8 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ } initMap.received( member, - ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId) + ClusterInitialStatusBody(ProtocolVersion(), version, active, id, + store, shutdownId, configSeq) ); if (initMap.transitionToComplete()) initMapCompleted(l); } |