diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 80 |
1 files changed, 44 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 6bb597d21f..858900be9e 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -217,8 +217,11 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& current) { - cluster.configChange(member, current, l); + void configChange(const std::string& members, + const std::string& left, + const std::string& joined) + { + cluster.configChange(member, members, left, joined, l); } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); @@ -316,7 +319,9 @@ void Cluster::initialize() { broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); + deliverEventQueue.bypassOff(); deliverEventQueue.start(); + deliverFrameQueue.bypassOff(); deliverFrameQueue.start(); mcast.start(); @@ -554,40 +559,28 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { boost::bind(&ConnectionMap::value_type::second, _1)); return result; } - -struct AddrList { - const cpg_address* addrs; - int count; - const char *prefix; - AddrList(const cpg_address* a, int n, const char* p="") - : addrs(a), count(n), prefix(p) {} -}; - -ostream& operator<<(ostream& o, const AddrList& a) { - if (!a.count) return o; - o << a.prefix; - for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) - o << qpid::cluster::MemberId(*p) << " "; - return o; -} +// CPG config-change callback. void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, - const cpg_address *current, int nCurrent, + const cpg_address *members, int nMembers, const cpg_address *left, int nLeft, const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(notice, *this << " membership change: " - << AddrList(current, nCurrent) << "(" - << AddrList(joined, nJoined, "joined: ") - << AddrList(left, nLeft, "left: ") - << ")"); - string addresses; - for (const cpg_address* p = current; p < current+nCurrent; ++p) - addresses.append(MemberId(*p).str()); - deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); + string membersStr, leftStr, joinedStr; + // Encode members and enqueue as an event so the config change can + // be executed in the correct thread. + for (const cpg_address* p = members; p < members+nMembers; ++p) + membersStr.append(MemberId(*p).str()); + for (const cpg_address* p = left; p < left+nLeft; ++p) + leftStr.append(MemberId(*p).str()); + for (const cpg_address* p = joined; p < joined+nJoined; ++p) + joinedStr.append(MemberId(*p).str()); + deliverEvent(Event::control(ClusterConfigChangeBody( + ProtocolVersion(), membersStr, leftStr, joinedStr), + self)); } void Cluster::setReady(Lock&) { @@ -606,6 +599,7 @@ void Cluster::initMapCompleted(Lock& l) { // We decide here whether we want to recover from our store. // We won't recover if we are joining an active cluster or our store is dirty. if (store.hasStore() && + store.getState() != STORE_STATE_EMPTY_STORE && (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) broker.setRecovery(false); // Ditch my current store. state = INIT; @@ -654,22 +648,33 @@ void Cluster::initMapCompleted(Lock& l) { } } -void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) { +void Cluster::configChange(const MemberId&, + const std::string& membersStr, + const std::string& leftStr, + const std::string& joinedStr, + Lock& l) +{ if (state == LEFT) return; + MemberSet members = decodeMemberSet(membersStr); + MemberSet left = decodeMemberSet(leftStr); + MemberSet joined = decodeMemberSet(joinedStr); + QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": " + << members); + QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); + QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); - MemberSet config = decodeMemberSet(configStr); - elders = intersection(elders, config); + // Update initital status for members joining or leaving. + elders = intersection(elders, members); if (elders.empty() && INIT < state && state < CATCHUP) { QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); leave(l); return; } - bool memberChange = map.configChange(config); - QPID_LOG(debug, "Config sequence " << map.getConfigSeq()); + bool memberChange = map.configChange(members); store.setConfigSeq(map.getConfigSeq()); // Update initital status for members joining or leaving. - initMap.configChange(config); + initMap.configChange(members); if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( @@ -965,8 +970,11 @@ void Cluster::memberUpdate(Lock& l) { if (store.hasStore()) { // Mark store clean if I am the only broker, dirty otherwise. - if (size == 1) store.clean(Uuid(true)); - else store.dirty(clusterId); + if (size == 1 ) { + if (!store.isClean()) store.clean(Uuid(true)); + } else { + if (!store.isDirty()) store.dirty(clusterId); + } } if (size == 1 && lastSize > 1 && state >= CATCHUP) { |