summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py/qpid-cluster
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/py/qpid-cluster')
-rwxr-xr-xqpid/tools/src/py/qpid-cluster310
1 files changed, 310 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-cluster b/qpid/tools/src/py/qpid-cluster
new file mode 100755
index 0000000000..312d59f670
--- /dev/null
+++ b/qpid/tools/src/py/qpid-cluster
@@ -0,0 +1,310 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session
+
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._stopId = None
+ self._stopAll = False
+ self._force = False
+ self._numeric = False
+ self._showConn = False
+ self._delConn = None
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFFL
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class BrokerManager:
+ def __init__(self, config):
+ self.config = config
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == '0':
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokers):
+ b = self.brokers.pop()
+ self.qmf.delBroker(b)
+ except:
+ pass
+
+ def _getClusters(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ raise Exception("Clustering is not installed on the broker.")
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ raise Exception("Clustering is installed but not enabled on the broker.")
+
+ return clusters
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(self.config._host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def overview(self):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+
+ print " Cluster Name: %s" % cluster.clusterName
+ print "Cluster Status: %s" % cluster.status
+ print " Cluster Size: %d" % cluster.clusterSize
+ print " Members: ID=%s URL=%s" % (idList[0], memberList[0])
+ for idx in range(1,len(idList)):
+ print " : ID=%s URL=%s" % (idList[idx], memberList[idx])
+
+ def stopMember(self, id):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ idList = cluster.memberIDs.split(";")
+ if id not in idList:
+ raise Exception("No member with matching ID found")
+
+ if not self.config._force:
+ prompt = "Warning: "
+ if len(idList) == 1:
+ prompt += "This command will shut down the last running cluster member."
+ else:
+ prompt += "This command will shut down a cluster member."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ raise Exception("Operation canceled")
+
+ cluster.stopClusterNode(id)
+
+ def stopAll(self):
+ clusters = self._getClusters()
+ if not self.config._force:
+ prompt = "Warning: This command will shut down the entire cluster."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ raise Exception("Operation canceled")
+
+ cluster = clusters[0]
+ cluster.stopFullCluster()
+
+ def showConnections(self):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+ displayList = []
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+
+ idx = 0
+ for host in hostList:
+ if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn:
+ self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout))
+ displayList.append(idList[idx])
+ idx += 1
+
+ idx = 0
+ found = False
+ for broker in self.brokers:
+ if not self.config._delConn:
+ print "Clients on Member: ID=%s:" % displayList[idx]
+ connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
+ for conn in connList:
+ if not conn.shadow:
+ if self.config._numeric or self.config._delConn:
+ a = conn.address
+ else:
+ tokens = conn.address.split(":")
+ try:
+ hostList = socket.gethostbyaddr(tokens[0])
+ host = hostList[0]
+ except:
+ host = tokens[0]
+ a = host + ":" + tokens[1]
+ if self.config._delConn:
+ tokens = self.config._delConn.split(":")
+ ip = socket.gethostbyname(tokens[0])
+ toDelete = ip + ":" + tokens[1]
+ if a == toDelete:
+ print "Closing connection from client: %s" % a
+ conn.close()
+ found = True
+ else:
+ print " %s" % a
+ idx += 1
+ if not self.config._delConn:
+ print
+ if self.config._delConn and not found:
+ print "Client connection '%s' not found" % self.config._delConn
+
+ while len(self.brokers):
+ broker = self.brokers.pop()
+ self.qmf.delBroker(broker)
+
+def main(argv=None):
+
+ try:
+ config = Config()
+
+ parser = OptionParser(usage="usage: %prog [options] BROKER",
+ description="Example: $ qpid-cluster -C broker-host:10000")
+
+ parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option("-C", "--all-connections", action="store_true", default=False, help="View client connections to all cluster members")
+ parser.add_option("-c", "--connections", metavar="ID", help="View client connections to specified member")
+ parser.add_option("-d", "--del-connection", metavar="HOST:PORT", help="Disconnect a client connection")
+ parser.add_option("-s", "--stop", metavar="ID", help="Stop one member of the cluster by its ID")
+ parser.add_option("-k", "--all-stop", action="store_true", default=False, help="Shut down the whole cluster")
+ parser.add_option("-f", "--force", action="store_true", default=False, help="Suppress the 'are you sure' prompt")
+ parser.add_option("-n", "--numeric", action="store_true", default=False, help="Don't resolve names")
+
+ opts, args = parser.parse_args(args=argv)
+
+ if args:
+ config._host = args[0]
+
+ if opts.timeout != 0:
+ config._connTimeout = opts.timeout
+ else:
+ config._connTimeout = None
+
+ if opts.all_connections:
+ config._showConn = "all"
+
+ if opts.connections:
+ config._connections = opts.connections
+ if len(config._connections.split(":")) != 2:
+ parser.error("Member ID must be of form: <host or ip>:<number>")
+
+ if opts.del_connection:
+ config._delConn = opts.del_connection
+ if len(config._delConn.split(":")) != 2:
+ parser.error("Member ID must be of form: <host or ip>:<number>")
+
+ if opts.stop:
+ config._stopID = opts.stop
+ if len(config._stopId.split(":")) != 2:
+ parser.error("Member ID must be of form: <host or ip>:<number>")
+
+ config._stopAll = opts.all_stop
+ config._force = opts.force
+ config._numeric = opts.numeric
+
+ bm = BrokerManager(config)
+
+ try:
+ bm.SetBroker(config._host)
+ if config._stopId:
+ bm.stopMember(config._stopId)
+ elif config._stopAll:
+ bm.stopAll()
+ elif config._showConn or config._delConn:
+ bm.showConnections()
+ else:
+ bm.overview()
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ bm.Disconnect() # try to deallocate brokers - ignores errors
+ if str(e).find("connection aborted") > 0:
+ # we expect this when asking the connected broker to shut down
+ return 0
+ raise Exception("Failed: %s - %s" % (e.__class__.__name__, e))
+
+ bm.Disconnect()
+ except Exception, e:
+ print str(e)
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())