summaryrefslogtreecommitdiff
path: root/qpid/python/commands/qpid-stat
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/commands/qpid-stat')
-rwxr-xr-xqpid/python/commands/qpid-stat449
1 files changed, 449 insertions, 0 deletions
diff --git a/qpid/python/commands/qpid-stat b/qpid/python/commands/qpid-stat
new file mode 100755
index 0000000000..9d41f4e966
--- /dev/null
+++ b/qpid/python/commands/qpid-stat
@@ -0,0 +1,449 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, Console
+from qpid.disp import Display, Header, Sorter
+
+_host = "localhost"
+_top = False
+_types = ""
+_limit = 50
+_increasing = False
+_sortcol = None
+pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
+
+def Usage ():
+ print "Usage: qpid-stat [OPTIONS] [broker-addr]"
+ print
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+# print "General Options:"
+# print " -n [--numeric] Don't resolve names"
+# print " -t [--top] Repeatedly display top items"
+# print
+ print "Display Options:"
+ print
+ print " -b Show Brokers"
+ print " -c Show Connections"
+# print " -s Show Sessions"
+ print " -e Show Exchanges"
+ print " -q Show Queues"
+ print
+ print " -S [--sort-by] COLNAME Sort by column name"
+ print " -I [--increasing] Sort by increasing value (default = decreasing)"
+ print " -L [--limit] NUM Limit output to NUM rows (default = 50)"
+ print
+ sys.exit (1)
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFF
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class Broker(object):
+ def __init__(self, qmf, broker):
+ self.broker = broker
+ bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _broker=broker)[0]
+ self.currentTime = bobj.getTimestamps()[0]
+ try:
+ self.uptime = bobj.uptime
+ except:
+ self.uptime = 0
+ self.connections = {}
+ self.sessions = {}
+ self.exchanges = {}
+ self.queues = {}
+ package = "org.apache.qpid.broker"
+
+ list = qmf.getObjects(_class="connection", _package=package, _broker=broker)
+ for conn in list:
+ if pattern.match(conn.address):
+ self.connections[conn.getObjectId()] = conn
+
+ list = qmf.getObjects(_class="session", _package=package, _broker=broker)
+ for sess in list:
+ if sess.connectionRef in self.connections:
+ self.sessions[sess.getObjectId()] = sess
+
+ list = qmf.getObjects(_class="exchange", _package=package, _broker=broker)
+ for exchange in list:
+ self.exchanges[exchange.getObjectId()] = exchange
+
+ list = qmf.getObjects(_class="queue", _package=package, _broker=broker)
+ for queue in list:
+ self.queues[queue.getObjectId()] = queue
+
+ def getName(self):
+ return self.broker.getUrl()
+
+ def getCurrentTime(self):
+ return self.currentTime
+
+ def getUptime(self):
+ return self.uptime
+
+class BrokerManager(Console):
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def _getCluster(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ return None
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ return None
+
+ self.cluster = clusters[0]
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(_host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
+ if len(subs) == 0:
+ return
+ this = subs[0]
+ remaining = subs[1:]
+ newindent = indent + " "
+ if this == 'b':
+ pass
+ elif this == 'c':
+ if broker:
+ for oid in broker.connections:
+ iconn = broker.connections[oid]
+ self.printConnSub(indent, broker.getName(), iconn)
+ self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
+ sess=sess, exchange=exchange, queue=queue)
+ elif this == 's':
+ pass
+ elif this == 'e':
+ pass
+ elif this == 'q':
+ pass
+ print
+
+ def displayBroker(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header('broker'))
+ heads.append(Header('cluster'))
+ heads.append(Header('uptime', Header.DURATION))
+ heads.append(Header('conn', Header.KMG))
+ heads.append(Header('sess', Header.KMG))
+ heads.append(Header('exch', Header.KMG))
+ heads.append(Header('queue', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ if self.cluster:
+ ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
+ else:
+ ctext = "<standalone>"
+ row = (broker.getName(), ctext, broker.getUptime(),
+ len(broker.connections), len(broker.sessions),
+ len(broker.exchanges), len(broker.queues))
+ rows.append(row)
+ title = "Brokers"
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayConn(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header('client-addr'))
+ heads.append(Header('cproc'))
+ heads.append(Header('cpid'))
+ heads.append(Header('auth'))
+ heads.append(Header('connected', Header.DURATION))
+ heads.append(Header('idle', Header.DURATION))
+ heads.append(Header('msgIn', Header.KMG))
+ heads.append(Header('msgOut', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.connections:
+ conn = broker.connections[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(conn.address)
+ row.append(conn.remoteProcessName)
+ row.append(conn.remotePid)
+ row.append(conn.authIdentity)
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
+ idle = broker.getCurrentTime() - conn.getTimestamps()[0]
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
+ row.append(conn.framesFromClient)
+ row.append(conn.framesToClient)
+ rows.append(row)
+ title = "Connections"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displaySession(self, subs):
+ disp = Display(prefix=" ")
+
+ def displayExchange(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("exchange"))
+ heads.append(Header("type"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("bind", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("msgDrop", Header.KMG))
+ heads.append(Header("byteIn", Header.KMG))
+ heads.append(Header("byteOut", Header.KMG))
+ heads.append(Header("byteDrop", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.exchanges:
+ ex = broker.exchanges[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(ex.durable)
+ row.append(ex.bindingCount)
+ row.append(ex.msgReceives)
+ row.append(ex.msgRoutes)
+ row.append(ex.msgDrops)
+ row.append(ex.byteReceives)
+ row.append(ex.byteRoutes)
+ row.append(ex.byteDrops)
+ rows.append(row)
+ title = "Exchanges"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayQueue(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("queue"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("autoDel", Header.Y))
+ heads.append(Header("excl", Header.Y))
+ heads.append(Header("msg", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("bytes", Header.KMG))
+ heads.append(Header("bytesIn", Header.KMG))
+ heads.append(Header("bytesOut", Header.KMG))
+ heads.append(Header("cons", Header.KMG))
+ heads.append(Header("bind", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.queues:
+ q = broker.queues[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(q.name)
+ row.append(q.durable)
+ row.append(q.autoDelete)
+ row.append(q.exclusive)
+ row.append(q.msgDepth)
+ row.append(q.msgTotalEnqueues)
+ row.append(q.msgTotalDequeues)
+ row.append(q.byteDepth)
+ row.append(q.byteTotalEnqueues)
+ row.append(q.byteTotalDequeues)
+ row.append(q.consumerCount)
+ row.append(q.bindingCount)
+ rows.append(row)
+ title = "Queues"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayMain(self, main, subs):
+ if main == 'b': self.displayBroker(subs)
+ elif main == 'c': self.displayConn(subs)
+ elif main == 's': self.displaySession(subs)
+ elif main == 'e': self.displayExchange(subs)
+ elif main == 'q': self.displayQueue(subs)
+
+ def display(self):
+ self._getCluster()
+ if self.cluster:
+ memberList = self.cluster.members.split(";")
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ for host in hostList:
+ b = self.qmf.addBroker(host)
+ self.brokers.append(Broker(self.qmf, b))
+ else:
+ self.brokers.append(Broker(self.qmf, self.broker))
+
+ self.displayMain(_types[0], _types[1:])
+
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing")
+ (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts)
+except:
+ Usage()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "-t" or opt[0] == "--top":
+ _top = True
+ elif opt[0] == "-n" or opt[0] == "--numeric":
+ _numeric = True
+ elif opt[0] == "-S" or opt[0] == "--sort-by":
+ _sortcol = opt[1]
+ elif opt[0] == "-I" or opt[0] == "--increasing":
+ _increasing = True
+ elif opt[0] == "-L" or opt[0] == "--limit":
+ _limit = int(opt[1])
+ elif len(opt[0]) == 2:
+ char = opt[0][1]
+ if "bcseq".find(char) != -1:
+ _types += char
+ else:
+ Usage()
+ else:
+ Usage()
+
+if len(_types) == 0:
+ Usage()
+
+nargs = len(cargs)
+bm = BrokerManager()
+
+if nargs == 1:
+ _host = cargs[0]
+
+try:
+ bm.SetBroker(_host)
+ bm.display()
+except KeyboardInterrupt:
+ print
+except Exception,e:
+ print "Failed:", e.args
+ #raise # TODO: Remove before flight
+ sys.exit(1)
+
+bm.Disconnect()