diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 63 |
1 files changed, 54 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ede8d96681..82ed8bf8c9 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -146,6 +146,7 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/framing/ClusterClockBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterRetractOfferBody.h" @@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1097431; +const uint32_t Cluster::CLUSTER_VERSION = 1128070; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -230,7 +231,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } - void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } @@ -240,6 +240,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void deliverToQueue(const std::string& queue, const std::string& message) { cluster.deliverToQueue(queue, message, l); } + void clock(uint64_t time) { cluster.clock(time, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : self(cpg.self()), clusterId(true), mAgent(0), - expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), + expiryPolicy(new ExpiryPolicy(*this)), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -668,6 +669,8 @@ void Cluster::initMapCompleted(Lock& l) { else { // I can go ready. discarding = false; setReady(l); + // Must be called *before* memberUpdate so first update will be generated. + failoverExchange->setReady(); memberUpdate(l); updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -720,6 +723,20 @@ void Cluster::configChange(const MemberId&, updateMgmtMembership(l); // Update on every config change for consistency } +struct ClusterClockTask : public sys::TimerTask { + Cluster& cluster; + sys::Timer& timer; + + ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval) + : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {} + + void fire() { + cluster.sendClockUpdate(); + setupNextFire(); + timer.add(this); + } +}; + void Cluster::becomeElder(Lock&) { if (elder) return; // We were already the elder. // We are the oldest, reactive links if necessary @@ -727,6 +744,8 @@ void Cluster::becomeElder(Lock&) { elder = true; broker.getLinks().setPassive(false); timer->becomeElder(); + + clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval)); } void Cluster::makeOffer(const MemberId& id, Lock& ) { @@ -847,7 +866,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) if (updatee != self && url) { QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate when update completes + // Updatee will call clusterUpdate() via checkUpdateIn() when update completes } } @@ -928,10 +947,11 @@ void Cluster::checkUpdateIn(Lock& l) { if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; - failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); + // Must be called *after* memberUpdate() to avoid sending an extra update. + failoverExchange->setReady(); // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); @@ -1121,10 +1141,6 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } -void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { - expiryPolicy->deliverExpire(id); -} - void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { // If we see an errorCheck here (rather than in the ErrorCheck // class) then we have processed succesfully past the point of the @@ -1162,6 +1178,35 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag q->deliver(msg); } +sys::AbsTime Cluster::getClusterTime() { + Mutex::ScopedLock l(lock); + return clusterTime; +} + +// This method is called during update on the updatee to set the initial cluster time. +void Cluster::clock(const uint64_t time) { + Mutex::ScopedLock l(lock); + clock(time, l); +} + +// called when broadcast message received +void Cluster::clock(const uint64_t time, Lock&) { + clusterTime = AbsTime(EPOCH, time); + AbsTime now = AbsTime::now(); + + if (!elder) { + clusterTimeOffset = Duration(now, clusterTime); + } +} + +// called by elder timer to send clock broadcast +void Cluster::sendClockUpdate() { + Mutex::ScopedLock l(lock); + int64_t nanosecondsSinceEpoch = Duration(EPOCH, now()); + nanosecondsSinceEpoch += clusterTimeOffset; + mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self); +} + bool Cluster::deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg) { |