diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 107 |
1 files changed, 53 insertions, 54 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f8adb8ee98..0d082fc226 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -18,7 +18,7 @@ #include "Cluster.h" #include "Connection.h" -#include "DumpClient.h" +#include "UpdateClient.h" #include "FailoverExchange.h" #include "ClusterQueueHandler.h" @@ -29,10 +29,10 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterDumpRequestBody.h" +#include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterDumpOfferBody.h" +#include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" @@ -77,10 +77,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { Cluster::Lock& l; ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} - void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } + void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } - void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); } + void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -124,7 +124,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b } Cluster::~Cluster() { - if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { @@ -205,7 +205,6 @@ void Cluster::deliver( void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; - QPID_LOG(trace, *this << " PUSH: " << e); QPID_LATENCY_INIT(e); deliverEventQueue.push(e); } @@ -216,7 +215,7 @@ void Cluster::deliveredEvent(const Event& e) { Buffer buf(const_cast<char*>(e.getData()), e.getSize()); boost::intrusive_ptr<Connection> connection; if (e.isConnection()) { - if (state == NEWBIE) { + if (state == JOINER) { QPID_LOG(trace, *this << " DROP: " << e); return; } @@ -236,11 +235,11 @@ void Cluster::deliveredEvent(const Event& e) { void Cluster::deliveredFrame(const EventFrame& e) { QPID_LATENCY_RECORD("delivered frame queue", e.frame); - QPID_LOG(trace, *this << " DLVR: " << e.frame); if (e.connection) { e.connection->deliveredFrame(e); } else { + QPID_LOG(trace, *this << " DLVR: " << e.frame); Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? ClusterDispatcher dispatch(*this, e.member, l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) @@ -313,9 +312,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& memberUpdate(l); } else { // Joining established group. - state = NEWBIE; + state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); - mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); ClusterMap::Set members = map.getAlive(); members.erase(myId); myElders = members; @@ -336,10 +335,10 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { - if (state == READY && map.isNewbie(id)) { + if (state == READY && map.isJoiner(id)) { state = OFFER; - QPID_LOG(info, *this << " send dump-offer to " << id); - mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId); + QPID_LOG(info, *this << " send update-offer to " << id); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId); } } @@ -359,8 +358,8 @@ void Cluster::brokerShutdown() { delete this; } -void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { - map.dumpRequest(id, url); +void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { + map.updateRequest(id, url); tryMakeOffer(id, l); } @@ -376,81 +375,81 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& uuid, Lock& l) { +void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { if (state == LEFT) return; - MemberId dumpee(dumpeeInt); - boost::optional<Url> url = map.dumpOffer(dumper, dumpee); - if (dumper == myId) { + MemberId updatee(updateeInt); + boost::optional<Url> url = map.updateOffer(updater, updatee); + if (updater == myId) { assert(state == OFFER); if (url) { // My offer was first. - dumpStart(dumpee, *url, l); + updateStart(updatee, *url, l); } else { // Another offer was first. state = READY; mcast.release(); - QPID_LOG(info, *this << " cancelled dump offer to " << dumpee); - tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. + QPID_LOG(info, *this << " cancelled update offer to " << updatee); + tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. } } - else if (dumpee == myId && url) { - assert(state == NEWBIE); + else if (updatee == myId && url) { + assert(state == JOINER); setClusterId(uuid); - state = DUMPEE; - QPID_LOG(info, *this << " receiving dump from " << dumper); + state = UPDATEE; + QPID_LOG(info, *this << " receiving update from " << updater); deliverEventQueue.stop(); - checkDumpIn(l); + checkUpdateIn(l); } } -void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { +void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { if (state == LEFT) return; assert(state == OFFER); - state = DUMPER; - QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url); + state = UPDATER; + QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); deliverEventQueue.stop(); - if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. - dumpThread = Thread( - new DumpClient(myId, dumpee, url, broker, map, connections.values(), - boost::bind(&Cluster::dumpOutDone, this), - boost::bind(&Cluster::dumpOutError, this, _1))); + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + updateThread = Thread( + new UpdateClient(myId, updatee, url, broker, map, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } -// Called in dump thread. -void Cluster::dumpInDone(const ClusterMap& m) { +// Called in update thread. +void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); - dumpedMap = m; - checkDumpIn(l); + updatedMap = m; + checkUpdateIn(l); } -void Cluster::checkDumpIn(Lock& ) { +void Cluster::checkUpdateIn(Lock& ) { if (state == LEFT) return; - if (state == DUMPEE && dumpedMap) { - map = *dumpedMap; + if (state == UPDATEE && updatedMap) { + map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; - QPID_LOG(info, *this << " received dump, starting catch-up"); + QPID_LOG(info, *this << " received update, starting catch-up"); deliverEventQueue.start(); } } -void Cluster::dumpOutDone() { +void Cluster::updateOutDone() { Monitor::ScopedLock l(lock); - dumpOutDone(l); + updateOutDone(l); } -void Cluster::dumpOutDone(Lock& l) { - assert(state == DUMPER); +void Cluster::updateOutDone(Lock& l) { + assert(state == UPDATER); state = READY; mcast.release(); - QPID_LOG(info, *this << " sent dump"); + QPID_LOG(info, *this << " sent update"); deliverEventQueue.start(); - tryMakeOffer(map.firstNewbie(), l); // Try another offer + tryMakeOffer(map.firstJoiner(), l); // Try another offer } -void Cluster::dumpOutError(const std::exception& e) { +void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending dump: " << e.what()); - dumpOutDone(l); + QPID_LOG(error, *this << " error sending update: " << e.what()); + updateOutDone(l); } void Cluster ::shutdown(const MemberId& id, Lock& l) { @@ -534,7 +533,7 @@ void Cluster::memberUpdate(Lock& l) { } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" }; + static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; return o << cluster.myId << "(" << STATE[cluster.state] << ")"; } |