summary | refs | log | tree | commit | diff
path: root/cpp/src
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2010-05-25 18:05:54 +0000
committer: Alan Conway <aconway@apache.org> 2010-05-25 18:05:54 +0000
commit: 476baeaf52da96b370a11d3a4c570b44f9a0c7b9 (patch)
tree: d4ab203916672ed0caf6b3dad314169612cdf4bd /cpp/src
parent: 5f8bd452d15ca7f906c972cddce008624df6b831 (diff)
download: qpid-python-476baeaf52da96b370a11d3a4c570b44f9a0c7b9.tar.gz
Fix "mismatched cluster-id" errors during start up.
Intermittent failure when starting a persistent cluster with all clean stores. Some brokers fail with:
    critical Unexpected error: Cluster-ID mismatch. Stores belong to different clusters.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@948143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- cpp/src/qpid/cluster/Cluster.cpp 47
-rw-r--r-- cpp/src/qpid/cluster/Cluster.h 2
2 files changed, 28 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 099c3ef0ce..6b9fceccd9 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -266,7 +266,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
elder(false),
- lastSize(0),
+ lastAliveCount(0),
lastBroker(false),
updateRetracted(false),
error(*this)
@@ -290,7 +290,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
store.load();
clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
- }
+ }
cpg.join(name);
// pump the CPG dispatch manually till we get past PRE_INIT.
while (state == PRE_INIT)
@@ -326,7 +326,8 @@ void Cluster::initialize() {
mgmtObject->set_status("JOINING");
}
- // Run initMapCompleted immediately to process the initial configuration.
+ // Run initMapCompleted immediately to process the initial configuration
+ // that allowed us to transition out of PRE_INIT
assert(state == INIT);
initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
@@ -433,7 +434,7 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
return (body && body->getMethod() &&
body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ?
- static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
+ static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
}
// Handler for deliverEventQueue.
@@ -616,8 +617,8 @@ void Cluster::initMapCompleted(Lock& l) {
<< " members, waiting for at least " << initMap.getRequiredSize());
return;
}
- initMap.checkConsistent();
+ initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
@@ -657,11 +658,11 @@ void Cluster::configChange(const MemberId&,
MemberSet members = decodeMemberSet(membersStr);
MemberSet left = decodeMemberSet(leftStr);
MemberSet joined = decodeMemberSet(joinedStr);
- QPID_LOG(notice, *this << " Membership update: " << members);
+ QPID_LOG(notice, *this << " configuration change: " << members);
QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
- // Update initital status for members joining or leaving.
+ // If we are still joining, make sure there is someone to give us an update.
elders = intersection(elders, members);
if (elders.empty() && INIT < state && state < CATCHUP) {
QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
@@ -882,6 +883,7 @@ void Cluster::checkUpdateIn(Lock& l) {
failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ memberUpdate(l);
broker.setClusterUpdatee(false);
if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
@@ -908,7 +910,7 @@ void Cluster::updateOutDone(Lock& l) {
QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
- deliverEventQueue.start(); // Start processing events again.
+ deliverEventQueue.start(); // Start processing events again.
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -959,15 +961,18 @@ void Cluster::stopFullCluster(Lock& ) {
}
void Cluster::memberUpdate(Lock& l) {
+ // Ignore config changes while we are joining.
+ if (state < CATCHUP) return;
QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
std::vector<string> ids = getIds(l);
- size_t size = urls.size();
+ size_t aliveCount = map.aliveCount();
+ assert(map.isAlive(self));
failoverExchange->updateUrls(urls);
+ // Mark store clean if I am the only broker, dirty otherwise.
if (store.hasStore()) {
- // Mark store clean if I am the only broker, dirty otherwise.
- if (size == 1 ) {
+ if (aliveCount == 1) {
if (store.getState() != STORE_STATE_CLEAN_STORE) {
QPID_LOG(notice, *this << "Sole member of cluster, marking store clean.");
store.clean(Uuid(true));
@@ -975,26 +980,28 @@ void Cluster::memberUpdate(Lock& l) {
}
else {
if (store.getState() != STORE_STATE_DIRTY_STORE) {
- QPID_LOG(notice, "No longer sole cluster member, marking store dirty.");
+ QPID_LOG(notice, "Running in a cluster, marking store dirty.");
store.dirty();
}
}
}
- if (size == 1 && lastSize > 1 && state >= CATCHUP) {
+ // If I am the last member standing, set queue policies.
+ if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) {
QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
}
- else if (size > 1 && lastBroker) {
- QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ else if (aliveCount > 1 && lastBroker) {
+ QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1
+ << " replicas, updating queue policies.");
lastBroker = false;
broker.getQueues().updateQueueClusterState(false);
}
- lastSize = size;
+ lastAliveCount = aliveCount;
if (mgmtObject) {
- mgmtObject->set_clusterSize(size);
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
if (iter != urls.begin()) urlstr += ";";
@@ -1029,7 +1036,7 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
if (cluster.error.isUnresolved()) o << "/error";
- return o << ")";;
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -1071,8 +1078,8 @@ void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- if (state >= CATCHUP) // Pre catchup our timer isn't set up.
- timer->deliverDrop(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 343a66428b..0d8b55cf01 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -273,7 +273,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ClusterMap map;
MemberSet elders;
bool elder;
- size_t lastSize;
+ size_t lastAliveCount;
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;