diff options
author | Alan Conway <aconway@apache.org> | 2009-11-24 22:41:10 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-11-24 22:41:10 +0000 |
commit | 40e40d3c5d3d3c6872d07f2e9071424cfc69ba91 (patch) | |
tree | ea601b8207e438812553b514d320a460348334f4 /qpid/cpp/src | |
parent | d65bec1539940b8d7f871d1bdd34719a2fdb9e64 (diff) | |
download | qpid-python-40e40d3c5d3d3c6872d07f2e9071424cfc69ba91.tar.gz |
Verify stored cluster-id matches agreed cluster-id when joining a persistent cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@883910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/StoreStatus.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/StoreStatus.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/InitialStatusMap.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/tests/StoreStatus.cpp | 40 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 11 |
7 files changed, 83 insertions, 66 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 07fdc6fc93..282b639f61 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -103,6 +103,7 @@ * done single-threaded, bypassing the normal PollableQueues because * the Poller is not active at this point to service them. */ +#include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterSettings.h" #include "qpid/cluster/Connection.h" @@ -153,15 +154,16 @@ namespace qpid { namespace cluster { +using namespace qpid; using namespace qpid::framing; using namespace qpid::sys; -using namespace std; using namespace qpid::cluster; -using namespace qpid::framing::cluster; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; +using namespace framing::cluster; +using namespace std; +using management::ManagementAgent; +using management::ManagementObject; +using management::Manageable; +using management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; /** @@ -184,10 +186,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void initialStatus(uint32_t version, bool active, const Uuid& clusterId, - uint8_t storeState, const Uuid& start, const Uuid& stop) + uint8_t storeState, const Uuid& shutdownId) { cluster.initialStatus(member, version, active, clusterId, - framing::cluster::StoreState(storeState), start, stop, l); + framing::cluster::StoreState(storeState), shutdownId, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -254,8 +256,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); // Load my store status before we go into initialization - if (! broker::NullMessageStore::isNullStore(&broker.getStore())) + if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); + if (store.getClusterId()) + clusterId = store.getClusterId(); // Use stored ID if there is one. + } cpg.join(name); // Pump the CPG dispatch manually till we get initialized. @@ -606,14 +611,18 @@ void Cluster::initMapCompleted(Lock& l) { else { QPID_LOG(info, this << " active for links."); } + // Check that cluster ID matches persistent store. + Uuid agreedId = initMap.getClusterId(); + if (store.hasStore()) { + Uuid storeId = store.getClusterId(); + if (storeId && storeId != agreedId) + throw Exception( + QPID_MSG("Persistent cluster-id " << storeId + << " doesn't match cluster " << agreedId)); + store.dirty(agreedId); + } + setClusterId(agreedId, l); - setClusterId(initMap.getClusterId(), l); - // FIXME aconway 2009-11-20: store id == cluster id. - // Clean up redundant copy of id in InitialStatus - // Use store ID as advertized cluster ID. - // Consistency check on cluster ID vs. locally stored ID. - // throw rathr than assert in StoreStatus. - if (store.hasStore()) store.dirty(clusterId); if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. state = JOINER; @@ -645,7 +654,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getStart(), store.getStop() + store.getState(), store.getShutdownId() ), self); } @@ -690,7 +699,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, const framing::Uuid& id, framing::cluster::StoreState store, - const framing::Uuid& start, const framing::Uuid& end, + const framing::Uuid& shutdownId, Lock& l) { if (version != CLUSTER_VERSION) { @@ -701,8 +710,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ } initMap.received( member, - ClusterInitialStatusBody( - ProtocolVersion(), version, active, id, store, start, end) + ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId) ); if (initMap.transitionToComplete()) { QPID_LOG(debug, *this << " initial status map complete. "); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index c1ee0c2be1..0f931bbe29 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -151,10 +151,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { void initialStatus(const MemberId&, uint32_t version, bool active, - const framing::Uuid& id, + const framing::Uuid& clusterId, framing::cluster::StoreState, - const framing::Uuid& start, - const framing::Uuid& end, + const framing::Uuid& shutdownId, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp index 1c5f581ea1..3602ec9188 100644 --- a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp @@ -39,8 +39,8 @@ StoreStatus::StoreStatus(const std::string& d) namespace { const char* SUBDIR="cluster"; -const char* START_FILE="start"; -const char* STOP_FILE="stop"; +const char* CLUSTER_ID_FILE="cluster.uuid"; +const char* SHUTDOWN_ID_FILE="shutdown.uuid"; Uuid loadUuid(const path& path) { Uuid ret; @@ -62,33 +62,33 @@ void saveUuid(const path& path, const Uuid& uuid) { void StoreStatus::load() { path dir = path(dataDir)/SUBDIR; create_directory(dir); - start = loadUuid(dir/START_FILE); - stop = loadUuid(dir/STOP_FILE); + clusterId = loadUuid(dir/CLUSTER_ID_FILE); + shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE); - if (start && stop) state = STORE_STATE_CLEAN_STORE; - else if (start) state = STORE_STATE_DIRTY_STORE; + if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE; + else if (clusterId) state = STORE_STATE_DIRTY_STORE; else state = STORE_STATE_EMPTY_STORE; } void StoreStatus::save() { path dir = path(dataDir)/SUBDIR; create_directory(dir); - saveUuid(dir/START_FILE, start); - saveUuid(dir/STOP_FILE, stop); + saveUuid(dir/CLUSTER_ID_FILE, clusterId); + saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId); } -void StoreStatus::dirty(const Uuid& start_) { - start = start_; - stop = Uuid(); +void StoreStatus::dirty(const Uuid& clusterId_) { + clusterId = clusterId_; + shutdownId = Uuid(); state = STORE_STATE_DIRTY_STORE; save(); } -void StoreStatus::clean(const Uuid& stop_) { - assert(start); // FIXME aconway 2009-11-20: exception? - assert(stop_); +void StoreStatus::clean(const Uuid& shutdownId_) { + assert(clusterId); // FIXME aconway 2009-11-20: throw exception + assert(shutdownId_); state = STORE_STATE_CLEAN_STORE; - stop = stop_; + shutdownId = shutdownId_; save(); } diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h index b4c6bda480..ead30b8fb8 100644 --- a/qpid/cpp/src/qpid/cluster/StoreStatus.h +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h @@ -40,8 +40,8 @@ class StoreStatus StoreStatus(const std::string& dir); framing::cluster::StoreState getState() const { return state; } - Uuid getStart() const { return start; } - Uuid getStop() const { return stop; } + const Uuid& getClusterId() const { return clusterId; } + const Uuid& getShutdownId() const { return shutdownId; } void dirty(const Uuid& start); // Start using the store. void clean(const Uuid& stop); // Stop using the store. @@ -51,9 +51,10 @@ class StoreStatus bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; } bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; } + private: framing::cluster::StoreState state; - Uuid start, stop; + Uuid clusterId, shutdownId; std::string dataDir; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp index e6a3ec1620..82450eac22 100644 --- a/qpid/cpp/src/tests/InitialStatusMap.cpp +++ b/qpid/cpp/src/tests/InitialStatusMap.cpp @@ -37,17 +37,15 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite) typedef InitialStatusMap::Status Status; Status activeStatus(const Uuid& id=Uuid()) { - return Status(ProtocolVersion(), 0, true, id, - STORE_STATE_NO_STORE, Uuid(), Uuid()); + return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid()); } Status newcomerStatus(const Uuid& id=Uuid()) { - return Status(ProtocolVersion(), 0, false, id, - STORE_STATE_NO_STORE, Uuid(), Uuid()); + return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid()); } Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) { - return Status(ProtocolVersion(), 0, active, Uuid(), state, start, stop); + return Status(ProtocolVersion(), 0, active, start, state, stop); } QPID_AUTO_TEST_CASE(testFirstInCluster) { diff --git a/qpid/cpp/src/tests/StoreStatus.cpp b/qpid/cpp/src/tests/StoreStatus.cpp index 37ba19e34a..153e4a33db 100644 --- a/qpid/cpp/src/tests/StoreStatus.cpp +++ b/qpid/cpp/src/tests/StoreStatus.cpp @@ -43,64 +43,64 @@ QPID_AUTO_TEST_CASE(testLoadEmpty) { create_directory(TEST_DIR); StoreStatus ss(TEST_DIR); BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_NO_STORE); - BOOST_CHECK(!ss.getStart()); - BOOST_CHECK(!ss.getStop()); + BOOST_CHECK(!ss.getClusterId()); + BOOST_CHECK(!ss.getShutdownId()); ss.load(); BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_EMPTY_STORE); - BOOST_CHECK(!ss.getStop()); + BOOST_CHECK(!ss.getShutdownId()); remove_all(TEST_DIR); } QPID_AUTO_TEST_CASE(testSaveLoadDirty) { create_directory(TEST_DIR); - Uuid start = Uuid(true); + Uuid clusterId = Uuid(true); StoreStatus ss(TEST_DIR); ss.load(); - ss.dirty(start); + ss.dirty(clusterId); BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE); StoreStatus ss2(TEST_DIR); ss2.load(); BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE); - BOOST_CHECK_EQUAL(ss2.getStart(), start); - BOOST_CHECK(!ss2.getStop()); + BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId); + BOOST_CHECK(!ss2.getShutdownId()); remove_all(TEST_DIR); } QPID_AUTO_TEST_CASE(testSaveLoadClean) { create_directory(TEST_DIR); - Uuid start = Uuid(true); - Uuid stop = Uuid(true); + Uuid clusterId = Uuid(true); + Uuid shutdownId = Uuid(true); StoreStatus ss(TEST_DIR); ss.load(); - ss.dirty(start); - ss.clean(stop); + ss.dirty(clusterId); + ss.clean(shutdownId); BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE); StoreStatus ss2(TEST_DIR); ss2.load(); BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_CLEAN_STORE); - BOOST_CHECK_EQUAL(ss2.getStart(), start); - BOOST_CHECK_EQUAL(ss2.getStop(), stop); + BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId); + BOOST_CHECK_EQUAL(ss2.getShutdownId(), shutdownId); remove_all(TEST_DIR); } QPID_AUTO_TEST_CASE(testMarkDirty) { // Save clean then mark to dirty. create_directory(TEST_DIR); - Uuid start = Uuid(true); - Uuid stop = Uuid(true); + Uuid clusterId = Uuid(true); + Uuid shutdownId = Uuid(true); StoreStatus ss(TEST_DIR); ss.load(); - ss.dirty(start); - ss.clean(stop); - ss.dirty(start); + ss.dirty(clusterId); + ss.clean(shutdownId); + ss.dirty(clusterId); StoreStatus ss2(TEST_DIR); ss2.load(); BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE); - BOOST_CHECK_EQUAL(ss2.getStart(), start); - BOOST_CHECK(!ss2.getStop()); + BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId); + BOOST_CHECK(!ss2.getShutdownId()); remove_all(TEST_DIR); } diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index f3b71d700c..1437c9e20a 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -158,3 +158,14 @@ class StoreTests(BrokerTest): c = cluster.start("c", wait_for_start=True) self.assertEqual(a.get_message("q").content, "clean") + def test_wrong_store_uuid(self): + # Start a cluster1 broker, then try to restart in cluster2 + cluster1 = self.cluster(0, args=self.args()) + a = cluster1.start("a", expect=EXPECT_EXIT_OK) + a.terminate() + cluster2 = self.cluster(1, args=self.args()) + try: + a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) + self.fail("Expected exception") + except: pass + |