diff options
Diffstat (limited to 'qpid/tools/src/py/qpid-stat')
-rwxr-xr-x | qpid/tools/src/py/qpid-stat | 521 |
1 files changed, 521 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat new file mode 100755 index 0000000000..ce3f5d1ef5 --- /dev/null +++ b/qpid/tools/src/py/qpid-stat @@ -0,0 +1,521 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +from optparse import OptionParser, OptionGroup +from time import sleep ### debug +import sys +import locale +import socket +import re +from qmf.console import Session, Console +from qpid.disp import Display, Header, Sorter + +class Config: + def __init__(self): + self._host = "localhost" + self._connTimeout = 10 + self._types = "" + self._limit = 50 + self._increasing = False + self._sortcol = None + self._cluster_detail = False + self._sasl_mechanism = None + +config = Config() + +def OptionsAndArguments(argv): + """ Set global variables for options, return arguments """ + + global config + + parser = OptionParser(usage="usage: %prog [options] BROKER", + description="Example: $ qpid-stat -q broker-host:10000") + + group1 = OptionGroup(parser, "General Options") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + parser.add_option_group(group1) + + group2 = OptionGroup(parser, "Display Options") + group2.add_option("-b", "--broker", help="Show Brokers", + action="store_const", const="b", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", + action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", + action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", + action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", + action="store_const", const="u", dest="show") + group2.add_option("-S", "--sort-by", metavar="<colname>", + help="Sort by column name") + group2.add_option("-I", "--increasing", action="store_true", default=False, + help="Sort by increasing value (default = decreasing)") + group2.add_option("-L", "--limit", default=50, metavar="<n>", + help="Limit output to n rows") + group2.add_option("-C", "--cluster", action="store_true", default=False, + help="Display per-broker cluster detail.") + parser.add_option_group(group2) + + opts, args = parser.parse_args(args=argv) + + if not opts.show: + parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") + + config._types = opts.show + config._sortcol = opts.sort_by + config._connTimeout = opts.timeout + config._increasing = opts.increasing + config._limit = opts.limit + config._cluster_detail = opts.cluster + config._sasl_mechanism = opts.sasl_mechanism + + if args: + config._host = args[0] + + return args + +class IpAddr: + def __init__(self, text): + if text.find("@") != -1: + tokens = text.split("@") + text = tokens[1] + if text.find(":") != -1: + tokens = text.split(":") + text = tokens[0] + self.port = int(tokens[1]) + else: + self.port = 5672 + self.dottedQuad = socket.gethostbyname(text) + nums = self.dottedQuad.split(".") + self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) + + def bestAddr(self, addrPortList): + bestDiff = 0xFFFFFFFFL + bestAddr = None + for addrPort in addrPortList: + diff = IpAddr(addrPort[0]).addr ^ self.addr + if diff < bestDiff: + bestDiff = diff + bestAddr = addrPort + return bestAddr + +class Broker(object): + def __init__(self, qmf, broker): + self.broker = broker + + agents = qmf.getAgents() + for a in agents: + if a.getAgentBank() == '0': + self.brokerAgent = a + + bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] + self.currentTime = bobj.getTimestamps()[0] + try: + self.uptime = bobj.uptime + except: + self.uptime = 0 + self.connections = {} + self.sessions = {} + self.exchanges = {} + self.queues = {} + self.subscriptions = {} + package = "org.apache.qpid.broker" + + list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) + for conn in list: + if not conn.shadow: + self.connections[conn.getObjectId()] = conn + + list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) + for sess in list: + if sess.connectionRef in self.connections: + self.sessions[sess.getObjectId()] = sess + + list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent) + for exchange in list: + self.exchanges[exchange.getObjectId()] = exchange + + list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent) + for queue in list: + self.queues[queue.getObjectId()] = queue + + list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent) + for subscription in list: + self.subscriptions[subscription.getObjectId()] = subscription + + def getName(self): + return self.broker.getUrl() + + def getCurrentTime(self): + return self.currentTime + + def getUptime(self): + return self.uptime + +class BrokerManager(Console): + def __init__(self): + self.brokerName = None + self.qmf = None + self.broker = None + self.brokers = [] + self.cluster = None + + def SetBroker(self, brokerUrl, mechanism): + self.url = brokerUrl + self.qmf = Session() + self.mechanism = mechanism + self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == '0': + self.brokerAgent = a + + def Disconnect(self): + """ Release any allocated brokers. Ignore any failures as the tool is + shutting down. + """ + try: + if self.broker: + self.qmf.delBroker(self.broker) + else: + for b in self.brokers: self.qmf.delBroker(b.broker) + except: + pass + + def _getCluster(self): + packages = self.qmf.getPackages() + if "org.apache.qpid.cluster" not in packages: + return None + + clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) + if len(clusters) == 0: + print "Clustering is installed but not enabled on the broker." + return None + + self.cluster = clusters[0] + + def _getHostList(self, urlList): + hosts = [] + hostAddr = IpAddr(config._host) + for url in urlList: + if url.find("amqp:") != 0: + raise Exception("Invalid URL 1") + url = url[5:] + addrs = str(url).split(",") + addrList = [] + for addr in addrs: + tokens = addr.split(":") + if len(tokens) != 3: + raise Exception("Invalid URL 2") + addrList.append((tokens[1], tokens[2])) + + # Find the address in the list that is most likely to be in the same subnet as the address + # with which we made the original QMF connection. This increases the probability that we will + # be able to reach the cluster member. + + best = hostAddr.bestAddr(addrList) + bestUrl = best[0] + ":" + best[1] + hosts.append(bestUrl) + return hosts + + def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): + if len(subs) == 0: + return + this = subs[0] + remaining = subs[1:] + newindent = indent + " " + if this == 'b': + pass + elif this == 'c': + if broker: + for oid in broker.connections: + iconn = broker.connections[oid] + self.printConnSub(indent, broker.getName(), iconn) + self.displaySubs(remaining, newindent, broker=broker, conn=iconn, + sess=sess, exchange=exchange, queue=queue) + elif this == 's': + pass + elif this == 'e': + pass + elif this == 'q': + pass + print + + def displayBroker(self, subs): + disp = Display(prefix=" ") + heads = [] + heads.append(Header('broker')) + heads.append(Header('cluster')) + heads.append(Header('uptime', Header.DURATION)) + heads.append(Header('conn', Header.KMG)) + heads.append(Header('sess', Header.KMG)) + heads.append(Header('exch', Header.KMG)) + heads.append(Header('queue', Header.KMG)) + rows = [] + for broker in self.brokers: + if self.cluster: + ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) + else: + ctext = "<standalone>" + row = (broker.getName(), ctext, broker.getUptime(), + len(broker.connections), len(broker.sessions), + len(broker.exchanges), len(broker.queues)) + rows.append(row) + title = "Brokers" + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayConn(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header('client-addr')) + heads.append(Header('cproc')) + heads.append(Header('cpid')) + heads.append(Header('auth')) + heads.append(Header('connected', Header.DURATION)) + heads.append(Header('idle', Header.DURATION)) + heads.append(Header('msgIn', Header.KMG)) + heads.append(Header('msgOut', Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.connections: + conn = broker.connections[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(conn.address) + row.append(conn.remoteProcessName) + row.append(conn.remotePid) + row.append(conn.authIdentity) + row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) + idle = broker.getCurrentTime() - conn.getTimestamps()[0] + row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) + row.append(conn.framesFromClient) + row.append(conn.framesToClient) + rows.append(row) + title = "Connections" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displaySession(self, subs): + disp = Display(prefix=" ") + + def displayExchange(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("exchange")) + heads.append(Header("type")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("bind", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("msgDrop", Header.KMG)) + heads.append(Header("byteIn", Header.KMG)) + heads.append(Header("byteOut", Header.KMG)) + heads.append(Header("byteDrop", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.exchanges: + ex = broker.exchanges[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) + title = "Exchanges" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayQueue(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("queue")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("autoDel", Header.Y)) + heads.append(Header("excl", Header.Y)) + heads.append(Header("msg", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("bytes", Header.KMG)) + heads.append(Header("bytesIn", Header.KMG)) + heads.append(Header("bytesOut", Header.KMG)) + heads.append(Header("cons", Header.KMG)) + heads.append(Header("bind", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.queues: + q = broker.queues[oid] + row = [] + if self.cluster: + row.append(broker.getName()) + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) + title = "Queues" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displaySubscriptions(self, subs): + disp = Display(prefix=" ") + heads = [] + if self.cluster: + heads.append(Header('broker')) + heads.append(Header("subscription")) + heads.append(Header("queue")) + heads.append(Header("connection")) + heads.append(Header("processName")) + heads.append(Header("processId")) + heads.append(Header("browsing", Header.Y)) + heads.append(Header("acknowledged", Header.Y)) + heads.append(Header("exclusive", Header.Y)) + heads.append(Header("creditMode")) + heads.append(Header("delivered", Header.KMG)) + rows = [] + for broker in self.brokers: + for oid in broker.subscriptions: + s = broker.subscriptions[oid] + row = [] + try: + if self.cluster: + row.append(broker.getName()) + row.append(s.name) + row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name) + connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef + row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address) + row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName) + row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid) + row.append(s.browsing) + row.append(s.acknowledged) + row.append(s.exclusive) + row.append(s.creditMode) + row.append(s.delivered) + rows.append(row) + except: + pass + title = "Subscriptions" + if self.cluster: + title += " for cluster '%s'" % self.cluster.clusterName + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayMain(self, main, subs): + if main == 'b': self.displayBroker(subs) + elif main == 'c': self.displayConn(subs) + elif main == 's': self.displaySession(subs) + elif main == 'e': self.displayExchange(subs) + elif main == 'q': self.displayQueue(subs) + elif main == 'u': self.displaySubscriptions(subs) + + def display(self): + if config._cluster_detail or config._types[0] == 'b': + # always show cluster detail when dumping broker stats + self._getCluster() + if self.cluster: + memberList = self.cluster.members.split(";") + hostList = self._getHostList(memberList) + self.qmf.delBroker(self.broker) + self.broker = None + if config._host.find("@") > 0: + authString = config._host.split("@")[0] + "@" + else: + authString = "" + for host in hostList: + b = self.qmf.addBroker(authString + host, config._connTimeout) + self.brokers.append(Broker(self.qmf, b)) + else: + self.brokers.append(Broker(self.qmf, self.broker)) + + self.displayMain(config._types[0], config._types[1:]) + + +def main(argv=None): + + args = OptionsAndArguments(argv) + bm = BrokerManager() + + try: + bm.SetBroker(config._host, config._sasl_mechanism) + bm.display() + bm.Disconnect() + return 0 + except KeyboardInterrupt: + print + except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + + bm.Disconnect() # try to deallocate brokers + return 1 + +if __name__ == "__main__": + sys.exit(main()) |