diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.cpp | 73 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/InitialStatusMap.cpp | 7 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 25 |
11 files changed, 135 insertions, 59 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index cdeb89188b..320111c2e1 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -186,10 +186,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void initialStatus(uint32_t version, bool active, const Uuid& clusterId, - uint8_t storeState, const Uuid& shutdownId) + uint8_t storeState, const Uuid& shutdownId, + const framing::SequenceNumber& configSeq) { - cluster.initialStatus(member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, l); + cluster.initialStatus( + member, version, active, clusterId, + framing::cluster::StoreState(storeState), shutdownId, configSeq, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } @@ -521,28 +523,17 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { struct AddrList { const cpg_address* addrs; int count; - const char *prefix, *suffix; - AddrList(const cpg_address* a, int n, const char* p="", const char* s="") - : addrs(a), count(n), prefix(p), suffix(s) {} + const char *prefix; + AddrList(const cpg_address* a, int n, const char* p="") + : addrs(a), count(n), prefix(p) {} }; ostream& operator<<(ostream& o, const AddrList& a) { if (!a.count) return o; o << a.prefix; - for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { - const char* reasonString; - switch (p->reason) { - case CPG_REASON_JOIN: reasonString = "(joined) "; break; - case CPG_REASON_LEAVE: reasonString = "(left) "; break; - case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break; - case CPG_REASON_NODEUP: reasonString = "(node-up) "; break; - case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break; - default: reasonString = " "; - } - qpid::cluster::MemberId member(*p); - o << member << reasonString; - } - return o << a.suffix; + for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) + o << qpid::cluster::MemberId(*p) << " "; + return o; } void Cluster::configChange ( @@ -599,6 +590,8 @@ void Cluster::initMapCompleted(Lock& l) { } else { // I can go ready. discarding = false; + map.resetConfigSeq(); // Start from config-seq = 0 + store.setConfigSeq(map.getConfigSeq()); setReady(l); memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -618,6 +611,8 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& return; } bool memberChange = map.configChange(config); + QPID_LOG(debug, "Config sequence " << map.getConfigSeq()); + store.setConfigSeq(map.getConfigSeq()); // Update initital status for new members joining. initMap.configChange(config); @@ -625,7 +620,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId() + store.getState(), store.getShutdownId(), store.getConfigSeq() ), self); } @@ -671,6 +666,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, + const framing::SequenceNumber& configSeq, Lock& l) { if (version != CLUSTER_VERSION) { @@ -681,7 +677,8 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ } initMap.received( member, - ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId) + ClusterInitialStatusBody(ProtocolVersion(), version, active, id, + store, shutdownId, configSeq) ); if (initMap.transitionToComplete()) initMapCompleted(l); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 7872588307..79b84f1172 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -154,6 +154,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { const framing::Uuid& clusterId, framing::cluster::StoreState, const framing::Uuid& shutdownId, + const framing::SequenceNumber& configSeq, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index 85ed447113..c050293717 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -57,15 +57,17 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { } -ClusterMap::ClusterMap() : frameSeq(0) {} +ClusterMap::ClusterMap() : frameSeq(0), configSeq(0) {} -ClusterMap::ClusterMap(const Map& map) : frameSeq(0) { +ClusterMap::ClusterMap(const Map& map) : frameSeq(0), configSeq(0) { transform(map.begin(), map.end(), inserter(alive, alive.begin()), bind(&Map::value_type::first, _1)); members = map; } -ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_) - : frameSeq(frameSeq_) +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, + framing::SequenceNumber frameSeq_, + framing::SequenceNumber configSeq_) + : frameSeq(frameSeq_), configSeq(configSeq_) { for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, ref(joiners), ref(alive))); for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, ref(members), ref(alive))); @@ -81,6 +83,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const b.getMembers().clear(); for_each(members.begin(), members.end(), bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1)); b.setFrameSeq(frameSeq); + b.setConfigSeq(configSeq); } Url ClusterMap::getUrl(const Map& map, const MemberId& id) { @@ -133,6 +136,7 @@ ostream& operator<<(ostream& o, const ClusterMap& m) { else o << "(unknown)"; o << " "; } + o << "frameSeq=" << m.getFrameSeq() << " configSeq=" << m.getConfigSeq(); return o; } @@ -153,6 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { } bool ClusterMap::configChange(const Set& update) { + ++configSeq; bool memberChange = false; Set removed; set_difference(alive.begin(), alive.end(), diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 98572813a8..604d5c2d73 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -49,7 +49,8 @@ class ClusterMap { ClusterMap(); ClusterMap(const Map& map); - ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq); + ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, + framing::SequenceNumber frameSeq, framing::SequenceNumber configSeq); /** Update from config change. *@return true if member set changed. @@ -83,8 +84,10 @@ class ClusterMap { /**@return true If this is a new member */ bool ready(const MemberId& id, const Url&); - framing::SequenceNumber getFrameSeq() { return frameSeq; } + framing::SequenceNumber getFrameSeq() const { return frameSeq; } framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; } + framing::SequenceNumber getConfigSeq() const { return configSeq; } + void resetConfigSeq() { configSeq = 0; } /** Clear out all knowledge of joiners & members, just keep alive set */ void clearStatus() { joiners.clear(); members.clear(); } @@ -94,7 +97,7 @@ class ClusterMap { Map joiners, members; Set alive; - framing::SequenceNumber frameSeq; + framing::SequenceNumber frameSeq, configSeq; friend std::ostream& operator<<(std::ostream&, const Map&); friend std::ostream& operator<<(std::ostream&, const ClusterMap&); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index d223244f15..3a5d121dc1 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -322,9 +322,12 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str output.setSendMax(sendMax); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, + const framing::SequenceNumber& frameSeq, + const framing::SequenceNumber& configSeq) +{ QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); + cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq)); consumerNumbering.clear(); self.second = 0; // Mark this as completed update connection. } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 7f94338348..51e6107bfd 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -125,7 +125,9 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax); - void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq); + void membership(const framing::FieldTable&, const framing::FieldTable&, + const framing::SequenceNumber& frameSeq, + const framing::SequenceNumber& configSeq); void retractOffer(); diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index 59338f89d4..c6de488a40 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -20,7 +20,9 @@ */ #include "InitialStatusMap.h" #include "StoreStatus.h" +#include "qpid/log/Statement.h" #include <algorithm> +#include <vector> #include <boost/bind.hpp> namespace qpid { @@ -138,7 +140,7 @@ MemberSet InitialStatusMap::getElders() { } // Get cluster ID from an active member or the youngest newcomer. -framing::Uuid InitialStatusMap::getClusterId() { +Uuid InitialStatusMap::getClusterId() { assert(isComplete()); assert(!map.empty()); Map::iterator i = find_if(map.begin(), map.end(), &isActive); @@ -166,6 +168,7 @@ void InitialStatusMap::checkConsistent() { Uuid shutdownId; for (Map::iterator i = map.begin(); i != map.end(); ++i) { + assert(i->second); if (i->second->getActive()) ++active; switch (i->second->getStoreState()) { case STORE_STATE_NO_STORE: ++none; break; @@ -187,10 +190,11 @@ void InitialStatusMap::checkConsistent() { // Can't mix transient and persistent members. if (none && (clean+dirty+empty)) throw Exception("Mixing transient and persistent brokers in a cluster"); + // If there are no active members and there are dirty stores there // must be at least one clean store. if (!active && dirty && !clean) - throw Exception("Cannot recover, no clean store"); + throw Exception("Cannot recover, no clean store."); } diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp index 6e412c23f7..bf4adb76bb 100644 --- a/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/cpp/src/qpid/cluster/StoreStatus.cpp @@ -34,7 +34,7 @@ namespace fs=boost::filesystem; using std::ostream; StoreStatus::StoreStatus(const std::string& d) - : state(STORE_STATE_NO_STORE), dataDir(d) + : state(STORE_STATE_NO_STORE), dataDir(d), configSeq(0) {} namespace { @@ -42,6 +42,7 @@ namespace { const char* SUBDIR="cluster"; const char* CLUSTER_ID_FILE="cluster.uuid"; const char* SHUTDOWN_ID_FILE="shutdown.uuid"; +const char* CONFIG_SEQ_FILE="config.seq"; Uuid loadUuid(const fs::path& path) { Uuid ret; @@ -62,23 +63,39 @@ void saveUuid(const fs::path& path, const Uuid& uuid) { void StoreStatus::load() { fs::path dir = fs::path(dataDir, fs::native)/SUBDIR; - create_directory(dir); - clusterId = loadUuid(dir/CLUSTER_ID_FILE); - shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE); - - if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE; - else if (clusterId) state = STORE_STATE_DIRTY_STORE; - else state = STORE_STATE_EMPTY_STORE; + try { + create_directory(dir); + clusterId = loadUuid(dir/CLUSTER_ID_FILE); + shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE); + fs::ifstream is(dir/CONFIG_SEQ_FILE); + uint32_t n; + is >> n; + configSeq = framing::SequenceNumber(n); + if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE; + else if (clusterId) state = STORE_STATE_DIRTY_STORE; + else state = STORE_STATE_EMPTY_STORE; + } + catch (const std::exception&e) { + throw Exception(QPID_MSG("Cannot load cluster store status: " << e.what())); + } } void StoreStatus::save() { fs::path dir = fs::path(dataDir, fs::native)/SUBDIR; - create_directory(dir); - saveUuid(dir/CLUSTER_ID_FILE, clusterId); - saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId); + try { + create_directory(dir); + saveUuid(dir/CLUSTER_ID_FILE, clusterId); + saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId); + fs::ofstream os(dir/CONFIG_SEQ_FILE); + os << configSeq.getValue(); + } + catch (const std::exception&e) { + throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what())); + } } void StoreStatus::dirty(const Uuid& clusterId_) { + assert(clusterId_); clusterId = clusterId_; shutdownId = Uuid(); state = STORE_STATE_DIRTY_STORE; @@ -86,22 +103,38 @@ void StoreStatus::dirty(const Uuid& clusterId_) { } void StoreStatus::clean(const Uuid& shutdownId_) { + assert(shutdownId_); state = STORE_STATE_CLEAN_STORE; shutdownId = shutdownId_; save(); } +void StoreStatus::setConfigSeq(framing::SequenceNumber seq) { + configSeq = seq; + save(); +} + +const char* stateName(StoreState s) { + switch (s) { + case STORE_STATE_NO_STORE: return "none"; + case STORE_STATE_EMPTY_STORE: return "empty"; + case STORE_STATE_DIRTY_STORE: return "dirty"; + case STORE_STATE_CLEAN_STORE: return "clean"; + } + assert(0); + return "unknown"; +} + +ostream& operator<<(ostream& o, framing::cluster::StoreState s) { return o << stateName(s); } + ostream& operator<<(ostream& o, const StoreStatus& s) { - switch (s.getState()) { - case STORE_STATE_NO_STORE: o << "no store"; break; - case STORE_STATE_EMPTY_STORE: o << "empty store"; break; - case STORE_STATE_DIRTY_STORE: - o << "dirty store, cluster-id=" << s.getClusterId(); - break; - case STORE_STATE_CLEAN_STORE: - o << "clean store, cluster-id=" << s.getClusterId() + o << s.getState(); + if (s.getState() == STORE_STATE_DIRTY_STORE) + o << " cluster-id=" << s.getClusterId() + << " config-sequence=" << s.getConfigSeq(); + if (s.getState() == STORE_STATE_CLEAN_STORE) { + o << " cluster-id=" << s.getClusterId() << " shutdown-id=" << s.getShutdownId(); - break; } return o; } diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h index 522020ed69..911b3a2ba2 100644 --- a/cpp/src/qpid/cluster/StoreStatus.h +++ b/cpp/src/qpid/cluster/StoreStatus.h @@ -23,6 +23,7 @@ */ #include "qpid/framing/Uuid.h" +#include "qpid/framing/SequenceNumber.h" #include "qpid/framing/enum.h" #include <iosfwd> @@ -43,9 +44,11 @@ class StoreStatus framing::cluster::StoreState getState() const { return state; } const Uuid& getClusterId() const { return clusterId; } const Uuid& getShutdownId() const { return shutdownId; } + framing::SequenceNumber getConfigSeq() const { return configSeq; } void dirty(const Uuid& start); // Start using the store. void clean(const Uuid& stop); // Stop using the store. + void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number. void load(); void save(); @@ -56,8 +59,11 @@ class StoreStatus framing::cluster::StoreState state; Uuid clusterId, shutdownId; std::string dataDir; + framing::SequenceNumber configSeq; }; +const char* stateName(framing::cluster::StoreState); +std::ostream& operator<<(std::ostream&, framing::cluster::StoreState); std::ostream& operator<<(std::ostream&, const StoreStatus&); }} // namespace qpid::cluster diff --git a/cpp/src/tests/InitialStatusMap.cpp b/cpp/src/tests/InitialStatusMap.cpp index 82450eac22..63214ee395 100644 --- a/cpp/src/tests/InitialStatusMap.cpp +++ b/cpp/src/tests/InitialStatusMap.cpp @@ -37,15 +37,15 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite) typedef InitialStatusMap::Status Status; Status activeStatus(const Uuid& id=Uuid()) { - return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid()); + return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0); } Status newcomerStatus(const Uuid& id=Uuid()) { - return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid()); + return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0); } Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) { - return Status(ProtocolVersion(), 0, active, start, state, stop); + return Status(ProtocolVersion(), 0, active, start, state, stop, 0); } QPID_AUTO_TEST_CASE(testFirstInCluster) { @@ -241,7 +241,6 @@ QPID_AUTO_TEST_CASE(testEmptyAlone) { } // FIXME aconway 2009-11-20: consistency tests for mixed stores, -// tests for manual intervention case. QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 4bfbca415a..089d42fd91 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -85,6 +85,29 @@ class ShortTests(BrokerTest): os.remove("direct.dump") os.remove("updatee.dump") + def test_config_change_seq(self): + """Check that cluster members have the correct config change sequence numbers""" + cluster = self.cluster(0) + cluster.start() + cluster.start(expect=EXPECT_EXIT_OK) + cluster[1].terminate(); cluster[1].wait() + cluster.start() + + update_re = re.compile(r"member update: (.*) frameSeq=[0-9]+ configSeq=([0-9]+)") + matches = [ update_re.search(file(b.log).read()) for b in cluster ] + sequences = [ m.group(2) for m in matches] + self.assertEqual(sequences, ["0", "1", "3"]) + + # Check that configurations with same seq. number match + configs={} + for b in cluster: + matches = update_re.findall(file(b.log).read()) + for m in matches: + seq=m[1] + config=re.sub("\((member|unknown)\)", "", m[0]) + if not seq in configs: configs[seq] = config + else: self.assertEqual(configs[seq], config) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -234,7 +257,7 @@ class StoreTests(BrokerTest): msg = re.compile("critical.*no clean store") assert a.search_log(msg) assert b.search_log(msg) - # FIXME aconway 2009-12-03: verify correct store ID in log message + # FIXME aconway 2009-12-03: verify manual restore procedure |