summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Ted Ross <tross@apache.org> 2009-01-26 23:17:29 +0000
committer: Ted Ross <tross@apache.org> 2009-01-26 23:17:29 +0000
commit: a26e85a730375155b69df1913c907aad6144b028 (patch)
tree: 010acb7fc1b717d11597236b378dec37cb9aa845
parent: d8c1199a62dbff0e287b54b924ec4ccb319ac037 (diff)
download: qpid-python-a26e85a730375155b69df1913c907aad6144b028.tar.gz
Added qpid-cluster utility plus model changes to support it.
Fixed a segfault during cluster member shutdown.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@737935 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- qpid/cpp/src/qpid/cluster/Cluster.cpp 62
-rw-r--r-- qpid/cpp/src/qpid/cluster/Cluster.h 3
-rw-r--r-- qpid/cpp/src/qpid/cluster/ClusterMap.cpp 11
-rw-r--r-- qpid/cpp/src/qpid/cluster/ClusterMap.h 1
-rw-r--r-- qpid/cpp/src/qpid/cluster/ConnectionMap.h 5
-rw-r--r-- qpid/cpp/src/qpid/cluster/management-schema.xml 10
-rwxr-xr-x qpid/python/commands/qpid-cluster 180
7 files changed, 254 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 7b40328f1c..0e1c049a9c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -42,6 +42,7 @@
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -61,7 +62,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-namespace qmf = qmf::org::apache::qpid::cluster;
+namespace _qmf = ::qmf::org::apache::qpid::cluster;
/**@file
Threading notes:
@@ -102,11 +103,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
lastSize(0),
lastBroker(false)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0){
- qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
- agent->addObject (mgmtObject);
+ mAgent = ManagementAgent::Singleton::getInstance();
+ if (mAgent != 0){
+ _qmf::Package packageInit(mAgent);
+ mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
+ mAgent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
@@ -132,6 +133,15 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
+/** Ids of the members in the cluster map; takes the cluster lock itself. */
+std::vector<string> Cluster::getIds() const {
+    Lock guard(lock);
+    return getIds(guard);
+}
+
+/** Same as getIds(), for callers that already hold a Lock (passed by reference). */
+std::vector<string> Cluster::getIds(Lock&) const {
+    std::vector<string> memberIds(map.memberIds());
+    return memberIds;
+}
+
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -150,11 +160,11 @@ void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
+ connections.clear();
try { broker.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -173,7 +183,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn
return cp;
}
-void Cluster::deliver(
+void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
uint32_t nodeid,
@@ -467,16 +477,27 @@ void Cluster ::shutdown(const MemberId& id, Lock& l) {
ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) {
Lock l(lock);
QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
- case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
- default: return Manageable::STATUS_UNKNOWN_METHOD;
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+ {
+ _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
+ stringstream stream;
+ stream << myId;
+ if (iargs.i_brokerId == stream.str())
+ stopClusterNode(l);
+ }
+ break;
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+ stopFullCluster(l);
+ break;
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
-}
+}
void Cluster::stopClusterNode(Lock& l) {
QPID_LOG(notice, *this << " stopped by admin");
@@ -491,6 +512,7 @@ void Cluster::stopFullCluster(Lock& ) {
void Cluster::memberUpdate(Lock& l) {
QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
+ std::vector<string> ids = getIds(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);
@@ -512,10 +534,16 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_clusterSize(size);
string urlstr;
for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
- if (iter != urls.begin()) urlstr += "\n";
+ if (iter != urls.begin()) urlstr += ";";
urlstr += iter->str();
}
+ string idstr;
+ for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
+ if (iter != ids.begin()) idstr += ";";
+ idstr += (*iter);
+ }
mgmtObject->set_members(urlstr);
+ mgmtObject->set_memberIDs(idstr);
}
// Close connections belonging to members that have now been excluded
@@ -545,8 +573,12 @@ void Cluster::checkQuorum() {
void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
- if (mgmtObject)
+ if (mgmtObject) {
+ stringstream stream;
+ stream << myId;
mgmtObject->set_clusterID(clusterId.str());
+ mgmtObject->set_memberID(stream.str());
+ }
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 8d235c7caf..ecd63a866e 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -80,6 +80,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void erase(ConnectionId);
// URLs of current cluster members - called in connection threads.
+ std::vector<std::string> getIds() const;
std::vector<Url> getUrls() const;
boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
@@ -111,6 +112,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// a Lock to call the unlocked functions.
void leave(Lock&);
+ std::vector<std::string> getIds(Lock&) const;
std::vector<Url> getUrls(Lock&) const;
// Make an offer if we can - called in deliver thread.
@@ -185,6 +187,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
ClusterMap::Set myElders;
+ qpid::management::ManagementAgent* mAgent;
// Thread safe members
Multicaster mcast;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index cc9ea29093..b00699c903 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -107,6 +107,17 @@ MemberId ClusterMap::firstNewbie() const {
return newbies.empty() ? MemberId() : newbies.begin()->first;
}
+/** Format the key of every entry in `members` as text, in map iteration order. */
+std::vector<string> ClusterMap::memberIds() const {
+    std::vector<string> result;
+    Map::const_iterator it = members.begin();
+    const Map::const_iterator last = members.end();
+    while (it != last) {
+        // The id is rendered through its stream insertion operator.
+        std::stringstream text;
+        text << it->first;
+        result.push_back(text.str());
+        ++it;
+    }
+    return result;
+}
+
std::vector<Url> ClusterMap::memberUrls() const {
std::vector<Url> urls(members.size());
std::transform(members.begin(), members.end(), urls.begin(),
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 507fee9a72..1893d0e796 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -75,6 +75,7 @@ class ClusterMap {
size_t aliveCount() const { return alive.size(); }
size_t memberCount() const { return members.size(); }
+ std::vector<std::string> memberIds() const;
std::vector<Url> memberUrls() const;
Set getAlive() const;
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.h b/qpid/cpp/src/qpid/cluster/ConnectionMap.h
index f1862e2e75..c355074e75 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionMap.h
+++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.h
@@ -75,6 +75,11 @@ class ConnectionMap
}
}
+    /** Remove every entry from the connection map while holding the map lock. */
+    void clear() {
+        ScopedLock guard(lock);
+        map.clear();
+    }
+
size_t size() const { return map.size(); }
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
diff --git a/qpid/cpp/src/qpid/cluster/management-schema.xml b/qpid/cpp/src/qpid/cluster/management-schema.xml
index da19387cc6..a6292e9113 100644
--- a/qpid/cpp/src/qpid/cluster/management-schema.xml
+++ b/qpid/cpp/src/qpid/cluster/management-schema.xml
@@ -40,13 +40,17 @@ If access rights are omitted for a property, they are assumed to be RO.
<class name="Cluster">
<property name="brokerRef" type="objId" references="Broker" access="RC" index="y" parentRef="y"/>
<property name="clusterName" type="sstr" access="RC" desc="Name of cluster this server is a member of"/>
- <property name="clusterID" type="sstr" access="RO" desc="Globally uniquie ID (UUID) for this cluster instance"/>
+ <property name="clusterID" type="sstr" access="RO" desc="Globally unique ID (UUID) for this cluster instance"/>
+ <property name="memberID" type="sstr" access="RO" desc="ID of this member of the cluster"/>
<property name="publishedURL" type="sstr" access="RC" desc="URL this node advertizes itself as"/>
<property name="clusterSize" type="uint16" access="RO" desc="Number of brokers currently in the cluster"/>
<property name="status" type="sstr" access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/>
- <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/>
+ <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/>
+ <property name="memberIDs" type="lstr" access="RO" desc="List of member IDs delimited by ';'"/>
- <method name="stopClusterNode"/>
+ <method name="stopClusterNode">
+ <arg name="brokerId" type="sstr" dir="I"/>
+ </method>
<method name="stopFullCluster"/>
</class>
diff --git a/qpid/python/commands/qpid-cluster b/qpid/python/commands/qpid-cluster
new file mode 100755
index 0000000000..f2028d944b
--- /dev/null
+++ b/qpid/python/commands/qpid-cluster
@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+from qmf.console import Session
+
+_host = "localhost"
+_stopId = None
+_stopAll = False
+_force = False
+
+def Usage ():
+ print "Usage: qpid-cluster [OPTIONS] [broker-addr]"
+ print
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ print "Options:"
+ print " -s [--stop] ID Stop one member of the cluster by its ID"
+ print " -k [--all-stop] Shut down the whole cluster"
+ print " -f [--force] Suppress the 'are-you-sure?' prompt"
+ print
+ sys.exit (1)
+
+class BrokerManager:
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def overview(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ print "Clustering is not installed on the broker."
+ sys.exit(0)
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ sys.exit(0)
+
+ cluster = clusters[0]
+ myUrl = cluster.publishedURL
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+
+ print " Cluster Name: %s" % cluster.clusterName
+ print "Cluster Status: %s" % cluster.status
+ print " Cluster Size: %d" % cluster.clusterSize
+ print " Members: ID=%s URL=%s" % (idList[0], memberList[0])
+ for idx in range(1,len(idList)):
+ print " : ID=%s URL=%s" % (idList[idx], memberList[idx])
+
+ def stopMember(self, id):
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ sys.exit(0)
+
+ cluster = clusters[0]
+ idList = cluster.memberIDs.split(";")
+ if id not in idList:
+ print "No member with matching ID found"
+ sys.exit(1)
+
+ if not _force:
+ prompt = "Warning: "
+ if len(idList) == 1:
+ prompt += "This command will shut down the last running cluster member."
+ else:
+ prompt += "This command will shut down a cluster member."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ print "Operation canceled"
+ sys.exit(1)
+
+ cluster.stopClusterNode(id)
+
+ def stopAll(self):
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ sys.exit(0)
+
+ if not _force:
+ prompt = "Warning: This command will shut down the entire cluster."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ print "Operation canceled"
+ sys.exit(1)
+
+ cluster = clusters[0]
+ cluster.stopFullCluster()
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("stop=", "all-stop", "force")
+ (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "s:kf", longOpts)
+except:
+ Usage ()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "-s" or opt[0] == "--stop":
+ _stopId = opt[1]
+ if opt[0] == "-k" or opt[0] == "--all-stop":
+ _stopAll = True
+ if opt[0] == "-f" or opt[0] == "--force":
+ _force = True
+
+nargs = len(cargs)
+bm = BrokerManager()
+
+if nargs == 1:
+ _host = cargs[0]
+
+try:
+ bm.SetBroker(_host)
+ if _stopId:
+ bm.stopMember(_stopId)
+ elif _stopAll:
+ bm.stopAll()
+ else:
+ bm.overview()
+except KeyboardInterrupt:
+ print
+except Exception,e:
+ if e.__repr__().find("connection aborted") > 0:
+ # we expect this when asking the connected broker to shut down
+ sys.exit(0)
+ print "Failed:", e.args
+ sys.exit(1)
+
+bm.Disconnect()