diff options
author | Ted Ross <tross@apache.org> | 2009-06-11 15:54:37 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-06-11 15:54:37 +0000 |
commit | 52be8c6219f98781d83711b32d4c5971b1fa6d1c (patch) | |
tree | 812f608b9472a76d757e92be8dc3a184a2e2ae13 /python | |
parent | 2a61047873520b644cbc368ebd59fb2d5a0c217d (diff) | |
download | qpid-python-52be8c6219f98781d83711b32d4c5971b1fa6d1c.tar.gz |
QPID-1786 - Committed qmf patches from Bryan Kearney
Additionally updated existing qmf and Qman to be compatible.
The magic number for qmf messages has been incremented.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@783818 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-x | python/commands/qpid-config | 2 | ||||
-rw-r--r-- | python/qmf/console.py | 743 | ||||
-rw-r--r-- | python/qpid/management.py | 9 | ||||
-rw-r--r-- | python/qpid/managementdata.py | 6 |
4 files changed, 527 insertions, 233 deletions
diff --git a/python/commands/qpid-config b/python/commands/qpid-config index 838e9f340f..59145620cd 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -473,7 +473,7 @@ try: except KeyboardInterrupt: print except Exception,e: - print "Failed:", e.args + print "Failed: %s: %s" % (e.__class__.__name__, e) sys.exit(1) bm.Disconnect() diff --git a/python/qmf/console.py b/python/qmf/console.py index de8df06adc..36553562f7 100644 --- a/python/qmf/console.py +++ b/python/qmf/console.py @@ -25,10 +25,13 @@ import qpid import struct import socket import re +from qpid.datatypes import UUID +from qpid.datatypes import timestamp +from qpid.datatypes import datetime from qpid.peer import Closed from qpid.session import SessionDetached -from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import Message, RangedSet +from qpid.connection import Connection, ConnectionFailed, Timeout +from qpid.datatypes import Message, RangedSet, UUID from qpid.util import connect, ssl, URL from qpid.codec010 import StringCodec as Codec from threading import Lock, Condition, Thread @@ -107,6 +110,289 @@ class BrokerURL(URL): def match(self, host, port): return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port +class Object(object): + """ This class defines a 'proxy' object representing a real managed object on an agent. + Actions taken on this proxy are remotely affected on the real managed object. + """ + def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}): + self._session = session + self._broker = broker + self._schema = schema + self._managed = managed + if self._managed: + self._currentTime = codec.read_uint64() + self._createTime = codec.read_uint64() + self._deleteTime = codec.read_uint64() + self._objectId = ObjectId(codec) + else: + self._currentTime = None + self._createTime = None + self._deleteTime = None + self._objectId = None + self._properties = [] + self._statistics = [] + if codec: + if prop: + notPresent = self._parsePresenceMasks(codec, schema) + for property in schema.getProperties(): + if property.name in notPresent: + self._properties.append((property, None)) + else: + self._properties.append((property, self._session._decodeValue(codec, property.type, broker))) + if stat: + for statistic in schema.getStatistics(): + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker))) + else: + for property in schema.getProperties(): + if property.optional: + self._properties.append((property, None)) + else: + self._properties.append((property, self._session._defaultValue(property, broker, kwargs))) + for statistic in schema.getStatistics(): + self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs))) + + def getBroker(self): + """ Return the broker from which this object was sent """ + return self._broker + + def getObjectId(self): + """ Return the object identifier for this object """ + return self._objectId + + def getClassKey(self): + """ Return the class-key that references the schema describing this object. """ + return self._schema.getKey() + + def getSchema(self): + """ Return the schema that describes this object. """ + return self._schema + + def getMethods(self): + """ Return a list of methods available for this object. """ + return self._schema.getMethods() + + def getTimestamps(self): + """ Return the current, creation, and deletion times for this object. """ + return self._currentTime, self._createTime, self._deleteTime + + def isDeleted(self): + """ Return True iff this object has been deleted. """ + return self._deleteTime != 0 + + def isManaged(self): + """ Return True iff this object is a proxy for a managed object on an agent. """ + return self._managed + + def getIndex(self): + """ Return a string describing this object's primary key. """ + result = u"" + for property, value in self._properties: + if property.index: + if result != u"": + result += u":" + try: + valstr = unicode(self._session._displayValue(value, property.type)) + except: + valstr = u"<undecodable>" + result += valstr + return result + + def getProperties(self): + """ Return a list of object properties """ + return self._properties + + def getStatistics(self): + """ Return a list of object statistics """ + return self._statistics + + def mergeUpdate(self, newer): + """ Replace properties and/or statistics with a newly received update """ + if not self.isManaged(): + raise Exception("Object is not managed") + if self._objectId != newer._objectId: + raise Exception("Objects with different object-ids") + if len(newer.getProperties()) > 0: + self._properties = newer.getProperties() + if len(newer.getStatistics()) > 0: + self._statistics = newer.getStatistics() + + def update(self): + """ Contact the agent and retrieve the lastest property and statistic values for this object. """ + if not self.isManaged(): + raise Exception("Object is not managed") + obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) + if obj: + self.mergeUpdate(obj[0]) + else: + raise Exception("Underlying object no longer exists") + + def __repr__(self): + if self.isManaged(): + id = self.getObjectId().__repr__() + else: + id = "unmanaged" + key = self.getClassKey() + return key.getPackageName() + ":" + key.getClassName() +\ + "[" + id + "] " + self.getIndex().encode("utf8") + + def __getattr__(self, name): + for method in self._schema.getMethods(): + if name == method.name: + return lambda *args, **kwargs : self._invoke(name, args, kwargs) + for property, value in self._properties: + if name == property.name: + return value + if name == "_" + property.name + "_" and property.type == 10: # Dereference references + deref = self._session.getObjects(_objectId=value, _broker=self._broker) + if len(deref) != 1: + return None + else: + return deref[0] + for statistic, value in self._statistics: + if name == statistic.name: + return value + raise Exception("Type Object has no attribute '%s'" % name) + + def __setattr__(self, name, value): + if name[0] == '_': + super.__setattr__(self, name, value) + return + + for prop, unusedValue in self._properties: + if name == prop.name: + newprop = (prop, value) + newlist = [] + for old, val in self._properties: + if name == old.name: + newlist.append(newprop) + else: + newlist.append((old, val)) + self._properties = newlist + return + super.__setattr__(self, name, value) + + def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): + for method in self._schema.getMethods(): + if name == method.name: + aIdx = 0 + sendCodec = Codec(self._broker.conn.spec) + seq = self._session.seqMgr._reserve((method, synchronous)) + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + self._schema.getKey().encode(sendCodec) + sendCodec.write_str8(name) + + count = 0 + for arg in method.arguments: + if arg.dir.find("I") != -1: + count += 1 + if count != len(args): + raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._session._encodeValue(sendCodec, args[aIdx], arg.type) + aIdx += 1 + if timeWait: + ttl = timeWait * 1000 + else: + ttl = None + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % + (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), + ttl=ttl) + if synchronous: + try: + self._broker.cv.acquire() + self._broker.syncInFlight = True + finally: + self._broker.cv.release() + self._broker._send(smsg) + return seq + return None + + def _invoke(self, name, args, kwargs): + if not self.isManaged(): + raise Exception("Object is not managed") + if "_timeout" in kwargs: + timeout = kwargs["_timeout"] + else: + timeout = self._broker.SYNC_TIME + + if "_async" in kwargs and kwargs["_async"]: + sync = False + if "_timeout" not in kwargs: + timeout = None + else: + sync = True + + seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) + if seq: + if not sync: + return seq + try: + self._broker.cv.acquire() + starttime = time() + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(timeout) + if time() - starttime > timeout: + self._session.seqMgr._release(seq) + raise RuntimeError("Timed out waiting for method to respond") + finally: + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None + raise Exception(errorText) + return self._broker.syncResult + raise Exception("Invalid Method (software defect) [%s]" % name) + + def _encodeUnmanaged(self, codec): + # emit presence masks for optional properties + mask = 0 + bit = 0 + for prop, value in self._properties: + if prop.optional: + if bit == 0: + bit = 1 + if value: + mask |= bit + bit = bit << 1 + if bit == 256: + bit = 0 + codec.write_uint8(mask) + mask = 0 + if bit != 0: + codec.write_uint8(mask) + + codec.write_uint8(20) + codec.write_str8(self._schema.getKey().getPackageName()) + codec.write_str8(self._schema.getKey().getClassName()) + codec.write_bin128(self._schema.getKey().getHash()) + + # encode properties + for prop, value in self._properties: + if value != None: + self._session._encodeValue(codec, value, prop.type) + + # encode statistics + for stat, value in self._statistics: + self._session._encodeValue(codec, value, stat.type) + + def _parsePresenceMasks(self, codec, schema): + excludeList = [] + bit = 0 + for property in schema.getProperties(): + if property.optional: + if bit == 0: + mask = codec.read_uint8() + bit = 1 + if (mask & bit) == 0: + excludeList.append(property.name) + bit *= 2 + if bit == 256: + bit = 0 + return excludeList + class Session: """ An instance of the Session class represents a console session running @@ -119,6 +405,18 @@ class Session: DEFAULT_GET_WAIT_TIME = 60 + ENCODINGS = { + str: 7, + timestamp: 8, + datetime: 8, + int: 9, + long: 9, + float: 13, + UUID: 14, + Object: 20, + list: 21 + } + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): """ @@ -265,6 +563,13 @@ class Session: agentList.append(a) return agentList + def makeObject(self, classKey, broker=None, **kwargs): + """ Create a new, unmanaged object of the schema indicated by classKey """ + schema = self.getSchema(classKey) + if schema == None: + raise Exception("Schema not found for classKey") + return Object(self, broker, schema, None, True, True, False, kwargs) + def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. All arguments are passed by name(keyword). @@ -521,7 +826,7 @@ class Session: if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: - outArgs[arg.name] = self._decodeValue(codec, arg.type) + outArgs[arg.name] = self._decodeValue(codec, arg.type, broker) result = MethodResult(code, text, outArgs) if synchronous: try: @@ -559,7 +864,7 @@ class Session: def _handleSchemaResp(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) - _class = SchemaClass(kind, classKey, codec) + _class = SchemaClass(kind, classKey, codec, self) try: self.cv.acquire() self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class @@ -605,9 +910,10 @@ class Session: self.console.objectStats(broker, object) def _handleError(self, error): - self.error = error try: self.cv.acquire() + if len(self.syncSequenceList) > 0: + self.error = error self.syncSequenceList = [] self.cv.notify() finally: @@ -621,7 +927,7 @@ class Session: return False return True - def _decodeValue(self, codec, typecode): + def _decodeValue(self, codec, typecode, broker=None): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.read_uint8() # U8 elif typecode == 2: data = codec.read_uint16() # U16 @@ -636,11 +942,59 @@ class Session: elif typecode == 12: data = codec.read_float() # FLOAT elif typecode == 13: data = codec.read_double() # DOUBLE elif typecode == 14: data = codec.read_uuid() # UUID - elif typecode == 15: data = codec.read_map() # FTABLE elif typecode == 16: data = codec.read_int8() # S8 elif typecode == 17: data = codec.read_int16() # S16 elif typecode == 18: data = codec.read_int32() # S32 elif typecode == 19: data = codec.read_int64() # S63 + elif typecode == 15: # FTABLE + data = {} + sc = Codec(codec.spec, codec.read_vbin32()) + if sc.encoded: + count = sc.read_uint32() + while count > 0: + k = sc.read_str8() + code = sc.read_uint8() + v = self._decodeValue(sc, code, broker) + data[k] = v + count -= 1 + elif typecode == 20: # OBJECT + # Peek at the type, and if it is still 20 pull it decode. If + # Not, call back into self. + inner_type_code = codec.read_uint8() + if inner_type_code == 20: + classKey = ClassKey(codec) + try: + self.cv.acquire() + pname = classKey.getPackageName() + if pname not in self.packages: + return None + pkey = classKey.getPackageKey() + if pkey not in self.packages[pname]: + return None + schema = self.packages[pname][pkey] + finally: + self.cv.release() + data = Object(self, broker, schema, codec, True, True, False) + else: + data = self._decodeValue(codec, inner_type_code, broker) + elif typecode == 21: # List + #taken from codec10.read_list + sc = Codec(codec.spec, codec.read_vbin32()) + count = sc.read_uint32() + data = [] + while count > 0: + type = sc.read_uint8() + data.append(self._decodeValue(sc,type,broker)) + count -= 1 + elif typecode == 22: #Array + #taken from codec10.read_array + sc = Codec(codec.spec, codec.read_vbin32()) + count = sc.read_uint32() + type = sc.read_uint8() + data = [] + while count > 0: + data.append(self._decodeValue(sc,type,broker)) + count -= 1 else: raise ValueError("Invalid type code: %d" % typecode) return data @@ -660,14 +1014,54 @@ class Session: elif typecode == 12: codec.write_float (float(value)) # FLOAT elif typecode == 13: codec.write_double (float(value)) # DOUBLE elif typecode == 14: codec.write_uuid (value.bytes) # UUID - elif typecode == 15: codec.write_map (value) # FTABLE elif typecode == 16: codec.write_int8 (int(value)) # S8 elif typecode == 17: codec.write_int16 (int(value)) # S16 elif typecode == 18: codec.write_int32 (int(value)) # S32 elif typecode == 19: codec.write_int64 (int(value)) # S64 + elif typecode == 20: value._encodeUnmanaged(codec) # OBJECT + elif typecode == 15: # FTABLE + sc = Codec(codec.spec) + if value is not None: + sc.write_uint32(len(value)) + for k, v in value.items(): + mtype = self.encoding(v) + sc.write_str8(k) + sc.write_uint8(mtype) + self._encodeValue(sc, v, mtype) + else: + sc.write_uint32(0) + codec.write_vbin32(sc.encoded) + elif typecode == 21: # List + sc = Codec(codec.spec) + self._encodeValue(sc, len(value), 3) + for o in value: + ltype=self.encoding(o) + self._encodeValue(sc,ltype,1) + self._encodeValue(sc, o, ltype) + codec.write_vbin32(sc.encoded) + elif typecode == 22: # Array + sc = Codec(codec.spec) + self._encodeValue(sc, len(value), 3) + if len(value) > 0: + ltype = self.encoding(value[0]) + self._encodeValue(sc,ltype,1) + for o in value: + self._encodeValue(sc, o, ltype) + codec.write_vbin32(sc.encoded) else: raise ValueError ("Invalid type code: %d" % typecode) + def encoding(self, value): + return self._encoding(value.__class__) + + def _encoding(self, klass): + if Session.ENCODINGS.has_key(klass): + return self.ENCODINGS[klass] + for base in klass.__bases__: + result = self._encoding(base, obj) + if result != None: + return result + def _displayValue(self, value, typecode): """ """ if typecode == 1: return unicode(value) @@ -690,8 +1084,64 @@ class Session: elif typecode == 17: return unicode(value) elif typecode == 18: return unicode(value) elif typecode == 19: return unicode(value) + elif typecode == 20: return unicode(value.__repr__()) + elif typecode == 21: return unicode(value.__repr__()) + elif typecode == 22: return unicode(value.__repr__()) + else: + raise ValueError ("Invalid type code: %d" % typecode) + + def _defaultValue(self, stype, broker=None, kwargs={}): + """ """ + typecode = stype.type + if typecode == 1: return 0 + elif typecode == 2: return 0 + elif typecode == 3: return 0 + elif typecode == 4: return 0 + elif typecode == 6: return "" + elif typecode == 7: return "" + elif typecode == 8: return 0 + elif typecode == 9: return 0 + elif typecode == 10: return ObjectId(None) + elif typecode == 11: return False + elif typecode == 12: return 0.0 + elif typecode == 13: return 0.0 + elif typecode == 14: return UUID([0 for i in range(16)]) + elif typecode == 15: return {} + elif typecode == 16: return 0 + elif typecode == 17: return 0 + elif typecode == 18: return 0 + elif typecode == 19: return 0 + elif typecode == 21: return [] + elif typecode == 22: return [] + elif typecode == 20: + try: + if "classKeys" in kwargs: + keyList = kwargs["classKeys"] + else: + keyList = None + classKey = self._bestClassKey(stype.refPackage, stype.refClass, keyList) + if classKey: + return self.makeObject(classKey, broker, kwargs) + except: + pass + return None else: raise ValueError ("Invalid type code: %d" % typecode) + + def _bestClassKey(self, pname, cname, preferredList): + """ """ + if pname == None or cname == None: + if len(preferredList) == 0: + return None + return preferredList[0] + for p in preferredList: + if p.getPackageName() == pname and p.getClassName() == cname: + return p + clist = self.getClasses(pname) + for c in clist: + if c.getClassName() == cname: + return c + return None def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): """ This function can be used to send a method request to an object given only the @@ -783,18 +1233,24 @@ class SchemaClass: CLASS_KIND_TABLE = 1 CLASS_KIND_EVENT = 2 - def __init__(self, kind, key, codec): + def __init__(self, kind, key, codec, session): self.kind = kind self.classKey = key self.properties = [] self.statistics = [] self.methods = [] self.arguments = [] + self.session = session + hasSupertype = codec.read_uint8() if self.kind == self.CLASS_KIND_TABLE: propCount = codec.read_uint16() statCount = codec.read_uint16() methodCount = codec.read_uint16() + if hasSupertype == 1: + self.superTypeKey = ClassKey(codec) + else: + self.superTypeKey = None ; for idx in range(propCount): self.properties.append(SchemaProperty(codec)) for idx in range(statCount): @@ -804,6 +1260,10 @@ class SchemaClass: elif self.kind == self.CLASS_KIND_EVENT: argCount = codec.read_uint16() + if (hasSupertype): + self.superTypeKey = ClassKey(codec) + else: + self.superTypeKey = None ; for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=False)) @@ -823,19 +1283,32 @@ class SchemaClass: def getProperties(self): """ Return the list of properties for the class. """ - return self.properties + if (self.superTypeKey == None): + return self.properties + else: + return self.properties + self.session.getSchema(self.superTypeKey).getProperties() def getStatistics(self): """ Return the list of statistics for the class. """ - return self.statistics + if (self.superTypeKey == None): + return self.statistics + else: + return self.statistics + self.session.getSchema(self.superTypeKey).getStatistics() def getMethods(self): """ Return the list of methods for the class. """ - return self.methods + if (self.superTypeKey == None): + return self.methods + else: + return self.methods + self.session.getSchema(self.superTypeKey).getMethods() def getArguments(self): """ Return the list of events for the class. """ - return self.arguments + """ Return the list of methods for the class. """ + if (self.superTypeKey == None): + return self.arguments + else: + return self.arguments + self.session.getSchema(self.superTypeKey).getArguments() class SchemaProperty: """ """ @@ -846,18 +1319,22 @@ class SchemaProperty: self.access = str(map["access"]) self.index = map["index"] != 0 self.optional = map["optional"] != 0 - self.unit = None - self.min = None - self.max = None - self.maxlen = None - self.desc = None + self.refPackage = None + self.refClass = None + self.unit = None + self.min = None + self.max = None + self.maxlen = None + self.desc = None for key, value in map.items(): - if key == "unit" : self.unit = value - elif key == "min" : self.min = value - elif key == "max" : self.max = value - elif key == "maxlen" : self.maxlen = value - elif key == "desc" : self.desc = value + if key == "unit" : self.unit = value + elif key == "min" : self.min = value + elif key == "max" : self.max = value + elif key == "maxlen" : self.maxlen = value + elif key == "desc" : self.desc = value + elif key == "refPackage" : self.refPackage = value + elif key == "refClass" : self.refClass = value def __repr__(self): return self.name @@ -920,6 +1397,8 @@ class SchemaArgument: self.maxlen = None self.desc = None self.default = None + self.refPackage = None + self.refClass = None for key, value in map.items(): if key == "unit" : self.unit = value @@ -928,6 +1407,8 @@ class SchemaArgument: elif key == "maxlen" : self.maxlen = value elif key == "desc" : self.desc = value elif key == "default" : self.default = value + elif key == "refPackage" : self.refPackage = value + elif key == "refClass" : self.refClass = value class ObjectId: """ Object that represents QMF object identifiers """ @@ -987,209 +1468,6 @@ class ObjectId: def __eq__(self, other): return (self.first, self.second).__eq__(other) -class Object(object): - """ This class defines a 'proxy' object representing a real managed object on an agent. - Actions taken on this proxy are remotely affected on the real managed object. - """ - def __init__(self, session, broker, schema, codec, prop, stat): - self._session = session - self._broker = broker - self._schema = schema - self._currentTime = codec.read_uint64() - self._createTime = codec.read_uint64() - self._deleteTime = codec.read_uint64() - self._objectId = ObjectId(codec) - self._properties = [] - self._statistics = [] - if prop: - notPresent = self._parsePresenceMasks(codec, schema) - for property in schema.getProperties(): - if property.name in notPresent: - self._properties.append((property, None)) - else: - self._properties.append((property, self._session._decodeValue(codec, property.type))) - if stat: - for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) - - def getBroker(self): - """ Return the broker from which this object was sent """ - return self._broker - - def getObjectId(self): - """ Return the object identifier for this object """ - return self._objectId - - def getClassKey(self): - """ Return the class-key that references the schema describing this object. """ - return self._schema.getKey() - - def getSchema(self): - """ Return the schema that describes this object. """ - return self._schema - - def getMethods(self): - """ Return a list of methods available for this object. """ - return self._schema.getMethods() - - def getTimestamps(self): - """ Return the current, creation, and deletion times for this object. """ - return self._currentTime, self._createTime, self._deleteTime - - def isDeleted(self): - """ Return True iff this object has been deleted. """ - return self._deleteTime != 0 - - def getIndex(self): - """ Return a string describing this object's primary key. """ - result = u"" - for property, value in self._properties: - if property.index: - if result != u"": - result += u":" - try: - valstr = unicode(self._session._displayValue(value, property.type)) - except: - valstr = u"<undecodable>" - result += valstr - return result - - def getProperties(self): - """ Return a list of object properties """ - return self._properties - - def getStatistics(self): - """ Return a list of object statistics """ - return self._statistics - - def mergeUpdate(self, newer): - """ Replace properties and/or statistics with a newly received update """ - if self._objectId != newer._objectId: - raise Exception("Objects with different object-ids") - if len(newer.getProperties()) > 0: - self._properties = newer.getProperties() - if len(newer.getStatistics()) > 0: - self._statistics = newer.getStatistics() - - def update(self): - """ Contact the agent and retrieve the lastest property and statistic values for this object. """ - obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) - if obj: - self.mergeUpdate(obj[0]) - else: - raise Exception("Underlying object no longer exists") - - def __repr__(self): - key = self.getClassKey() - return key.getPackageName() + ":" + key.getClassName() +\ - "[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8") - - def __getattr__(self, name): - for method in self._schema.getMethods(): - if name == method.name: - return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self._properties: - if name == property.name: - return value - if name == "_" + property.name + "_" and property.type == 10: # Dereference references - deref = self._session.getObjects(_objectId=value, _broker=self._broker) - if len(deref) != 1: - return None - else: - return deref[0] - for statistic, value in self._statistics: - if name == statistic.name: - return value - raise Exception("Type Object has no attribute '%s'" % name) - - def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): - for method in self._schema.getMethods(): - if name == method.name: - aIdx = 0 - sendCodec = Codec(self._broker.conn.spec) - seq = self._session.seqMgr._reserve((method, synchronous)) - self._broker._setHeader(sendCodec, 'M', seq) - self._objectId.encode(sendCodec) - self._schema.getKey().encode(sendCodec) - sendCodec.write_str8(name) - - count = 0 - for arg in method.arguments: - if arg.dir.find("I") != -1: - count += 1 - if count != len(args): - raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) - - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._session._encodeValue(sendCodec, args[aIdx], arg.type) - aIdx += 1 - if timeWait: - ttl = timeWait * 1000 - else: - ttl = None - smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), - ttl=ttl) - if synchronous: - try: - self._broker.cv.acquire() - self._broker.syncInFlight = True - finally: - self._broker.cv.release() - self._broker._send(smsg) - return seq - return None - - def _invoke(self, name, args, kwargs): - if "_timeout" in kwargs: - timeout = kwargs["_timeout"] - else: - timeout = self._broker.SYNC_TIME - - if "_async" in kwargs and kwargs["_async"]: - sync = False - if "_timeout" not in kwargs: - timeout = None - else: - sync = True - - seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) - if seq: - if not sync: - return seq - try: - self._broker.cv.acquire() - starttime = time() - while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(timeout) - if time() - starttime > timeout: - self._session.seqMgr._release(seq) - raise RuntimeError("Timed out waiting for method to respond") - finally: - self._broker.cv.release() - if self._broker.error != None: - errorText = self._broker.error - self._broker.error = None - raise Exception(errorText) - return self._broker.syncResult - raise Exception("Invalid Method (software defect) [%s]" % name) - - def _parsePresenceMasks(self, codec, schema): - excludeList = [] - bit = 0 - for property in schema.getProperties(): - if property.optional: - if bit == 0: - mask = codec.read_uint8() - bit = 1 - if (mask & bit) == 0: - excludeList.append(property.name) - bit *= 2 - if bit == 256: - bit = 0 - return excludeList - class MethodResult(object): """ """ def __init__(self, status, text, outArgs): @@ -1361,9 +1639,13 @@ class Broker: self.reqsOutstanding = 1 sock = connect(self.host, self.port) + sock.settimeout(5) if self.ssl: sock = ssl(sock) - self.conn = Connection(sock, username=self.authUser, password=self.authPass) + self.conn = Connection(sock, username=self.authUser, password=self.authPass, heartbeat=2) + def aborted(): + raise Timeout("read timed out") + self.conn.aborted = aborted self.conn.start() self.replyName = "reply-%s" % self.amqpSessionId self.amqpSession = self.conn.session(self.amqpSessionId) @@ -1424,7 +1706,7 @@ class Broker: """ Compose the header of a management message. """ codec.write_uint8(ord('A')) codec.write_uint8(ord('M')) - codec.write_uint8(ord('2')) + codec.write_uint8(ord('3')) codec.write_uint8(ord(opcode)) codec.write_uint32(seq) @@ -1438,7 +1720,7 @@ class Broker: if octet != 'M': return None, None octet = chr(codec.read_uint8()) - if octet != '2': + if octet != '3': return None, None opcode = chr(codec.read_uint8()) seq = codec.read_uint32() @@ -1453,6 +1735,7 @@ class Broker: dp.ttl = ttl mp = self.amqpSession.message_properties() mp.content_type = "x-application/qmf" + mp.user_id = self.authUser mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) return Message(dp, mp, body) @@ -1576,7 +1859,7 @@ class Event: self.schema = session.packages[pname][pkey] self.arguments = {} for arg in self.schema.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type) + self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker) def __repr__(self): if self.schema == None: diff --git a/python/qpid/management.py b/python/qpid/management.py index 546e68ae8e..b97709d367 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -401,7 +401,7 @@ class managementClient: """ Compose the header of a management message. """ codec.write_uint8 (ord ('A')) codec.write_uint8 (ord ('M')) - codec.write_uint8 (ord ('2')) + codec.write_uint8 (ord ('3')) codec.write_uint8 (opcode) codec.write_uint32 (seq) @@ -415,7 +415,7 @@ class managementClient: if octet != 'M': return None octet = chr (codec.read_uint8 ()) - if octet != '2': + if octet != '3': return None opcode = chr (codec.read_uint8 ()) seq = codec.read_uint32 () @@ -672,9 +672,14 @@ class managementClient: packageName = codec.read_str8 () className = codec.read_str8 () hash = codec.read_bin128 () + hasSupertype = codec.read_uint8() configCount = codec.read_uint16 () instCount = codec.read_uint16 () methodCount = codec.read_uint16 () + if hasSupertype != 0: + supertypePackage = codec.read_str8() + supertypeClass = codec.read_str8() + supertypeHash = codec.read_bin128() if packageName not in self.packages: return diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py index e1fd8d54eb..84eb9c3ff8 100644 --- a/python/qpid/managementdata.py +++ b/python/qpid/managementdata.py @@ -360,6 +360,12 @@ class ManagementData: return "int32" elif typecode == 19: return "int64" + elif typecode == 20: + return "object" + elif typecode == 21: + return "list" + elif typecode == 22: + return "array" else: raise ValueError ("Invalid type code: %d" % typecode) |