diff options
author | Ted Ross <tross@apache.org> | 2010-03-30 21:48:50 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-03-30 21:48:50 +0000 |
commit | 8554740df378860da8e2124ea481e464208c4377 (patch) | |
tree | fa274e8eea9e37abb511933a613721d1f53036e3 /qpid/cpp/src/qpid/cluster/Cluster.cpp | |
parent | 153da67fa601a2f20da496b1982469f67ba7ea0a (diff) | |
download | qpid-python-8554740df378860da8e2124ea481e464208c4377.tar.gz |
Merged Alan's cluster changes from trunk.qmf-devel0.7a
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@929314 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 80 |
1 files changed, 44 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 6bb597d21f..858900be9e 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -217,8 +217,11 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& current) { - cluster.configChange(member, current, l); + void configChange(const std::string& members, + const std::string& left, + const std::string& joined) + { + cluster.configChange(member, members, left, joined, l); } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); @@ -316,7 +319,9 @@ void Cluster::initialize() { broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); + deliverEventQueue.bypassOff(); deliverEventQueue.start(); + deliverFrameQueue.bypassOff(); deliverFrameQueue.start(); mcast.start(); @@ -554,40 +559,28 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { boost::bind(&ConnectionMap::value_type::second, _1)); return result; } - -struct AddrList { - const cpg_address* addrs; - int count; - const char *prefix; - AddrList(const cpg_address* a, int n, const char* p="") - : addrs(a), count(n), prefix(p) {} -}; - -ostream& operator<<(ostream& o, const AddrList& a) { - if (!a.count) return o; - o << a.prefix; - for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) - o << qpid::cluster::MemberId(*p) << " "; - return o; -} +// CPG config-change callback. void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, - const cpg_address *current, int nCurrent, + const cpg_address *members, int nMembers, const cpg_address *left, int nLeft, const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(notice, *this << " membership change: " - << AddrList(current, nCurrent) << "(" - << AddrList(joined, nJoined, "joined: ") - << AddrList(left, nLeft, "left: ") - << ")"); - string addresses; - for (const cpg_address* p = current; p < current+nCurrent; ++p) - addresses.append(MemberId(*p).str()); - deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); + string membersStr, leftStr, joinedStr; + // Encode members and enqueue as an event so the config change can + // be executed in the correct thread. + for (const cpg_address* p = members; p < members+nMembers; ++p) + membersStr.append(MemberId(*p).str()); + for (const cpg_address* p = left; p < left+nLeft; ++p) + leftStr.append(MemberId(*p).str()); + for (const cpg_address* p = joined; p < joined+nJoined; ++p) + joinedStr.append(MemberId(*p).str()); + deliverEvent(Event::control(ClusterConfigChangeBody( + ProtocolVersion(), membersStr, leftStr, joinedStr), + self)); } void Cluster::setReady(Lock&) { @@ -606,6 +599,7 @@ void Cluster::initMapCompleted(Lock& l) { // We decide here whether we want to recover from our store. // We won't recover if we are joining an active cluster or our store is dirty. if (store.hasStore() && + store.getState() != STORE_STATE_EMPTY_STORE && (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) broker.setRecovery(false); // Ditch my current store. state = INIT; @@ -654,22 +648,33 @@ void Cluster::initMapCompleted(Lock& l) { } } -void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) { +void Cluster::configChange(const MemberId&, + const std::string& membersStr, + const std::string& leftStr, + const std::string& joinedStr, + Lock& l) +{ if (state == LEFT) return; + MemberSet members = decodeMemberSet(membersStr); + MemberSet left = decodeMemberSet(leftStr); + MemberSet joined = decodeMemberSet(joinedStr); + QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": " + << members); + QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); + QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); - MemberSet config = decodeMemberSet(configStr); - elders = intersection(elders, config); + // Update initital status for members joining or leaving. + elders = intersection(elders, members); if (elders.empty() && INIT < state && state < CATCHUP) { QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); leave(l); return; } - bool memberChange = map.configChange(config); - QPID_LOG(debug, "Config sequence " << map.getConfigSeq()); + bool memberChange = map.configChange(members); store.setConfigSeq(map.getConfigSeq()); // Update initital status for members joining or leaving. - initMap.configChange(config); + initMap.configChange(members); if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( @@ -965,8 +970,11 @@ void Cluster::memberUpdate(Lock& l) { if (store.hasStore()) { // Mark store clean if I am the only broker, dirty otherwise. - if (size == 1) store.clean(Uuid(true)); - else store.dirty(clusterId); + if (size == 1 ) { + if (!store.isClean()) store.clean(Uuid(true)); + } else { + if (!store.isDirty()) store.dirty(clusterId); + } } if (size == 1 && lastSize > 1 && state >= CATCHUP) { |