diff options
Diffstat (limited to 'qpid/python/qpid/managementdata.py')
-rw-r--r-- | qpid/python/qpid/managementdata.py | 773 |
1 files changed, 773 insertions, 0 deletions
diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py new file mode 100644 index 0000000000..61cb10c134 --- /dev/null +++ b/qpid/python/qpid/managementdata.py @@ -0,0 +1,773 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +############################################################################### +## This file is being obsoleted by qmf/console.py +############################################################################### + +import qpid +import re +import socket +import struct +import os +import platform +import locale +from qpid.connection import Timeout +from qpid.management import managementChannel, managementClient +from threading import Lock +from disp import Display +from shlex import split +from qpid.connection import Connection +from qpid.util import connect + +class Broker: + def __init__ (self, text): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(text) + if not match: raise ValueError("'%s' is not a valid broker url" % (text)) + user, password, host, port = match.groups() + + if port: self.port = int(port) + else: self.port = 5672 + for addr in socket.getaddrinfo(host, self.port): + if addr[1] == socket.AF_INET: + self.host = addr[4][0] + self.username = user or "guest" + self.password = password or "guest" + + def name (self): + return self.host + ":" + str (self.port) + +class ManagementData: + + # + # Data Structure: + # + # Please note that this data structure holds only the most recent + # configuration and instrumentation data for each object. It does + # not hold the detailed historical data that is sent from the broker. + # The only historical data it keeps are the high and low watermarks + # for hi-lo statistics. + # + # tables :== {class-key} + # {<obj-id>} + # (timestamp, config-record, inst-record) + # class-key :== (<package-name>, <class-name>, <class-hash>) + # timestamp :== (<last-interval-time>, <create-time>, <delete-time>) + # config-record :== [element] + # inst-record :== [element] + # element :== (<element-name>, <element-value>) + # + + def registerObjId (self, objId): + if not objId.index() in self.idBackMap: + self.idBackMap[objId.index()] = self.nextId + self.idMap[self.nextId] = objId + self.nextId += 1 + + def displayObjId (self, objIdIndex): + if objIdIndex in self.idBackMap: + return self.idBackMap[objIdIndex] + else: + return 0 + + def rawObjId (self, displayId): + if displayId in self.idMap: + return self.idMap[displayId] + else: + return None + + def displayClassName (self, cls): + (packageName, className, hash) = cls + rev = self.schema[cls][4] + if rev == 0: + suffix = "" + else: + suffix = ".%d" % rev + return packageName + ":" + className + suffix + + def dataHandler (self, context, className, list, timestamps): + """ Callback for configuration and instrumentation data updates """ + self.lock.acquire () + try: + # If this class has not been seen before, create an empty dictionary to + # hold objects of this class + if className not in self.tables: + self.tables[className] = {} + + # Register the ID so a more friendly presentation can be displayed + objId = list[0][1] + oidx = objId.index() + self.registerObjId (objId) + + # If this object hasn't been seen before, create a new object record with + # the timestamps and empty lists for configuration and instrumentation data. + if oidx not in self.tables[className]: + self.tables[className][oidx] = (timestamps, [], []) + + (unused, oldConf, oldInst) = self.tables[className][oidx] + + # For config updates, simply replace old config list with the new one. + if context == 0: #config + self.tables[className][oidx] = (timestamps, list, oldInst) + + # For instrumentation updates, carry the minimum and maximum values for + # "hi-lo" stats forward. + elif context == 1: #inst + if len (oldInst) == 0: + newInst = list + else: + newInst = [] + for idx in range (len (list)): + (key, value) = list[idx] + if key.find ("High") == len (key) - 4: + if oldInst[idx][1] > value: + value = oldInst[idx][1] + if key.find ("Low") == len (key) - 3: + if oldInst[idx][1] < value: + value = oldInst[idx][1] + newInst.append ((key, value)) + self.tables[className][oidx] = (timestamps, oldConf, newInst) + + finally: + self.lock.release () + + def ctrlHandler (self, context, op, data): + if op == self.mclient.CTRL_BROKER_INFO: + pass + elif op == self.mclient.CTRL_HEARTBEAT: + pass + + def configHandler (self, context, className, list, timestamps): + self.dataHandler (0, className, list, timestamps); + + def instHandler (self, context, className, list, timestamps): + self.dataHandler (1, className, list, timestamps); + + def methodReply (self, broker, sequence, status, sText, args): + """ Callback for method-reply messages """ + self.lock.acquire () + try: + line = "Call Result: " + self.methodsPending[sequence] + \ + " " + str (status) + " (" + sText + ")" + print line, args + del self.methodsPending[sequence] + finally: + self.lock.release () + + def closeHandler (self, context, reason): + if self.operational: + print "Connection to broker lost:", reason + self.operational = False + if self.cli != None: + self.cli.setPromptMessage ("Broker Disconnected") + + def schemaHandler (self, context, classKey, configs, insts, methods, events): + """ Callback for schema updates """ + if classKey not in self.schema: + schemaRev = 0 + for key in self.schema: + if classKey[0] == key[0] and classKey[1] == key[1]: + schemaRev += 1 + self.schema[classKey] = (configs, insts, methods, events, schemaRev) + + def setCli (self, cliobj): + self.cli = cliobj + + def __init__ (self, disp, host, username="guest", password="guest"): + self.lock = Lock () + self.tables = {} + self.schema = {} + self.bootSequence = 0 + self.operational = False + self.disp = disp + self.cli = None + self.lastUnit = None + self.methodSeq = 1 + self.methodsPending = {} + self.sessionId = "%s.%d" % (platform.uname()[1], os.getpid()) + + self.broker = Broker (host) + sock = connect (self.broker.host, self.broker.port) + oldTimeout = sock.gettimeout() + sock.settimeout(10) + self.conn = Connection (sock, + username=self.broker.username, password=self.broker.password) + def aborted(): + raise Timeout("Waiting for connection to be established with broker") + oldAborted = self.conn.aborted + self.conn.aborted = aborted + + self.conn.start () + + sock.settimeout(oldTimeout) + self.conn.aborted = oldAborted + + self.mclient = managementClient ("unused", self.ctrlHandler, self.configHandler, + self.instHandler, self.methodReply, self.closeHandler) + self.mclient.schemaListener (self.schemaHandler) + self.mch = self.mclient.addChannel (self.conn.session(self.sessionId)) + self.operational = True + self.idMap = {} + self.idBackMap = {} + self.nextId = 101 + + def close (self): + pass + + def refName (self, oid): + if oid == None: + return "NULL" + return str (self.displayObjId (oid.index())) + + def valueDisplay (self, classKey, key, value): + if value == None: + return "<NULL>" + for kind in range (2): + schema = self.schema[classKey][kind] + for item in schema: + if item[0] == key: + typecode = item[1] + unit = item[2] + if (typecode >= 1 and typecode <= 5) or typecode == 12 or typecode == 13 or \ + (typecode >= 16 and typecode <= 19): + if unit == None or unit == self.lastUnit: + return str (value) + else: + self.lastUnit = unit + suffix = "" + if value != 1: + suffix = "s" + return str (value) + " " + unit + suffix + elif typecode == 6 or typecode == 7: # strings + return value + elif typecode == 8: + if value == 0: + return "--" + return self.disp.timestamp (value) + elif typecode == 9: + return str (value) + elif typecode == 10: + return self.refName (value) + elif typecode == 11: + if value == 0: + return "False" + else: + return "True" + elif typecode == 14: + return str (value) + elif typecode == 15: + return str (value) + return "*type-error*" + + def getObjIndex (self, classKey, config): + """ Concatenate the values from index columns to form a unique object name """ + result = "" + schemaConfig = self.schema[classKey][0] + for item in schemaConfig: + if item[5] == 1 and item[0] != "id": + if result != "": + result = result + "." + for key,val in config: + if key == item[0]: + result = result + self.valueDisplay (classKey, key, val) + return result + + def getClassKey (self, className): + delimPos = className.find(":") + if delimPos == -1: + schemaRev = 0 + delim = className.find(".") + if delim != -1: + schemaRev = int(className[delim + 1:]) + name = className[0:delim] + else: + name = className + for key in self.schema: + if key[1] == name and self.schema[key][4] == schemaRev: + return key + else: + package = className[0:delimPos] + name = className[delimPos + 1:] + schemaRev = 0 + delim = name.find(".") + if delim != -1: + schemaRev = int(name[delim + 1:]) + name = name[0:delim] + for key in self.schema: + if key[0] == package and key[1] == name: + if self.schema[key][4] == schemaRev: + return key + return None + + def classCompletions (self, prefix): + """ Provide a list of candidate class names for command completion """ + self.lock.acquire () + complist = [] + try: + for name in self.tables: + if name.find (prefix) == 0: + complist.append (name) + finally: + self.lock.release () + return complist + + def typeName (self, typecode): + """ Convert type-codes to printable strings """ + if typecode == 1: + return "uint8" + elif typecode == 2: + return "uint16" + elif typecode == 3: + return "uint32" + elif typecode == 4: + return "uint64" + elif typecode == 5: + return "bool" + elif typecode == 6: + return "short-string" + elif typecode == 7: + return "long-string" + elif typecode == 8: + return "abs-time" + elif typecode == 9: + return "delta-time" + elif typecode == 10: + return "reference" + elif typecode == 11: + return "boolean" + elif typecode == 12: + return "float" + elif typecode == 13: + return "double" + elif typecode == 14: + return "uuid" + elif typecode == 15: + return "field-table" + elif typecode == 16: + return "int8" + elif typecode == 17: + return "int16" + elif typecode == 18: + return "int32" + elif typecode == 19: + return "int64" + elif typecode == 20: + return "object" + elif typecode == 21: + return "list" + elif typecode == 22: + return "array" + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def accessName (self, code): + """ Convert element access codes to printable strings """ + if code == 1: + return "ReadCreate" + elif code == 2: + return "ReadWrite" + elif code == 3: + return "ReadOnly" + else: + raise ValueError ("Invalid access code: %d" %code) + + def notNone (self, text): + if text == None: + return "" + else: + return text + + def isOid (self, id): + for char in str (id): + if not char.isdigit () and not char == '-': + return False + return True + + def listOfIds (self, classKey, tokens): + """ Generate a tuple of object ids for a classname based on command tokens. """ + list = [] + if len(tokens) == 0 or tokens[0] == "all": + for id in self.tables[classKey]: + list.append (self.displayObjId (id)) + + elif tokens[0] == "active": + for id in self.tables[classKey]: + if self.tables[classKey][id][0][2] == 0: + list.append (self.displayObjId (id)) + + else: + for token in tokens: + if self.isOid (token): + if token.find ("-") != -1: + ids = token.split("-", 2) + for id in range (int (ids[0]), int (ids[1]) + 1): + if self.getClassForId (self.rawObjId (long (id))) == classKey: + list.append (id) + else: + list.append (int(token)) + + list.sort () + result = () + for item in list: + result = result + (item,) + return result + + def listClasses (self): + """ Generate a display of the list of classes """ + self.lock.acquire () + try: + rows = [] + sorted = self.tables.keys () + sorted.sort () + for name in sorted: + active = 0 + deleted = 0 + for record in self.tables[name]: + isdel = False + ts = self.tables[name][record][0] + if ts[2] > 0: + isdel = True + if isdel: + deleted = deleted + 1 + else: + active = active + 1 + rows.append ((self.displayClassName (name), active, deleted)) + if len (rows) != 0: + self.disp.table ("Management Object Types:", + ("ObjectType", "Active", "Deleted"), rows) + else: + print "Waiting for next periodic update" + finally: + self.lock.release () + + def listObjects (self, tokens): + """ Generate a display of a list of objects in a class """ + if len(tokens) == 0: + print "Error - No class name provided" + return + + self.lock.acquire () + try: + classKey = self.getClassKey (tokens[0]) + if classKey == None: + print ("Object type %s not known" % tokens[0]) + else: + rows = [] + if classKey in self.tables: + ids = self.listOfIds(classKey, tokens[1:]) + for objId in ids: + (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()] + createTime = self.disp.timestamp (ts[1]) + destroyTime = "-" + if ts[2] > 0: + destroyTime = self.disp.timestamp (ts[2]) + objIndex = self.getObjIndex (classKey, config) + row = (objId, createTime, destroyTime, objIndex) + rows.append (row) + self.disp.table ("Objects of type %s" % self.displayClassName(classKey), + ("ID", "Created", "Destroyed", "Index"), + rows) + finally: + self.lock.release () + + def showObjects (self, tokens): + """ Generate a display of object data for a particular class """ + self.lock.acquire () + try: + self.lastUnit = None + if self.isOid (tokens[0]): + if tokens[0].find ("-") != -1: + rootId = int (tokens[0][0:tokens[0].find ("-")]) + else: + rootId = int (tokens[0]) + + classKey = self.getClassForId (self.rawObjId (rootId)) + remaining = tokens + if classKey == None: + print "Id not known: %d" % int (tokens[0]) + raise ValueError () + else: + classKey = self.getClassKey (tokens[0]) + remaining = tokens[1:] + if classKey not in self.tables: + print "Class not known: %s" % tokens[0] + raise ValueError () + + userIds = self.listOfIds (classKey, remaining) + if len (userIds) == 0: + print "No object IDs supplied" + raise ValueError () + + ids = [] + for id in userIds: + if self.getClassForId (self.rawObjId (long (id))) == classKey: + ids.append (self.rawObjId (long (id))) + + rows = [] + timestamp = None + config = self.tables[classKey][ids[0].index()][1] + for eIdx in range (len (config)): + key = config[eIdx][0] + if key != "id": + row = ("property", key) + for id in ids: + if timestamp == None or \ + timestamp < self.tables[classKey][id.index()][0][0]: + timestamp = self.tables[classKey][id.index()][0][0] + (key, value) = self.tables[classKey][id.index()][1][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) + rows.append (row) + + inst = self.tables[classKey][ids[0].index()][2] + for eIdx in range (len (inst)): + key = inst[eIdx][0] + if key != "id": + row = ("statistic", key) + for id in ids: + (key, value) = self.tables[classKey][id.index()][2][eIdx] + row = row + (self.valueDisplay (classKey, key, value),) + rows.append (row) + + titleRow = ("Type", "Element") + for id in ids: + titleRow = titleRow + (self.refName(id),) + caption = "Object of type %s:" % self.displayClassName(classKey) + if timestamp != None: + caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")" + self.disp.table (caption, titleRow, rows) + + except: + pass + self.lock.release () + + def schemaSummary (self): + """ Generate a display of the list of classes in the schema """ + self.lock.acquire () + try: + rows = [] + sorted = self.schema.keys () + sorted.sort () + for classKey in sorted: + tuple = self.schema[classKey] + row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]), + len (tuple[2])) + rows.append (row) + self.disp.table ("Classes in Schema:", + ("Class", "Properties", "Statistics", "Methods"), + rows) + finally: + self.lock.release () + + def schemaTable (self, className): + """ Generate a display of details of the schema of a particular class """ + self.lock.acquire () + try: + classKey = self.getClassKey (className) + if classKey == None: + print ("Class name %s not known" % className) + raise ValueError () + + rows = [] + schemaRev = self.schema[classKey][4] + for config in self.schema[classKey][0]: + name = config[0] + if name != "id": + typename = self.typeName(config[1]) + unit = self.notNone (config[2]) + desc = self.notNone (config[3]) + access = self.accessName (config[4]) + extra = "" + if config[5] == 1: + extra += "index " + if config[6] != None: + extra += "Min: " + str(config[6]) + " " + if config[7] != None: + extra += "Max: " + str(config[7]) + " " + if config[8] != None: + extra += "MaxLen: " + str(config[8]) + " " + if config[9] == 1: + extra += "optional " + rows.append ((name, typename, unit, access, extra, desc)) + + for config in self.schema[classKey][1]: + name = config[0] + if name != "id": + typename = self.typeName(config[1]) + unit = self.notNone (config[2]) + desc = self.notNone (config[3]) + rows.append ((name, typename, unit, "", "", desc)) + + titles = ("Element", "Type", "Unit", "Access", "Notes", "Description") + self.disp.table ("Schema for class '%s':" % self.displayClassName(classKey), titles, rows) + + for mname in self.schema[classKey][2]: + (mdesc, args) = self.schema[classKey][2][mname] + caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc)) + rows = [] + for arg in args: + name = arg[0] + typename = self.typeName (arg[1]) + dir = arg[2] + unit = self.notNone (arg[3]) + desc = self.notNone (arg[4]) + extra = "" + if arg[5] != None: + extra = extra + "Min: " + str (arg[5]) + if arg[6] != None: + extra = extra + "Max: " + str (arg[6]) + if arg[7] != None: + extra = extra + "MaxLen: " + str (arg[7]) + if arg[8] != None: + extra = extra + "Default: " + str (arg[8]) + rows.append ((name, typename, dir, unit, extra, desc)) + titles = ("Argument", "Type", "Direction", "Unit", "Notes", "Description") + self.disp.table (caption, titles, rows) + + except Exception,e: + pass + self.lock.release () + + def getClassForId (self, objId): + """ Given an object ID, return the class key for the referenced object """ + for classKey in self.tables: + if objId.index() in self.tables[classKey]: + return classKey + return None + + def callMethod (self, userOid, methodName, args): + self.lock.acquire () + methodOk = True + try: + classKey = self.getClassForId (self.rawObjId (userOid)) + if classKey == None: + raise ValueError () + + if methodName not in self.schema[classKey][2]: + print "Method '%s' not valid for class '%s'" % (methodName, self.displayClassName(classKey)) + raise ValueError () + + schemaMethod = self.schema[classKey][2][methodName] + count = 0 + for arg in range(len(schemaMethod[1])): + if schemaMethod[1][arg][2].find("I") != -1: + count += 1 + if len (args) != count: + print "Wrong number of method args: Need %d, Got %d" % (count, len (args)) + raise ValueError () + + namedArgs = {} + idx = 0 + for arg in range(len(schemaMethod[1])): + if schemaMethod[1][arg][2].find("I") != -1: + namedArgs[schemaMethod[1][arg][0]] = args[idx] + idx += 1 + + self.methodSeq = self.methodSeq + 1 + self.methodsPending[self.methodSeq] = methodName + except Exception, e: + methodOk = False + self.lock.release () + if methodOk: +# try: + self.mclient.callMethod (self.mch, self.methodSeq, self.rawObjId (userOid), classKey, + methodName, namedArgs) +# except ValueError, e: +# print "Error invoking method:", e + + def makeIdRow (self, displayId): + if displayId in self.idMap: + objId = self.idMap[displayId] + else: + return None + if objId.getFlags() == 0: + flags = "" + else: + flags = str(objId.getFlags()) + seq = objId.getSequence() + if seq == 0: + seqText = "<durable>" + else: + seqText = str(seq) + return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject())) + + def listIds (self, select): + rows = [] + if select == 0: + sorted = self.idMap.keys() + sorted.sort() + for displayId in sorted: + row = self.makeIdRow (displayId) + rows.append(row) + else: + row = self.makeIdRow (select) + if row == None: + print "Display Id %d not known" % select + return + rows.append(row) + self.disp.table("Translation of Display IDs:", + ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"), + rows) + + def do_list (self, data): + tokens = data.split () + if len (tokens) == 0: + self.listClasses () + else: + self.listObjects (tokens) + + def do_show (self, data): + tokens = data.split () + self.showObjects (tokens) + + def do_schema (self, data): + if data == "": + self.schemaSummary () + else: + self.schemaTable (data) + + def do_call (self, data): + encTokens = data.split () + try: + tokens = [a.decode(locale.getpreferredencoding()) for a in encArgs] + except: + tokens = encTokens + if len (tokens) < 2: + print "Not enough arguments supplied" + return + + displayId = long (tokens[0]) + methodName = tokens[1] + args = tokens[2:] + self.callMethod (displayId, methodName, args) + + def do_id (self, data): + if data == "": + select = 0 + else: + select = int(data) + self.listIds(select) + + def do_exit (self): + self.mclient.removeChannel (self.mch) |