diff options
Diffstat (limited to 'trunk/qpid/python/commands/qpid-cluster')
-rwxr-xr-x | trunk/qpid/python/commands/qpid-cluster | 328 |
1 files changed, 0 insertions, 328 deletions
diff --git a/trunk/qpid/python/commands/qpid-cluster b/trunk/qpid/python/commands/qpid-cluster deleted file mode 100755 index 7afb7671b8..0000000000 --- a/trunk/qpid/python/commands/qpid-cluster +++ /dev/null @@ -1,328 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import getopt -import sys -import locale -import socket -import re -from qmf.console import Session - -class Config: - def __init__(self): - self._host = "localhost" - self._connTimeout = 10 - self._stopId = None - self._stopAll = False - self._force = False - self._numeric = False - self._showConn = False - self._delConn = None - -def usage (): - print "Usage: qpid-cluster [OPTIONS] [broker-addr]" - print - print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" - print - print "Options:" - print " --timeout seconds (10) Maximum time to wait for broker connection" - print " -C [--all-connections] View client connections to all cluster members" - print " -c [--connections] ID View client connections to specified member" - print " -d [--del-connection] HOST:PORT" - print " Disconnect a client connection" - print " -s [--stop] ID Stop one member of the cluster by its ID" - print " -k [--all-stop] Shut down the whole cluster" - print " -f [--force] Suppress the 'are-you-sure?' prompt" - print " -n [--numeric] Don't resolve names" - print - -class IpAddr: - def __init__(self, text): - if text.find("@") != -1: - tokens = text.split("@") - text = tokens[1] - if text.find(":") != -1: - tokens = text.split(":") - text = tokens[0] - self.port = int(tokens[1]) - else: - self.port = 5672 - self.dottedQuad = socket.gethostbyname(text) - nums = self.dottedQuad.split(".") - self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) - - def bestAddr(self, addrPortList): - bestDiff = 0xFFFFFFFFL - bestAddr = None - for addrPort in addrPortList: - diff = IpAddr(addrPort[0]).addr ^ self.addr - if diff < bestDiff: - bestDiff = diff - bestAddr = addrPort - return bestAddr - -class BrokerManager: - def __init__(self, config): - self.config = config - self.brokerName = None - self.qmf = None - self.broker = None - - def SetBroker(self, brokerUrl): - self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == 0: - self.brokerAgent = a - - def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) - - def _getClusters(self): - packages = self.qmf.getPackages() - if "org.apache.qpid.cluster" not in packages: - raise Exception("Clustering is not installed on the broker.") - - clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) - if len(clusters) == 0: - raise Exception("Clustering is installed but not enabled on the broker.") - - return clusters - - def _getHostList(self, urlList): - hosts = [] - hostAddr = IpAddr(self.config._host) - for url in urlList: - if url.find("amqp:") != 0: - raise Exception("Invalid URL 1") - url = url[5:] - addrs = str(url).split(",") - addrList = [] - for addr in addrs: - tokens = addr.split(":") - if len(tokens) != 3: - raise Exception("Invalid URL 2") - addrList.append((tokens[1], tokens[2])) - - # Find the address in the list that is most likely to be in the same subnet as the address - # with which we made the original QMF connection. This increases the probability that we will - # be able to reach the cluster member. - - best = hostAddr.bestAddr(addrList) - bestUrl = best[0] + ":" + best[1] - hosts.append(bestUrl) - return hosts - - def overview(self): - clusters = self._getClusters() - cluster = clusters[0] - memberList = cluster.members.split(";") - idList = cluster.memberIDs.split(";") - - print " Cluster Name: %s" % cluster.clusterName - print "Cluster Status: %s" % cluster.status - print " Cluster Size: %d" % cluster.clusterSize - print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) - for idx in range(1,len(idList)): - print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) - - def stopMember(self, id): - clusters = self._getClusters() - cluster = clusters[0] - idList = cluster.memberIDs.split(";") - if id not in idList: - raise Exception("No member with matching ID found") - - if not self.config._force: - prompt = "Warning: " - if len(idList) == 1: - prompt += "This command will shut down the last running cluster member." - else: - prompt += "This command will shut down a cluster member." - prompt += " Are you sure? [N]: " - - confirm = raw_input(prompt) - if len(confirm) == 0 or confirm[0].upper() != 'Y': - raise Exception("Operation canceled") - - cluster.stopClusterNode(id) - - def stopAll(self): - clusters = self._getClusters() - if not self.config._force: - prompt = "Warning: This command will shut down the entire cluster." - prompt += " Are you sure? [N]: " - - confirm = raw_input(prompt) - if len(confirm) == 0 or confirm[0].upper() != 'Y': - raise Exception("Operation canceled") - - cluster = clusters[0] - cluster.stopFullCluster() - - def showConnections(self): - clusters = self._getClusters() - cluster = clusters[0] - memberList = cluster.members.split(";") - idList = cluster.memberIDs.split(";") - displayList = [] - hostList = self._getHostList(memberList) - self.qmf.delBroker(self.broker) - self.broker = None - self.brokers = [] - pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") - - idx = 0 - for host in hostList: - if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn: - self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout)) - displayList.append(idList[idx]) - idx += 1 - - idx = 0 - found = False - for broker in self.brokers: - if not self.config._delConn: - print "Clients on Member: ID=%s:" % displayList[idx] - connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) - for conn in connList: - if pattern.match(conn.address): - if self.config._numeric or self.config._delConn: - a = conn.address - else: - tokens = conn.address.split(":") - try: - hostList = socket.gethostbyaddr(tokens[0]) - host = hostList[0] - except: - host = tokens[0] - a = host + ":" + tokens[1] - if self.config._delConn: - tokens = self.config._delConn.split(":") - ip = socket.gethostbyname(tokens[0]) - toDelete = ip + ":" + tokens[1] - if a == toDelete: - print "Closing connection from client: %s" % a - conn.close() - found = True - else: - print " %s" % a - idx += 1 - if not self.config._delConn: - print - if self.config._delConn and not found: - print "Client connection '%s' not found" % self.config._delConn - - for broker in self.brokers: - self.qmf.delBroker(broker) - - -def main(argv=None): - if argv is None: argv = sys.argv - try: - config = Config() - try: - longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=") - (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts) - except: - usage() - return 1 - - try: - encoding = locale.getpreferredencoding() - cargs = [a.decode(encoding) for a in encArgs] - except: - cargs = encArgs - - count = 0 - for opt in optlist: - if opt[0] == "--timeout": - config._connTimeout = int(opt[1]) - if config._connTimeout == 0: - config._connTimeout = None - if opt[0] == "-s" or opt[0] == "--stop": - config._stopId = opt[1] - if len(config._stopId.split(":")) != 2: - raise Exception("Member ID must be of form: <host or ip>:<number>") - count += 1 - if opt[0] == "-k" or opt[0] == "--all-stop": - config._stopAll = True - count += 1 - if opt[0] == "-f" or opt[0] == "--force": - config._force = True - if opt[0] == "-n" or opt[0] == "--numeric": - config._numeric = True - if opt[0] == "-C" or opt[0] == "--all-connections": - config._showConn = "all" - count += 1 - if opt[0] == "-c" or opt[0] == "--connections": - config._showConn = opt[1] - if len(config._showConn.split(":")) != 2: - raise Exception("Member ID must be of form: <host or ip>:<number>") - count += 1 - if opt[0] == "-d" or opt[0] == "--del-connection": - config._delConn = opt[1] - if len(config._delConn.split(":")) != 2: - raise Exception("Connection must be of form: <host or ip>:<port>") - count += 1 - - if count > 1: - print "Only one command option may be supplied" - print - usage() - return 1 - - nargs = len(cargs) - bm = BrokerManager(config) - - if nargs == 1: - config._host = cargs[0] - - try: - bm.SetBroker(config._host) - if config._stopId: - bm.stopMember(config._stopId) - elif config._stopAll: - bm.stopAll() - elif config._showConn or config._delConn: - bm.showConnections() - else: - bm.overview() - except KeyboardInterrupt: - print - except Exception,e: - if str(e).find("connection aborted") > 0: - # we expect this when asking the connected broker to shut down - return 0 - raise Exception("Failed: %s - %s" % (e.__class__.__name__, e)) - - bm.Disconnect() - except Exception, e: - print str(e) - return 1 - -if __name__ == "__main__": - sys.exit(main()) |