diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7b40328f1c..0e1c049a9c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -42,6 +42,7 @@ #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -61,7 +62,7 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; -namespace qmf = qmf::org::apache::qpid::cluster; +namespace _qmf = ::qmf::org::apache::qpid::cluster; /**@file Threading notes: @@ -102,11 +103,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b lastSize(0), lastBroker(false) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - if (agent != 0){ - qmf::Package packageInit(agent); - mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str()); - agent->addObject (mgmtObject); + mAgent = ManagementAgent::Singleton::getInstance(); + if (mAgent != 0){ + _qmf::Package packageInit(mAgent); + mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); + mAgent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); @@ -132,6 +133,15 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +std::vector<string> Cluster::getIds() const { + Lock l(lock); + return getIds(l); +} + +std::vector<string> Cluster::getIds(Lock&) const { + return map.memberIds(); +} + std::vector<Url> Cluster::getUrls() const { Lock l(lock); return getUrls(l); @@ -150,11 +160,11 @@ void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); try { cpg.leave(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); } + connections.clear(); try { broker.shutdown(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); @@ -173,7 +183,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn return cp; } -void Cluster::deliver( +void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, uint32_t nodeid, @@ -467,16 +477,27 @@ void Cluster ::shutdown(const MemberId& id, Lock& l) { ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { +Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; - case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; - default: return Manageable::STATUS_UNKNOWN_METHOD; + case _qmf::Cluster::METHOD_STOPCLUSTERNODE : + { + _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; + stringstream stream; + stream << myId; + if (iargs.i_brokerId == stream.str()) + stopClusterNode(l); + } + break; + case _qmf::Cluster::METHOD_STOPFULLCLUSTER : + stopFullCluster(l); + break; + default: + return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; -} +} void Cluster::stopClusterNode(Lock& l) { QPID_LOG(notice, *this << " stopped by admin"); @@ -491,6 +512,7 @@ void Cluster::stopFullCluster(Lock& ) { void Cluster::memberUpdate(Lock& l) { QPID_LOG(info, *this << " member update: " << map); std::vector<Url> urls = getUrls(l); + std::vector<string> ids = getIds(l); size_t size = urls.size(); failoverExchange->setUrls(urls); @@ -512,10 +534,16 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_clusterSize(size); string urlstr; for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { - if (iter != urls.begin()) urlstr += "\n"; + if (iter != urls.begin()) urlstr += ";"; urlstr += iter->str(); } + string idstr; + for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { + if (iter != ids.begin()) idstr += ";"; + idstr += (*iter); + } mgmtObject->set_members(urlstr); + mgmtObject->set_memberIDs(idstr); } // Close connections belonging to members that have now been excluded @@ -545,8 +573,12 @@ void Cluster::checkQuorum() { void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; - if (mgmtObject) + if (mgmtObject) { + stringstream stream; + stream << myId; mgmtObject->set_clusterID(clusterId.str()); + mgmtObject->set_memberID(stream.str()); + } QPID_LOG(debug, *this << " cluster-id = " << clusterId); } |