summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
authorNuno Santos <nsantos@apache.org>2008-03-25 20:30:01 +0000
committerNuno Santos <nsantos@apache.org>2008-03-25 20:30:01 +0000
commitaefbf926e26e61460a5a11533361a8da9c11bb9c (patch)
tree3971bd2ad4ae9108b0b20239bf08020e9753b4ce /python/qpid
parent5444def1a124b7ef609ad2a585d333a4654c736a (diff)
downloadqpid-python-aefbf926e26e61460a5a11533361a8da9c11bb9c.tar.gz
QPID-877: applied patch from Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@640970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/management.py137
1 files changed, 100 insertions, 37 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py
index b5d992cf5d..33679cf0da 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -69,12 +69,14 @@ class managementChannel:
opens a session and performs all of the declarations and bindings needed
to participate in the management protocol. """
response = ch.session_open (detached_lifetime=300)
+ self.sessionId = response.session_id
self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id)
self.qpidChannel = ch
self.tcb = topicCb
self.rcb = replyCb
self.context = cbContext
+ self.reqsOutstanding = 0
ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
@@ -114,6 +116,10 @@ class managementClient:
network. It implements the management protocol and manages the management
schemas as advertised by the various management agents in the network. """
+ CTRL_BROKER_INFO = 1
+ CTRL_SCHEMA_LOADED = 2
+ CTRL_USER = 3
+
#========================================================
# User API - interacts with the class's user
#========================================================
@@ -144,7 +150,7 @@ class managementClient:
""" Register a new channel. """
self.channels.append (channel)
codec = Codec (StringIO (), self.spec)
- self.setHeader (codec, ord ('H'))
+ self.setHeader (codec, ord ('B'))
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
msg["routing_key"] = "agent"
@@ -161,6 +167,22 @@ class managementClient:
""" Invoke a method on a managed object. """
self.method (channel, userSequence, objId, className, methodName, args)
+ def getObjects (self, channel, userSequence, className):
+ """ Request immediate content from broker """
+ codec = Codec (StringIO (), self.spec)
+ self.setHeader (codec, ord ('G'), userSequence)
+ ft = {}
+ ft["_class"] = className
+ codec.encode_table (ft)
+ msg = Content (codec.stream.getvalue ())
+ msg["content_type"] = "application/octet-stream"
+ msg["routing_key"] = "agent"
+ msg["reply_to"] = self.spec.struct ("reply_to")
+ msg["reply_to"]["exchange_name"] = "amq.direct"
+ msg["reply_to"]["routing_key"] = channel.replyName
+ channel.send ("qpid.management", msg)
+
+
#========================================================
# Channel API - interacts with registered channel objects
#========================================================
@@ -182,9 +204,11 @@ class managementClient:
return
if hdr[0] == 'm':
- self.handleMethodReply (ch, codec)
- elif hdr[0] == 'I':
- self.handleInit (ch, codec)
+ self.handleMethodReply (ch, codec, hdr[1])
+ elif hdr[0] == 'z':
+ self.handleCommandComplete (ch, codec, hdr[1])
+ elif hdr[0] == 'b':
+ self.handleBrokerResponse (ch, codec)
elif hdr[0] == 'p':
self.handlePackageInd (ch, codec)
elif hdr[0] == 'q':
@@ -196,14 +220,13 @@ class managementClient:
#========================================================
# Internal Functions
#========================================================
- def setHeader (self, codec, opcode, cls = 0):
+ def setHeader (self, codec, opcode, seq = 0):
""" Compose the header of a management message. """
codec.encode_octet (ord ('A'))
codec.encode_octet (ord ('M'))
- codec.encode_octet (ord ('0'))
codec.encode_octet (ord ('1'))
codec.encode_octet (opcode)
- codec.encode_octet (cls)
+ codec.encode_long (seq)
def checkHeader (self, codec):
""" Check the header of a management message and extract the opcode and
@@ -215,14 +238,11 @@ class managementClient:
if octet != 'M':
return None
octet = chr (codec.decode_octet ())
- if octet != '0':
- return None
- octet = chr (codec.decode_octet ())
if octet != '1':
return None
opcode = chr (codec.decode_octet ())
- cls = chr (codec.decode_octet ())
- return (opcode, cls)
+ seq = codec.decode_long ()
+ return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
@@ -252,6 +272,8 @@ class managementClient:
codec.encode_float (float (value))
elif typecode == 13: # DOUBLE
codec.encode_double (double (value))
+ elif typecode == 14: # UUID
+ codec.encode_uuid (value)
else:
raise ValueError ("Invalid type code: %d" % typecode)
@@ -283,14 +305,24 @@ class managementClient:
data = codec.decode_float ()
elif typecode == 13: # DOUBLE
data = codec.decode_double ()
+ elif typecode == 14: # UUID
+ data = codec.decode_uuid ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
- def handleMethodReply (self, ch, codec):
- sequence = codec.decode_long ()
- status = codec.decode_long ()
- sText = codec.decode_shortstr ()
+ def incOutstanding (self, ch):
+ ch.reqsOutstanding = ch.reqsOutstanding + 1
+
+ def decOutstanding (self, ch):
+ ch.reqsOutstanding = ch.reqsOutstanding - 1
+ if ch.reqsOutstanding == 0:
+ if self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
+
+ def handleMethodReply (self, ch, codec, sequence):
+ status = codec.decode_long ()
+ sText = codec.decode_shortstr ()
data = self.seqMgr.release (sequence)
if data == None:
@@ -317,15 +349,27 @@ class managementClient:
if self.methodCb != None:
self.methodCb (ch.context, userSequence, status, sText, args)
- def handleInit (self, ch, codec):
- len = codec.decode_short ()
- data = codec.decode_raw (len)
+ def handleCommandComplete (self, ch, codec, seq):
+ code = codec.decode_long ()
+ text = codec.decode_shortstr ()
+ data = (seq, code, text)
+ context = self.seqMgr.release (seq)
+ if context == "outstanding":
+ self.decOutstanding (ch)
+ elif self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_USER, data)
+
+ def handleBrokerResponse (self, ch, codec):
if self.ctrlCb != None:
- self.ctrlCb (ch.context, len, data)
+ uuid = codec.decode_uuid ()
+ data = (uuid, ch.sessionId)
+ self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data)
# Send a package request
sendCodec = Codec (StringIO (), self.spec)
- self.setHeader (sendCodec, ord ('P'))
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('P'), seq)
+ self.incOutstanding (ch)
smsg = Content (sendCodec.stream.getvalue ())
smsg["content_type"] = "application/octet-stream"
smsg["routing_key"] = "agent"
@@ -341,7 +385,9 @@ class managementClient:
# Send a class request
sendCodec = Codec (StringIO (), self.spec)
- self.setHeader (sendCodec, ord ('Q'))
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('Q'), seq)
+ self.incOutstanding (ch)
sendCodec.encode_shortstr (pname)
smsg = Content (sendCodec.stream.getvalue ())
smsg["content_type"] = "application/octet-stream"
@@ -362,6 +408,7 @@ class managementClient:
# Send a schema request
sendCodec = Codec (StringIO (), self.spec)
self.setHeader (sendCodec, ord ('S'))
+ self.incOutstanding (ch)
sendCodec.encode_shortstr (pname)
sendCodec.encode_shortstr (cname)
sendCodec.encode_bin128 (hash)
@@ -373,8 +420,9 @@ class managementClient:
smsg["reply_to"]["routing_key"] = ch.replyName
ch.send ("qpid.management", smsg)
- def parseSchema (self, ch, cls, codec):
+ def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
+ self.decOutstanding (ch)
packageName = codec.decode_shortstr ()
className = codec.decode_shortstr ()
hash = codec.decode_bin128 ()
@@ -495,7 +543,7 @@ class managementClient:
def parseContent (self, ch, cls, codec):
""" Parse a received content message. """
- if cls == 'C' and self.configCb == None:
+ if (cls == 'C' or cls == 'B') and self.configCb == None:
return
if cls == 'I' and self.instCb == None:
return
@@ -516,23 +564,39 @@ class managementClient:
timestamps.append (codec.decode_longlong ()) # Delete Time
schemaClass = self.schema[classKey]
- for element in schemaClass[cls][:]:
- tc = element[1]
- name = element[0]
- data = self.decodeValue (codec, tc)
- row.append ((name, data))
-
- if cls == 'C':
+ if cls == 'C' or cls == 'B':
+ for element in schemaClass['C'][:]:
+ tc = element[1]
+ name = element[0]
+ data = self.decodeValue (codec, tc)
+ row.append ((name, data))
+
+ if cls == 'I' or cls == 'B':
+ if cls == 'B':
+ start = 1
+ else:
+ start = 0
+ for element in schemaClass['I'][start:]:
+ tc = element[1]
+ name = element[0]
+ data = self.decodeValue (codec, tc)
+ row.append ((name, data))
+
+ if cls == 'C' or cls == 'B':
self.configCb (ch.context, classKey, row, timestamps)
elif cls == 'I':
self.instCb (ch.context, classKey, row, timestamps)
- def parse (self, ch, codec, opcode, cls):
+ def parse (self, ch, codec, opcode, seq):
""" Parse a message received from the topic queue. """
if opcode == 's':
- self.parseSchema (ch, cls, codec)
- elif opcode == 'C':
- self.parseContent (ch, cls, codec)
+ self.parseSchema (ch, codec)
+ elif opcode == 'c':
+ self.parseContent (ch, 'C', codec)
+ elif opcode == 'i':
+ self.parseContent (ch, 'I', codec)
+ elif opcode == 'g':
+ self.parseContent (ch, 'B', codec)
else:
raise ValueError ("Unknown opcode: %c" % opcode);
@@ -540,8 +604,7 @@ class managementClient:
""" Invoke a method on an object """
codec = Codec (StringIO (), self.spec)
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
- self.setHeader (codec, ord ('M'))
- codec.encode_long (sequence) # Method sequence id
+ self.setHeader (codec, ord ('M'), sequence)
codec.encode_longlong (objId) # ID of object
# Encode args according to schema