diff options
author | Ted Ross <tross@apache.org> | 2009-02-25 19:41:17 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-02-25 19:41:17 +0000 |
commit | 91dd5b774d42e35fb8b3d94f65dbfd12f6bed915 (patch) | |
tree | fdcb24b4243c70a99f520dea5506aafff20fda30 /python/commands | |
parent | 4a3e1f228d9fb9e5198d6ef779031c61e6eaaa2c (diff) | |
download | qpid-python-91dd5b774d42e35fb8b3d94f65dbfd12f6bed915.tar.gz |
Added a new utility for viewing broker stats.
Fixed a bug in qpid-cluster that causes failure when username/password are
included in the broker URL.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/commands')
-rwxr-xr-x | python/commands/qpid-cluster | 3 | ||||
-rwxr-xr-x | python/commands/qpid-stat | 385 |
2 files changed, 388 insertions, 0 deletions
diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster index 2928ba65ea..4bc40e5abc 100755 --- a/python/commands/qpid-cluster +++ b/python/commands/qpid-cluster @@ -56,6 +56,9 @@ def Usage (): class IpAddr: def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] if text.find(":") != -1: tokens = text.split(":") text = tokens[0] diff --git a/python/commands/qpid-stat b/python/commands/qpid-stat new file mode 100755 index 0000000000..2ac0cad9af --- /dev/null +++ b/python/commands/qpid-stat @@ -0,0 +1,385 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session, Console +from qpid.disp import Display + +_host = "localhost" +_top = False +_types = "" +pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + +def Usage (): + print "Usage: qpid-stat [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print +# print "General Options:" +# print " -n [--numeric] Don't resolve names" +# print " -t [--top] Repeatedly display top items" +# print + print "Display Options:" + print + print " -b Show Brokers" + print " -c Show Connections" +# print " -s Show Sessions" +# print " -e Show Exchanges" +# print " -q Show Queues" + print + sys.exit (1) + +def num(value): + if value < 2000: + return str(value) + value /= 1000 + if value < 2000: + return str(value) + "k" + value /= 1000 + if value < 2000: + return str(value) + "m" + value /= 1000 + return str(value) + "g" + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFF + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class Broker(object): + def __init__(self, qmf, broker): + self.broker = broker + list = qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) + bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _broker=broker)[0] + self.currentTime = bobj.getTimestamps()[0] + try: + self.uptime = bobj.uptime + except: + self.uptime = 0 + self.connections = {} + self.sessions = {} + self.exchanges = {} + self.queues = {} + for conn in list: + if pattern.match(conn.address): + self.connections[conn.getObjectId()] = conn + list = qmf.getObjects(_class="session", _package="org.apache.qpid.broker", _broker=broker) + for sess in list: + if sess.connectionRef in self.connections: + self.sessions[sess.getObjectId()] = sess + list = qmf.getObjects(_class="exchange", _package="org.apache.qpid.broker", _broker=broker) + for exchange in list: + self.exchanges[exchange.getObjectId()] = exchange + list = qmf.getObjects(_class="queue", _package="org.apache.qpid.broker", _broker=broker) + for queue in list: + self.queues[queue.getObjectId()] = queue + + def getName(self): + return self.broker.getUrl() + + def getCurrentTime(self): + return self.currentTime + + def getUptime(self): + return self.uptime + +class BrokerManager(Console): + def __init__(self): + self.brokerName = None + self.qmf = None + self.broker = None + self.brokers = [] + self.cluster = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getCluster(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + return None + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + return None + + self.cluster = clusters[0] + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(_host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): + if len(subs) == 0: + return + this = subs[0] + remaining = subs[1:] + newindent = indent + " " + if this == 'b': + pass + elif this == 'c': + if broker: + for oid in broker.connections: + iconn = broker.connections[oid] + self.printConnSub(indent, broker.getName(), iconn) + self.displaySubs(remaining, newindent, broker=broker, conn=iconn, + sess=sess, exchange=exchange, queue=queue) + elif this == 's': + pass + elif this == 'e': + pass + elif this == 'q': + pass + print + + def displayBroker(self, subs): + disp = Display(prefix=" ") + heads = ('Broker', 'cluster', 'uptime', 'conn', 'sess', 'exch', 'queue') + rows = [] + for broker in self.brokers: + if self.cluster: + ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) + else: + ctext = "<standalone>" + utext = "" + if broker.getUptime() > 0: + utext = disp.duration(broker.getUptime()) + row = (broker.getName(), ctext, utext, + str(len(broker.connections)), str(len(broker.sessions)), + str(len(broker.exchanges)), str(len(broker.queues))) + rows.append(row) + disp.table("Brokers", heads, rows) + + def displayConn(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append('broker') + heads.append('client addr') + heads.append('client(pid)') + heads.append('auth') + heads.append('connected') + heads.append('idle') + heads.append('msgIn') + heads.append('msgOut') + rows = [] + for broker in self.brokers: + for oid in broker.connections: + conn = broker.connections[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(conn.address) + procpid = "" + if conn.remoteProcessName: + procpid += conn.remoteProcessName + if conn.remotePid: + procpid += "(%d)" % conn.remotePid + row.append(procpid) + row.append(conn.authIdentity) + row.append(disp.duration(broker.getCurrentTime() - conn.getTimestamps()[1])) + idle = broker.getCurrentTime() - conn.getTimestamps()[0] + if idle < 10000000000: + itext = "" + else: + itext = disp.duration(idle) + row.append(itext) + row.append(num(conn.framesFromClient)) + row.append(num(conn.framesToClient)) + rows.append(row) + title = "Connections" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + disp.table(title, heads, rows) + + def displaySession(self, subs): + disp = Display(prefix=" ") + + def displayExchange(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append('broker') + heads.append("exchange") + heads.append("type") + heads.append("dur") + heads.append("bind") + heads.append("msgIn") + heads.append("msgOut") + heads.append("msgDrop") + heads.append("byteIn") + heads.append("byteOut") + heads.append("byteDrop") + rows = [] + for broker in self.brokers: + for oid in broker.exchanges: + ex = broker.exchanges[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + if ex.durable: + dur = "Y" + else: + dur = "" + row.append(ex.name) + row.append(ex.type) + row.append(dur) + row.append(num(ex.bindingCount)) + row.append(num(ex.msgReceives)) + row.append(num(ex.msgRoutes)) + row.append(num(ex.msgDrops)) + row.append(num(ex.byteReceives)) + row.append(num(ex.byteRoutes)) + row.append(num(ex.byteDrops)) + rows.append(row) + title = "Exchanges" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + disp.table(title, heads, rows) + + def displayMain(self, main, subs): + if main == 'b': self.displayBroker(subs) + elif main == 'c': self.displayConn(subs) + elif main == 's': self.displaySession(subs) + elif main == 'e': self.displayExchange(subs) + + def display(self): + self._getCluster() + if self.cluster: + memberList = self.cluster.members.split(";") + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + for host in hostList: + b = self.qmf.addBroker(host) + self.brokers.append(Broker(self.qmf, b)) + else: + self.brokers.append(Broker(self.qmf, self.broker)) + + self.displayMain(_types[0], _types[1:]) + + +## +## Main Program +## + +try: + longOpts = ("top", "numeric") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bc", longOpts) +except: + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "-t" or opt[0] == "--top": + _top = True + elif opt[0] == "-n" or opt[0] == "--numeric": + _numeric = True + elif len(opt[0]) == 2: + char = opt[0][1] + if "bcseq".find(char) != -1: + _types += char + else: + Usage() + else: + Usage() + +if len(_types) == 0: + Usage() + +nargs = len(cargs) +bm = BrokerManager() + +if nargs == 1: + _host = cargs[0] + +try: + bm.SetBroker(_host) + bm.display() +except KeyboardInterrupt: + print +except Exception,e: + print "Failed:", e.args + sys.exit(1) + +bm.Disconnect() |