diff options
author | Ted Ross <tross@apache.org> | 2010-03-23 21:21:17 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-03-23 21:21:17 +0000 |
commit | 412859fac1743869f40cf56598ca09dbcfb06379 (patch) | |
tree | e55747a5c49c27fc4a3bc6a4fb45fd2d05f90903 | |
parent | d1178e0c2abe09f41eba0c1f794edd7caf8b8605 (diff) | |
download | qpid-python-412859fac1743869f40cf56598ca09dbcfb06379.tar.gz |
Checkpointing updates for the Python console.
- Added 'list' type for QMF.
- Updated qmf-agent example to use the new string formats for agent name and object-id.
- Major updates in Python qmf.console to handle dual-mode operation.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@926788 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/examples/qmf-agent/example.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-agent/schema.xml | 1 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/management/ManagementObject.h | 1 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/management-types.xml | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 2 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 509 |
7 files changed, 407 insertions, 140 deletions
diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp index 7d9043e097..96edc2f4d4 100644 --- a/qpid/cpp/examples/qmf-agent/example.cpp +++ b/qpid/cpp/examples/qmf-agent/example.cpp @@ -98,7 +98,7 @@ CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent static uint64_t persistId = 0x111222333444555LL; mgmtObject = new _qmf::Parent(agent, this, name); - agent->addObject(mgmtObject, persistId++); + agent->addObject(mgmtObject); mgmtObject->set_state("IDLE"); Variant::Map args; @@ -109,6 +109,11 @@ CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent subMap["numeric-data"] = 10000; args["map-data"] = subMap; mgmtObject->set_args(args); + + Variant::List list; + list.push_back(20000); + list.push_back("string-item"); + mgmtObject->set_list(list); } void CoreClass::doLoop() @@ -190,7 +195,7 @@ int main_int(int argc, char** argv) _qmf::Package packageInit(agent); // Name the agent. - agent->setName("apache.org", "qmf-example"); + agent->setName("apache.org", "qmf-example", "A"); // Start the agent. It will attempt to make a connection to the // management broker diff --git a/qpid/cpp/examples/qmf-agent/schema.xml b/qpid/cpp/examples/qmf-agent/schema.xml index 5662683eab..3c7755fe83 100644 --- a/qpid/cpp/examples/qmf-agent/schema.xml +++ b/qpid/cpp/examples/qmf-agent/schema.xml @@ -30,6 +30,7 @@ <property name="name" type="sstr" access="RC" index="y"/> <property name="args" type="map" access="RO"/> + <property name="list" type="list" access="RO"/> <statistic name="state" type="sstr" desc="Operational state of the link"/> <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h index 50c396d2a9..2a5e5b6e52 100644 --- a/qpid/cpp/include/qpid/management/ManagementObject.h +++ b/qpid/cpp/include/qpid/management/ManagementObject.h @@ -97,6 +97,7 @@ public: static const uint8_t TYPE_S16 = 17; static const uint8_t TYPE_S32 = 18; static const uint8_t TYPE_S64 = 19; + static const uint8_t TYPE_LIST = 21; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; diff --git a/qpid/cpp/managementgen/qmfgen/management-types.xml b/qpid/cpp/managementgen/qmfgen/management-types.xml index 139be6c5df..95434a278b 100644 --- a/qpid/cpp/managementgen/qmfgen/management-types.xml +++ b/qpid/cpp/managementgen/qmfgen/management-types.xml @@ -41,6 +41,7 @@ <type name="double" base="DOUBLE" cpp="double" encode="@.putDouble(#)" decode="# = @.getDouble()" stream="#" size="8" accessor="direct" init="0."/> <type name="uuid" base="UUID" cpp="::qpid::messaging::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" /> <type name="map" base="FTABLE" cpp="::qpid::messaging::VariantMap" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/"/> +<type name="list" base="LIST" cpp="::qpid::messaging::Variant::List" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::Variant::List()" byRef="y" unmap="::qpid::messaging::Variant::List(); assert(false); /*TBD*/"/> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" style="wm" stream="#" size="1" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" style="wm" stream="#" size="2" accessor="counter" init="0"/> diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 845d74d1ea..1633e77a4f 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -156,7 +156,7 @@ void ManagementAgentImpl::init(const string& brokerHost, void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, uint16_t intervalSeconds, bool useExternalThread, - const std::string& _storeFile) + const string& _storeFile) { interval = intervalSeconds; extThread = useExternalThread; @@ -448,7 +448,7 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) } } -void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence) +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) { Mutex::ScopedLock lock(agentLock); string packageName; @@ -468,14 +468,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - std::string body; + string body; encodeHeader(outBuffer, 's', sequence); schema.writeSchemaCall(body); outBuffer.putRawData(body); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); } @@ -507,7 +507,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); failed = true; } else { - std::string methodName; + string methodName; ObjectId objId; qpid::messaging::Variant::Map inArgs; @@ -738,7 +738,7 @@ void ManagementAgentImpl::received(Message& msg) if (checkHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); + else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); else if (opcode == 'x') handleConsoleAddedIndication(); QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); } @@ -754,15 +754,15 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } -qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname, - const std::string& cname, +qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, + const string& cname, const uint8_t *md5Sum) { qpid::messaging::Variant::Map map_; map_["_package_name"] = pname; map_["_class_name"] = cname; - map_["_hash_str"] = messaging::Uuid((const char*) md5Sum); + map_["_hash_str"] = messaging::Uuid(md5Sum); return map_; } @@ -922,7 +922,10 @@ void ManagementAgentImpl::periodicProcessing() if (send_stats || send_props) { ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map oid; + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), object->getClassName(), object->getMd5Sum()); @@ -938,7 +941,7 @@ void ManagementAgentImpl::periodicProcessing() } content.encode(); - const std::string &str = m.getContent(); + const string &str = m.getContent(); if (str.length()) { ::qpid::messaging::Variant::Map headers; headers["method"] = "indication"; @@ -1105,6 +1108,7 @@ void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address); try { session.messageTransfer(arg::content=msg, arg::destination=exchange); } catch(exception& e) { diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 399c9d7944..69a891a1b2 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -264,7 +264,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handleAttachResponse (qpid::framing::Buffer& inBuffer); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); - void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo); void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo); void handleGetQuery (const std::string& body, const std::string& content_type, diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 226d586f8b..0b0ec417d0 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -119,10 +119,16 @@ class Object(object): """ This class defines a 'proxy' object representing a real managed object on an agent. Actions taken on this proxy are remotely affected on the real managed object. """ - def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}): + def __init__(self, session, broker, schema, codec=None, prop=None, stat=None, managed=True, v2Map=None, agentName=None, kwargs={}): self._session = session self._broker = broker self._schema = schema + self._properties = [] + self._statistics = [] + if v2Map: + self.v2Init(v2Map, agentName) + return + self._managed = managed if self._managed: self._currentTime = codec.read_uint64() @@ -134,8 +140,6 @@ class Object(object): self._createTime = None self._deleteTime = None self._objectId = None - self._properties = [] - self._statistics = [] if codec: if prop: notPresent = self._parsePresenceMasks(codec, schema) @@ -156,6 +160,27 @@ class Object(object): for statistic in schema.getStatistics(): self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs))) + def v2Init(self, omap, agentName): + if omap.__class__ != dict: + raise Exception("QMFv2 object data must be a map/dict") + if '_values' not in omap: + raise Exception("QMFv2 object must have '_values' element") + + values = omap['_values'] + for prop in self._schema.getProperties(): + if prop.name in values: + self._properties.append((prop, values[prop.name])) + for stat in self._schema.getStatistics(): + if stat.name in values: + self._statistics.append((stat, values[stat.name])) + if '_subtypes' in omap: + self._subtypes = omap['_subtypes'] + if '_object_id' in omap: + self._managed = True + self._objectId = ObjectId(omap['_object_id'], agentName=agentName) + else: + self._managed = None + def getBroker(self): """ Return the broker from which this object was sent """ return self._broker @@ -244,17 +269,17 @@ class Object(object): for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self._properties: - if name == property.name: + for prop, value in self._properties: + if name == prop.name: return value - if name == "_" + property.name + "_" and property.type == 10: # Dereference references + if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references deref = self._session.getObjects(_objectId=value, _broker=self._broker) if len(deref) != 1: return None else: return deref[0] - for statistic, value in self._statistics: - if name == statistic.name: + for stat, value in self._statistics: + if name == stat.name: return value raise Exception("Type Object has no attribute '%s'" % name) @@ -449,7 +474,7 @@ class Session: """ self.console = console self.brokers = [] - self.packages = {} + self.schemaCache = SchemaCache() self.seqMgr = SequenceManager() self.cv = Condition() self.syncSequenceList = [] @@ -470,6 +495,134 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + """ + ## + ## v2_data_queues is used to store object data received from QMFv2 agents. + ## It is stored here in case we need to go and query schema data from the + ## agent before reporting to the user. + ## + ## v2_data_queues is a map, keyed by agent address of queues of entries + ## The format of entries in the queue is a data map + ## This list must be protected by self.cv + ## + """ + self.v2_data_queues = {} + self.v2_pending_queues = {} + + def _getBrokerForAgentAddr(self, agent_addr): + broker = None + try: + self.cv.acquire() + key = (1, agent_addr) + for b in self.brokers: + if key in b.agents: + broker = b + finally: + self.cv.release() + return broker + + def _processV2Data(self): + """ + Attempt to make progress on the entries in the v2_data_queue. If an entry has a schema + that is in our schema cache, process it. Otherwise, send a request for the schema information + to the agent that manages the object. + """ + try: + self.cv.acquire() + pop_list = [] + for agent_addr in self.v2_data_queues: + entries = self.v2_data_queues[agent_addr] + keep_going = True + while keep_going and len(entries) > 0: + schemaId = self._getSchemaIdforV2ObjectLH(entries[0]) + schema = self.schemaCache.getSchema(schemaId) + if schema: + broker = self._getBrokerForAgentAddr(agent_addr) + obj = Object(self, broker, schema, v2Map=entries[0], agentName=agent_addr) + entries.pop(0) + + """ + TODO: This following code assumes that the data indication came unsolicited. + This needs to be enhanced to handle the case of a query response. + """ + if self.console: + self.console.objectProps(broker, obj) + + else: + """ + We have no schema for this data object, move the queue to the pending map and request + schema data from the agent + """ + self.v2_pending_queues[agent_addr] = self.v2_data_queues[agent_addr] + pop_list.append(agent_addr) + self._v2SendSchemaRequest(agent_addr, schemaId) + keep_going = None + for agent_addr in pop_list: + self.v2_data_queues.pop(agent_addr) + finally: + self.cv.release() + + def _addV2Data(self, agent_addr, data_map): + """ + Add data-for-processing to the work queue + """ + process = None + try: + self.cv.acquire() + if agent_addr in self.v2_pending_queues: + self.v2_pending_queues[agent_addr].append(data_map) + else: + if agent_addr not in self.v2_data_queues: + self.v2_data_queues[agent_addr] = [] + self.v2_data_queues[agent_addr].append(data_map) + process = True + finally: + self.cv.release() + + if process: + self._processV2Data() + + def _removeV2Agent(self, agent): + """ + Remove entries in the data queues related to a lost agent. + """ + agent_name = agent.getAgentBank() + try: + self.cv.acquire() + if agent_name in self.v2_data_queues: + self.v2_data_queues.pop(agent_name) + if agent_name in self.v2_pending_queues: + self.v2_pending_queues.pop(agent_name) + finally: + self.cv.release() + + def _schemaInfoFromV2Agent(self, agent_addr): + """ + We have just received new schema information from an agent. Check to see if there's + more work that can now be done. + """ + re_process = None + try: + self.cv.acquire() + if agent_addr in self.v2_pending_queues: + self.v2_data_queues[agent_addr] = self.v2_pending_queues.pop(agent_addr) + re_process = True + finally: + self.cv.release() + + if re_process: + self._processV2Data() + + def _getSchemaIdforV2ObjectLH(self, data): + """ + Given a data map, extract the schema-identifier. + """ + if data.__class__ != dict: + return None + if '_schema_id' in data: + return ClassKey(data['_schema_id']) + return None + def __repr__(self): return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) @@ -489,6 +642,7 @@ class Session: returned from the addBroker call """ if self.console: for agent in broker.getAgents(): + self.console.removev2Agent(agent) self.console.delAgent(agent) broker._shutdown() self.brokers.remove(broker) @@ -498,30 +652,19 @@ class Session: """ Get the list of known QMF packages """ for broker in self.brokers: broker._waitForStable() - list = [] - for package in self.packages: - list.append(package) - return list + return self.schemaCache.getPackages() def getClasses(self, packageName): """ Get the list of known classes within a QMF package """ for broker in self.brokers: broker._waitForStable() - list = [] - if packageName in self.packages: - for pkey in self.packages[packageName]: - list.append(self.packages[packageName][pkey].getKey()) - return list + return self.schemaCache.getClasses(packageName) def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() - pname = classKey.getPackageName() - pkey = classKey.getPackageKey() - if pname in self.packages: - if pkey in self.packages[pname]: - return self.packages[pname][pkey] + return self.schemaCache.getSchema(classKey) def bindPackage(self, packageName): """ Request object updates for all table classes within a package. """ @@ -743,6 +886,7 @@ class Session: def _handleBrokerDisconnect(self, broker): if self.console: for agent in broker.getAgents(): + self.session._removeV2Agent(agent) self.console.delAgent(agent) self.console.brokerDisconnected(broker) @@ -761,14 +905,7 @@ class Session: def _handlePackageInd(self, broker, codec, seq): pname = str(codec.read_str8()) - notify = False - try: - self.cv.acquire() - if pname not in self.packages: - self.packages[pname] = {} - notify = True - finally: - self.cv.release() + notify = self.schemaCache.declarePackage(pname) if notify and self.console != None: self.console.newPackage(pname) @@ -806,17 +943,9 @@ class Session: def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) - unknown = False + schema = self.schemaCache.getSchema(classKey) - try: - self.cv.acquire() - if classKey.getPackageName() in self.packages: - if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]: - unknown = True - finally: - self.cv.release() - - if unknown: + if not schema: # Send a schema request for the unknown class broker._incOutstanding() sendCodec = Codec() @@ -879,37 +1008,27 @@ class Session: event = Event(self, broker, codec) self.console.event(broker, event) - def _handleSchemaResp(self, broker, codec, seq): + def _handleSchemaResp(self, broker, codec, seq, agent_addr): kind = codec.read_uint8() classKey = ClassKey(codec) _class = SchemaClass(kind, classKey, codec, self) - try: - self.cv.acquire() - self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class - finally: - self.cv.release() - + self.schemaCache.declareClass(classKey, _class) self.seqMgr._release(seq) broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) + if agent_addr: + self._schemaInfoFromV2Agent(agent_addr) + def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): classKey = ClassKey(codec) - try: - self.cv.acquire() - pname = classKey.getPackageName() - if pname not in self.packages: - return - pkey = classKey.getPackageKey() - if pkey not in self.packages[pname]: - return - schema = self.packages[pname][pkey] - finally: - self.cv.release() + schema = self.schemaCache.getSchema(classKey) + if not schema: + return object = Object(self, broker, schema, codec, prop, stat) - if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: + if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: broker._updateAgent(object) try: @@ -949,7 +1068,15 @@ class Session: self._v2HandleHeartbeatInd(broker, mp, ah, content) def _v2HandleDataInd(self, broker, mp, ah, content): - pass + kind = "_data" + if "qmf.content" in ah: + kind = ah["qmf.content"] + agent_addr = ah["qmf.agent"] + if content.__class__ != list: + return + if kind == "_data": + for omap in content: + self._addV2Data(agent_addr, omap) def _v2HandleQueryRsp(self, broker, mp, ah, content): pass @@ -960,6 +1087,24 @@ class Session: def _v2HandleException(self, broker, mp, ah, content): pass + def _v2SendSchemaRequest(self, agent_addr, schemaId): + """ + Send a query to an agent to request details on a particular schema class. + IMPORTANT: This function currently sends a QMFv1 schema-request to the address of + the agent. The agent will send its response to amq.direct/<our-key>. + Eventually, this will be converted to a proper QMFv2 schema query. + """ + broker = self._getBrokerForAgentAddr(agent_addr) + if not broker: + return + + sendCodec = Codec() + seq = self.seqMgr._reserve(None) + broker._setHeader(sendCodec, 'S', seq) + schemaId.encode(sendCodec) + smsg = broker._message(sendCodec.encoded, agent_addr) + broker._send(smsg, "qmf.default.direct") + def _handleError(self, error): try: self.cv.acquire() @@ -1004,17 +1149,9 @@ class Session: inner_type_code = codec.read_uint8() if inner_type_code == 20: classKey = ClassKey(codec) - try: - self.cv.acquire() - pname = classKey.getPackageName() - if pname not in self.packages: - return None - pkey = classKey.getPackageKey() - if pkey not in self.packages[pname]: - return None - schema = self.packages[pname][pkey] - finally: - self.cv.release() + schema = self.schemaCache.getSchema(classKey) + if not schema: + return None data = Object(self, broker, schema, codec, True, True, False) else: data = self._decodeValue(codec, inner_type_code, broker) @@ -1206,15 +1343,86 @@ class Session: return seq return None -class Package: - """ """ - def __init__(self, name): - self.name = name +class SchemaCache(object): + """ + The SchemaCache is a data structure that stores learned schema information. + """ + def __init__(self): + """ + Create a map of schema packages and a lock to protect this data structure. + Note that this lock is at the bottom of any lock hierarchy. If it is held, no other + lock in the system should attempt to be acquired. + """ + self.packages = {} + self.lock = Lock() + + def getPackages(self): + """ Get the list of known QMF packages """ + list = [] + try: + self.lock.acquire() + for package in self.packages: + list.append(package) + finally: + self.lock.release() + return list + + def getClasses(self, packageName): + """ Get the list of known classes within a QMF package """ + list = [] + try: + self.lock.acquire() + if packageName in self.packages: + for pkey in self.packages[packageName]: + list.append(self.packages[packageName][pkey].getKey()) + finally: + self.lock.release() + return list + + def getSchema(self, classKey): + """ Get the schema for a QMF class """ + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname in self.packages: + if pkey in self.packages[pname]: + return self.packages[pname][pkey] + finally: + self.lock.release() + return None + + def declarePackage(self, pname): + """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """ + try: + self.lock.acquire() + if pname in self.packages: + return None + self.packages[pname] = {} + finally: + self.lock.release() + return True + + def declareClass(self, classKey, classDef): + """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """ + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname not in self.packages: + self.packages[pname] = {} + packageMap = self.packages[pname] + if pkey in packageMap: + return None + packageMap[pkey] = classDef + finally: + self.lock.release() + return True class ClassKey: """ A ClassKey uniquely identifies a class from the schema. """ def __init__(self, constructor): - if type(constructor) == str: + if constructor.__class__ == str: # construct from __repr__ string try: self.pname, cls = constructor.split(":") @@ -1225,20 +1433,30 @@ class ClassKey: h1 = int(hexValues[1], 16) h2 = int(hexValues[2], 16) h3 = int(hexValues[3], 16) - self.hash = struct.pack("!LLLL", h0, h1, h2, h3) + h4 = int(hexValues[4][0:4], 16) + h5 = int(hexValues[4][4:12], 16) + self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5)) except: raise Exception("Invalid ClassKey format") + elif constructor.__class__ == dict: + # construct from QMFv2 map + try: + self.pname = constructor['_package_name'] + self.cname = constructor['_class_name'] + self.hash = constructor['_hash_str'] + except: + raise Exception("Invalid ClassKey map format") else: # construct from codec codec = constructor self.pname = str(codec.read_str8()) self.cname = str(codec.read_str8()) - self.hash = codec.read_bin128() + self.hash = UUID(codec.read_bin128()) def encode(self, codec): codec.write_str8(self.pname) codec.write_str8(self.cname) - codec.write_bin128(self.hash) + codec.write_bin128(self.hash.bytes) def getPackageName(self): return self.pname @@ -1250,7 +1468,7 @@ class ClassKey: return self.hash def getHashString(self): - return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash) + return str(self.hash) def getPackageKey(self): return (self.cname, self.hash) @@ -1442,61 +1660,93 @@ class SchemaArgument: class ObjectId: """ Object that represents QMF object identifiers """ - def __init__(self, codec, first=0, second=0): - if codec: - self.first = codec.read_uint64() - self.second = codec.read_uint64() + def __init__(self, constructor, first=0, second=0, agentName=None): + if constructor.__class__ == dict: + self.agentName = agentName + self.agentEpoch = 0 + if '_agent_name' in constructor: self.agentName = constructor['_agent_name'] + if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch'] + if '_object_name' not in constructor: + raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.") + self.objectName = constructor['_object_name'] else: - self.first = first - self.second = second + if not constructor: + first = first + second = second + else: + first = constructor.read_uint64() + second = constructor.read_uint64() + self.agentName = str((first & 0x0000FFFFF0000000) >> 28) + self.agentEpoch = (first & 0x0FFF000000000000) >> 48 + self.objectName = str(second) def __cmp__(self, other): if other == None or not isinstance(other, ObjectId) : return 1 - if self.first < other.first: + + if self.objectName < other.objectName: + return -1 + if self.objectName > other.objectName: + return 1 + + if self.agentName < other.agentName: return -1 - if self.first > other.first: + if self.agentName > other.agentName: return 1 - if self.second < other.second: + + if self.agentEpoch < other.agentEpoch: return -1 - if self.second > other.second: + if self.agentEpoch > other.agentEpoch: return 1 return 0 def __repr__(self): - return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(), + return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(), self.getBrokerBank(), self.getAgentBank(), self.getObject()) def index(self): - return (self.first, self.second) + return self.__repr__() def getFlags(self): - return (self.first & 0xF000000000000000) >> 60 + return 0 def getSequence(self): - return (self.first & 0x0FFF000000000000) >> 48 + return self.agentEpoch def getBrokerBank(self): - return (self.first & 0x0000FFFFF0000000) >> 28 + return 1 def getAgentBank(self): - return self.first & 0x000000000FFFFFFF + return self.agentName def getObject(self): - return self.second + return self.objectName def isDurable(self): return self.getSequence() == 0 def encode(self, codec): - codec.write_uint64(self.first) - codec.write_uint64(self.second) + first = self.agentEpoch << 48 + second = 0 + + try: + first += int(self.agentName) << 28 + except: + pass + + try: + second = int(self.objectName) + except: + pass + + codec.write_uint64(first) + codec.write_uint64(second) def __hash__(self): - return (self.first, self.second).__hash__() + return self.__repr__().__hash__() def __eq__(self, other): - return (self.first, self.second).__eq__(other) + return self.__repr__().__eq__(other) class MethodResult(object): """ """ @@ -1590,7 +1840,6 @@ class Broker: self.authUser = authUser self.authPass = authPass self.cv = Condition() - self.agentLock = Lock() self.error = None self.brokerId = None self.connected = False @@ -1625,11 +1874,11 @@ class Broker: """ Return the agent object associated with a particular broker and agent bank value.""" bankKey = (brokerBank, agentBank) try: - self.agentLock.acquire() + self.cv.acquire() if bankKey in self.agents: return self.agents[bankKey] finally: - self.agentLock.release() + self.cv.release() return None def getSessionId(self): @@ -1639,10 +1888,10 @@ class Broker: def getAgents(self): """ Get the list of agents reachable via this broker """ try: - self.agentLock.acquire() + self.cv.acquire() return self.agents.values() finally: - self.agentLock.release() + self.cv.release() def getAmqpSession(self): """ Get the AMQP session object for this connected broker. """ @@ -1672,11 +1921,11 @@ class Broker: def _tryToConnect(self): try: try: - self.agentLock.acquire() + self.cv.acquire() self.agents = {} self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") finally: - self.agentLock.release() + self.cv.release() self.topicBound = False self.syncInFlight = False @@ -1768,35 +2017,35 @@ class Broker: agent = None if obj._deleteTime == 0: try: - self.agentLock.acquire() + self.cv.acquire() if bankKey not in self.agents: agent = Agent(self, obj.agentBank, obj.label) self.agents[bankKey] = agent finally: - self.agentLock.release() + self.cv.release() if agent and self.session.console: self.session.console.newAgent(agent) else: try: - self.agentLock.acquire() + self.cv.acquire() agent = self.agents.pop(bankKey, None) finally: - self.agentLock.release() + self.cv.release() if agent and self.session.console: self.session.console.delAgent(agent) def _addAgent(self, name, agent): try: - self.agentLock.acquire() + self.cv.acquire() self.agents[(1, name)] = agent finally: - self.agentLock.release() + self.cv.release() if self.session.console: self.session.console.newAgent(agent) def _ageAgents(self): try: - self.agentLock.acquire() + self.cv.acquire() to_delete = [] to_notify = [] for key in self.agents: @@ -1805,12 +2054,16 @@ class Broker: for key in to_delete: to_notify.append(self.agents.pop(key, None)) finally: - self.agentLock.release() + self.cv.release() if self.session.console: for agent in to_notify: + self.session._removeV2Agent(agent) self.session.console.delAgent(agent) def _v2SendAgentLocate(self, predicate={}): + """ + Broadcast an agent-locate request to cause all agents in the domain to tell us who they are. + """ dp = self.amqpSession.delivery_properties() dp.routing_key = "console.request.agent_locate" mp = self.amqpSession.message_properties() @@ -1915,6 +2168,11 @@ class Broker: self.cv.release() def _replyCb(self, msg): + agent_addr = None + mp = msg.get("message_properties") + ah = mp.application_headers + if ah and 'qmf.agent' in ah: + agent_addr = ah['qmf.agent'] codec = Codec(msg.body) while True: opcode, seq = self._checkHeader(codec) @@ -1926,7 +2184,7 @@ class Broker: elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) elif opcode == 'e': self.session._handleEventInd (self, codec, seq) - elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) + elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) @@ -2014,15 +2272,12 @@ class Event: self.classKey = ClassKey(codec) self.timestamp = codec.read_int64() self.severity = codec.read_uint8() - self.schema = None - pname = self.classKey.getPackageName() - pkey = self.classKey.getPackageKey() - if pname in session.packages: - if pkey in session.packages[pname]: - self.schema = session.packages[pname][pkey] - self.arguments = {} - for arg in self.schema.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker) + self.schema = session.schemaCache.getSchema(self.classKey) + if not self.schema: + return + self.arguments = {} + for arg in self.schema.arguments: + self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker) def __repr__(self): if self.schema == None: |