summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-11-25 18:36:09 +0000
committerAlan Conway <aconway@apache.org>2009-11-25 18:36:09 +0000
commit3c1e695fe75391f8ed86c6ddbabed83220ecbd61 (patch)
tree8421c735e04057e65b54bbc3f1cfdce5e21c23c5 /cpp
parent54852d1be662a49d6e97edd50785ffa0340b9ed7 (diff)
downloadqpid-python-3c1e695fe75391f8ed86c6ddbabed83220ecbd61.tar.gz
Consistency checks for persistent cluster startup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@884226 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp33
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.cpp40
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.h3
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.cpp2
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.h1
-rwxr-xr-xcpp/src/tests/cluster_tests.py49
-rw-r--r--cpp/xml/cluster.xml6
8 files changed, 92 insertions, 44 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 282b639f61..fa53fc5475 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 835547;
+const uint32_t Cluster::CLUSTER_VERSION = 884125;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -202,7 +202,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.errorCheck(member, type, frameSeq, l);
}
- void shutdown() { cluster.shutdown(member, l); }
+ void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -287,7 +287,7 @@ void Cluster::initialize() {
default:
assert(0);
}
- QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -601,6 +601,7 @@ void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
if (state == INIT) {
// We have status for all members so we can make join descisions.
+ initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
@@ -611,17 +612,8 @@ void Cluster::initMapCompleted(Lock& l) {
else {
QPID_LOG(info, this << " active for links.");
}
- // Check that cluster ID matches persistent store.
- Uuid agreedId = initMap.getClusterId();
- if (store.hasStore()) {
- Uuid storeId = store.getClusterId();
- if (storeId && storeId != agreedId)
- throw Exception(
- QPID_MSG("Persistent cluster-id " << storeId
- << " doesn't match cluster " << agreedId));
- store.dirty(agreedId);
- }
- setClusterId(agreedId, l);
+ setClusterId(initMap.getClusterId(), l);
+ if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
@@ -822,13 +814,13 @@ void Cluster::checkUpdateIn(Lock&) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
+ QPID_LOG(notice, *this << " update complete, starting catch-up.");
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
state = JOINER;
- QPID_LOG(notice, *this << " update retracted, sending new update request");
+ QPID_LOG(notice, *this << " update retracted, sending new update request.");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
deliverEventQueue.start();
}
@@ -853,10 +845,9 @@ void Cluster::updateOutError(const std::exception& e) {
updateOutDone(l);
}
-void Cluster ::shutdown(const MemberId& , Lock& l) {
+void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) {
QPID_LOG(notice, *this << " cluster shut down by administrator.");
- // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command.
- if (store.hasStore()) store.clean(Uuid(true));
+ if (store.hasStore()) store.clean(Uuid(id));
leave(l);
}
@@ -885,13 +876,13 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
}
void Cluster::stopClusterNode(Lock& l) {
- QPID_LOG(notice, *this << " stopped by admin");
+ QPID_LOG(notice, *this << " cluster member stopped by administrator.");
leave(l);
}
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(), self);
+ mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self);
}
void Cluster::memberUpdate(Lock& l) {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 0f931bbe29..7872588307 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -160,7 +160,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
- void shutdown(const MemberId&, Lock&);
+ void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp
index 51d6140008..a5618db3e6 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -29,6 +29,7 @@ namespace cluster {
using namespace std;
using namespace boost;
using namespace framing::cluster;
+using namespace framing;
InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
: self(self_), completed(), resendNeeded(), size(size_)
@@ -106,7 +107,6 @@ bool InitialStatusMap::hasStore(const Map::value_type& v) {
}
bool InitialStatusMap::isUpdateNeeded() {
- // FIXME aconway 2009-11-20: consistency checks isComplete or here?
assert(isComplete());
// We need an update if there are any active members.
if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
@@ -145,7 +145,43 @@ framing::Uuid InitialStatusMap::getClusterId() {
if (i != map.end())
return i->second->getClusterId(); // An active member
else
- return map.begin()->second->getClusterId();
+ return map.begin()->second->getClusterId(); // Youngest newcomer in node-id order
}
+void InitialStatusMap::checkConsistent() {
+ assert(isComplete());
+ bool persistent = (map.begin()->second->getStoreState() != STORE_STATE_NO_STORE);
+ Uuid clusterId;
+ for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+ // Must not mix transient and persistent members.
+ if (persistent != (i->second->getStoreState() != STORE_STATE_NO_STORE))
+ throw Exception("Mixing transient and persistent brokers in a cluster");
+ // Members with non-empty stores must have same cluster-id
+ switch (i->second->getStoreState()) {
+ case STORE_STATE_NO_STORE:
+ case STORE_STATE_EMPTY_STORE:
+ break;
+ case STORE_STATE_DIRTY_STORE:
+ case STORE_STATE_CLEAN_STORE:
+ if (!clusterId) clusterId = i->second->getClusterId();
+ assert(clusterId);
+ if (clusterId != i->second->getClusterId())
+ throw Exception("Cluster-id mismatch, brokers belonged to different clusters.");
+ }
+ }
+ // If this is a newly forming cluster, clean stores must have same shutdown-id
+ if (find_if(map.begin(), map.end(), &isActive) == map.end()) {
+ Uuid shutdownId;
+ for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+ if (i->second->getStoreState() == STORE_STATE_CLEAN_STORE) {
+ if (!shutdownId) shutdownId = i->second->getShutdownId();
+ assert(shutdownId);
+ if (shutdownId != i->second->getShutdownId())
+ throw Exception("Shutdown-id mismatch, brokers were not shut down together.");
+ }
+ }
+ }
+}
+
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h
index 72963ea2bb..40fd9ee49d 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -56,13 +56,14 @@ class InitialStatusMap
bool isUpdateNeeded();
/**@pre isComplete(). @return Cluster-wide cluster ID. */
framing::Uuid getClusterId();
+ /**@pre isComplete(). @throw Exception if there are any inconsistencies. */
+ void checkConsistent();
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
static bool notInitialized(const Map::value_type&);
static bool isActive(const Map::value_type&);
static bool hasStore(const Map::value_type&);
- void check();
Map map;
MemberSet firstConfig;
MemberId self;
diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp
index 3602ec9188..a7da3baa50 100644
--- a/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -85,8 +85,6 @@ void StoreStatus::dirty(const Uuid& clusterId_) {
}
void StoreStatus::clean(const Uuid& shutdownId_) {
- assert(clusterId); // FIXME aconway 2009-11-20: throw exception
- assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
shutdownId = shutdownId_;
save();
diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h
index ead30b8fb8..539f46c10b 100644
--- a/cpp/src/qpid/cluster/StoreStatus.h
+++ b/cpp/src/qpid/cluster/StoreStatus.h
@@ -50,7 +50,6 @@ class StoreStatus
void save();
bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
- bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; }
private:
framing::cluster::StoreState state;
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 1437c9e20a..78967196a9 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -122,9 +122,9 @@ class StoreTests(BrokerTest):
def test_persistent_restart(self):
"""Verify persistent cluster shutdown/restart scenarios"""
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait_for_start=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait_for_start=False)
- c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait_for_start=True)
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True)
a.send_message("q", Message("1", durable=True))
# Kill & restart one member.
c.kill()
@@ -135,30 +135,30 @@ class StoreTests(BrokerTest):
# Shut down the entire cluster cleanly and bring it back up
a.send_message("q", Message("3", durable=True))
qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()])
- a = cluster.start("a", wait_for_start=False)
- b = cluster.start("b", wait_for_start=False)
- c = cluster.start("c", wait_for_start=True)
+ a = cluster.start("a", wait=False)
+ b = cluster.start("b", wait=False)
+ c = cluster.start("c", wait=True)
self.assertEqual(a.get_message("q").content, "3")
def test_persistent_partial_failure(self):
# Kill 2 members, shut down the last cleanly then restart
# Ensure we use the clean database
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
- c = cluster.start("c", expect=EXPECT_EXIT_OK, wait_for_start=True)
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
a.send_message("q", Message("4", durable=True))
a.kill()
b.kill()
self.assertEqual(c.get_message("q").content, "4")
c.send_message("q", Message("clean", durable=True))
qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()])
- a = cluster.start("a", wait_for_start=False)
- b = cluster.start("b", wait_for_start=False)
- c = cluster.start("c", wait_for_start=True)
+ a = cluster.start("a", wait=False)
+ b = cluster.start("b", wait=False)
+ c = cluster.start("c", wait=True)
self.assertEqual(a.get_message("q").content, "clean")
- def test_wrong_store_uuid(self):
+ def test_wrong_cluster_id(self):
# Start a cluster1 broker, then try to restart in cluster2
cluster1 = self.cluster(0, args=self.args())
a = cluster1.start("a", expect=EXPECT_EXIT_OK)
@@ -168,4 +168,25 @@ class StoreTests(BrokerTest):
a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
self.fail("Expected exception")
except: pass
-
+
+ def test_wrong_shutdown_id(self):
+ # Start 2 members and shut down.
+ cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.assertEqual(a.wait(), 0)
+ self.assertEqual(b.wait(), 0)
+
+ # Restart with a different member and shut down.
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.assertEqual(a.wait(), 0)
+ self.assertEqual(c.wait(), 0)
+
+ # Mix members from both shutdown events, they should fail
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+
+
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 49a1ea3638..c0b7127f68 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -92,9 +92,11 @@
<field name="type" type="error-type"/>
<field name="frame-seq" type="sequence-no"/>
</control>
-
- <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
+ <!-- Shut down the entire cluster -->
+ <control name="shutdown" code="0x20">
+ <field name="shutdown-id" type="uuid"/>
+ </control>
</class>