diff options
-rw-r--r-- | qpid/cpp/managementgen/templates/Class.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 4 | ||||
-rw-r--r-- | qpid/python/mgmt-cli/disp.py | 77 | ||||
-rw-r--r-- | qpid/python/mgmt-cli/main.py | 164 | ||||
-rw-r--r-- | qpid/python/mgmt-cli/managementdata.py | 417 | ||||
-rw-r--r-- | qpid/python/qpid/management.py | 239 | ||||
-rw-r--r-- | qpid/python/qpid/management.py.rej | 457 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 21 |
9 files changed, 1305 insertions, 83 deletions
diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h index cff915412e..6a54b2131c 100644 --- a/qpid/cpp/managementgen/templates/Class.h +++ b/qpid/cpp/managementgen/templates/Class.h @@ -22,6 +22,7 @@ /*MGEN:Root.Disclaimer*/ +#include "qpid/sys/Mutex.h" #include "qpid/management/ManagementObject.h" namespace qpid { @@ -52,6 +53,7 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject public: typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; + qpid::sys::Mutex accessorLock; /*MGEN:Class.NameCap*/ (Manageable* coreObject, Manageable* parentObject, /*MGEN:Class.ConstructorArgs*/); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index e2fd998cc0..a5384014d8 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -97,6 +97,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { + Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -104,6 +105,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ } }else { if (mgmtObject.get() != 0) { + Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -122,6 +124,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { + Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -140,6 +143,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){ void Queue::process(intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { + Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -323,6 +327,7 @@ void Queue::consume(Consumer&, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ + Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_consumers (); } } @@ -333,6 +338,7 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = false; if (mgmtObject.get() != 0){ + Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->dec_consumers (); } } @@ -363,6 +369,7 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ + Mutex::ScopedLock alock(mgmtObject->accessorLock); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index a8ba231419..ff136c397d 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -54,8 +54,8 @@ class ManagementObject static const uint8_t TYPE_LSTR = 7; static const uint8_t ACCESS_RC = 1; - static const uint8_t ACCESS_RW = 1; - static const uint8_t ACCESS_RO = 1; + static const uint8_t ACCESS_RW = 2; + static const uint8_t ACCESS_RO = 3; static const uint8_t DIR_I = 1; static const uint8_t DIR_O = 2; diff --git a/qpid/python/mgmt-cli/disp.py b/qpid/python/mgmt-cli/disp.py new file mode 100644 index 0000000000..5746a26e51 --- /dev/null +++ b/qpid/python/mgmt-cli/disp.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import strftime, gmtime + +class Display: + """ Display formatting for QPID Management CLI """ + + def __init__ (self): + self.tableSpacing = 2 + self.tablePrefix = " " + self.timestampFormat = "%X" + + def table (self, title, heads, rows): + """ Print a formatted table with autosized columns """ + print title + if len (rows) == 0: + return + colWidth = [] + col = 0 + line = self.tablePrefix + for head in heads: + width = len (head) + for row in rows: + cellWidth = len (str (row[col])) + if cellWidth > width: + width = cellWidth + colWidth.append (width + self.tableSpacing) + line = line + head + for i in range (colWidth[col] - len (head)): + line = line + " " + col = col + 1 + print line + line = self.tablePrefix + for width in colWidth: + for i in range (width): + line = line + "=" + print line + + for row in rows: + line = self.tablePrefix + col = 0 + for width in colWidth: + line = line + str (row[col]) + for i in range (width - len (str (row[col]))): + line = line + " " + col = col + 1 + print line + + def do_setTimeFormat (self, fmt): + """ Select timestamp format """ + if fmt == "long": + self.timestampFormat = "%c" + elif fmt == "short": + self.timestampFormat = "%X" + + def timestamp (self, nsec): + """ Format a nanosecond-since-the-epoch timestamp for printing """ + return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) diff --git a/qpid/python/mgmt-cli/main.py b/qpid/python/mgmt-cli/main.py new file mode 100644 index 0000000000..2990e25437 --- /dev/null +++ b/qpid/python/mgmt-cli/main.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import socket +from cmd import Cmd +from managementdata import ManagementData +from shlex import split +from disp import Display +from qpid.peer import Closed + +class Mcli (Cmd): + """ Management Command Interpreter """ + prompt = "qpid: " + + def __init__ (self, dataObject, dispObject): + Cmd.__init__ (self) + self.dataObject = dataObject + self.dispObject = dispObject + + def emptyline (self): + pass + + def do_help (self, data): + print "Management Tool for QPID" + print + print "Commands:" + print " list - Print summary of existing objects by class" + print " list <className> - Print list of objects of the specified class" + print " list <className> all - Print contents of all objects of specified class" + print " list <className> <list-of-IDs> - Print contents of one or more objects" + print " list is space-separated, ranges may be specified (i.e. 1004-1010)" + print " call <ID> <methodName> [<args>] - Invoke a method on an object" + print " schema - Print summary of object classes seen on the target" + print " schema [className] - Print details of an object class" + print " set time-format short - Select short timestamp format (default)" + print " set time-format long - Select long timestamp format" + print " quit or ^D - Exit the program" + print + + def complete_set (self, text, line, begidx, endidx): + """ Command completion for the 'set' command """ + tokens = split (line) + if len (tokens) < 2: + return ["time-format "] + elif tokens[1] == "time-format": + if len (tokens) == 2: + return ["long", "short"] + elif len (tokens) == 3: + if "long".find (text) == 0: + return ["long"] + elif "short".find (text) == 0: + return ["short"] + elif "time-format".find (text) == 0: + return ["time-format "] + return [] + + def do_set (self, data): + tokens = split (data) + try: + if tokens[0] == "time-format": + self.dispObject.do_setTimeFormat (tokens[1]) + except: + pass + + def complete_schema (self, text, line, begidx, endidx): + tokens = split (line) + if len (tokens) > 2: + return [] + return self.dataObject.classCompletions (text) + + def do_schema (self, data): + self.dataObject.do_schema (data) + + def complete_list (self, text, line, begidx, endidx): + tokens = split (line) + if len (tokens) > 2: + return [] + return self.dataObject.classCompletions (text) + + def do_list (self, data): + self.dataObject.do_list (data) + + def do_call (self, data): + self.dataObject.do_call (data) + + def do_EOF (self, data): + print "quit" + return True + + def do_quit (self, data): + return True + + def postcmd (self, stop, line): + return stop + + def postloop (self): + print "Exiting..." + self.dataObject.close () + +def Usage (): + print sys.argv[0], "[<target-host> [<tcp-port>]]" + print + sys.exit (1) + +#========================================================= +# Main Program +#========================================================= + +# Get host name and port if specified on the command line +try: + (optlist, cargs) = getopt.getopt (sys.argv[1:], 's:') +except: + Usage () + +specpath = "/usr/share/amqp/amqp.0-10-preview.xml" +host = "localhost" +port = 5672 + +if "s" in optlist: + specpath = optlist["s"] + +if len (cargs) > 0: + host = cargs[0] + +if len (cargs) > 1: + port = int (cargs[1]) + +print ("Management Tool for QPID") +disp = Display () + +# Attempt to make a connection to the target broker +try: + data = ManagementData (disp, host, port, spec=specpath) +except socket.error, e: + sys.exit (0) +except Closed, e: + if str(e).find ("Exchange not found") != -1: + print "Management not enabled on broker: Use '-m yes' option on broker startup." + sys.exit (0) + +# Instantiate the CLI interpreter and launch it. +cli = Mcli (data, disp) +cli.cmdloop () diff --git a/qpid/python/mgmt-cli/managementdata.py b/qpid/python/mgmt-cli/managementdata.py new file mode 100644 index 0000000000..b770677825 --- /dev/null +++ b/qpid/python/mgmt-cli/managementdata.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.management import ManagedBroker +from threading import Lock +from disp import Display +from shlex import split + +class ManagementData: + + # + # Data Structure: + # + # Please note that this data structure holds only the most recent + # configuration and instrumentation data for each object. It does + # not hold the detailed historical data that is sent from the broker. + # The only historical data it keeps are the high and low watermarks + # for hi-lo statistics. + # + # tables :== {<class-name>} + # {<obj-id>} + # (timestamp, config-record, inst-record) + # timestamp :== (<last-interval-time>, <create-time>, <delete-time>) + # config-record :== [element] + # inst-record :== [element] + # element :== (<element-name>, <element-value>) + # + + def dataHandler (self, context, className, list, timestamps): + """ Callback for configuration and instrumentation data updates """ + self.lock.acquire () + try: + # If this class has not been seen before, create an empty dictionary to + # hold objects of this class + if className not in self.tables: + self.tables[className] = {} + + # Calculate a base-id so displayed IDs are reasonable 4-digit numbers + id = long (list[0][1]) + if self.baseId == 0: + self.baseId = id - 1000 + + # If this object hasn't been seen before, create a new object record with + # the timestamps and empty lists for configuration and instrumentation data. + if id not in self.tables[className]: + self.tables[className][id] = (timestamps, [], []) + + (unused, oldConf, oldInst) = self.tables[className][id] + + # For config updates, simply replace old config list with the new one. + if context == 0: #config + self.tables[className][id] = (timestamps, list, oldInst) + + # For instrumentation updates, carry the minimum and maximum values for + # "hi-lo" stats forward. + elif context == 1: #inst + if len (oldInst) == 0: + newInst = list + else: + newInst = [] + for idx in range (len (list)): + (key, value) = list[idx] + if key.find ("High") == len (key) - 4: + if oldInst[idx][1] > value: + value = oldInst[idx][1] + if key.find ("Low") == len (key) - 3: + if oldInst[idx][1] < value: + value = oldInst[idx][1] + newInst.append ((key, value)) + self.tables[className][id] = (timestamps, oldConf, newInst) + + finally: + self.lock.release () + + def methodReply (self, broker, methodId, status, sText, args): + """ Callback for method-reply messages """ + pass + + def schemaHandler (self, context, className, configs, insts, methods, events): + """ Callback for schema updates """ + if className not in self.schema: + self.schema[className] = (configs, insts, methods, events) + + def __init__ (self, disp, host, port=5672, username="guest", password="guest", + spec="../../specs/amqp.0-10-preview.xml"): + self.broker = ManagedBroker (host, port, username, password, spec) + self.broker.configListener (0, self.dataHandler) + self.broker.instrumentationListener (1, self.dataHandler) + self.broker.methodListener (None, self.methodReply) + self.broker.schemaListener (None, self.schemaHandler) + self.lock = Lock () + self.tables = {} + self.schema = {} + self.baseId = 0 + self.disp = disp + self.broker.start () + + def close (self): + self.broker.stop () + + def getObjIndex (self, className, config): + """ Concatenate the values from index columns to form a unique object name """ + result = "" + schemaConfig = self.schema[className][0] + for item in schemaConfig: + if item[5] == 1 and item[0] != "id": + if result != "": + result = result + "." + for key,val in config: + if key == item[0]: + if key.find ("Ref") != -1: + val = val - self.baseId + result = result + str (val) + return result + + def classCompletions (self, prefix): + """ Provide a list of candidate class names for command completion """ + self.lock.acquire () + complist = [] + try: + for name in self.tables: + if name.find (prefix) == 0: + complist.append (name) + finally: + self.lock.release () + return complist + + def typeName (self, typecode): + """ Convert type-codes to printable strings """ + if typecode == 1: + return "uint8" + elif typecode == 2: + return "uint16" + elif typecode == 3: + return "uint32" + elif typecode == 4: + return "uint64" + elif typecode == 5: + return "bool" + elif typecode == 6: + return "short-string" + elif typecode == 7: + return "long-string" + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def accessName (self, code): + """ Convert element access codes to printable strings """ + if code == 1: + return "ReadCreate" + elif code == 2: + return "ReadWrite" + elif code == 3: + return "ReadOnly" + else: + raise ValueErrir ("Invalid access code: %d" %code) + + def notNone (self, text): + if text == None: + return "" + else: + return text + + def listOfIds (self, className, tokens): + """ Generate a tuple of object ids for a classname based on command tokens. """ + list = [] + if tokens[0] == "all": + for id in self.tables[className]: + list.append (id - self.baseId) + + else: + for token in tokens: + if token.find ("-") != -1: + ids = token.split("-", 2) + for id in range (int (ids[0]), int (ids[1]) + 1): + if self.getClassForId (long (id) + self.baseId) == className: + list.append (id) + else: + list.append (token) + + list.sort () + result = () + for item in list: + result = result + (item,) + return result + + def listClasses (self): + """ Generate a display of the list of classes """ + self.lock.acquire () + try: + rows = [] + sorted = self.tables.keys () + sorted.sort () + for name in sorted: + active = 0 + deleted = 0 + for record in self.tables[name]: + isdel = False + ts = self.tables[name][record][0] + if ts[2] > 0: + isdel = True + if isdel: + deleted = deleted + 1 + else: + active = active + 1 + rows.append ((name, active, deleted)) + self.disp.table ("Management Object Types:", + ("ObjectType", "Active", "Deleted"), rows) + finally: + self.lock.release () + + def listObjects (self, className): + """ Generate a display of a list of objects in a class """ + self.lock.acquire () + try: + if className not in self.tables: + print ("Object type %s not known" % className) + else: + rows = [] + sorted = self.tables[className].keys () + sorted.sort () + for objId in sorted: + (ts, config, inst) = self.tables[className][objId] + createTime = self.disp.timestamp (ts[1]) + destroyTime = "-" + if ts[2] > 0: + destroyTime = self.disp.timestamp (ts[2]) + objIndex = self.getObjIndex (className, config) + row = (objId - self.baseId, createTime, destroyTime, objIndex) + rows.append (row) + self.disp.table ("Objects of type %s" % className, + ("ID", "Created", "Destroyed", "Index"), + rows) + finally: + self.lock.release () + + def showObjects (self, tokens): + """ Generate a display of object data for a particular class """ + self.lock.acquire () + try: + className = tokens[0] + if className not in self.tables: + print "Class not known: %s" % className + raise ValueError () + + userIds = self.listOfIds (className, tokens[1:]) + if len (userIds) == 0: + print "No object IDs supplied" + raise ValueError () + + ids = [] + for id in userIds: + if self.getClassForId (long (id) + self.baseId) == className: + ids.append (long (id) + self.baseId) + + rows = [] + config = self.tables[className][ids[0]][1] + for eIdx in range (len (config)): + key = config[eIdx][0] + if key != "id": + isRef = key.find ("Ref") == len (key) - 3 + row = ("config", key) + for id in ids: + value = self.tables[className][id][1][eIdx][1] + if isRef: + value = value - self.baseId + row = row + (value,) + rows.append (row) + + inst = self.tables[className][ids[0]][2] + for eIdx in range (len (inst)): + key = inst[eIdx][0] + if key != "id": + isRef = key.find ("Ref") == len (key) - 3 + row = ("inst", key) + for id in ids: + value = self.tables[className][id][2][eIdx][1] + if isRef: + value = value - self.baseId + row = row + (value,) + rows.append (row) + + titleRow = ("Type", "Element") + for id in ids: + titleRow = titleRow + (str (id - self.baseId),) + self.disp.table ("Object of type %s:" % className, titleRow, rows) + + except: + pass + self.lock.release () + + def schemaSummary (self): + """ Generate a display of the list of classes in the schema """ + self.lock.acquire () + try: + rows = [] + sorted = self.schema.keys () + sorted.sort () + for className in sorted: + tuple = self.schema[className] + row = (className, len (tuple[0]), len (tuple[1]), len (tuple[2]), len (tuple[3])) + rows.append (row) + self.disp.table ("Classes in Schema:", + ("Class", "ConfigElements", "InstElements", "Methods", "Events"), + rows) + finally: + self.lock.release () + + def schemaTable (self, className): + """ Generate a display of details of the schema of a particular class """ + self.lock.acquire () + try: + if className not in self.schema: + print ("Class name %s not known" % className) + raise ValueError () + + rows = [] + for config in self.schema[className][0]: + name = config[0] + if name != "id": + typename = self.typeName(config[1]) + unit = self.notNone (config[2]) + desc = self.notNone (config[3]) + access = self.accessName (config[4]) + extra = "" + if config[5] == 1: + extra = extra + "index " + if config[6] != None: + extra = extra + "Min: " + str (config[6]) + if config[7] != None: + extra = extra + "Max: " + str (config[7]) + if config[8] != None: + extra = extra + "MaxLen: " + str (config[8]) + rows.append ((name, typename, unit, access, extra, desc)) + + for config in self.schema[className][1]: + name = config[0] + if name != "id": + typename = self.typeName(config[1]) + unit = self.notNone (config[2]) + desc = self.notNone (config[3]) + rows.append ((name, typename, unit, "", "", desc)) + + titles = ("Element", "Type", "Unit", "Access", "Notes", "Description") + self.disp.table ("Schema for class '%s':" % className, titles, rows) + + for method in self.schema[className][2]: + mname = method[0] + mdesc = method[1] + args = method[2] + caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc)) + rows = [] + for arg in args: + name = arg[0] + typename = self.typeName (arg[1]) + dir = arg[2] + unit = self.notNone (arg[3]) + desc = self.notNone (arg[4]) + extra = "" + if arg[5] != None: + extra = extra + "Min: " + str (arg[5]) + if arg[6] != None: + extra = extra + "Max: " + str (arg[6]) + if arg[7] != None: + extra = extra + "MaxLen: " + str (arg[7]) + if arg[8] != None: + extra = extra + "Default: " + str (arg[8]) + rows.append ((name, typename, dir, unit, extra, desc)) + titles = ("Argument", "Type", "Direction", "Unit", "Notes", "Description") + self.disp.table (caption, titles, rows) + + except: + pass + self.lock.release () + + def getClassForId (self, objId): + """ Given an object ID, return the class name for the referenced object """ + for className in self.tables: + if objId in self.tables[className]: + return className + return None + + def do_list (self, data): + tokens = data.split () + if len (tokens) == 0: + self.listClasses () + elif len (tokens) == 1: + self.listObjects (data) + else: + self.showObjects (tokens) + + def do_schema (self, data): + if data == "": + self.schemaSummary () + else: + self.schemaTable (data) + + def do_call (self, data): + print "Not yet implemented" diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index a5ad997a24..8373ecceb7 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -31,17 +31,74 @@ from qpid.client import Client from qpid.content import Content from cStringIO import StringIO from codec import Codec, EOF +from threading import Lock + + +class SequenceManager: + def __init__ (self): + self.lock = Lock () + self.sequence = 0 + self.pending = {} + + def reserve (self, data): + self.lock.acquire () + result = self.sequence + self.sequence = self.sequence + 1 + self.pending[result] = data + self.lock.release () + return result + + def release (self, seq): + data = None + self.lock.acquire () + if seq in self.pending: + data = self.pending[seq] + del self.pending[seq] + self.lock.release () + return data -#=================================================================== -# ManagementMetadata -# -# One instance of this class is created for each ManagedBroker. It -# is used to store metadata from the broker which is needed for the -# proper interpretation of recevied management content. -# -#=================================================================== class ManagementMetadata: - + """One instance of this class is created for each ManagedBroker. It + is used to store metadata from the broker which is needed for the + proper interpretation of received management content.""" + + def encodeValue (self, codec, value, typecode): + if typecode == 1: + codec.encode_octet (int (value)) + elif typecode == 2: + codec.encode_short (int (value)) + elif typecode == 3: + codec.encode_long (long (value)) + elif typecode == 4: + codec.encode_longlong (long (value)) + elif typecode == 5: + codec.encode_octet (int (value)) + elif typecode == 6: + codec.encode_shortstr (value) + elif typecode == 7: + codec.encode_longstr (value) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def decodeValue (self, codec, typecode): + if typecode == 1: + data = codec.decode_octet () + elif typecode == 2: + data = codec.decode_short () + elif typecode == 3: + data = codec.decode_long () + elif typecode == 4: + data = codec.decode_longlong () + elif typecode == 5: + data = codec.decode_octet () + elif typecode == 6: + data = codec.decode_shortstr () + elif typecode == 7: + data = codec.decode_longstr () + else: + raise ValueError ("Invalid type code: %d" % typecode) + return data + def parseSchema (self, cls, codec): className = codec.decode_shortstr () configCount = codec.decode_short () @@ -100,12 +157,56 @@ class ManagementMetadata: inst = (name, type, unit, desc) insts.append (inst) - # TODO: Handle notification of schema change outbound + for idx in range (methodCount): + ft = codec.decode_table () + mname = ft["name"] + argCount = ft["argCount"] + if "desc" in ft: + mdesc = ft["desc"] + else: + mdesc = None + + args = [] + for aidx in range (argCount): + ft = codec.decode_table () + name = ft["name"] + type = ft["type"] + dir = ft["dir"].upper () + unit = None + min = None + max = None + maxlen = None + desc = None + default = None + + for key, value in ft.items (): + if key == "unit": + unit = value + elif key == "min": + min = value + elif key == "max": + max = value + elif key == "maxlen": + maxlen = value + elif key == "desc": + desc = value + elif key == "default": + default = value + + arg = (name, type, dir, unit, desc, min, max, maxlen, default) + args.append (arg) + methods.append ((mname, mdesc, args)) + + self.schema[(className,'C')] = configs self.schema[(className,'I')] = insts self.schema[(className,'M')] = methods self.schema[(className,'E')] = events + if self.broker.schema_cb != None: + self.broker.schema_cb[1] (self.broker.schema_cb[0], className, + configs, insts, methods, events) + def parseContent (self, cls, codec): if cls == 'C' and self.broker.config_cb == None: return @@ -127,22 +228,7 @@ class ManagementMetadata: for element in self.schema[(className,cls)][:]: tc = element[1] name = element[0] - if tc == 1: # TODO: Define constants for these - data = codec.decode_octet () - elif tc == 2: - data = codec.decode_short () - elif tc == 3: - data = codec.decode_long () - elif tc == 4: - data = codec.decode_longlong () - elif tc == 5: - data = codec.decode_octet () - elif tc == 6: - data = codec.decode_shortstr () - elif tc == 7: - data = codec.decode_longstr () - else: - raise ValueError ("Invalid type code: %d" % tc) + data = self.decodeValue (codec, tc) row.append ((name, data)) if cls == 'C': @@ -168,14 +254,9 @@ class ManagementMetadata: self.schema = {} -#=================================================================== -# ManagedBroker -# -# An object of this class represents a connection (over AMQP) to a -# single managed broker. -# -#=================================================================== class ManagedBroker: + """An object of this class represents a connection (over AMQP) to a + single managed broker.""" mExchange = "qpid.management" dExchange = "amq.direct" @@ -205,18 +286,35 @@ class ManagedBroker: msg.complete () def reply_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - methodId = codec.decode_long () + codec = Codec (StringIO (msg.content.body), self.spec) + sequence = codec.decode_long () status = codec.decode_long () sText = codec.decode_shortstr () - args = {} + data = self.sequenceManager.release (sequence) + if data == None: + msg.complete () + return + + (userSequence, className, methodName) = data + if status == 0: - args["sequence"] = codec.decode_long () - args["body"] = codec.decode_longstr () + ms = self.metadata.schema[(className,'M')] + arglist = None + for (mname, mdesc, margs) in ms: + if mname == methodName: + arglist = margs + if arglist == None: + msg.complete () + return + + args = {} + for arg in arglist: + if arg[2].find("O") != -1: + args[arg[0]] = self.metadata.decodeValue (codec, arg[1]) if self.method_cb != None: - self.method_cb[1] (self.method_cb[0], methodId, status, sText, args) + self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args) msg.complete () @@ -225,17 +323,18 @@ class ManagedBroker: port = 5672, username = "guest", password = "guest", - specfile = "../specs/amqp.0-10-preview.xml"): - - self.spec = qpid.spec.load (specfile) - self.client = None - self.channel = None - self.queue = None - self.rqueue = None - self.qname = None - self.rqname = None - self.metadata = ManagementMetadata (self) - self.connected = 0 + specfile = "/usr/share/amqp/amqp.0-10-preview.xml"): + + self.spec = qpid.spec.load (specfile) + self.client = None + self.channel = None + self.queue = None + self.rqueue = None + self.qname = None + self.rqname = None + self.metadata = ManagementMetadata (self) + self.sequenceManager = SequenceManager () + self.connected = 0 self.lastConnectError = None # Initialize the callback records @@ -265,17 +364,37 @@ class ManagedBroker: def instrumentationListener (self, context, callback): self.inst_cb = (context, callback) - def method (self, methodId, objId, className, + def method (self, userSequence, objId, className, methodName, args=None, packageName="qpid"): codec = Codec (StringIO (), self.spec); - codec.encode_long (methodId) - codec.encode_longlong (objId) - codec.encode_shortstr (self.rqname) - - # TODO: Encode args according to schema - if methodName == "echo": - codec.encode_long (args["sequence"]) - codec.encode_longstr (args["body"]) + sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + codec.encode_long (sequence) # Method sequence id + codec.encode_longlong (objId) # ID of object + codec.encode_shortstr (self.rqname) # name of reply queue + + # Encode args according to schema + if (className,'M') not in self.metadata.schema: + self.sequenceManager.release (sequence) + raise ValueError ("Unknown class name: %s" % className) + + ms = self.metadata.schema[(className,'M')] + arglist = None + for (mname, mdesc, margs) in ms: + if mname == methodName: + arglist = margs + if arglist == None: + self.sequenceManager.release (sequence) + raise ValueError ("Unknown method name: %s" % methodName) + + for arg in arglist: + if arg[2].find("I") != -1: + value = arg[8] # default + if arg[0] in args: + value = args[arg[0]] + if value == None: + self.sequenceManager.release (sequence) + raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) + self.metadata.encodeValue (codec, value, arg[1]) msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" @@ -325,7 +444,7 @@ class ManagedBroker: self.connected = 1 except socket.error, e: - print "Socket Error Detected:", e[1] + print "Socket Error:", e[1] self.lastConnectError = e raise except: diff --git a/qpid/python/qpid/management.py.rej b/qpid/python/qpid/management.py.rej new file mode 100644 index 0000000000..28d172abe5 --- /dev/null +++ b/qpid/python/qpid/management.py.rej @@ -0,0 +1,457 @@ +*************** +*** 18,24 **** + # + + """ +- Management classes for AMQP + """ + + import qpid +--- 18,24 ---- + # + + """ ++ Management API for Qpid + """ + + import qpid +*************** +*** 42,91 **** + #=================================================================== + class ManagementMetadata: + +- def parseSchema (self, cls, oid, len, codec): +- #print "Schema Record: objId=", oid + +- config = [] +- inst = [] +- while 1: +- flags = codec.decode_octet () +- if flags == 0x80: +- break + +- tc = codec.decode_octet () +- name = codec.decode_shortstr () +- desc = codec.decode_shortstr () + +- if flags & 1: # TODO: Define constants for these +- config.append ((tc, name, desc)) +- if (flags & 1) == 0 or (flags & 2) == 2: +- inst.append ((tc, name, desc)) + + # TODO: Handle notification of schema change outbound +- self.schema[(oid,'C')] = config +- self.schema[(oid,'I')] = inst + +- def parseContent (self, cls, oid, len, codec): +- #print "Content Record: Class=", cls, ", objId=", oid +- + if cls == 'C' and self.broker.config_cb == None: + return + if cls == 'I' and self.broker.inst_cb == None: + return + +- if (oid,cls) not in self.schema: + return + + row = [] + timestamps = [] + +- timestamps.append (codec.decode_longlong ()); # Current Time +- timestamps.append (codec.decode_longlong ()); # Create Time +- timestamps.append (codec.decode_longlong ()); # Delete Time + +- for element in self.schema[(oid,cls)][:]: +- tc = element[0] +- name = element[1] + if tc == 1: # TODO: Define constants for these + data = codec.decode_octet () + elif tc == 2: +--- 42,132 ---- + #=================================================================== + class ManagementMetadata: + ++ def parseSchema (self, cls, codec): ++ className = codec.decode_shortstr () ++ configCount = codec.decode_short () ++ instCount = codec.decode_short () ++ methodCount = codec.decode_short () ++ eventCount = codec.decode_short () + ++ configs = [] ++ insts = [] ++ methods = [] ++ events = [] + ++ configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) ++ insts.append (("id", 4, None, None)) + ++ for idx in range (configCount): ++ ft = codec.decode_table () ++ name = ft["name"] ++ type = ft["type"] ++ access = ft["access"] ++ index = ft["index"] ++ unit = None ++ min = None ++ max = None ++ maxlen = None ++ desc = None + ++ for key, value in ft.items (): ++ if key == "unit": ++ unit = value ++ elif key == "min": ++ min = value ++ elif key == "max": ++ max = value ++ elif key == "maxlen": ++ maxlen = value ++ elif key == "desc": ++ desc = value ++ ++ config = (name, type, unit, desc, access, index, min, max, maxlen) ++ configs.append (config) ++ ++ for idx in range (instCount): ++ ft = codec.decode_table () ++ name = ft["name"] ++ type = ft["type"] ++ unit = None ++ desc = None ++ ++ for key, value in ft.items (): ++ if key == "unit": ++ unit = value ++ elif key == "desc": ++ desc = value ++ ++ inst = (name, type, unit, desc) ++ insts.append (inst) ++ + # TODO: Handle notification of schema change outbound ++ self.schema[(className,'C')] = configs ++ self.schema[(className,'I')] = insts ++ self.schema[(className,'M')] = methods ++ self.schema[(className,'E')] = events + ++ def parseContent (self, cls, codec): + if cls == 'C' and self.broker.config_cb == None: + return + if cls == 'I' and self.broker.inst_cb == None: + return + ++ className = codec.decode_shortstr () ++ ++ if (className,cls) not in self.schema: + return + + row = [] + timestamps = [] + ++ timestamps.append (codec.decode_longlong ()) # Current Time ++ timestamps.append (codec.decode_longlong ()) # Create Time ++ timestamps.append (codec.decode_longlong ()) # Delete Time + ++ for element in self.schema[(className,cls)][:]: ++ tc = element[1] ++ name = element[0] + if tc == 1: # TODO: Define constants for these + data = codec.decode_octet () + elif tc == 2: +*************** +*** 98,130 **** + data = codec.decode_octet () + elif tc == 6: + data = codec.decode_shortstr () + row.append ((name, data)) + + if cls == 'C': +- self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps) +- if cls == 'I': +- self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps) + + def parse (self, codec): +- try: +- opcode = chr (codec.decode_octet ()) +- except EOF: +- return 0 + +- cls = chr (codec.decode_octet ()) +- oid = codec.decode_short () +- len = codec.decode_long () +- +- if len < 8: +- raise ValueError ("parse error: value of length field too small") +- + if opcode == 'S': +- self.parseSchema (cls, oid, len, codec) + +- if opcode == 'C': +- self.parseContent (cls, oid, len, codec) + +- return 1 + + def __init__ (self, broker): + self.broker = broker +--- 139,167 ---- + data = codec.decode_octet () + elif tc == 6: + data = codec.decode_shortstr () ++ elif tc == 7: ++ data = codec.decode_longstr () ++ else: ++ raise ValueError ("Invalid type code: %d" % tc) + row.append ((name, data)) + + if cls == 'C': ++ self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps) ++ elif cls == 'I': ++ self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) + + def parse (self, codec): ++ opcode = chr (codec.decode_octet ()) ++ cls = chr (codec.decode_octet ()) + + if opcode == 'S': ++ self.parseSchema (cls, codec) + ++ elif opcode == 'C': ++ self.parseContent (cls, codec) + ++ else: ++ raise ValueError ("Unknown opcode: %c" % opcode); + + def __init__ (self, broker): + self.broker = broker +*************** +*** 140,146 **** + #=================================================================== + class ManagedBroker: + +- exchange = "qpid.management" + + def checkHeader (self, codec): + octet = chr (codec.decode_octet ()) +--- 177,184 ---- + #=================================================================== + class ManagedBroker: + ++ mExchange = "qpid.management" ++ dExchange = "amq.direct" + + def checkHeader (self, codec): + octet = chr (codec.decode_octet ()) +*************** +*** 157,225 **** + return 0 + return 1 + +- def receive_cb (self, msg): + codec = Codec (StringIO (msg.content.body), self.spec) + + if self.checkHeader (codec) == 0: + raise ValueError ("outer header invalid"); + +- while self.metadata.parse (codec): +- pass + + msg.complete () + +- def __init__ (self, host = "localhost", port = 5672, +- username = "guest", password = "guest"): + +- self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml") +- self.client = None +- self.channel = None +- self.queue = None +- self.qname = None +- self.metadata = ManagementMetadata (self) + + # Initialize the callback records + self.schema_cb = None + self.config_cb = None + self.inst_cb = None + + self.host = host + self.port = port + self.username = username + self.password = password + + def schemaListener (self, context, callback): + self.schema_cb = (context, callback) + + def configListener (self, context, callback): + self.config_cb = (context, callback) + + def instrumentationListener (self, context, callback): + self.inst_cb = (context, callback) + + def start (self): +- print "Connecting to broker", self.host + + try: + self.client = Client (self.host, self.port, self.spec) + self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) + self.channel = self.client.channel (1) +- response = self.channel.session_open (detached_lifetime=300) +- self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id) + +- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) +- self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname, +- routing_key="mgmt") +- self.channel.message_subscribe (queue=self.qname, destination="dest") +- self.queue = self.client.queue ("dest") +- self.queue.listen (self.receive_cb) + +- self.channel.message_flow_mode (destination="dest", mode=1) +- self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF) +- self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF) + + except socket.error, e: + print "Socket Error Detected:", e[1] + raise + except: + raise +--- 195,335 ---- + return 0 + return 1 + ++ def publish_cb (self, msg): + codec = Codec (StringIO (msg.content.body), self.spec) + + if self.checkHeader (codec) == 0: + raise ValueError ("outer header invalid"); + ++ self.metadata.parse (codec) ++ msg.complete () + ++ def reply_cb (self, msg): ++ codec = Codec (StringIO (msg.content.body), self.spec) ++ methodId = codec.decode_long () ++ status = codec.decode_long () ++ sText = codec.decode_shortstr () ++ ++ args = {} ++ if status == 0: ++ args["sequence"] = codec.decode_long () ++ args["body"] = codec.decode_longstr () ++ ++ if self.method_cb != None: ++ self.method_cb[1] (self.method_cb[0], methodId, status, sText, args) ++ + msg.complete () + ++ def __init__ (self, ++ host = "localhost", ++ port = 5672, ++ username = "guest", ++ password = "guest", ++ specfile = "../specs/amqp.0-10-preview.xml"): + ++ self.spec = qpid.spec.load (specfile) ++ self.client = None ++ self.channel = None ++ self.queue = None ++ self.rqueue = None ++ self.qname = None ++ self.rqname = None ++ self.metadata = ManagementMetadata (self) ++ self.connected = 0 ++ self.lastConnectError = None + + # Initialize the callback records ++ self.status_cb = None + self.schema_cb = None + self.config_cb = None + self.inst_cb = None ++ self.method_cb = None + + self.host = host + self.port = port + self.username = username + self.password = password + ++ def statusListener (self, context, callback): ++ self.status_cb = (context, callback) ++ + def schemaListener (self, context, callback): + self.schema_cb = (context, callback) + + def configListener (self, context, callback): + self.config_cb = (context, callback) + ++ def methodListener (self, context, callback): ++ self.method_cb = (context, callback) ++ + def instrumentationListener (self, context, callback): + self.inst_cb = (context, callback) + ++ def method (self, methodId, objId, className, ++ methodName, args=None, packageName="qpid"): ++ codec = Codec (StringIO (), self.spec); ++ codec.encode_long (methodId) ++ codec.encode_longlong (objId) ++ codec.encode_shortstr (self.rqname) ++ ++ # TODO: Encode args according to schema ++ if methodName == "echo": ++ codec.encode_long (args["sequence"]) ++ codec.encode_longstr (args["body"]) ++ ++ msg = Content (codec.stream.getvalue ()) ++ msg["content_type"] = "application/octet-stream" ++ msg["routing_key"] = "method." + packageName + "." + className + "." + methodName ++ msg["reply_to"] = self.spec.struct ("reply_to") ++ self.channel.message_transfer (destination="qpid.management", content=msg) ++ ++ def isConnected (self): ++ return connected ++ + def start (self): ++ print "Connecting to broker %s:%d" % (self.host, self.port) + + try: + self.client = Client (self.host, self.port, self.spec) + self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) + self.channel = self.client.channel (1) ++ response = self.channel.session_open (detached_lifetime=10) ++ self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) ++ self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) + ++ self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) ++ self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1) ++ ++ self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname, ++ routing_key="mgmt.#") ++ self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname, ++ routing_key=self.rqname) + ++ self.channel.message_subscribe (queue=self.qname, destination="mdest") ++ self.channel.message_subscribe (queue=self.rqname, destination="rdest") + ++ self.queue = self.client.queue ("mdest") ++ self.queue.listen (self.publish_cb) ++ ++ self.channel.message_flow_mode (destination="mdest", mode=1) ++ self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF) ++ self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF) ++ ++ self.rqueue = self.client.queue ("rdest") ++ self.rqueue.listen (self.reply_cb) ++ ++ self.channel.message_flow_mode (destination="rdest", mode=1) ++ self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) ++ self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) ++ ++ self.connected = 1 ++ + except socket.error, e: + print "Socket Error Detected:", e[1] ++ self.lastConnectError = e + raise + except: + raise ++ ++ def stop (self): ++ pass diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 3105c5e055..2147122f0a 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -109,27 +109,6 @@ <configElement name="durable" type="bool" access="RC"/> <configElement name="autoDelete" type="bool" access="RC"/> <configElement name="exclusive" type="bool" access="RC"/> - <configElement name="pageMemoryLimit" type="uint32" access="RO"/> - - <!-- Persistent Journal Support --> - <instElement name="journalLocation" type="sstr" desc="Logical directory on disk"/> - <instElement name="journalBaseFileName" type="sstr" desc="Base filename prefix for journal"/> - <instElement name="journalInitialFileCount" type="uint32" desc="Number of files initially allocated to this journal"/> - <instElement name="journalCurrentFileCount" type="uint32" desc="Number of files currently allocated to this journal"/> - <instElement name="journalDataFileSize" type="uint32" unit="byte" desc="Size of each journal data file"/> - <instElement name="journalFreeFileCount" type="hilo32" desc="Number of files free on this journal. Includes free files trapped in holes."/> - <instElement name="journalAvailableFileCount" type="hilo32" desc="Number of files available to be written. Excluding holes"/> - <instElement name="journalRecordDepth" type="hilo32" unit="record" desc="Number of enqueued records (durable messages)"/> - <instElement name="journalRecordEnqueues" type="count64" unit="record" desc="Total enqueued records on journal"/> - <instElement name="journalRecordDequeues" type="count64" unit="record" desc="Total dequeued records on journal"/> - <instElement name="journalWriteWaitFailures" type="count64" unit="record" desc="AIO Wait failures on write"/> - <instElement name="journalWriteBusyFailures" type="count64" unit="record" desc="AIO Busy failures on write"/> - <instElement name="journalReadRecordCount" type="count64" unit="record" desc="Records read from the journal"/> - <instElement name="journalReadBusyFailures" type="count64" unit="record" desc="AIO Busy failures on read"/> - <instElement name="journalWritePageCacheDepth" type="hilo32" unit="page" desc="Current depth of write-page-cache"/> - <instElement name="journalWritePageSize" type="uint32" unit="byte" desc="Page size in write-page-cache"/> - <instElement name="journalReadPageCacheDepth" type="hilo32" unit="page" desc="Current depth of read-page-cache"/> - <instElement name="journalReadPageSize" type="uint32" unit="byte" desc="Page size in read-page-cache"/> <instElement name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/> <instElement name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/> |