diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 109 |
1 files changed, 88 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 093ca13c7a..78f7bf13fc 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -88,6 +88,7 @@ #include "ClusterSettings.h" #include "Connection.h" #include "UpdateClient.h" +#include "RetractClient.h" #include "FailoverExchange.h" #include "UpdateExchange.h" @@ -104,6 +105,7 @@ #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" +#include "qpid/framing/ClusterRetractOfferBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" @@ -152,6 +154,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } @@ -186,6 +189,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : state(INIT), lastSize(0), lastBroker(false), + updateRetracted(false), error(*this) { mAgent = broker.getManagementAgent(); @@ -325,6 +329,12 @@ void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } +const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { + return (body && body->getMethod() && + body->getMethod()->isA<ClusterUpdateOfferBody>()) ? + static_cast<const ClusterUpdateOfferBody*>(body) : 0; +} + // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { @@ -334,8 +344,7 @@ void Cluster::deliveredEvent(const Event& e) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody()); - if (offer) + if (castUpdateOffer(ef.frame.getBody())) deliverEventQueue.stop(); deliverFrame(ef); } @@ -357,20 +366,37 @@ void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DROP: " << e); } -void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { +void Cluster::flagError( + Connection& connection, ErrorCheck::ErrorType type, const std::string& msg) +{ Mutex::ScopedLock l(lock); - if (settings.checkErrors) - error.error(connection, type, map.getFrameSeq(), map.getMembers()); + if (connection.isCatchUp()) { + QPID_LOG(critical, *this << " error on update connection " << connection + << ": " << msg); + leave(l); + } + else if (settings.checkErrors) + error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); } LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) // Handler for deliverFrameQueue. // This thread executes the main logic. -void Cluster::deliveredFrame(const EventFrame& e) { + void Cluster::deliveredFrame(const EventFrame& efConst) { LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); Mutex::ScopedLock l(lock); + EventFrame e(efConst); + const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); + if (offer && error.isUnresolved()) { + // We can't honour an update offer that is delivered while an + // error is in progress so replace it with a retractOffer and re-start + // the event queue. + e.frame = AMQFrame( + ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); + deliverEventQueue.start(); + } // Process each frame through the error checker. if (settings.checkErrors) { error.delivered(e); @@ -382,7 +408,6 @@ void Cluster::deliveredFrame(const EventFrame& e) { } } -LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");) void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { @@ -562,6 +587,14 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } +// Go back to normal processing after an offer that did not result in an update. +void Cluster::cancelOffer(const MemberId& updatee, Lock& l) { + QPID_LOG(info, *this << " cancelled offer to " << updatee); + deliverEventQueue.start(); // Go back to normal processing + setReady(l); + makeOffer(map.firstJoiner(), l); // Maybe make another offer. +} + void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { // NOTE: deliverEventQueue has been stopped at the update offer by // deliveredEvent in case an update is required. @@ -572,12 +605,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu assert(state == OFFER); if (url) // My offer was first. updateStart(updatee, *url, l); - else { // Another offer was first. - deliverEventQueue.start(); // Don't need to update - setReady(l); - QPID_LOG(info, *this << " cancelled update offer to " << updatee); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. - } + else // Another offer was first. + cancelOffer(updatee, l); } else if (updatee == self && url) { assert(state == JOINER); @@ -587,7 +616,34 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu checkUpdateIn(l); } else - deliverEventQueue.start(); // Don't need to update + deliverEventQueue.start(); // Not involved in update. +} + +static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { + client::ConnectionSettings cs; + cs.username = settings.username; + cs.password = settings.password; + cs.mechanism = settings.mechanism; + return cs; +} + +void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { + // An offer was received while handling an error, and converted to a retract. + if (state == LEFT) return; + MemberId updatee(updateeInt); + boost::optional<Url> url = map.updateOffer(updater, updatee); + if (updater == self) { + assert(state == OFFER); + if (url) { // My offer was first. + QPID_LOG(info, *this << " retracted offer to " << updatee); + if (updateThread.id()) + updateThread.join(); // Join the previous updateThread to avoid leaks. + updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); + } + cancelOffer(updatee, l); + } + else + deliverEventQueue.start(); // Not involved in update. } void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { @@ -598,15 +654,12 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { QPID_LOG(info, *this << " sending update to " << updatee << " at " << url); if (updateThread.id()) updateThread.join(); // Join the previous updateThread to avoid leaks. - client::ConnectionSettings cs; - cs.username = settings.username; - cs.password = settings.password; - cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder, + new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, + getConnections(l), decoder, boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), - cs)); + connectionSettings(settings))); } // Called in update thread. @@ -616,8 +669,15 @@ void Cluster::updateInDone(const ClusterMap& m) { checkUpdateIn(l); } +void Cluster::updateInRetracted() { + Lock l(lock); + updateRetracted = true; + checkUpdateIn(l); +} + void Cluster::checkUpdateIn(Lock&) { - if (state == UPDATEE && updatedMap) { + if (state != UPDATEE) return; // Wait till we reach the stall point. + if (updatedMap) { // We're up to date map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; @@ -625,6 +685,13 @@ void Cluster::checkUpdateIn(Lock&) { QPID_LOG(info, *this << " received update, starting catch-up"); deliverEventQueue.start(); } + else if (updateRetracted) { // Update was retracted, request another update + updateRetracted = false; + state = JOINER; + QPID_LOG(info, *this << " re-try joining after retracted update"); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + deliverEventQueue.start(); + } } void Cluster::updateOutDone() { |