diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 72 |
1 files changed, 15 insertions, 57 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 222aa07548..aac5bc1dd8 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -90,20 +90,19 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b name(name_), myUrl(url_), myId(cpg.self()), + readMax(readMax_), cpgDispatchHandle( cpg, boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), - mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller), - mcastId(0), + mcast(cpg, poller), mgmtObject(0), + deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), state(INIT), lastSize(0), - lastBroker(false), - readMax(readMax_) + lastBroker(false) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ @@ -116,7 +115,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b failoverExchange.reset(new FailoverExchange(this)); cpgDispatchHandle.startWatch(poller); deliverQueue.start(); - mcastQueue.start(); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); if (quorum_) quorum.init(); cpg.join(name); @@ -135,49 +133,6 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) { - Event e(Event::control(body, id, seq)); - QPID_LOG(trace, *this << " MCAST " << e << ": " << body); - mcast(e); -} - -void Cluster::mcastControl(const framing::AMQBody& body) { - Event e(Event::control(body, ConnectionId(myId,0), ++mcastId)); - QPID_LOG(trace, *this << " MCAST " << e << ": " << body); - mcast(e); -} - -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) { - Event e(DATA, connection, size, id); - memcpy(e.getData(), data, size); - { - Lock l(lock); - if (state <= CATCHUP && e.isConnection()) { - // Stall outgoing connection events untill we are fully READY - QPID_LOG(trace, *this << " MCAST deferred: " << e ); - mcastStallQueue.push_back(e); - return; - } - } - QPID_LOG(trace, *this << " MCAST " << e); - mcast(e); -} - -void Cluster::mcast(const Event& e) { mcastQueue.push(e); } - -void Cluster::sendMcast(PollableEventQueue::Queue& values) { - try { - PollableEventQueue::Queue::iterator i = values.begin(); - while (i != values.end() && i->mcast(cpg)) - ++i; - values.erase(values.begin(), i); - } - catch (const std::exception& e) { - QPID_LOG(critical, "Multicast failure: " << e.what()); - leave(); - } -} - std::vector<Url> Cluster::getUrls() const { Lock l(lock); return getUrls(l); @@ -315,7 +270,6 @@ ostream& operator<<(ostream& o, const AddrList& a) { void Cluster::dispatch(sys::DispatchHandle& h) { try { cpg.dispatchAll(); - mcastQueue.start(); // In case it was stopped by flow control. h.rewatch(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what()); @@ -361,7 +315,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { setClusterId(true); + // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release() state = READY; + mcast.release(); QPID_LOG(notice, *this << " first in cluster"); if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); map = ClusterMap(myId, myUrl, true); @@ -370,7 +326,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& else { // Joining established group. state = NEWBIE; QPID_LOG(info, *this << " joining cluster: " << map); - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str())); + mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId); } } else if (state >= READY && memberChange) @@ -384,7 +340,7 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isNewbie(id)) { state = OFFER; QPID_LOG(info, *this << " send dump-offer to " << id); - mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId)); + mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId); } } @@ -414,10 +370,10 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { memberUpdate(l); if (state == CATCHUP && id == myId) { state = READY; + mcast.release(); QPID_LOG(notice, *this << " caught up, active cluster member"); if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1)); - mcastStallQueue.clear(); + mcast.release(); } } @@ -432,6 +388,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& } else { // Another offer was first. state = READY; + mcast.release(); QPID_LOG(info, *this << " cancelled dump offer to " << dumpee); tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. } @@ -461,6 +418,7 @@ void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { boost::bind(&Cluster::dumpOutError, this, _1))); } +// Called in dump thread. void Cluster::dumpInDone(const ClusterMap& m) { Lock l(lock); dumpedMap = m; @@ -471,8 +429,7 @@ void Cluster::checkDumpIn(Lock& ) { if (state == LEFT) return; if (state == DUMPEE && dumpedMap) { map = *dumpedMap; - mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str())); - // Don't flush the mcast queue till we are READY, on self-deliver. + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; QPID_LOG(info, *this << " received dump, starting catch-up"); deliverQueue.start(); @@ -487,6 +444,7 @@ void Cluster::dumpOutDone() { void Cluster::dumpOutDone(Lock& l) { assert(state == DUMPER); state = READY; + mcast.release(); QPID_LOG(info, *this << " sent dump"); deliverQueue.start(); tryMakeOffer(map.firstNewbie(), l); // Try another offer @@ -523,7 +481,7 @@ void Cluster::stopClusterNode(Lock& l) { void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcastControl(ClusterShutdownBody()); + mcast.mcastControl(ClusterShutdownBody(), myId); } void Cluster::memberUpdate(Lock& l) { |