summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp47
1 files changed, 27 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 099c3ef0ce..6b9fceccd9 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -266,7 +266,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
elder(false),
- lastSize(0),
+ lastAliveCount(0),
lastBroker(false),
updateRetracted(false),
error(*this)
@@ -290,7 +290,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
store.load();
clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
- }
+ }
cpg.join(name);
// pump the CPG dispatch manually till we get past PRE_INIT.
while (state == PRE_INIT)
@@ -326,7 +326,8 @@ void Cluster::initialize() {
mgmtObject->set_status("JOINING");
}
- // Run initMapCompleted immediately to process the initial configuration.
+ // Run initMapCompleted immediately to process the initial configuration
+ // that allowed us to transition out of PRE_INIT
assert(state == INIT);
initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
@@ -433,7 +434,7 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
return (body && body->getMethod() &&
body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ?
- static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
+ static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
}
// Handler for deliverEventQueue.
@@ -616,8 +617,8 @@ void Cluster::initMapCompleted(Lock& l) {
<< " members, waiting for at least " << initMap.getRequiredSize());
return;
}
- initMap.checkConsistent();
+ initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
@@ -657,11 +658,11 @@ void Cluster::configChange(const MemberId&,
MemberSet members = decodeMemberSet(membersStr);
MemberSet left = decodeMemberSet(leftStr);
MemberSet joined = decodeMemberSet(joinedStr);
- QPID_LOG(notice, *this << " Membership update: " << members);
+ QPID_LOG(notice, *this << " configuration change: " << members);
QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
- // Update initital status for members joining or leaving.
+ // If we are still joining, make sure there is someone to give us an update.
elders = intersection(elders, members);
if (elders.empty() && INIT < state && state < CATCHUP) {
QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
@@ -882,6 +883,7 @@ void Cluster::checkUpdateIn(Lock& l) {
failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ memberUpdate(l);
broker.setClusterUpdatee(false);
if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
@@ -908,7 +910,7 @@ void Cluster::updateOutDone(Lock& l) {
QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
- deliverEventQueue.start(); // Start processing events again.
+ deliverEventQueue.start(); // Start processing events again.
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -959,15 +961,18 @@ void Cluster::stopFullCluster(Lock& ) {
}
void Cluster::memberUpdate(Lock& l) {
+ // Ignore config changes while we are joining.
+ if (state < CATCHUP) return;
QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
std::vector<string> ids = getIds(l);
- size_t size = urls.size();
+ size_t aliveCount = map.aliveCount();
+ assert(map.isAlive(self));
failoverExchange->updateUrls(urls);
+ // Mark store clean if I am the only broker, dirty otherwise.
if (store.hasStore()) {
- // Mark store clean if I am the only broker, dirty otherwise.
- if (size == 1 ) {
+ if (aliveCount == 1) {
if (store.getState() != STORE_STATE_CLEAN_STORE) {
QPID_LOG(notice, *this << "Sole member of cluster, marking store clean.");
store.clean(Uuid(true));
@@ -975,26 +980,28 @@ void Cluster::memberUpdate(Lock& l) {
}
else {
if (store.getState() != STORE_STATE_DIRTY_STORE) {
- QPID_LOG(notice, "No longer sole cluster member, marking store dirty.");
+ QPID_LOG(notice, "Running in a cluster, marking store dirty.");
store.dirty();
}
}
}
- if (size == 1 && lastSize > 1 && state >= CATCHUP) {
+ // If I am the last member standing, set queue policies.
+ if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) {
QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
}
- else if (size > 1 && lastBroker) {
- QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ else if (aliveCount > 1 && lastBroker) {
+ QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1
+ << " replicas, updating queue policies.");
lastBroker = false;
broker.getQueues().updateQueueClusterState(false);
}
- lastSize = size;
+ lastAliveCount = aliveCount;
if (mgmtObject) {
- mgmtObject->set_clusterSize(size);
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
if (iter != urls.begin()) urlstr += ";";
@@ -1029,7 +1036,7 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
if (cluster.error.isUnresolved()) o << "/error";
- return o << ")";;
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -1071,8 +1078,8 @@ void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- if (state >= CATCHUP) // Pre catchup our timer isn't set up.
- timer->deliverDrop(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {