summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp267
1 files changed, 128 insertions, 139 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 1633e77a4f..5966b0766c 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -44,6 +44,7 @@ using std::ifstream;
using std::string;
using std::cout;
using std::endl;
+using qpid::messaging::Variant;
Mutex ManagementAgent::Singleton::lock;
bool ManagementAgent::Singleton::disabled = false;
@@ -179,7 +180,7 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
void ManagementAgentImpl::registerClass(const string& packageName,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -189,7 +190,7 @@ void ManagementAgentImpl::registerClass(const string& packageName,
void ManagementAgentImpl::registerEvent(const string& packageName,
const string& eventName,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -239,12 +240,12 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
event.getPackageName() << "." << event.getEventName();
- ::qpid::messaging::Message msg;
- ::qpid::messaging::MapContent content(msg);
- ::qpid::messaging::VariantMap &map_ = content.asMap();
- ::qpid::messaging::VariantMap schemaId;
- ::qpid::messaging::VariantMap values;
- ::qpid::messaging::VariantMap headers;
+ qpid::messaging::Message msg;
+ qpid::messaging::MapContent content(msg);
+ Variant::Map &map_ = content.asMap();
+ Variant::Map schemaId;
+ Variant::Map values;
+ Variant::Map headers;
map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
event.getEventName(),
@@ -379,73 +380,29 @@ void ManagementAgentImpl::sendHeartbeat()
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
-void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
- uint32_t code, string text)
+void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+ const string& text, uint32_t code)
{
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'z', sequence);
- outBuffer.putLong(code);
- outBuffer.putShortString(text);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
- QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
-}
-
-void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
-{
- Mutex::ScopedLock lock(agentLock);
-
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
-
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- storeData();
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
+ static const string addr_exchange("qmf.default.direct");
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
+ messaging::Message msg;
+ messaging::MapContent content(msg);
+ messaging::Variant::Map& map(content.asMap());
+ messaging::Variant::Map headers;
+ messaging::Variant::Map values;
- // Bind to qpid.management to receive commands
- connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_exception";
+ headers["qmf.agent"] = name_address;
- // Send package indications for all local packages
- for (PackageMap::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ values["error_code"] = code;
+ values["error_text"] = text;
+ map["_values"] = values;
- encodeHeader(outBuffer, 'p');
- encodePackageIndication(outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ content.encode();
+ connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyToKey);
- // Send class indications for all local classes
- ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
- outBuffer.reset();
- encodeHeader(outBuffer, 'q');
- encodeClassIndication(outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
- }
- }
+ QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
@@ -509,7 +466,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
} else {
string methodName;
ObjectId objId;
- qpid::messaging::Variant::Map inArgs;
+ Variant::Map inArgs;
try {
// coversions will throw if input is invalid.
@@ -539,7 +496,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
}
}
- qpid::messaging::Variant::Map headers;
+ Variant::Map headers;
headers["method"] = "response";
headers["qmf.agent"] = name_address;
if (failed)
@@ -551,20 +508,14 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
}
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& contentType,
- const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
{
moveNewObjectsLH();
- if (contentType != "_query_v1") {
- QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
- return;
- }
-
qpid::messaging::Message inMsg(body);
qpid::messaging::MapView inMap(inMsg);
qpid::messaging::MapView::const_iterator i;
- ::qpid::messaging::Variant::Map headers;
+ Variant::Map headers;
QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
@@ -572,71 +523,111 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- headers["partial"];
+ headers["partial"] = Variant();
+
+ qpid::messaging::Message outMsg;
+ qpid::messaging::ListContent content(outMsg);
+ Variant::List &list_ = content.asList();
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oidMap;
+
+ /*
+ * Unpack the _what element of the query. Currently we only support OBJECT queries.
+ */
+ i = inMap.find("_what");
+ if (i == inMap.end()) {
+ sendException(replyTo, cid, "_what element missing in Query");
+ return;
+ }
+
+ if (i->second.getType() != qpid::messaging::VAR_STRING) {
+ sendException(replyTo, cid, "_what element is not a string");
+ return;
+ }
+
+ if (i->second.asString() != "OBJECT") {
+ sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ return;
+ }
- ::qpid::messaging::Message outMsg;
- ::qpid::messaging::ListContent content(outMsg);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
string className;
+ string packageName;
- i = inMap.find("_class");
- if (i != inMap.end())
- try {
- className = i->second.asString();
- } catch(exception& e) {
- className.clear();
- QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
- }
+ /*
+ * Handle the _schema_id element, if supplied.
+ */
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
- if (className.empty()) {
- ObjectId objId;
- i = inMap.find("_object_id");
- if (i != inMap.end()) {
-
- try {
- objId = ObjectId(i->second.asMap());
- } catch (exception &e) {
- objId = ObjectId(); // empty object id - won't find a match (I hope).
- QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
- }
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING)
+ className = s_iter->second.asString();
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING)
+ packageName = s_iter->second.asString();
+ }
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) {
+ ObjectId objId(i->second);
- object->mapEncodeValues(values, true, true); // write both stats and properties
- map_["_values"] = values;
- list_.push_back(map_);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
- content.encode();
- connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
- }
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ objId.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+ headers.erase("partial");
+
+ content.encode();
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ return;
}
} else {
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
- if (object->getClassName() == className) {
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
// @todo support multiple object reply per message
values.clear();
list_.clear();
+ oidMap.clear();
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
object->mapEncodeValues(values, true, true); // write both stats and properties
+ iter->first.mapEncode(oidMap);
map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
list_.push_back(map_);
content.encode();
- connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
}
}
}
@@ -645,8 +636,8 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& conte
list_.clear();
headers.erase("partial");
content.encode();
- connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
- QPID_LOG(trace, "SENT ObjectInd");
+ connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
}
void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
@@ -718,9 +709,7 @@ void ManagementAgentImpl::received(Message& msg)
if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey);
- else if (opcode == "_query_request") handleGetQuery(msg.getData(),
- mp.getApplicationHeaders().getAsString("qmf.content"),
- cid, replyToKey);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey);
else {
QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
}
@@ -737,9 +726,9 @@ void ManagementAgentImpl::received(Message& msg)
if (checkHeader(inBuffer, &opcode, &sequence))
{
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
+ if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
else if (opcode == 'x') handleConsoleAddedIndication();
+ else
QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
}
}
@@ -754,15 +743,15 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
buf.putLong (seq);
}
-qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
- const string& cname,
- const uint8_t *md5Sum)
+Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+ const string& cname,
+ const uint8_t *md5Sum)
{
- qpid::messaging::Variant::Map map_;
+ Variant::Map map_;
map_["_package_name"] = pname;
map_["_class_name"] = cname;
- map_["_hash_str"] = messaging::Uuid(md5Sum);
+ map_["_hash"] = messaging::Uuid(md5Sum);
return map_;
}
@@ -821,7 +810,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind,
PackageMap::iterator pIter,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -902,9 +891,9 @@ void ManagementAgentImpl::periodicProcessing()
!baseObject->isDeleted()))
continue;
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
+ qpid::messaging::Message m;
+ qpid::messaging::ListContent content(m);
+ Variant::List &list_ = content.asList();
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
@@ -920,9 +909,9 @@ void ManagementAgentImpl::periodicProcessing()
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
if (send_stats || send_props) {
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map oid;
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
object->getObjectId().mapEncode(oid);
map_["_object_id"] = oid;
@@ -943,7 +932,7 @@ void ManagementAgentImpl::periodicProcessing()
content.encode();
const string &str = m.getContent();
if (str.length()) {
- ::qpid::messaging::Variant::Map headers;
+ Variant::Map headers;
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
@@ -1068,13 +1057,13 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
const string& cid,
- const qpid::messaging::VariantMap headers,
+ const Variant::Map headers,
const string& exchange,
const string& routingKey,
const string& contentType)
{
Message msg;
- qpid::messaging::VariantMap::const_iterator i;
+ Variant::Map::const_iterator i;
if (!cid.empty())
msg.getMessageProperties().setCorrelationId(cid);