diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 79 |
1 files changed, 56 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index f8e412f1e6..a17f54078c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -36,6 +36,7 @@ #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" +#include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" @@ -46,7 +47,6 @@ #include "qpid/management/ManagementBroker.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qpid/sys/LatencyMetric.h" #include "qpid/sys/Thread.h" #include <boost/bind.hpp> @@ -63,6 +63,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; using namespace qpid::cluster; +using namespace qpid::framing::cluster; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -77,9 +78,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } + void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } + void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -112,7 +114,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : discarding(true), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + error(*this) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,14 +198,19 @@ void Cluster::leave() { leave(l); } +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ + QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ + } do {} while(0) + void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - try { broker.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); - } + // Finalize connections now now to avoid problems later in destructor. + LEAVE_TRY(localConnections.clear()); + LEAVE_TRY(connections.clear()); + LEAVE_TRY(broker.shutdown()); } } @@ -218,8 +226,6 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - if (from == self) // Record self-deliveries for flow control. - mcast.selfDeliver(e); deliverEvent(e); } @@ -254,10 +260,22 @@ void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DROP: " << e); } +void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { + Mutex::ScopedLock l(lock); + error.error(connection, type, map.getFrameSeq(), map.getMembers()); +} + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { Mutex::ScopedLock l(lock); + // Process each frame through the error checker. + error.delivered(e); + while (error.canProcess()) // There is a frame ready to process. + processFrame(error.getNext(), l); +} + +void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); @@ -265,7 +283,8 @@ void Cluster::deliveredFrame(const EventFrame& e) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { - QPID_LOG(trace, *this << " DLVR: " << e); + map.incrementFrameSeq(); + QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); if (connection) connection->deliveredFrame(e); @@ -316,11 +335,11 @@ ostream& operator<<(ostream& o, const AddrList& a) { for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " (joined) "; break; - case CPG_REASON_LEAVE: reasonString = " (left) "; break; - case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break; - case CPG_REASON_NODEUP: reasonString = " (node-up) "; break; - case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break; + case CPG_REASON_JOIN: reasonString = "(joined) "; break; + case CPG_REASON_LEAVE: reasonString = "(left) "; break; + case CPG_REASON_NODEDOWN: reasonString = "(node-down) "; break; + case CPG_REASON_NODEUP: reasonString = "(node-up) "; break; + case CPG_REASON_PROCDOWN: reasonString = "(process-down) "; break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); @@ -342,8 +361,8 @@ void Cluster::configChange ( broker.setRecovery(nCurrent == 1); initialized = true; } - QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) - << AddrList(left, nLeft, "( ", ")")); + QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) + << AddrList(left, nLeft, "left: ")); std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); @@ -357,8 +376,8 @@ void Cluster::setReady(Lock&) { broker.getQueueEvents().enable(); } -void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { - bool memberChange = map.configChange(addresses); +void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { + bool memberChange = map.configChange(current); if (state == LEFT) return; if (!map.isAlive(self)) { // Final config change. @@ -589,19 +608,24 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_memberIDs(idstr); } - // Erase connections belonging to members that have left the cluster. + // Close connections belonging to members that have left the cluster. ConnectionMap::iterator i = connections.begin(); while (i != connections.end()) { ConnectionMap::iterator j = i++; MemberId m = j->second->getId().getMember(); if (m != self && !map.isMember(m)) - connections.erase(j); + j->second->deliverClose(); } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.self << "(" << STATE[cluster.state] << ")"; + static const char* STATE[] = { + "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + }; + assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); + o << cluster.self << "(" << STATE[cluster.state]; + if (cluster.error.isUnresolved()) o << "/error"; + return o << ")"; } MemberId Cluster::getId() const { @@ -635,4 +659,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { + // If we receive an errorCheck here, it's because we have processed past the point + // of the error so respond with ERROR_TYPE_NONE + assert(map.getFrameSeq() >= frameSeq); + if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE. + mcast.mcastControl( + ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); +} + }} // namespace qpid::cluster |