summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py/qpid-stat
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/py/qpid-stat')
-rwxr-xr-xqpid/tools/src/py/qpid-stat511
1 files changed, 511 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat
new file mode 100755
index 0000000000..ecbfddf5fb
--- /dev/null
+++ b/qpid/tools/src/py/qpid-stat
@@ -0,0 +1,511 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup
+import sys
+import locale
+import socket
+import re
+from qpid.messaging import Connection
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs import BrokerAgent
+from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong
+
+
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._types = ""
+ self._limit = 50
+ self._increasing = False
+ self._sortcol = None
+
+config = Config()
+conn_options = {}
+
+def OptionsAndArguments(argv):
+ """ Set global variables for options, return arguments """
+
+ global config
+ global conn_options
+
+ usage = \
+"""%prog -g [options]
+ %prog -c [options]
+ %prog -e [options]
+ %prog -q [options] [queue-name]
+ %prog -u [options]
+ %prog -m [options]
+ %prog --acl [options]"""
+
+ parser = OptionParser(usage=usage)
+
+ group1 = OptionGroup(parser, "General Options")
+ group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>",
+ help="URL of the broker to query")
+ group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
+ help="Maximum time to wait for broker connection (in seconds)")
+ group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
+ help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ group1.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
+ parser.add_option_group(group1)
+
+ group2 = OptionGroup(parser, "Command Options")
+ group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show")
+ group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show")
+ group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show")
+ group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show")
+ group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show")
+ group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show")
+ group2.add_option( "--acl", help="Show Access Control List Stats", action="store_const", const="acl", dest="show")
+ parser.add_option_group(group2)
+
+ group3 = OptionGroup(parser, "Display Options")
+ group3.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name")
+ group3.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)")
+ group3.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows")
+ parser.add_option_group(group3)
+
+ opts, args = parser.parse_args(args=argv)
+
+ if not opts.show:
+ parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help")
+
+ config._types = opts.show
+ config._sortcol = opts.sort_by
+ config._host = opts.broker
+ config._connTimeout = opts.timeout
+ config._increasing = opts.increasing
+ config._limit = opts.limit
+
+ if opts.sasl_mechanism:
+ conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+ if opts.ssl_certificate:
+ conn_options['ssl_certfile'] = opts.ssl_certificate
+ if opts.ssl_key:
+ if not opts.ssl_certificate:
+ parser.error("missing '--ssl-certificate' (required by '--ssl-key')")
+ conn_options['ssl_keyfile'] = opts.ssl_key
+ if opts.ha_admin:
+ conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+
+ return args
+
+class BrokerManager:
+ def __init__(self):
+ self.brokerName = None
+ self.connection = None
+ self.broker = None
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.connection = Connection.establish(self.url, **conn_options)
+ self.broker = BrokerAgent(self.connection)
+
+ def Disconnect(self):
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ self.connection.close()
+ except:
+ pass
+
+ def displayBroker(self):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header('uptime', Header.DURATION))
+ heads.append(Header('cluster', Header.NONE))
+ heads.append(Header('connections', Header.COMMAS))
+ heads.append(Header('sessions', Header.COMMAS))
+ heads.append(Header('exchanges', Header.COMMAS))
+ heads.append(Header('queues', Header.COMMAS))
+ rows = []
+ broker = self.broker.getBroker()
+ cluster = self.broker.getCluster()
+ clusterInfo = cluster and cluster.clusterName + "<" + cluster.status + ">" or "<standalone>"
+ connections = self.getConnectionMap()
+ sessions = self.getSessionMap()
+ exchanges = self.getExchangeMap()
+ queues = self.getQueueMap()
+ row = (broker.getUpdateTime() - broker.getCreateTime(),
+ clusterInfo,
+ len(connections), len(sessions),
+ len(exchanges), len(queues))
+ rows.append(row)
+ disp.formattedTable('Broker Summary:', heads, rows)
+
+ if 'queueCount' not in broker.values:
+ return
+
+ print
+ heads = []
+ heads.append(Header('Statistic'))
+ heads.append(Header('Messages', Header.COMMAS))
+ heads.append(Header('Bytes', Header.COMMAS))
+ rows = []
+ rows.append(['queue-depth', broker.msgDepth, broker.byteDepth])
+ rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues])
+ rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues])
+ rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues])
+ rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues])
+ rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues])
+ rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues])
+ rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth])
+ rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues])
+ rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues])
+ rows.append(['acquires', broker.acquires, None])
+ rows.append(['releases', broker.releases, None])
+ rows.append(['discards-no-route', broker.discardsNoRoute, None])
+ rows.append(['discards-ttl-expired', broker.discardsTtl, None])
+ rows.append(['discards-limit-overflow', broker.discardsOverflow, None])
+ rows.append(['discards-ring-overflow', broker.discardsRing, None])
+ rows.append(['discards-lvq-replace', broker.discardsLvq, None])
+ rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None])
+ rows.append(['discards-purged', broker.discardsPurge, None])
+ rows.append(['reroutes', broker.reroutes, None])
+ rows.append(['abandoned', broker.abandoned, None])
+ rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None])
+ disp.formattedTable('Aggregate Broker Statistics:', heads, rows)
+
+
+ def displayConn(self):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header('connection'))
+ heads.append(Header('cproc'))
+ heads.append(Header('cpid'))
+ heads.append(Header('mech'))
+ heads.append(Header('auth'))
+ heads.append(Header('connected', Header.DURATION))
+ heads.append(Header('idle', Header.DURATION))
+ heads.append(Header('msgIn', Header.KMG))
+ heads.append(Header('msgOut', Header.KMG))
+ rows = []
+ connections = self.broker.getAllConnections()
+ broker = self.broker.getBroker()
+ for conn in connections:
+ row = []
+ row.append(conn.address)
+ if conn.remoteProcessName: row.append(conn.remoteProcessName)
+ else: row.append("-")
+ row.append(conn.remotePid)
+ if conn.saslMechanism: row.append(conn.saslMechanism)
+ else: row.append("-")
+ if conn.authIdentity: row.append(conn.authIdentity)
+ else: row.append("-")
+ row.append(broker.getUpdateTime() - conn.getCreateTime())
+ row.append(broker.getUpdateTime() - conn.getUpdateTime())
+ row.append(conn.msgsFromClient)
+ row.append(conn.msgsToClient)
+ rows.append(row)
+ title = "Connections"
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displaySession(self):
+ disp = Display(prefix=" ")
+
+ def displayExchange(self):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header("exchange"))
+ heads.append(Header("type"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("bind", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("msgDrop", Header.KMG))
+ heads.append(Header("byteIn", Header.KMG))
+ heads.append(Header("byteOut", Header.KMG))
+ heads.append(Header("byteDrop", Header.KMG))
+ rows = []
+ exchanges = self.broker.getAllExchanges()
+ for ex in exchanges:
+ row = []
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(ex.durable)
+ row.append(ex.bindingCount)
+ row.append(ex.msgReceives)
+ row.append(ex.msgRoutes)
+ row.append(ex.msgDrops)
+ row.append(ex.byteReceives)
+ row.append(ex.byteRoutes)
+ row.append(ex.byteDrops)
+ rows.append(row)
+ title = "Exchanges"
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayQueues(self):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header("queue"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("autoDel", Header.Y))
+ heads.append(Header("excl", Header.Y))
+ heads.append(Header("msg", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("bytes", Header.KMG))
+ heads.append(Header("bytesIn", Header.KMG))
+ heads.append(Header("bytesOut", Header.KMG))
+ heads.append(Header("cons", Header.KMG))
+ heads.append(Header("bind", Header.KMG))
+ rows = []
+ queues = self.broker.getAllQueues()
+ for q in queues:
+ row = []
+ row.append(q.name)
+ row.append(q.durable)
+ row.append(q.autoDelete)
+ row.append(q.exclusive)
+ row.append(q.msgDepth)
+ row.append(q.msgTotalEnqueues)
+ row.append(q.msgTotalDequeues)
+ row.append(q.byteDepth)
+ row.append(q.byteTotalEnqueues)
+ row.append(q.byteTotalDequeues)
+ row.append(q.consumerCount)
+ row.append(q.bindingCount)
+ rows.append(row)
+ title = "Queues"
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+
+ def displayQueue(self, name):
+ queue = self.broker.getQueue(name)
+ if not queue:
+ print "Queue '%s' not found" % name
+ return
+
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header('Name'))
+ heads.append(Header('Durable', Header.YN))
+ heads.append(Header('AutoDelete', Header.YN))
+ heads.append(Header('Exclusive', Header.YN))
+ heads.append(Header('FlowStopped', Header.YN))
+ heads.append(Header('FlowStoppedCount', Header.COMMAS))
+ heads.append(Header('Consumers', Header.COMMAS))
+ heads.append(Header('Bindings', Header.COMMAS))
+ rows = []
+ rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive,
+ queue.flowStopped, queue.flowStoppedCount,
+ queue.consumerCount, queue.bindingCount])
+ disp.formattedTable("Properties:", heads, rows)
+ print
+
+ heads = []
+ heads.append(Header('Property'))
+ heads.append(Header('Value'))
+ rows = []
+ rows.append(['arguments', queue.arguments])
+ rows.append(['alt-exchange', queue.altExchange])
+ disp.formattedTable("Optional Properties:", heads, rows)
+ print
+
+ heads = []
+ heads.append(Header('Statistic'))
+ heads.append(Header('Messages', Header.COMMAS))
+ heads.append(Header('Bytes', Header.COMMAS))
+ rows = []
+ rows.append(['queue-depth', queue.msgDepth, queue.byteDepth])
+ rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues])
+ rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues])
+ rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues])
+ rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues])
+ rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues])
+ rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues])
+ rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth])
+ rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues])
+ rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues])
+ rows.append(['acquires', queue.acquires, None])
+ rows.append(['releases', queue.releases, None])
+ rows.append(['discards-ttl-expired', queue.discardsTtl, None])
+ rows.append(['discards-limit-overflow', queue.discardsOverflow, None])
+ rows.append(['discards-ring-overflow', queue.discardsRing, None])
+ rows.append(['discards-lvq-replace', queue.discardsLvq, None])
+ rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None])
+ rows.append(['discards-purged', queue.discardsPurge, None])
+ rows.append(['reroutes', queue.reroutes, None])
+ disp.formattedTable("Statistics:", heads, rows)
+
+
+ def displaySubscriptions(self):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header("subscr"))
+ heads.append(Header("queue"))
+ heads.append(Header("conn"))
+ heads.append(Header("procName"))
+ heads.append(Header("procId"))
+ heads.append(Header("browse", Header.Y))
+ heads.append(Header("acked", Header.Y))
+ heads.append(Header("excl", Header.Y))
+ heads.append(Header("creditMode"))
+ heads.append(Header("delivered", Header.COMMAS))
+ heads.append(Header("sessUnacked", Header.COMMAS))
+ rows = []
+ subscriptions = self.broker.getAllSubscriptions()
+ sessions = self.getSessionMap()
+ connections = self.getConnectionMap()
+ for s in subscriptions:
+ row = []
+ try:
+ row.append(s.name)
+ row.append(s.queueRef)
+ session = sessions[s.sessionRef]
+ connection = connections[session.connectionRef]
+ row.append(connection.address)
+ if connection.remoteProcessName: row.append(connection.remoteProcessName)
+ else: row.append("-")
+ row.append(connection.remotePid)
+ row.append(s.browsing)
+ row.append(s.acknowledged)
+ row.append(s.exclusive)
+ row.append(s.creditMode)
+ row.append(s.delivered)
+ row.append(session.unackedMessages)
+ rows.append(row)
+ except:
+ pass
+ title = "Subscriptions"
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayMemory(self):
+ disp = Display(prefix=" ")
+ heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
+ rows = []
+ memory = self.broker.getMemory()
+ for k,v in memory.values.items():
+ if k != 'name':
+ rows.append([k, v])
+ disp.formattedTable('Broker Memory Statistics:', heads, rows)
+
+ def displayAcl(self):
+ acl = self.broker.getAcl()
+ if not acl:
+ print "ACL Policy Module is not installed"
+ return
+ disp = Display(prefix=" ")
+ heads = [Header('Statistic'), Header('Value')]
+ rows = []
+ rows.append(['policy-file', acl.policyFile])
+ rows.append(['enforcing', YN(acl.enforcingAcl)])
+ rows.append(['has-transfer-acls', YN(acl.transferAcl)])
+ rows.append(['last-acl-load', TimeLong(acl.lastAclLoad)])
+ rows.append(['acl-denials', Commas(acl.aclDenyCount)])
+ disp.formattedTable('ACL Policy Statistics:', heads, rows)
+
+ def getExchangeMap(self):
+ exchanges = self.broker.getAllExchanges()
+ emap = {}
+ for e in exchanges:
+ emap[e.name] = e
+ return emap
+
+ def getQueueMap(self):
+ queues = self.broker.getAllQueues()
+ qmap = {}
+ for q in queues:
+ qmap[q.name] = q
+ return qmap
+
+ def getSessionMap(self):
+ sessions = self.broker.getAllSessions()
+ smap = {}
+ for s in sessions:
+ smap[s.name] = s
+ return smap
+
+ def getConnectionMap(self):
+ connections = self.broker.getAllConnections()
+ cmap = {}
+ for c in connections:
+ cmap[c.address] = c
+ return cmap
+
+ def displayMain(self, names, main):
+ if main == 'g': self.displayBroker()
+ elif main == 'c': self.displayConn()
+ elif main == 's': self.displaySession()
+ elif main == 'e': self.displayExchange()
+ elif main == 'q':
+ if len(names) >= 1:
+ self.displayQueue(names[0])
+ else:
+ self.displayQueues()
+ elif main == 'u': self.displaySubscriptions()
+ elif main == 'm': self.displayMemory()
+ elif main == 'acl': self.displayAcl()
+
+ def display(self, names):
+ self.displayMain(names, config._types)
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager()
+
+ try:
+ bm.SetBroker(config._host)
+ bm.display(args)
+ bm.Disconnect()
+ return 0
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+ bm.Disconnect() # try to deallocate brokers
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())