diff options
author | Alan Conway <aconway@apache.org> | 2009-11-17 18:09:21 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-11-17 18:09:21 +0000 |
commit | 1f91e22255a62a408d5fc82e0d0ed8c6260a49f7 (patch) | |
tree | 6a5646b695f6aba7915f017d1d6b06e17dd2ee95 /qpid/cpp/src | |
parent | 20e76d9894129f94f58a6b4794f64dbb4ddf8820 (diff) | |
download | qpid-python-1f91e22255a62a408d5fc82e0d0ed8c6260a49f7.tar.gz |
Integrated InitialStatusMap into cluster code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@881423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 118 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 81 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cpg.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ErrorCheck.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ErrorCheck.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp | 38 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/InitialStatusMap.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/MemberSet.cpp | 51 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/MemberSet.h | 43 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/types.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/AMQContentBody.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/InitialStatusMap.cpp | 41 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 10 |
16 files changed, 305 insertions, 147 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 6e9bd27698..a2ec661ccf 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -84,6 +84,8 @@ cluster_la_SOURCES = \ qpid/cluster/Quorum.h \ qpid/cluster/InitialStatusMap.h \ qpid/cluster/InitialStatusMap.cpp \ + qpid/cluster/MemberSet.h \ + qpid/cluster/MemberSet.cpp \ qpid/cluster/types.h cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index d6312e7b93..9756ad0a62 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -162,13 +162,14 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } - void initialStatus(bool active, bool persistent, const framing::FieldTable& props) { - cluster.initialStatus(member, active, persistent, props); + void initialStatus(bool active, bool persistent, const Uuid& clusterId, + uint32_t version, const std::string& url) { + cluster.initialStatus(member, active, persistent, clusterId, version, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } - void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { - cluster.updateOffer(member, updatee, id, version, l); + void updateOffer(uint64_t updatee) { + cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } @@ -190,6 +191,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), self(cpg.self()), + clusterId(true), expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), @@ -206,6 +208,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(INIT), + initMap(self), lastSize(0), lastBroker(false), updateRetracted(false), @@ -265,8 +268,8 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { QPID_LOG(info, *this << " new shadow connection " << c->getId()); - // Safe to use connections here because we're pre-catchup, either - // discarding or stalled, so deliveredFrame is not processing any + // Safe to use connections here because we're pre-catchup, stalled + // and discarding, so deliveredFrame is not processing any // connection events. assert(discarding); pair<ConnectionMap::iterator, bool> ib @@ -522,7 +525,8 @@ void Cluster::configChange ( const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - if (state == INIT) { // First config change. + if (state == INIT) { + // FIXME aconway 2009-11-16: persistent restart // Recover only if we are first in cluster. broker.setRecovery(nCurrent == 1); initialized = true; @@ -545,39 +549,55 @@ void Cluster::setReady(Lock&) { broker.getQueueEvents().enable(); } -void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { - bool memberChange = map.configChange(current); - QPID_LOG(debug, *this << " applied config change: " << map); +void Cluster::initMapCompleted(Lock& l) { + if (state == INIT) { + elders = initMap.getElders(); + if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. + broker.getLinks().setPassive(true); + broker.getQueueEvents().disable(); + } + setClusterId(initMap.getClusterId(), l); + if (initMap.isUpdateNeeded()) { // Joining established cluster. + state = JOINER; + mcast.mcastControl( + ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + } + else { // I can go ready. + QPID_LOG(notice, *this << " ready."); + discarding = false; + setReady(l); + map = ClusterMap(initMap.getMemberUrls()); + memberUpdate(l); + } + } +} + +void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) { if (state == LEFT) return; - - if (!map.isAlive(self)) { // Final config change. + + MemberSet config = decodeMemberSet(configStr); + elders = intersection(elders, config); + if (elders.empty() && INIT < state && state < CATCHUP) { + QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); leave(l); return; } + bool memberChange = map.configChange(config); - if (state == INIT) { // First configChange - if (map.aliveCount() == 1) { - setClusterId(true, l); - discarding = false; - setReady(l); - map = ClusterMap(self, myUrl, true); - memberUpdate(l); - QPID_LOG(notice, *this << " first in cluster"); - } - else { // Joining established group. - state = JOINER; - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - elders = map.getAlive(); - elders.erase(self); - broker.getLinks().setPassive(true); - broker.getQueueEvents().disable(); - } - } - else if (state >= CATCHUP && memberChange) { + // Update initital status for new members joining. + initMap.configChange(config); + if (initMap.isResendNeeded()) { + mcast.mcastControl( + // FIXME aconway 2009-11-17: persistent restart, set persistence bit. + ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId, + CLUSTER_VERSION, myUrl.str()), self); + } + if (initMap.transitionToComplete()) initMapCompleted(l); + + if (state >= CATCHUP && memberChange) { memberUpdate(l); - elders = ClusterMap::intersection(elders, map.getAlive()); if (elders.empty()) { - //assume we are oldest, reactive links if necessary + // We are the oldest, reactive links if necessary broker.getLinks().setPassive(false); } } @@ -587,8 +607,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl( - ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self); } } @@ -610,10 +629,23 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) makeOffer(id, l); } -void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/, - const framing::FieldTable&) { - // FIXME aconway 2009-11-12: fill in. +void Cluster::initialStatus(const MemberId& member, bool active, bool persistent, + const framing::Uuid& id, uint32_t version, + const std::string& url, Lock& l) +{ + if (version != CLUSTER_VERSION) { + QPID_LOG(critical, *this << " incompatible cluster versions " << + version << " != " << CLUSTER_VERSION); + leave(l); + return; + } + initMap.received( + member, + ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url) + ); + if (initMap.transitionToComplete()) initMapCompleted(l); } + void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); @@ -623,17 +655,10 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, - uint32_t version, Lock& l) { +void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { // NOTE: deliverEventQueue has been stopped at the update offer by // deliveredEvent in case an update is required. if (state == LEFT) return; - if (version != CLUSTER_VERSION) { - QPID_LOG(critical, *this << " incompatible cluster versions " << - version << " != " << CLUSTER_VERSION); - leave(l); - return; - } MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); if (updater == self) { @@ -649,7 +674,6 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu } else if (updatee == self && url) { assert(state == JOINER); - setClusterId(uuid, l); state = UPDATEE; QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 751a71867d..aff703c081 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,6 +19,7 @@ * */ +#include "InitialStatusMap.h" #include "ClusterMap.h" #include "ClusterSettings.h" #include "Cpg.h" @@ -144,10 +145,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); - void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, - uint32_t version, Lock&); + void updateOffer(const MemberId& updater, uint64_t updatee, Lock&); void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); - void initialStatus(const MemberId&, bool active, bool persistent, const framing::FieldTable& props); + void initialStatus(const MemberId&, bool active, bool persistent, + const framing::Uuid& id, uint32_t version, + const std::string& url, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); @@ -165,6 +167,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setClusterId(const framing::Uuid&, Lock&); void erase(const ConnectionId&, Lock&); + void initMapCompleted(Lock&); + + + // == Called in CPG dispatch thread void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, @@ -241,7 +247,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Local cluster state, cluster map enum { - INIT, ///< Initial state, no CPG messages received. + INIT, ///< Establishing inital cluster stattus. JOINER, ///< Sent update request, waiting for update offer. UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. @@ -252,8 +258,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { } state; ConnectionMap connections; + InitialStatusMap initMap; ClusterMap map; - ClusterMap::Set elders; + MemberSet elders; size_t lastSize; bool lastBroker; sys::Thread updateThread; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index ca5237d6b1..8cac470ef3 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -28,24 +28,20 @@ #include <iterator> #include <ostream> +using namespace std; +using namespace boost; + namespace qpid { using namespace framing; namespace cluster { -ClusterMap::Set ClusterMap::decode(const std::string& s) { - Set set; - for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) - set.insert(MemberId(std::string(i, i+8))); - return set; -} - namespace { void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { MemberId id(vt.first); set.insert(id); - std::string url = vt.second->get<std::string>(); + string url = vt.second->get<string>(); if (!url.empty()) map.insert(ClusterMap::Map::value_type(id, Url(url))); } @@ -56,37 +52,34 @@ void insertFieldTableFromMapValue(FieldTable& ft, const ClusterMap::Map::value_t void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { ft.clear(); - std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(ft), _1)); + for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1)); } } ClusterMap::ClusterMap() : frameSeq(0) {} -ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) { - alive.insert(id); - if (isMember) - members[id] = url; - else - joiners[id] = url; +ClusterMap::ClusterMap(const Map& map) : frameSeq(0) { + transform(map.begin(), map.end(), inserter(alive, alive.begin()), bind(&Map::value_type::first, _1)); + members = map; } ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_) : frameSeq(frameSeq_) { - std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); - std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); + for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, ref(joiners), ref(alive))); + for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, ref(members), ref(alive))); } void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const { b.getJoiners().clear(); - std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1)); + for_each(joiners.begin(), joiners.end(), bind(&insertFieldTableFromMapValue, ref(b.getJoiners()), _1)); for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) { if (!isMember(*i) && !isJoiner(*i)) - b.getJoiners().setString(i->str(), std::string()); + b.getJoiners().setString(i->str(), string()); } b.getMembers().clear(); - std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); + for_each(members.begin(), members.end(), bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1)); b.setFrameSeq(frameSeq); } @@ -99,21 +92,21 @@ MemberId ClusterMap::firstJoiner() const { return joiners.empty() ? MemberId() : joiners.begin()->first; } -std::vector<string> ClusterMap::memberIds() const { - std::vector<string> ids; +vector<string> ClusterMap::memberIds() const { + vector<string> ids; for (Map::const_iterator iter = members.begin(); iter != members.end(); iter++) { - std::stringstream stream; + stringstream stream; stream << iter->first; ids.push_back(stream.str()); } return ids; } -std::vector<Url> ClusterMap::memberUrls() const { - std::vector<Url> urls(members.size()); - std::transform(members.begin(), members.end(), urls.begin(), - boost::bind(&Map::value_type::second, _1)); +vector<Url> ClusterMap::memberUrls() const { + vector<Url> urls(members.size()); + transform(members.begin(), members.end(), urls.begin(), + bind(&Map::value_type::second, _1)); return urls; } @@ -121,18 +114,18 @@ ClusterMap::Set ClusterMap::getAlive() const { return alive; } ClusterMap::Set ClusterMap::getMembers() const { Set s; - std::transform(members.begin(), members.end(), std::inserter(s, s.begin()), - boost::bind(&Map::value_type::first, _1)); + transform(members.begin(), members.end(), inserter(s, s.begin()), + bind(&Map::value_type::first, _1)); return s; } -std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { - std::ostream_iterator<MemberId> oi(o); - std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1)); +ostream& operator<<(ostream& o, const ClusterMap::Map& m) { + ostream_iterator<MemberId> oi(o); + transform(m.begin(), m.end(), oi, bind(&ClusterMap::Map::value_type::first, _1)); return o; } -std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { +ostream& operator<<(ostream& o, const ClusterMap& m) { for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { o << *i; if (m.isMember(*i)) o << "(member)"; @@ -143,7 +136,7 @@ std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { return o; } -bool ClusterMap::updateRequest(const MemberId& id, const std::string& url) { +bool ClusterMap::updateRequest(const MemberId& id, const string& url) { if (isAlive(id)) { joiners[id] = Url(url); return true; @@ -155,13 +148,12 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { return isAlive(id) && members.insert(Map::value_type(id,url)).second; } -bool ClusterMap::configChange(const std::string& addresses) { +bool ClusterMap::configChange(const Set& update) { bool memberChange = false; - Set update = decode(addresses); Set removed; - std::set_difference(alive.begin(), alive.end(), + set_difference(alive.begin(), alive.end(), update.begin(), update.end(), - std::inserter(removed, removed.begin())); + inserter(removed, removed.begin())); alive = update; for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) { memberChange = memberChange || members.erase(*i); @@ -170,23 +162,14 @@ bool ClusterMap::configChange(const std::string& addresses) { return memberChange; } -boost::optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) { +optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) { Map::iterator i = joiners.find(to); if (isAlive(from) && i != joiners.end()) { Url url= i->second; joiners.erase(i); // No longer a potential updatee. return url; } - return boost::optional<Url>(); + return optional<Url>(); } -ClusterMap::Set ClusterMap::intersection(const ClusterMap::Set& a, const ClusterMap::Set& b) -{ - Set intersection; - std::set_intersection(a.begin(), a.end(), - b.begin(), b.end(), - std::inserter(intersection, intersection.begin())); - return intersection; - -} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 2e682a6f4a..98572813a8 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -22,7 +22,7 @@ * */ -#include "qpid/cluster/types.h" +#include "MemberSet.h" #include "qpid/Url.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/SequenceNumber.h" @@ -47,16 +47,14 @@ class ClusterMap { typedef std::map<MemberId, Url> Map; typedef std::set<MemberId> Set; - static Set decode(const std::string&); - ClusterMap(); - ClusterMap(const MemberId& id, const Url& url, bool isReady); + ClusterMap(const Map& map); ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq); /** Update from config change. *@return true if member set changed. */ - bool configChange(const std::string& addresses); + bool configChange(const Set& members); bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } @@ -85,11 +83,6 @@ class ClusterMap { /**@return true If this is a new member */ bool ready(const MemberId& id, const Url&); - /** - * Utility method to return intersection of two member sets - */ - static Set intersection(const Set& a, const Set& b); - framing::SequenceNumber getFrameSeq() { return frameSeq; } framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; } diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 49a814b848..5efa91f11c 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -105,7 +105,7 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdow QPID_LOG(notice, "Initializing CPG"); cpg_error_t err = cpg_initialize(&handle, &callbacks); - int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection. + int retries = 6; // FIXME aconway 2009-08-06: make this configurable. while (err == CPG_ERR_TRY_AGAIN && --retries) { QPID_LOG(notice, "Re-trying CPG initialization."); sys::sleep(5); diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp index 5b7011047b..d66db8551d 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -39,11 +39,6 @@ ErrorCheck::ErrorCheck(Cluster& c) : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) {} -ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { - copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); - return o; -} - void ErrorCheck::error( Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg) @@ -115,7 +110,7 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& const ClusterConfigChangeBody* configChange = static_cast<const ClusterConfigChangeBody*>(method); if (configChange) { - MemberSet members(ClusterMap::decode(configChange->getCurrent())); + MemberSet members(decodeMemberSet(configChange->getCurrent())); QPID_LOG(debug, cluster << " apply config change to error " << frameSeq << ": " << members); MemberSet intersect; diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h index c975b9af64..de8cedafb3 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.h +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h @@ -22,7 +22,7 @@ * */ -#include "qpid/cluster/types.h" +#include "qpid/cluster/MemberSet.h" #include "qpid/cluster/Multicaster.h" #include "qpid/framing/enum.h" #include "qpid/framing/SequenceNumber.h" @@ -34,7 +34,6 @@ namespace qpid { namespace cluster { class EventFrame; -class ClusterMap; class Cluster; class Multicaster; class Connection; @@ -48,7 +47,6 @@ class Connection; class ErrorCheck { public: - typedef std::set<MemberId> MemberSet; typedef framing::cluster::ErrorType ErrorType; typedef framing::SequenceNumber SequenceNumber; diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp index 8c55a66ed7..6d27b3ae72 100644 --- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -29,13 +29,12 @@ namespace qpid { namespace cluster { InitialStatusMap::InitialStatusMap(const MemberId& self_) - : self(self_), complete(), updateNeeded(), resendNeeded() -{ - map[self] = optional<Status>(); -} + : self(self_), completed(), resendNeeded() +{} void InitialStatusMap::configChange(const MemberSet& members) { resendNeeded = false; + bool wasComplete = isComplete(); if (firstConfig.empty()) firstConfig = members; MemberSet::const_iterator i = members.begin(); Map::iterator j = map.begin(); @@ -66,10 +65,13 @@ void InitialStatusMap::configChange(const MemberSet& members) { for (Map::iterator i = map.begin(); i != map.end(); ++i) i->second = optional<Status>(); } + completed = isComplete() && !wasComplete; // Set completed on the transition. } void InitialStatusMap::received(const MemberId& m, const Status& s){ + bool wasComplete = isComplete(); map[m] = s; + completed = isComplete() && !wasComplete; // Set completed on the transition. } bool InitialStatusMap::notInitialized(const Map::value_type& v) { @@ -81,7 +83,11 @@ bool InitialStatusMap::isActive(const Map::value_type& v) { } bool InitialStatusMap::isComplete() { - return find_if(map.begin(), map.end(), ¬Initialized) == map.end(); + return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end(); +} + +bool InitialStatusMap::transitionToComplete() { + return completed; } bool InitialStatusMap::isResendNeeded() { @@ -107,4 +113,26 @@ MemberSet InitialStatusMap::getElders() { return elders; } +// Get cluster ID from an active member or the youngest newcomer. +framing::Uuid InitialStatusMap::getClusterId() { + assert(isComplete()); + assert(!map.empty()); + Map::iterator i = find_if(map.begin(), map.end(), &isActive); + if (i != map.end()) + return i->second->getClusterId(); // An active member + else + return map.begin()->second->getClusterId(); +} + +std::map<MemberId, Url> InitialStatusMap::getMemberUrls() { + assert(isComplete()); + assert(!isUpdateNeeded()); + std::map<MemberId, Url> urlMap; + for (Map::iterator i = map.begin(); i != map.end(); ++i) { + assert(i->second); + urlMap.insert(std::make_pair(i->first, i->second->getUrl())); + } + return urlMap; +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h index d139722623..4605a4c1fe 100644 --- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h @@ -22,7 +22,7 @@ * */ -#include "types.h" +#include "MemberSet.h" #include <qpid/framing/ClusterInitialStatusBody.h> #include <boost/optional.hpp> @@ -48,10 +48,19 @@ class InitialStatusMap /**@return true if the map is complete. */ bool isComplete(); - /**@pre isComplete. @return this node's elders */ + /**@return true if the map was completed by the last config change or received. */ + bool transitionToComplete(); + /**@pre isComplete(). @return this node's elders */ MemberSet getElders(); - /**@pre isComplete. @return True if we need an update. */ + /**@pre isComplete(). @return True if we need an update. */ bool isUpdateNeeded(); + /**@pre isComplete(). @return Cluster-wide cluster ID. */ + framing::Uuid getClusterId(); + + /**@pre isComplete() && !isUpdateNeeded(). + *@return member->URL map for all members. + */ + std::map<MemberId, Url> getMemberUrls(); private: typedef std::map<MemberId, boost::optional<Status> > Map; @@ -61,7 +70,7 @@ class InitialStatusMap Map map; MemberSet firstConfig; MemberId self; - bool complete, updateNeeded, resendNeeded; + bool completed, resendNeeded; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/MemberSet.cpp b/qpid/cpp/src/qpid/cluster/MemberSet.cpp new file mode 100644 index 0000000000..f8aa499818 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/MemberSet.cpp @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MemberSet.h" +#include <ostream> + +namespace qpid { +namespace cluster { + +MemberSet decodeMemberSet(const std::string& s) { + MemberSet set; + for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) { + assert(size_t(i-s.begin())+8 <= s.size()); + set.insert(MemberId(std::string(i, i+8))); + } + return set; +} + +MemberSet intersection(const MemberSet& a, const MemberSet& b) +{ + MemberSet intersection; + std::set_intersection(a.begin(), a.end(), + b.begin(), b.end(), + std::inserter(intersection, intersection.begin())); + return intersection; + +} + +std::ostream& operator<<(std::ostream& o, const MemberSet& ms) { + copy(ms.begin(), ms.end(), std::ostream_iterator<MemberId>(o, " ")); + return o; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/MemberSet.h b/qpid/cpp/src/qpid/cluster/MemberSet.h new file mode 100644 index 0000000000..df3df7c319 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/MemberSet.h @@ -0,0 +1,43 @@ +#ifndef QPID_CLUSTER_MEMBERSET_H +#define QPID_CLUSTER_MEMBERSET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include <set> +#include <iosfwd> + +namespace qpid { +namespace cluster { + +typedef std::set<MemberId> MemberSet; + +MemberSet decodeMemberSet(const std::string&); + +MemberSet intersection(const MemberSet& a, const MemberSet& b); + +std::ostream& operator<<(std::ostream& o, const MemberSet& ms); + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MEMBERSET_H*/ diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index 6777c9674c..c25370b6b6 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -29,8 +29,6 @@ #include <utility> #include <iosfwd> #include <string> -#include <set> - extern "C" { #if defined (HAVE_OPENAIS_CPG_H) @@ -76,8 +74,6 @@ struct ConnectionId : public std::pair<MemberId, uint64_t> { uint64_t getNumber() const { return second; } }; -typedef std::set<MemberId> MemberSet; - std::ostream& operator<<(std::ostream&, const ConnectionId&); std::ostream& operator<<(std::ostream&, EventType); diff --git a/qpid/cpp/src/qpid/framing/AMQContentBody.cpp b/qpid/cpp/src/qpid/framing/AMQContentBody.cpp index d0ae9a2902..72f7d9978e 100644 --- a/qpid/cpp/src/qpid/framing/AMQContentBody.cpp +++ b/qpid/cpp/src/qpid/framing/AMQContentBody.cpp @@ -40,5 +40,7 @@ void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){ void qpid::framing::AMQContentBody::print(std::ostream& out) const { out << "content (" << encodedSize() << " bytes)"; - out << " " << data.substr(0,16) << "..."; + const size_t max = 32; + out << " " << data.substr(0, max); + if (data.size() > max) out << "..."; } diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp index 7709b1fbfc..70b077b695 100644 --- a/qpid/cpp/src/tests/InitialStatusMap.cpp +++ b/qpid/cpp/src/tests/InitialStatusMap.cpp @@ -20,6 +20,7 @@ #include "unit_test.h" #include "test_tools.h" #include "qpid/cluster/InitialStatusMap.h" +#include "qpid/framing/Uuid.h" #include <boost/assign.hpp> using namespace std; @@ -34,58 +35,72 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite) typedef InitialStatusMap::Status Status; -Status activeStatus() { return Status(ProtocolVersion(), true, false, FieldTable()); } -Status newcomerStatus() { return Status(ProtocolVersion(), false, false, FieldTable()); } +Status activeStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), true, false, id, 0, ""); } +Status newcomerStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), false, false, id, 0, ""); } QPID_AUTO_TEST_CASE(testFirstInCluster) { // Single member is first in cluster. InitialStatusMap map(MemberId(0)); + Uuid id(true); BOOST_CHECK(!map.isComplete()); MemberSet members = list_of(MemberId(0)); map.configChange(members); BOOST_CHECK(!map.isComplete()); - map.received(MemberId(0), newcomerStatus()); + map.received(MemberId(0), newcomerStatus(id)); BOOST_CHECK(map.isComplete()); + BOOST_CHECK(map.transitionToComplete()); BOOST_CHECK(map.getElders().empty()); BOOST_CHECK(!map.isUpdateNeeded()); + BOOST_CHECK_EQUAL(id, map.getClusterId()); } QPID_AUTO_TEST_CASE(testJoinExistingCluster) { // Single member 0 joins existing cluster 1,2 InitialStatusMap map(MemberId(0)); + Uuid id(true); MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2)); map.configChange(members); BOOST_CHECK(map.isResendNeeded()); BOOST_CHECK(!map.isComplete()); map.received(MemberId(0), newcomerStatus()); - map.received(MemberId(1), activeStatus()); + map.received(MemberId(1), activeStatus(id)); BOOST_CHECK(!map.isComplete()); - map.received(MemberId(2), activeStatus()); + map.received(MemberId(2), activeStatus(id)); BOOST_CHECK(map.isComplete()); + BOOST_CHECK(map.transitionToComplete()); BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1)(2)); BOOST_CHECK(map.isUpdateNeeded()); + BOOST_CHECK_EQUAL(map.getClusterId(), id); + + // Check that transitionToComplete is reset. + map.configChange(list_of<MemberId>(0)(1)); + BOOST_CHECK(!map.transitionToComplete()); } QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) { // Multiple members 0,1,2 join at same time. InitialStatusMap map(MemberId(1)); // self is 1 + Uuid id(true); MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2)); map.configChange(members); BOOST_CHECK(map.isResendNeeded()); // All new members - map.received(MemberId(0), newcomerStatus()); + map.received(MemberId(0), newcomerStatus(id)); map.received(MemberId(1), newcomerStatus()); map.received(MemberId(2), newcomerStatus()); BOOST_CHECK(!map.isResendNeeded()); BOOST_CHECK(map.isComplete()); + BOOST_CHECK(map.transitionToComplete()); BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(2))); BOOST_CHECK(!map.isUpdateNeeded()); + BOOST_CHECK_EQUAL(map.getClusterId(), id); } QPID_AUTO_TEST_CASE(testMultipleJoinExisting) { // Multiple members 1,2,3 join existing cluster containing 0. InitialStatusMap map(MemberId(2)); // self is 2 + Uuid id(true); MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3)); map.configChange(members); BOOST_CHECK(map.isResendNeeded()); @@ -93,28 +108,34 @@ QPID_AUTO_TEST_CASE(testMultipleJoinExisting) { map.received(MemberId(1), newcomerStatus()); map.received(MemberId(2), newcomerStatus()); map.received(MemberId(3), newcomerStatus()); - map.received(MemberId(0), activeStatus()); + map.received(MemberId(0), activeStatus(id)); BOOST_CHECK(!map.isResendNeeded()); BOOST_CHECK(map.isComplete()); + BOOST_CHECK(map.transitionToComplete()); BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(0))(MemberId(3))); BOOST_CHECK(map.isUpdateNeeded()); + BOOST_CHECK_EQUAL(map.getClusterId(), id); } QPID_AUTO_TEST_CASE(testMembersLeave) { // Test that map completes if members leave rather than send status. InitialStatusMap map(MemberId(0)); + Uuid id(true); map.configChange(list_of(MemberId(0))(MemberId(1))(MemberId(2))); map.received(MemberId(0), newcomerStatus()); - map.received(MemberId(1), activeStatus()); + map.received(MemberId(1), activeStatus(id)); BOOST_CHECK(!map.isComplete()); map.configChange(list_of(MemberId(0))(MemberId(1))); // 2 left BOOST_CHECK(map.isComplete()); + BOOST_CHECK(map.transitionToComplete()); BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(1))); + BOOST_CHECK_EQUAL(map.getClusterId(), id); } QPID_AUTO_TEST_CASE(testInteveningConfig) { // Multiple config changes arrives before we complete the map. InitialStatusMap map(MemberId(0)); + Uuid id(true); map.configChange(list_of<MemberId>(0)(1)); BOOST_CHECK(map.isResendNeeded()); @@ -125,7 +146,7 @@ QPID_AUTO_TEST_CASE(testInteveningConfig) { map.configChange(list_of<MemberId>(0)(1)(2)); BOOST_CHECK(!map.isComplete()); BOOST_CHECK(map.isResendNeeded()); - map.received(1, activeStatus()); + map.received(1, activeStatus(id)); map.received(2, newcomerStatus()); // We should not be complete as we haven't received 0 since new member joined BOOST_CHECK(!map.isComplete()); @@ -133,7 +154,9 @@ QPID_AUTO_TEST_CASE(testInteveningConfig) { map.received(0, newcomerStatus()); BOOST_CHECK(map.isComplete()); + BOOST_CHECK(map.transitionToComplete()); BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1)); + BOOST_CHECK_EQUAL(map.getClusterId(), id); } QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 637b0aea0f..3ded6c103e 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -29,6 +29,11 @@ from threading import Thread class ClusterTests(BrokerTest): """Cluster tests with support for testing with a store plugin.""" + def duration(self): + d = self.config.defines.get("DURATION") + if d: return float(d)*60 + else: return 3 + def test_message_replication(self): """Test basic cluster message replication.""" # Start a cluster, send some messages to member 0. @@ -66,11 +71,10 @@ class ClusterTests(BrokerTest): sender = NumberedSender(cluster[2]) sender.start() - # Kill original brokers, start new ones. - endtime = time.time() + (int(self.config.defines.get("DURATION") or 3)) + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration() i = 0 while time.time() < endtime: - print time.time(), endtime cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) |