diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 48 |
1 files changed, 28 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0d0fb7bcee..7dd8c7e62c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -112,7 +112,7 @@ #include "qpid/cluster/RetractClient.h" #include "qpid/cluster/FailoverExchange.h" #include "qpid/cluster/UpdateExchange.h" -#include "qpid/cluster/PeriodicTimerImpl.h" +#include "qpid/cluster/ClusterTimer.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -137,7 +137,7 @@ #include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" -#include "qpid/framing/ClusterPeriodicTimerBody.h" +#include "qpid/framing/ClusterTimerWakeupBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -179,7 +179,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 903171; +const uint32_t Cluster::CLUSTER_VERSION = 904565; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -209,9 +209,8 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } - void periodicTimer(const std::string& name) { - cluster.periodicTimer(member, name, l); - } + void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } + void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } @@ -245,6 +244,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : state(INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), + elder(false), lastSize(0), lastBroker(false), updateRetracted(false), @@ -252,8 +252,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : { // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. - timer = new PeriodicTimerImpl(*this); - broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer)); + timer = new ClusterTimer(*this); + broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); mAgent = broker.getManagementAgent(); if (mAgent != 0){ @@ -577,14 +577,13 @@ void Cluster::initMapCompleted(Lock& l) { initMap.checkConsistent(); elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); - if (!elders.empty()) { // I'm not the elder, I don't handle links & replication. + if (elders.empty()) + becomeElder(); + else { broker.getLinks().setPassive(true); broker.getQueueEvents().disable(); QPID_LOG(info, *this << " not active for links."); } - else { - QPID_LOG(info, this << " active for links."); - } setClusterId(initMap.getClusterId(), l); if (store.hasStore()) store.dirty(clusterId); @@ -636,14 +635,19 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& if (state >= CATCHUP && memberChange) { memberUpdate(l); - if (elders.empty()) { - // We are the oldest, reactive links if necessary - QPID_LOG(info, this << " becoming active for links."); - broker.getLinks().setPassive(false); - } + if (elders.empty()) becomeElder(); } } +void Cluster::becomeElder() { + if (elder) return; // We were already the elder. + // We are the oldest, reactive links if necessary + QPID_LOG(info, *this << " became the elder, active for links."); + elder = true; + broker.getLinks().setPassive(false); + timer->becomeElder(); +} + void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -962,13 +966,17 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu error.respondNone(from, type, frameSeq); } -void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) { - timer->deliver(name); +void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { + timer->deliverWakeup(name); +} + +void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { + timer->deliverDrop(name); } bool Cluster::isElder() const { Monitor::ScopedLock l(lock); - return state >= CATCHUP && elders.empty(); + return elder; } }} // namespace qpid::cluster |