summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp41
-rw-r--r--cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp13
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h9
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp7
-rw-r--r--cpp/src/qpid/cluster/Connection.h4
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.cpp8
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.cpp73
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.h6
-rw-r--r--cpp/src/tests/InitialStatusMap.cpp7
-rwxr-xr-xcpp/src/tests/cluster_tests.py25
11 files changed, 135 insertions, 59 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index cdeb89188b..320111c2e1 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -186,10 +186,12 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
- uint8_t storeState, const Uuid& shutdownId)
+ uint8_t storeState, const Uuid& shutdownId,
+ const framing::SequenceNumber& configSeq)
{
- cluster.initialStatus(member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId, l);
+ cluster.initialStatus(
+ member, version, active, clusterId,
+ framing::cluster::StoreState(storeState), shutdownId, configSeq, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -521,28 +523,17 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
struct AddrList {
const cpg_address* addrs;
int count;
- const char *prefix, *suffix;
- AddrList(const cpg_address* a, int n, const char* p="", const char* s="")
- : addrs(a), count(n), prefix(p), suffix(s) {}
+ const char *prefix;
+ AddrList(const cpg_address* a, int n, const char* p="")
+ : addrs(a), count(n), prefix(p) {}
};
ostream& operator<<(ostream& o, const AddrList& a) {
if (!a.count) return o;
o << a.prefix;
- for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
- const char* reasonString;
- switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = "(joined) "; break;
- case CPG_REASON_LEAVE: reasonString = "(left) "; break;
- case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break;
- case CPG_REASON_NODEUP: reasonString = "(node-up) "; break;
- case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break;
- default: reasonString = " ";
- }
- qpid::cluster::MemberId member(*p);
- o << member << reasonString;
- }
- return o << a.suffix;
+ for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
+ o << qpid::cluster::MemberId(*p) << " ";
+ return o;
}
void Cluster::configChange (
@@ -599,6 +590,8 @@ void Cluster::initMapCompleted(Lock& l) {
}
else { // I can go ready.
discarding = false;
+ map.resetConfigSeq(); // Start from config-seq = 0
+ store.setConfigSeq(map.getConfigSeq());
setReady(l);
memberUpdate(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -618,6 +611,8 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
return;
}
bool memberChange = map.configChange(config);
+ QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+ store.setConfigSeq(map.getConfigSeq());
// Update initital status for new members joining.
initMap.configChange(config);
@@ -625,7 +620,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId()
+ store.getState(), store.getShutdownId(), store.getConfigSeq()
),
self);
}
@@ -671,6 +666,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
+ const framing::SequenceNumber& configSeq,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -681,7 +677,8 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
}
initMap.received(
member,
- ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId)
+ ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
+ store, shutdownId, configSeq)
);
if (initMap.transitionToComplete()) initMapCompleted(l);
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 7872588307..79b84f1172 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -154,6 +154,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const framing::Uuid& clusterId,
framing::cluster::StoreState,
const framing::Uuid& shutdownId,
+ const framing::SequenceNumber& configSeq,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index 85ed447113..c050293717 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -57,15 +57,17 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
}
-ClusterMap::ClusterMap() : frameSeq(0) {}
+ClusterMap::ClusterMap() : frameSeq(0), configSeq(0) {}
-ClusterMap::ClusterMap(const Map& map) : frameSeq(0) {
+ClusterMap::ClusterMap(const Map& map) : frameSeq(0), configSeq(0) {
transform(map.begin(), map.end(), inserter(alive, alive.begin()), bind(&Map::value_type::first, _1));
members = map;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_)
- : frameSeq(frameSeq_)
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt,
+ framing::SequenceNumber frameSeq_,
+ framing::SequenceNumber configSeq_)
+ : frameSeq(frameSeq_), configSeq(configSeq_)
{
for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, ref(joiners), ref(alive)));
for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, ref(members), ref(alive)));
@@ -81,6 +83,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const
b.getMembers().clear();
for_each(members.begin(), members.end(), bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1));
b.setFrameSeq(frameSeq);
+ b.setConfigSeq(configSeq);
}
Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
@@ -133,6 +136,7 @@ ostream& operator<<(ostream& o, const ClusterMap& m) {
else o << "(unknown)";
o << " ";
}
+ o << "frameSeq=" << m.getFrameSeq() << " configSeq=" << m.getConfigSeq();
return o;
}
@@ -153,6 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) {
}
bool ClusterMap::configChange(const Set& update) {
+ ++configSeq;
bool memberChange = false;
Set removed;
set_difference(alive.begin(), alive.end(),
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 98572813a8..604d5c2d73 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -49,7 +49,8 @@ class ClusterMap {
ClusterMap();
ClusterMap(const Map& map);
- ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members,
+ framing::SequenceNumber frameSeq, framing::SequenceNumber configSeq);
/** Update from config change.
*@return true if member set changed.
@@ -83,8 +84,10 @@ class ClusterMap {
/**@return true If this is a new member */
bool ready(const MemberId& id, const Url&);
- framing::SequenceNumber getFrameSeq() { return frameSeq; }
+ framing::SequenceNumber getFrameSeq() const { return frameSeq; }
framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
+ framing::SequenceNumber getConfigSeq() const { return configSeq; }
+ void resetConfigSeq() { configSeq = 0; }
/** Clear out all knowledge of joiners & members, just keep alive set */
void clearStatus() { joiners.clear(); members.clear(); }
@@ -94,7 +97,7 @@ class ClusterMap {
Map joiners, members;
Set alive;
- framing::SequenceNumber frameSeq;
+ framing::SequenceNumber frameSeq, configSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index d223244f15..3a5d121dc1 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -322,9 +322,12 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
output.setSendMax(sendMax);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members,
+ const framing::SequenceNumber& frameSeq,
+ const framing::SequenceNumber& configSeq)
+{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
consumerNumbering.clear();
self.second = 0; // Mark this as completed update connection.
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 7f94338348..51e6107bfd 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -125,7 +125,9 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
- void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq);
+ void membership(const framing::FieldTable&, const framing::FieldTable&,
+ const framing::SequenceNumber& frameSeq,
+ const framing::SequenceNumber& configSeq);
void retractOffer();
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp
index 59338f89d4..c6de488a40 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -20,7 +20,9 @@
*/
#include "InitialStatusMap.h"
#include "StoreStatus.h"
+#include "qpid/log/Statement.h"
#include <algorithm>
+#include <vector>
#include <boost/bind.hpp>
namespace qpid {
@@ -138,7 +140,7 @@ MemberSet InitialStatusMap::getElders() {
}
// Get cluster ID from an active member or the youngest newcomer.
-framing::Uuid InitialStatusMap::getClusterId() {
+Uuid InitialStatusMap::getClusterId() {
assert(isComplete());
assert(!map.empty());
Map::iterator i = find_if(map.begin(), map.end(), &isActive);
@@ -166,6 +168,7 @@ void InitialStatusMap::checkConsistent() {
Uuid shutdownId;
for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+ assert(i->second);
if (i->second->getActive()) ++active;
switch (i->second->getStoreState()) {
case STORE_STATE_NO_STORE: ++none; break;
@@ -187,10 +190,11 @@ void InitialStatusMap::checkConsistent() {
// Can't mix transient and persistent members.
if (none && (clean+dirty+empty))
throw Exception("Mixing transient and persistent brokers in a cluster");
+
// If there are no active members and there are dirty stores there
// must be at least one clean store.
if (!active && dirty && !clean)
- throw Exception("Cannot recover, no clean store");
+ throw Exception("Cannot recover, no clean store.");
}
diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp
index 6e412c23f7..bf4adb76bb 100644
--- a/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -34,7 +34,7 @@ namespace fs=boost::filesystem;
using std::ostream;
StoreStatus::StoreStatus(const std::string& d)
- : state(STORE_STATE_NO_STORE), dataDir(d)
+ : state(STORE_STATE_NO_STORE), dataDir(d), configSeq(0)
{}
namespace {
@@ -42,6 +42,7 @@ namespace {
const char* SUBDIR="cluster";
const char* CLUSTER_ID_FILE="cluster.uuid";
const char* SHUTDOWN_ID_FILE="shutdown.uuid";
+const char* CONFIG_SEQ_FILE="config.seq";
Uuid loadUuid(const fs::path& path) {
Uuid ret;
@@ -62,23 +63,39 @@ void saveUuid(const fs::path& path, const Uuid& uuid) {
void StoreStatus::load() {
fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
- create_directory(dir);
- clusterId = loadUuid(dir/CLUSTER_ID_FILE);
- shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
-
- if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
- else if (clusterId) state = STORE_STATE_DIRTY_STORE;
- else state = STORE_STATE_EMPTY_STORE;
+ try {
+ create_directory(dir);
+ clusterId = loadUuid(dir/CLUSTER_ID_FILE);
+ shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
+ fs::ifstream is(dir/CONFIG_SEQ_FILE);
+ uint32_t n;
+ is >> n;
+ configSeq = framing::SequenceNumber(n);
+ if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
+ else if (clusterId) state = STORE_STATE_DIRTY_STORE;
+ else state = STORE_STATE_EMPTY_STORE;
+ }
+ catch (const std::exception&e) {
+ throw Exception(QPID_MSG("Cannot load cluster store status: " << e.what()));
+ }
}
void StoreStatus::save() {
fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
- create_directory(dir);
- saveUuid(dir/CLUSTER_ID_FILE, clusterId);
- saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
+ try {
+ create_directory(dir);
+ saveUuid(dir/CLUSTER_ID_FILE, clusterId);
+ saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
+ fs::ofstream os(dir/CONFIG_SEQ_FILE);
+ os << configSeq.getValue();
+ }
+ catch (const std::exception&e) {
+ throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));
+ }
}
void StoreStatus::dirty(const Uuid& clusterId_) {
+ assert(clusterId_);
clusterId = clusterId_;
shutdownId = Uuid();
state = STORE_STATE_DIRTY_STORE;
@@ -86,22 +103,38 @@ void StoreStatus::dirty(const Uuid& clusterId_) {
}
void StoreStatus::clean(const Uuid& shutdownId_) {
+ assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
shutdownId = shutdownId_;
save();
}
+void StoreStatus::setConfigSeq(framing::SequenceNumber seq) {
+ configSeq = seq;
+ save();
+}
+
+const char* stateName(StoreState s) {
+ switch (s) {
+ case STORE_STATE_NO_STORE: return "none";
+ case STORE_STATE_EMPTY_STORE: return "empty";
+ case STORE_STATE_DIRTY_STORE: return "dirty";
+ case STORE_STATE_CLEAN_STORE: return "clean";
+ }
+ assert(0);
+ return "unknown";
+}
+
+ostream& operator<<(ostream& o, framing::cluster::StoreState s) { return o << stateName(s); }
+
ostream& operator<<(ostream& o, const StoreStatus& s) {
- switch (s.getState()) {
- case STORE_STATE_NO_STORE: o << "no store"; break;
- case STORE_STATE_EMPTY_STORE: o << "empty store"; break;
- case STORE_STATE_DIRTY_STORE:
- o << "dirty store, cluster-id=" << s.getClusterId();
- break;
- case STORE_STATE_CLEAN_STORE:
- o << "clean store, cluster-id=" << s.getClusterId()
+ o << s.getState();
+ if (s.getState() == STORE_STATE_DIRTY_STORE)
+ o << " cluster-id=" << s.getClusterId()
+ << " config-sequence=" << s.getConfigSeq();
+ if (s.getState() == STORE_STATE_CLEAN_STORE) {
+ o << " cluster-id=" << s.getClusterId()
<< " shutdown-id=" << s.getShutdownId();
- break;
}
return o;
}
diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h
index 522020ed69..911b3a2ba2 100644
--- a/cpp/src/qpid/cluster/StoreStatus.h
+++ b/cpp/src/qpid/cluster/StoreStatus.h
@@ -23,6 +23,7 @@
*/
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/enum.h"
#include <iosfwd>
@@ -43,9 +44,11 @@ class StoreStatus
framing::cluster::StoreState getState() const { return state; }
const Uuid& getClusterId() const { return clusterId; }
const Uuid& getShutdownId() const { return shutdownId; }
+ framing::SequenceNumber getConfigSeq() const { return configSeq; }
void dirty(const Uuid& start); // Start using the store.
void clean(const Uuid& stop); // Stop using the store.
+ void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number.
void load();
void save();
@@ -56,8 +59,11 @@ class StoreStatus
framing::cluster::StoreState state;
Uuid clusterId, shutdownId;
std::string dataDir;
+ framing::SequenceNumber configSeq;
};
+const char* stateName(framing::cluster::StoreState);
+std::ostream& operator<<(std::ostream&, framing::cluster::StoreState);
std::ostream& operator<<(std::ostream&, const StoreStatus&);
}} // namespace qpid::cluster
diff --git a/cpp/src/tests/InitialStatusMap.cpp b/cpp/src/tests/InitialStatusMap.cpp
index 82450eac22..63214ee395 100644
--- a/cpp/src/tests/InitialStatusMap.cpp
+++ b/cpp/src/tests/InitialStatusMap.cpp
@@ -37,15 +37,15 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite)
typedef InitialStatusMap::Status Status;
Status activeStatus(const Uuid& id=Uuid()) {
- return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid());
+ return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), 0);
}
Status newcomerStatus(const Uuid& id=Uuid()) {
- return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid());
+ return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), 0);
}
Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) {
- return Status(ProtocolVersion(), 0, active, start, state, stop);
+ return Status(ProtocolVersion(), 0, active, start, state, stop, 0);
}
QPID_AUTO_TEST_CASE(testFirstInCluster) {
@@ -241,7 +241,6 @@ QPID_AUTO_TEST_CASE(testEmptyAlone) {
}
// FIXME aconway 2009-11-20: consistency tests for mixed stores,
-// tests for manual intervention case.
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 4bfbca415a..089d42fd91 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -85,6 +85,29 @@ class ShortTests(BrokerTest):
os.remove("direct.dump")
os.remove("updatee.dump")
+ def test_config_change_seq(self):
+ """Check that cluster members have the correct config change sequence numbers"""
+ cluster = self.cluster(0)
+ cluster.start()
+ cluster.start(expect=EXPECT_EXIT_OK)
+ cluster[1].terminate(); cluster[1].wait()
+ cluster.start()
+
+ update_re = re.compile(r"member update: (.*) frameSeq=[0-9]+ configSeq=([0-9]+)")
+ matches = [ update_re.search(file(b.log).read()) for b in cluster ]
+ sequences = [ m.group(2) for m in matches]
+ self.assertEqual(sequences, ["0", "1", "3"])
+
+ # Check that configurations with same seq. number match
+ configs={}
+ for b in cluster:
+ matches = update_re.findall(file(b.log).read())
+ for m in matches:
+ seq=m[1]
+ config=re.sub("\((member|unknown)\)", "", m[0])
+ if not seq in configs: configs[seq] = config
+ else: self.assertEqual(configs[seq], config)
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -234,7 +257,7 @@ class StoreTests(BrokerTest):
msg = re.compile("critical.*no clean store")
assert a.search_log(msg)
assert b.search_log(msg)
- # FIXME aconway 2009-12-03: verify correct store ID in log message
+
# FIXME aconway 2009-12-03: verify manual restore procedure