diff options
author | Alan Conway <aconway@apache.org> | 2009-11-25 18:36:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-11-25 18:36:09 +0000 |
commit | 3c1e695fe75391f8ed86c6ddbabed83220ecbd61 (patch) | |
tree | 8421c735e04057e65b54bbc3f1cfdce5e21c23c5 /cpp | |
parent | 54852d1be662a49d6e97edd50785ffa0340b9ed7 (diff) | |
download | qpid-python-3c1e695fe75391f8ed86c6ddbabed83220ecbd61.tar.gz |
Consistency checks for persistent cluster startup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@884226 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.h | 1 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 49 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 6 |
8 files changed, 92 insertions, 44 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 282b639f61..fa53fc5475 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 835547; +const uint32_t Cluster::CLUSTER_VERSION = 884125; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -202,7 +202,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.errorCheck(member, type, frameSeq, l); } - void shutdown() { cluster.shutdown(member, l); } + void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -287,7 +287,7 @@ void Cluster::initialize() { default: assert(0); } - QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl); + QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -601,6 +601,7 @@ void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. if (state == INIT) { // We have status for all members so we can make join descisions. + initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. @@ -611,17 +612,8 @@ void Cluster::initMapCompleted(Lock& l) { else { QPID_LOG(info, this << " active for links."); } - // Check that cluster ID matches persistent store. - Uuid agreedId = initMap.getClusterId(); - if (store.hasStore()) { - Uuid storeId = store.getClusterId(); - if (storeId && storeId != agreedId) - throw Exception( - QPID_MSG("Persistent cluster-id " << storeId - << " doesn't match cluster " << agreedId)); - store.dirty(agreedId); - } - setClusterId(agreedId, l); + setClusterId(initMap.getClusterId(), l); + if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. @@ -822,13 +814,13 @@ void Cluster::checkUpdateIn(Lock&) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; discarding = false; // ok to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map); + QPID_LOG(notice, *this << " update complete, starting catch-up."); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; state = JOINER; - QPID_LOG(notice, *this << " update retracted, sending new update request"); + QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); deliverEventQueue.start(); } @@ -853,10 +845,9 @@ void Cluster::updateOutError(const std::exception& e) { updateOutDone(l); } -void Cluster ::shutdown(const MemberId& , Lock& l) { +void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { QPID_LOG(notice, *this << " cluster shut down by administrator."); - // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command. - if (store.hasStore()) store.clean(Uuid(true)); + if (store.hasStore()) store.clean(Uuid(id)); leave(l); } @@ -885,13 +876,13 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s } void Cluster::stopClusterNode(Lock& l) { - QPID_LOG(notice, *this << " stopped by admin"); + QPID_LOG(notice, *this << " cluster member stopped by administrator."); leave(l); } void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), self); + mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); } void Cluster::memberUpdate(Lock& l) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 0f931bbe29..7872588307 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -160,7 +160,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); - void shutdown(const MemberId&, Lock&); + void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index 51d6140008..a5618db3e6 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -29,6 +29,7 @@ namespace cluster { using namespace std; using namespace boost; using namespace framing::cluster; +using namespace framing; InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) : self(self_), completed(), resendNeeded(), size(size_) @@ -106,7 +107,6 @@ bool InitialStatusMap::hasStore(const Map::value_type& v) { } bool InitialStatusMap::isUpdateNeeded() { - // FIXME aconway 2009-11-20: consistency checks isComplete or here? assert(isComplete()); // We need an update if there are any active members. if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true; @@ -145,7 +145,43 @@ framing::Uuid InitialStatusMap::getClusterId() { if (i != map.end()) return i->second->getClusterId(); // An active member else - return map.begin()->second->getClusterId(); + return map.begin()->second->getClusterId(); // Youngest newcomer in node-id order } +void InitialStatusMap::checkConsistent() { + assert(isComplete()); + bool persistent = (map.begin()->second->getStoreState() != STORE_STATE_NO_STORE); + Uuid clusterId; + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + // Must not mix transient and persistent members. + if (persistent != (i->second->getStoreState() != STORE_STATE_NO_STORE)) + throw Exception("Mixing transient and persistent brokers in a cluster"); + // Members with non-empty stores must have same cluster-id + switch (i->second->getStoreState()) { + case STORE_STATE_NO_STORE: + case STORE_STATE_EMPTY_STORE: + break; + case STORE_STATE_DIRTY_STORE: + case STORE_STATE_CLEAN_STORE: + if (!clusterId) clusterId = i->second->getClusterId(); + assert(clusterId); + if (clusterId != i->second->getClusterId()) + throw Exception("Cluster-id mismatch, brokers belonged to different clusters."); + } + } + // If this is a newly forming cluster, clean stores must have same shutdown-id + if (find_if(map.begin(), map.end(), &isActive) == map.end()) { + Uuid shutdownId; + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + if (i->second->getStoreState() == STORE_STATE_CLEAN_STORE) { + if (!shutdownId) shutdownId = i->second->getShutdownId(); + assert(shutdownId); + if (shutdownId != i->second->getShutdownId()) + throw Exception("Shutdown-id mismatch, brokers were not shut down together."); + } + } + } +} + + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index 72963ea2bb..40fd9ee49d 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -56,13 +56,14 @@ class InitialStatusMap bool isUpdateNeeded(); /**@pre isComplete(). @return Cluster-wide cluster ID. */ framing::Uuid getClusterId(); + /**@pre isComplete(). @throw Exception if there are any inconsistencies. */ + void checkConsistent(); private: typedef std::map<MemberId, boost::optional<Status> > Map; static bool notInitialized(const Map::value_type&); static bool isActive(const Map::value_type&); static bool hasStore(const Map::value_type&); - void check(); Map map; MemberSet firstConfig; MemberId self; diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp index 3602ec9188..a7da3baa50 100644 --- a/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/cpp/src/qpid/cluster/StoreStatus.cpp @@ -85,8 +85,6 @@ void StoreStatus::dirty(const Uuid& clusterId_) { } void StoreStatus::clean(const Uuid& shutdownId_) { - assert(clusterId); // FIXME aconway 2009-11-20: throw exception - assert(shutdownId_); state = STORE_STATE_CLEAN_STORE; shutdownId = shutdownId_; save(); diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h index ead30b8fb8..539f46c10b 100644 --- a/cpp/src/qpid/cluster/StoreStatus.h +++ b/cpp/src/qpid/cluster/StoreStatus.h @@ -50,7 +50,6 @@ class StoreStatus void save(); bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; } - bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; } private: framing::cluster::StoreState state; diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 1437c9e20a..78967196a9 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -122,9 +122,9 @@ class StoreTests(BrokerTest): def test_persistent_restart(self): """Verify persistent cluster shutdown/restart scenarios""" cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait_for_start=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait_for_start=False) - c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait_for_start=True) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True) a.send_message("q", Message("1", durable=True)) # Kill & restart one member. c.kill() @@ -135,30 +135,30 @@ class StoreTests(BrokerTest): # Shut down the entire cluster cleanly and bring it back up a.send_message("q", Message("3", durable=True)) qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()]) - a = cluster.start("a", wait_for_start=False) - b = cluster.start("b", wait_for_start=False) - c = cluster.start("c", wait_for_start=True) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) self.assertEqual(a.get_message("q").content, "3") def test_persistent_partial_failure(self): # Kill 2 members, shut down the last cleanly then restart # Ensure we use the clean database cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait_for_start=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait_for_start=False) - c = cluster.start("c", expect=EXPECT_EXIT_OK, wait_for_start=True) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) a.send_message("q", Message("4", durable=True)) a.kill() b.kill() self.assertEqual(c.get_message("q").content, "4") c.send_message("q", Message("clean", durable=True)) qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()]) - a = cluster.start("a", wait_for_start=False) - b = cluster.start("b", wait_for_start=False) - c = cluster.start("c", wait_for_start=True) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) self.assertEqual(a.get_message("q").content, "clean") - def test_wrong_store_uuid(self): + def test_wrong_cluster_id(self): # Start a cluster1 broker, then try to restart in cluster2 cluster1 = self.cluster(0, args=self.args()) a = cluster1.start("a", expect=EXPECT_EXIT_OK) @@ -168,4 +168,25 @@ class StoreTests(BrokerTest): a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) self.fail("Expected exception") except: pass - + + def test_wrong_shutdown_id(self): + # Start 2 members and shut down. + cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.assertEqual(a.wait(), 0) + self.assertEqual(b.wait(), 0) + + # Restart with a different member and shut down. + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) + self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()])) + self.assertEqual(a.wait(), 0) + self.assertEqual(c.wait(), 0) + + # Mix members from both shutdown events, they should fail + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + + diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 49a1ea3638..c0b7127f68 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -92,9 +92,11 @@ <field name="type" type="error-type"/> <field name="frame-seq" type="sequence-no"/> </control> - - <control name="shutdown" code="0x20" label="Shut down entire cluster"/> + <!-- Shut down the entire cluster --> + <control name="shutdown" code="0x20"> + <field name="shutdown-id" type="uuid"/> + </control> </class> |