From 0c72a9c689407001ab0ccab00d091166b93ed9ec Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Fri, 26 Mar 2010 17:01:23 +0000 Subject: Updating qmf branch with latest updates: - Implemented V2 query in Python - Cleaned up map formatting in c++ agents - Updated for changes in the protocol wiki - Significant cleanup and refactoring in the pure-Python console git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@927970 13f79535-47bb-0310-9956-ffa450edef68 --- .../cpp/include/qpid/management/ManagementObject.h | 3 +- qpid/cpp/managementgen/qmfgen/schema.py | 2 +- qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 267 +++---- qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 8 +- qpid/cpp/src/qpid/management/ManagementAgent.cpp | 10 +- qpid/cpp/src/qpid/management/ManagementObject.cpp | 5 +- qpid/extras/qmf/src/py/qmf/console.py | 855 ++++++++++++++------- 7 files changed, 727 insertions(+), 423 deletions(-) diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h index 2a5e5b6e52..1bb3c57ed2 100644 --- a/qpid/cpp/include/qpid/management/ManagementObject.h +++ b/qpid/cpp/include/qpid/management/ManagementObject.h @@ -59,7 +59,8 @@ protected: void fromString(const std::string&); public: QPID_COMMON_EXTERN ObjectId() : agent(0), first(0) {} - QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) : agent(0) { mapDecode(map.asMap()); } + QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) : + agent(0), first(0), agentEpoch(0) { mapDecode(map.asMap()); } QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker); QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq); QPID_COMMON_EXTERN ObjectId(std::istream&); diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index ca6628c366..a827799b4b 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -682,7 +682,7 @@ class SchemaStatistic: def genMap (self, stream): if self.type.type.perThread: - self.type.type.genMap(stream, "totals." + self.name) + self.type.type.genMap(stream, "totals." + self.name, key=self.name) else: self.type.type.genMap(stream, self.name) diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 1633e77a4f..5966b0766c 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -44,6 +44,7 @@ using std::ifstream; using std::string; using std::cout; using std::endl; +using qpid::messaging::Variant; Mutex ManagementAgent::Singleton::lock; bool ManagementAgent::Singleton::disabled = false; @@ -179,7 +180,7 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, void ManagementAgentImpl::registerClass(const string& packageName, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -189,7 +190,7 @@ void ManagementAgentImpl::registerClass(const string& packageName, void ManagementAgentImpl::registerEvent(const string& packageName, const string& eventName, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -239,12 +240,12 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << event.getPackageName() << "." << event.getEventName(); - ::qpid::messaging::Message msg; - ::qpid::messaging::MapContent content(msg); - ::qpid::messaging::VariantMap &map_ = content.asMap(); - ::qpid::messaging::VariantMap schemaId; - ::qpid::messaging::VariantMap values; - ::qpid::messaging::VariantMap headers; + qpid::messaging::Message msg; + qpid::messaging::MapContent content(msg); + Variant::Map &map_ = content.asMap(); + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), event.getEventName(), @@ -379,73 +380,29 @@ void ManagementAgentImpl::sendHeartbeat() QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } -void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, - uint32_t code, string text) +void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, + const string& text, uint32_t code) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'z', sequence); - outBuffer.putLong(code); - outBuffer.putShortString(text); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); - QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); -} - -void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) -{ - Mutex::ScopedLock lock(agentLock); - - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); - - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); - - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - storeData(); - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } + static const string addr_exchange("qmf.default.direct"); - attachment.setBanks(assignedBrokerBank, assignedAgentBank); + messaging::Message msg; + messaging::MapContent content(msg); + messaging::Variant::Map& map(content.asMap()); + messaging::Variant::Map headers; + messaging::Variant::Map values; - // Bind to qpid.management to receive commands - connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.agent"] = name_address; - // Send package indications for all local packages - for (PackageMap::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + values["error_code"] = code; + values["error_text"] = text; + map["_values"] = values; - encodeHeader(outBuffer, 'p'); - encodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + content.encode(); + connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyToKey); - // Send class indications for all local classes - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { - outBuffer.reset(); - encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); - } - } + QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) @@ -509,7 +466,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& } else { string methodName; ObjectId objId; - qpid::messaging::Variant::Map inArgs; + Variant::Map inArgs; try { // coversions will throw if input is invalid. @@ -539,7 +496,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& } } - qpid::messaging::Variant::Map headers; + Variant::Map headers; headers["method"] = "response"; headers["qmf.agent"] = name_address; if (failed) @@ -551,20 +508,14 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); } -void ManagementAgentImpl::handleGetQuery(const string& body, const string& contentType, - const string& cid, const string& replyTo) +void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo) { moveNewObjectsLH(); - if (contentType != "_query_v1") { - QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!"); - return; - } - qpid::messaging::Message inMsg(body); qpid::messaging::MapView inMap(inMsg); qpid::messaging::MapView::const_iterator i; - ::qpid::messaging::Variant::Map headers; + Variant::Map headers; QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); @@ -572,71 +523,111 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - headers["partial"]; + headers["partial"] = Variant(); + + qpid::messaging::Message outMsg; + qpid::messaging::ListContent content(outMsg); + Variant::List &list_ = content.asList(); + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + + /* + * Unpack the _what element of the query. Currently we only support OBJECT queries. + */ + i = inMap.find("_what"); + if (i == inMap.end()) { + sendException(replyTo, cid, "_what element missing in Query"); + return; + } + + if (i->second.getType() != qpid::messaging::VAR_STRING) { + sendException(replyTo, cid, "_what element is not a string"); + return; + } + + if (i->second.asString() != "OBJECT") { + sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + return; + } - ::qpid::messaging::Message outMsg; - ::qpid::messaging::ListContent content(outMsg); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; string className; + string packageName; - i = inMap.find("_class"); - if (i != inMap.end()) - try { - className = i->second.asString(); - } catch(exception& e) { - className.clear(); - QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored."); - } + /* + * Handle the _schema_id element, if supplied. + */ + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); - if (className.empty()) { - ObjectId objId; - i = inMap.find("_object_id"); - if (i != inMap.end()) { - - try { - objId = ObjectId(i->second.asMap()); - } catch (exception &e) { - objId = ObjectId(); // empty object id - won't find a match (I hope). - QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid); - } + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING) + className = s_iter->second.asString(); - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING) + packageName = s_iter->second.asString(); + } - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) { + ObjectId objId(i->second); - object->mapEncodeValues(values, true, true); // write both stats and properties - map_["_values"] = values; - list_.push_back(map_); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; - content.encode(); - connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); - } + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->mapEncodeValues(values, true, true); // write both stats and properties + objId.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + headers.erase("partial"); + + content.encode(); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + return; } } else { for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - if (object->getClassName() == className) { + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { // @todo support multiple object reply per message values.clear(); list_.clear(); + oidMap.clear(); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); object->mapEncodeValues(values, true, true); // write both stats and properties + iter->first.mapEncode(oidMap); map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); list_.push_back(map_); content.encode(); - connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); } } } @@ -645,8 +636,8 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte list_.clear(); headers.erase("partial"); content.encode(); - connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); - QPID_LOG(trace, "SENT ObjectInd"); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); } void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) @@ -718,9 +709,7 @@ void ManagementAgentImpl::received(Message& msg) if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey); - else if (opcode == "_query_request") handleGetQuery(msg.getData(), - mp.getApplicationHeaders().getAsString("qmf.content"), - cid, replyToKey); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey); else { QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); } @@ -737,9 +726,9 @@ void ManagementAgentImpl::received(Message& msg) if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); + if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); else if (opcode == 'x') handleConsoleAddedIndication(); + else QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); } } @@ -754,15 +743,15 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } -qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, - const string& cname, - const uint8_t *md5Sum) +Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, + const string& cname, + const uint8_t *md5Sum) { - qpid::messaging::Variant::Map map_; + Variant::Map map_; map_["_package_name"] = pname; map_["_class_name"] = cname; - map_["_hash_str"] = messaging::Uuid(md5Sum); + map_["_hash"] = messaging::Uuid(md5Sum); return map_; } @@ -821,7 +810,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -902,9 +891,9 @@ void ManagementAgentImpl::periodicProcessing() !baseObject->isDeleted())) continue; - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); + qpid::messaging::Message m; + qpid::messaging::ListContent content(m); + Variant::List &list_ = content.asList(); for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); @@ -920,9 +909,9 @@ void ManagementAgentImpl::periodicProcessing() send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); if (send_stats || send_props) { - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - ::qpid::messaging::Variant::Map oid; + Variant::Map map_; + Variant::Map values; + Variant::Map oid; object->getObjectId().mapEncode(oid); map_["_object_id"] = oid; @@ -943,7 +932,7 @@ void ManagementAgentImpl::periodicProcessing() content.encode(); const string &str = m.getContent(); if (str.length()) { - ::qpid::messaging::Variant::Map headers; + Variant::Map headers; headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; @@ -1068,13 +1057,13 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, const string& cid, - const qpid::messaging::VariantMap headers, + const Variant::Map headers, const string& exchange, const string& routingKey, const string& contentType) { Message msg; - qpid::messaging::VariantMap::const_iterator i; + Variant::Map::const_iterator i; if (!cid.empty()) msg.getMessageProperties().setCorrelationId(cid); diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 69a891a1b2..43b9f36c31 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -259,16 +259,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen const uint8_t *md5Sum); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void sendHeartbeat(); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleAttachResponse (qpid::framing::Buffer& inBuffer); + void sendException(const std::string& replyToKey, const std::string& cid, + const std::string& text, uint32_t code=1); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo); void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo); - void handleGetQuery (const std::string& body, const std::string& content_type, - const std::string& cid, const std::string& replyTo); + void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo); void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index b1802293b7..7e8dd7e764 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -62,8 +62,7 @@ static qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname, map_["_package_name"] = pname; map_["_class_name"] = cname; map_["_type"] = type; - map_["_hash_str"] = std::string((const char *)md5Sum, - qpid::management::ManagementObject::MD5_LEN); + map_["_hash"] = qpid::messaging::Uuid(md5Sum); return map_; } @@ -1896,9 +1895,8 @@ void ManagementAgent::disallow(const std::string& className, const std::string& } void ManagementAgent::SchemaClassKey::mapEncode(qpid::messaging::Variant::Map& _map) const { - const std::string hash_str((const char *)hash, sizeof(hash)); _map["_cname"] = name; - _map["_hash"] = hash_str; + _map["_hash"] = qpid::messaging::Uuid(hash); } void ManagementAgent::SchemaClassKey::mapDecode(const qpid::messaging::Variant::Map& _map) { @@ -1909,8 +1907,8 @@ void ManagementAgent::SchemaClassKey::mapDecode(const qpid::messaging::Variant:: } if ((i = _map.find("_hash")) != _map.end()) { - const std::string s = i->second.asString(); - memcpy(hash, s.data(), sizeof(hash)); + const qpid::messaging::Uuid& uuid = i->second.asUuid(); + memcpy(hash, uuid.data(), uuid.size()); } } diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 9f9216824c..68fcdaca71 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -184,9 +184,6 @@ void ObjectId::mapDecode(const messaging::VariantMap& map) else throw Exception("Required _object_name field missing."); - if ((i = map.find("_first")) != map.end()) - first = i->second.asUint64(); - if ((i = map.find("_agent_name")) != map.end()) agentName = i->second.asString(); @@ -270,7 +267,7 @@ void ManagementObject::writeTimestamps (messaging::VariantMap& map) const sid["_package_name"] = getPackageName(); sid["_class_name"] = getClassName(); - sid["_hash_str"] = std::string((const char *)getMd5Sum(), MD5_LEN); + sid["_hash"] = qpid::messaging::Uuid(getMd5Sum()); map["_schema_id"] = sid; objectId.mapEncode(oid); diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 0b0ec417d0..4bbe69655e 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -41,6 +41,9 @@ from cStringIO import StringIO #import qpid.log #qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) +#=================================================================================================== +# CONSOLE +#=================================================================================================== class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ @@ -94,6 +97,10 @@ class Console: """ Invoked when a method response from an asynchronous method call is received. """ pass + +#=================================================================================================== +# BrokerURL +#=================================================================================================== class BrokerURL(URL): def __init__(self, text): URL.__init__(self, text) @@ -115,13 +122,22 @@ class BrokerURL(URL): def match(self, host, port): return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4] + +#=================================================================================================== +# Object +#=================================================================================================== class Object(object): - """ This class defines a 'proxy' object representing a real managed object on an agent. - Actions taken on this proxy are remotely affected on the real managed object. """ - def __init__(self, session, broker, schema, codec=None, prop=None, stat=None, managed=True, v2Map=None, agentName=None, kwargs={}): - self._session = session - self._broker = broker + This class defines a 'proxy' object representing a real managed object on an agent. + Actions taken on this proxy are remotely affected on the real managed object. + """ + def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}): + self._agent = agent + self._session = None + self._broker = None + if agent: + self._session = agent.session + self._broker = agent.broker self._schema = schema self._properties = [] self._statistics = [] @@ -129,8 +145,7 @@ class Object(object): self.v2Init(v2Map, agentName) return - self._managed = managed - if self._managed: + if self._agent: self._currentTime = codec.read_uint64() self._createTime = codec.read_uint64() self._deleteTime = codec.read_uint64() @@ -176,10 +191,9 @@ class Object(object): if '_subtypes' in omap: self._subtypes = omap['_subtypes'] if '_object_id' in omap: - self._managed = True self._objectId = ObjectId(omap['_object_id'], agentName=agentName) else: - self._managed = None + self._objectId = None def getBroker(self): """ Return the broker from which this object was sent """ @@ -211,7 +225,7 @@ class Object(object): def isManaged(self): """ Return True iff this object is a proxy for a managed object on an agent. """ - return self._managed + return self._objectId and self._agent def getIndex(self): """ Return a string describing this object's primary key. """ @@ -250,7 +264,7 @@ class Object(object): """ Contact the agent and retrieve the lastest property and statistic values for this object. """ if not self.isManaged(): raise Exception("Object is not managed") - obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) + obj = self._agent.getObjects(_objectId=self._objectId) if obj: self.mergeUpdate(obj[0]) else: @@ -423,6 +437,10 @@ class Object(object): bit = 0 return excludeList + +#=================================================================================================== +# Session +#=================================================================================================== class Session: """ An instance of the Session class represents a console session running @@ -495,132 +513,26 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") - """ - ## - ## v2_data_queues is used to store object data received from QMFv2 agents. - ## It is stored here in case we need to go and query schema data from the - ## agent before reporting to the user. - ## - ## v2_data_queues is a map, keyed by agent address of queues of entries - ## The format of entries in the queue is a data map - ## This list must be protected by self.cv - ## - """ - self.v2_data_queues = {} - self.v2_pending_queues = {} - def _getBrokerForAgentAddr(self, agent_addr): - broker = None try: self.cv.acquire() key = (1, agent_addr) for b in self.brokers: if key in b.agents: - broker = b - finally: - self.cv.release() - return broker - - def _processV2Data(self): - """ - Attempt to make progress on the entries in the v2_data_queue. If an entry has a schema - that is in our schema cache, process it. Otherwise, send a request for the schema information - to the agent that manages the object. - """ - try: - self.cv.acquire() - pop_list = [] - for agent_addr in self.v2_data_queues: - entries = self.v2_data_queues[agent_addr] - keep_going = True - while keep_going and len(entries) > 0: - schemaId = self._getSchemaIdforV2ObjectLH(entries[0]) - schema = self.schemaCache.getSchema(schemaId) - if schema: - broker = self._getBrokerForAgentAddr(agent_addr) - obj = Object(self, broker, schema, v2Map=entries[0], agentName=agent_addr) - entries.pop(0) - - """ - TODO: This following code assumes that the data indication came unsolicited. - This needs to be enhanced to handle the case of a query response. - """ - if self.console: - self.console.objectProps(broker, obj) - - else: - """ - We have no schema for this data object, move the queue to the pending map and request - schema data from the agent - """ - self.v2_pending_queues[agent_addr] = self.v2_data_queues[agent_addr] - pop_list.append(agent_addr) - self._v2SendSchemaRequest(agent_addr, schemaId) - keep_going = None - for agent_addr in pop_list: - self.v2_data_queues.pop(agent_addr) - finally: - self.cv.release() - - def _addV2Data(self, agent_addr, data_map): - """ - Add data-for-processing to the work queue - """ - process = None - try: - self.cv.acquire() - if agent_addr in self.v2_pending_queues: - self.v2_pending_queues[agent_addr].append(data_map) - else: - if agent_addr not in self.v2_data_queues: - self.v2_data_queues[agent_addr] = [] - self.v2_data_queues[agent_addr].append(data_map) - process = True - finally: - self.cv.release() - - if process: - self._processV2Data() - - def _removeV2Agent(self, agent): - """ - Remove entries in the data queues related to a lost agent. - """ - agent_name = agent.getAgentBank() - try: - self.cv.acquire() - if agent_name in self.v2_data_queues: - self.v2_data_queues.pop(agent_name) - if agent_name in self.v2_pending_queues: - self.v2_pending_queues.pop(agent_name) + return b finally: self.cv.release() + return None - def _schemaInfoFromV2Agent(self, agent_addr): - """ - We have just received new schema information from an agent. Check to see if there's - more work that can now be done. - """ - re_process = None + def _getAgentForAgentAddr(self, agent_addr): try: self.cv.acquire() - if agent_addr in self.v2_pending_queues: - self.v2_data_queues[agent_addr] = self.v2_pending_queues.pop(agent_addr) - re_process = True + key = agent_addr + for b in self.brokers: + if key in b.agents: + return b.agents[key] finally: self.cv.release() - - if re_process: - self._processV2Data() - - def _getSchemaIdforV2ObjectLH(self, data): - """ - Given a data map, extract the schema-identifier. - """ - if data.__class__ != dict: - return None - if '_schema_id' in data: - return ClassKey(data['_schema_id']) return None def __repr__(self): @@ -642,7 +554,6 @@ class Session: returned from the addBroker call """ if self.console: for agent in broker.getAgents(): - self.console.removev2Agent(agent) self.console.delAgent(agent) broker._shutdown() self.brokers.remove(broker) @@ -711,12 +622,12 @@ class Session: agentList.append(a) return agentList - def makeObject(self, classKey, broker=None, **kwargs): + def makeObject(self, classKey, **kwargs): """ Create a new, unmanaged object of the schema indicated by classKey """ schema = self.getSchema(classKey) if schema == None: raise Exception("Schema not found for classKey") - return Object(self, broker, schema, None, True, True, False, kwargs) + return Object(None, schema, None, True, True, kwargs) def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. @@ -886,7 +797,6 @@ class Session: def _handleBrokerDisconnect(self, broker): if self.console: for agent in broker.getAgents(): - self.session._removeV2Agent(agent) self.console.delAgent(agent) self.console.brokerDisconnected(broker) @@ -955,31 +865,6 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) - def _handleMethodResp(self, broker, codec, seq): - code = codec.read_uint32() - text = codec.read_str16() - outArgs = {} - pair = self.seqMgr._release(seq) - if pair == None: - return - method, synchronous = pair - if code == 0: - for arg in method.arguments: - if arg.dir.find("O") != -1: - outArgs[arg.name] = self._decodeValue(codec, arg.type, broker) - result = MethodResult(code, text, outArgs) - if synchronous: - try: - broker.cv.acquire() - broker.syncResult = result - broker.syncInFlight = False - broker.cv.notify() - finally: - broker.cv.release() - else: - if self.console: - self.console.methodResponse(broker, seq, result) - def _handleHeartbeatInd(self, broker, codec, seq, msg): brokerBank = 1 agentBank = 0 @@ -1003,58 +888,32 @@ class Session: self.console.heartbeat(agent, timestamp) broker._ageAgents() - def _handleEventInd(self, broker, codec, seq): - if self.console != None: - event = Event(self, broker, codec) - self.console.event(broker, event) - def _handleSchemaResp(self, broker, codec, seq, agent_addr): kind = codec.read_uint8() classKey = ClassKey(codec) _class = SchemaClass(kind, classKey, codec, self) self.schemaCache.declareClass(classKey, _class) - self.seqMgr._release(seq) - broker._decOutstanding() + ctx = self.seqMgr._release(seq) + if ctx: + broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) - if agent_addr: - self._schemaInfoFromV2Agent(agent_addr) - - def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): - classKey = ClassKey(codec) - schema = self.schemaCache.getSchema(classKey) - if not schema: - return - - object = Object(self, broker, schema, codec, prop, stat) - if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: - broker._updateAgent(object) - - try: - self.cv.acquire() - if seq in self.syncSequenceList: - if object.getTimestamps()[2] == 0 and self._selectMatch(object): - self.getResult.append(object) - return - finally: - self.cv.release() - - if self.console and self.rcvObjects: - if prop: - self.console.objectProps(broker, object) - if stat: - self.console.objectStats(broker, object) + if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): + agent = self._getAgentForAgentAddr(agent_addr) + if agent: + agent._schemaInfoFromV2Agent() def _v2HandleHeartbeatInd(self, broker, mp, ah, content): - brokerBank = 1 - agentName = ah["qmf.agent"] - values = content["_values"] - timestamp = values["timestamp"] - interval = values["heartbeat_interval"] - if agentName == None: + try: + agentName = ah["qmf.agent"] + values = content["_values"] + timestamp = values["timestamp"] + interval = values["heartbeat_interval"] + except: return - agent = broker.getAgent(brokerBank, agentName) + + agent = broker.getAgent(1, agentName) if agent == None: agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) broker._addAgent(agentName, agent) @@ -1067,44 +926,6 @@ class Session: def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): self._v2HandleHeartbeatInd(broker, mp, ah, content) - def _v2HandleDataInd(self, broker, mp, ah, content): - kind = "_data" - if "qmf.content" in ah: - kind = ah["qmf.content"] - agent_addr = ah["qmf.agent"] - if content.__class__ != list: - return - if kind == "_data": - for omap in content: - self._addV2Data(agent_addr, omap) - - def _v2HandleQueryRsp(self, broker, mp, ah, content): - pass - - def _v2HandleMethodRsp(self, broker, mp, ah, content): - pass - - def _v2HandleException(self, broker, mp, ah, content): - pass - - def _v2SendSchemaRequest(self, agent_addr, schemaId): - """ - Send a query to an agent to request details on a particular schema class. - IMPORTANT: This function currently sends a QMFv1 schema-request to the address of - the agent. The agent will send its response to amq.direct/. - Eventually, this will be converted to a proper QMFv2 schema query. - """ - broker = self._getBrokerForAgentAddr(agent_addr) - if not broker: - return - - sendCodec = Codec() - seq = self.seqMgr._reserve(None) - broker._setHeader(sendCodec, 'S', seq) - schemaId.encode(sendCodec) - smsg = broker._message(sendCodec.encoded, agent_addr) - broker._send(smsg, "qmf.default.direct") - def _handleError(self, error): try: self.cv.acquire() @@ -1343,6 +1164,10 @@ class Session: return seq return None + +#=================================================================================================== +# SchemaCache +#=================================================================================================== class SchemaCache(object): """ The SchemaCache is a data structure that stores learned schema information. @@ -1419,6 +1244,10 @@ class SchemaCache(object): self.lock.release() return True + +#=================================================================================================== +# ClassKey +#=================================================================================================== class ClassKey: """ A ClassKey uniquely identifies a class from the schema. """ def __init__(self, constructor): @@ -1443,7 +1272,7 @@ class ClassKey: try: self.pname = constructor['_package_name'] self.cname = constructor['_class_name'] - self.hash = constructor['_hash_str'] + self.hash = constructor['_hash'] except: raise Exception("Invalid ClassKey map format") else: @@ -1458,6 +1287,9 @@ class ClassKey: codec.write_str8(self.cname) codec.write_bin128(self.hash.bytes) + def asMap(self): + return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash} + def getPackageName(self): return self.pname @@ -1476,6 +1308,10 @@ class ClassKey: def __repr__(self): return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + +#=================================================================================================== +# SchemaClass +#=================================================================================================== class SchemaClass: """ """ CLASS_KIND_TABLE = 1 @@ -1558,6 +1394,10 @@ class SchemaClass: else: return self.arguments + self.session.getSchema(self.superTypeKey).getArguments() + +#=================================================================================================== +# SchemaProperty +#=================================================================================================== class SchemaProperty: """ """ def __init__(self, codec): @@ -1587,6 +1427,10 @@ class SchemaProperty: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaStatistic +#=================================================================================================== class SchemaStatistic: """ """ def __init__(self, codec): @@ -1603,6 +1447,10 @@ class SchemaStatistic: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaMethod +#=================================================================================================== class SchemaMethod: """ """ def __init__(self, codec): @@ -1631,6 +1479,10 @@ class SchemaMethod: result += ")" return result + +#=================================================================================================== +# SchemaArgument +#=================================================================================================== class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -1658,6 +1510,10 @@ class SchemaArgument: elif key == "refPackage" : self.refPackage = value elif key == "refClass" : self.refClass = value + +#=================================================================================================== +# ObjectId +#=================================================================================================== class ObjectId: """ Object that represents QMF object identifiers """ def __init__(self, constructor, first=0, second=0, agentName=None): @@ -1742,12 +1598,22 @@ class ObjectId: codec.write_uint64(first) codec.write_uint64(second) + def asMap(self): + omap = {'_agent_name': self.agentName, '_object_name': self.objectName} + if self.agentEpoch != 0: + omap['_agent_epoch'] = self.agentEpoch + return omap + def __hash__(self): return self.__repr__().__hash__() def __eq__(self, other): return self.__repr__().__eq__(other) + +#=================================================================================================== +# MethodResult +#=================================================================================================== class MethodResult(object): """ """ def __init__(self, status, text, outArgs): @@ -1763,6 +1629,10 @@ class MethodResult(object): def __repr__(self): return "%s (%d) - %s" % (self.text, self.status, self.outArgs) + +#=================================================================================================== +# ManagedConnection +#=================================================================================================== class ManagedConnection(Thread): """ Thread class for managing a connection. """ DELAY_MIN = 1 @@ -1825,6 +1695,10 @@ class ManagedConnection(Thread): finally: self.cv.release() + +#=================================================================================================== +# Broker +#=================================================================================================== class Broker: """ This object represents a connection (or potential connection) to a QMF broker. """ SYNC_TIME = 60 @@ -1872,7 +1746,7 @@ class Broker: def getAgent(self, brokerBank, agentBank): """ Return the agent object associated with a particular broker and agent bank value.""" - bankKey = (brokerBank, agentBank) + bankKey = agentBank try: self.cv.acquire() if bankKey in self.agents: @@ -1923,7 +1797,7 @@ class Broker: try: self.cv.acquire() self.agents = {} - self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") + self.agents[0] = Agent(self, 0, "BrokerAgent") finally: self.cv.release() @@ -1960,7 +1834,7 @@ class Broker: self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) + self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL) @@ -1970,7 +1844,7 @@ class Broker: self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("tdest").listen(self._replyCb) + self.amqpSession.incoming("tdest").listen(self._v1Cb) self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL) @@ -2013,7 +1887,7 @@ class Broker: raise def _updateAgent(self, obj): - bankKey = (obj.brokerBank, obj.agentBank) + bankKey = obj.agentBank agent = None if obj._deleteTime == 0: try: @@ -2037,7 +1911,7 @@ class Broker: def _addAgent(self, name, agent): try: self.cv.acquire() - self.agents[(1, name)] = agent + self.agents[name] = agent finally: self.cv.release() if self.session.console: @@ -2057,7 +1931,6 @@ class Broker: self.cv.release() if self.session.console: for agent in to_notify: - self.session._removeV2Agent(agent) self.session.console.delAgent(agent) def _v2SendAgentLocate(self, predicate={}): @@ -2167,51 +2040,88 @@ class Broker: finally: self.cv.release() - def _replyCb(self, msg): + def _v1Cb(self, msg): + """ + This is the general message handler for messages received via the QMFv1 exchanges. + """ + agent = None agent_addr = None mp = msg.get("message_properties") ah = mp.application_headers if ah and 'qmf.agent' in ah: agent_addr = ah['qmf.agent'] + + if not agent_addr: + # + # See if we can determine the agent identity from the routing key + # + dp = msg.get("delivery_properties") + rkey = None + if dp.routing_key: + rkey = dp.routing_key + items = rkey.split('.') + if len(items) >= 4: + if items[0] == 'console' and items[3].isdigit(): + agent_addr = int(items[3]) # The QMFv1 Agent Bank + if agent_addr and agent_addr in self.agents: + agent = self.agents[agent_addr] + codec = Codec(msg.body) while True: opcode, seq = self._checkHeader(codec) if opcode == None: return if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'e': self.session._handleEventInd (self, codec, seq) elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) - elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) - elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) - elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) - self.session.receiver._completed.add(msg.id) - self.session.channel.session_completed(self.session.receiver._completed) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) + elif agent: + agent._handleQmfV1Message(opcode, mp, ah, codec) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) def _v2Cb(self, msg): - dp = msg.get("delivery_properties") + """ + This is the general message handler for messages received via QMFv2 exchanges. + """ mp = msg.get("message_properties") ah = mp["application_headers"] - opcode = ah["qmf.opcode"] codec = Codec(msg.body) - if mp.content_type == "amqp/list": - content = codec.read_list() - elif mp.content_type == "amqp/map": - content = codec.read_map() - else: - return - - if opcode == None: return - elif opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) - elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) - elif opcode == '_data_indication': self.session._v2HandleDataInd(self, mp, ah, content) - elif opcode == '_query_response': self.session._v2HandleQueryRsp(self, mp, ah, content) - elif opcode == '_method_response': self.session._v2HandleMethodRsp(self, mp, ah, content) - elif opcode == '_exception': self.session._v2HandleException(self, mp, ah, content) + if 'qmf.opcode' in ah: + opcode = ah['qmf.opcode'] + if mp.content_type == "amqp/list": + content = codec.read_list() + if not content: + content = [] + elif mp.content_type == "amqp/map": + content = codec.read_map() + if not content: + content = {} + else: + content = None + + if content != None: + ## + ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are + ## used to maintain the broker's list of agent proxies. + ## + if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) + elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + else: + ## + ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender + ## of the message. + ## + agent_addr = ah['qmf.agent'] + if agent_addr in self.agents: + agent = self.agents[agent_addr] + agent._handleQmfV2Message(opcode, mp, ah, content) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) def _exceptionCb(self, data): self.connected = False @@ -2227,20 +2137,46 @@ class Broker: if self.thread: self.thread.disconnected() + +#=================================================================================================== +# Agent +#=================================================================================================== class Agent: - """ """ + """ + This class represents a proxy for a remote agent being managed + """ def __init__(self, broker, agentBank, label, isV2=False, interval=0): self.broker = broker + self.session = broker.session + self.schemaCache = self.session.schemaCache self.brokerBank = broker.getBrokerBank() self.agentBank = agentBank self.label = label self.isV2 = isV2 self.heartbeatInterval = interval + self.lock = Lock() + self.seqMgr = self.session.seqMgr + self.contextMap = {} + self.unsolicitedContext = RequestContext(self, self) self.lastSeenTime = time() + + def __call__(self, **kwargs): + """ + This is the handler for unsolicited stuff received from the agent + """ + if 'qmf_object' in kwargs: + if self.session.console: + self.session.console.objectProps(self.broker, kwargs['qmf_object']) + if 'qmf_object_stats' in kwargs: + if self.session.console: + self.session.console.objectStats(self.broker, kwargs['qmf_object_stats']) + + def touch(self): self.lastSeenTime = time() + def isOld(self): if self.heartbeatInterval == 0: return None @@ -2248,6 +2184,7 @@ class Agent: return True return None + def __repr__(self): if self.isV2: ver = "v2" @@ -2255,15 +2192,392 @@ class Agent: ver = "v1" return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label) + def getBroker(self): return self.broker + def getBrokerBank(self): return self.brokerBank + def getAgentBank(self): return self.agentBank + + def getObjects(self, notifiable=None, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + If 'notifiable' is None (default), this call will block until completion or timeout. + If supplied, notifiable is assumed to be a callable object that will be called when the + list of queried objects arrives. The single argument to the call shall be a list of + the returned objects. + + The class for queried objects may be specified in one of the following ways: + + _schema = - supply a schema object returned from getSchema. + _key = - supply a classKey from the list returned by getClasses. + _class = - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = - get the object referenced by the object-id + + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout =