diff options
author | Alan Conway <aconway@apache.org> | 2009-02-27 19:34:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-27 19:34:47 +0000 |
commit | 7c79adf16acfeb31cd2b90699c456698237a2e82 (patch) | |
tree | 743738de6f0d5448ebc8280d45976511de8c9ad4 /cpp/src | |
parent | 739e6e1c66341b6b8dbda6776ada8179765ed347 (diff) | |
download | qpid-python-7c79adf16acfeb31cd2b90699c456698237a2e82.tar.gz |
cluster: apply membership updates while in CATCHUP mode.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 4 |
2 files changed, 11 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6b4cd0256c..312d1e90e3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -234,9 +234,9 @@ void Cluster::deliveredEvent(const Event& e) { // Check for deliver close here so we can erase the // connection decoder safely in this thread. if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>()) - decoder.erase(e.getConnectionId()); + decoder.erase(e.getConnectionId()); deliverFrameQueue.push(EventFrame(e, frame)); - } + } } else if (e.getType() == DATA) decoder.decode(e, e.getData()); @@ -345,7 +345,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& broker.getLinks().setPassive(true); } } - else if (state >= READY && memberChange) { + else if (state >= CATCHUP && memberChange) { memberUpdate(l); elders = ClusterMap::intersection(elders, map.getAlive()); if (elders.empty()) { @@ -357,7 +357,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& bool Cluster::isLeader() const { return elders.empty(); } -void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { +void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); @@ -382,7 +382,7 @@ void Cluster::brokerShutdown() { void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { map.updateRequest(id, url); - tryMakeOffer(id, l); + makeOffer(id, l); } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { @@ -406,7 +406,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu else { // Another offer was first. setReady(l); QPID_LOG(info, *this << " cancelled update offer to " << updatee); - tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. + makeOffer(map.firstJoiner(), l); // Maybe make another offer. } } else if (updatee == myId && url) { @@ -446,7 +446,6 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { } void Cluster::checkUpdateIn(Lock& ) { - if (state == LEFT) return; if (state == UPDATEE && updatedMap) { map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); @@ -467,7 +466,7 @@ void Cluster::updateOutDone(Lock& l) { state = READY; mcast.release(); deliverFrameQueue.start(); - tryMakeOffer(map.firstJoiner(), l); // Try another offer + makeOffer(map.firstJoiner(), l); // Try another offer } void Cluster::updateOutError(const std::exception& e) { @@ -522,7 +521,7 @@ void Cluster::memberUpdate(Lock& l) { size_t size = urls.size(); failoverExchange->setUrls(urls); - if (size == 1 && lastSize > 1 && state >= READY) { + if (size == 1 && lastSize > 1 && state >= CATCHUP) { QPID_LOG(info, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index e3e259dac4..ea472a9ecf 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -119,7 +119,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { std::vector<Url> getUrls(Lock&) const; // Make an offer if we can - called in deliver thread. - void tryMakeOffer(const MemberId&, Lock&); + void makeOffer(const MemberId&, Lock&); // Called in main thread in ~Broker. void brokerShutdown(); @@ -133,6 +133,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void configChange(const MemberId&, const std::string& addresses, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); + + // Handlers for pollable queues. void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); |