summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp48
1 files changed, 28 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0d0fb7bcee..7dd8c7e62c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -112,7 +112,7 @@
#include "qpid/cluster/RetractClient.h"
#include "qpid/cluster/FailoverExchange.h"
#include "qpid/cluster/UpdateExchange.h"
-#include "qpid/cluster/PeriodicTimerImpl.h"
+#include "qpid/cluster/ClusterTimer.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -137,7 +137,7 @@
#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
-#include "qpid/framing/ClusterPeriodicTimerBody.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -179,7 +179,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 903171;
+const uint32_t Cluster::CLUSTER_VERSION = 904565;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -209,9 +209,8 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
- void periodicTimer(const std::string& name) {
- cluster.periodicTimer(member, name, l);
- }
+ void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
+ void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
@@ -245,6 +244,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
state(INIT),
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
+ elder(false),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -252,8 +252,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
{
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
- timer = new PeriodicTimerImpl(*this);
- broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
+ timer = new ClusterTimer(*this);
+ broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
mAgent = broker.getManagementAgent();
if (mAgent != 0){
@@ -577,14 +577,13 @@ void Cluster::initMapCompleted(Lock& l) {
initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
- if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
+ if (elders.empty())
+ becomeElder();
+ else {
broker.getLinks().setPassive(true);
broker.getQueueEvents().disable();
QPID_LOG(info, *this << " not active for links.");
}
- else {
- QPID_LOG(info, this << " active for links.");
- }
setClusterId(initMap.getClusterId(), l);
if (store.hasStore()) store.dirty(clusterId);
@@ -636,14 +635,19 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
if (state >= CATCHUP && memberChange) {
memberUpdate(l);
- if (elders.empty()) {
- // We are the oldest, reactive links if necessary
- QPID_LOG(info, this << " becoming active for links.");
- broker.getLinks().setPassive(false);
- }
+ if (elders.empty()) becomeElder();
}
}
+void Cluster::becomeElder() {
+ if (elder) return; // We were already the elder.
+ // We are the oldest, reactive links if necessary
+ QPID_LOG(info, *this << " became the elder, active for links.");
+ elder = true;
+ broker.getLinks().setPassive(false);
+ timer->becomeElder();
+}
+
void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -962,13 +966,17 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
error.respondNone(from, type, frameSeq);
}
-void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
- timer->deliver(name);
+void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
+ timer->deliverWakeup(name);
+}
+
+void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {
Monitor::ScopedLock l(lock);
- return state >= CATCHUP && elders.empty();
+ return elder;
}
}} // namespace qpid::cluster