summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp79
1 files changed, 56 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index f8e412f1e6..a17f54078c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,6 +36,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -46,7 +47,6 @@
#include "qpid/management/ManagementBroker.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qpid/sys/LatencyMetric.h"
#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
@@ -63,6 +63,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -77,9 +78,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+ void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+ void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +114,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
discarding(true),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ error(*this)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,14 +198,19 @@ void Cluster::leave() {
leave(l);
}
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
+ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+ } do {} while(0)
+
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- try { broker.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
- }
+ // Finalize connections now to avoid problems later in destructor.
+ LEAVE_TRY(localConnections.clear());
+ LEAVE_TRY(connections.clear());
+ LEAVE_TRY(broker.shutdown());
}
}
@@ -218,8 +226,6 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- if (from == self) // Record self-deliveries for flow control.
- mcast.selfDeliver(e);
deliverEvent(e);
}
@@ -254,10 +260,22 @@ void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DROP: " << e);
}
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+ Mutex::ScopedLock l(lock);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
Mutex::ScopedLock l(lock);
+ // Process each frame through the error checker.
+ error.delivered(e);
+ while (error.canProcess()) // There is a frame ready to process.
+ processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +283,8 @@ void Cluster::deliveredFrame(const EventFrame& e) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ map.incrementFrameSeq();
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -316,11 +335,11 @@ ostream& operator<<(ostream& o, const AddrList& a) {
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " (joined) "; break;
- case CPG_REASON_LEAVE: reasonString = " (left) "; break;
- case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break;
- case CPG_REASON_NODEUP: reasonString = " (node-up) "; break;
- case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break;
+ case CPG_REASON_JOIN: reasonString = "(joined) "; break;
+ case CPG_REASON_LEAVE: reasonString = "(left) "; break;
+ case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break;
+ case CPG_REASON_NODEUP: reasonString = "(node-up) "; break;
+ case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
@@ -342,8 +361,8 @@ void Cluster::configChange (
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
- << AddrList(left, nLeft, "( ", ")"));
+ QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
+ << AddrList(left, nLeft, "left: "));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
@@ -357,8 +376,8 @@ void Cluster::setReady(Lock&) {
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
- bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+ bool memberChange = map.configChange(current);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
@@ -589,19 +608,24 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
- // Erase connections belonging to members that have left the cluster.
+ // Close connections belonging to members that have left the cluster.
ConnectionMap::iterator i = connections.begin();
while (i != connections.end()) {
ConnectionMap::iterator j = i++;
MemberId m = j->second->getId().getMember();
if (m != self && !map.isMember(m))
- connections.erase(j);
+ j->second->deliverClose();
}
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.self << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = {
+ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ };
+ assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+ o << cluster.self << "(" << STATE[cluster.state];
+ if (cluster.error.isUnresolved()) o << "/error";
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -635,4 +659,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+ // If we receive an errorCheck here, it's because we have processed past the point
+ // of the error so respond with ERROR_TYPE_NONE
+ assert(map.getFrameSeq() >= frameSeq);
+ if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if it's already NONE.
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
}} // namespace qpid::cluster