diff options
author | Alan Conway <aconway@apache.org> | 2010-05-25 18:05:54 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-05-25 18:05:54 +0000 |
commit | 476baeaf52da96b370a11d3a4c570b44f9a0c7b9 (patch) | |
tree | d4ab203916672ed0caf6b3dad314169612cdf4bd /cpp/src | |
parent | 5f8bd452d15ca7f906c972cddce008624df6b831 (diff) | |
download | qpid-python-476baeaf52da96b370a11d3a4c570b44f9a0c7b9.tar.gz |
Fix "mismatched cluster-id" errors during start up.
Intermittent failure when starting a persistent cluster with all clean stores.
Some brokers fail with:
critical Unexpected error: Cluster-ID mismatch. Stores belong to different clusters.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@948143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 |
2 files changed, 28 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 099c3ef0ce..6b9fceccd9 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -266,7 +266,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : initMap(self, settings.size), store(broker.getDataDir().getPath()), elder(false), - lastSize(0), + lastAliveCount(0), lastBroker(false), updateRetracted(false), error(*this) @@ -290,7 +290,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : store.load(); clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) - } + } cpg.join(name); // pump the CPG dispatch manually till we get past PRE_INIT. while (state == PRE_INIT) @@ -326,7 +326,8 @@ void Cluster::initialize() { mgmtObject->set_status("JOINING"); } - // Run initMapCompleted immediately to process the initial configuration. + // Run initMapCompleted immediately to process the initial configuration + // that allowed us to transition out of PRE_INIT assert(state == INIT); initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. @@ -433,7 +434,7 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) { return (body && body->getMethod() && body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ? - static_cast<const ClusterConnectionAnnounceBody*>(body) : 0; + static_cast<const ClusterConnectionAnnounceBody*>(body) : 0; } // Handler for deliverEventQueue. @@ -616,8 +617,8 @@ void Cluster::initMapCompleted(Lock& l) { << " members, waiting for at least " << initMap.getRequiredSize()); return; } - initMap.checkConsistent(); + initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (elders.empty()) @@ -657,11 +658,11 @@ void Cluster::configChange(const MemberId&, MemberSet members = decodeMemberSet(membersStr); MemberSet left = decodeMemberSet(leftStr); MemberSet joined = decodeMemberSet(joinedStr); - QPID_LOG(notice, *this << " Membership update: " << members); + QPID_LOG(notice, *this << " configuration change: " << members); QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); - // Update initital status for members joining or leaving. + // If we are still joining, make sure there is someone to give us an update. elders = intersection(elders, members); if (elders.empty() && INIT < state && state < CATCHUP) { QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); @@ -882,6 +883,7 @@ void Cluster::checkUpdateIn(Lock& l) { failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; + memberUpdate(l); broker.setClusterUpdatee(false); if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. @@ -908,7 +910,7 @@ void Cluster::updateOutDone(Lock& l) { QPID_LOG(notice, *this << " update sent"); assert(state == UPDATER); state = READY; - deliverEventQueue.start(); // Start processing events again. + deliverEventQueue.start(); // Start processing events again. makeOffer(map.firstJoiner(), l); // Try another offer } @@ -959,15 +961,18 @@ void Cluster::stopFullCluster(Lock& ) { } void Cluster::memberUpdate(Lock& l) { + // Ignore config changes while we are joining. + if (state < CATCHUP) return; QPID_LOG(info, *this << " member update: " << map); std::vector<Url> urls = getUrls(l); std::vector<string> ids = getIds(l); - size_t size = urls.size(); + size_t aliveCount = map.aliveCount(); + assert(map.isAlive(self)); failoverExchange->updateUrls(urls); + // Mark store clean if I am the only broker, dirty otherwise. if (store.hasStore()) { - // Mark store clean if I am the only broker, dirty otherwise. - if (size == 1 ) { + if (aliveCount == 1) { if (store.getState() != STORE_STATE_CLEAN_STORE) { QPID_LOG(notice, *this << "Sole member of cluster, marking store clean."); store.clean(Uuid(true)); @@ -975,26 +980,28 @@ void Cluster::memberUpdate(Lock& l) { } else { if (store.getState() != STORE_STATE_DIRTY_STORE) { - QPID_LOG(notice, "No longer sole cluster member, marking store dirty."); + QPID_LOG(notice, "Running in a cluster, marking store dirty."); store.dirty(); } } } - if (size == 1 && lastSize > 1 && state >= CATCHUP) { + // If I am the last member standing, set queue policies. + if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) { QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); } - else if (size > 1 && lastBroker) { - QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); + else if (aliveCount > 1 && lastBroker) { + QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1 + << " replicas, updating queue policies."); lastBroker = false; broker.getQueues().updateQueueClusterState(false); } - lastSize = size; + lastAliveCount = aliveCount; if (mgmtObject) { - mgmtObject->set_clusterSize(size); + mgmtObject->set_clusterSize(urls.size()); string urlstr; for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { if (iter != urls.begin()) urlstr += ";"; @@ -1029,7 +1036,7 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; if (cluster.error.isUnresolved()) o << "/error"; - return o << ")";; + return o << ")"; } MemberId Cluster::getId() const { @@ -1071,8 +1078,8 @@ void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) - if (state >= CATCHUP) // Pre catchup our timer isn't set up. - timer->deliverDrop(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverDrop(name); } bool Cluster::isElder() const { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 343a66428b..0d8b55cf01 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -273,7 +273,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { ClusterMap map; MemberSet elders; bool elder; - size_t lastSize; + size_t lastAliveCount; bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; |