diff options
Diffstat (limited to 'trunk/qpid/python/commands/qpid-cluster')
-rwxr-xr-x | trunk/qpid/python/commands/qpid-cluster | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/trunk/qpid/python/commands/qpid-cluster b/trunk/qpid/python/commands/qpid-cluster new file mode 100755 index 0000000000..7afb7671b8 --- /dev/null +++ b/trunk/qpid/python/commands/qpid-cluster @@ -0,0 +1,328 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session + +class Config: + def __init__(self): + self._host = "localhost" + self._connTimeout = 10 + self._stopId = None + self._stopAll = False + self._force = False + self._numeric = False + self._showConn = False + self._delConn = None + +def usage (): + print "Usage: qpid-cluster [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "Options:" + print " --timeout seconds (10) Maximum time to wait for broker connection" + print " -C [--all-connections] View client connections to all cluster members" + print " -c [--connections] ID View client connections to specified member" + print " -d [--del-connection] HOST:PORT" + print " Disconnect a client connection" + print " -s [--stop] ID Stop one member of the cluster by its ID" + print " -k [--all-stop] Shut down the whole cluster" + print " -f [--force] Suppress the 'are-you-sure?' prompt" + print " -n [--numeric] Don't resolve names" + print + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFFL + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class BrokerManager: + def __init__(self, config): + self.config = config + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getClusters(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + raise Exception("Clustering is not installed on the broker.") + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + raise Exception("Clustering is installed but not enabled on the broker.") + + return clusters + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(self.config._host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def overview(self): + clusters = self._getClusters() + cluster = clusters[0] + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + + print " Cluster Name: %s" % cluster.clusterName + print "Cluster Status: %s" % cluster.status + print " Cluster Size: %d" % cluster.clusterSize + print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) + for idx in range(1,len(idList)): + print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) + + def stopMember(self, id): + clusters = self._getClusters() + cluster = clusters[0] + idList = cluster.memberIDs.split(";") + if id not in idList: + raise Exception("No member with matching ID found") + + if not self.config._force: + prompt = "Warning: " + if len(idList) == 1: + prompt += "This command will shut down the last running cluster member." + else: + prompt += "This command will shut down a cluster member." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + raise Exception("Operation canceled") + + cluster.stopClusterNode(id) + + def stopAll(self): + clusters = self._getClusters() + if not self.config._force: + prompt = "Warning: This command will shut down the entire cluster." + prompt += " Are you sure? [N]: " + + confirm = raw_input(prompt) + if len(confirm) == 0 or confirm[0].upper() != 'Y': + raise Exception("Operation canceled") + + cluster = clusters[0] + cluster.stopFullCluster() + + def showConnections(self): + clusters = self._getClusters() + cluster = clusters[0] + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + displayList = [] + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + self.brokers = [] + pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + + idx = 0 + for host in hostList: + if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn: + self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout)) + displayList.append(idList[idx]) + idx += 1 + + idx = 0 + found = False + for broker in self.brokers: + if not self.config._delConn: + print "Clients on Member: ID=%s:" % displayList[idx] + connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) + for conn in connList: + if pattern.match(conn.address): + if self.config._numeric or self.config._delConn: + a = conn.address + else: + tokens = conn.address.split(":") + try: + hostList = socket.gethostbyaddr(tokens[0]) + host = hostList[0] + except: + host = tokens[0] + a = host + ":" + tokens[1] + if self.config._delConn: + tokens = self.config._delConn.split(":") + ip = socket.gethostbyname(tokens[0]) + toDelete = ip + ":" + tokens[1] + if a == toDelete: + print "Closing connection from client: %s" % a + conn.close() + found = True + else: + print " %s" % a + idx += 1 + if not self.config._delConn: + print + if self.config._delConn and not found: + print "Client connection '%s' not found" % self.config._delConn + + for broker in self.brokers: + self.qmf.delBroker(broker) + + +def main(argv=None): + if argv is None: argv = sys.argv + try: + config = Config() + try: + longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=") + (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts) + except: + usage() + return 1 + + try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] + except: + cargs = encArgs + + count = 0 + for opt in optlist: + if opt[0] == "--timeout": + config._connTimeout = int(opt[1]) + if config._connTimeout == 0: + config._connTimeout = None + if opt[0] == "-s" or opt[0] == "--stop": + config._stopId = opt[1] + if len(config._stopId.split(":")) != 2: + raise Exception("Member ID must be of form: <host or ip>:<number>") + count += 1 + if opt[0] == "-k" or opt[0] == "--all-stop": + config._stopAll = True + count += 1 + if opt[0] == "-f" or opt[0] == "--force": + config._force = True + if opt[0] == "-n" or opt[0] == "--numeric": + config._numeric = True + if opt[0] == "-C" or opt[0] == "--all-connections": + config._showConn = "all" + count += 1 + if opt[0] == "-c" or opt[0] == "--connections": + config._showConn = opt[1] + if len(config._showConn.split(":")) != 2: + raise Exception("Member ID must be of form: <host or ip>:<number>") + count += 1 + if opt[0] == "-d" or opt[0] == "--del-connection": + config._delConn = opt[1] + if len(config._delConn.split(":")) != 2: + raise Exception("Connection must be of form: <host or ip>:<port>") + count += 1 + + if count > 1: + print "Only one command option may be supplied" + print + usage() + return 1 + + nargs = len(cargs) + bm = BrokerManager(config) + + if nargs == 1: + config._host = cargs[0] + + try: + bm.SetBroker(config._host) + if config._stopId: + bm.stopMember(config._stopId) + elif config._stopAll: + bm.stopAll() + elif config._showConn or config._delConn: + bm.showConnections() + else: + bm.overview() + except KeyboardInterrupt: + print + except Exception,e: + if str(e).find("connection aborted") > 0: + # we expect this when asking the connected broker to shut down + return 0 + raise Exception("Failed: %s - %s" % (e.__class__.__name__, e)) + + bm.Disconnect() + except Exception, e: + print str(e) + return 1 + +if __name__ == "__main__": + sys.exit(main()) |