diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 118 |
1 files changed, 71 insertions, 47 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d6312e7b93..9756ad0a62 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -162,13 +162,14 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } - void initialStatus(bool active, bool persistent, const framing::FieldTable& props) { - cluster.initialStatus(member, active, persistent, props); + void initialStatus(bool active, bool persistent, const Uuid& clusterId, + uint32_t version, const std::string& url) { + cluster.initialStatus(member, active, persistent, clusterId, version, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } - void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { - cluster.updateOffer(member, updatee, id, version, l); + void updateOffer(uint64_t updatee) { + cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } @@ -190,6 +191,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), self(cpg.self()), + clusterId(true), expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), @@ -206,6 +208,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(INIT), + initMap(self), lastSize(0), lastBroker(false), updateRetracted(false), @@ -265,8 +268,8 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { // Called in connection thread to insert an updated shadow connection. void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { QPID_LOG(info, *this << " new shadow connection " << c->getId()); - // Safe to use connections here because we're pre-catchup, either - // discarding or stalled, so deliveredFrame is not processing any + // Safe to use connections here because we're pre-catchup, stalled + // and discarding, so deliveredFrame is not processing any // connection events. assert(discarding); pair<ConnectionMap::iterator, bool> ib @@ -522,7 +525,8 @@ void Cluster::configChange ( const cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - if (state == INIT) { // First config change. + if (state == INIT) { + // FIXME aconway 2009-11-16: persistent restart // Recover only if we are first in cluster. broker.setRecovery(nCurrent == 1); initialized = true; @@ -545,39 +549,55 @@ void Cluster::setReady(Lock&) { broker.getQueueEvents().enable(); } -void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { - bool memberChange = map.configChange(current); - QPID_LOG(debug, *this << " applied config change: " << map); +void Cluster::initMapCompleted(Lock& l) { + if (state == INIT) { + elders = initMap.getElders(); + if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. + broker.getLinks().setPassive(true); + broker.getQueueEvents().disable(); + } + setClusterId(initMap.getClusterId(), l); + if (initMap.isUpdateNeeded()) { // Joining established cluster. + state = JOINER; + mcast.mcastControl( + ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + } + else { // I can go ready. + QPID_LOG(notice, *this << " ready."); + discarding = false; + setReady(l); + map = ClusterMap(initMap.getMemberUrls()); + memberUpdate(l); + } + } +} + +void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) { if (state == LEFT) return; - - if (!map.isAlive(self)) { // Final config change. + + MemberSet config = decodeMemberSet(configStr); + elders = intersection(elders, config); + if (elders.empty() && INIT < state && state < CATCHUP) { + QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); leave(l); return; } + bool memberChange = map.configChange(config); - if (state == INIT) { // First configChange - if (map.aliveCount() == 1) { - setClusterId(true, l); - discarding = false; - setReady(l); - map = ClusterMap(self, myUrl, true); - memberUpdate(l); - QPID_LOG(notice, *this << " first in cluster"); - } - else { // Joining established group. - state = JOINER; - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - elders = map.getAlive(); - elders.erase(self); - broker.getLinks().setPassive(true); - broker.getQueueEvents().disable(); - } - } - else if (state >= CATCHUP && memberChange) { + // Update initital status for new members joining. + initMap.configChange(config); + if (initMap.isResendNeeded()) { + mcast.mcastControl( + // FIXME aconway 2009-11-17: persistent restart, set persistence bit. + ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId, + CLUSTER_VERSION, myUrl.str()), self); + } + if (initMap.transitionToComplete()) initMapCompleted(l); + + if (state >= CATCHUP && memberChange) { memberUpdate(l); - elders = ClusterMap::intersection(elders, map.getAlive()); if (elders.empty()) { - //assume we are oldest, reactive links if necessary + // We are the oldest, reactive links if necessary broker.getLinks().setPassive(false); } } @@ -587,8 +607,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl( - ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self); } } @@ -610,10 +629,23 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) makeOffer(id, l); } -void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/, - const framing::FieldTable&) { - // FIXME aconway 2009-11-12: fill in. +void Cluster::initialStatus(const MemberId& member, bool active, bool persistent, + const framing::Uuid& id, uint32_t version, + const std::string& url, Lock& l) +{ + if (version != CLUSTER_VERSION) { + QPID_LOG(critical, *this << " incompatible cluster versions " << + version << " != " << CLUSTER_VERSION); + leave(l); + return; + } + initMap.received( + member, + ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url) + ); + if (initMap.transitionToComplete()) initMapCompleted(l); } + void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); @@ -623,17 +655,10 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, - uint32_t version, Lock& l) { +void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { // NOTE: deliverEventQueue has been stopped at the update offer by // deliveredEvent in case an update is required. if (state == LEFT) return; - if (version != CLUSTER_VERSION) { - QPID_LOG(critical, *this << " incompatible cluster versions " << - version << " != " << CLUSTER_VERSION); - leave(l); - return; - } MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); if (updater == self) { @@ -649,7 +674,6 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu } else if (updatee == self && url) { assert(state == JOINER); - setClusterId(uuid, l); state = UPDATEE; QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); |