diff options
Diffstat (limited to 'qpid/tools/src/py/qpid-stat')
-rwxr-xr-x | qpid/tools/src/py/qpid-stat | 511 |
1 files changed, 511 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat new file mode 100755 index 0000000000..ecbfddf5fb --- /dev/null +++ b/qpid/tools/src/py/qpid-stat @@ -0,0 +1,511 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +from optparse import OptionParser, OptionGroup +import sys +import locale +import socket +import re +from qpid.messaging import Connection + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpidtoollibs import BrokerAgent +from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong + + +class Config: + def __init__(self): + self._host = "localhost" + self._connTimeout = 10 + self._types = "" + self._limit = 50 + self._increasing = False + self._sortcol = None + +config = Config() +conn_options = {} + +def OptionsAndArguments(argv): + """ Set global variables for options, return arguments """ + + global config + global conn_options + + usage = \ +"""%prog -g [options] + %prog -c [options] + %prog -e [options] + %prog -q [options] [queue-name] + %prog -u [options] + %prog -m [options] + %prog --acl [options]""" + + parser = OptionParser(usage=usage) + + group1 = OptionGroup(parser, "General Options") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>", + help="URL of the broker to query") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", + help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)") + group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)") + group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") + parser.add_option_group(group1) + + group2 = OptionGroup(parser, "Command Options") + group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") + group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show") + parser.add_option_group(group2) + + group3 = OptionGroup(parser, "Display Options") + group3.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") + group3.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") + group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") + parser.add_option_group(group3) + + opts, args = parser.parse_args(args=argv) + + if not opts.show: + parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help") + + config._types = opts.show + config._sortcol = opts.sort_by + config._host = opts.broker + config._connTimeout = opts.timeout + config._increasing = opts.increasing + config._limit = opts.limit + + if opts.sasl_mechanism: + conn_options['sasl_mechanisms'] = opts.sasl_mechanism + if opts.ssl_certificate: + conn_options['ssl_certfile'] = opts.ssl_certificate + if opts.ssl_key: + if not opts.ssl_certificate: + parser.error("missing '--ssl-certificate' (required by '--ssl-key')") + conn_options['ssl_keyfile'] = opts.ssl_key + if opts.ha_admin: + conn_options['client_properties'] = {'qpid.ha-admin' : 1} + + return args + +class BrokerManager: + def __init__(self): + self.brokerName = None + self.connection = None + self.broker = None + self.cluster = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.connection = Connection.establish(self.url, **conn_options) + self.broker = BrokerAgent(self.connection) + + def Disconnect(self): + """ Release any allocated brokers. Ignore any failures as the tool is + shutting down. + """ + try: + self.connection.close() + except: + pass + + def displayBroker(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header('uptime', Header.DURATION)) + heads.append(Header('cluster', Header.NONE)) + heads.append(Header('connections', Header.COMMAS)) + heads.append(Header('sessions', Header.COMMAS)) + heads.append(Header('exchanges', Header.COMMAS)) + heads.append(Header('queues', Header.COMMAS)) + rows = [] + broker = self.broker.getBroker() + cluster = self.broker.getCluster() + clusterInfo = cluster and cluster.clusterName + "<" + cluster.status + ">" or "<standalone>" + connections = self.getConnectionMap() + sessions = self.getSessionMap() + exchanges = self.getExchangeMap() + queues = self.getQueueMap() + row = (broker.getUpdateTime() - broker.getCreateTime(), + clusterInfo, + len(connections), len(sessions), + len(exchanges), len(queues)) + rows.append(row) + disp.formattedTable('Broker Summary:', heads, rows) + + if 'queueCount' not in broker.values: + return + + print + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', broker.msgDepth, broker.byteDepth]) + rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues]) + rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues]) + rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues]) + rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues]) + rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues]) + rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues]) + rows.append(['acquires', broker.acquires, None]) + rows.append(['releases', broker.releases, None]) + rows.append(['discards-no-route', broker.discardsNoRoute, None]) + rows.append(['discards-ttl-expired', broker.discardsTtl, None]) + rows.append(['discards-limit-overflow', broker.discardsOverflow, None]) + rows.append(['discards-ring-overflow', broker.discardsRing, None]) + rows.append(['discards-lvq-replace', broker.discardsLvq, None]) + rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None]) + rows.append(['discards-purged', broker.discardsPurge, None]) + rows.append(['reroutes', broker.reroutes, None]) + rows.append(['abandoned', broker.abandoned, None]) + rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None]) + disp.formattedTable('Aggregate Broker Statistics:', heads, rows) + + + def displayConn(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header('connection')) + heads.append(Header('cproc')) + heads.append(Header('cpid')) + heads.append(Header('mech')) + heads.append(Header('auth')) + heads.append(Header('connected', Header.DURATION)) + heads.append(Header('idle', Header.DURATION)) + heads.append(Header('msgIn', Header.KMG)) + heads.append(Header('msgOut', Header.KMG)) + rows = [] + connections = self.broker.getAllConnections() + broker = self.broker.getBroker() + for conn in connections: + row = [] + row.append(conn.address) + if conn.remoteProcessName: row.append(conn.remoteProcessName) + else: row.append("-") + row.append(conn.remotePid) + if conn.saslMechanism: row.append(conn.saslMechanism) + else: row.append("-") + if conn.authIdentity: row.append(conn.authIdentity) + else: row.append("-") + row.append(broker.getUpdateTime() - conn.getCreateTime()) + row.append(broker.getUpdateTime() - conn.getUpdateTime()) + row.append(conn.msgsFromClient) + row.append(conn.msgsToClient) + rows.append(row) + title = "Connections" + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displaySession(self): + disp = Display(prefix=" ") + + def displayExchange(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header("exchange")) + heads.append(Header("type")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("bind", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("msgDrop", Header.KMG)) + heads.append(Header("byteIn", Header.KMG)) + heads.append(Header("byteOut", Header.KMG)) + heads.append(Header("byteDrop", Header.KMG)) + rows = [] + exchanges = self.broker.getAllExchanges() + for ex in exchanges: + row = [] + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) + title = "Exchanges" + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayQueues(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header("queue")) + heads.append(Header("dur", Header.Y)) + heads.append(Header("autoDel", Header.Y)) + heads.append(Header("excl", Header.Y)) + heads.append(Header("msg", Header.KMG)) + heads.append(Header("msgIn", Header.KMG)) + heads.append(Header("msgOut", Header.KMG)) + heads.append(Header("bytes", Header.KMG)) + heads.append(Header("bytesIn", Header.KMG)) + heads.append(Header("bytesOut", Header.KMG)) + heads.append(Header("cons", Header.KMG)) + heads.append(Header("bind", Header.KMG)) + rows = [] + queues = self.broker.getAllQueues() + for q in queues: + row = [] + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) + title = "Queues" + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + + def displayQueue(self, name): + queue = self.broker.getQueue(name) + if not queue: + print "Queue '%s' not found" % name + return + + disp = Display(prefix=" ") + heads = [] + heads.append(Header('Name')) + heads.append(Header('Durable', Header.YN)) + heads.append(Header('AutoDelete', Header.YN)) + heads.append(Header('Exclusive', Header.YN)) + heads.append(Header('FlowStopped', Header.YN)) + heads.append(Header('FlowStoppedCount', Header.COMMAS)) + heads.append(Header('Consumers', Header.COMMAS)) + heads.append(Header('Bindings', Header.COMMAS)) + rows = [] + rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive, + queue.flowStopped, queue.flowStoppedCount, + queue.consumerCount, queue.bindingCount]) + disp.formattedTable("Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Property')) + heads.append(Header('Value')) + rows = [] + rows.append(['arguments', queue.arguments]) + rows.append(['alt-exchange', queue.altExchange]) + disp.formattedTable("Optional Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', queue.msgDepth, queue.byteDepth]) + rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues]) + rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues]) + rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues]) + rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues]) + rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues]) + rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues]) + rows.append(['acquires', queue.acquires, None]) + rows.append(['releases', queue.releases, None]) + rows.append(['discards-ttl-expired', queue.discardsTtl, None]) + rows.append(['discards-limit-overflow', queue.discardsOverflow, None]) + rows.append(['discards-ring-overflow', queue.discardsRing, None]) + rows.append(['discards-lvq-replace', queue.discardsLvq, None]) + rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None]) + rows.append(['discards-purged', queue.discardsPurge, None]) + rows.append(['reroutes', queue.reroutes, None]) + disp.formattedTable("Statistics:", heads, rows) + + + def displaySubscriptions(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header("subscr")) + heads.append(Header("queue")) + heads.append(Header("conn")) + heads.append(Header("procName")) + heads.append(Header("procId")) + heads.append(Header("browse", Header.Y)) + heads.append(Header("acked", Header.Y)) + heads.append(Header("excl", Header.Y)) + heads.append(Header("creditMode")) + heads.append(Header("delivered", Header.COMMAS)) + heads.append(Header("sessUnacked", Header.COMMAS)) + rows = [] + subscriptions = self.broker.getAllSubscriptions() + sessions = self.getSessionMap() + connections = self.getConnectionMap() + for s in subscriptions: + row = [] + try: + row.append(s.name) + row.append(s.queueRef) + session = sessions[s.sessionRef] + connection = connections[session.connectionRef] + row.append(connection.address) + if connection.remoteProcessName: row.append(connection.remoteProcessName) + else: row.append("-") + row.append(connection.remotePid) + row.append(s.browsing) + row.append(s.acknowledged) + row.append(s.exclusive) + row.append(s.creditMode) + row.append(s.delivered) + row.append(session.unackedMessages) + rows.append(row) + except: + pass + title = "Subscriptions" + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) + dispRows = sorter.getSorted() + else: + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayMemory(self): + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value', Header.COMMAS)] + rows = [] + memory = self.broker.getMemory() + for k,v in memory.values.items(): + if k != 'name': + rows.append([k, v]) + disp.formattedTable('Broker Memory Statistics:', heads, rows) + + def displayAcl(self): + acl = self.broker.getAcl() + if not acl: + print "ACL Policy Module is not installed" + return + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value')] + rows = [] + rows.append(['policy-file', acl.policyFile]) + rows.append(['enforcing', YN(acl.enforcingAcl)]) + rows.append(['has-transfer-acls', YN(acl.transferAcl)]) + rows.append(['last-acl-load', TimeLong(acl.lastAclLoad)]) + rows.append(['acl-denials', Commas(acl.aclDenyCount)]) + disp.formattedTable('ACL Policy Statistics:', heads, rows) + + def getExchangeMap(self): + exchanges = self.broker.getAllExchanges() + emap = {} + for e in exchanges: + emap[e.name] = e + return emap + + def getQueueMap(self): + queues = self.broker.getAllQueues() + qmap = {} + for q in queues: + qmap[q.name] = q + return qmap + + def getSessionMap(self): + sessions = self.broker.getAllSessions() + smap = {} + for s in sessions: + smap[s.name] = s + return smap + + def getConnectionMap(self): + connections = self.broker.getAllConnections() + cmap = {} + for c in connections: + cmap[c.address] = c + return cmap + + def displayMain(self, names, main): + if main == 'g': self.displayBroker() + elif main == 'c': self.displayConn() + elif main == 's': self.displaySession() + elif main == 'e': self.displayExchange() + elif main == 'q': + if len(names) >= 1: + self.displayQueue(names[0]) + else: + self.displayQueues() + elif main == 'u': self.displaySubscriptions() + elif main == 'm': self.displayMemory() + elif main == 'acl': self.displayAcl() + + def display(self, names): + self.displayMain(names, config._types) + + +def main(argv=None): + + args = OptionsAndArguments(argv) + bm = BrokerManager() + + try: + bm.SetBroker(config._host) + bm.display(args) + bm.Disconnect() + return 0 + except KeyboardInterrupt: + print + except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + + bm.Disconnect() # try to deallocate brokers + return 1 + +if __name__ == "__main__": + sys.exit(main()) |