summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-11-24 22:41:10 +0000
committerAlan Conway <aconway@apache.org>2009-11-24 22:41:10 +0000
commit40e40d3c5d3d3c6872d07f2e9071424cfc69ba91 (patch)
treeea601b8207e438812553b514d320a460348334f4 /qpid/cpp/src
parentd65bec1539940b8d7f871d1bdd34719a2fdb9e64 (diff)
downloadqpid-python-40e40d3c5d3d3c6872d07f2e9071424cfc69ba91.tar.gz
Verify stored cluster-id matches agreed cluster-id when joining a persistent cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@883910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp48
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.cpp30
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.h7
-rw-r--r--qpid/cpp/src/tests/InitialStatusMap.cpp8
-rw-r--r--qpid/cpp/src/tests/StoreStatus.cpp40
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py11
7 files changed, 83 insertions, 66 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 07fdc6fc93..282b639f61 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -103,6 +103,7 @@
* done single-threaded, bypassing the normal PollableQueues because
* the Poller is not active at this point to service them.
*/
+#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterSettings.h"
#include "qpid/cluster/Connection.h"
@@ -153,15 +154,16 @@
namespace qpid {
namespace cluster {
+using namespace qpid;
using namespace qpid::framing;
using namespace qpid::sys;
-using namespace std;
using namespace qpid::cluster;
-using namespace qpid::framing::cluster;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
+using namespace framing::cluster;
+using namespace std;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
/**
@@ -184,10 +186,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
- uint8_t storeState, const Uuid& start, const Uuid& stop)
+ uint8_t storeState, const Uuid& shutdownId)
{
cluster.initialStatus(member, version, active, clusterId,
- framing::cluster::StoreState(storeState), start, stop, l);
+ framing::cluster::StoreState(storeState), shutdownId, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -254,8 +256,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
// Load my store status before we go into initialization
- if (! broker::NullMessageStore::isNullStore(&broker.getStore()))
+ if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
+ if (store.getClusterId())
+ clusterId = store.getClusterId(); // Use stored ID if there is one.
+ }
cpg.join(name);
// Pump the CPG dispatch manually till we get initialized.
@@ -606,14 +611,18 @@ void Cluster::initMapCompleted(Lock& l) {
else {
QPID_LOG(info, this << " active for links.");
}
+ // Check that cluster ID matches persistent store.
+ Uuid agreedId = initMap.getClusterId();
+ if (store.hasStore()) {
+ Uuid storeId = store.getClusterId();
+ if (storeId && storeId != agreedId)
+ throw Exception(
+ QPID_MSG("Persistent cluster-id " << storeId
+ << " doesn't match cluster " << agreedId));
+ store.dirty(agreedId);
+ }
+ setClusterId(agreedId, l);
- setClusterId(initMap.getClusterId(), l);
- // FIXME aconway 2009-11-20: store id == cluster id.
- // Clean up redundant copy of id in InitialStatus
- // Use store ID as advertized cluster ID.
- // Consistency check on cluster ID vs. locally stored ID.
- // throw rathr than assert in StoreStatus.
- if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
state = JOINER;
@@ -645,7 +654,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getStart(), store.getStop()
+ store.getState(), store.getShutdownId()
),
self);
}
@@ -690,7 +699,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
const framing::Uuid& id,
framing::cluster::StoreState store,
- const framing::Uuid& start, const framing::Uuid& end,
+ const framing::Uuid& shutdownId,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -701,8 +710,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
}
initMap.received(
member,
- ClusterInitialStatusBody(
- ProtocolVersion(), version, active, id, store, start, end)
+ ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId)
);
if (initMap.transitionToComplete()) {
QPID_LOG(debug, *this << " initial status map complete. ");
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index c1ee0c2be1..0f931bbe29 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -151,10 +151,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void initialStatus(const MemberId&,
uint32_t version,
bool active,
- const framing::Uuid& id,
+ const framing::Uuid& clusterId,
framing::cluster::StoreState,
- const framing::Uuid& start,
- const framing::Uuid& end,
+ const framing::Uuid& shutdownId,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
index 1c5f581ea1..3602ec9188 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -39,8 +39,8 @@ StoreStatus::StoreStatus(const std::string& d)
namespace {
const char* SUBDIR="cluster";
-const char* START_FILE="start";
-const char* STOP_FILE="stop";
+const char* CLUSTER_ID_FILE="cluster.uuid";
+const char* SHUTDOWN_ID_FILE="shutdown.uuid";
Uuid loadUuid(const path& path) {
Uuid ret;
@@ -62,33 +62,33 @@ void saveUuid(const path& path, const Uuid& uuid) {
void StoreStatus::load() {
path dir = path(dataDir)/SUBDIR;
create_directory(dir);
- start = loadUuid(dir/START_FILE);
- stop = loadUuid(dir/STOP_FILE);
+ clusterId = loadUuid(dir/CLUSTER_ID_FILE);
+ shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
- if (start && stop) state = STORE_STATE_CLEAN_STORE;
- else if (start) state = STORE_STATE_DIRTY_STORE;
+ if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
+ else if (clusterId) state = STORE_STATE_DIRTY_STORE;
else state = STORE_STATE_EMPTY_STORE;
}
void StoreStatus::save() {
path dir = path(dataDir)/SUBDIR;
create_directory(dir);
- saveUuid(dir/START_FILE, start);
- saveUuid(dir/STOP_FILE, stop);
+ saveUuid(dir/CLUSTER_ID_FILE, clusterId);
+ saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
}
-void StoreStatus::dirty(const Uuid& start_) {
- start = start_;
- stop = Uuid();
+void StoreStatus::dirty(const Uuid& clusterId_) {
+ clusterId = clusterId_;
+ shutdownId = Uuid();
state = STORE_STATE_DIRTY_STORE;
save();
}
-void StoreStatus::clean(const Uuid& stop_) {
- assert(start); // FIXME aconway 2009-11-20: exception?
- assert(stop_);
+void StoreStatus::clean(const Uuid& shutdownId_) {
+ assert(clusterId); // FIXME aconway 2009-11-20: throw exception
+ assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
- stop = stop_;
+ shutdownId = shutdownId_;
save();
}
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h
index b4c6bda480..ead30b8fb8 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.h
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h
@@ -40,8 +40,8 @@ class StoreStatus
StoreStatus(const std::string& dir);
framing::cluster::StoreState getState() const { return state; }
- Uuid getStart() const { return start; }
- Uuid getStop() const { return stop; }
+ const Uuid& getClusterId() const { return clusterId; }
+ const Uuid& getShutdownId() const { return shutdownId; }
void dirty(const Uuid& start); // Start using the store.
void clean(const Uuid& stop); // Stop using the store.
@@ -51,9 +51,10 @@ class StoreStatus
bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; }
+
private:
framing::cluster::StoreState state;
- Uuid start, stop;
+ Uuid clusterId, shutdownId;
std::string dataDir;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp
index e6a3ec1620..82450eac22 100644
--- a/qpid/cpp/src/tests/InitialStatusMap.cpp
+++ b/qpid/cpp/src/tests/InitialStatusMap.cpp
@@ -37,17 +37,15 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite)
typedef InitialStatusMap::Status Status;
Status activeStatus(const Uuid& id=Uuid()) {
- return Status(ProtocolVersion(), 0, true, id,
- STORE_STATE_NO_STORE, Uuid(), Uuid());
+ return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid());
}
Status newcomerStatus(const Uuid& id=Uuid()) {
- return Status(ProtocolVersion(), 0, false, id,
- STORE_STATE_NO_STORE, Uuid(), Uuid());
+ return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid());
}
Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) {
- return Status(ProtocolVersion(), 0, active, Uuid(), state, start, stop);
+ return Status(ProtocolVersion(), 0, active, start, state, stop);
}
QPID_AUTO_TEST_CASE(testFirstInCluster) {
diff --git a/qpid/cpp/src/tests/StoreStatus.cpp b/qpid/cpp/src/tests/StoreStatus.cpp
index 37ba19e34a..153e4a33db 100644
--- a/qpid/cpp/src/tests/StoreStatus.cpp
+++ b/qpid/cpp/src/tests/StoreStatus.cpp
@@ -43,64 +43,64 @@ QPID_AUTO_TEST_CASE(testLoadEmpty) {
create_directory(TEST_DIR);
StoreStatus ss(TEST_DIR);
BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_NO_STORE);
- BOOST_CHECK(!ss.getStart());
- BOOST_CHECK(!ss.getStop());
+ BOOST_CHECK(!ss.getClusterId());
+ BOOST_CHECK(!ss.getShutdownId());
ss.load();
BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_EMPTY_STORE);
- BOOST_CHECK(!ss.getStop());
+ BOOST_CHECK(!ss.getShutdownId());
remove_all(TEST_DIR);
}
QPID_AUTO_TEST_CASE(testSaveLoadDirty) {
create_directory(TEST_DIR);
- Uuid start = Uuid(true);
+ Uuid clusterId = Uuid(true);
StoreStatus ss(TEST_DIR);
ss.load();
- ss.dirty(start);
+ ss.dirty(clusterId);
BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE);
StoreStatus ss2(TEST_DIR);
ss2.load();
BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE);
- BOOST_CHECK_EQUAL(ss2.getStart(), start);
- BOOST_CHECK(!ss2.getStop());
+ BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId);
+ BOOST_CHECK(!ss2.getShutdownId());
remove_all(TEST_DIR);
}
QPID_AUTO_TEST_CASE(testSaveLoadClean) {
create_directory(TEST_DIR);
- Uuid start = Uuid(true);
- Uuid stop = Uuid(true);
+ Uuid clusterId = Uuid(true);
+ Uuid shutdownId = Uuid(true);
StoreStatus ss(TEST_DIR);
ss.load();
- ss.dirty(start);
- ss.clean(stop);
+ ss.dirty(clusterId);
+ ss.clean(shutdownId);
BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE);
StoreStatus ss2(TEST_DIR);
ss2.load();
BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_CLEAN_STORE);
- BOOST_CHECK_EQUAL(ss2.getStart(), start);
- BOOST_CHECK_EQUAL(ss2.getStop(), stop);
+ BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId);
+ BOOST_CHECK_EQUAL(ss2.getShutdownId(), shutdownId);
remove_all(TEST_DIR);
}
QPID_AUTO_TEST_CASE(testMarkDirty) {
// Save clean then mark to dirty.
create_directory(TEST_DIR);
- Uuid start = Uuid(true);
- Uuid stop = Uuid(true);
+ Uuid clusterId = Uuid(true);
+ Uuid shutdownId = Uuid(true);
StoreStatus ss(TEST_DIR);
ss.load();
- ss.dirty(start);
- ss.clean(stop);
- ss.dirty(start);
+ ss.dirty(clusterId);
+ ss.clean(shutdownId);
+ ss.dirty(clusterId);
StoreStatus ss2(TEST_DIR);
ss2.load();
BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE);
- BOOST_CHECK_EQUAL(ss2.getStart(), start);
- BOOST_CHECK(!ss2.getStop());
+ BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId);
+ BOOST_CHECK(!ss2.getShutdownId());
remove_all(TEST_DIR);
}
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index f3b71d700c..1437c9e20a 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -158,3 +158,14 @@ class StoreTests(BrokerTest):
c = cluster.start("c", wait_for_start=True)
self.assertEqual(a.get_message("q").content, "clean")
+ def test_wrong_store_uuid(self):
+ # Start a cluster1 broker, then try to restart in cluster2
+ cluster1 = self.cluster(0, args=self.args())
+ a = cluster1.start("a", expect=EXPECT_EXIT_OK)
+ a.terminate()
+ cluster2 = self.cluster(1, args=self.args())
+ try:
+ a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
+ self.fail("Expected exception")
+ except: pass
+