diff options
author | Alan Conway <aconway@apache.org> | 2010-01-27 20:56:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-01-27 20:56:31 +0000 |
commit | 8c09a071c998b789c464dc0ef1eecafe6cc4cd67 (patch) | |
tree | ce00610d67d05ef0c544beab5d470721b02d074f | |
parent | cd739d3ecad88ad28f6891e9e1b119b763b53120 (diff) | |
download | qpid-python-8c09a071c998b789c464dc0ef1eecafe6cc4cd67.tar.gz |
Fix cluster elder calculation to ensure unique elder.
Race condition in the previous algorithm allowed several cluster
members to consider themselves the elder.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@903826 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/InitialStatusMap.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/MemberSet.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/MemberSet.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/types.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/InitialStatusMap.cpp | 38 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 1 |
9 files changed, 68 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 53100fa0c1..d398f30a86 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -189,11 +189,13 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void initialStatus(uint32_t version, bool active, const Uuid& clusterId, uint8_t storeState, const Uuid& shutdownId, - const framing::SequenceNumber& configSeq) + const framing::SequenceNumber& configSeq, + const std::string& firstConfig) { cluster.initialStatus( member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, configSeq, l); + framing::cluster::StoreState(storeState), shutdownId, configSeq, + firstConfig, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -553,7 +555,7 @@ void Cluster::configChange ( << AddrList(joined, nJoined, "joined: ") << AddrList(left, nLeft, "left: ") << ")"); - std::string addresses; + string addresses; for (const cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); @@ -625,7 +627,8 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), store.getConfigSeq() + store.getState(), store.getShutdownId(), store.getConfigSeq(), + initMap.getFirstConfigStr() ), self); } @@ -673,6 +676,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ framing::cluster::StoreState store, const framing::Uuid& shutdownId, const framing::SequenceNumber& configSeq, + const std::string& firstConfig, Lock& l) { if (version != CLUSTER_VERSION) { @@ -684,7 +688,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ initMap.received( member, ClusterInitialStatusBody(ProtocolVersion(), version, active, id, - store, shutdownId, configSeq) + store, shutdownId, configSeq, firstConfig) ); if (initMap.transitionToComplete()) initMapCompleted(l); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 79b84f1172..ae3d667359 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -155,6 +155,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { framing::cluster::StoreState, const framing::Uuid& shutdownId, const framing::SequenceNumber& configSeq, + const std::string& firstConfig, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp index c6de488a40..a1a1456618 100644 --- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -45,6 +45,7 @@ void InitialStatusMap::configChange(const MemberSet& members) { Map::iterator j = map.begin(); while (i != members.end() || j != map.end()) { if (i == members.end()) { // j not in members, member left + firstConfig.erase(j->first); Map::iterator k = j++; map.erase(k); } @@ -59,6 +60,7 @@ void InitialStatusMap::configChange(const MemberSet& members) { ++i; } else if (*i > j->first) { // j not in members, member left + firstConfig.erase(j->first); Map::iterator k = j++; map.erase(k); } @@ -83,7 +85,7 @@ bool InitialStatusMap::notInitialized(const Map::value_type& v) { return !v.second; } -bool InitialStatusMap::isComplete() { +bool InitialStatusMap::isComplete() const { return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end() && (map.size() >= size); } @@ -128,12 +130,21 @@ bool InitialStatusMap::isUpdateNeeded() { return false; } -MemberSet InitialStatusMap::getElders() { +MemberSet InitialStatusMap::getElders() const { assert(isComplete()); MemberSet elders; - // Elders are from first config change, active or higher node-id. - for (MemberSet::iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) { - if (map.find(*i) != map.end() && (map[*i]->getActive() || *i > self)) + for (MemberSet::const_iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) { + // *i is in my first config, so a potential elder. + if (*i == self) continue; // Not my own elder + Map::const_iterator j = map.find(*i); + assert(j != map.end()); + assert(j->second); + const Status& s = *j->second; + // If I'm not in i's first config then i is older than me. + // Otherwise we were born in the same configuration so use + // member ID to break the tie. + MemberSet iFirstConfig = decodeMemberSet(s.getFirstConfig()); + if (iFirstConfig.find(self) == iFirstConfig.end() || *i > self) elders.insert(*i); } return elders; @@ -197,5 +208,9 @@ void InitialStatusMap::checkConsistent() { throw Exception("Cannot recover, no clean store."); } +std::string InitialStatusMap::getFirstConfigStr() const { + assert(!firstConfig.empty()); + return encodeMemberSet(firstConfig); +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h index 40fd9ee49d..26a99fa0b0 100644 --- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h @@ -47,11 +47,11 @@ class InitialStatusMap void received(const MemberId&, const Status& is); /**@return true if the map is complete. */ - bool isComplete(); + bool isComplete() const; /**@return true if the map was completed by the last config change or received. */ bool transitionToComplete(); /**@pre isComplete(). @return this node's elders */ - MemberSet getElders(); + MemberSet getElders() const; /**@pre isComplete(). @return True if we need an update. */ bool isUpdateNeeded(); /**@pre isComplete(). @return Cluster-wide cluster ID. */ @@ -59,6 +59,10 @@ class InitialStatusMap /**@pre isComplete(). @throw Exception if there are any inconsistencies. */ void checkConsistent(); + /** Get first config-change for this member, encoded as a string. + *@pre configChange has been called at least once. + */ + std::string getFirstConfigStr() const; private: typedef std::map<MemberId, boost::optional<Status> > Map; static bool notInitialized(const Map::value_type&); diff --git a/qpid/cpp/src/qpid/cluster/MemberSet.cpp b/qpid/cpp/src/qpid/cluster/MemberSet.cpp index 5dc148609f..0fdf4a8f96 100644 --- a/qpid/cpp/src/qpid/cluster/MemberSet.cpp +++ b/qpid/cpp/src/qpid/cluster/MemberSet.cpp @@ -25,6 +25,13 @@ namespace qpid { namespace cluster { +std::string encodeMemberSet(const MemberSet& m) { + std::string addresses; + for (MemberSet::const_iterator i = m.begin(); i != m.end(); ++i) + addresses.append(i->str()); + return addresses; +} + MemberSet decodeMemberSet(const std::string& s) { MemberSet set; for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) { diff --git a/qpid/cpp/src/qpid/cluster/MemberSet.h b/qpid/cpp/src/qpid/cluster/MemberSet.h index df3df7c319..7c97145dc1 100644 --- a/qpid/cpp/src/qpid/cluster/MemberSet.h +++ b/qpid/cpp/src/qpid/cluster/MemberSet.h @@ -31,6 +31,8 @@ namespace cluster { typedef std::set<MemberId> MemberSet; +std::string encodeMemberSet(const MemberSet&); + MemberSet decodeMemberSet(const std::string&); MemberSet intersection(const MemberSet& a, const MemberSet& b); diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index c25370b6b6..0795e5e77a 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -59,7 +59,7 @@ struct MemberId : std::pair<uint32_t, uint32_t> { uint32_t getPid() const { return second; } operator uint64_t() const { return (uint64_t(first)<<32ull) + second; } - // AsMethodBody as string, network byte order. + // MemberId as byte string, network byte order. Not human readable. std::string str() const; }; diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp index 63214ee395..dc86c41103 100644 --- a/qpid/cpp/src/tests/InitialStatusMap.cpp +++ b/qpid/cpp/src/tests/InitialStatusMap.cpp @@ -36,16 +36,21 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite) typedef InitialStatusMap::Status Status; -Status activeStatus(const Uuid& id=Uuid()) { - return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0); +Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) { + return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0, + encodeMemberSet(ms)); } -Status newcomerStatus(const Uuid& id=Uuid()) { - return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0); +Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) { + return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0, + encodeMemberSet(ms)); } -Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) { - return Status(ProtocolVersion(), 0, active, start, state, stop, 0); +Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid(), + const MemberSet& ms=MemberSet()) +{ + return Status(ProtocolVersion(), 0, active, start, state, stop, 0, + encodeMemberSet(ms)); } QPID_AUTO_TEST_CASE(testFirstInCluster) { @@ -56,7 +61,7 @@ QPID_AUTO_TEST_CASE(testFirstInCluster) { MemberSet members = list_of(MemberId(0)); map.configChange(members); BOOST_CHECK(!map.isComplete()); - map.received(MemberId(0), newcomerStatus(id)); + map.received(MemberId(0), newcomerStatus(id, list_of<MemberId>(0))); BOOST_CHECK(map.isComplete()); BOOST_CHECK(map.transitionToComplete()); BOOST_CHECK(map.getElders().empty()); @@ -96,9 +101,9 @@ QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) { BOOST_CHECK(map.isResendNeeded()); // All new members - map.received(MemberId(0), newcomerStatus(id)); - map.received(MemberId(1), newcomerStatus()); - map.received(MemberId(2), newcomerStatus()); + map.received(MemberId(0), newcomerStatus(id, list_of<MemberId>(0)(1)(2))); + map.received(MemberId(1), newcomerStatus(id, list_of<MemberId>(0)(1)(2))); + map.received(MemberId(2), newcomerStatus(id, list_of<MemberId>(0)(1)(2))); BOOST_CHECK(!map.isResendNeeded()); BOOST_CHECK(map.isComplete()); BOOST_CHECK(map.transitionToComplete()); @@ -108,21 +113,20 @@ QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) { } QPID_AUTO_TEST_CASE(testMultipleJoinExisting) { - // Multiple members 1,2,3 join existing cluster containing 0. + // Multiple members 2,3 join simultaneously a cluster containing 0,1. InitialStatusMap map(MemberId(2), 1); // self is 2 Uuid id(true); MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3)); map.configChange(members); BOOST_CHECK(map.isResendNeeded()); - - map.received(MemberId(1), newcomerStatus()); - map.received(MemberId(2), newcomerStatus()); - map.received(MemberId(3), newcomerStatus()); - map.received(MemberId(0), activeStatus(id)); + map.received(MemberId(0), activeStatus(id, list_of<MemberId>(0))); + map.received(MemberId(1), newcomerStatus(id, list_of<MemberId>(0)(1))); + map.received(MemberId(2), newcomerStatus(id, list_of<MemberId>(0)(1)(2)(3))); + map.received(MemberId(3), newcomerStatus(id, list_of<MemberId>(0)(1)(2)(3))); BOOST_CHECK(!map.isResendNeeded()); BOOST_CHECK(map.isComplete()); BOOST_CHECK(map.transitionToComplete()); - BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(0))(MemberId(3))); + BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(0)(1)(3)); BOOST_CHECK(map.isUpdateNeeded()); BOOST_CHECK_EQUAL(map.getClusterId(), id); } diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 0359514294..06f5478583 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -65,6 +65,7 @@ <field name="store-state" type="store-state"/> <field name="shutdown-id" type="uuid"/> <field name="config-seq" type="sequence-no"/> + <field name="first-config" type="str16"/> </control> <!-- New member or updater is ready as an active member. --> |