summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-06-11 15:54:37 +0000
committerTed Ross <tross@apache.org>2009-06-11 15:54:37 +0000
commit52be8c6219f98781d83711b32d4c5971b1fa6d1c (patch)
tree812f608b9472a76d757e92be8dc3a184a2e2ae13 /python
parent2a61047873520b644cbc368ebd59fb2d5a0c217d (diff)
downloadqpid-python-52be8c6219f98781d83711b32d4c5971b1fa6d1c.tar.gz
QPID-1786 - Committed qmf patches from Bryan Kearney
Additionally updated existing qmf and Qman to be compatible. The magic number for qmf messages has been incremented. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@783818 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-xpython/commands/qpid-config2
-rw-r--r--python/qmf/console.py743
-rw-r--r--python/qpid/management.py9
-rw-r--r--python/qpid/managementdata.py6
4 files changed, 527 insertions, 233 deletions
diff --git a/python/commands/qpid-config b/python/commands/qpid-config
index 838e9f340f..59145620cd 100755
--- a/python/commands/qpid-config
+++ b/python/commands/qpid-config
@@ -473,7 +473,7 @@ try:
except KeyboardInterrupt:
print
except Exception,e:
- print "Failed:", e.args
+ print "Failed: %s: %s" % (e.__class__.__name__, e)
sys.exit(1)
bm.Disconnect()
diff --git a/python/qmf/console.py b/python/qmf/console.py
index de8df06adc..36553562f7 100644
--- a/python/qmf/console.py
+++ b/python/qmf/console.py
@@ -25,10 +25,13 @@ import qpid
import struct
import socket
import re
+from qpid.datatypes import UUID
+from qpid.datatypes import timestamp
+from qpid.datatypes import datetime
from qpid.peer import Closed
from qpid.session import SessionDetached
-from qpid.connection import Connection, ConnectionFailed
-from qpid.datatypes import Message, RangedSet
+from qpid.connection import Connection, ConnectionFailed, Timeout
+from qpid.datatypes import Message, RangedSet, UUID
from qpid.util import connect, ssl, URL
from qpid.codec010 import StringCodec as Codec
from threading import Lock, Condition, Thread
@@ -107,6 +110,289 @@ class BrokerURL(URL):
def match(self, host, port):
return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port
+class Object(object):
+ """ This class defines a 'proxy' object representing a real managed object on an agent.
+ Actions taken on this proxy are remotely affected on the real managed object.
+ """
+ def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}):
+ self._session = session
+ self._broker = broker
+ self._schema = schema
+ self._managed = managed
+ if self._managed:
+ self._currentTime = codec.read_uint64()
+ self._createTime = codec.read_uint64()
+ self._deleteTime = codec.read_uint64()
+ self._objectId = ObjectId(codec)
+ else:
+ self._currentTime = None
+ self._createTime = None
+ self._deleteTime = None
+ self._objectId = None
+ self._properties = []
+ self._statistics = []
+ if codec:
+ if prop:
+ notPresent = self._parsePresenceMasks(codec, schema)
+ for property in schema.getProperties():
+ if property.name in notPresent:
+ self._properties.append((property, None))
+ else:
+ self._properties.append((property, self._session._decodeValue(codec, property.type, broker)))
+ if stat:
+ for statistic in schema.getStatistics():
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker)))
+ else:
+ for property in schema.getProperties():
+ if property.optional:
+ self._properties.append((property, None))
+ else:
+ self._properties.append((property, self._session._defaultValue(property, broker, kwargs)))
+ for statistic in schema.getStatistics():
+ self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
+
+ def getBroker(self):
+ """ Return the broker from which this object was sent """
+ return self._broker
+
+ def getObjectId(self):
+ """ Return the object identifier for this object """
+ return self._objectId
+
+ def getClassKey(self):
+ """ Return the class-key that references the schema describing this object. """
+ return self._schema.getKey()
+
+ def getSchema(self):
+ """ Return the schema that describes this object. """
+ return self._schema
+
+ def getMethods(self):
+ """ Return a list of methods available for this object. """
+ return self._schema.getMethods()
+
+ def getTimestamps(self):
+ """ Return the current, creation, and deletion times for this object. """
+ return self._currentTime, self._createTime, self._deleteTime
+
+ def isDeleted(self):
+ """ Return True iff this object has been deleted. """
+ return self._deleteTime != 0
+
+ def isManaged(self):
+ """ Return True iff this object is a proxy for a managed object on an agent. """
+ return self._managed
+
+ def getIndex(self):
+ """ Return a string describing this object's primary key. """
+ result = u""
+ for property, value in self._properties:
+ if property.index:
+ if result != u"":
+ result += u":"
+ try:
+ valstr = unicode(self._session._displayValue(value, property.type))
+ except:
+ valstr = u"<undecodable>"
+ result += valstr
+ return result
+
+ def getProperties(self):
+ """ Return a list of object properties """
+ return self._properties
+
+ def getStatistics(self):
+ """ Return a list of object statistics """
+ return self._statistics
+
+ def mergeUpdate(self, newer):
+ """ Replace properties and/or statistics with a newly received update """
+ if not self.isManaged():
+ raise Exception("Object is not managed")
+ if self._objectId != newer._objectId:
+ raise Exception("Objects with different object-ids")
+ if len(newer.getProperties()) > 0:
+ self._properties = newer.getProperties()
+ if len(newer.getStatistics()) > 0:
+ self._statistics = newer.getStatistics()
+
+ def update(self):
+ """ Contact the agent and retrieve the lastest property and statistic values for this object. """
+ if not self.isManaged():
+ raise Exception("Object is not managed")
+ obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
+ if obj:
+ self.mergeUpdate(obj[0])
+ else:
+ raise Exception("Underlying object no longer exists")
+
+ def __repr__(self):
+ if self.isManaged():
+ id = self.getObjectId().__repr__()
+ else:
+ id = "unmanaged"
+ key = self.getClassKey()
+ return key.getPackageName() + ":" + key.getClassName() +\
+ "[" + id + "] " + self.getIndex().encode("utf8")
+
+ def __getattr__(self, name):
+ for method in self._schema.getMethods():
+ if name == method.name:
+ return lambda *args, **kwargs : self._invoke(name, args, kwargs)
+ for property, value in self._properties:
+ if name == property.name:
+ return value
+ if name == "_" + property.name + "_" and property.type == 10: # Dereference references
+ deref = self._session.getObjects(_objectId=value, _broker=self._broker)
+ if len(deref) != 1:
+ return None
+ else:
+ return deref[0]
+ for statistic, value in self._statistics:
+ if name == statistic.name:
+ return value
+ raise Exception("Type Object has no attribute '%s'" % name)
+
+ def __setattr__(self, name, value):
+ if name[0] == '_':
+ super.__setattr__(self, name, value)
+ return
+
+ for prop, unusedValue in self._properties:
+ if name == prop.name:
+ newprop = (prop, value)
+ newlist = []
+ for old, val in self._properties:
+ if name == old.name:
+ newlist.append(newprop)
+ else:
+ newlist.append((old, val))
+ self._properties = newlist
+ return
+ super.__setattr__(self, name, value)
+
+ def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None):
+ for method in self._schema.getMethods():
+ if name == method.name:
+ aIdx = 0
+ sendCodec = Codec(self._broker.conn.spec)
+ seq = self._session.seqMgr._reserve((method, synchronous))
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ self._schema.getKey().encode(sendCodec)
+ sendCodec.write_str8(name)
+
+ count = 0
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ count += 1
+ if count != len(args):
+ raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
+ aIdx += 1
+ if timeWait:
+ ttl = timeWait * 1000
+ else:
+ ttl = None
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+ (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
+ ttl=ttl)
+ if synchronous:
+ try:
+ self._broker.cv.acquire()
+ self._broker.syncInFlight = True
+ finally:
+ self._broker.cv.release()
+ self._broker._send(smsg)
+ return seq
+ return None
+
+ def _invoke(self, name, args, kwargs):
+ if not self.isManaged():
+ raise Exception("Object is not managed")
+ if "_timeout" in kwargs:
+ timeout = kwargs["_timeout"]
+ else:
+ timeout = self._broker.SYNC_TIME
+
+ if "_async" in kwargs and kwargs["_async"]:
+ sync = False
+ if "_timeout" not in kwargs:
+ timeout = None
+ else:
+ sync = True
+
+ seq = self._sendMethodRequest(name, args, kwargs, sync, timeout)
+ if seq:
+ if not sync:
+ return seq
+ try:
+ self._broker.cv.acquire()
+ starttime = time()
+ while self._broker.syncInFlight and self._broker.error == None:
+ self._broker.cv.wait(timeout)
+ if time() - starttime > timeout:
+ self._session.seqMgr._release(seq)
+ raise RuntimeError("Timed out waiting for method to respond")
+ finally:
+ self._broker.cv.release()
+ if self._broker.error != None:
+ errorText = self._broker.error
+ self._broker.error = None
+ raise Exception(errorText)
+ return self._broker.syncResult
+ raise Exception("Invalid Method (software defect) [%s]" % name)
+
+ def _encodeUnmanaged(self, codec):
+ # emit presence masks for optional properties
+ mask = 0
+ bit = 0
+ for prop, value in self._properties:
+ if prop.optional:
+ if bit == 0:
+ bit = 1
+ if value:
+ mask |= bit
+ bit = bit << 1
+ if bit == 256:
+ bit = 0
+ codec.write_uint8(mask)
+ mask = 0
+ if bit != 0:
+ codec.write_uint8(mask)
+
+ codec.write_uint8(20)
+ codec.write_str8(self._schema.getKey().getPackageName())
+ codec.write_str8(self._schema.getKey().getClassName())
+ codec.write_bin128(self._schema.getKey().getHash())
+
+ # encode properties
+ for prop, value in self._properties:
+ if value != None:
+ self._session._encodeValue(codec, value, prop.type)
+
+ # encode statistics
+ for stat, value in self._statistics:
+ self._session._encodeValue(codec, value, stat.type)
+
+ def _parsePresenceMasks(self, codec, schema):
+ excludeList = []
+ bit = 0
+ for property in schema.getProperties():
+ if property.optional:
+ if bit == 0:
+ mask = codec.read_uint8()
+ bit = 1
+ if (mask & bit) == 0:
+ excludeList.append(property.name)
+ bit *= 2
+ if bit == 256:
+ bit = 0
+ return excludeList
+
class Session:
"""
An instance of the Session class represents a console session running
@@ -119,6 +405,18 @@ class Session:
DEFAULT_GET_WAIT_TIME = 60
+ ENCODINGS = {
+ str: 7,
+ timestamp: 8,
+ datetime: 8,
+ int: 9,
+ long: 9,
+ float: 13,
+ UUID: 14,
+ Object: 20,
+ list: 21
+ }
+
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
manageConnections=False, userBindings=False):
"""
@@ -265,6 +563,13 @@ class Session:
agentList.append(a)
return agentList
+ def makeObject(self, classKey, broker=None, **kwargs):
+ """ Create a new, unmanaged object of the schema indicated by classKey """
+ schema = self.getSchema(classKey)
+ if schema == None:
+ raise Exception("Schema not found for classKey")
+ return Object(self, broker, schema, None, True, True, False, kwargs)
+
def getObjects(self, **kwargs):
""" Get a list of objects from QMF agents.
All arguments are passed by name(keyword).
@@ -521,7 +826,7 @@ class Session:
if code == 0:
for arg in method.arguments:
if arg.dir.find("O") != -1:
- outArgs[arg.name] = self._decodeValue(codec, arg.type)
+ outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
result = MethodResult(code, text, outArgs)
if synchronous:
try:
@@ -559,7 +864,7 @@ class Session:
def _handleSchemaResp(self, broker, codec, seq):
kind = codec.read_uint8()
classKey = ClassKey(codec)
- _class = SchemaClass(kind, classKey, codec)
+ _class = SchemaClass(kind, classKey, codec, self)
try:
self.cv.acquire()
self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
@@ -605,9 +910,10 @@ class Session:
self.console.objectStats(broker, object)
def _handleError(self, error):
- self.error = error
try:
self.cv.acquire()
+ if len(self.syncSequenceList) > 0:
+ self.error = error
self.syncSequenceList = []
self.cv.notify()
finally:
@@ -621,7 +927,7 @@ class Session:
return False
return True
- def _decodeValue(self, codec, typecode):
+ def _decodeValue(self, codec, typecode, broker=None):
""" Decode, from the codec, a value based on its typecode. """
if typecode == 1: data = codec.read_uint8() # U8
elif typecode == 2: data = codec.read_uint16() # U16
@@ -636,11 +942,59 @@ class Session:
elif typecode == 12: data = codec.read_float() # FLOAT
elif typecode == 13: data = codec.read_double() # DOUBLE
elif typecode == 14: data = codec.read_uuid() # UUID
- elif typecode == 15: data = codec.read_map() # FTABLE
elif typecode == 16: data = codec.read_int8() # S8
elif typecode == 17: data = codec.read_int16() # S16
elif typecode == 18: data = codec.read_int32() # S32
elif typecode == 19: data = codec.read_int64() # S63
+ elif typecode == 15: # FTABLE
+ data = {}
+ sc = Codec(codec.spec, codec.read_vbin32())
+ if sc.encoded:
+ count = sc.read_uint32()
+ while count > 0:
+ k = sc.read_str8()
+ code = sc.read_uint8()
+ v = self._decodeValue(sc, code, broker)
+ data[k] = v
+ count -= 1
+ elif typecode == 20: # OBJECT
+ # Peek at the type, and if it is still 20 pull it decode. If
+ # Not, call back into self.
+ inner_type_code = codec.read_uint8()
+ if inner_type_code == 20:
+ classKey = ClassKey(codec)
+ try:
+ self.cv.acquire()
+ pname = classKey.getPackageName()
+ if pname not in self.packages:
+ return None
+ pkey = classKey.getPackageKey()
+ if pkey not in self.packages[pname]:
+ return None
+ schema = self.packages[pname][pkey]
+ finally:
+ self.cv.release()
+ data = Object(self, broker, schema, codec, True, True, False)
+ else:
+ data = self._decodeValue(codec, inner_type_code, broker)
+ elif typecode == 21: # List
+ #taken from codec10.read_list
+ sc = Codec(codec.spec, codec.read_vbin32())
+ count = sc.read_uint32()
+ data = []
+ while count > 0:
+ type = sc.read_uint8()
+ data.append(self._decodeValue(sc,type,broker))
+ count -= 1
+ elif typecode == 22: #Array
+ #taken from codec10.read_array
+ sc = Codec(codec.spec, codec.read_vbin32())
+ count = sc.read_uint32()
+ type = sc.read_uint8()
+ data = []
+ while count > 0:
+ data.append(self._decodeValue(sc,type,broker))
+ count -= 1
else:
raise ValueError("Invalid type code: %d" % typecode)
return data
@@ -660,14 +1014,54 @@ class Session:
elif typecode == 12: codec.write_float (float(value)) # FLOAT
elif typecode == 13: codec.write_double (float(value)) # DOUBLE
elif typecode == 14: codec.write_uuid (value.bytes) # UUID
- elif typecode == 15: codec.write_map (value) # FTABLE
elif typecode == 16: codec.write_int8 (int(value)) # S8
elif typecode == 17: codec.write_int16 (int(value)) # S16
elif typecode == 18: codec.write_int32 (int(value)) # S32
elif typecode == 19: codec.write_int64 (int(value)) # S64
+ elif typecode == 20: value._encodeUnmanaged(codec) # OBJECT
+ elif typecode == 15: # FTABLE
+ sc = Codec(codec.spec)
+ if value is not None:
+ sc.write_uint32(len(value))
+ for k, v in value.items():
+ mtype = self.encoding(v)
+ sc.write_str8(k)
+ sc.write_uint8(mtype)
+ self._encodeValue(sc, v, mtype)
+ else:
+ sc.write_uint32(0)
+ codec.write_vbin32(sc.encoded)
+ elif typecode == 21: # List
+ sc = Codec(codec.spec)
+ self._encodeValue(sc, len(value), 3)
+ for o in value:
+ ltype=self.encoding(o)
+ self._encodeValue(sc,ltype,1)
+ self._encodeValue(sc, o, ltype)
+ codec.write_vbin32(sc.encoded)
+ elif typecode == 22: # Array
+ sc = Codec(codec.spec)
+ self._encodeValue(sc, len(value), 3)
+ if len(value) > 0:
+ ltype = self.encoding(value[0])
+ self._encodeValue(sc,ltype,1)
+ for o in value:
+ self._encodeValue(sc, o, ltype)
+ codec.write_vbin32(sc.encoded)
else:
raise ValueError ("Invalid type code: %d" % typecode)
+ def encoding(self, value):
+ return self._encoding(value.__class__)
+
+ def _encoding(self, klass):
+ if Session.ENCODINGS.has_key(klass):
+ return self.ENCODINGS[klass]
+ for base in klass.__bases__:
+ result = self._encoding(base, obj)
+ if result != None:
+ return result
+
def _displayValue(self, value, typecode):
""" """
if typecode == 1: return unicode(value)
@@ -690,8 +1084,64 @@ class Session:
elif typecode == 17: return unicode(value)
elif typecode == 18: return unicode(value)
elif typecode == 19: return unicode(value)
+ elif typecode == 20: return unicode(value.__repr__())
+ elif typecode == 21: return unicode(value.__repr__())
+ elif typecode == 22: return unicode(value.__repr__())
+ else:
+ raise ValueError ("Invalid type code: %d" % typecode)
+
+ def _defaultValue(self, stype, broker=None, kwargs={}):
+ """ """
+ typecode = stype.type
+ if typecode == 1: return 0
+ elif typecode == 2: return 0
+ elif typecode == 3: return 0
+ elif typecode == 4: return 0
+ elif typecode == 6: return ""
+ elif typecode == 7: return ""
+ elif typecode == 8: return 0
+ elif typecode == 9: return 0
+ elif typecode == 10: return ObjectId(None)
+ elif typecode == 11: return False
+ elif typecode == 12: return 0.0
+ elif typecode == 13: return 0.0
+ elif typecode == 14: return UUID([0 for i in range(16)])
+ elif typecode == 15: return {}
+ elif typecode == 16: return 0
+ elif typecode == 17: return 0
+ elif typecode == 18: return 0
+ elif typecode == 19: return 0
+ elif typecode == 21: return []
+ elif typecode == 22: return []
+ elif typecode == 20:
+ try:
+ if "classKeys" in kwargs:
+ keyList = kwargs["classKeys"]
+ else:
+ keyList = None
+ classKey = self._bestClassKey(stype.refPackage, stype.refClass, keyList)
+ if classKey:
+ return self.makeObject(classKey, broker, kwargs)
+ except:
+ pass
+ return None
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
+ def _bestClassKey(self, pname, cname, preferredList):
+ """ """
+ if pname == None or cname == None:
+ if len(preferredList) == 0:
+ return None
+ return preferredList[0]
+ for p in preferredList:
+ if p.getPackageName() == pname and p.getClassName() == cname:
+ return p
+ clist = self.getClasses(pname)
+ for c in clist:
+ if c.getClassName() == cname:
+ return c
+ return None
def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
""" This function can be used to send a method request to an object given only the
@@ -783,18 +1233,24 @@ class SchemaClass:
CLASS_KIND_TABLE = 1
CLASS_KIND_EVENT = 2
- def __init__(self, kind, key, codec):
+ def __init__(self, kind, key, codec, session):
self.kind = kind
self.classKey = key
self.properties = []
self.statistics = []
self.methods = []
self.arguments = []
+ self.session = session
+ hasSupertype = codec.read_uint8()
if self.kind == self.CLASS_KIND_TABLE:
propCount = codec.read_uint16()
statCount = codec.read_uint16()
methodCount = codec.read_uint16()
+ if hasSupertype == 1:
+ self.superTypeKey = ClassKey(codec)
+ else:
+ self.superTypeKey = None ;
for idx in range(propCount):
self.properties.append(SchemaProperty(codec))
for idx in range(statCount):
@@ -804,6 +1260,10 @@ class SchemaClass:
elif self.kind == self.CLASS_KIND_EVENT:
argCount = codec.read_uint16()
+ if (hasSupertype):
+ self.superTypeKey = ClassKey(codec)
+ else:
+ self.superTypeKey = None ;
for idx in range(argCount):
self.arguments.append(SchemaArgument(codec, methodArg=False))
@@ -823,19 +1283,32 @@ class SchemaClass:
def getProperties(self):
""" Return the list of properties for the class. """
- return self.properties
+ if (self.superTypeKey == None):
+ return self.properties
+ else:
+ return self.properties + self.session.getSchema(self.superTypeKey).getProperties()
def getStatistics(self):
""" Return the list of statistics for the class. """
- return self.statistics
+ if (self.superTypeKey == None):
+ return self.statistics
+ else:
+ return self.statistics + self.session.getSchema(self.superTypeKey).getStatistics()
def getMethods(self):
""" Return the list of methods for the class. """
- return self.methods
+ if (self.superTypeKey == None):
+ return self.methods
+ else:
+ return self.methods + self.session.getSchema(self.superTypeKey).getMethods()
def getArguments(self):
""" Return the list of events for the class. """
- return self.arguments
+ """ Return the list of methods for the class. """
+ if (self.superTypeKey == None):
+ return self.arguments
+ else:
+ return self.arguments + self.session.getSchema(self.superTypeKey).getArguments()
class SchemaProperty:
""" """
@@ -846,18 +1319,22 @@ class SchemaProperty:
self.access = str(map["access"])
self.index = map["index"] != 0
self.optional = map["optional"] != 0
- self.unit = None
- self.min = None
- self.max = None
- self.maxlen = None
- self.desc = None
+ self.refPackage = None
+ self.refClass = None
+ self.unit = None
+ self.min = None
+ self.max = None
+ self.maxlen = None
+ self.desc = None
for key, value in map.items():
- if key == "unit" : self.unit = value
- elif key == "min" : self.min = value
- elif key == "max" : self.max = value
- elif key == "maxlen" : self.maxlen = value
- elif key == "desc" : self.desc = value
+ if key == "unit" : self.unit = value
+ elif key == "min" : self.min = value
+ elif key == "max" : self.max = value
+ elif key == "maxlen" : self.maxlen = value
+ elif key == "desc" : self.desc = value
+ elif key == "refPackage" : self.refPackage = value
+ elif key == "refClass" : self.refClass = value
def __repr__(self):
return self.name
@@ -920,6 +1397,8 @@ class SchemaArgument:
self.maxlen = None
self.desc = None
self.default = None
+ self.refPackage = None
+ self.refClass = None
for key, value in map.items():
if key == "unit" : self.unit = value
@@ -928,6 +1407,8 @@ class SchemaArgument:
elif key == "maxlen" : self.maxlen = value
elif key == "desc" : self.desc = value
elif key == "default" : self.default = value
+ elif key == "refPackage" : self.refPackage = value
+ elif key == "refClass" : self.refClass = value
class ObjectId:
""" Object that represents QMF object identifiers """
@@ -987,209 +1468,6 @@ class ObjectId:
def __eq__(self, other):
return (self.first, self.second).__eq__(other)
-class Object(object):
- """ This class defines a 'proxy' object representing a real managed object on an agent.
- Actions taken on this proxy are remotely affected on the real managed object.
- """
- def __init__(self, session, broker, schema, codec, prop, stat):
- self._session = session
- self._broker = broker
- self._schema = schema
- self._currentTime = codec.read_uint64()
- self._createTime = codec.read_uint64()
- self._deleteTime = codec.read_uint64()
- self._objectId = ObjectId(codec)
- self._properties = []
- self._statistics = []
- if prop:
- notPresent = self._parsePresenceMasks(codec, schema)
- for property in schema.getProperties():
- if property.name in notPresent:
- self._properties.append((property, None))
- else:
- self._properties.append((property, self._session._decodeValue(codec, property.type)))
- if stat:
- for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
-
- def getBroker(self):
- """ Return the broker from which this object was sent """
- return self._broker
-
- def getObjectId(self):
- """ Return the object identifier for this object """
- return self._objectId
-
- def getClassKey(self):
- """ Return the class-key that references the schema describing this object. """
- return self._schema.getKey()
-
- def getSchema(self):
- """ Return the schema that describes this object. """
- return self._schema
-
- def getMethods(self):
- """ Return a list of methods available for this object. """
- return self._schema.getMethods()
-
- def getTimestamps(self):
- """ Return the current, creation, and deletion times for this object. """
- return self._currentTime, self._createTime, self._deleteTime
-
- def isDeleted(self):
- """ Return True iff this object has been deleted. """
- return self._deleteTime != 0
-
- def getIndex(self):
- """ Return a string describing this object's primary key. """
- result = u""
- for property, value in self._properties:
- if property.index:
- if result != u"":
- result += u":"
- try:
- valstr = unicode(self._session._displayValue(value, property.type))
- except:
- valstr = u"<undecodable>"
- result += valstr
- return result
-
- def getProperties(self):
- """ Return a list of object properties """
- return self._properties
-
- def getStatistics(self):
- """ Return a list of object statistics """
- return self._statistics
-
- def mergeUpdate(self, newer):
- """ Replace properties and/or statistics with a newly received update """
- if self._objectId != newer._objectId:
- raise Exception("Objects with different object-ids")
- if len(newer.getProperties()) > 0:
- self._properties = newer.getProperties()
- if len(newer.getStatistics()) > 0:
- self._statistics = newer.getStatistics()
-
- def update(self):
- """ Contact the agent and retrieve the lastest property and statistic values for this object. """
- obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
- if obj:
- self.mergeUpdate(obj[0])
- else:
- raise Exception("Underlying object no longer exists")
-
- def __repr__(self):
- key = self.getClassKey()
- return key.getPackageName() + ":" + key.getClassName() +\
- "[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8")
-
- def __getattr__(self, name):
- for method in self._schema.getMethods():
- if name == method.name:
- return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self._properties:
- if name == property.name:
- return value
- if name == "_" + property.name + "_" and property.type == 10: # Dereference references
- deref = self._session.getObjects(_objectId=value, _broker=self._broker)
- if len(deref) != 1:
- return None
- else:
- return deref[0]
- for statistic, value in self._statistics:
- if name == statistic.name:
- return value
- raise Exception("Type Object has no attribute '%s'" % name)
-
- def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None):
- for method in self._schema.getMethods():
- if name == method.name:
- aIdx = 0
- sendCodec = Codec(self._broker.conn.spec)
- seq = self._session.seqMgr._reserve((method, synchronous))
- self._broker._setHeader(sendCodec, 'M', seq)
- self._objectId.encode(sendCodec)
- self._schema.getKey().encode(sendCodec)
- sendCodec.write_str8(name)
-
- count = 0
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- count += 1
- if count != len(args):
- raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
-
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._session._encodeValue(sendCodec, args[aIdx], arg.type)
- aIdx += 1
- if timeWait:
- ttl = timeWait * 1000
- else:
- ttl = None
- smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
- ttl=ttl)
- if synchronous:
- try:
- self._broker.cv.acquire()
- self._broker.syncInFlight = True
- finally:
- self._broker.cv.release()
- self._broker._send(smsg)
- return seq
- return None
-
- def _invoke(self, name, args, kwargs):
- if "_timeout" in kwargs:
- timeout = kwargs["_timeout"]
- else:
- timeout = self._broker.SYNC_TIME
-
- if "_async" in kwargs and kwargs["_async"]:
- sync = False
- if "_timeout" not in kwargs:
- timeout = None
- else:
- sync = True
-
- seq = self._sendMethodRequest(name, args, kwargs, sync, timeout)
- if seq:
- if not sync:
- return seq
- try:
- self._broker.cv.acquire()
- starttime = time()
- while self._broker.syncInFlight and self._broker.error == None:
- self._broker.cv.wait(timeout)
- if time() - starttime > timeout:
- self._session.seqMgr._release(seq)
- raise RuntimeError("Timed out waiting for method to respond")
- finally:
- self._broker.cv.release()
- if self._broker.error != None:
- errorText = self._broker.error
- self._broker.error = None
- raise Exception(errorText)
- return self._broker.syncResult
- raise Exception("Invalid Method (software defect) [%s]" % name)
-
- def _parsePresenceMasks(self, codec, schema):
- excludeList = []
- bit = 0
- for property in schema.getProperties():
- if property.optional:
- if bit == 0:
- mask = codec.read_uint8()
- bit = 1
- if (mask & bit) == 0:
- excludeList.append(property.name)
- bit *= 2
- if bit == 256:
- bit = 0
- return excludeList
-
class MethodResult(object):
""" """
def __init__(self, status, text, outArgs):
@@ -1361,9 +1639,13 @@ class Broker:
self.reqsOutstanding = 1
sock = connect(self.host, self.port)
+ sock.settimeout(5)
if self.ssl:
sock = ssl(sock)
- self.conn = Connection(sock, username=self.authUser, password=self.authPass)
+ self.conn = Connection(sock, username=self.authUser, password=self.authPass, heartbeat=2)
+ def aborted():
+ raise Timeout("read timed out")
+ self.conn.aborted = aborted
self.conn.start()
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
@@ -1424,7 +1706,7 @@ class Broker:
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
codec.write_uint8(ord('M'))
- codec.write_uint8(ord('2'))
+ codec.write_uint8(ord('3'))
codec.write_uint8(ord(opcode))
codec.write_uint32(seq)
@@ -1438,7 +1720,7 @@ class Broker:
if octet != 'M':
return None, None
octet = chr(codec.read_uint8())
- if octet != '2':
+ if octet != '3':
return None, None
opcode = chr(codec.read_uint8())
seq = codec.read_uint32()
@@ -1453,6 +1735,7 @@ class Broker:
dp.ttl = ttl
mp = self.amqpSession.message_properties()
mp.content_type = "x-application/qmf"
+ mp.user_id = self.authUser
mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)
return Message(dp, mp, body)
@@ -1576,7 +1859,7 @@ class Event:
self.schema = session.packages[pname][pkey]
self.arguments = {}
for arg in self.schema.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+ self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
def __repr__(self):
if self.schema == None:
diff --git a/python/qpid/management.py b/python/qpid/management.py
index 546e68ae8e..b97709d367 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -401,7 +401,7 @@ class managementClient:
""" Compose the header of a management message. """
codec.write_uint8 (ord ('A'))
codec.write_uint8 (ord ('M'))
- codec.write_uint8 (ord ('2'))
+ codec.write_uint8 (ord ('3'))
codec.write_uint8 (opcode)
codec.write_uint32 (seq)
@@ -415,7 +415,7 @@ class managementClient:
if octet != 'M':
return None
octet = chr (codec.read_uint8 ())
- if octet != '2':
+ if octet != '3':
return None
opcode = chr (codec.read_uint8 ())
seq = codec.read_uint32 ()
@@ -672,9 +672,14 @@ class managementClient:
packageName = codec.read_str8 ()
className = codec.read_str8 ()
hash = codec.read_bin128 ()
+ hasSupertype = codec.read_uint8()
configCount = codec.read_uint16 ()
instCount = codec.read_uint16 ()
methodCount = codec.read_uint16 ()
+ if hasSupertype != 0:
+ supertypePackage = codec.read_str8()
+ supertypeClass = codec.read_str8()
+ supertypeHash = codec.read_bin128()
if packageName not in self.packages:
return
diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py
index e1fd8d54eb..84eb9c3ff8 100644
--- a/python/qpid/managementdata.py
+++ b/python/qpid/managementdata.py
@@ -360,6 +360,12 @@ class ManagementData:
return "int32"
elif typecode == 19:
return "int64"
+ elif typecode == 20:
+ return "object"
+ elif typecode == 21:
+ return "list"
+ elif typecode == 22:
+ return "array"
else:
raise ValueError ("Invalid type code: %d" % typecode)