summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp109
1 files changed, 88 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 093ca13c7a..78f7bf13fc 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -88,6 +88,7 @@
#include "ClusterSettings.h"
#include "Connection.h"
#include "UpdateClient.h"
+#include "RetractClient.h"
#include "FailoverExchange.h"
#include "UpdateExchange.h"
@@ -104,6 +105,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
+#include "qpid/framing/ClusterRetractOfferBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
@@ -152,6 +154,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
@@ -186,6 +189,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
state(INIT),
lastSize(0),
lastBroker(false),
+ updateRetracted(false),
error(*this)
{
mAgent = broker.getManagementAgent();
@@ -325,6 +329,12 @@ void Cluster::deliverFrame(const EventFrame& e) {
deliverFrameQueue.push(e);
}
+const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
+ return (body && body->getMethod() &&
+ body->getMethod()->isA<ClusterUpdateOfferBody>()) ?
+ static_cast<const ClusterUpdateOfferBody*>(body) : 0;
+}
+
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
@@ -334,8 +344,7 @@ void Cluster::deliveredEvent(const Event& e) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
- if (offer)
+ if (castUpdateOffer(ef.frame.getBody()))
deliverEventQueue.stop();
deliverFrame(ef);
}
@@ -357,20 +366,37 @@ void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DROP: " << e);
}
-void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+void Cluster::flagError(
+ Connection& connection, ErrorCheck::ErrorType type, const std::string& msg)
+{
Mutex::ScopedLock l(lock);
- if (settings.checkErrors)
- error.error(connection, type, map.getFrameSeq(), map.getMembers());
+ if (connection.isCatchUp()) {
+ QPID_LOG(critical, *this << " error on update connection " << connection
+ << ": " << msg);
+ leave(l);
+ }
+ else if (settings.checkErrors)
+ error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
}
LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
// Handler for deliverFrameQueue.
// This thread executes the main logic.
-void Cluster::deliveredFrame(const EventFrame& e) {
+ void Cluster::deliveredFrame(const EventFrame& efConst) {
LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
+ EventFrame e(efConst);
+ const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
+ if (offer && error.isUnresolved()) {
+ // We can't honour an update offer that is delivered while an
+ // error is in progress so replace it with a retractOffer and re-start
+ // the event queue.
+ e.frame = AMQFrame(
+ ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
+ deliverEventQueue.start();
+ }
// Process each frame through the error checker.
if (settings.checkErrors) {
error.delivered(e);
@@ -382,7 +408,6 @@ void Cluster::deliveredFrame(const EventFrame& e) {
}
}
-LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
@@ -562,6 +587,14 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
+// Go back to normal processing after an offer that did not result in an update.
+void Cluster::cancelOffer(const MemberId& updatee, Lock& l) {
+ QPID_LOG(info, *this << " cancelled offer to " << updatee);
+ deliverEventQueue.start(); // Go back to normal processing
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+}
+
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
// NOTE: deliverEventQueue has been stopped at the update offer by
// deliveredEvent in case an update is required.
@@ -572,12 +605,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
assert(state == OFFER);
if (url) // My offer was first.
updateStart(updatee, *url, l);
- else { // Another offer was first.
- deliverEventQueue.start(); // Don't need to update
- setReady(l);
- QPID_LOG(info, *this << " cancelled update offer to " << updatee);
- makeOffer(map.firstJoiner(), l); // Maybe make another offer.
- }
+ else // Another offer was first.
+ cancelOffer(updatee, l);
}
else if (updatee == self && url) {
assert(state == JOINER);
@@ -587,7 +616,34 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
checkUpdateIn(l);
}
else
- deliverEventQueue.start(); // Don't need to update
+ deliverEventQueue.start(); // Not involved in update.
+}
+
+static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
+ client::ConnectionSettings cs;
+ cs.username = settings.username;
+ cs.password = settings.password;
+ cs.mechanism = settings.mechanism;
+ return cs;
+}
+
+void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
+ // An offer was received while handling an error, and converted to a retract.
+ if (state == LEFT) return;
+ MemberId updatee(updateeInt);
+ boost::optional<Url> url = map.updateOffer(updater, updatee);
+ if (updater == self) {
+ assert(state == OFFER);
+ if (url) { // My offer was first.
+ QPID_LOG(info, *this << " retracted offer to " << updatee);
+ if (updateThread.id())
+ updateThread.join(); // Join the previous updateThread to avoid leaks.
+ updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
+ }
+ cancelOffer(updatee, l);
+ }
+ else
+ deliverEventQueue.start(); // Not involved in update.
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
@@ -598,15 +654,12 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
- client::ConnectionSettings cs;
- cs.username = settings.username;
- cs.password = settings.password;
- cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder,
+ new UpdateClient(self, updatee, url, broker, map, *expiryPolicy,
+ getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
- cs));
+ connectionSettings(settings)));
}
// Called in update thread.
@@ -616,8 +669,15 @@ void Cluster::updateInDone(const ClusterMap& m) {
checkUpdateIn(l);
}
+void Cluster::updateInRetracted() {
+ Lock l(lock);
+ updateRetracted = true;
+ checkUpdateIn(l);
+}
+
void Cluster::checkUpdateIn(Lock&) {
- if (state == UPDATEE && updatedMap) {
+ if (state != UPDATEE) return; // Wait till we reach the stall point.
+ if (updatedMap) { // We're up to date
map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
@@ -625,6 +685,13 @@ void Cluster::checkUpdateIn(Lock&) {
QPID_LOG(info, *this << " received update, starting catch-up");
deliverEventQueue.start();
}
+ else if (updateRetracted) { // Update was retracted, request another update
+ updateRetracted = false;
+ state = JOINER;
+ QPID_LOG(info, *this << " re-try joining after retracted update");
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ deliverEventQueue.start();
+ }
}
void Cluster::updateOutDone() {