diff options
author | Ted Ross <tross@apache.org> | 2009-01-26 23:17:29 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-01-26 23:17:29 +0000 |
commit | a26e85a730375155b69df1913c907aad6144b028 (patch) | |
tree | 010acb7fc1b717d11597236b378dec37cb9aa845 | |
parent | d8c1199a62dbff0e287b54b924ec4ccb319ac037 (diff) | |
download | qpid-python-a26e85a730375155b69df1913c907aad6144b028.tar.gz |
Added qpid-cluster utility plus model changes to support it.
Fixed a segfault during cluster member shutdown.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@737935 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 62 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionMap.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/management-schema.xml | 10 | ||||
-rwxr-xr-x | qpid/python/commands/qpid-cluster | 180 |
7 files changed, 254 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 7b40328f1c..0e1c049a9c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -42,6 +42,7 @@ #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -61,7 +62,7 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; -namespace qmf = qmf::org::apache::qpid::cluster; +namespace _qmf = ::qmf::org::apache::qpid::cluster; /**@file Threading notes: @@ -102,11 +103,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b lastSize(0), lastBroker(false) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - if (agent != 0){ - qmf::Package packageInit(agent); - mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str()); - agent->addObject (mgmtObject); + mAgent = ManagementAgent::Singleton::getInstance(); + if (mAgent != 0){ + _qmf::Package packageInit(mAgent); + mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); + mAgent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); @@ -132,6 +133,15 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +std::vector<string> Cluster::getIds() const { + Lock l(lock); + return getIds(l); +} + +std::vector<string> Cluster::getIds(Lock&) const { + return map.memberIds(); +} + std::vector<Url> Cluster::getUrls() const { Lock l(lock); return getUrls(l); @@ -150,11 +160,11 @@ void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); try { cpg.leave(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); } + connections.clear(); try { broker.shutdown(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); @@ -173,7 +183,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn return cp; } -void Cluster::deliver( +void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, uint32_t nodeid, @@ -467,16 +477,27 @@ void Cluster ::shutdown(const MemberId& id, Lock& l) { ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { +Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; - case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; - default: return Manageable::STATUS_UNKNOWN_METHOD; + case _qmf::Cluster::METHOD_STOPCLUSTERNODE : + { + _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; + stringstream stream; + stream << myId; + if (iargs.i_brokerId == stream.str()) + stopClusterNode(l); + } + break; + case _qmf::Cluster::METHOD_STOPFULLCLUSTER : + stopFullCluster(l); + break; + default: + return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; -} +} void Cluster::stopClusterNode(Lock& l) { QPID_LOG(notice, *this << " stopped by admin"); @@ -491,6 +512,7 @@ void Cluster::stopFullCluster(Lock& ) { void Cluster::memberUpdate(Lock& l) { QPID_LOG(info, *this << " member update: " << map); std::vector<Url> urls = getUrls(l); + std::vector<string> ids = getIds(l); size_t size = urls.size(); failoverExchange->setUrls(urls); @@ -512,10 +534,16 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_clusterSize(size); string urlstr; for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { - if (iter != urls.begin()) urlstr += "\n"; + if (iter != urls.begin()) urlstr += ";"; urlstr += iter->str(); } + string idstr; + for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { + if (iter != ids.begin()) idstr += ";"; + idstr += (*iter); + } mgmtObject->set_members(urlstr); + mgmtObject->set_memberIDs(idstr); } // Close connections belonging to members that have now been excluded @@ -545,8 +573,12 @@ void Cluster::checkQuorum() { void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; - if (mgmtObject) + if (mgmtObject) { + stringstream stream; + stream << myId; mgmtObject->set_clusterID(clusterId.str()); + mgmtObject->set_memberID(stream.str()); + } QPID_LOG(debug, *this << " cluster-id = " << clusterId); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 8d235c7caf..ecd63a866e 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -80,6 +80,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void erase(ConnectionId); // URLs of current cluster members - called in connection threads. + std::vector<std::string> getIds() const; std::vector<Url> getUrls() const; boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } @@ -111,6 +112,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // a Lock to call the unlocked functions. void leave(Lock&); + std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; // Make an offer if we can - called in deliver thread. @@ -185,6 +187,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; ClusterMap::Set myElders; + qpid::management::ManagementAgent* mAgent; // Thread safe members Multicaster mcast; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index cc9ea29093..b00699c903 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -107,6 +107,17 @@ MemberId ClusterMap::firstNewbie() const { return newbies.empty() ? MemberId() : newbies.begin()->first; } +std::vector<string> ClusterMap::memberIds() const { + std::vector<string> ids; + for (Map::const_iterator iter = members.begin(); + iter != members.end(); iter++) { + std::stringstream stream; + stream << iter->first; + ids.push_back(stream.str()); + } + return ids; +} + std::vector<Url> ClusterMap::memberUrls() const { std::vector<Url> urls(members.size()); std::transform(members.begin(), members.end(), urls.begin(), diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 507fee9a72..1893d0e796 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -75,6 +75,7 @@ class ClusterMap { size_t aliveCount() const { return alive.size(); } size_t memberCount() const { return members.size(); } + std::vector<std::string> memberIds() const; std::vector<Url> memberUrls() const; Set getAlive() const; diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.h b/qpid/cpp/src/qpid/cluster/ConnectionMap.h index f1862e2e75..c355074e75 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionMap.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.h @@ -75,6 +75,11 @@ class ConnectionMap } } + void clear() { + ScopedLock l(lock); + map.clear(); + } + size_t size() const { return map.size(); } private: typedef std::map<ConnectionId, ConnectionPtr> Map; diff --git a/qpid/cpp/src/qpid/cluster/management-schema.xml b/qpid/cpp/src/qpid/cluster/management-schema.xml index da19387cc6..a6292e9113 100644 --- a/qpid/cpp/src/qpid/cluster/management-schema.xml +++ b/qpid/cpp/src/qpid/cluster/management-schema.xml @@ -40,13 +40,17 @@ If access rights are omitted for a property, they are assumed to be RO. <class name="Cluster"> <property name="brokerRef" type="objId" references="Broker" access="RC" index="y" parentRef="y"/> <property name="clusterName" type="sstr" access="RC" desc="Name of cluster this server is a member of"/> - <property name="clusterID" type="sstr" access="RO" desc="Globally uniquie ID (UUID) for this cluster instance"/> + <property name="clusterID" type="sstr" access="RO" desc="Globally unique ID (UUID) for this cluster instance"/> + <property name="memberID" type="sstr" access="RO" desc="ID of this member of the cluster"/> <property name="publishedURL" type="sstr" access="RC" desc="URL this node advertizes itself as"/> <property name="clusterSize" type="uint16" access="RO" desc="Number of brokers currently in the cluster"/> <property name="status" type="sstr" access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/> - <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/> + <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/> + <property name="memberIDs" type="lstr" access="RO" desc="List of member IDs delimited by ';'"/> - <method name="stopClusterNode"/> + <method name="stopClusterNode"> + <arg name="brokerId" type="sstr" dir="I"/> + </method> <method name="stopFullCluster"/> </class> diff --git a/qpid/python/commands/qpid-cluster b/qpid/python/commands/qpid-cluster new file mode 100755 index 0000000000..f2028d944b --- /dev/null +++ b/qpid/python/commands/qpid-cluster @@ -0,0 +1,180 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +from qmf.console import Session + +_host = "localhost" +_stopId = None +_stopAll = False +_force = False + +def Usage (): + print "Usage: qpid-cluster [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "Options:" + print " -s [--stop] ID Stop one member of the cluster by its ID" + print " -k [--all-stop] Shut down the whole cluster" + print " -f [--force] Suppress the 'are-you-sure?' prompt" + print + sys.exit (1) + +class BrokerManager: + def __init__(self): + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def overview(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + print "Clustering is not installed on the broker." + sys.exit(0) + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + sys.exit(0) + + cluster = clusters[0] + myUrl = cluster.publishedURL + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + + print " Cluster Name: %s" % cluster.clusterName + print "Cluster Status: %s" % cluster.status + print " Cluster Size: %d" % cluster.clusterSize + print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) + for idx in range(1,len(idList)): + print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) + + def stopMember(self, id): + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + sys.exit(0) + + cluster = clusters[0] + idList = cluster.memberIDs.split(";") + if id not in idList: + print "No member with matching ID found" + sys.exit(1) + + if not _force: + prompt = "Warning: " + if len(idList) == 1: + prompt += "This command will shut down the last running cluster member." + else: + prompt += "This command will shut down a cluster member." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + print "Operation canceled" + sys.exit(1) + + cluster.stopClusterNode(id) + + def stopAll(self): + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + sys.exit(0) + + if not _force: + prompt = "Warning: This command will shut down the entire cluster." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + print "Operation canceled" + sys.exit(1) + + cluster = clusters[0] + cluster.stopFullCluster() + +## +## Main Program +## + +try: + longOpts = ("stop=", "all-stop", "force") + (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "s:kf", longOpts) +except: + Usage () + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "-s" or opt[0] == "--stop": + _stopId = opt[1] + if opt[0] == "-k" or opt[0] == "--all-stop": + _stopAll = True + if opt[0] == "-f" or opt[0] == "--force": + _force = True + +nargs = len(cargs) +bm = BrokerManager() + +if nargs == 1: + _host = cargs[0] + +try: + bm.SetBroker(_host) + if _stopId: + bm.stopMember(_stopId) + elif _stopAll: + bm.stopAll() + else: + bm.overview() +except KeyboardInterrupt: + print +except Exception,e: + if e.__repr__().find("connection aborted") > 0: + # we expect this when asking the connected broker to shut down + sys.exit(0) + print "Failed:", e.args + sys.exit(1) + +bm.Disconnect() |