diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 231 |
1 files changed, 140 insertions, 91 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9db2a61a82..c441686def 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -25,7 +25,10 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterUrlNoticeBody.h" +#include "qpid/framing/ClusterDumpRequestBody.h" +#include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterDumpErrorBody.h" +#include "qpid/framing/ClusterMapBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -50,19 +53,14 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - void urlNotice(const std::string& u) { cluster.urlNotice(member, u); } - void ready() { cluster.ready(member); } - - void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) { - assert(0); // Not passed to cluster, used to start a brain dump over TCP. - } - bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } - virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) { - // FIXME aconway 2008-09-12: TODO + void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); } + void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); } + void ready(const std::string& u) { cluster.ready(member, u); } + virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { + cluster.mapInit(members, dumpees, dumps); } - }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -80,17 +78,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), state(DISCARD) { - QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); + QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); - cpg.join(name); - - connectionEventQueue.start(poller); cpgDispatchHandle.startWatch(poller); + cpg.join(name); + } -Cluster::~Cluster() { - QPID_LOG(debug, "~Cluster()"); -} +Cluster::~Cluster() {} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Mutex::ScopedLock l(lock); @@ -102,60 +97,47 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -// FIXME aconway 2008-09-10: leave is currently not called, -// It should be called if we are shut down by a cluster admin command. +// FIXME aconway 2008-09-10: call leave from cluster admin command. // Any other type of exit is caught in disconnect(). // void Cluster::leave() { - QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); + QPID_LOG(notice, self << " leaving cluster " << name.str()); cpg.leave(name); - // Cluster will shut down in configChange when the cluster knows we've left. -} - -template <class T> void decodePtr(Buffer& buf, T*& ptr) { - uint64_t value = buf.getLongLong(); - ptr = reinterpret_cast<T*>(value); -} - -template <class T> void encodePtr(Buffer& buf, T* ptr) { - uint64_t value = reinterpret_cast<uint64_t>(ptr); - buf.putLongLong(value); + // Defer shut down to the final configChange when the group knows we've left. } -void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) { - QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - Event e(CONTROL, connection, frame.size()); +void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { + QPID_LOG(trace, "MCAST [" << self << "]: " << body); + AMQFrame f(body); + Event e(CONTROL, ConnectionId(self, cptr), f.size()); Buffer buf(e); - frame.encode(buf); + f.encode(buf); mcastEvent(e); } void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) { - QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data"); Event e(DATA, connection, size); memcpy(e.getData(), data, size); mcastEvent(e); } void Cluster::mcastEvent(const Event& e) { - QPID_LOG(trace, "Multicasting: " << e); e.mcast(name, cpg); } size_t Cluster::size() const { Mutex::ScopedLock l(lock); - return urls.size(); + return map.memberCount(); } std::vector<Url> Cluster::getUrls() const { Mutex::ScopedLock l(lock); - std::vector<Url> result(urls.size()); - std::transform(urls.begin(), urls.end(), result.begin(), - boost::bind(&UrlMap::value_type::second, _1)); - return result; -} + return map.memberUrls(); +} +// FIXME aconway 2008-09-15: volatile for locked/unlocked functions. boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { + Mutex::ScopedLock l(lock); if (id.getMember() == self) return boost::intrusive_ptr<Connection>(id.getConnectionPtr()); ConnectionMap::iterator i = connections.find(id); @@ -180,17 +162,19 @@ void Cluster::deliver( try { MemberId from(nodeid, pid); Event e = Event::delivered(from, msg, msg_len); - QPID_LOG(trace, "Cluster deliver: " << e); - // Process cluster controls immediately if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control Buffer buf(e); AMQFrame frame; - while (frame.decode(buf)) + while (frame.decode(buf)) { + QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody()); if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame)) - throw Exception("Invalid cluster control"); + throw Exception(QPID_MSG("Invalid cluster control")); + } } - else { // Process connection controls & data via the connectionEventQueue. + else { + // Process connection controls & data via the connectionEventQueue + // unless we are in the DISCARD state, in which case ignore. if (state != DISCARD) { e.setConnection(getConnection(e.getConnectionId())); connectionEventQueue.push(e); @@ -227,15 +211,15 @@ ostream& operator<<(ostream& o, const AddrList& a) { for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " joined "; break; - case CPG_REASON_LEAVE: reasonString = " left ";break; - case CPG_REASON_NODEDOWN: reasonString = " node-down ";break; - case CPG_REASON_NODEUP: reasonString = " node-up ";break; - case CPG_REASON_PROCDOWN: reasonString = " process-down ";break; + case CPG_REASON_JOIN: reasonString = " joined"; break; + case CPG_REASON_LEAVE: reasonString = " left";break; + case CPG_REASON_NODEDOWN: reasonString = " node-down";break; + case CPG_REASON_NODEUP: reasonString = " node-up";break; + case CPG_REASON_PROCDOWN: reasonString = " process-down";break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); - o << member << reasonString; + o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : ""); } return o; } @@ -247,23 +231,28 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address *joined, int nJoined) { - QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " - << AddrList(joined, nJoined) << AddrList(left, nLeft)); - - if (nJoined) // Notfiy new members of my URL. - mcastFrame( - AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), - ConnectionId(self,0)); - + // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. + QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent)); + QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft)); if (find(left, left+nLeft, self) != left+nLeft) { // We have left the group, this is the final config change. - QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str()); - broker.shutdown(); + QPID_LOG(notice, self << " left cluster " << name.str()); + broker.shutdown(); } Mutex::ScopedLock l(lock); - for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); - // Add new members when their URL notice arraives. - lock.notifyAll(); // Threads waiting for membership changes. + if (state == DISCARD) { + if (nCurrent == 1 && *current == self) { + QPID_LOG(notice, self << " first in cluster."); + map.ready(self, url); + ready(); // First in cluster. + } + else if (find(joined, joined+nJoined, self) != joined+nJoined) { + QPID_LOG(notice, self << " requesting state dump."); // Just joined + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); + } + } + for (int i = 0; i < nLeft; ++i) + map.leave(left[i]); } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -275,24 +264,59 @@ void Cluster::disconnect(sys::DispatchHandle& ) { // FIXME aconway 2008-09-11: this should be logged as critical, // when we provide admin option to shut down cluster and let // members leave cleanly. - QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str()); + QPID_LOG(notice, self << " disconnected from cluster " << name.str()); broker.shutdown(); } -void Cluster::urlNotice(const MemberId& m, const string& url) { - //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also. - //FIXME aconway 2008-09-11: Note multiple meanings of my own notice - - //from DISCARD->STALL and from STALL->READY via map. +// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place. +// Only one at a time to simplify things? +void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { + Mutex::ScopedLock l(lock); + Url url(urlStr); + if (self == m) { + switch (state) { + case DISCARD: state = CATCHUP; stall(); break; + case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map. + default: assert(0); + }; + } + else if (self == map.dumpRequest(m, url)) { + assert(state == READY); + QPID_LOG(info, self << " dumping to " << url); + // state = DUMPING; + // stall(); + // FIXME aconway 2008-09-15: need to stall map? + // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. + mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump. + } +} + +void Cluster::ready(const MemberId& m, const string& urlStr) { + Mutex::ScopedLock l(lock); + Url url(urlStr); + map.ready(m, url); +} + +broker::Broker& Cluster::getBroker(){ return broker; } + +void Cluster::stall() { + Mutex::ScopedLock l(lock); + // Stop processing connection events. We still process config changes + // and cluster controls in deliver() + connectionEventQueue.stop(); - QPID_LOG(info, "Cluster member " << m << " has URL " << url); - // My brain dump is up to this point, stall till it is complete. - if (m == self && state == DISCARD) - state = STALL; - urls.insert(UrlMap::value_type(m,Url(url))); + // FIXME aconway 2008-09-11: Flow control, we should slow down or + // stop reading from local connections while stalled to avoid an + // unbounded queue. } -void Cluster::ready(const MemberId& ) { - // FIXME aconway 2008-09-08: TODO +void Cluster::ready() { + // Called with lock held + QPID_LOG(info, self << " ready with URL " << url); + state = READY; + mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); + connectionEventQueue.start(poller); + // FIXME aconway 2008-09-15: stall/unstall map? } // Called from Broker::~Broker when broker is shut down. At this @@ -301,26 +325,51 @@ void Cluster::ready(const MemberId& ) { // callbacks will be invoked. // void Cluster::shutdown() { - QPID_LOG(notice, "Cluster member " << self << " shutting down."); + QPID_LOG(notice, self << " shutting down."); try { cpg.shutdown(); } catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } delete this; } -broker::Broker& Cluster::getBroker(){ return broker; } +/** Received from cluster */ +void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) { + QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee); + Mutex::ScopedLock l(lock); + map.dumpError(dumpee); + if (state == DUMPING && map.dumps(self) == 0) + ready(); +} -void Cluster::stall() { - // Stop processing connection events. We still process config changes - // and cluster controls in deliver() +/** Called in local dump thread */ +void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) { + assert(state == DUMPING); + QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg); + mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0); + Mutex::ScopedLock l(lock); + map.dumpError(dumpee); + if (map.dumps(self) == 0) // Unstall immediately. + ready(); +} - // FIXME aconway 2008-09-11: Flow control, we should slow down or - // stop reading from local connections while stalled to avoid an - // unbounded queue. - connectionEventQueue.stop(); +void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { + Mutex::ScopedLock l(lock); + // FIXME aconway 2008-09-15: faking out dump here. + switch (state) { + case DISCARD: + map.init(members, dumpees, dumps); + state = HAVE_DUMP; + break; + case CATCHUP: + map.init(members, dumpees, dumps); + ready(); + break; + default: + break; + } } -void Cluster::unStall() { - connectionEventQueue.start(poller); +void Cluster::dumpTo(const Url& ) { + // FIXME aconway 2008-09-12: DumpClient } }} // namespace qpid::cluster |