summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Alan Conway <aconway@apache.org> 2010-01-27 20:56:31 +0000
committer Alan Conway <aconway@apache.org> 2010-01-27 20:56:31 +0000
commit 8c09a071c998b789c464dc0ef1eecafe6cc4cd67 (patch)
tree ce00610d67d05ef0c544beab5d470721b02d074f
parent cd739d3ecad88ad28f6891e9e1b119b763b53120 (diff)
download qpid-python-8c09a071c998b789c464dc0ef1eecafe6cc4cd67.tar.gz
Fix cluster elder calculation to ensure unique elder.
A race condition in the previous algorithm allowed several cluster members to consider themselves the elder.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@903826 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- qpid/cpp/src/qpid/cluster/Cluster.cpp 14
-rw-r--r-- qpid/cpp/src/qpid/cluster/Cluster.h 1
-rw-r--r-- qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp 25
-rw-r--r-- qpid/cpp/src/qpid/cluster/InitialStatusMap.h 8
-rw-r--r-- qpid/cpp/src/qpid/cluster/MemberSet.cpp 7
-rw-r--r-- qpid/cpp/src/qpid/cluster/MemberSet.h 2
-rw-r--r-- qpid/cpp/src/qpid/cluster/types.h 2
-rw-r--r-- qpid/cpp/src/tests/InitialStatusMap.cpp 38
-rw-r--r-- qpid/cpp/xml/cluster.xml 1
9 files changed, 68 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 53100fa0c1..d398f30a86 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -189,11 +189,13 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
uint8_t storeState, const Uuid& shutdownId,
- const framing::SequenceNumber& configSeq)
+ const framing::SequenceNumber& configSeq,
+ const std::string& firstConfig)
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId, configSeq, l);
+ framing::cluster::StoreState(storeState), shutdownId, configSeq,
+ firstConfig, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -553,7 +555,7 @@ void Cluster::configChange (
<< AddrList(joined, nJoined, "joined: ")
<< AddrList(left, nLeft, "left: ")
<< ")");
- std::string addresses;
+ string addresses;
for (const cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
@@ -625,7 +627,8 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(), store.getConfigSeq()
+ store.getState(), store.getShutdownId(), store.getConfigSeq(),
+ initMap.getFirstConfigStr()
),
self);
}
@@ -673,6 +676,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const framing::SequenceNumber& configSeq,
+ const std::string& firstConfig,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -684,7 +688,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
initMap.received(
member,
ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
- store, shutdownId, configSeq)
+ store, shutdownId, configSeq, firstConfig)
);
if (initMap.transitionToComplete()) initMapCompleted(l);
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 79b84f1172..ae3d667359 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -155,6 +155,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
framing::cluster::StoreState,
const framing::Uuid& shutdownId,
const framing::SequenceNumber& configSeq,
+ const std::string& firstConfig,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
index c6de488a40..a1a1456618 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -45,6 +45,7 @@ void InitialStatusMap::configChange(const MemberSet& members) {
Map::iterator j = map.begin();
while (i != members.end() || j != map.end()) {
if (i == members.end()) { // j not in members, member left
+ firstConfig.erase(j->first);
Map::iterator k = j++;
map.erase(k);
}
@@ -59,6 +60,7 @@ void InitialStatusMap::configChange(const MemberSet& members) {
++i;
}
else if (*i > j->first) { // j not in members, member left
+ firstConfig.erase(j->first);
Map::iterator k = j++;
map.erase(k);
}
@@ -83,7 +85,7 @@ bool InitialStatusMap::notInitialized(const Map::value_type& v) {
return !v.second;
}
-bool InitialStatusMap::isComplete() {
+bool InitialStatusMap::isComplete() const {
return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
&& (map.size() >= size);
}
@@ -128,12 +130,21 @@ bool InitialStatusMap::isUpdateNeeded() {
return false;
}
-MemberSet InitialStatusMap::getElders() {
+MemberSet InitialStatusMap::getElders() const {
assert(isComplete());
MemberSet elders;
- // Elders are from first config change, active or higher node-id.
- for (MemberSet::iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) {
- if (map.find(*i) != map.end() && (map[*i]->getActive() || *i > self))
+ for (MemberSet::const_iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) {
+ // *i is in my first config, so a potential elder.
+ if (*i == self) continue; // Not my own elder
+ Map::const_iterator j = map.find(*i);
+ assert(j != map.end());
+ assert(j->second);
+ const Status& s = *j->second;
+ // If I'm not in i's first config then i is older than me.
+ // Otherwise we were born in the same configuration so use
+ // member ID to break the tie.
+ MemberSet iFirstConfig = decodeMemberSet(s.getFirstConfig());
+ if (iFirstConfig.find(self) == iFirstConfig.end() || *i > self)
elders.insert(*i);
}
return elders;
@@ -197,5 +208,9 @@ void InitialStatusMap::checkConsistent() {
throw Exception("Cannot recover, no clean store.");
}
+std::string InitialStatusMap::getFirstConfigStr() const { // Returns the firstConfig member set encoded as a string.
+ assert(!firstConfig.empty()); // Precondition: firstConfig must be non-empty when this is called.
+ return encodeMemberSet(firstConfig); // Encoding is whatever encodeMemberSet produces for the set.
+}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
index 40fd9ee49d..26a99fa0b0 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -47,11 +47,11 @@ class InitialStatusMap
void received(const MemberId&, const Status& is);
/**@return true if the map is complete. */
- bool isComplete();
+ bool isComplete() const;
/**@return true if the map was completed by the last config change or received. */
bool transitionToComplete();
/**@pre isComplete(). @return this node's elders */
- MemberSet getElders();
+ MemberSet getElders() const;
/**@pre isComplete(). @return True if we need an update. */
bool isUpdateNeeded();
/**@pre isComplete(). @return Cluster-wide cluster ID. */
@@ -59,6 +59,10 @@ class InitialStatusMap
/**@pre isComplete(). @throw Exception if there are any inconsistencies. */
void checkConsistent();
+ /** Get first config-change for this member, encoded as a string.
+ *@pre configChange has been called at least once.
+ */
+ std::string getFirstConfigStr() const;
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
static bool notInitialized(const Map::value_type&);
diff --git a/qpid/cpp/src/qpid/cluster/MemberSet.cpp b/qpid/cpp/src/qpid/cluster/MemberSet.cpp
index 5dc148609f..0fdf4a8f96 100644
--- a/qpid/cpp/src/qpid/cluster/MemberSet.cpp
+++ b/qpid/cpp/src/qpid/cluster/MemberSet.cpp
@@ -25,6 +25,13 @@
namespace qpid {
namespace cluster {
+std::string encodeMemberSet(const MemberSet& m) { // Encodes a MemberSet as one string by concatenating each member's str().
+ std::string addresses; // Accumulates the encoded members; stays empty for an empty set.
+ for (MemberSet::const_iterator i = m.begin(); i != m.end(); ++i) // Visits members in the set's iteration order.
+ addresses.append(i->str()); // No separator is written between members.
+ return addresses;
+}
+
MemberSet decodeMemberSet(const std::string& s) {
MemberSet set;
for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) {
diff --git a/qpid/cpp/src/qpid/cluster/MemberSet.h b/qpid/cpp/src/qpid/cluster/MemberSet.h
index df3df7c319..7c97145dc1 100644
--- a/qpid/cpp/src/qpid/cluster/MemberSet.h
+++ b/qpid/cpp/src/qpid/cluster/MemberSet.h
@@ -31,6 +31,8 @@ namespace cluster {
typedef std::set<MemberId> MemberSet;
+std::string encodeMemberSet(const MemberSet&);
+
MemberSet decodeMemberSet(const std::string&);
MemberSet intersection(const MemberSet& a, const MemberSet& b);
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index c25370b6b6..0795e5e77a 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -59,7 +59,7 @@ struct MemberId : std::pair<uint32_t, uint32_t> {
uint32_t getPid() const { return second; }
operator uint64_t() const { return (uint64_t(first)<<32ull) + second; }
- // AsMethodBody as string, network byte order.
+ // MemberId as byte string, network byte order. Not human readable.
std::string str() const;
};
diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp
index 63214ee395..dc86c41103 100644
--- a/qpid/cpp/src/tests/InitialStatusMap.cpp
+++ b/qpid/cpp/src/tests/InitialStatusMap.cpp
@@ -36,16 +36,21 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite)
typedef InitialStatusMap::Status Status;
-Status activeStatus(const Uuid& id=Uuid()) {
- return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0);
+Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
+ return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0,
+ encodeMemberSet(ms));
}
-Status newcomerStatus(const Uuid& id=Uuid()) {
- return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0);
+Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) {
+ return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0,
+ encodeMemberSet(ms));
}
-Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) {
- return Status(ProtocolVersion(), 0, active, start, state, stop, 0);
+Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid(),
+ const MemberSet& ms=MemberSet())
+{
+ return Status(ProtocolVersion(), 0, active, start, state, stop, 0,
+ encodeMemberSet(ms));
}
QPID_AUTO_TEST_CASE(testFirstInCluster) {
@@ -56,7 +61,7 @@ QPID_AUTO_TEST_CASE(testFirstInCluster) {
MemberSet members = list_of(MemberId(0));
map.configChange(members);
BOOST_CHECK(!map.isComplete());
- map.received(MemberId(0), newcomerStatus(id));
+ map.received(MemberId(0), newcomerStatus(id, list_of<MemberId>(0)));
BOOST_CHECK(map.isComplete());
BOOST_CHECK(map.transitionToComplete());
BOOST_CHECK(map.getElders().empty());
@@ -96,9 +101,9 @@ QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) {
BOOST_CHECK(map.isResendNeeded());
// All new members
- map.received(MemberId(0), newcomerStatus(id));
- map.received(MemberId(1), newcomerStatus());
- map.received(MemberId(2), newcomerStatus());
+ map.received(MemberId(0), newcomerStatus(id, list_of<MemberId>(0)(1)(2)));
+ map.received(MemberId(1), newcomerStatus(id, list_of<MemberId>(0)(1)(2)));
+ map.received(MemberId(2), newcomerStatus(id, list_of<MemberId>(0)(1)(2)));
BOOST_CHECK(!map.isResendNeeded());
BOOST_CHECK(map.isComplete());
BOOST_CHECK(map.transitionToComplete());
@@ -108,21 +113,20 @@ QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) {
}
QPID_AUTO_TEST_CASE(testMultipleJoinExisting) {
- // Multiple members 1,2,3 join existing cluster containing 0.
+ // Multiple members 2,3 join simultaneously a cluster containing 0,1.
InitialStatusMap map(MemberId(2), 1); // self is 2
Uuid id(true);
MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3));
map.configChange(members);
BOOST_CHECK(map.isResendNeeded());
-
- map.received(MemberId(1), newcomerStatus());
- map.received(MemberId(2), newcomerStatus());
- map.received(MemberId(3), newcomerStatus());
- map.received(MemberId(0), activeStatus(id));
+ map.received(MemberId(0), activeStatus(id, list_of<MemberId>(0)));
+ map.received(MemberId(1), newcomerStatus(id, list_of<MemberId>(0)(1)));
+ map.received(MemberId(2), newcomerStatus(id, list_of<MemberId>(0)(1)(2)(3)));
+ map.received(MemberId(3), newcomerStatus(id, list_of<MemberId>(0)(1)(2)(3)));
BOOST_CHECK(!map.isResendNeeded());
BOOST_CHECK(map.isComplete());
BOOST_CHECK(map.transitionToComplete());
- BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(0))(MemberId(3)));
+ BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(0)(1)(3));
BOOST_CHECK(map.isUpdateNeeded());
BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 0359514294..06f5478583 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -65,6 +65,7 @@
<field name="store-state" type="store-state"/>
<field name="shutdown-id" type="uuid"/>
<field name="config-seq" type="sequence-no"/>
+ <field name="first-config" type="str16"/>
</control>
<!-- New member or updater is ready as an active member. -->