summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp41
1 files changed, 19 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index cdeb89188b..320111c2e1 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -186,10 +186,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
- uint8_t storeState, const Uuid& shutdownId)
+ uint8_t storeState, const Uuid& shutdownId,
+ const framing::SequenceNumber& configSeq)
{
- cluster.initialStatus(member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId, l);
+ cluster.initialStatus(
+ member, version, active, clusterId,
+ framing::cluster::StoreState(storeState), shutdownId, configSeq, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -521,28 +523,17 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
struct AddrList {
const cpg_address* addrs;
int count;
- const char *prefix, *suffix;
- AddrList(const cpg_address* a, int n, const char* p="", const char* s="")
- : addrs(a), count(n), prefix(p), suffix(s) {}
+ const char *prefix;
+ AddrList(const cpg_address* a, int n, const char* p="")
+ : addrs(a), count(n), prefix(p) {}
};
ostream& operator<<(ostream& o, const AddrList& a) {
if (!a.count) return o;
o << a.prefix;
- for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
- const char* reasonString;
- switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = "(joined) "; break;
- case CPG_REASON_LEAVE: reasonString = "(left) "; break;
- case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break;
- case CPG_REASON_NODEUP: reasonString = "(node-up) "; break;
- case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break;
- default: reasonString = " ";
- }
- qpid::cluster::MemberId member(*p);
- o << member << reasonString;
- }
- return o << a.suffix;
+ for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
+ o << qpid::cluster::MemberId(*p) << " ";
+ return o;
}
void Cluster::configChange (
@@ -599,6 +590,8 @@ void Cluster::initMapCompleted(Lock& l) {
}
else { // I can go ready.
discarding = false;
+ map.resetConfigSeq(); // Start from config-seq = 0
+ store.setConfigSeq(map.getConfigSeq());
setReady(l);
memberUpdate(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -618,6 +611,8 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
return;
}
bool memberChange = map.configChange(config);
+ QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+ store.setConfigSeq(map.getConfigSeq());
// Update initital status for new members joining.
initMap.configChange(config);
@@ -625,7 +620,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId()
+ store.getState(), store.getShutdownId(), store.getConfigSeq()
),
self);
}
@@ -671,6 +666,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
+ const framing::SequenceNumber& configSeq,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -681,7 +677,8 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
}
initMap.received(
member,
- ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId)
+ ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
+ store, shutdownId, configSeq)
);
if (initMap.transitionToComplete()) initMapCompleted(l);
}