diff options
author | Alan Conway <aconway@apache.org> | 2008-12-02 20:41:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-12-02 20:41:49 +0000 |
commit | 7cdb9a9ab688988e596d9fce116a0998decd0972 (patch) | |
tree | aef9d6d0bc837b2eb0116e863c8bc89ed8f45021 /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 0fa4afae5e690b1cf147ebbe60641b448fcb5c31 (diff) | |
download | qpid-python-7cdb9a9ab688988e596d9fce116a0998decd0972.tar.gz |
Cluster: handle CPG flow-control conditions.
PollableQueue: allow dispatch functions to refuse dispatch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@722614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d536ac59f2..0ac0da2be4 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -99,7 +99,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b boost::bind(&Cluster::disconnect, this, _1) // disconnect ), deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), - mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller), + mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller), mcastId(0), mgmtObject(0), state(INIT), @@ -109,7 +109,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ qmf::Package packageInit(agent); - mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str()); + mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str()); agent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); } @@ -118,7 +118,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b cpgDispatchHandle.startWatch(poller); deliverQueue.start(); mcastQueue.start(); - QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl); + QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); if (useQuorum) quorum.init(); cpg.join(name); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety. @@ -184,6 +184,17 @@ void Cluster::mcast(const Event& e, Lock&) { mcastQueue.push(e); } +bool Cluster::sendMcast(const Event& e) { + try { + return e.mcast(cpg); + } + catch (const std::exception& e) { + QPID_LOG(critical, "Multicast failure: " << e.what()); + leave(); + return false; + } +} + std::vector<Url> Cluster::getUrls() const { Lock l(lock); return getUrls(l); @@ -201,10 +212,10 @@ void Cluster::leave() { void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; - QPID_LOG(notice, *this << " leaving cluster " << name.str()); + QPID_LOG(notice, *this << " leaving cluster " << name); if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); if (!deliverQueue.isStopped()) deliverQueue.stop(); - try { cpg.leave(name); } + try { cpg.leave(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); } @@ -224,7 +235,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn } else { // New shadow connection std::ostringstream mgmtId; - mgmtId << name.str() << ":" << connectionId; + mgmtId << name << ":" << connectionId; ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); i = connections.insert(value).first; @@ -260,7 +271,7 @@ void Cluster::deliver(const Event& e, Lock&) { } // Entry point: called when deliverQueue has events to process. -void Cluster::delivered(const Event& e) { +bool Cluster::delivered(const Event& e) { try { Lock l(lock); delivered(e,l); @@ -268,7 +279,7 @@ void Cluster::delivered(const Event& e) { QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); leave(); } - + return true; } void Cluster::delivered(const Event& e, Lock& l) { @@ -334,6 +345,7 @@ ostream& operator<<(ostream& o, const AddrList& a) { void Cluster::dispatch(sys::DispatchHandle& h) { try { cpg.dispatchAll(); + mcastQueue.start(); // In case it was stopped by flow control. h.rewatch(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what()); @@ -359,7 +371,7 @@ void Cluster::configChange ( cpg_address */*joined*/, int /*nJoined*/) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent) + QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) @@ -387,7 +399,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& } else { // Joining established group. state = NEWBIE; - QPID_LOG(info, *this << " joining established cluster"); + QPID_LOG(info, *this << " joining cluster: " << map); mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l); } } @@ -542,12 +554,12 @@ void Cluster::stopClusterNode(Lock& l) { } void Cluster::stopFullCluster(Lock& l) { - QPID_LOG(notice, *this << " shutting down cluster " << name.str()); + QPID_LOG(notice, *this << " shutting down cluster " << name); mcastControl(ClusterShutdownBody(), l); } void Cluster::memberUpdate(Lock& l) { - QPID_LOG(info, *this << map.memberCount() << " members: " << map); + QPID_LOG(info, *this << " member update: " << map); std::vector<Url> urls = getUrls(l); size_t size = urls.size(); failoverExchange->setUrls(urls); |