diff options
author | Nuno Santos <nsantos@apache.org> | 2008-03-25 20:30:01 +0000 |
---|---|---|
committer | Nuno Santos <nsantos@apache.org> | 2008-03-25 20:30:01 +0000 |
commit | aefbf926e26e61460a5a11533361a8da9c11bb9c (patch) | |
tree | 3971bd2ad4ae9108b0b20239bf08020e9753b4ce /python/qpid | |
parent | 5444def1a124b7ef609ad2a585d333a4654c736a (diff) | |
download | qpid-python-aefbf926e26e61460a5a11533361a8da9c11bb9c.tar.gz |
QPID-877: applied patch from Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@640970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r-- | python/qpid/management.py | 137 |
1 files changed, 100 insertions, 37 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py index b5d992cf5d..33679cf0da 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -69,12 +69,14 @@ class managementChannel: opens a session and performs all of the declarations and bindings needed to participate in the management protocol. """ response = ch.session_open (detached_lifetime=300) + self.sessionId = response.session_id self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id) self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id) self.qpidChannel = ch self.tcb = topicCb self.rcb = replyCb self.context = cbContext + self.reqsOutstanding = 0 ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1) ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1) @@ -114,6 +116,10 @@ class managementClient: network. It implements the management protocol and manages the management schemas as advertised by the various management agents in the network. """ + CTRL_BROKER_INFO = 1 + CTRL_SCHEMA_LOADED = 2 + CTRL_USER = 3 + #======================================================== # User API - interacts with the class's user #======================================================== @@ -144,7 +150,7 @@ class managementClient: """ Register a new channel. """ self.channels.append (channel) codec = Codec (StringIO (), self.spec) - self.setHeader (codec, ord ('H')) + self.setHeader (codec, ord ('B')) msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" msg["routing_key"] = "agent" @@ -161,6 +167,22 @@ class managementClient: """ Invoke a method on a managed object. """ self.method (channel, userSequence, objId, className, methodName, args) + def getObjects (self, channel, userSequence, className): + """ Request immediate content from broker """ + codec = Codec (StringIO (), self.spec) + self.setHeader (codec, ord ('G'), userSequence) + ft = {} + ft["_class"] = className + codec.encode_table (ft) + msg = Content (codec.stream.getvalue ()) + msg["content_type"] = "application/octet-stream" + msg["routing_key"] = "agent" + msg["reply_to"] = self.spec.struct ("reply_to") + msg["reply_to"]["exchange_name"] = "amq.direct" + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) + + #======================================================== # Channel API - interacts with registered channel objects #======================================================== @@ -182,9 +204,11 @@ class managementClient: return if hdr[0] == 'm': - self.handleMethodReply (ch, codec) - elif hdr[0] == 'I': - self.handleInit (ch, codec) + self.handleMethodReply (ch, codec, hdr[1]) + elif hdr[0] == 'z': + self.handleCommandComplete (ch, codec, hdr[1]) + elif hdr[0] == 'b': + self.handleBrokerResponse (ch, codec) elif hdr[0] == 'p': self.handlePackageInd (ch, codec) elif hdr[0] == 'q': @@ -196,14 +220,13 @@ class managementClient: #======================================================== # Internal Functions #======================================================== - def setHeader (self, codec, opcode, cls = 0): + def setHeader (self, codec, opcode, seq = 0): """ Compose the header of a management message. """ codec.encode_octet (ord ('A')) codec.encode_octet (ord ('M')) - codec.encode_octet (ord ('0')) codec.encode_octet (ord ('1')) codec.encode_octet (opcode) - codec.encode_octet (cls) + codec.encode_long (seq) def checkHeader (self, codec): """ Check the header of a management message and extract the opcode and @@ -215,14 +238,11 @@ class managementClient: if octet != 'M': return None octet = chr (codec.decode_octet ()) - if octet != '0': - return None - octet = chr (codec.decode_octet ()) if octet != '1': return None opcode = chr (codec.decode_octet ()) - cls = chr (codec.decode_octet ()) - return (opcode, cls) + seq = codec.decode_long () + return (opcode, seq) def encodeValue (self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ @@ -252,6 +272,8 @@ class managementClient: codec.encode_float (float (value)) elif typecode == 13: # DOUBLE codec.encode_double (double (value)) + elif typecode == 14: # UUID + codec.encode_uuid (value) else: raise ValueError ("Invalid type code: %d" % typecode) @@ -283,14 +305,24 @@ class managementClient: data = codec.decode_float () elif typecode == 13: # DOUBLE data = codec.decode_double () + elif typecode == 14: # UUID + data = codec.decode_uuid () else: raise ValueError ("Invalid type code: %d" % typecode) return data - def handleMethodReply (self, ch, codec): - sequence = codec.decode_long () - status = codec.decode_long () - sText = codec.decode_shortstr () + def incOutstanding (self, ch): + ch.reqsOutstanding = ch.reqsOutstanding + 1 + + def decOutstanding (self, ch): + ch.reqsOutstanding = ch.reqsOutstanding - 1 + if ch.reqsOutstanding == 0: + if self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) + + def handleMethodReply (self, ch, codec, sequence): + status = codec.decode_long () + sText = codec.decode_shortstr () data = self.seqMgr.release (sequence) if data == None: @@ -317,15 +349,27 @@ class managementClient: if self.methodCb != None: self.methodCb (ch.context, userSequence, status, sText, args) - def handleInit (self, ch, codec): - len = codec.decode_short () - data = codec.decode_raw (len) + def handleCommandComplete (self, ch, codec, seq): + code = codec.decode_long () + text = codec.decode_shortstr () + data = (seq, code, text) + context = self.seqMgr.release (seq) + if context == "outstanding": + self.decOutstanding (ch) + elif self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_USER, data) + + def handleBrokerResponse (self, ch, codec): if self.ctrlCb != None: - self.ctrlCb (ch.context, len, data) + uuid = codec.decode_uuid () + data = (uuid, ch.sessionId) + self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data) # Send a package request sendCodec = Codec (StringIO (), self.spec) - self.setHeader (sendCodec, ord ('P')) + seq = self.seqMgr.reserve ("outstanding") + self.setHeader (sendCodec, ord ('P'), seq) + self.incOutstanding (ch) smsg = Content (sendCodec.stream.getvalue ()) smsg["content_type"] = "application/octet-stream" smsg["routing_key"] = "agent" @@ -341,7 +385,9 @@ class managementClient: # Send a class request sendCodec = Codec (StringIO (), self.spec) - self.setHeader (sendCodec, ord ('Q')) + seq = self.seqMgr.reserve ("outstanding") + self.setHeader (sendCodec, ord ('Q'), seq) + self.incOutstanding (ch) sendCodec.encode_shortstr (pname) smsg = Content (sendCodec.stream.getvalue ()) smsg["content_type"] = "application/octet-stream" @@ -362,6 +408,7 @@ class managementClient: # Send a schema request sendCodec = Codec (StringIO (), self.spec) self.setHeader (sendCodec, ord ('S')) + self.incOutstanding (ch) sendCodec.encode_shortstr (pname) sendCodec.encode_shortstr (cname) sendCodec.encode_bin128 (hash) @@ -373,8 +420,9 @@ class managementClient: smsg["reply_to"]["routing_key"] = ch.replyName ch.send ("qpid.management", smsg) - def parseSchema (self, ch, cls, codec): + def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ + self.decOutstanding (ch) packageName = codec.decode_shortstr () className = codec.decode_shortstr () hash = codec.decode_bin128 () @@ -495,7 +543,7 @@ class managementClient: def parseContent (self, ch, cls, codec): """ Parse a received content message. """ - if cls == 'C' and self.configCb == None: + if (cls == 'C' or cls == 'B') and self.configCb == None: return if cls == 'I' and self.instCb == None: return @@ -516,23 +564,39 @@ class managementClient: timestamps.append (codec.decode_longlong ()) # Delete Time schemaClass = self.schema[classKey] - for element in schemaClass[cls][:]: - tc = element[1] - name = element[0] - data = self.decodeValue (codec, tc) - row.append ((name, data)) - - if cls == 'C': + if cls == 'C' or cls == 'B': + for element in schemaClass['C'][:]: + tc = element[1] + name = element[0] + data = self.decodeValue (codec, tc) + row.append ((name, data)) + + if cls == 'I' or cls == 'B': + if cls == 'B': + start = 1 + else: + start = 0 + for element in schemaClass['I'][start:]: + tc = element[1] + name = element[0] + data = self.decodeValue (codec, tc) + row.append ((name, data)) + + if cls == 'C' or cls == 'B': self.configCb (ch.context, classKey, row, timestamps) elif cls == 'I': self.instCb (ch.context, classKey, row, timestamps) - def parse (self, ch, codec, opcode, cls): + def parse (self, ch, codec, opcode, seq): """ Parse a message received from the topic queue. """ if opcode == 's': - self.parseSchema (ch, cls, codec) - elif opcode == 'C': - self.parseContent (ch, cls, codec) + self.parseSchema (ch, codec) + elif opcode == 'c': + self.parseContent (ch, 'C', codec) + elif opcode == 'i': + self.parseContent (ch, 'I', codec) + elif opcode == 'g': + self.parseContent (ch, 'B', codec) else: raise ValueError ("Unknown opcode: %c" % opcode); @@ -540,8 +604,7 @@ class managementClient: """ Invoke a method on an object """ codec = Codec (StringIO (), self.spec) sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) - self.setHeader (codec, ord ('M')) - codec.encode_long (sequence) # Method sequence id + self.setHeader (codec, ord ('M'), sequence) codec.encode_longlong (objId) # ID of object # Encode args according to schema |