summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp77
-rw-r--r--cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.cpp2
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.h3
4 files changed, 50 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 6bb597d21f..83ebbcc2e6 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -217,8 +217,11 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) {
cluster.ready(member, url, l);
}
- void configChange(const std::string& current) {
- cluster.configChange(member, current, l);
+ void configChange(const std::string& members,
+ const std::string& left,
+ const std::string& joined)
+ {
+ cluster.configChange(member, members, left, joined, l);
}
void updateOffer(uint64_t updatee) {
cluster.updateOffer(member, updatee, l);
@@ -554,40 +557,28 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
boost::bind(&ConnectionMap::value_type::second, _1));
return result;
}
-
-struct AddrList {
- const cpg_address* addrs;
- int count;
- const char *prefix;
- AddrList(const cpg_address* a, int n, const char* p="")
- : addrs(a), count(n), prefix(p) {}
-};
-
-ostream& operator<<(ostream& o, const AddrList& a) {
- if (!a.count) return o;
- o << a.prefix;
- for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
- o << qpid::cluster::MemberId(*p) << " ";
- return o;
-}
+// CPG config-change callback.
void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
- const cpg_address *current, int nCurrent,
+ const cpg_address *members, int nMembers,
const cpg_address *left, int nLeft,
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(notice, *this << " membership change: "
- << AddrList(current, nCurrent) << "("
- << AddrList(joined, nJoined, "joined: ")
- << AddrList(left, nLeft, "left: ")
- << ")");
- string addresses;
- for (const cpg_address* p = current; p < current+nCurrent; ++p)
- addresses.append(MemberId(*p).str());
- deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+ string membersStr, leftStr, joinedStr;
+ // Encode members and enqueue as an event so the config change can
+ // be executed in the correct thread.
+ for (const cpg_address* p = members; p < members+nMembers; ++p)
+ membersStr.append(MemberId(*p).str());
+ for (const cpg_address* p = left; p < left+nLeft; ++p)
+ leftStr.append(MemberId(*p).str());
+ for (const cpg_address* p = joined; p < joined+nJoined; ++p)
+ joinedStr.append(MemberId(*p).str());
+ deliverEvent(Event::control(ClusterConfigChangeBody(
+ ProtocolVersion(), membersStr, leftStr, joinedStr),
+ self));
}
void Cluster::setReady(Lock&) {
@@ -654,22 +645,33 @@ void Cluster::initMapCompleted(Lock& l) {
}
}
-void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
+void Cluster::configChange(const MemberId&,
+ const std::string& membersStr,
+ const std::string& leftStr,
+ const std::string& joinedStr,
+ Lock& l)
+{
if (state == LEFT) return;
+ MemberSet members = decodeMemberSet(membersStr);
+ MemberSet left = decodeMemberSet(leftStr);
+ MemberSet joined = decodeMemberSet(joinedStr);
+ QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": "
+ << members);
+ QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
+ QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
- MemberSet config = decodeMemberSet(configStr);
- elders = intersection(elders, config);
+ // Update initital status for members joining or leaving.
+ elders = intersection(elders, members);
if (elders.empty() && INIT < state && state < CATCHUP) {
QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
leave(l);
return;
}
- bool memberChange = map.configChange(config);
- QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+ bool memberChange = map.configChange(members);
store.setConfigSeq(map.getConfigSeq());
// Update initital status for members joining or leaving.
- initMap.configChange(config);
+ initMap.configChange(members);
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
@@ -965,8 +967,11 @@ void Cluster::memberUpdate(Lock& l) {
if (store.hasStore()) {
// Mark store clean if I am the only broker, dirty otherwise.
- if (size == 1) store.clean(Uuid(true));
- else store.dirty(clusterId);
+ if (size == 1 ) {
+ if (!store.isClean()) store.clean(Uuid(true));
+ } else {
+ if (!store.isDirty()) store.dirty(clusterId);
+ }
}
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 4a64ad73d6..343a66428b 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -163,7 +163,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const std::string& firstConfig,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&, const std::string& current, Lock& l);
+ void configChange(const MemberId&,
+ const std::string& members,
+ const std::string& left,
+ const std::string& joined,
+ Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
index d66db8551d..be671c0f48 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -110,7 +110,7 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
const ClusterConfigChangeBody* configChange =
static_cast<const ClusterConfigChangeBody*>(method);
if (configChange) {
- MemberSet members(decodeMemberSet(configChange->getCurrent()));
+ MemberSet members(decodeMemberSet(configChange->getMembers()));
QPID_LOG(debug, cluster << " apply config change to error "
<< frameSeq << ": " << members);
MemberSet intersect;
diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h
index 2371f0424e..b496fe0dc2 100644
--- a/cpp/src/qpid/cluster/StoreStatus.h
+++ b/cpp/src/qpid/cluster/StoreStatus.h
@@ -42,6 +42,9 @@ class StoreStatus
StoreStatus(const std::string& dir);
framing::cluster::StoreState getState() const { return state; }
+ bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; }
+ bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; }
+
const Uuid& getClusterId() const { return clusterId; }
const Uuid& getShutdownId() const { return shutdownId; }
framing::SequenceNumber getConfigSeq() const { return configSeq; }