diff options
Diffstat (limited to 'qpid/python/commands/qpid-stat')
-rwxr-xr-x | qpid/python/commands/qpid-stat | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/qpid/python/commands/qpid-stat b/qpid/python/commands/qpid-stat new file mode 100755 index 0000000000..9d41f4e966 --- /dev/null +++ b/qpid/python/commands/qpid-stat @@ -0,0 +1,449 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +import socket +import re +from qmf.console import Session, Console +from qpid.disp import Display, Header, Sorter + +_host = "localhost" +_top = False +_types = "" +_limit = 50 +_increasing = False +_sortcol = None +pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") + +def Usage (): + print "Usage: qpid-stat [OPTIONS] [broker-addr]" + print + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print +# print "General Options:" +# print " -n [--numeric] Don't resolve names" +# print " -t [--top] Repeatedly display top items" +# print + print "Display Options:" + print + print " -b Show Brokers" + print " -c Show Connections" +# print " -s Show Sessions" + print " -e Show Exchanges" + print " -q Show Queues" + print + print " -S [--sort-by] COLNAME Sort by column name" + print " -I [--increasing] Sort by increasing value (default = decreasing)" + print " -L [--limit] NUM Limit output to NUM rows (default = 50)" + print + sys.exit (1) + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFF + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class Broker(object): + def __init__(self, qmf, broker): + self.broker = broker + bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _broker=broker)[0] + self.currentTime = bobj.getTimestamps()[0] + try: + self.uptime = bobj.uptime + except: + self.uptime = 0 + self.connections = {} + self.sessions = {} + self.exchanges = {} + self.queues = {} + package = "org.apache.qpid.broker" + + list = qmf.getObjects(_class="connection", _package=package, _broker=broker) + for conn in list: + if pattern.match(conn.address): + self.connections[conn.getObjectId()] = conn + + list = qmf.getObjects(_class="session", _package=package, _broker=broker) + for sess in list: + if sess.connectionRef in self.connections: + self.sessions[sess.getObjectId()] = sess + + list = qmf.getObjects(_class="exchange", _package=package, _broker=broker) + for exchange in list: + self.exchanges[exchange.getObjectId()] = exchange + + list = qmf.getObjects(_class="queue", _package=package, _broker=broker) + for queue in list: + self.queues[queue.getObjectId()] = queue + + def getName(self): + return self.broker.getUrl() + + def getCurrentTime(self): + return self.currentTime + + def getUptime(self): + return self.uptime + +class BrokerManager(Console): + def __init__(self): + self.brokerName = None + self.qmf = None + self.broker = None + self.brokers = [] + self.cluster = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def _getCluster(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + return None + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + return None + + self.cluster = clusters[0] + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(_host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): + if len(subs) == 0: + return + this = subs[0] + remaining = subs[1:] + newindent = indent + " " + if this == 'b': + pass + elif this == 'c': + if broker: + for oid in broker.connections: + iconn = broker.connections[oid] + self.printConnSub(indent, broker.getName(), iconn) + self.displaySubs(remaining, newindent, broker=broker, conn=iconn, + sess=sess, exchange=exchange, queue=queue) + elif this == 's': + pass + elif this == 'e': + pass + elif this == 'q': + pass + print + + def displayBroker(self, subs): + disp = Display(prefix=" ") + heads = [] + heads.append(Header('broker')) + heads.append(Header('cluster')) + heads.append(Header('uptime', Header.DURATION)) + heads.append(Header('conn', Header.KMG)) + heads.append(Header('sess', Header.KMG)) + heads.append(Header('exch', Header.KMG)) + heads.append(Header('queue', Header.KMG)) + rows = [] + for broker in self.brokers: + if self.cluster: + ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) + else: + ctext = "<standalone>" + row = (broker.getName(), ctext, broker.getUptime(), + len(broker.connections), len(broker.sessions), + len(broker.exchanges), len(broker.queues)) + rows.append(row) + title = "Brokers" + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayConn(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header('client-addr')) + heads.append(Header('cproc')) + heads.append(Header('cpid')) + heads.append(Header('auth')) + heads.append(Header('connected', Header.DURATION)) + heads.append(Header('idle', Header.DURATION)) + heads.append(Header('msgIn', Header.KMG)) + heads.append(Header('msgOut', Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.connections: + conn = broker.connections[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(conn.address) + row.append(conn.remoteProcessName) + row.append(conn.remotePid) + row.append(conn.authIdentity) + row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) + idle = broker.getCurrentTime() - conn.getTimestamps()[0] + row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) + row.append(conn.framesFromClient) + row.append(conn.framesToClient) + rows.append(row) + title = "Connections" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displaySession(self, subs): + disp = Display(prefix=" ") + + def displayExchange(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("exchange")) + heads.append(Header("type")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("bind", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("msgDrop", Header.KMG)) + heads.append(Header("byteIn", Header.KMG)) + heads.append(Header("byteOut", Header.KMG)) + heads.append(Header("byteDrop", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.exchanges: + ex = broker.exchanges[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) + title = "Exchanges" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayQueue(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("queue")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("autoDel", Header.Y)) + heads.append(Header("excl", Header.Y)) + heads.append(Header("msg", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("bytes", Header.KMG)) + heads.append(Header("bytesIn", Header.KMG)) + heads.append(Header("bytesOut", Header.KMG)) + heads.append(Header("cons", Header.KMG)) + heads.append(Header("bind", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.queues: + q = broker.queues[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) + title = "Queues" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if _sortcol: + sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayMain(self, main, subs): + if main == 'b': self.displayBroker(subs) + elif main == 'c': self.displayConn(subs) + elif main == 's': self.displaySession(subs) + elif main == 'e': self.displayExchange(subs) + elif main == 'q': self.displayQueue(subs) + + def display(self): + self._getCluster() + if self.cluster: + memberList = self.cluster.members.split(";") + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + for host in hostList: + b = self.qmf.addBroker(host) + self.brokers.append(Broker(self.qmf, b)) + else: + self.brokers.append(Broker(self.qmf, self.broker)) + + self.displayMain(_types[0], _types[1:]) + + +## +## Main Program +## + +try: + longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts) +except: + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "-t" or opt[0] == "--top": + _top = True + elif opt[0] == "-n" or opt[0] == "--numeric": + _numeric = True + elif opt[0] == "-S" or opt[0] == "--sort-by": + _sortcol = opt[1] + elif opt[0] == "-I" or opt[0] == "--increasing": + _increasing = True + elif opt[0] == "-L" or opt[0] == "--limit": + _limit = int(opt[1]) + elif len(opt[0]) == 2: + char = opt[0][1] + if "bcseq".find(char) != -1: + _types += char + else: + Usage() + else: + Usage() + +if len(_types) == 0: + Usage() + +nargs = len(cargs) +bm = BrokerManager() + +if nargs == 1: + _host = cargs[0] + +try: + bm.SetBroker(_host) + bm.display() +except KeyboardInterrupt: + print +except Exception,e: + print "Failed:", e.args + #raise # TODO: Remove before flight + sys.exit(1) + +bm.Disconnect() |