diff options
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 267 |
1 files changed, 128 insertions, 139 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 1633e77a4f..5966b0766c 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -44,6 +44,7 @@ using std::ifstream; using std::string; using std::cout; using std::endl; +using qpid::messaging::Variant; Mutex ManagementAgent::Singleton::lock; bool ManagementAgent::Singleton::disabled = false; @@ -179,7 +180,7 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, void ManagementAgentImpl::registerClass(const string& packageName, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -189,7 +190,7 @@ void ManagementAgentImpl::registerClass(const string& packageName, void ManagementAgentImpl::registerEvent(const string& packageName, const string& eventName, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -239,12 +240,12 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << event.getPackageName() << "." << event.getEventName(); - ::qpid::messaging::Message msg; - ::qpid::messaging::MapContent content(msg); - ::qpid::messaging::VariantMap &map_ = content.asMap(); - ::qpid::messaging::VariantMap schemaId; - ::qpid::messaging::VariantMap values; - ::qpid::messaging::VariantMap headers; + qpid::messaging::Message msg; + qpid::messaging::MapContent content(msg); + Variant::Map &map_ = content.asMap(); + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), event.getEventName(), @@ -379,73 +380,29 @@ void ManagementAgentImpl::sendHeartbeat() QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } -void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, - uint32_t code, string text) +void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, + const string& text, uint32_t code) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'z', sequence); - outBuffer.putLong(code); - outBuffer.putShortString(text); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); - QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); -} - -void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) -{ - Mutex::ScopedLock lock(agentLock); - - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); - - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); - - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - storeData(); - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } + static const string addr_exchange("qmf.default.direct"); - attachment.setBanks(assignedBrokerBank, assignedAgentBank); + messaging::Message msg; + messaging::MapContent content(msg); + messaging::Variant::Map& map(content.asMap()); + messaging::Variant::Map headers; + messaging::Variant::Map values; - // Bind to qpid.management to receive commands - connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.agent"] = name_address; - // Send package indications for all local packages - for (PackageMap::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + values["error_code"] = code; + values["error_text"] = text; + map["_values"] = values; - encodeHeader(outBuffer, 'p'); - encodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + content.encode(); + connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyToKey); - // Send class indications for all local classes - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { - outBuffer.reset(); - encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); - } - } + QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) @@ -509,7 +466,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& } else { string methodName; ObjectId objId; - qpid::messaging::Variant::Map inArgs; + Variant::Map inArgs; try { // coversions will throw if input is invalid. @@ -539,7 +496,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& } } - qpid::messaging::Variant::Map headers; + Variant::Map headers; headers["method"] = "response"; headers["qmf.agent"] = name_address; if (failed) @@ -551,20 +508,14 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); } -void ManagementAgentImpl::handleGetQuery(const string& body, const string& contentType, - const string& cid, const string& replyTo) +void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo) { moveNewObjectsLH(); - if (contentType != "_query_v1") { - QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!"); - return; - } - qpid::messaging::Message inMsg(body); qpid::messaging::MapView inMap(inMsg); qpid::messaging::MapView::const_iterator i; - ::qpid::messaging::Variant::Map headers; + Variant::Map headers; QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); @@ -572,71 +523,111 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - headers["partial"]; + headers["partial"] = Variant(); + + qpid::messaging::Message outMsg; + qpid::messaging::ListContent content(outMsg); + Variant::List &list_ = content.asList(); + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + + /* + * Unpack the _what element of the query. Currently we only support OBJECT queries. + */ + i = inMap.find("_what"); + if (i == inMap.end()) { + sendException(replyTo, cid, "_what element missing in Query"); + return; + } + + if (i->second.getType() != qpid::messaging::VAR_STRING) { + sendException(replyTo, cid, "_what element is not a string"); + return; + } + + if (i->second.asString() != "OBJECT") { + sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + return; + } - ::qpid::messaging::Message outMsg; - ::qpid::messaging::ListContent content(outMsg); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; string className; + string packageName; - i = inMap.find("_class"); - if (i != inMap.end()) - try { - className = i->second.asString(); - } catch(exception& e) { - className.clear(); - QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored."); - } + /* + * Handle the _schema_id element, if supplied. + */ + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); - if (className.empty()) { - ObjectId objId; - i = inMap.find("_object_id"); - if (i != inMap.end()) { - - try { - objId = ObjectId(i->second.asMap()); - } catch (exception &e) { - objId = ObjectId(); // empty object id - won't find a match (I hope). - QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid); - } + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING) + className = s_iter->second.asString(); - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING) + packageName = s_iter->second.asString(); + } - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) { + ObjectId objId(i->second); - object->mapEncodeValues(values, true, true); // write both stats and properties - map_["_values"] = values; - list_.push_back(map_); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; - content.encode(); - connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); - } + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->mapEncodeValues(values, true, true); // write both stats and properties + objId.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + headers.erase("partial"); + + content.encode(); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + return; } } else { for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - if (object->getClassName() == className) { + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { // @todo support multiple object reply per message values.clear(); list_.clear(); + oidMap.clear(); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); object->mapEncodeValues(values, true, true); // write both stats and properties + iter->first.mapEncode(oidMap); map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); list_.push_back(map_); content.encode(); - connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); } } } @@ -645,8 +636,8 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte list_.clear(); headers.erase("partial"); content.encode(); - connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); - QPID_LOG(trace, "SENT ObjectInd"); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); } void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) @@ -718,9 +709,7 @@ void ManagementAgentImpl::received(Message& msg) if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey); - else if (opcode == "_query_request") handleGetQuery(msg.getData(), - mp.getApplicationHeaders().getAsString("qmf.content"), - cid, replyToKey); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey); else { QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); } @@ -737,9 +726,9 @@ void ManagementAgentImpl::received(Message& msg) if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); + if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); else if (opcode == 'x') handleConsoleAddedIndication(); + else QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); } } @@ -754,15 +743,15 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } -qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, - const string& cname, - const uint8_t *md5Sum) +Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, + const string& cname, + const uint8_t *md5Sum) { - qpid::messaging::Variant::Map map_; + Variant::Map map_; map_["_package_name"] = pname; map_["_class_name"] = cname; - map_["_hash_str"] = messaging::Uuid(md5Sum); + map_["_hash"] = messaging::Uuid(md5Sum); return map_; } @@ -821,7 +810,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -902,9 +891,9 @@ void ManagementAgentImpl::periodicProcessing() !baseObject->isDeleted())) continue; - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); + qpid::messaging::Message m; + qpid::messaging::ListContent content(m); + Variant::List &list_ = content.asList(); for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); @@ -920,9 +909,9 @@ void ManagementAgentImpl::periodicProcessing() send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); if (send_stats || send_props) { - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - ::qpid::messaging::Variant::Map oid; + Variant::Map map_; + Variant::Map values; + Variant::Map oid; object->getObjectId().mapEncode(oid); map_["_object_id"] = oid; @@ -943,7 +932,7 @@ void ManagementAgentImpl::periodicProcessing() content.encode(); const string &str = m.getContent(); if (str.length()) { - ::qpid::messaging::Variant::Map headers; + Variant::Map headers; headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; @@ -1068,13 +1057,13 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, const string& cid, - const qpid::messaging::VariantMap headers, + const Variant::Map headers, const string& exchange, const string& routingKey, const string& contentType) { Message msg; - qpid::messaging::VariantMap::const_iterator i; + Variant::Map::const_iterator i; if (!cid.empty()) msg.getMessageProperties().setCorrelationId(cid); |