summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/management.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/management.py')
-rw-r--r--qpid/python/qpid/management.py239
1 files changed, 179 insertions, 60 deletions
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index a5ad997a24..8373ecceb7 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -31,17 +31,74 @@ from qpid.client import Client
from qpid.content import Content
from cStringIO import StringIO
from codec import Codec, EOF
+from threading import Lock
+
+
+class SequenceManager:
+ def __init__ (self):
+ self.lock = Lock ()
+ self.sequence = 0
+ self.pending = {}
+
+ def reserve (self, data):
+ self.lock.acquire ()
+ result = self.sequence
+ self.sequence = self.sequence + 1
+ self.pending[result] = data
+ self.lock.release ()
+ return result
+
+ def release (self, seq):
+ data = None
+ self.lock.acquire ()
+ if seq in self.pending:
+ data = self.pending[seq]
+ del self.pending[seq]
+ self.lock.release ()
+ return data
-#===================================================================
-# ManagementMetadata
-#
-# One instance of this class is created for each ManagedBroker. It
-# is used to store metadata from the broker which is needed for the
-# proper interpretation of recevied management content.
-#
-#===================================================================
class ManagementMetadata:
-
+ """One instance of this class is created for each ManagedBroker. It
+ is used to store metadata from the broker which is needed for the
+ proper interpretation of received management content."""
+
+ def encodeValue (self, codec, value, typecode):
+ if typecode == 1:
+ codec.encode_octet (int (value))
+ elif typecode == 2:
+ codec.encode_short (int (value))
+ elif typecode == 3:
+ codec.encode_long (long (value))
+ elif typecode == 4:
+ codec.encode_longlong (long (value))
+ elif typecode == 5:
+ codec.encode_octet (int (value))
+ elif typecode == 6:
+ codec.encode_shortstr (value)
+ elif typecode == 7:
+ codec.encode_longstr (value)
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def decodeValue (self, codec, typecode):
+ if typecode == 1:
+ data = codec.decode_octet ()
+ elif typecode == 2:
+ data = codec.decode_short ()
+ elif typecode == 3:
+ data = codec.decode_long ()
+ elif typecode == 4:
+ data = codec.decode_longlong ()
+ elif typecode == 5:
+ data = codec.decode_octet ()
+ elif typecode == 6:
+ data = codec.decode_shortstr ()
+ elif typecode == 7:
+ data = codec.decode_longstr ()
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+ return data
+
def parseSchema (self, cls, codec):
className = codec.decode_shortstr ()
configCount = codec.decode_short ()
@@ -100,12 +157,56 @@ class ManagementMetadata:
inst = (name, type, unit, desc)
insts.append (inst)
- # TODO: Handle notification of schema change outbound
+ for idx in range (methodCount):
+ ft = codec.decode_table ()
+ mname = ft["name"]
+ argCount = ft["argCount"]
+ if "desc" in ft:
+ mdesc = ft["desc"]
+ else:
+ mdesc = None
+
+ args = []
+ for aidx in range (argCount):
+ ft = codec.decode_table ()
+ name = ft["name"]
+ type = ft["type"]
+ dir = ft["dir"].upper ()
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
+ default = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = value
+ elif key == "min":
+ min = value
+ elif key == "max":
+ max = value
+ elif key == "maxlen":
+ maxlen = value
+ elif key == "desc":
+ desc = value
+ elif key == "default":
+ default = value
+
+ arg = (name, type, dir, unit, desc, min, max, maxlen, default)
+ args.append (arg)
+ methods.append ((mname, mdesc, args))
+
+
self.schema[(className,'C')] = configs
self.schema[(className,'I')] = insts
self.schema[(className,'M')] = methods
self.schema[(className,'E')] = events
+ if self.broker.schema_cb != None:
+ self.broker.schema_cb[1] (self.broker.schema_cb[0], className,
+ configs, insts, methods, events)
+
def parseContent (self, cls, codec):
if cls == 'C' and self.broker.config_cb == None:
return
@@ -127,22 +228,7 @@ class ManagementMetadata:
for element in self.schema[(className,cls)][:]:
tc = element[1]
name = element[0]
- if tc == 1: # TODO: Define constants for these
- data = codec.decode_octet ()
- elif tc == 2:
- data = codec.decode_short ()
- elif tc == 3:
- data = codec.decode_long ()
- elif tc == 4:
- data = codec.decode_longlong ()
- elif tc == 5:
- data = codec.decode_octet ()
- elif tc == 6:
- data = codec.decode_shortstr ()
- elif tc == 7:
- data = codec.decode_longstr ()
- else:
- raise ValueError ("Invalid type code: %d" % tc)
+ data = self.decodeValue (codec, tc)
row.append ((name, data))
if cls == 'C':
@@ -168,14 +254,9 @@ class ManagementMetadata:
self.schema = {}
-#===================================================================
-# ManagedBroker
-#
-# An object of this class represents a connection (over AMQP) to a
-# single managed broker.
-#
-#===================================================================
class ManagedBroker:
+ """An object of this class represents a connection (over AMQP) to a
+ single managed broker."""
mExchange = "qpid.management"
dExchange = "amq.direct"
@@ -205,18 +286,35 @@ class ManagedBroker:
msg.complete ()
def reply_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
- methodId = codec.decode_long ()
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ sequence = codec.decode_long ()
status = codec.decode_long ()
sText = codec.decode_shortstr ()
- args = {}
+ data = self.sequenceManager.release (sequence)
+ if data == None:
+ msg.complete ()
+ return
+
+ (userSequence, className, methodName) = data
+
if status == 0:
- args["sequence"] = codec.decode_long ()
- args["body"] = codec.decode_longstr ()
+ ms = self.metadata.schema[(className,'M')]
+ arglist = None
+ for (mname, mdesc, margs) in ms:
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ msg.complete ()
+ return
+
+ args = {}
+ for arg in arglist:
+ if arg[2].find("O") != -1:
+ args[arg[0]] = self.metadata.decodeValue (codec, arg[1])
if self.method_cb != None:
- self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
+ self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args)
msg.complete ()
@@ -225,17 +323,18 @@ class ManagedBroker:
port = 5672,
username = "guest",
password = "guest",
- specfile = "../specs/amqp.0-10-preview.xml"):
-
- self.spec = qpid.spec.load (specfile)
- self.client = None
- self.channel = None
- self.queue = None
- self.rqueue = None
- self.qname = None
- self.rqname = None
- self.metadata = ManagementMetadata (self)
- self.connected = 0
+ specfile = "/usr/share/amqp/amqp.0-10-preview.xml"):
+
+ self.spec = qpid.spec.load (specfile)
+ self.client = None
+ self.channel = None
+ self.queue = None
+ self.rqueue = None
+ self.qname = None
+ self.rqname = None
+ self.metadata = ManagementMetadata (self)
+ self.sequenceManager = SequenceManager ()
+ self.connected = 0
self.lastConnectError = None
# Initialize the callback records
@@ -265,17 +364,37 @@ class ManagedBroker:
def instrumentationListener (self, context, callback):
self.inst_cb = (context, callback)
- def method (self, methodId, objId, className,
+ def method (self, userSequence, objId, className,
methodName, args=None, packageName="qpid"):
codec = Codec (StringIO (), self.spec);
- codec.encode_long (methodId)
- codec.encode_longlong (objId)
- codec.encode_shortstr (self.rqname)
-
- # TODO: Encode args according to schema
- if methodName == "echo":
- codec.encode_long (args["sequence"])
- codec.encode_longstr (args["body"])
+ sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+ codec.encode_long (sequence) # Method sequence id
+ codec.encode_longlong (objId) # ID of object
+ codec.encode_shortstr (self.rqname) # name of reply queue
+
+ # Encode args according to schema
+ if (className,'M') not in self.metadata.schema:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Unknown class name: %s" % className)
+
+ ms = self.metadata.schema[(className,'M')]
+ arglist = None
+ for (mname, mdesc, margs) in ms:
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Unknown method name: %s" % methodName)
+
+ for arg in arglist:
+ if arg[2].find("I") != -1:
+ value = arg[8] # default
+ if arg[0] in args:
+ value = args[arg[0]]
+ if value == None:
+ self.sequenceManager.release (sequence)
+ raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
+ self.metadata.encodeValue (codec, value, arg[1])
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
@@ -325,7 +444,7 @@ class ManagedBroker:
self.connected = 1
except socket.error, e:
- print "Socket Error Detected:", e[1]
+ print "Socket Error:", e[1]
self.lastConnectError = e
raise
except: