summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py/qpid-stat
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/py/qpid-stat')
-rwxr-xr-xqpid/tools/src/py/qpid-stat521
1 files changed, 521 insertions, 0 deletions
diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat
new file mode 100755
index 0000000000..ce3f5d1ef5
--- /dev/null
+++ b/qpid/tools/src/py/qpid-stat
@@ -0,0 +1,521 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup
+from time import sleep ### debug
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, Console
+from qpid.disp import Display, Header, Sorter
+
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._types = ""
+ self._limit = 50
+ self._increasing = False
+ self._sortcol = None
+ self._cluster_detail = False
+ self._sasl_mechanism = None
+
+config = Config()
+
+def OptionsAndArguments(argv):
+ """ Set global variables for options, return arguments """
+
+ global config
+
+ parser = OptionParser(usage="usage: %prog [options] BROKER",
+ description="Example: $ qpid-stat -q broker-host:10000")
+
+ group1 = OptionGroup(parser, "General Options")
+ group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
+ group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ parser.add_option_group(group1)
+
+ group2 = OptionGroup(parser, "Display Options")
+ group2.add_option("-b", "--broker", help="Show Brokers",
+ action="store_const", const="b", dest="show")
+ group2.add_option("-c", "--connections", help="Show Connections",
+ action="store_const", const="c", dest="show")
+ group2.add_option("-e", "--exchanges", help="Show Exchanges",
+ action="store_const", const="e", dest="show")
+ group2.add_option("-q", "--queues", help="Show Queues",
+ action="store_const", const="q", dest="show")
+ group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
+ action="store_const", const="u", dest="show")
+ group2.add_option("-S", "--sort-by", metavar="<colname>",
+ help="Sort by column name")
+ group2.add_option("-I", "--increasing", action="store_true", default=False,
+ help="Sort by increasing value (default = decreasing)")
+ group2.add_option("-L", "--limit", default=50, metavar="<n>",
+ help="Limit output to n rows")
+ group2.add_option("-C", "--cluster", action="store_true", default=False,
+ help="Display per-broker cluster detail.")
+ parser.add_option_group(group2)
+
+ opts, args = parser.parse_args(args=argv)
+
+ if not opts.show:
+ parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help")
+
+ config._types = opts.show
+ config._sortcol = opts.sort_by
+ config._connTimeout = opts.timeout
+ config._increasing = opts.increasing
+ config._limit = opts.limit
+ config._cluster_detail = opts.cluster
+ config._sasl_mechanism = opts.sasl_mechanism
+
+ if args:
+ config._host = args[0]
+
+ return args
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFFL
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class Broker(object):
+ def __init__(self, qmf, broker):
+ self.broker = broker
+
+ agents = qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == '0':
+ self.brokerAgent = a
+
+ bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
+ self.currentTime = bobj.getTimestamps()[0]
+ try:
+ self.uptime = bobj.uptime
+ except:
+ self.uptime = 0
+ self.connections = {}
+ self.sessions = {}
+ self.exchanges = {}
+ self.queues = {}
+ self.subscriptions = {}
+ package = "org.apache.qpid.broker"
+
+ list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
+ for conn in list:
+ if not conn.shadow:
+ self.connections[conn.getObjectId()] = conn
+
+ list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
+ for sess in list:
+ if sess.connectionRef in self.connections:
+ self.sessions[sess.getObjectId()] = sess
+
+ list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
+ for exchange in list:
+ self.exchanges[exchange.getObjectId()] = exchange
+
+ list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
+ for queue in list:
+ self.queues[queue.getObjectId()] = queue
+
+ list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent)
+ for subscription in list:
+ self.subscriptions[subscription.getObjectId()] = subscription
+
+ def getName(self):
+ return self.broker.getUrl()
+
+ def getCurrentTime(self):
+ return self.currentTime
+
+ def getUptime(self):
+ return self.uptime
+
+class BrokerManager(Console):
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl, mechanism):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.mechanism = mechanism
+ self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == '0':
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ else:
+ for b in self.brokers: self.qmf.delBroker(b.broker)
+ except:
+ pass
+
+ def _getCluster(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ return None
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ return None
+
+ self.cluster = clusters[0]
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(config._host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
+ if len(subs) == 0:
+ return
+ this = subs[0]
+ remaining = subs[1:]
+ newindent = indent + " "
+ if this == 'b':
+ pass
+ elif this == 'c':
+ if broker:
+ for oid in broker.connections:
+ iconn = broker.connections[oid]
+ self.printConnSub(indent, broker.getName(), iconn)
+ self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
+ sess=sess, exchange=exchange, queue=queue)
+ elif this == 's':
+ pass
+ elif this == 'e':
+ pass
+ elif this == 'q':
+ pass
+ print
+
+ def displayBroker(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header('broker'))
+ heads.append(Header('cluster'))
+ heads.append(Header('uptime', Header.DURATION))
+ heads.append(Header('conn', Header.KMG))
+ heads.append(Header('sess', Header.KMG))
+ heads.append(Header('exch', Header.KMG))
+ heads.append(Header('queue', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ if self.cluster:
+ ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
+ else:
+ ctext = "<standalone>"
+ row = (broker.getName(), ctext, broker.getUptime(),
+ len(broker.connections), len(broker.sessions),
+ len(broker.exchanges), len(broker.queues))
+ rows.append(row)
+ title = "Brokers"
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayConn(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header('client-addr'))
+ heads.append(Header('cproc'))
+ heads.append(Header('cpid'))
+ heads.append(Header('auth'))
+ heads.append(Header('connected', Header.DURATION))
+ heads.append(Header('idle', Header.DURATION))
+ heads.append(Header('msgIn', Header.KMG))
+ heads.append(Header('msgOut', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.connections:
+ conn = broker.connections[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(conn.address)
+ row.append(conn.remoteProcessName)
+ row.append(conn.remotePid)
+ row.append(conn.authIdentity)
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
+ idle = broker.getCurrentTime() - conn.getTimestamps()[0]
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
+ row.append(conn.framesFromClient)
+ row.append(conn.framesToClient)
+ rows.append(row)
+ title = "Connections"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displaySession(self, subs):
+ disp = Display(prefix=" ")
+
+ def displayExchange(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("exchange"))
+ heads.append(Header("type"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("bind", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("msgDrop", Header.KMG))
+ heads.append(Header("byteIn", Header.KMG))
+ heads.append(Header("byteOut", Header.KMG))
+ heads.append(Header("byteDrop", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.exchanges:
+ ex = broker.exchanges[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(ex.durable)
+ row.append(ex.bindingCount)
+ row.append(ex.msgReceives)
+ row.append(ex.msgRoutes)
+ row.append(ex.msgDrops)
+ row.append(ex.byteReceives)
+ row.append(ex.byteRoutes)
+ row.append(ex.byteDrops)
+ rows.append(row)
+ title = "Exchanges"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayQueue(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("queue"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("autoDel", Header.Y))
+ heads.append(Header("excl", Header.Y))
+ heads.append(Header("msg", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("bytes", Header.KMG))
+ heads.append(Header("bytesIn", Header.KMG))
+ heads.append(Header("bytesOut", Header.KMG))
+ heads.append(Header("cons", Header.KMG))
+ heads.append(Header("bind", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.queues:
+ q = broker.queues[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(q.name)
+ row.append(q.durable)
+ row.append(q.autoDelete)
+ row.append(q.exclusive)
+ row.append(q.msgDepth)
+ row.append(q.msgTotalEnqueues)
+ row.append(q.msgTotalDequeues)
+ row.append(q.byteDepth)
+ row.append(q.byteTotalEnqueues)
+ row.append(q.byteTotalDequeues)
+ row.append(q.consumerCount)
+ row.append(q.bindingCount)
+ rows.append(row)
+ title = "Queues"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displaySubscriptions(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("subscription"))
+ heads.append(Header("queue"))
+ heads.append(Header("connection"))
+ heads.append(Header("processName"))
+ heads.append(Header("processId"))
+ heads.append(Header("browsing", Header.Y))
+ heads.append(Header("acknowledged", Header.Y))
+ heads.append(Header("exclusive", Header.Y))
+ heads.append(Header("creditMode"))
+ heads.append(Header("delivered", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.subscriptions:
+ s = broker.subscriptions[oid]
+ row = []
+ try:
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(s.name)
+ row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name)
+ connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef
+ row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address)
+ row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName)
+ row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid)
+ row.append(s.browsing)
+ row.append(s.acknowledged)
+ row.append(s.exclusive)
+ row.append(s.creditMode)
+ row.append(s.delivered)
+ rows.append(row)
+ except:
+ pass
+ title = "Subscriptions"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayMain(self, main, subs):
+ if main == 'b': self.displayBroker(subs)
+ elif main == 'c': self.displayConn(subs)
+ elif main == 's': self.displaySession(subs)
+ elif main == 'e': self.displayExchange(subs)
+ elif main == 'q': self.displayQueue(subs)
+ elif main == 'u': self.displaySubscriptions(subs)
+
+ def display(self):
+ if config._cluster_detail or config._types[0] == 'b':
+ # always show cluster detail when dumping broker stats
+ self._getCluster()
+ if self.cluster:
+ memberList = self.cluster.members.split(";")
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ if config._host.find("@") > 0:
+ authString = config._host.split("@")[0] + "@"
+ else:
+ authString = ""
+ for host in hostList:
+ b = self.qmf.addBroker(authString + host, config._connTimeout)
+ self.brokers.append(Broker(self.qmf, b))
+ else:
+ self.brokers.append(Broker(self.qmf, self.broker))
+
+ self.displayMain(config._types[0], config._types[1:])
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager()
+
+ try:
+ bm.SetBroker(config._host, config._sasl_mechanism)
+ bm.display()
+ bm.Disconnect()
+ return 0
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+ bm.Disconnect() # try to deallocate brokers
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())