summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp58
1 files changed, 47 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 38a41c36e8..ca325dde36 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,6 +36,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -63,6 +64,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -77,9 +79,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+ void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+ void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +115,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
discarding(true),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ error(*this)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,14 +199,19 @@ void Cluster::leave() {
leave(l);
}
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
+ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+ } do {} while(0)
+
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- try { broker.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
- }
+ // Finalize connections now now to avoid problems later in destructor.
+ LEAVE_TRY(localConnections.clear());
+ LEAVE_TRY(connections.clear());
+ LEAVE_TRY(broker.shutdown());
}
}
@@ -254,10 +263,22 @@ void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DROP: " << e);
}
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+ Mutex::ScopedLock l(lock);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
Mutex::ScopedLock l(lock);
+ // Process each frame through the error checker.
+ error.delivered(e);
+ while (error.canProcess()) // There is a frame ready to process.
+ processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +286,8 @@ void Cluster::deliveredFrame(const EventFrame& e) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ map.incrementFrameSeq();
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -357,8 +379,8 @@ void Cluster::setReady(Lock&) {
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
- bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+ bool memberChange = map.configChange(current);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
@@ -600,8 +622,13 @@ void Cluster::memberUpdate(Lock& l) {
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.self << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = {
+ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ };
+ assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+ o << cluster.self << "(" << STATE[cluster.state];
+ if (cluster.error.isUnresolved()) o << "/error";
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -635,4 +662,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+ // If we receive an errorCheck here, it's because we have processed past the point
+ // of the error so respond with ERROR_TYPE_NONE
+ assert(map.getFrameSeq() >= frameSeq);
+ if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE.
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
}} // namespace qpid::cluster