diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 77 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/StoreStatus.h | 3 |
4 files changed, 50 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6bb597d21f..83ebbcc2e6 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -217,8 +217,11 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& current) { - cluster.configChange(member, current, l); + void configChange(const std::string& members, + const std::string& left, + const std::string& joined) + { + cluster.configChange(member, members, left, joined, l); } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); @@ -554,40 +557,28 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { boost::bind(&ConnectionMap::value_type::second, _1)); return result; } - -struct AddrList { - const cpg_address* addrs; - int count; - const char *prefix; - AddrList(const cpg_address* a, int n, const char* p="") - : addrs(a), count(n), prefix(p) {} -}; - -ostream& operator<<(ostream& o, const AddrList& a) { - if (!a.count) return o; - o << a.prefix; - for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) - o << qpid::cluster::MemberId(*p) << " "; - return o; -} +// CPG config-change callback. void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, - const cpg_address *current, int nCurrent, + const cpg_address *members, int nMembers, const cpg_address *left, int nLeft, const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(notice, *this << " membership change: " - << AddrList(current, nCurrent) << "(" - << AddrList(joined, nJoined, "joined: ") - << AddrList(left, nLeft, "left: ") - << ")"); - string addresses; - for (const cpg_address* p = current; p < current+nCurrent; ++p) - addresses.append(MemberId(*p).str()); - deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); + string membersStr, leftStr, joinedStr; + // Encode members and enqueue as an event so the config change can + // be executed in the correct thread. + for (const cpg_address* p = members; p < members+nMembers; ++p) + membersStr.append(MemberId(*p).str()); + for (const cpg_address* p = left; p < left+nLeft; ++p) + leftStr.append(MemberId(*p).str()); + for (const cpg_address* p = joined; p < joined+nJoined; ++p) + joinedStr.append(MemberId(*p).str()); + deliverEvent(Event::control(ClusterConfigChangeBody( + ProtocolVersion(), membersStr, leftStr, joinedStr), + self)); } void Cluster::setReady(Lock&) { @@ -654,22 +645,33 @@ void Cluster::initMapCompleted(Lock& l) { } } -void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) { +void Cluster::configChange(const MemberId&, + const std::string& membersStr, + const std::string& leftStr, + const std::string& joinedStr, + Lock& l) +{ if (state == LEFT) return; + MemberSet members = decodeMemberSet(membersStr); + MemberSet left = decodeMemberSet(leftStr); + MemberSet joined = decodeMemberSet(joinedStr); + QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": " + << members); + QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); + QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); - MemberSet config = decodeMemberSet(configStr); - elders = intersection(elders, config); + // Update initital status for members joining or leaving. + elders = intersection(elders, members); if (elders.empty() && INIT < state && state < CATCHUP) { QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); leave(l); return; } - bool memberChange = map.configChange(config); - QPID_LOG(debug, "Config sequence " << map.getConfigSeq()); + bool memberChange = map.configChange(members); store.setConfigSeq(map.getConfigSeq()); // Update initital status for members joining or leaving. - initMap.configChange(config); + initMap.configChange(members); if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( @@ -965,8 +967,11 @@ void Cluster::memberUpdate(Lock& l) { if (store.hasStore()) { // Mark store clean if I am the only broker, dirty otherwise. - if (size == 1) store.clean(Uuid(true)); - else store.dirty(clusterId); + if (size == 1 ) { + if (!store.isClean()) store.clean(Uuid(true)); + } else { + if (!store.isDirty()) store.dirty(clusterId); + } } if (size == 1 && lastSize > 1 && state >= CATCHUP) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 4a64ad73d6..343a66428b 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -163,7 +163,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string& firstConfig, Lock&); void ready(const MemberId&, const std::string&, Lock&); - void configChange(const MemberId&, const std::string& current, Lock& l); + void configChange(const MemberId&, + const std::string& members, + const std::string& left, + const std::string& joined, + Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp index d66db8551d..be671c0f48 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -110,7 +110,7 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& const ClusterConfigChangeBody* configChange = static_cast<const ClusterConfigChangeBody*>(method); if (configChange) { - MemberSet members(decodeMemberSet(configChange->getCurrent())); + MemberSet members(decodeMemberSet(configChange->getMembers())); QPID_LOG(debug, cluster << " apply config change to error " << frameSeq << ": " << members); MemberSet intersect; diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h index 2371f0424e..b496fe0dc2 100644 --- a/cpp/src/qpid/cluster/StoreStatus.h +++ b/cpp/src/qpid/cluster/StoreStatus.h @@ -42,6 +42,9 @@ class StoreStatus StoreStatus(const std::string& dir); framing::cluster::StoreState getState() const { return state; } + bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; } + bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; } + const Uuid& getClusterId() const { return clusterId; } const Uuid& getShutdownId() const { return shutdownId; } framing::SequenceNumber getConfigSeq() const { return configSeq; } |