summaryrefslogtreecommitdiff
path: root/qpid/tools/src/py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/tools/src/py')
-rw-r--r--qpid/tools/src/py/.gitignore22
-rw-r--r--qpid/tools/src/py/README.txt4
-rwxr-xr-xqpid/tools/src/py/qmf-tool684
-rwxr-xr-xqpid/tools/src/py/qpid-cluster310
-rwxr-xr-xqpid/tools/src/py/qpid-cluster-store75
-rwxr-xr-xqpid/tools/src/py/qpid-config682
-rwxr-xr-xqpid/tools/src/py/qpid-printevents114
-rwxr-xr-xqpid/tools/src/py/qpid-queue-stats145
-rwxr-xr-xqpid/tools/src/py/qpid-route597
-rwxr-xr-xqpid/tools/src/py/qpid-stat521
-rwxr-xr-xqpid/tools/src/py/qpid-tool737
11 files changed, 3891 insertions, 0 deletions
diff --git a/qpid/tools/src/py/.gitignore b/qpid/tools/src/py/.gitignore
new file mode 100644
index 0000000000..97cb05dc36
--- /dev/null
+++ b/qpid/tools/src/py/.gitignore
@@ -0,0 +1,22 @@
+
+#
+#
+#
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# "License"); you may not use this file except in compliance
+# KIND, either express or implied. See the License for the
+# Licensed to the Apache Software Foundation (ASF) under one
+# Unless required by applicable law or agreed to in writing,
+# distributed with this work for additional information
+# or more contributor license agreements. See the NOTICE file
+# regarding copyright ownership. The ASF licenses this file
+# software distributed under the License is distributed on an
+# specific language governing permissions and limitations
+# to you under the Apache License, Version 2.0 (the
+# under the License.
+# with the License. You may obtain a copy of the License at
+/qpid-clusterc
+/qpid-configc
+/qpid-routec
diff --git a/qpid/tools/src/py/README.txt b/qpid/tools/src/py/README.txt
new file mode 100644
index 0000000000..cabeb1be02
--- /dev/null
+++ b/qpid/tools/src/py/README.txt
@@ -0,0 +1,4 @@
+To run these programs, please set PYTHONPATH to include:
+
+ qpid/python
+ qpid/extras/qmf/src/py
diff --git a/qpid/tools/src/py/qmf-tool b/qpid/tools/src/py/qmf-tool
new file mode 100755
index 0000000000..e366d04709
--- /dev/null
+++ b/qpid/tools/src/py/qmf-tool
@@ -0,0 +1,684 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import socket
+from cmd import Cmd
+from shlex import split
+from threading import Lock
+from time import strftime, gmtime
+from qpid.disp import Display
+import cqpid
+import qmf2
+
+class Mcli(Cmd):
+ """ Management Command Interpreter """
+
+ def __init__(self, dataObject, dispObject):
+ Cmd.__init__(self)
+ self.dataObject = dataObject
+ self.dispObject = dispObject
+ self.dataObject.setCli(self)
+ self.prompt = "qmf: "
+
+ def emptyline(self):
+ pass
+
+ def setPromptMessage(self, p=None):
+ if p == None:
+ self.prompt = "qmf: "
+ else:
+ self.prompt = "qmf[%s]: " % p
+
+ def do_help(self, data):
+ print "Management Tool for QMF"
+ print
+ print "Agent Commands:"
+ print " set filter <filter-string> - Filter the list of agents"
+ print " show filter - Show the agent filter currently in effect"
+ print " list agents - Print a list of the known Agents"
+ print " show agent <item-number> - Print detailed information about an Agent"
+ print " set default <item-number> - Set the default agent for operations"
+ print
+ print "Schema Commands:"
+ print " list packages - Print a list of packages supported by the default agent"
+ print " list classes [<package-name>] - Print all classes supported byt the default agent"
+ print " show class <class-name> [<package-name>] - Show details of a class"
+ print
+ print "Data Commands:"
+ print " query <class-name> [<package-name>] [<predicate>] - Query for data from the agent"
+ print " list - List accumulated query results"
+ print " clear - Clear accumulated query results"
+ print " show <id> - Show details from a data object"
+ print " call <id> <method> [<args>] - Call a method on a data object"
+ print
+ print "General Commands:"
+ print " set time-format short - Select short timestamp format (default)"
+ print " set time-format long - Select long timestamp format"
+ print " quit or ^D - Exit the program"
+ print
+
+ def complete_set(self, text, line, begidx, endidx):
+ """ Command completion for the 'set' command """
+ tokens = split(line[:begidx])
+ if len(tokens) == 1:
+ return [i for i in ('filter ', 'default ', 'time-format ') if i.startswith(text)]
+ if len(tokens) == 2 and tokens[1] == 'time-format':
+ return [i for i in ('long', 'short') if i.startswith(text)]
+ return []
+
+ def do_set(self, data):
+ tokens = split(data)
+ try:
+ if tokens[0] == "time-format":
+ self.dispObject.do_setTimeFormat(tokens[1])
+ else:
+ self.dataObject.do_set(data)
+ except Exception, e:
+ print "Exception in set command:", e
+
+ def complete_list(self, text, line, begidx, endidx):
+ tokens = split(line[:begidx])
+ if len(tokens) == 1:
+ return [i for i in ('agents', 'packages', 'classes ') if i.startswith(text)]
+ return []
+
+ def do_list(self, data):
+ try:
+ self.dataObject.do_list(data)
+ except Exception, e:
+ print "Exception in list command:", e
+
+ def complete_show(self, text, line, begidx, endidx):
+ tokens = split(line[:begidx])
+ if len(tokens) == 1:
+ return [i for i in ('filter', 'agent ', 'class ') if i.startswith(text)]
+ return []
+
+ def do_show(self, data):
+ try:
+ self.dataObject.do_show(data)
+ except Exception, e:
+ print "Exception in show command:", e
+
+ def complete_query(self, text, line, begidx, endidx):
+ return []
+
+ def do_query(self, data):
+ try:
+ self.dataObject.do_query(data)
+ except Exception, e:
+ if e.message.__class__ == qmf2.Data:
+ e = e.message.getProperties()
+ print "Exception in query command:", e
+
+ def do_call(self, data):
+ try:
+ self.dataObject.do_call(data)
+ except Exception, e:
+ if e.message.__class__ == qmf2.Data:
+ e = e.message.getProperties()
+ print "Exception in call command:", e
+
+ def do_clear(self, data):
+ try:
+ self.dataObject.do_clear(data)
+ except Exception, e:
+ print "Exception in clear command:", e
+
+ def do_EOF(self, data):
+ print "quit"
+ try:
+ self.dataObject.do_exit()
+ except:
+ pass
+ return True
+
+ def do_quit(self, data):
+ try:
+ self.dataObject.do_exit()
+ except:
+ pass
+ return True
+
+ def postcmd(self, stop, line):
+ return stop
+
+ def postloop(self):
+ print "Exiting..."
+ self.dataObject.close()
+
+
+#======================================================================================================
+# QmfData
+#======================================================================================================
+class QmfData:
+ """
+ """
+ def __init__(self, disp, url):
+ self.disp = disp
+ self.url = url
+ self.agent_filter = '[]'
+ self.connection = cqpid.Connection(self.url)
+ self.connection.open()
+ self.session = qmf2.ConsoleSession(self.connection)
+ self.session.setAgentFilter(self.agent_filter)
+ self.session.open()
+ self.lock = Lock()
+ self.cli = None
+ self.agents = {} # Map of number => agent object
+ self.deleted_agents = {} # Map of number => agent object
+ self.agent_numbers = {} # Map of agent name => number
+ self.next_number = 1
+ self.focus_agent = None
+ self.data_list = {}
+ self.next_data_index = 1
+
+ #=======================
+ # Methods to support CLI
+ #=======================
+ def setCli(self, cli):
+ self.cli = cli
+
+ def close(self):
+ try:
+ self.session.close()
+ self.connection.close()
+ except:
+ pass # we're shutting down - ignore any errors
+
+ def do_list(self, data):
+ tokens = data.split()
+ if len(tokens) == 0:
+ self.listData()
+ elif tokens[0] == 'agents' or tokens[0] == 'agent':
+ self.listAgents()
+ elif tokens[0] == 'packages' or tokens[0] == 'package':
+ self.listPackages()
+ elif tokens[0] == 'classes' or tokens[0] == 'class':
+ self.listClasses(tokens[1:])
+
+ def do_set(self, data):
+ tokens = split(data)
+ if len(tokens) == 0:
+ print "What do you want to set? type 'help' for more information."
+ return
+ if tokens[0] == 'filter':
+ if len(tokens) == 2:
+ self.setAgentFilter(tokens[1])
+ elif tokens[0] == 'default':
+ if len(tokens) == 2:
+ self.updateAgents()
+ number = int(tokens[1])
+ self.focus_agent = self.agents[number]
+ print "Default Agent: %s" % self.focus_agent.getName()
+
+ def do_show(self, data):
+ tokens = split(data)
+ if len(tokens) == 0:
+ print "What do you want to show? Type 'help' for more information."
+ return
+
+ if tokens[0] == 'agent':
+ self.showAgent(tokens[1:])
+ return
+
+ if tokens[0] == 'filter':
+ print self.agent_filter
+ return
+
+ if tokens[0] == "default":
+ if not self.focus_agent:
+ self.updateAgents()
+ if self.focus_agent:
+ print "Default Agent: %s" % self.focus_agent.getName()
+ else:
+ print "Default Agent not set"
+ return
+
+ if tokens[0] == "class":
+ self.showClass(tokens[1:])
+ return
+
+ if tokens[0].isdigit():
+ self.showData(tokens[0])
+ return
+
+ print "What do you want to show? Type 'help' for more information."
+ return
+
+ def do_query(self, data):
+ tokens = split(data)
+ if len(tokens) == 0:
+ print "Class name not specified."
+ return
+ cname = tokens[0]
+ pname = None
+ pred = None
+ if len(tokens) >= 2:
+ if tokens[1][0] == '[':
+ pred = tokens[1]
+ else:
+ pname = tokens[1]
+ if len(tokens) >= 3:
+ pred = tokens[2]
+ query = "{class:'%s'" % cname
+ if pname:
+ query += ",package:'%s'" % pname
+ if pred:
+ query += ",where:%s" % pred
+ query += "}"
+ if not self.focus_agent:
+ self.updateAgents()
+ d_list = self.focus_agent.query(query)
+ local_data_list = {}
+ for d in d_list:
+ local_data_list[self.next_data_index] = d
+ self.next_data_index += 1
+ rows = []
+ for index,val in local_data_list.items():
+ rows.append((index, val.getAddr().getName()))
+ self.data_list[index] = val
+ self.disp.table("Data Objects Returned: %d:" % len(d_list), ("Number", "Data Address"), rows)
+
+ def do_call(self, data):
+ tokens = split(data)
+ if len(tokens) < 2:
+ print "Data ID and method-name not specified."
+ return
+ idx = int(tokens[0])
+ methodName = tokens[1]
+ args = []
+ for arg in tokens[2:]:
+ ##
+ ## If the argument is a map, list, boolean, integer, or floating (one decimal point),
+ ## run it through the Python evaluator so it is converted to the correct type.
+ ##
+ ## TODO: use a regex for this instead of this convoluted logic
+ if arg[0] == '{' or arg[0] == '[' or arg == "True" or arg == "False" or \
+ ((arg.count('.') < 2 and (arg.count('-') == 0 or \
+ (arg.count('-') == 1 and arg[0] == '-')) and \
+ arg.replace('.','').replace('-','').isdigit())):
+ args.append(eval(arg))
+ else:
+ args.append(arg)
+
+ if not idx in self.data_list:
+ print "Unknown data index, run 'query' to get a list of data indices"
+ return
+
+ data = self.data_list[idx]
+ data._getSchema()
+ result = data._invoke(methodName, args, {})
+ rows = []
+ for k,v in result.items():
+ rows.append((k,v))
+ self.disp.table("Output Parameters:", ("Name", "Value"), rows)
+
+ def do_clear(self, data):
+ self.data_list = {}
+ self.next_data_index = 1
+ print "Accumulated query results cleared"
+
+ def do_exit(self):
+ pass
+
+ #====================
+ # Sub-Command Methods
+ #====================
+ def setAgentFilter(self, filt):
+ self.agent_filter = filt
+ self.session.setAgentFilter(filt)
+
+ def updateAgents(self):
+ agents = self.session.getAgents()
+ number_list = []
+ for agent in agents:
+ if agent.getName() not in self.agent_numbers:
+ number = self.next_number
+ number_list.append(number)
+ self.next_number += 1
+ self.agent_numbers[agent.getName()] = number
+ self.agents[number] = agent
+ else:
+ ## Track seen agents so we can clean out deleted ones
+ number = self.agent_numbers[agent.getName()]
+ number_list.append(number)
+ if number in self.deleted_agents:
+ self.agents[number] = self.deleted_agents.pop(number)
+ deleted = []
+ for number in self.agents:
+ if number not in number_list:
+ deleted.append(number)
+ for number in deleted:
+ self.deleted_agents[number] = self.agents.pop(number)
+ if not self.focus_agent:
+ self.focus_agent = self.session.getConnectedBrokerAgent()
+
+ def listAgents(self):
+ self.updateAgents()
+ rows = []
+ for number in self.agents:
+ agent = self.agents[number]
+ if self.focus_agent and agent.getName() == self.focus_agent.getName():
+ d = '*'
+ else:
+ d = ''
+ rows.append((d, number, agent.getVendor(), agent.getProduct(), agent.getInstance(), agent.getEpoch()))
+ self.disp.table("QMF Agents:", ("", "Id", "Vendor", "Product", "Instance", "Epoch"), rows)
+
+ def listPackages(self):
+ if not self.focus_agent:
+ raise "Default Agent not set - use 'set default'"
+ self.focus_agent.loadSchemaInfo()
+ packages = self.focus_agent.getPackages()
+ for p in packages:
+ print " %s" % p
+
+ def getClasses(self, tokens):
+ if not self.focus_agent:
+ raise "Default Agent not set - use 'set default'"
+ return
+ self.focus_agent.loadSchemaInfo()
+ if len(tokens) == 1:
+ classes = self.focus_agent.getSchemaIds(tokens[0]);
+ else:
+ packages = self.focus_agent.getPackages()
+ classes = []
+ for p in packages:
+ classes.extend(self.focus_agent.getSchemaIds(p))
+ return classes
+
+ def listClasses(self, tokens):
+ classes = self.getClasses(tokens)
+ rows = []
+ for c in classes:
+ rows.append((c.getPackageName(), c.getName(), self.classTypeName(c.getType())))
+ self.disp.table("Classes:", ("Package", "Class", "Type"), rows)
+
+ def showClass(self, tokens):
+ if len(tokens) < 1:
+ return
+ classes = self.getClasses([])
+ c = tokens[0]
+ p = None
+ if len(tokens) == 2:
+ p = tokens[1]
+ schema = None
+ sid = None
+ for cls in classes:
+ if c == cls.getName():
+ if not p or p == cls.getPackageName():
+ schema = self.focus_agent.getSchema(cls)
+ sid = cls
+ break
+ if not sid:
+ return
+ print "Class: %s:%s (%s) - %s" % \
+ (sid.getPackageName(), sid.getName(), self.classTypeName(sid.getType()), schema.getDesc())
+ print " hash: %r" % sid.getHash()
+ props = schema.getProperties()
+ methods = schema.getMethods()
+ rows = []
+ for prop in props:
+ name = prop.getName()
+ dtype = self.typeName(prop.getType())
+ if len(prop.getSubtype()) > 0:
+ dtype += "(%s)" % prop.getSubtype()
+ access = self.accessName(prop.getAccess())
+ idx = self.yes_blank(prop.isIndex())
+ opt = self.yes_blank(prop.isOptional())
+ unit = prop.getUnit()
+ desc = prop.getDesc()
+ rows.append((name, dtype, idx, access, opt, unit, desc))
+ self.disp.table("Properties:", ("Name", "Type", "Index", "Access", "Optional", "Unit", "Description"), rows)
+ if len(methods) > 0:
+ for meth in methods:
+ name = meth.getName()
+ desc = meth.getDesc()
+ if len(desc) > 0:
+ desc = " - " + desc
+ args = meth.getArguments()
+ rows = []
+ for prop in args:
+ aname = prop.getName()
+ dtype = self.typeName(prop.getType())
+ if len(prop.getSubtype()) > 0:
+ dtype += "(%s)" % prop.getSubtype()
+ unit = prop.getUnit()
+ adesc = prop.getDesc()
+ io = self.dirName(prop.getDirection())
+ rows.append((aname, dtype, io, unit, adesc))
+ print
+ print " Method: %s%s" % (name, desc)
+ self.disp.table("Arguments:", ("Name", "Type", "Dir", "Unit", "Description"), rows)
+
+ def showAgent(self, tokens):
+ self.updateAgents()
+ for token in tokens:
+ number = int(token)
+ agent = self.agents[number]
+ print
+ print " =================================================================================="
+ print " Agent Id: %d" % number
+ print " Agent Name: %s" % agent.getName()
+ print " Epoch: %d" % agent.getEpoch()
+ print " Attributes:"
+ attrs = agent.getAttributes()
+ keys = attrs.keys()
+ keys.sort()
+ pairs = []
+ for key in keys:
+ if key == '_timestamp' or key == '_schema_updated':
+ val = disp.timestamp(attrs[key])
+ else:
+ val = attrs[key]
+ pairs.append((key, val))
+ self.printAlignedPairs(pairs)
+ agent.loadSchemaInfo()
+ print " Packages:"
+ packages = agent.getPackages()
+ for package in packages:
+ print " %s" % package
+
+ def showData(self, idx):
+ num = int(idx)
+ if not num in self.data_list:
+ print "Data ID not known, run 'query' first to get data"
+ return
+ data = self.data_list[num]
+ props = data.getProperties()
+ rows = []
+ for k,v in props.items():
+ rows.append((k, v))
+ self.disp.table("Properties:", ("Name", "Value"), rows)
+
+ def listData(self):
+ if len(self.data_list) == 0:
+ print "No Query Results - Use the 'query' command"
+ return
+ rows = []
+ for index,val in self.data_list.items():
+ rows.append((index, val.getAgent().getName(), val.getAddr().getName()))
+ self.disp.table("Accumulated Query Results:", ('Number', 'Agent', 'Data Address'), rows)
+
+ def printAlignedPairs(self, rows, indent=8):
+ maxlen = 0
+ for first, second in rows:
+ if len(first) > maxlen:
+ maxlen = len(first)
+ maxlen += indent
+ for first, second in rows:
+ for i in range(maxlen - len(first)):
+ print "",
+ print "%s : %s" % (first, second)
+
+ def classTypeName(self, code):
+ if code == qmf2.SCHEMA_TYPE_DATA: return "Data"
+ if code == qmf2.SCHEMA_TYPE_EVENT: return "Event"
+ return "Unknown"
+
+ def typeName (self, typecode):
+ """ Convert type-codes to printable strings """
+ if typecode == qmf2.SCHEMA_DATA_VOID: return "void"
+ elif typecode == qmf2.SCHEMA_DATA_BOOL: return "bool"
+ elif typecode == qmf2.SCHEMA_DATA_INT: return "int"
+ elif typecode == qmf2.SCHEMA_DATA_FLOAT: return "float"
+ elif typecode == qmf2.SCHEMA_DATA_STRING: return "string"
+ elif typecode == qmf2.SCHEMA_DATA_MAP: return "map"
+ elif typecode == qmf2.SCHEMA_DATA_LIST: return "list"
+ elif typecode == qmf2.SCHEMA_DATA_UUID: return "uuid"
+ else:
+ raise ValueError ("Invalid type code: %s" % str(typecode))
+
+ def valueByType(self, typecode, val):
+ if typecode == 1: return "%d" % val
+ elif typecode == 2: return "%d" % val
+ elif typecode == 3: return "%d" % val
+ elif typecode == 4: return "%d" % val
+ elif typecode == 6: return val
+ elif typecode == 7: return val
+ elif typecode == 8: return strftime("%c", gmtime(val / 1000000000))
+ elif typecode == 9:
+ if val < 0: val = 0
+ sec = val / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
+
+ elif typecode == 10: return str(self.idRegistry.displayId(val))
+ elif typecode == 11:
+ if val:
+ return "True"
+ else:
+ return "False"
+
+ elif typecode == 12: return "%f" % val
+ elif typecode == 13: return "%f" % val
+ elif typecode == 14: return "%r" % val
+ elif typecode == 15: return "%r" % val
+ elif typecode == 16: return "%d" % val
+ elif typecode == 17: return "%d" % val
+ elif typecode == 18: return "%d" % val
+ elif typecode == 19: return "%d" % val
+ elif typecode == 20: return "%r" % val
+ elif typecode == 21: return "%r" % val
+ elif typecode == 22: return "%r" % val
+ else:
+ raise ValueError ("Invalid type code: %s" % str(typecode))
+
+ def accessName (self, code):
+ """ Convert element access codes to printable strings """
+ if code == qmf2.ACCESS_READ_CREATE: return "ReadCreate"
+ elif code == qmf2.ACCESS_READ_WRITE: return "ReadWrite"
+ elif code == qmf2.ACCESS_READ_ONLY: return "ReadOnly"
+ else:
+ raise ValueError ("Invalid access code: %s" % str(code))
+
+ def dirName(self, io):
+ if io == qmf2.DIR_IN: return "in"
+ elif io == qmf2.DIR_OUT: return "out"
+ elif io == qmf2.DIR_IN_OUT: return "in_out"
+ else:
+ raise ValueError("Invalid direction code: %r" % io)
+
+ def notNone (self, text):
+ if text == None:
+ return ""
+ else:
+ return text
+
+ def yes_blank(self, val):
+ if val:
+ return "Y"
+ return ""
+
+ def objectIndex(self, obj):
+ if obj._objectId.isV2:
+ return obj._objectId.getObject()
+ result = ""
+ first = True
+ props = obj.getProperties()
+ for prop in props:
+ if prop[0].index:
+ if not first:
+ result += "."
+ result += self.valueByType(prop[0].type, prop[1])
+ first = None
+ return result
+
+def Usage():
+ print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]"
+ print
+
+#=========================================================
+# Main Program
+#=========================================================
+
+# Get host name and port if specified on the command line
+cargs = sys.argv[1:]
+_host = "localhost"
+
+if len(cargs) > 0:
+ _host = cargs[0]
+
+if _host[0] == '-':
+ Usage()
+ if _host != '-h' and _host != "--help":
+ print "qpid-tool: error: no such option:", _host
+ sys.exit(1)
+
+disp = Display()
+
+# Attempt to make a connection to the target broker
+try:
+ data = QmfData(disp, _host)
+except Exception, e:
+ if str(e).find("Exchange not found") != -1:
+ print "Management not enabled on broker: Use '-m yes' option on broker startup."
+ else:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+# Instantiate the CLI interpreter and launch it.
+cli = Mcli(data, disp)
+print("Management Tool for QMF")
+try:
+ cli.cmdloop()
+except KeyboardInterrupt:
+ print
+ print "Exiting..."
+except Exception, e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+# alway attempt to cleanup broker resources
+data.close()
diff --git a/qpid/tools/src/py/qpid-cluster b/qpid/tools/src/py/qpid-cluster
new file mode 100755
index 0000000000..312d59f670
--- /dev/null
+++ b/qpid/tools/src/py/qpid-cluster
@@ -0,0 +1,310 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session
+
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._stopId = None
+ self._stopAll = False
+ self._force = False
+ self._numeric = False
+ self._showConn = False
+ self._delConn = None
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFFL
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class BrokerManager:
+ def __init__(self, config):
+ self.config = config
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == '0':
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokers):
+ b = self.brokers.pop()
+ self.qmf.delBroker(b)
+ except:
+ pass
+
+ def _getClusters(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ raise Exception("Clustering is not installed on the broker.")
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ raise Exception("Clustering is installed but not enabled on the broker.")
+
+ return clusters
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(self.config._host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def overview(self):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+
+ print " Cluster Name: %s" % cluster.clusterName
+ print "Cluster Status: %s" % cluster.status
+ print " Cluster Size: %d" % cluster.clusterSize
+ print " Members: ID=%s URL=%s" % (idList[0], memberList[0])
+ for idx in range(1,len(idList)):
+ print " : ID=%s URL=%s" % (idList[idx], memberList[idx])
+
+ def stopMember(self, id):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ idList = cluster.memberIDs.split(";")
+ if id not in idList:
+ raise Exception("No member with matching ID found")
+
+ if not self.config._force:
+ prompt = "Warning: "
+ if len(idList) == 1:
+ prompt += "This command will shut down the last running cluster member."
+ else:
+ prompt += "This command will shut down a cluster member."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ raise Exception("Operation canceled")
+
+ cluster.stopClusterNode(id)
+
+ def stopAll(self):
+ clusters = self._getClusters()
+ if not self.config._force:
+ prompt = "Warning: This command will shut down the entire cluster."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ raise Exception("Operation canceled")
+
+ cluster = clusters[0]
+ cluster.stopFullCluster()
+
+ def showConnections(self):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+ displayList = []
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+
+ idx = 0
+ for host in hostList:
+ if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn:
+ self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout))
+ displayList.append(idList[idx])
+ idx += 1
+
+ idx = 0
+ found = False
+ for broker in self.brokers:
+ if not self.config._delConn:
+ print "Clients on Member: ID=%s:" % displayList[idx]
+ connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
+ for conn in connList:
+ if not conn.shadow:
+ if self.config._numeric or self.config._delConn:
+ a = conn.address
+ else:
+ tokens = conn.address.split(":")
+ try:
+ hostList = socket.gethostbyaddr(tokens[0])
+ host = hostList[0]
+ except:
+ host = tokens[0]
+ a = host + ":" + tokens[1]
+ if self.config._delConn:
+ tokens = self.config._delConn.split(":")
+ ip = socket.gethostbyname(tokens[0])
+ toDelete = ip + ":" + tokens[1]
+ if a == toDelete:
+ print "Closing connection from client: %s" % a
+ conn.close()
+ found = True
+ else:
+ print " %s" % a
+ idx += 1
+ if not self.config._delConn:
+ print
+ if self.config._delConn and not found:
+ print "Client connection '%s' not found" % self.config._delConn
+
+ while len(self.brokers):
+ broker = self.brokers.pop()
+ self.qmf.delBroker(broker)
+
+def main(argv=None):
+
+ try:
+ config = Config()
+
+ parser = OptionParser(usage="usage: %prog [options] BROKER",
+ description="Example: $ qpid-cluster -C broker-host:10000")
+
+ parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option("-C", "--all-connections", action="store_true", default=False, help="View client connections to all cluster members")
+ parser.add_option("-c", "--connections", metavar="ID", help="View client connections to specified member")
+ parser.add_option("-d", "--del-connection", metavar="HOST:PORT", help="Disconnect a client connection")
+ parser.add_option("-s", "--stop", metavar="ID", help="Stop one member of the cluster by its ID")
+ parser.add_option("-k", "--all-stop", action="store_true", default=False, help="Shut down the whole cluster")
+ parser.add_option("-f", "--force", action="store_true", default=False, help="Suppress the 'are you sure' prompt")
+ parser.add_option("-n", "--numeric", action="store_true", default=False, help="Don't resolve names")
+
+ opts, args = parser.parse_args(args=argv)
+
+ if args:
+ config._host = args[0]
+
+ if opts.timeout != 0:
+ config._connTimeout = opts.timeout
+ else:
+ config._connTimeout = None
+
+ if opts.all_connections:
+ config._showConn = "all"
+
+ if opts.connections:
+ config._connections = opts.connections
+ if len(config._connections.split(":")) != 2:
+ parser.error("Member ID must be of form: <host or ip>:<number>")
+
+ if opts.del_connection:
+ config._delConn = opts.del_connection
+ if len(config._delConn.split(":")) != 2:
+ parser.error("Member ID must be of form: <host or ip>:<number>")
+
+ if opts.stop:
+ config._stopID = opts.stop
+ if len(config._stopId.split(":")) != 2:
+ parser.error("Member ID must be of form: <host or ip>:<number>")
+
+ config._stopAll = opts.all_stop
+ config._force = opts.force
+ config._numeric = opts.numeric
+
+ bm = BrokerManager(config)
+
+ try:
+ bm.SetBroker(config._host)
+ if config._stopId:
+ bm.stopMember(config._stopId)
+ elif config._stopAll:
+ bm.stopAll()
+ elif config._showConn or config._delConn:
+ bm.showConnections()
+ else:
+ bm.overview()
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ bm.Disconnect() # try to deallocate brokers - ignores errors
+ if str(e).find("connection aborted") > 0:
+ # we expect this when asking the connected broker to shut down
+ return 0
+ raise Exception("Failed: %s - %s" % (e.__class__.__name__, e))
+
+ bm.Disconnect()
+ except Exception, e:
+ print str(e)
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/qpid/tools/src/py/qpid-cluster-store b/qpid/tools/src/py/qpid-cluster-store
new file mode 100755
index 0000000000..3541b6679c
--- /dev/null
+++ b/qpid/tools/src/py/qpid-cluster-store
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from qpid.datatypes import uuid4, UUID, parseUUID
+import optparse, os.path, sys, string
+
+op = optparse.OptionParser(
+ usage="usage: %prog [options] DATADIR",
+ description="View or modify cluster store status for broker with data-directory DATADIR")
+op.add_option("-d", "--display", default=False, action="store_true", help="display store status." )
+op.add_option("-c", "--mark-clean", default=False, action="store_true", help="mark the store as clean." )
+
+class ClusterStoreStatus:
+ """Load/save/display store status file"""
+
+ null_uuid=UUID(bytes='\0'*16)
+
+ def __init__(self, file):
+ self.file = file
+ self.read()
+
+ def read(self):
+ f = open(self.file)
+ try: self.cluster_id, self.shutdown_id = [parseUUID(string.rstrip(s)) for s in f.readlines()]
+ finally: f.close()
+
+ def write(self):
+ f = open(self.file,"w")
+ try:
+ for u in [self.cluster_id, self.shutdown_id]: f.write(str(u)+"\n")
+ finally: f.close()
+
+ def status(self):
+ if (self.cluster_id == self.null_uuid): return "empty"
+ if (self.shutdown_id == self.null_uuid): return "dirty"
+ return "clean"
+
+ def __str__(self):
+ return "status: %s\ncluster-id: %s\nshutdown_id: %s" % (
+ self.status(), self.cluster_id, self.shutdown_id)
+
+ def mark_clean(self):
+ self.shutdown_id = uuid4()
+ self.write()
+
+def main(argv=None):
+ opts, args = op.parse_args(args=argv)
+ if len(args) != 1: op.error("incorrect number of arguments")
+ try: status = ClusterStoreStatus(args[0]+"/cluster/store.status")
+ except Exception,e: print e; return 1
+ if opts.display: print status
+ if opts.mark_clean: status.mark_clean(); print status
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
new file mode 100755
index 0000000000..3df69dc99b
--- /dev/null
+++ b/qpid/tools/src/py/qpid-config
@@ -0,0 +1,682 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
+import sys
+import locale
+from qmf.console import Session
+
+usage = """
+Usage: qpid-config [OPTIONS]
+ qpid-config [OPTIONS] exchanges [filter-string]
+ qpid-config [OPTIONS] queues [filter-string]
+ qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]
+ qpid-config [OPTIONS] del exchange <name>
+ qpid-config [OPTIONS] add queue <name> [AddQueueOptions]
+ qpid-config [OPTIONS] del queue <name> [DelQueueOptions]
+ qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]
+ <for type xml> [-f -|filename]
+ <for type header> [all|any] k1=v1 [, k2=v2...]
+ qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"""
+
+description = """
+ADDRESS syntax:
+
+ [username/password@] hostname [:<port>]
+ [username/password@] ip-address [:<port>]
+
+Examples:
+
+$ qpid-config add queue q
+$ qpid-config add exchange direct d -a localhost:5672
+$ qpid-config exchanges -a 10.1.1.7:10000
+$ qpid-config queues -a guest/guest@broker-host:10000
+
+Add Exchange <type> values:
+
+ direct Direct exchange for point-to-point communication
+ fanout Fanout exchange for broadcast communication
+ topic Topic exchange that routes messages using binding keys with wildcards
+ headers Headers exchange that matches header fields against the binding keys
+ xml XML Exchange - allows content filtering using an XQuery
+
+
+Queue Limit Actions
+
+ none (default) - Use broker's default policy
+ reject - Reject enqueued messages
+ flow-to-disk - Page messages to disk
+ ring - Replace oldest unacquired message with new
+ ring-strict - Replace oldest message, reject if oldest is acquired
+
+Queue Ordering Policies
+
+ fifo (default) - First in, first out
+ lvq - Last Value Queue ordering, allows queue browsing
+ lvq-no-browse - Last Value Queue ordering, browsing clients may lose data"""
+
+
+class Config:
+ def __init__(self):
+ self._recursive = False
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._altern_ex = None
+ self._passive = False
+ self._durable = False
+ self._clusterDurable = False
+ self._if_empty = True
+ self._if_unused = True
+ self._fileCount = 8
+ self._fileSize = 24
+ self._maxQueueSize = None
+ self._maxQueueCount = None
+ self._limitPolicy = None
+ self._order = None
+ self._msgSequence = False
+ self._ive = False
+ self._eventGeneration = None
+ self._file = None
+ self._sasl_mechanism = None
+ self._flowStopCount = None
+ self._flowResumeCount = None
+ self._flowStopSize = None
+ self._flowResumeSize = None
+ self._extra_arguments = []
+
+config = Config()
+
+FILECOUNT = "qpid.file_count"
+FILESIZE = "qpid.file_size"
+MAX_QUEUE_SIZE = "qpid.max_size"
+MAX_QUEUE_COUNT = "qpid.max_count"
+POLICY_TYPE = "qpid.policy_type"
+CLUSTER_DURABLE = "qpid.persist_last_node"
+LVQ = "qpid.last_value_queue"
+LVQNB = "qpid.last_value_queue_no_browse"
+MSG_SEQUENCE = "qpid.msg_sequence"
+IVE = "qpid.ive"
+QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+FLOW_STOP_COUNT = "qpid.flow_stop_count"
+FLOW_RESUME_COUNT = "qpid.flow_resume_count"
+FLOW_STOP_SIZE = "qpid.flow_stop_size"
+FLOW_RESUME_SIZE = "qpid.flow_resume_size"
+#There are various arguments to declare that have specific program
+#options in this utility. However there is now a generic mechanism for
+#passing arguments as well. The SPECIAL_ARGS list contains the
+#arguments for which there are specific program options defined
+#i.e. the arguments for which there is special processing on add and
+#list
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE]
+
+class JHelpFormatter(IndentedHelpFormatter):
+ """Format usage and description without stripping newlines from usage strings
+ """
+
+ def format_usage(self, usage):
+ return usage
+
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+def Usage():
+ print usage
+ exit(-1)
+
+def OptionsAndArguments(argv):
+ """ Set global variables for options, return arguments """
+
+ global config
+
+
+ parser = OptionParser(usage=usage,
+ description=description,
+ formatter=JHelpFormatter())
+
+ group1 = OptionGroup(parser, "General Options")
+ group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
+ group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list")
+ group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker")
+ group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ parser.add_option_group(group1)
+
+ group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
+ group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
+ group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.")
+ group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
+ parser.add_option_group(group2)
+
+ group3 = OptionGroup(parser, "Options for Adding Queues")
+ group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node")
+ group3.add_option("--file-count", action="store", type="int", default=8, metavar="<n>", help="Number of files in queue's persistence journal")
+ group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64Kib/page)")
+ group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
+ group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
+ group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
+ group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
+ group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
+ group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued bytes exceeds this value.")
+ group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued bytes drops below this value.")
+ group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>",
+ help="Turn on sender flow control when the number of queued messages exceeds this value.")
+ group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
+ help="Turn off sender flow control when the number of queued messages drops below this value.")
+ group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
+ metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
+ # no option for declaring an exclusive queue - which can only be used by the session that creates it.
+ parser.add_option_group(group3)
+
+ group4 = OptionGroup(parser, "Options for Adding Exchanges")
+ group4.add_option("--sequence", action="store_true", help="Exchange will insert a 'qpid.msg_sequence' field in the message header")
+ group4.add_option("--ive", action="store_true", help="Exchange will behave as an 'initial-value-exchange', keeping a reference to the last message forwarded and enqueuing that message to newly bound queues.")
+ parser.add_option_group(group4)
+
+ group5 = OptionGroup(parser, "Options for Deleting Queues")
+ group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty")
+ group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty")
+ group5.add_option("--force-if-used", action="store_true", help="Force delete of queue even if it's currently used")
+ parser.add_option_group(group5)
+
+ group6 = OptionGroup(parser, "Options for Declaring Bindings")
+ group6.add_option("-f", "--file", action="store", type="string", metavar="<file.xq>", help="For XML Exchange bindings - specifies the name of a file containing an XQuery.")
+ parser.add_option_group(group6)
+
+ opts, encArgs = parser.parse_args(args=argv)
+
+ try:
+ encoding = locale.getpreferredencoding()
+ args = [a.decode(encoding) for a in encArgs]
+ except:
+ args = encArgs
+
+ if opts.bindings:
+ config._recursive = True
+ if opts.broker_addr:
+ config._host = opts.broker_addr
+ if opts.timeout:
+ config._connTimeout = opts.timeout
+ if config._connTimeout == 0:
+ config._connTimeout = None
+ if opts.alternate_exchange:
+ config._altern_ex = opts.alternate_exchange
+ if opts.passive:
+ config._passive = True
+ if opts.durable:
+ config._durable = True
+ if opts.cluster_durable:
+ config._clusterDurable = True
+ if opts.file:
+ config._file = opts.file
+ if opts.file_count:
+ config._fileCount = opts.file_count
+ if opts.file_size:
+ config._fileSize = opts.file_size
+ if opts.max_queue_size:
+ config._maxQueueSize = opts.max_queue_size
+ if opts.max_queue_count:
+ config._maxQueueCount = opts.max_queue_count
+ if opts.limit_policy:
+ config._limitPolicy = opts.limit_policy
+ if opts.order:
+ config._order = opts.order
+ if opts.sequence:
+ config._msgSequence = True
+ if opts.ive:
+ config._ive = True
+ if opts.generate_queue_events:
+ config._eventGeneration = opts.generate_queue_events
+ if opts.force:
+ config._if_empty = False
+ config._if_unused = False
+ if opts.force_if_not_empty:
+ config._if_empty = False
+ if opts.force_if_used:
+ config._if_unused = False
+ if opts.sasl_mechanism:
+ config._sasl_mechanism = opts.sasl_mechanism
+ if opts.flow_stop_size:
+ config._flowStopSize = opts.flow_stop_size
+ if opts.flow_resume_size:
+ config._flowResumeSize = opts.flow_resume_size
+ if opts.flow_stop_count:
+ config._flowStopCount = opts.flow_stop_count
+ if opts.flow_resume_count:
+ config._flowResumeCount = opts.flow_resume_count
+ if opts.extra_arguments:
+ config._extra_arguments = opts.extra_arguments
+ return args
+
+
+#
+# helpers for the arg parsing in bind(). return multiple values; "ok"
+# followed by the resultant args
+
+#
+# accept -f followed by either
+# a filename or "-", for stdin. pull the bits into a string, to be
+# passed to the xml binding.
+#
+def snarf_xquery_args():
+ if not config._file:
+ print "Invalid args to bind xml: need an input file or stdin"
+ return [False]
+ if config._file == "-":
+ res = sys.stdin.read()
+ else:
+ f = open(config._file) # let this signal if it can't find it
+ res = f.read()
+ f.close()
+ return [True, res]
+
+#
+# look for "any"/"all" and grok the rest of argv into a map
+#
+def snarf_header_args(args):
+
+ if len(args) < 2:
+ print "Invalid args to bind headers: need 'any'/'all' plus conditions"
+ return [False]
+ op = args[0]
+ if op == "all" or op == "any":
+ kv = {}
+ for thing in args[1:]:
+ k_and_v = thing.split("=")
+ kv[k_and_v[0]] = k_and_v[1]
+ return [True, op, kv]
+ else:
+ print "Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'"
+ return [False]
+
+class BrokerManager:
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.mechanism = None
+
+ def SetBroker(self, brokerUrl, mechanism):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == '0':
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def Overview(self):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ print "Total Exchanges: %d" % len (exchanges)
+ etype = {}
+ for ex in exchanges:
+ if ex.type not in etype:
+ etype[ex.type] = 1
+ else:
+ etype[ex.type] = etype[ex.type] + 1
+ for typ in etype:
+ print "%15s: %d" % (typ, etype[typ])
+
+ print
+ print " Total Queues: %d" % len (queues)
+ durable = 0
+ for queue in queues:
+ if queue.durable:
+ durable = durable + 1
+ print " durable: %d" % durable
+ print " non-durable: %d" % (len (queues) - durable)
+
+ def ExchangeList(self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ caption1 = "Type "
+ caption2 = "Exchange Name"
+ maxNameLen = len(caption2)
+ for ex in exchanges:
+ if self.match(ex.name, filter):
+ if len(ex.name) > maxNameLen: maxNameLen = len(ex.name)
+ print "%s%-*s Attributes" % (caption1, maxNameLen, caption2)
+ line = ""
+ for i in range(((maxNameLen + len(caption1)) / 5) + 5):
+ line += "====="
+ print line
+
+ for ex in exchanges:
+ if self.match(ex.name, filter):
+ print "%-10s%-*s " % (ex.type, maxNameLen, ex.name),
+ args = ex.arguments
+ if not args: args = {}
+ if ex.durable: print "--durable",
+ if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
+ if IVE in args and args[IVE] == 1: print "--ive",
+ if ex.altExchange:
+ print "--alternate-exchange=%s" % ex._altExchange_.name,
+ print
+
+ def ExchangeListRecurse(self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ for ex in exchanges:
+ if self.match(ex.name, filter):
+ print "Exchange '%s' (%s)" % (ex.name, ex.type)
+ for bind in bindings:
+ if bind.exchangeRef == ex.getObjectId():
+ qname = "<unknown>"
+ queue = self.findById(queues, bind.queueRef)
+ if queue != None:
+ qname = queue.name
+ print " bind [%s] => %s" % (bind.bindingKey, qname)
+
+
+ def QueueList(self, filter):
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+
+ caption = "Queue Name"
+ maxNameLen = len(caption)
+ for q in queues:
+ if self.match(q.name, filter):
+ if len(q.name) > maxNameLen: maxNameLen = len(q.name)
+ print "%-*s Attributes" % (maxNameLen, caption)
+ line = ""
+ for i in range((maxNameLen / 5) + 5):
+ line += "====="
+ print line
+
+ for q in queues:
+ if self.match(q.name, filter):
+ print "%-*s " % (maxNameLen, q.name),
+ args = q.arguments
+ if not args: args = {}
+ if q.durable: print "--durable",
+ if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
+ if q.autoDelete: print "auto-del",
+ if q.exclusive: print "excl",
+ if FILESIZE in args: print "--file-size=%s" % args[FILESIZE],
+ if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT],
+ if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE],
+ if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
+ if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
+ if LVQ in args and args[LVQ] == 1: print "--order lvq",
+ if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse",
+ if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
+ if q.altExchange:
+ print "--alternate-exchange=%s" % q._altExchange_.name,
+ if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
+ if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
+ if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
+ if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+ print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
+
+ def QueueListRecurse(self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ for queue in queues:
+ if self.match(queue.name, filter):
+ print "Queue '%s'" % queue.name
+ for bind in bindings:
+ if bind.queueRef == queue.getObjectId():
+ ename = "<unknown>"
+ ex = self.findById(exchanges, bind.exchangeRef)
+ if ex != None:
+ ename = ex.name
+ if ename == "":
+ ename = "''"
+ print " bind [%s] => %s" % (bind.bindingKey, ename)
+
+ def AddExchange(self, args):
+ if len(args) < 2:
+ Usage()
+ etype = args[0]
+ ename = args[1]
+ declArgs = {}
+ if config._msgSequence:
+ declArgs[MSG_SEQUENCE] = 1
+ if config._ive:
+ declArgs[IVE] = 1
+ if config._altern_ex != None:
+ self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
+ else:
+ self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs)
+
+ def DelExchange(self, args):
+ if len(args) < 1:
+ Usage()
+ ename = args[0]
+ self.broker.getAmqpSession().exchange_delete(exchange=ename)
+
+ def AddQueue(self, args):
+ if len(args) < 1:
+ Usage()
+ qname = args[0]
+ declArgs = {}
+ for a in config._extra_arguments:
+ r = a.split("=", 1)
+ if len(r) == 2: value = r[1]
+ else: value = None
+ declArgs[r[0]] = value
+
+ if config._durable:
+ declArgs[FILECOUNT] = config._fileCount
+ declArgs[FILESIZE] = config._fileSize
+
+ if config._maxQueueSize:
+ declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize
+ if config._maxQueueCount:
+ declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount
+ if config._limitPolicy:
+ if config._limitPolicy == "none":
+ pass
+ elif config._limitPolicy == "reject":
+ declArgs[POLICY_TYPE] = "reject"
+ elif config._limitPolicy == "flow-to-disk":
+ declArgs[POLICY_TYPE] = "flow_to_disk"
+ elif config._limitPolicy == "ring":
+ declArgs[POLICY_TYPE] = "ring"
+ elif config._limitPolicy == "ring-strict":
+ declArgs[POLICY_TYPE] = "ring_strict"
+
+ if config._clusterDurable:
+ declArgs[CLUSTER_DURABLE] = 1
+ if config._order:
+ if config._order == "fifo":
+ pass
+ elif config._order == "lvq":
+ declArgs[LVQ] = 1
+ elif config._order == "lvq-no-browse":
+ declArgs[LVQNB] = 1
+ if config._eventGeneration:
+ declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration
+
+ if config._flowStopSize:
+ declArgs[FLOW_STOP_SIZE] = config._flowStopSize
+ if config._flowResumeSize:
+ declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize
+ if config._flowStopCount:
+ declArgs[FLOW_STOP_COUNT] = config._flowStopCount
+ if config._flowResumeCount:
+ declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+
+ if config._altern_ex != None:
+ self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
+ else:
+ self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs)
+
+
+ def DelQueue(self, args):
+ if len(args) < 1:
+ Usage()
+ qname = args[0]
+ self.broker.getAmqpSession().queue_delete(queue=qname, if_empty=config._if_empty, if_unused=config._if_unused)
+
+
+ def Bind(self, args):
+ if len(args) < 2:
+ Usage()
+ ename = args[0]
+ qname = args[1]
+ key = ""
+ if len(args) > 2:
+ key = args[2]
+
+ # query the exchange to determine its type.
+ res = self.broker.getAmqpSession().exchange_query(ename)
+
+ # type of the xchg determines the processing of the rest of
+ # argv. if it's an xml xchg, we want to find a file
+ # containing an x-query, and pass that. if it's a headers
+ # exchange, we need to pass either "any" or all, followed by a
+ # map containing key/value pairs. if neither of those, extra
+ # args are ignored.
+ ok = True
+ _args = None
+ if res.type == "xml":
+ # this checks/imports the -f arg
+ [ok, xquery] = snarf_xquery_args()
+ _args = { "xquery" : xquery }
+ else:
+ if res.type == "headers":
+ [ok, op, kv] = snarf_header_args(args[3:])
+ _args = kv
+ _args["x-match"] = op
+
+ if not ok:
+ sys.exit(1)
+
+ self.broker.getAmqpSession().exchange_bind(queue=qname,
+ exchange=ename,
+ binding_key=key,
+ arguments=_args)
+
+ def Unbind(self, args):
+ if len(args) < 2:
+ Usage()
+ ename = args[0]
+ qname = args[1]
+ key = ""
+ if len(args) > 2:
+ key = args[2]
+ self.broker.getAmqpSession().exchange_unbind(queue=qname, exchange=ename, binding_key=key)
+
+ def findById(self, items, id):
+ for item in items:
+ if item.getObjectId() == id:
+ return item
+ return None
+
+ def match(self, name, filter):
+ if filter == "":
+ return True
+ if name.find(filter) == -1:
+ return False
+ return True
+
+def YN(bool):
+ if bool:
+ return 'Y'
+ return 'N'
+
+
+def main(argv=None):
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager()
+
+ try:
+ bm.SetBroker(config._host, config._sasl_mechanism)
+ if len(args) == 0:
+ bm.Overview()
+ else:
+ cmd = args[0]
+ modifier = ""
+ if len(args) > 1:
+ modifier = args[1]
+ if cmd == "exchanges":
+ if config._recursive:
+ bm.ExchangeListRecurse(modifier)
+ else:
+ bm.ExchangeList(modifier)
+ elif cmd == "queues":
+ if config._recursive:
+ bm.QueueListRecurse(modifier)
+ else:
+ bm.QueueList(modifier)
+ elif cmd == "add":
+ if modifier == "exchange":
+ bm.AddExchange(args[2:])
+ elif modifier == "queue":
+ bm.AddQueue(args[2:])
+ else:
+ Usage()
+ elif cmd == "del":
+ if modifier == "exchange":
+ bm.DelExchange(args[2:])
+ elif modifier == "queue":
+ bm.DelQueue(args[2:])
+ else:
+ Usage()
+ elif cmd == "bind":
+ bm.Bind(args[1:])
+ elif cmd == "unbind":
+ bm.Unbind(args[1:])
+ else:
+ Usage()
+ except KeyboardInterrupt:
+ print
+ except IOError, e:
+ print e
+ bm.Disconnect()
+ return 1
+ except SystemExit, e:
+ bm.Disconnect()
+ return 1
+ except Exception,e:
+ if e.__class__.__name__ != "Timeout":
+ # ignore Timeout exception, handle in the loop below
+ print "Failed: %s: %s" % (e.__class__.__name__, e)
+ bm.Disconnect()
+ return 1
+
+ while True:
+ # some commands take longer than the default amqp timeout to complete,
+ # so attempt to disconnect until successful, ignoring Timeouts
+ try:
+ bm.Disconnect()
+ break
+ except Exception, e:
+ if e.__class__.__name__ != "Timeout":
+ print "Failed: %s: %s" % (e.__class__.__name__, e)
+ return 1
+
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
+
diff --git a/qpid/tools/src/py/qpid-printevents b/qpid/tools/src/py/qpid-printevents
new file mode 100755
index 0000000000..d56d2899b1
--- /dev/null
+++ b/qpid/tools/src/py/qpid-printevents
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+from optparse import IndentedHelpFormatter
+import sys
+import socket
+from time import time, strftime, gmtime, sleep
+from qmf.console import Console, Session
+
+
+class EventConsole(Console):
+ def event(self, broker, event):
+ print event
+ sys.stdout.flush()
+
+ def brokerConnected(self, broker):
+ print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
+ sys.stdout.flush()
+
+ def brokerConnectionFailed(self, broker):
+ print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
+ sys.stdout.flush()
+
+ def brokerDisconnected(self, broker):
+ print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
+ sys.stdout.flush()
+
+class JHelpFormatter(IndentedHelpFormatter):
+ """Format usage and description without stripping newlines from usage strings
+ """
+
+ def format_usage(self, usage):
+ return usage
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+_usage = "%prog [options] [broker-addr]..."
+
+_description = \
+"""
+Collect and print events from one or more Qpid message brokers.
+
+If no broker-addr is supplied, %prog connects to 'localhost:5672'.
+
+[broker-addr] syntax:
+
+ [username/password@] hostname
+ ip-address [:<port>]
+
+Examples:
+
+$ %prog localhost:5672
+$ %prog 10.1.1.7:10000
+$ %prog guest/guest@broker-host:10000
+"""
+
+def main(argv=None):
+ p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter())
+ p.add_option("--heartbeats", action="store_true", default=False, help="Use heartbeats.")
+ p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+
+ options, arguments = p.parse_args(args=argv)
+ if len(arguments) == 0:
+ arguments.append("localhost")
+
+ console = EventConsole()
+ session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True)
+ brokers = []
+ try:
+ try:
+ for host in arguments:
+ brokers.append(session.addBroker(host, None, options.sasl_mechanism))
+
+ while (True):
+ sleep(10)
+
+ except KeyboardInterrupt:
+ print
+ return 0
+
+ except Exception, e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ return 1
+ finally:
+ while len(brokers):
+ b = brokers.pop()
+ session.delBroker(b)
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/qpid/tools/src/py/qpid-queue-stats b/qpid/tools/src/py/qpid-queue-stats
new file mode 100755
index 0000000000..54f22dfc42
--- /dev/null
+++ b/qpid/tools/src/py/qpid-queue-stats
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import re
+import socket
+import qpid
+from threading import Condition
+from qmf.console import Session, Console
+from qpid.peer import Closed
+from qpid.connection import Connection, ConnectionFailed
+from time import sleep
+
+class BrokerManager(Console):
+ def __init__(self, host, mechanism):
+ self.url = host
+ self.objects = {}
+ self.filter = None
+ self.session = Session(self, rcvEvents=False, rcvHeartbeats=False,
+ userBindings=True, manageConnections=True)
+ self.broker = self.session.addBroker(self.url, None, mechanism)
+ self.firstError = True
+
+ def setFilter(self,filter):
+ self.filter = filter
+
+ def brokerConnected(self, broker):
+ if not self.firstError:
+ print "*** Broker connected"
+ self.firstError = False
+
+ def brokerDisconnected(self, broker):
+ print "*** Broker connection lost - %s, retrying..." % broker.getError()
+ self.firstError = False
+ self.objects.clear()
+
+ def objectProps(self, broker, record):
+ className = record.getClassKey().getClassName()
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ self.objects[id] = (record.name, None, None)
+
+ def objectStats(self, broker, record):
+ className = record.getClassKey().getClassName()
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ return
+
+ (name, first, last) = self.objects[id]
+ if first == None:
+ self.objects[id] = (name, record, None)
+ return
+
+ if len(self.filter) > 0 :
+ match = False
+
+ for x in self.filter:
+ if x.match(name):
+ match = True
+ break
+ if match == False:
+ return
+
+ if last == None:
+ lastSample = first
+ else:
+ lastSample = last
+
+ self.objects[id] = (name, first, record)
+
+ deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
+ if deltaTime < 1000000000.0:
+ return
+ enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
+ (deltaTime / 1000000000.0)
+ dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
+ (deltaTime / 1000000000.0)
+ print "%-41s%10.2f%11d%13.2f%13.2f" % \
+ (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
+ sys.stdout.flush()
+
+
+ def Display (self):
+ self.session.bindClass("org.apache.qpid.broker", "queue")
+ print "Queue Name Sec Depth Enq Rate Deq Rate"
+ print "========================================================================================"
+ sys.stdout.flush()
+ try:
+ while True:
+ sleep (1)
+ if self.firstError and self.broker.getError():
+ self.firstError = False
+ print "*** Error: %s, retrying..." % self.broker.getError()
+ except KeyboardInterrupt:
+ print
+ self.session.delBroker(self.broker)
+
+def main(argv=None):
+ p = optparse.OptionParser()
+ p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
+ p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
+ p.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+
+
+ options, arguments = p.parse_args(args=argv)
+
+ host = options.broker_address
+ filter = []
+ if options.filter != None:
+ for s in options.filter.split(","):
+ filter.append(re.compile(s))
+
+ bm = BrokerManager(host, options.sasl_mechanism)
+ bm.setFilter(filter)
+ bm.Display()
+
+if __name__ == '__main__':
+ sys.exit(main())
+
diff --git a/qpid/tools/src/py/qpid-route b/qpid/tools/src/py/qpid-route
new file mode 100755
index 0000000000..d98cefd618
--- /dev/null
+++ b/qpid/tools/src/py/qpid-route
@@ -0,0 +1,597 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
+import sys
+import socket
+import os
+import locale
+from qmf.console import Session, BrokerURL
+
+usage = """
+Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism]
+ qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>
+
+ qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
+ qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>
+ qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> [mechanism]
+ qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>
+ qpid-route [OPTIONS] route list [<dest-broker>]
+ qpid-route [OPTIONS] route flush [<dest-broker>]
+ qpid-route [OPTIONS] route map [<broker>]
+
+ qpid-route [OPTIONS] link add <dest-broker> <src-broker> [mechanism]
+ qpid-route [OPTIONS] link del <dest-broker> <src-broker>
+ qpid-route [OPTIONS] link list [<dest-broker>]"""
+
+description = """
+ADDRESS syntax:
+
+ [username/password@] hostname
+ ip-address [:<port>]"""
+
+def Usage():
+ print usage
+
+class Config:
+ def __init__(self):
+ self._verbose = False
+ self._quiet = False
+ self._durable = False
+ self._dellink = False
+ self._srclocal = False
+ self._transport = "tcp"
+ self._ack = 0
+ self._connTimeout = 10
+ self._client_sasl_mechanism = None
+
+config = Config()
+
+class JHelpFormatter(IndentedHelpFormatter):
+ """Format usage and description without stripping newlines from usage strings
+ """
+
+ def format_usage(self, usage):
+ return usage
+
+
+ def format_description(self, description):
+ if description:
+ return description + "\n"
+ else:
+ return ""
+
+def OptionsAndArguments(argv):
+ parser = OptionParser(usage=usage,
+ description=description,
+ formatter=JHelpFormatter())
+
+ parser.add_option("--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option("-v", "--verbose", action="store_true", help="Verbose output")
+ parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings")
+ parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable")
+
+ parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link")
+ parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)")
+
+ parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N")
+ parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
+
+ parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
+
+ opts, encArgs = parser.parse_args(args=argv)
+
+ try:
+ encoding = locale.getpreferredencoding()
+ args = [a.decode(encoding) for a in encArgs]
+ except:
+ args = encArgs
+
+ if opts.timeout:
+ config._connTimeout = opts.timeout
+ if config._connTimeout == 0:
+ config._connTimeout = None
+
+ if opts.verbose:
+ config._verbose = True
+
+ if opts.quiet:
+ config._quiet = True
+
+ if opts.durable:
+ config._durable = True
+
+ if opts.del_empty_link:
+ config._dellink = True
+
+ if opts.src_local:
+ config._srclocal = True
+
+ if opts.transport:
+ config._transport = opts.transport
+
+ if opts.ack:
+ config._ack = opts.ack
+
+ if opts.client_sasl_mechanism:
+ config._client_sasl_mechanism = opts.client_sasl_mechanism
+
+ return args
+
+
+class RouteManager:
+ def __init__(self, localBroker):
+ self.brokerList = {}
+ self.local = BrokerURL(localBroker)
+ self.remote = None
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism)
+ self.broker._waitForStable()
+ self.agent = self.broker.getBrokerAgent()
+
+ def disconnect(self):
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+ except:
+ pass # ignore errors while shutting down
+
+ def getLink(self):
+ links = self.agent.getObjects(_class="link")
+ for link in links:
+ if self.remote.match(link.host, link.port):
+ return link
+ return None
+
+ def addLink(self, remoteBroker, interbroker_mechanism=""):
+ self.remote = BrokerURL(remoteBroker)
+ if self.local.match(self.remote.host, self.remote.port):
+ raise Exception("Linking broker to itself is not permitted")
+
+ brokers = self.agent.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ res = broker.connect(self.remote.host, self.remote.port, config._durable,
+ interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "",
+ config._transport)
+ if config._verbose:
+ print "Connect method returned:", res.status, res.text
+
+ def delLink(self, remoteBroker):
+ self.remote = BrokerURL(remoteBroker)
+ brokers = self.agent.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link not found")
+
+ res = link.close()
+ if config._verbose:
+ print "Close method returned:", res.status, res.text
+
+ def listLinks(self):
+ links = self.agent.getObjects(_class="link")
+ if len(links) == 0:
+ print "No Links Found"
+ else:
+ print
+ print "Host Port Transport Durable State Last Error"
+ print "============================================================================="
+ for link in links:
+ print "%-16s%-8d%-13s%c %-18s%s" % \
+ (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError)
+
+ def mapRoutes(self):
+ print
+ print "Finding Linked Brokers:"
+
+ self.brokerList[self.local.name()] = self.broker
+ print " %s... Ok" % self.local
+
+ added = True
+ while added:
+ added = False
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ url = BrokerURL("%s:%d" % (link.host, link.port))
+ if url.name() not in self.brokerList:
+ print " %s..." % url.name(),
+ try:
+ b = self.qmf.addBroker("%s:%d" % (link.host, link.port), config._connTimeout)
+ self.brokerList[url.name()] = b
+ added = True
+ print "Ok"
+ except Exception, e:
+ print e
+
+ print
+ print "Dynamic Routes:"
+ bridges = self.qmf.getObjects(_class="bridge", dynamic=True)
+ fedExchanges = []
+ for bridge in bridges:
+ if bridge.src not in fedExchanges:
+ fedExchanges.append(bridge.src)
+ if len(fedExchanges) == 0:
+ print " none found"
+ print
+
+ for ex in fedExchanges:
+ print " Exchange %s:" % ex
+ pairs = []
+ for bridge in bridges:
+ if bridge.src == ex:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ found = False
+ for pair in pairs:
+ if pair.matches(fromUrl, toUrl):
+ found = True
+ if not found:
+ pairs.append(RoutePair(fromUrl, toUrl))
+ for pair in pairs:
+ print " %s" % pair
+ print
+
+ print "Static Routes:"
+ bridges = self.qmf.getObjects(_class="bridge", dynamic=False)
+ if len(bridges) == 0:
+ print " none found"
+ print
+
+ for bridge in bridges:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ leftType = "ex"
+ rightType = "ex"
+ if bridge.srcIsLocal:
+ arrow = "=>"
+ left = bridge.src
+ right = bridge.dest
+ if bridge.srcIsQueue:
+ leftType = "queue"
+ else:
+ arrow = "<="
+ left = bridge.dest
+ right = bridge.src
+ if bridge.srcIsQueue:
+ rightType = "queue"
+
+ if bridge.srcIsQueue:
+ print " %s(%s=%s) %s %s(%s=%s)" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right)
+ else:
+ print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
+ print
+
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+
+ def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False):
+ if dynamic and config._srclocal:
+ raise Exception("--src-local is not permitted on dynamic routes")
+
+ self.addLink(remoteBroker, interbroker_mechanism)
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link failed to create")
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
+ if not config._quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
+ sys.exit(0)
+
+ if config._verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes, False, config._srclocal, dynamic, config._ack)
+ if res.status != 0:
+ raise Exception(res.text)
+ if config._verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ):
+ self.addLink(remoteBroker, interbroker_mechanism)
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link failed to create")
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if not config._quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
+ sys.exit(0)
+
+ if config._verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(config._durable, queue, exchange, "", "", "", True, config._srclocal, False, config._ack)
+ if res.status != 0:
+ raise Exception(res.text)
+ if config._verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def delQueueRoute(self, remoteBroker, exchange, queue):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not config._quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if config._verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and config._dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if config._verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ sys.exit(0)
+ if not config._quiet:
+ raise Exception("Route not found")
+
+ def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not config._quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.agent.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
+ and bridge.dynamic == dynamic:
+ if config._verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and config._dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if config._verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ return
+ if not config._quiet:
+ raise Exception("Route not found")
+
+ def listRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ if bridge.dynamic:
+ keyText = "<dynamic>"
+ else:
+ keyText = bridge.key
+ print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
+
+ def clearAllRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ if config._verbose:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
+ res = bridge.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif config._verbose:
+ print "Ok"
+
+ if config._dellink:
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ if config._verbose:
+ print "Deleting Link: %s:%d... " % (link.host, link.port),
+ res = link.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif config._verbose:
+ print "Ok"
+
+class RoutePair:
+ def __init__(self, fromUrl, toUrl):
+ self.fromUrl = fromUrl
+ self.toUrl = toUrl
+ self.bidir = False
+
+ def __repr__(self):
+ if self.bidir:
+ delimit = "<=>"
+ else:
+ delimit = " =>"
+ return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
+
+ def matches(self, fromUrl, toUrl):
+ if fromUrl == self.fromUrl and toUrl == self.toUrl:
+ return True
+ if toUrl == self.fromUrl and fromUrl == self.toUrl:
+ self.bidir = True
+ return True
+ return False
+
+
+def YN(val):
+ if val == 1:
+ return 'Y'
+ return 'N'
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ nargs = len(args)
+ if nargs < 2:
+ Usage()
+ return(-1)
+
+ if nargs == 2:
+ localBroker = socket.gethostname()
+ else:
+ if config._srclocal:
+ localBroker = args[3]
+ remoteBroker = args[2]
+ else:
+ localBroker = args[2]
+ if nargs > 3:
+ remoteBroker = args[3]
+
+ group = args[0]
+ cmd = args[1]
+
+ rm = None
+ try:
+ rm = RouteManager(localBroker)
+ if group == "link":
+ if cmd == "add":
+ if nargs < 3 or nargs > 5:
+ Usage()
+ return(-1)
+ interbroker_mechanism = ""
+ if nargs > 4: interbroker_mechanism = args[4]
+ rm.addLink(remoteBroker, interbroker_mechanism)
+ elif cmd == "del":
+ if nargs != 4:
+ Usage()
+ return(-1)
+ rm.delLink(remoteBroker)
+ elif cmd == "list":
+ rm.listLinks()
+
+ elif group == "dynamic":
+ if cmd == "add":
+ if nargs < 5 or nargs > 8:
+ Usage()
+ return(-1)
+
+ tag = ""
+ excludes = ""
+ interbroker_mechanism = ""
+ if nargs > 5: tag = args[5]
+ if nargs > 6: excludes = args[6]
+ if nargs > 7: interbroker_mechanism = args[7]
+ rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True)
+ elif cmd == "del":
+ if nargs != 5:
+ Usage()
+ return(-1)
+ else:
+ rm.delRoute(remoteBroker, args[4], "", dynamic=True)
+
+ elif group == "route":
+ if cmd == "add":
+ if nargs < 6 or nargs > 9:
+ Usage()
+ return(-1)
+
+ tag = ""
+ excludes = ""
+ interbroker_mechanism = ""
+ if nargs > 6: tag = args[6]
+ if nargs > 7: excludes = args[7]
+ if nargs > 8: interbroker_mechanism = args[8]
+ rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False)
+ elif cmd == "del":
+ if nargs != 6:
+ Usage()
+ return(-1)
+ rm.delRoute(remoteBroker, args[4], args[5], dynamic=False)
+ elif cmd == "map":
+ rm.mapRoutes()
+ else:
+ if cmd == "list":
+ rm.listRoutes()
+ elif cmd == "flush":
+ rm.clearAllRoutes()
+ else:
+ Usage()
+ return(-1)
+
+ elif group == "queue":
+ if nargs < 6 or nargs > 7:
+ Usage()
+ return(-1)
+ if cmd == "add":
+ interbroker_mechanism = ""
+ if nargs > 6: interbroker_mechanism = args[6]
+ rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] )
+ elif cmd == "del":
+ rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
+ else:
+ Usage()
+ return(-1)
+ else:
+ Usage()
+ return(-1)
+
+ except Exception,e:
+ if rm:
+ rm.disconnect() # try to release broker resources
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ return 1
+
+ rm.disconnect()
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat
new file mode 100755
index 0000000000..ce3f5d1ef5
--- /dev/null
+++ b/qpid/tools/src/py/qpid-stat
@@ -0,0 +1,521 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup
+from time import sleep ### debug
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, Console
+from qpid.disp import Display, Header, Sorter
+
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._types = ""
+ self._limit = 50
+ self._increasing = False
+ self._sortcol = None
+ self._cluster_detail = False
+ self._sasl_mechanism = None
+
+config = Config()
+
+def OptionsAndArguments(argv):
+ """ Set global variables for options, return arguments """
+
+ global config
+
+ parser = OptionParser(usage="usage: %prog [options] BROKER",
+ description="Example: $ qpid-stat -q broker-host:10000")
+
+ group1 = OptionGroup(parser, "General Options")
+ group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
+ group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ parser.add_option_group(group1)
+
+ group2 = OptionGroup(parser, "Display Options")
+ group2.add_option("-b", "--broker", help="Show Brokers",
+ action="store_const", const="b", dest="show")
+ group2.add_option("-c", "--connections", help="Show Connections",
+ action="store_const", const="c", dest="show")
+ group2.add_option("-e", "--exchanges", help="Show Exchanges",
+ action="store_const", const="e", dest="show")
+ group2.add_option("-q", "--queues", help="Show Queues",
+ action="store_const", const="q", dest="show")
+ group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
+ action="store_const", const="u", dest="show")
+ group2.add_option("-S", "--sort-by", metavar="<colname>",
+ help="Sort by column name")
+ group2.add_option("-I", "--increasing", action="store_true", default=False,
+ help="Sort by increasing value (default = decreasing)")
+ group2.add_option("-L", "--limit", default=50, metavar="<n>",
+ help="Limit output to n rows")
+ group2.add_option("-C", "--cluster", action="store_true", default=False,
+ help="Display per-broker cluster detail.")
+ parser.add_option_group(group2)
+
+ opts, args = parser.parse_args(args=argv)
+
+ if not opts.show:
+ parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help")
+
+ config._types = opts.show
+ config._sortcol = opts.sort_by
+ config._connTimeout = opts.timeout
+ config._increasing = opts.increasing
+ config._limit = opts.limit
+ config._cluster_detail = opts.cluster
+ config._sasl_mechanism = opts.sasl_mechanism
+
+ if args:
+ config._host = args[0]
+
+ return args
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFFL
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class Broker(object):
+ def __init__(self, qmf, broker):
+ self.broker = broker
+
+ agents = qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == '0':
+ self.brokerAgent = a
+
+ bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
+ self.currentTime = bobj.getTimestamps()[0]
+ try:
+ self.uptime = bobj.uptime
+ except:
+ self.uptime = 0
+ self.connections = {}
+ self.sessions = {}
+ self.exchanges = {}
+ self.queues = {}
+ self.subscriptions = {}
+ package = "org.apache.qpid.broker"
+
+ list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
+ for conn in list:
+ if not conn.shadow:
+ self.connections[conn.getObjectId()] = conn
+
+ list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
+ for sess in list:
+ if sess.connectionRef in self.connections:
+ self.sessions[sess.getObjectId()] = sess
+
+ list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
+ for exchange in list:
+ self.exchanges[exchange.getObjectId()] = exchange
+
+ list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
+ for queue in list:
+ self.queues[queue.getObjectId()] = queue
+
+ list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent)
+ for subscription in list:
+ self.subscriptions[subscription.getObjectId()] = subscription
+
+ def getName(self):
+ return self.broker.getUrl()
+
+ def getCurrentTime(self):
+ return self.currentTime
+
+ def getUptime(self):
+ return self.uptime
+
+class BrokerManager(Console):
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl, mechanism):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.mechanism = mechanism
+ self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == '0':
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ else:
+ for b in self.brokers: self.qmf.delBroker(b.broker)
+ except:
+ pass
+
+ def _getCluster(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ return None
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ return None
+
+ self.cluster = clusters[0]
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(config._host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
+ if len(subs) == 0:
+ return
+ this = subs[0]
+ remaining = subs[1:]
+ newindent = indent + " "
+ if this == 'b':
+ pass
+ elif this == 'c':
+ if broker:
+ for oid in broker.connections:
+ iconn = broker.connections[oid]
+ self.printConnSub(indent, broker.getName(), iconn)
+ self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
+ sess=sess, exchange=exchange, queue=queue)
+ elif this == 's':
+ pass
+ elif this == 'e':
+ pass
+ elif this == 'q':
+ pass
+ print
+
+ def displayBroker(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header('broker'))
+ heads.append(Header('cluster'))
+ heads.append(Header('uptime', Header.DURATION))
+ heads.append(Header('conn', Header.KMG))
+ heads.append(Header('sess', Header.KMG))
+ heads.append(Header('exch', Header.KMG))
+ heads.append(Header('queue', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ if self.cluster:
+ ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
+ else:
+ ctext = "<standalone>"
+ row = (broker.getName(), ctext, broker.getUptime(),
+ len(broker.connections), len(broker.sessions),
+ len(broker.exchanges), len(broker.queues))
+ rows.append(row)
+ title = "Brokers"
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayConn(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header('client-addr'))
+ heads.append(Header('cproc'))
+ heads.append(Header('cpid'))
+ heads.append(Header('auth'))
+ heads.append(Header('connected', Header.DURATION))
+ heads.append(Header('idle', Header.DURATION))
+ heads.append(Header('msgIn', Header.KMG))
+ heads.append(Header('msgOut', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.connections:
+ conn = broker.connections[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(conn.address)
+ row.append(conn.remoteProcessName)
+ row.append(conn.remotePid)
+ row.append(conn.authIdentity)
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
+ idle = broker.getCurrentTime() - conn.getTimestamps()[0]
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
+ row.append(conn.framesFromClient)
+ row.append(conn.framesToClient)
+ rows.append(row)
+ title = "Connections"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displaySession(self, subs):
+ disp = Display(prefix=" ")
+
+ def displayExchange(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("exchange"))
+ heads.append(Header("type"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("bind", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("msgDrop", Header.KMG))
+ heads.append(Header("byteIn", Header.KMG))
+ heads.append(Header("byteOut", Header.KMG))
+ heads.append(Header("byteDrop", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.exchanges:
+ ex = broker.exchanges[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(ex.durable)
+ row.append(ex.bindingCount)
+ row.append(ex.msgReceives)
+ row.append(ex.msgRoutes)
+ row.append(ex.msgDrops)
+ row.append(ex.byteReceives)
+ row.append(ex.byteRoutes)
+ row.append(ex.byteDrops)
+ rows.append(row)
+ title = "Exchanges"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayQueue(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("queue"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("autoDel", Header.Y))
+ heads.append(Header("excl", Header.Y))
+ heads.append(Header("msg", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("bytes", Header.KMG))
+ heads.append(Header("bytesIn", Header.KMG))
+ heads.append(Header("bytesOut", Header.KMG))
+ heads.append(Header("cons", Header.KMG))
+ heads.append(Header("bind", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.queues:
+ q = broker.queues[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(q.name)
+ row.append(q.durable)
+ row.append(q.autoDelete)
+ row.append(q.exclusive)
+ row.append(q.msgDepth)
+ row.append(q.msgTotalEnqueues)
+ row.append(q.msgTotalDequeues)
+ row.append(q.byteDepth)
+ row.append(q.byteTotalEnqueues)
+ row.append(q.byteTotalDequeues)
+ row.append(q.consumerCount)
+ row.append(q.bindingCount)
+ rows.append(row)
+ title = "Queues"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displaySubscriptions(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("subscription"))
+ heads.append(Header("queue"))
+ heads.append(Header("connection"))
+ heads.append(Header("processName"))
+ heads.append(Header("processId"))
+ heads.append(Header("browsing", Header.Y))
+ heads.append(Header("acknowledged", Header.Y))
+ heads.append(Header("exclusive", Header.Y))
+ heads.append(Header("creditMode"))
+ heads.append(Header("delivered", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.subscriptions:
+ s = broker.subscriptions[oid]
+ row = []
+ try:
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(s.name)
+ row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name)
+ connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef
+ row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address)
+ row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName)
+ row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid)
+ row.append(s.browsing)
+ row.append(s.acknowledged)
+ row.append(s.exclusive)
+ row.append(s.creditMode)
+ row.append(s.delivered)
+ rows.append(row)
+ except:
+ pass
+ title = "Subscriptions"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if config._sortcol:
+ sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayMain(self, main, subs):
+ if main == 'b': self.displayBroker(subs)
+ elif main == 'c': self.displayConn(subs)
+ elif main == 's': self.displaySession(subs)
+ elif main == 'e': self.displayExchange(subs)
+ elif main == 'q': self.displayQueue(subs)
+ elif main == 'u': self.displaySubscriptions(subs)
+
+ def display(self):
+ if config._cluster_detail or config._types[0] == 'b':
+ # always show cluster detail when dumping broker stats
+ self._getCluster()
+ if self.cluster:
+ memberList = self.cluster.members.split(";")
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ if config._host.find("@") > 0:
+ authString = config._host.split("@")[0] + "@"
+ else:
+ authString = ""
+ for host in hostList:
+ b = self.qmf.addBroker(authString + host, config._connTimeout)
+ self.brokers.append(Broker(self.qmf, b))
+ else:
+ self.brokers.append(Broker(self.qmf, self.broker))
+
+ self.displayMain(config._types[0], config._types[1:])
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager()
+
+ try:
+ bm.SetBroker(config._host, config._sasl_mechanism)
+ bm.display()
+ bm.Disconnect()
+ return 0
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+ bm.Disconnect() # try to deallocate brokers
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/qpid/tools/src/py/qpid-tool b/qpid/tools/src/py/qpid-tool
new file mode 100755
index 0000000000..d6bb9bcaea
--- /dev/null
+++ b/qpid/tools/src/py/qpid-tool
@@ -0,0 +1,737 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import socket
+from types import *
+from cmd import Cmd
+from shlex import split
+from threading import Lock
+from time import strftime, gmtime
+from qpid.disp import Display
+from qpid.peer import Closed
+from qmf.console import Session, Console, SchemaClass, ObjectId
+
+class Mcli(Cmd):
+ """ Management Command Interpreter """
+
+ def __init__(self, dataObject, dispObject):
+ Cmd.__init__(self)
+ self.dataObject = dataObject
+ self.dispObject = dispObject
+ self.dataObject.setCli(self)
+ self.prompt = "qpid: "
+
+ def emptyline(self):
+ pass
+
+ def setPromptMessage(self, p):
+ if p == None:
+ self.prompt = "qpid: "
+ else:
+ self.prompt = "qpid[%s]: " % p
+
+ def do_help(self, data):
+ print "Management Tool for QPID"
+ print
+ print "Commands:"
+ print " agents - Print a list of the known Agents"
+ print " list - Print summary of existing objects by class"
+ print " list <className> - Print list of objects of the specified class"
+ print " list <className> active - Print list of non-deleted objects of the specified class"
+# print " show <className> - Print contents of all objects of specified class"
+# print " show <className> active - Print contents of all non-deleted objects of specified class"
+ print " show <ID> - Print contents of an object (infer className)"
+# print " show <className> <list-of-IDs> - Print contents of one or more objects"
+# print " list is space-separated, ranges may be specified (i.e. 1004-1010)"
+ print " call <ID> <methodName> [<args>] - Invoke a method on an object"
+ print " schema - Print summary of object classes seen on the target"
+ print " schema <className> - Print details of an object class"
+ print " set time-format short - Select short timestamp format (default)"
+ print " set time-format long - Select long timestamp format"
+ print " quit or ^D - Exit the program"
+ print
+
+ def complete_set(self, text, line, begidx, endidx):
+ """ Command completion for the 'set' command """
+ tokens = split(line)
+ if len(tokens) < 2:
+ return ["time-format "]
+ elif tokens[1] == "time-format":
+ if len(tokens) == 2:
+ return ["long", "short"]
+ elif len(tokens) == 3:
+ if "long".find(text) == 0:
+ return ["long"]
+ elif "short".find(text) == 0:
+ return ["short"]
+ elif "time-format".find(text) == 0:
+ return ["time-format "]
+ return []
+
+ def do_set(self, data):
+ tokens = split(data)
+ try:
+ if tokens[0] == "time-format":
+ self.dispObject.do_setTimeFormat(tokens[1])
+ except:
+ pass
+
+ def complete_schema(self, text, line, begidx, endidx):
+ tokens = split(line)
+ if len(tokens) > 2:
+ return []
+ return self.dataObject.classCompletions(text)
+
+ def do_schema(self, data):
+ try:
+ self.dataObject.do_schema(data)
+ except Exception, e:
+ print "Exception in do_schema: %r" % e
+
+ def do_agents(self, data):
+ try:
+ self.dataObject.do_agents(data)
+ except Exception, e:
+ print "Exception in do_agents: %r" % e
+
+ def do_id(self, data):
+ try:
+ self.dataObject.do_id(data)
+ except Exception, e:
+ print "Exception in do_id: %r" % e
+
+ def complete_list(self, text, line, begidx, endidx):
+ tokens = split(line)
+ if len(tokens) > 2:
+ return []
+ return self.dataObject.classCompletions(text)
+
+ def do_list(self, data):
+ try:
+ self.dataObject.do_list(data)
+ except Exception, e:
+ print "Exception in do_list: %r" % e
+
+ def do_show(self, data):
+ try:
+ self.dataObject.do_show(data)
+ except Exception, e:
+ print "Exception in do_show: %r" % e
+
+ def do_call(self, data):
+ try:
+ self.dataObject.do_call(data)
+ except Exception, e:
+ print "Exception in do_call: %r", e
+
+ def do_EOF(self, data):
+ print "quit"
+ try:
+ self.dataObject.do_exit()
+ except:
+ pass
+ return True
+
+ def do_quit(self, data):
+ try:
+ self.dataObject.do_exit()
+ except:
+ pass
+ return True
+
+ def postcmd(self, stop, line):
+ return stop
+
+ def postloop(self):
+ print "Exiting..."
+ self.dataObject.close()
+
+#======================================================================================================
+# QmfData
+#======================================================================================================
+class QmfData(Console):
+ """
+ """
+ def __init__(self, disp, url):
+ self.disp = disp
+ self.url = url
+ self.session = Session(self, manageConnections=True)
+ self.broker = self.session.addBroker(self.url)
+ self.lock = Lock()
+ self.connected = None
+ self.closing = None
+ self.first_connect = True
+ self.cli = None
+ self.idRegistry = IdRegistry()
+ self.objects = {}
+
+ #=======================
+ # Methods to support CLI
+ #=======================
+ def setCli(self, cli):
+ self.cli = cli
+
+ def close(self):
+ try:
+ self.closing = True
+ if self.session and self.broker:
+ self.session.delBroker(self.broker)
+ except:
+ pass # we're shutting down - ignore any errors
+
+ def classCompletions(self, text):
+ pass
+
+ def do_schema(self, data):
+ if data == "":
+ self.schemaSummary()
+ else:
+ self.schemaTable(data)
+
+ def do_agents(self, data):
+ agents = self.session.getAgents()
+ rows = []
+ for agent in agents:
+ version = 1
+ if agent.isV2:
+ version = 2
+ rows.append(("%d.%s" % (agent.getBrokerBank(), agent.getAgentBank()), agent.label, agent.epoch, version))
+ self.disp.table("QMF Agents:", ("Agent Name", "Label", "Epoch", "QMF Version"), rows)
+
+ def do_id(self, data):
+ tokens = data.split()
+ for token in tokens:
+ if not token.isdigit():
+ print "Value %s is non-numeric" % token
+ return
+ title = "Translation of Display IDs:"
+ heads = ('DisplayID', 'Epoch', 'Agent', 'ObjectName')
+ if len(tokens) == 0:
+ tokens = self.idRegistry.getDisplayIds()
+ rows = []
+ for token in tokens:
+ rows.append(self.idRegistry.getIdInfo(int(token)))
+ self.disp.table(title, heads, rows)
+
+ def do_list(self, data):
+ tokens = data.split()
+ if len(tokens) == 0:
+ self.listClasses()
+ else:
+ self.listObjects(tokens)
+
+ def do_show(self, data):
+ tokens = data.split()
+ if len(tokens) == 0:
+ print "Missing Class or ID"
+ return
+ keys = self.classKeysByToken(tokens[0])
+ if keys:
+ self.showObjectsByKey(keys)
+ elif tokens[0].isdigit():
+ self.showObjectById(int(tokens[0]))
+
+ def do_call(self, data):
+ tokens = data.split()
+ if len(tokens) < 2:
+ print "Not enough arguments supplied"
+ return
+ displayId = long(tokens[0])
+ methodName = tokens[1]
+ args = []
+ for arg in tokens[2:]:
+ ##
+ ## If the argument is a map, list, boolean, integer, or floating (one decimal point),
+ ## run it through the Python evaluator so it is converted to the correct type.
+ ##
+ ## TODO: use a regex for this instead of this convoluted logic,
+ ## or even consider passing all args through eval() [which would
+ ## be a minor change to the interface as string args would then
+ ## always need to be quoted as strings within a map/list would
+ ## now]
+ if arg[0] == '{' or arg[0] == '[' or arg[0] == '"' or arg[0] == '\'' or arg == "True" or arg == "False" or \
+ ((arg.count('.') < 2 and (arg.count('-') == 0 or \
+ (arg.count('-') == 1 and arg[0] == '-')) and \
+ arg.replace('.','').replace('-','').isdigit())):
+ args.append(eval(arg))
+ else:
+ args.append(arg)
+
+ obj = None
+ try:
+ self.lock.acquire()
+ if displayId not in self.objects:
+ print "Unknown ID"
+ return
+ obj = self.objects[displayId]
+ finally:
+ self.lock.release()
+
+ object_id = obj.getObjectId();
+ if not object_id.isV2 and obj.getAgent().isV2:
+ object_key = ",".join([str(v) for p, v in obj.getProperties() if p.name != "vhostRef" and p.index == 1])
+ class_key = obj.getClassKey();
+ object_name = class_key.getPackageName() + ":" + class_key.getClassName() + ":" + object_key
+ object_id = ObjectId.create(object_id.agentName, object_name)
+
+ self.session._sendMethodRequest(self.broker, obj.getClassKey(), object_id, methodName, args)
+
+
+ def do_exit(self):
+ pass
+
+ #====================
+ # Sub-Command Methods
+ #====================
+ def schemaSummary(self, package_filter=None):
+ rows = []
+ packages = self.session.getPackages()
+ for package in packages:
+ if package_filter and package_filter != package:
+ continue
+ keys = self.session.getClasses(package)
+ for key in keys:
+ kind = "object"
+ schema = self.session.getSchema(key)
+ if schema:
+ if schema.kind == SchemaClass.CLASS_KIND_EVENT:
+ kind = "event"
+ if schema.kind == SchemaClass.CLASS_KIND_TABLE:
+ #
+ # Don't display event schemata. This will be a future feature.
+ #
+ rows.append((package, key.getClassName(), kind))
+ self.disp.table("QMF Classes:", ("Package", "Name", "Kind"), rows)
+
+ def schemaTable(self, text):
+ packages = self.session.getPackages()
+ if text in packages:
+ self.schemaSummary(package_filter=text)
+ for package in packages:
+ keys = self.session.getClasses(package)
+ for key in keys:
+ if text == key.getClassName() or text == package + ":" + key.getClassName():
+ schema = self.session.getSchema(key)
+ if schema.kind == SchemaClass.CLASS_KIND_TABLE:
+ self.schemaObject(schema)
+ else:
+ self.schemaEvent(schema)
+
+ def schemaObject(self, schema):
+ rows = []
+ title = "Object Class: %s" % schema.__repr__()
+ heads = ("Element", "Type", "Access", "Unit", "Notes", "Description")
+ for prop in schema.getProperties():
+ notes = ""
+ if prop.index : notes += "index "
+ if prop.optional : notes += "optional "
+ row = (prop.name, self.typeName(prop.type), self.accessName(prop.access),
+ self.notNone(prop.unit), notes, self.notNone(prop.desc))
+ rows.append(row)
+ for stat in schema.getStatistics():
+ row = (stat.name, self.typeName(stat.type), "", self.notNone(prop.unit), "", self.notNone(prop.desc))
+ rows.append(row)
+ self.disp.table(title, heads, rows)
+
+ for method in schema.methods:
+ rows = []
+ heads = ("Argument", "Type", "Direction", "Unit", "Description")
+ title = " Method: %s" % method.name
+ for arg in method.arguments:
+ row = (arg.name, self.typeName(arg.type), arg.dir, self.notNone(arg.unit), self.notNone(arg.desc))
+ rows.append(row)
+ print
+ self.disp.table(title, heads, rows)
+
+ def schemaEvent(self, schema):
+ rows = []
+ title = "Event Class: %s" % schema.__repr__()
+ heads = ("Element", "Type", "Unit", "Description")
+ for arg in schema.arguments:
+ row = (arg.name, self.typeName(arg.type), self.notNone(arg.unit), self.notNone(arg.desc))
+ rows.append(row)
+ self.disp.table(title, heads, rows)
+
+ def listClasses(self):
+ title = "Summary of Objects by Type:"
+ heads = ("Package", "Class", "Active", "Deleted")
+ rows = []
+ totals = {}
+ try:
+ self.lock.acquire()
+ for dispId in self.objects:
+ obj = self.objects[dispId]
+ key = obj.getClassKey()
+ index = (key.getPackageName(), key.getClassName())
+ if index in totals:
+ stats = totals[index]
+ else:
+ stats = (0, 0)
+ if obj.isDeleted():
+ stats = (stats[0], stats[1] + 1)
+ else:
+ stats = (stats[0] + 1, stats[1])
+ totals[index] = stats
+ finally:
+ self.lock.release()
+
+ for index in totals:
+ stats = totals[index]
+ rows.append((index[0], index[1], stats[0], stats[1]))
+ self.disp.table(title, heads, rows)
+
+ def listObjects(self, tokens):
+ ckeys = self.classKeysByToken(tokens[0])
+ show_deleted = True
+ if len(tokens) > 1 and tokens[1] == 'active':
+ show_deleted = None
+ heads = ("ID", "Created", "Destroyed", "Index")
+ rows = []
+ try:
+ self.lock.acquire()
+ for dispId in self.objects:
+ obj = self.objects[dispId]
+ if obj.getClassKey() in ckeys:
+ utime, ctime, dtime = obj.getTimestamps()
+ dtimestr = self.disp.timestamp(dtime)
+ if dtime == 0:
+ dtimestr = "-"
+ if dtime == 0 or (dtime > 0 and show_deleted):
+ row = (dispId, self.disp.timestamp(ctime), dtimestr, self.objectIndex(obj))
+ rows.append(row)
+ finally:
+ self.lock.release()
+ self.disp.table("Object Summary:", heads, rows)
+
+ def showObjectsByKey(self, key):
+ pass
+
+ def showObjectById(self, dispId):
+ heads = ("Attribute", str(dispId))
+ rows = []
+ try:
+ self.lock.acquire()
+ if dispId in self.objects:
+ obj = self.objects[dispId]
+ caption = "Object of type: %r" % obj.getClassKey()
+ for prop in obj.getProperties():
+ row = (prop[0].name, self.valueByType(prop[0].type, prop[1]))
+ rows.append(row)
+ for stat in obj.getStatistics():
+ row = (stat[0].name, self.valueByType(stat[0].type, stat[1]))
+ rows.append(row)
+ else:
+ print "No object found with ID %d" % dispId
+ finally:
+ self.lock.release()
+ self.disp.table(caption, heads, rows)
+
+ def classKeysByToken(self, token):
+ """
+ Given a token, return a list of matching class keys (if found):
+ token formats: <class-name>
+ <package-name>:<class-name>
+ """
+ pname = None
+ cname = None
+ parts = token.split(':')
+ if len(parts) == 1:
+ cname = parts[0]
+ elif len(parts) == 2:
+ pname = parts[0]
+ cname = parts[1]
+ else:
+ raise ValueError("Invalid Class Name: %s" % token)
+
+ keys = []
+ packages = self.session.getPackages()
+ for p in packages:
+ if pname == None or pname == p:
+ classes = self.session.getClasses(p)
+ for key in classes:
+ if key.getClassName() == cname:
+ keys.append(key)
+ return keys
+
+ def typeName (self, typecode):
+ """ Convert type-codes to printable strings """
+ if typecode == 1: return "uint8"
+ elif typecode == 2: return "uint16"
+ elif typecode == 3: return "uint32"
+ elif typecode == 4: return "uint64"
+ elif typecode == 5: return "bool"
+ elif typecode == 6: return "short-string"
+ elif typecode == 7: return "long-string"
+ elif typecode == 8: return "abs-time"
+ elif typecode == 9: return "delta-time"
+ elif typecode == 10: return "reference"
+ elif typecode == 11: return "boolean"
+ elif typecode == 12: return "float"
+ elif typecode == 13: return "double"
+ elif typecode == 14: return "uuid"
+ elif typecode == 15: return "field-table"
+ elif typecode == 16: return "int8"
+ elif typecode == 17: return "int16"
+ elif typecode == 18: return "int32"
+ elif typecode == 19: return "int64"
+ elif typecode == 20: return "object"
+ elif typecode == 21: return "list"
+ elif typecode == 22: return "array"
+ else:
+ raise ValueError ("Invalid type code: %s" % str(typecode))
+
+ def valueByType(self, typecode, val):
+ if type(val) is type(None):
+ return "absent"
+ if typecode == 1: return "%d" % val
+ elif typecode == 2: return "%d" % val
+ elif typecode == 3: return "%d" % val
+ elif typecode == 4: return "%d" % val
+ elif typecode == 6: return val
+ elif typecode == 7: return val
+ elif typecode == 8: return strftime("%c", gmtime(val / 1000000000))
+ elif typecode == 9:
+ if val < 0: val = 0
+ sec = val / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
+
+ elif typecode == 10: return str(self.idRegistry.displayId(val))
+ elif typecode == 11:
+ if val:
+ return "True"
+ else:
+ return "False"
+
+ elif typecode == 12: return "%f" % val
+ elif typecode == 13: return "%f" % val
+ elif typecode == 14: return "%r" % val
+ elif typecode == 15: return "%r" % val
+ elif typecode == 16: return "%d" % val
+ elif typecode == 17: return "%d" % val
+ elif typecode == 18: return "%d" % val
+ elif typecode == 19: return "%d" % val
+ elif typecode == 20: return "%r" % val
+ elif typecode == 21: return "%r" % val
+ elif typecode == 22: return "%r" % val
+ else:
+ raise ValueError ("Invalid type code: %s" % str(typecode))
+
+ def accessName (self, code):
+ """ Convert element access codes to printable strings """
+ if code == '1': return "ReadCreate"
+ elif code == '2': return "ReadWrite"
+ elif code == '3': return "ReadOnly"
+ else:
+ raise ValueError ("Invalid access code: %s" % str(code))
+
+ def notNone (self, text):
+ if text == None:
+ return ""
+ else:
+ return text
+
+ def objectIndex(self, obj):
+ if obj._objectId.isV2:
+ return obj._objectId.getObject()
+ result = ""
+ first = True
+ props = obj.getProperties()
+ for prop in props:
+ if prop[0].index:
+ if not first:
+ result += "."
+ result += self.valueByType(prop[0].type, prop[1])
+ first = None
+ return result
+
+
+ #=====================
+ # Methods from Console
+ #=====================
+ def brokerConnected(self, broker):
+ """ Invoked when a connection is established to a broker """
+ try:
+ self.lock.acquire()
+ self.connected = True
+ finally:
+ self.lock.release()
+ if not self.first_connect:
+ print "Broker connected:", broker
+ self.first_connect = None
+
+ def brokerDisconnected(self, broker):
+ """ Invoked when the connection to a broker is lost """
+ try:
+ self.lock.acquire()
+ self.connected = None
+ finally:
+ self.lock.release()
+ if not self.closing:
+ print "Broker disconnected:", broker
+
+ def objectProps(self, broker, record):
+ """ Invoked when an object is updated. """
+ oid = record.getObjectId()
+ dispId = self.idRegistry.displayId(oid)
+ try:
+ self.lock.acquire()
+ if dispId in self.objects:
+ self.objects[dispId].mergeUpdate(record)
+ else:
+ self.objects[dispId] = record
+ finally:
+ self.lock.release()
+
+ def objectStats(self, broker, record):
+ """ Invoked when an object is updated. """
+ oid = record.getObjectId()
+ dispId = self.idRegistry.displayId(oid)
+ try:
+ self.lock.acquire()
+ if dispId in self.objects:
+ self.objects[dispId].mergeUpdate(record)
+ finally:
+ self.lock.release()
+
+ def event(self, broker, event):
+ """ Invoked when an event is raised. """
+ pass
+
+ def methodResponse(self, broker, seq, response):
+ print response
+
+
+#======================================================================================================
+# IdRegistry
+#======================================================================================================
+class IdRegistry(object):
+ """
+ """
+ def __init__(self):
+ self.next_display_id = 101
+ self.oid_to_display = {}
+ self.display_to_oid = {}
+ self.lock = Lock()
+
+ def displayId(self, oid):
+ try:
+ self.lock.acquire()
+ if oid in self.oid_to_display:
+ return self.oid_to_display[oid]
+ newId = self.next_display_id
+ self.next_display_id += 1
+ self.oid_to_display[oid] = newId
+ self.display_to_oid[newId] = oid
+ return newId
+ finally:
+ self.lock.release()
+
+ def objectId(self, displayId):
+ try:
+ self.lock.acquire()
+ if displayId in self.display_to_oid:
+ return self.display_to_oid[displayId]
+ return None
+ finally:
+ self.lock.release()
+
+ def getDisplayIds(self):
+ result = []
+ for displayId in self.display_to_oid:
+ result.append(str(displayId))
+ return result
+
+ def getIdInfo(self, displayId):
+ """
+ Given a display ID, return a tuple of (displayID, bootSequence/Durable, AgentBank/Name, ObjectName)
+ """
+ oid = self.objectId(displayId)
+ if oid == None:
+ return (displayId, "?", "unknown", "unknown")
+ bootSeq = oid.getSequence()
+ if bootSeq == 0:
+ bootSeq = '<durable>'
+ agent = oid.getAgentBank()
+ if agent == '0':
+ agent = 'Broker'
+ return (displayId, bootSeq, agent, oid.getObject())
+
+
+def Usage():
+ print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]"
+ print
+
+#=========================================================
+# Main Program
+#=========================================================
+
+# Get host name and port if specified on the command line
+cargs = sys.argv[1:]
+_host = "localhost"
+
+if len(cargs) > 0:
+ _host = cargs[0]
+
+if _host[0] == '-':
+ Usage()
+ if _host != '-h' and _host != "--help":
+ print "qpid-tool: error: no such option:", _host
+ sys.exit(1)
+
+disp = Display()
+
+# Attempt to make a connection to the target broker
+try:
+ data = QmfData(disp, _host)
+except Exception, e:
+ if str(e).find("Exchange not found") != -1:
+ print "Management not enabled on broker: Use '-m yes' option on broker startup."
+ else:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+# Instantiate the CLI interpreter and launch it.
+cli = Mcli(data, disp)
+print("Management Tool for QPID")
+try:
+ cli.cmdloop()
+except KeyboardInterrupt:
+ print
+ print "Exiting..."
+except Exception, e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+# alway attempt to cleanup broker resources
+data.close()