diff options
author | Ted Ross <tross@apache.org> | 2009-02-19 12:00:46 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-02-19 12:00:46 +0000 |
commit | e61b0224853e3864d2d219f8dd4c55a63a860881 (patch) | |
tree | f4119d7112ae45e3ccfaaf4350e9ebf0c692df34 /python | |
parent | 9d0a62d9b64388709ffe1593e305dee8f26a8b84 (diff) | |
download | qpid-python-e61b0224853e3864d2d219f8dd4c55a63a860881.tar.gz |
QPID-1669 - Added client connection management to qpid-cluster.
Also added better session-id discrimination in the qmf.console library.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@745832 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-x | python/commands/qpid-cluster | 175 | ||||
-rw-r--r-- | python/qmf/console.py | 4 |
2 files changed, 160 insertions, 19 deletions
diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster index f2028d944b..2928ba65ea 100755 --- a/python/commands/qpid-cluster +++ b/python/commands/qpid-cluster @@ -23,12 +23,17 @@ import os import getopt import sys import locale +import socket +import re from qmf.console import Session _host = "localhost" _stopId = None _stopAll = False _force = False +_numeric = False +_showConn = False +_delConn = None def Usage (): print "Usage: qpid-cluster [OPTIONS] [broker-addr]" @@ -37,12 +42,40 @@ def Usage (): print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" print print "Options:" - print " -s [--stop] ID Stop one member of the cluster by its ID" - print " -k [--all-stop] Shut down the whole cluster" - print " -f [--force] Suppress the 'are-you-sure?' prompt" + print " -C [--all-connections] View client connections to all cluster members" + print " -c [--connections] ID View client connections to specified member" + print " -d [--del-connection] HOST:PORT" + print " Disconnect a client connection" + print " -s [--stop] ID Stop one member of the cluster by its ID" + print " -k [--all-stop] Shut down the whole cluster" + print " -f [--force] Suppress the 'are-you-sure?' prompt" + print " -n [--numeric] Don't resolve names" print sys.exit (1) + +class IpAddr: + def __init__(self, text): + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFF + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + class BrokerManager: def __init__(self): self.brokerName = None @@ -62,7 +95,7 @@ class BrokerManager: if self.broker: self.qmf.delBroker(self.broker) - def overview(self): + def _getClusters(self): packages = self.qmf.getPackages() if "org.apache.qpid.cluster" not in packages: print "Clustering is not installed on the broker." @@ -73,8 +106,35 @@ class BrokerManager: print "Clustering is installed but not enabled on the broker." sys.exit(0) + return clusters + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(_host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def overview(self): + clusters = self._getClusters() cluster = clusters[0] - myUrl = cluster.publishedURL memberList = cluster.members.split(";") idList = cluster.memberIDs.split(";") @@ -86,11 +146,7 @@ class BrokerManager: print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) def stopMember(self, id): - clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) - if len(clusters) == 0: - print "Clustering is installed but not enabled on the broker." - sys.exit(0) - + clusters = self._getClusters() cluster = clusters[0] idList = cluster.memberIDs.split(";") if id not in idList: @@ -113,11 +169,7 @@ class BrokerManager: cluster.stopClusterNode(id) def stopAll(self): - clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) - if len(clusters) == 0: - print "Clustering is installed but not enabled on the broker." - sys.exit(0) - + clusters = self._getClusters() if not _force: prompt = "Warning: This command will shut down the entire cluster." prompt += " Are you sure? [N]: " @@ -130,15 +182,72 @@ class BrokerManager: cluster = clusters[0] cluster.stopFullCluster() + def showConnections(self): + clusters = self._getClusters() + cluster = clusters[0] + memberList = cluster.members.split(";") + idList = cluster.memberIDs.split(";") + displayList = [] + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + self.brokers = [] + pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + + idx = 0 + for host in hostList: + if _showConn == "all" or _showConn == idList[idx] or _delConn: + self.brokers.append(self.qmf.addBroker(host)) + displayList.append(idList[idx]) + idx += 1 + + idx = 0 + found = False + for broker in self.brokers: + if not _delConn: + print "Clients on Member: ID=%s:" % displayList[idx] + connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) + for conn in connList: + if pattern.match(conn.address): + if _numeric or _delConn: + a = conn.address + else: + tokens = conn.address.split(":") + try: + hostList = socket.gethostbyaddr(tokens[0]) + host = hostList[0] + except: + host = tokens[0] + a = host + ":" + tokens[1] + if _delConn: + tokens = _delConn.split(":") + ip = socket.gethostbyname(tokens[0]) + toDelete = ip + ":" + tokens[1] + if a == toDelete: + print "Closing connection from client: %s" % a + conn.close() + found = True + else: + print " %s" % a + idx += 1 + if not _delConn: + print + if _delConn and not found: + print "Client connection '%s' not found" % _delConn + + for broker in self.brokers: + self.qmf.delBroker(broker) + + ## ## Main Program ## try: - longOpts = ("stop=", "all-stop", "force") - (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "s:kf", longOpts) + longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "s:kfCc:d:n", longOpts) except: - Usage () + Usage() try: encoding = locale.getpreferredencoding() @@ -146,13 +255,41 @@ try: except: cargs = encArgs +count = 0 for opt in optlist: if opt[0] == "-s" or opt[0] == "--stop": _stopId = opt[1] + if len(_stopId.split(":")) != 2: + print "Member ID must be of form: <host or ip>:<number>" + sys.exit(1) + count += 1 if opt[0] == "-k" or opt[0] == "--all-stop": _stopAll = True + count += 1 if opt[0] == "-f" or opt[0] == "--force": _force = True + if opt[0] == "-n" or opt[0] == "--numeric": + _numeric = True + if opt[0] == "-C" or opt[0] == "--all-connections": + _showConn = "all" + count += 1 + if opt[0] == "-c" or opt[0] == "--connections": + _showConn = opt[1] + if len(_showConn.split(":")) != 2: + print "Member ID must be of form: <host or ip>:<number>" + sys.exit(1) + count += 1 + if opt[0] == "-d" or opt[0] == "--del-connection": + _delConn = opt[1] + if len(_delConn.split(":")) != 2: + print "Connection must be of form: <host or ip>:<port>" + sys.exit(1) + count += 1 + +if count > 1: + print "Only one command option may be supplied" + print + Usage() nargs = len(cargs) bm = BrokerManager() @@ -166,6 +303,8 @@ try: bm.stopMember(_stopId) elif _stopAll: bm.stopAll() + elif _showConn or _delConn: + bm.showConnections() else: bm.overview() except KeyboardInterrupt: diff --git a/python/qmf/console.py b/python/qmf/console.py index 867d707f31..abc0929955 100644 --- a/python/qmf/console.py +++ b/python/qmf/console.py @@ -1230,6 +1230,7 @@ class ManagedConnection(Thread): class Broker: """ This object represents a connection (or potential connection) to a QMF broker. """ SYNC_TIME = 60 + nextSeq = 1 def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False): self.session = session @@ -1242,7 +1243,8 @@ class Broker: self.error = None self.brokerId = None self.connected = False - self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) + self.amqpSessionId = "%s.%d.%d" % (os.uname()[1], os.getpid(), Broker.nextSeq) + Broker.nextSeq += 1 if self.session.manageConnections: self.thread = ManagedConnection(self) self.thread.start() |