summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-01-26 23:17:29 +0000
committerTed Ross <tross@apache.org>2009-01-26 23:17:29 +0000
commitd40d874132bc5011a76bd883fdf9d2507a2f8149 (patch)
treec83d42ac2c118d3a4f066b10ed655322140704b3 /cpp/src
parent0f7d1da9baa6906d2481bc05063ae7f8840c6aee (diff)
downloadqpid-python-d40d874132bc5011a76bd883fdf9d2507a2f8149.tar.gz
Added qpid-cluster utility plus model changes to support it.
Fixed a segfault during cluster member shutdown. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737935 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp62
-rw-r--r--cpp/src/qpid/cluster/Cluster.h3
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp11
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h1
-rw-r--r--cpp/src/qpid/cluster/ConnectionMap.h5
-rw-r--r--cpp/src/qpid/cluster/management-schema.xml10
6 files changed, 74 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7b40328f1c..0e1c049a9c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -42,6 +42,7 @@
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -61,7 +62,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-namespace qmf = qmf::org::apache::qpid::cluster;
+namespace _qmf = ::qmf::org::apache::qpid::cluster;
/**@file
Threading notes:
@@ -102,11 +103,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
lastSize(0),
lastBroker(false)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0){
- qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
- agent->addObject (mgmtObject);
+ mAgent = ManagementAgent::Singleton::getInstance();
+ if (mAgent != 0){
+ _qmf::Package packageInit(mAgent);
+ mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
+ mAgent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
@@ -132,6 +133,15 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
+std::vector<string> Cluster::getIds() const {
+ Lock l(lock);
+ return getIds(l);
+}
+
+std::vector<string> Cluster::getIds(Lock&) const {
+ return map.memberIds();
+}
+
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -150,11 +160,11 @@ void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
+ connections.clear();
try { broker.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -173,7 +183,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn
return cp;
}
-void Cluster::deliver(
+void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
uint32_t nodeid,
@@ -467,16 +477,27 @@ void Cluster ::shutdown(const MemberId& id, Lock& l) {
ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) {
Lock l(lock);
QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
- case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
- default: return Manageable::STATUS_UNKNOWN_METHOD;
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+ {
+ _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
+ stringstream stream;
+ stream << myId;
+ if (iargs.i_brokerId == stream.str())
+ stopClusterNode(l);
+ }
+ break;
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+ stopFullCluster(l);
+ break;
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
-}
+}
void Cluster::stopClusterNode(Lock& l) {
QPID_LOG(notice, *this << " stopped by admin");
@@ -491,6 +512,7 @@ void Cluster::stopFullCluster(Lock& ) {
void Cluster::memberUpdate(Lock& l) {
QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
+ std::vector<string> ids = getIds(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);
@@ -512,10 +534,16 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_clusterSize(size);
string urlstr;
for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
- if (iter != urls.begin()) urlstr += "\n";
+ if (iter != urls.begin()) urlstr += ";";
urlstr += iter->str();
}
+ string idstr;
+ for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
+ if (iter != ids.begin()) idstr += ";";
+ idstr += (*iter);
+ }
mgmtObject->set_members(urlstr);
+ mgmtObject->set_memberIDs(idstr);
}
// Close connections belonging to members that have now been excluded
@@ -545,8 +573,12 @@ void Cluster::checkQuorum() {
void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
- if (mgmtObject)
+ if (mgmtObject) {
+ stringstream stream;
+ stream << myId;
mgmtObject->set_clusterID(clusterId.str());
+ mgmtObject->set_memberID(stream.str());
+ }
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 8d235c7caf..ecd63a866e 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -80,6 +80,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void erase(ConnectionId);
// URLs of current cluster members - called in connection threads.
+ std::vector<std::string> getIds() const;
std::vector<Url> getUrls() const;
boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
@@ -111,6 +112,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// a Lock to call the unlocked functions.
void leave(Lock&);
+ std::vector<std::string> getIds(Lock&) const;
std::vector<Url> getUrls(Lock&) const;
// Make an offer if we can - called in deliver thread.
@@ -185,6 +187,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
ClusterMap::Set myElders;
+ qpid::management::ManagementAgent* mAgent;
// Thread safe members
Multicaster mcast;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index cc9ea29093..b00699c903 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -107,6 +107,17 @@ MemberId ClusterMap::firstNewbie() const {
return newbies.empty() ? MemberId() : newbies.begin()->first;
}
+std::vector<string> ClusterMap::memberIds() const {
+ std::vector<string> ids;
+ for (Map::const_iterator iter = members.begin();
+ iter != members.end(); iter++) {
+ std::stringstream stream;
+ stream << iter->first;
+ ids.push_back(stream.str());
+ }
+ return ids;
+}
+
std::vector<Url> ClusterMap::memberUrls() const {
std::vector<Url> urls(members.size());
std::transform(members.begin(), members.end(), urls.begin(),
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 507fee9a72..1893d0e796 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -75,6 +75,7 @@ class ClusterMap {
size_t aliveCount() const { return alive.size(); }
size_t memberCount() const { return members.size(); }
+ std::vector<std::string> memberIds() const;
std::vector<Url> memberUrls() const;
Set getAlive() const;
diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h
index f1862e2e75..c355074e75 100644
--- a/cpp/src/qpid/cluster/ConnectionMap.h
+++ b/cpp/src/qpid/cluster/ConnectionMap.h
@@ -75,6 +75,11 @@ class ConnectionMap
}
}
+ void clear() {
+ ScopedLock l(lock);
+ map.clear();
+ }
+
size_t size() const { return map.size(); }
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
diff --git a/cpp/src/qpid/cluster/management-schema.xml b/cpp/src/qpid/cluster/management-schema.xml
index da19387cc6..a6292e9113 100644
--- a/cpp/src/qpid/cluster/management-schema.xml
+++ b/cpp/src/qpid/cluster/management-schema.xml
@@ -40,13 +40,17 @@ If access rights are omitted for a property, they are assumed to be RO.
<class name="Cluster">
<property name="brokerRef" type="objId" references="Broker" access="RC" index="y" parentRef="y"/>
<property name="clusterName" type="sstr" access="RC" desc="Name of cluster this server is a member of"/>
- <property name="clusterID" type="sstr" access="RO" desc="Globally uniquie ID (UUID) for this cluster instance"/>
+ <property name="clusterID" type="sstr" access="RO" desc="Globally unique ID (UUID) for this cluster instance"/>
+ <property name="memberID" type="sstr" access="RO" desc="ID of this member of the cluster"/>
<property name="publishedURL" type="sstr" access="RC" desc="URL this node advertizes itself as"/>
<property name="clusterSize" type="uint16" access="RO" desc="Number of brokers currently in the cluster"/>
<property name="status" type="sstr" access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/>
- <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/>
+ <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/>
+ <property name="memberIDs" type="lstr" access="RO" desc="List of member IDs delimited by ';'"/>
- <method name="stopClusterNode"/>
+ <method name="stopClusterNode">
+ <arg name="brokerId" type="sstr" dir="I"/>
+ </method>
<method name="stopFullCluster"/>
</class>