summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp80
1 files changed, 44 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 6bb597d21f..858900be9e 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -217,8 +217,11 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) {
cluster.ready(member, url, l);
}
- void configChange(const std::string& current) {
- cluster.configChange(member, current, l);
+ void configChange(const std::string& members,
+ const std::string& left,
+ const std::string& joined)
+ {
+ cluster.configChange(member, members, left, joined, l);
}
void updateOffer(uint64_t updatee) {
cluster.updateOffer(member, updatee, l);
@@ -316,7 +319,9 @@ void Cluster::initialize() {
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
+ deliverEventQueue.bypassOff();
deliverEventQueue.start();
+ deliverFrameQueue.bypassOff();
deliverFrameQueue.start();
mcast.start();
@@ -554,40 +559,28 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
boost::bind(&ConnectionMap::value_type::second, _1));
return result;
}
-
-struct AddrList {
- const cpg_address* addrs;
- int count;
- const char *prefix;
- AddrList(const cpg_address* a, int n, const char* p="")
- : addrs(a), count(n), prefix(p) {}
-};
-
-ostream& operator<<(ostream& o, const AddrList& a) {
- if (!a.count) return o;
- o << a.prefix;
- for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
- o << qpid::cluster::MemberId(*p) << " ";
- return o;
-}
+// CPG config-change callback.
void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
- const cpg_address *current, int nCurrent,
+ const cpg_address *members, int nMembers,
const cpg_address *left, int nLeft,
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(notice, *this << " membership change: "
- << AddrList(current, nCurrent) << "("
- << AddrList(joined, nJoined, "joined: ")
- << AddrList(left, nLeft, "left: ")
- << ")");
- string addresses;
- for (const cpg_address* p = current; p < current+nCurrent; ++p)
- addresses.append(MemberId(*p).str());
- deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+ string membersStr, leftStr, joinedStr;
+ // Encode members and enqueue as an event so the config change can
+ // be executed in the correct thread.
+ for (const cpg_address* p = members; p < members+nMembers; ++p)
+ membersStr.append(MemberId(*p).str());
+ for (const cpg_address* p = left; p < left+nLeft; ++p)
+ leftStr.append(MemberId(*p).str());
+ for (const cpg_address* p = joined; p < joined+nJoined; ++p)
+ joinedStr.append(MemberId(*p).str());
+ deliverEvent(Event::control(ClusterConfigChangeBody(
+ ProtocolVersion(), membersStr, leftStr, joinedStr),
+ self));
}
void Cluster::setReady(Lock&) {
@@ -606,6 +599,7 @@ void Cluster::initMapCompleted(Lock& l) {
// We decide here whether we want to recover from our store.
// We won't recover if we are joining an active cluster or our store is dirty.
if (store.hasStore() &&
+ store.getState() != STORE_STATE_EMPTY_STORE &&
(initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
broker.setRecovery(false); // Ditch my current store.
state = INIT;
@@ -654,22 +648,33 @@ void Cluster::initMapCompleted(Lock& l) {
}
}
-void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
+void Cluster::configChange(const MemberId&,
+ const std::string& membersStr,
+ const std::string& leftStr,
+ const std::string& joinedStr,
+ Lock& l)
+{
if (state == LEFT) return;
+ MemberSet members = decodeMemberSet(membersStr);
+ MemberSet left = decodeMemberSet(leftStr);
+ MemberSet joined = decodeMemberSet(joinedStr);
+ QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": "
+ << members);
+ QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
+ QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
- MemberSet config = decodeMemberSet(configStr);
- elders = intersection(elders, config);
+ // Update initital status for members joining or leaving.
+ elders = intersection(elders, members);
if (elders.empty() && INIT < state && state < CATCHUP) {
QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
leave(l);
return;
}
- bool memberChange = map.configChange(config);
- QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+ bool memberChange = map.configChange(members);
store.setConfigSeq(map.getConfigSeq());
// Update initital status for members joining or leaving.
- initMap.configChange(config);
+ initMap.configChange(members);
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
@@ -965,8 +970,11 @@ void Cluster::memberUpdate(Lock& l) {
if (store.hasStore()) {
// Mark store clean if I am the only broker, dirty otherwise.
- if (size == 1) store.clean(Uuid(true));
- else store.dirty(clusterId);
+ if (size == 1 ) {
+ if (!store.isClean()) store.clean(Uuid(true));
+ } else {
+ if (!store.isDirty()) store.dirty(clusterId);
+ }
}
if (size == 1 && lastSize > 1 && state >= CATCHUP) {