summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp62
1 files changed, 47 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7b40328f1c..0e1c049a9c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -42,6 +42,7 @@
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -61,7 +62,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-namespace qmf = qmf::org::apache::qpid::cluster;
+namespace _qmf = ::qmf::org::apache::qpid::cluster;
/**@file
Threading notes:
@@ -102,11 +103,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
lastSize(0),
lastBroker(false)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0){
- qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
- agent->addObject (mgmtObject);
+ mAgent = ManagementAgent::Singleton::getInstance();
+ if (mAgent != 0){
+ _qmf::Package packageInit(mAgent);
+ mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
+ mAgent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
@@ -132,6 +133,15 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
+std::vector<string> Cluster::getIds() const {
+ Lock l(lock);
+ return getIds(l);
+}
+
+std::vector<string> Cluster::getIds(Lock&) const {
+ return map.memberIds();
+}
+
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -150,11 +160,11 @@ void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
+ connections.clear();
try { broker.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -173,7 +183,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn
return cp;
}
-void Cluster::deliver(
+void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
uint32_t nodeid,
@@ -467,16 +477,27 @@ void Cluster ::shutdown(const MemberId& id, Lock& l) {
ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) {
Lock l(lock);
QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
- case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
- default: return Manageable::STATUS_UNKNOWN_METHOD;
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+ {
+ _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
+ stringstream stream;
+ stream << myId;
+ if (iargs.i_brokerId == stream.str())
+ stopClusterNode(l);
+ }
+ break;
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+ stopFullCluster(l);
+ break;
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
-}
+}
void Cluster::stopClusterNode(Lock& l) {
QPID_LOG(notice, *this << " stopped by admin");
@@ -491,6 +512,7 @@ void Cluster::stopFullCluster(Lock& ) {
void Cluster::memberUpdate(Lock& l) {
QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
+ std::vector<string> ids = getIds(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);
@@ -512,10 +534,16 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_clusterSize(size);
string urlstr;
for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
- if (iter != urls.begin()) urlstr += "\n";
+ if (iter != urls.begin()) urlstr += ";";
urlstr += iter->str();
}
+ string idstr;
+ for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
+ if (iter != ids.begin()) idstr += ";";
+ idstr += (*iter);
+ }
mgmtObject->set_members(urlstr);
+ mgmtObject->set_memberIDs(idstr);
}
// Close connections belonging to members that have now been excluded
@@ -545,8 +573,12 @@ void Cluster::checkQuorum() {
void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
- if (mgmtObject)
+ if (mgmtObject) {
+ stringstream stream;
+ stream << myId;
mgmtObject->set_clusterID(clusterId.str());
+ mgmtObject->set_memberID(stream.str());
+ }
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}