diff options
Diffstat (limited to 'qpid/python/qpid/management.py')
-rw-r--r-- | qpid/python/qpid/management.py | 239 |
1 files changed, 179 insertions, 60 deletions
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index a5ad997a24..8373ecceb7 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -31,17 +31,74 @@ from qpid.client import Client from qpid.content import Content from cStringIO import StringIO from codec import Codec, EOF +from threading import Lock + + +class SequenceManager: + def __init__ (self): + self.lock = Lock () + self.sequence = 0 + self.pending = {} + + def reserve (self, data): + self.lock.acquire () + result = self.sequence + self.sequence = self.sequence + 1 + self.pending[result] = data + self.lock.release () + return result + + def release (self, seq): + data = None + self.lock.acquire () + if seq in self.pending: + data = self.pending[seq] + del self.pending[seq] + self.lock.release () + return data -#=================================================================== -# ManagementMetadata -# -# One instance of this class is created for each ManagedBroker. It -# is used to store metadata from the broker which is needed for the -# proper interpretation of recevied management content. -# -#=================================================================== class ManagementMetadata: - + """One instance of this class is created for each ManagedBroker. It + is used to store metadata from the broker which is needed for the + proper interpretation of received management content.""" + + def encodeValue (self, codec, value, typecode): + if typecode == 1: + codec.encode_octet (int (value)) + elif typecode == 2: + codec.encode_short (int (value)) + elif typecode == 3: + codec.encode_long (long (value)) + elif typecode == 4: + codec.encode_longlong (long (value)) + elif typecode == 5: + codec.encode_octet (int (value)) + elif typecode == 6: + codec.encode_shortstr (value) + elif typecode == 7: + codec.encode_longstr (value) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def decodeValue (self, codec, typecode): + if typecode == 1: + data = codec.decode_octet () + elif typecode == 2: + data = codec.decode_short () + elif typecode == 3: + data = codec.decode_long () + elif typecode == 4: + data = codec.decode_longlong () + elif typecode == 5: + data = codec.decode_octet () + elif typecode == 6: + data = codec.decode_shortstr () + elif typecode == 7: + data = codec.decode_longstr () + else: + raise ValueError ("Invalid type code: %d" % typecode) + return data + def parseSchema (self, cls, codec): className = codec.decode_shortstr () configCount = codec.decode_short () @@ -100,12 +157,56 @@ class ManagementMetadata: inst = (name, type, unit, desc) insts.append (inst) - # TODO: Handle notification of schema change outbound + for idx in range (methodCount): + ft = codec.decode_table () + mname = ft["name"] + argCount = ft["argCount"] + if "desc" in ft: + mdesc = ft["desc"] + else: + mdesc = None + + args = [] + for aidx in range (argCount): + ft = codec.decode_table () + name = ft["name"] + type = ft["type"] + dir = ft["dir"].upper () + unit = None + min = None + max = None + maxlen = None + desc = None + default = None + + for key, value in ft.items (): + if key == "unit": + unit = value + elif key == "min": + min = value + elif key == "max": + max = value + elif key == "maxlen": + maxlen = value + elif key == "desc": + desc = value + elif key == "default": + default = value + + arg = (name, type, dir, unit, desc, min, max, maxlen, default) + args.append (arg) + methods.append ((mname, mdesc, args)) + + self.schema[(className,'C')] = configs self.schema[(className,'I')] = insts self.schema[(className,'M')] = methods self.schema[(className,'E')] = events + if self.broker.schema_cb != None: + self.broker.schema_cb[1] (self.broker.schema_cb[0], className, + configs, insts, methods, events) + def parseContent (self, cls, codec): if cls == 'C' and self.broker.config_cb == None: return @@ -127,22 +228,7 @@ class ManagementMetadata: for element in self.schema[(className,cls)][:]: tc = element[1] name = element[0] - if tc == 1: # TODO: Define constants for these - data = codec.decode_octet () - elif tc == 2: - data = codec.decode_short () - elif tc == 3: - data = codec.decode_long () - elif tc == 4: - data = codec.decode_longlong () - elif tc == 5: - data = codec.decode_octet () - elif tc == 6: - data = codec.decode_shortstr () - elif tc == 7: - data = codec.decode_longstr () - else: - raise ValueError ("Invalid type code: %d" % tc) + data = self.decodeValue (codec, tc) row.append ((name, data)) if cls == 'C': @@ -168,14 +254,9 @@ class ManagementMetadata: self.schema = {} -#=================================================================== -# ManagedBroker -# -# An object of this class represents a connection (over AMQP) to a -# single managed broker. -# -#=================================================================== class ManagedBroker: + """An object of this class represents a connection (over AMQP) to a + single managed broker.""" mExchange = "qpid.management" dExchange = "amq.direct" @@ -205,18 +286,35 @@ class ManagedBroker: msg.complete () def reply_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - methodId = codec.decode_long () + codec = Codec (StringIO (msg.content.body), self.spec) + sequence = codec.decode_long () status = codec.decode_long () sText = codec.decode_shortstr () - args = {} + data = self.sequenceManager.release (sequence) + if data == None: + msg.complete () + return + + (userSequence, className, methodName) = data + if status == 0: - args["sequence"] = codec.decode_long () - args["body"] = codec.decode_longstr () + ms = self.metadata.schema[(className,'M')] + arglist = None + for (mname, mdesc, margs) in ms: + if mname == methodName: + arglist = margs + if arglist == None: + msg.complete () + return + + args = {} + for arg in arglist: + if arg[2].find("O") != -1: + args[arg[0]] = self.metadata.decodeValue (codec, arg[1]) if self.method_cb != None: - self.method_cb[1] (self.method_cb[0], methodId, status, sText, args) + self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args) msg.complete () @@ -225,17 +323,18 @@ class ManagedBroker: port = 5672, username = "guest", password = "guest", - specfile = "../specs/amqp.0-10-preview.xml"): - - self.spec = qpid.spec.load (specfile) - self.client = None - self.channel = None - self.queue = None - self.rqueue = None - self.qname = None - self.rqname = None - self.metadata = ManagementMetadata (self) - self.connected = 0 + specfile = "/usr/share/amqp/amqp.0-10-preview.xml"): + + self.spec = qpid.spec.load (specfile) + self.client = None + self.channel = None + self.queue = None + self.rqueue = None + self.qname = None + self.rqname = None + self.metadata = ManagementMetadata (self) + self.sequenceManager = SequenceManager () + self.connected = 0 self.lastConnectError = None # Initialize the callback records @@ -265,17 +364,37 @@ class ManagedBroker: def instrumentationListener (self, context, callback): self.inst_cb = (context, callback) - def method (self, methodId, objId, className, + def method (self, userSequence, objId, className, methodName, args=None, packageName="qpid"): codec = Codec (StringIO (), self.spec); - codec.encode_long (methodId) - codec.encode_longlong (objId) - codec.encode_shortstr (self.rqname) - - # TODO: Encode args according to schema - if methodName == "echo": - codec.encode_long (args["sequence"]) - codec.encode_longstr (args["body"]) + sequence = self.sequenceManager.reserve ((userSequence, className, methodName)) + codec.encode_long (sequence) # Method sequence id + codec.encode_longlong (objId) # ID of object + codec.encode_shortstr (self.rqname) # name of reply queue + + # Encode args according to schema + if (className,'M') not in self.metadata.schema: + self.sequenceManager.release (sequence) + raise ValueError ("Unknown class name: %s" % className) + + ms = self.metadata.schema[(className,'M')] + arglist = None + for (mname, mdesc, margs) in ms: + if mname == methodName: + arglist = margs + if arglist == None: + self.sequenceManager.release (sequence) + raise ValueError ("Unknown method name: %s" % methodName) + + for arg in arglist: + if arg[2].find("I") != -1: + value = arg[8] # default + if arg[0] in args: + value = args[arg[0]] + if value == None: + self.sequenceManager.release (sequence) + raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) + self.metadata.encodeValue (codec, value, arg[1]) msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" @@ -325,7 +444,7 @@ class ManagedBroker: self.connected = 1 except socket.error, e: - print "Socket Error Detected:", e[1] + print "Socket Error:", e[1] self.lastConnectError = e raise except: |