diff options
author | Ted Ross <tross@apache.org> | 2009-01-26 23:17:29 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-01-26 23:17:29 +0000 |
commit | d40d874132bc5011a76bd883fdf9d2507a2f8149 (patch) | |
tree | c83d42ac2c118d3a4f066b10ed655322140704b3 /cpp/src | |
parent | 0f7d1da9baa6906d2481bc05063ae7f8840c6aee (diff) | |
download | qpid-python-d40d874132bc5011a76bd883fdf9d2507a2f8149.tar.gz |
Added qpid-cluster utility plus model changes to support it.
Fixed a segfault during cluster member shutdown.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737935 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 62 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionMap.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/management-schema.xml | 10 |
6 files changed, 74 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7b40328f1c..0e1c049a9c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -42,6 +42,7 @@ #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -61,7 +62,7 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; -namespace qmf = qmf::org::apache::qpid::cluster; +namespace _qmf = ::qmf::org::apache::qpid::cluster; /**@file Threading notes: @@ -102,11 +103,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b lastSize(0), lastBroker(false) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - if (agent != 0){ - qmf::Package packageInit(agent); - mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str()); - agent->addObject (mgmtObject); + mAgent = ManagementAgent::Singleton::getInstance(); + if (mAgent != 0){ + _qmf::Package packageInit(mAgent); + mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); + mAgent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); @@ -132,6 +133,15 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +std::vector<string> Cluster::getIds() const { + Lock l(lock); + return getIds(l); +} + +std::vector<string> Cluster::getIds(Lock&) const { + return map.memberIds(); +} + std::vector<Url> Cluster::getUrls() const { Lock l(lock); return getUrls(l); @@ -150,11 +160,11 @@ void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); try { cpg.leave(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); } + connections.clear(); try { broker.shutdown(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); @@ -173,7 +183,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn return cp; } -void Cluster::deliver( +void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, uint32_t nodeid, @@ -467,16 +477,27 @@ void Cluster ::shutdown(const MemberId& id, Lock& l) { ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { +Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; - case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; - default: return Manageable::STATUS_UNKNOWN_METHOD; + case _qmf::Cluster::METHOD_STOPCLUSTERNODE : + { + _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; + stringstream stream; + stream << myId; + if (iargs.i_brokerId == stream.str()) + stopClusterNode(l); + } + break; + case _qmf::Cluster::METHOD_STOPFULLCLUSTER : + stopFullCluster(l); + break; + default: + return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; -} +} void Cluster::stopClusterNode(Lock& l) { QPID_LOG(notice, *this << " stopped by admin"); @@ -491,6 +512,7 @@ void Cluster::stopFullCluster(Lock& ) { void Cluster::memberUpdate(Lock& l) { QPID_LOG(info, *this << " member update: " << map); std::vector<Url> urls = getUrls(l); + std::vector<string> ids = getIds(l); size_t size = urls.size(); failoverExchange->setUrls(urls); @@ -512,10 +534,16 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_clusterSize(size); string urlstr; for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { - if (iter != urls.begin()) urlstr += "\n"; + if (iter != urls.begin()) urlstr += ";"; urlstr += iter->str(); } + string idstr; + for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { + if (iter != ids.begin()) idstr += ";"; + idstr += (*iter); + } mgmtObject->set_members(urlstr); + mgmtObject->set_memberIDs(idstr); } // Close connections belonging to members that have now been excluded @@ -545,8 +573,12 @@ void Cluster::checkQuorum() { void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; - if (mgmtObject) + if (mgmtObject) { + stringstream stream; + stream << myId; mgmtObject->set_clusterID(clusterId.str()); + mgmtObject->set_memberID(stream.str()); + } QPID_LOG(debug, *this << " cluster-id = " << clusterId); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 8d235c7caf..ecd63a866e 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -80,6 +80,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void erase(ConnectionId); // URLs of current cluster members - called in connection threads. + std::vector<std::string> getIds() const; std::vector<Url> getUrls() const; boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } @@ -111,6 +112,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // a Lock to call the unlocked functions. void leave(Lock&); + std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; // Make an offer if we can - called in deliver thread. @@ -185,6 +187,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; ClusterMap::Set myElders; + qpid::management::ManagementAgent* mAgent; // Thread safe members Multicaster mcast; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index cc9ea29093..b00699c903 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -107,6 +107,17 @@ MemberId ClusterMap::firstNewbie() const { return newbies.empty() ? MemberId() : newbies.begin()->first; } +std::vector<string> ClusterMap::memberIds() const { + std::vector<string> ids; + for (Map::const_iterator iter = members.begin(); + iter != members.end(); iter++) { + std::stringstream stream; + stream << iter->first; + ids.push_back(stream.str()); + } + return ids; +} + std::vector<Url> ClusterMap::memberUrls() const { std::vector<Url> urls(members.size()); std::transform(members.begin(), members.end(), urls.begin(), diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 507fee9a72..1893d0e796 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -75,6 +75,7 @@ class ClusterMap { size_t aliveCount() const { return alive.size(); } size_t memberCount() const { return members.size(); } + std::vector<std::string> memberIds() const; std::vector<Url> memberUrls() const; Set getAlive() const; diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h index f1862e2e75..c355074e75 100644 --- a/cpp/src/qpid/cluster/ConnectionMap.h +++ b/cpp/src/qpid/cluster/ConnectionMap.h @@ -75,6 +75,11 @@ class ConnectionMap } } + void clear() { + ScopedLock l(lock); + map.clear(); + } + size_t size() const { return map.size(); } private: typedef std::map<ConnectionId, ConnectionPtr> Map; diff --git a/cpp/src/qpid/cluster/management-schema.xml b/cpp/src/qpid/cluster/management-schema.xml index da19387cc6..a6292e9113 100644 --- a/cpp/src/qpid/cluster/management-schema.xml +++ b/cpp/src/qpid/cluster/management-schema.xml @@ -40,13 +40,17 @@ If access rights are omitted for a property, they are assumed to be RO. <class name="Cluster"> <property name="brokerRef" type="objId" references="Broker" access="RC" index="y" parentRef="y"/> <property name="clusterName" type="sstr" access="RC" desc="Name of cluster this server is a member of"/> - <property name="clusterID" type="sstr" access="RO" desc="Globally uniquie ID (UUID) for this cluster instance"/> + <property name="clusterID" type="sstr" access="RO" desc="Globally unique ID (UUID) for this cluster instance"/> + <property name="memberID" type="sstr" access="RO" desc="ID of this member of the cluster"/> <property name="publishedURL" type="sstr" access="RC" desc="URL this node advertizes itself as"/> <property name="clusterSize" type="uint16" access="RO" desc="Number of brokers currently in the cluster"/> <property name="status" type="sstr" access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/> - <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/> + <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/> + <property name="memberIDs" type="lstr" access="RO" desc="List of member IDs delimited by ';'"/> - <method name="stopClusterNode"/> + <method name="stopClusterNode"> + <arg name="brokerId" type="sstr" dir="I"/> + </method> <method name="stopFullCluster"/> </class> |