summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp1390
1 files changed, 1390 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
new file mode 100644
index 0000000000..633401ef5b
--- /dev/null
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -0,0 +1,1390 @@
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/log/Statement.h"
+#include "qpid/agent/ManagementAgentImpl.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include <list>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <iostream>
+#include <fstream>
+#include <boost/lexical_cast.hpp>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::sys;
+using namespace std;
+using std::stringstream;
+using std::ofstream;
+using std::ifstream;
+using std::string;
+using std::endl;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
+
+namespace {
+ qpid::sys::Mutex lock;
+ bool disabled = false;
+ ManagementAgent* agent = 0;
+ int refCount = 0;
+
+ const string defaultVendorName("vendor");
+ const string defaultProductName("product");
+
+ // Create a valid binding key substring by
+ // replacing all '.' chars with '_'
+ const string keyifyNameStr(const string& name)
+ {
+ string n2 = name;
+
+ size_t pos = n2.find('.');
+ while (pos != n2.npos) {
+ n2.replace(pos, 1, "_");
+ pos = n2.find('.', pos);
+ }
+ return n2;
+ }
+}
+
+ManagementAgent::Singleton::Singleton(bool disableManagement)
+{
+ sys::Mutex::ScopedLock _lock(lock);
+ if (disableManagement && !disabled) {
+ disabled = true;
+ assert(refCount == 0); // can't disable after agent has been allocated
+ }
+ if (refCount == 0 && !disabled)
+ agent = new ManagementAgentImpl();
+ refCount++;
+}
+
+ManagementAgent::Singleton::~Singleton()
+{
+ sys::Mutex::ScopedLock _lock(lock);
+ refCount--;
+ if (refCount == 0 && !disabled) {
+ delete agent;
+ agent = 0;
+ }
+}
+
+ManagementAgent* ManagementAgent::Singleton::getInstance()
+{
+ return agent;
+}
+
+const string ManagementAgentImpl::storeMagicNumber("MA02");
+
+ManagementAgentImpl::ManagementAgentImpl() :
+ interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
+ notifyable(0), inCallback(false),
+ initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"),
+ topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"),
+ schemaTimestamp(Duration(EPOCH, now())),
+ publishAllData(true), requestedBrokerBank(0), requestedAgentBank(0),
+ assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
+ maxV2ReplyObjs(10), // KAG todo: make this a tuneable parameter
+ connThreadBody(*this), connThread(connThreadBody),
+ pubThreadBody(*this), pubThread(pubThreadBody)
+{
+}
+
+ManagementAgentImpl::~ManagementAgentImpl()
+{
+ // shutdown & cleanup all threads
+ connThreadBody.close();
+ pubThreadBody.close();
+
+ connThread.join();
+ pubThread.join();
+
+ if (pipeHandle) {
+ delete pipeHandle;
+ pipeHandle = 0;
+ }
+}
+
+void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
+{
+ if (vendor.find(':') != vendor.npos) {
+ throw Exception("vendor string cannot contain a ':' character.");
+ }
+ if (product.find(':') != product.npos) {
+ throw Exception("product string cannot contain a ':' character.");
+ }
+
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ if (!instance.empty()) {
+ attrMap["_instance"] = instance;
+ }
+}
+
+
+void ManagementAgentImpl::getName(string& vendor, string& product, string& instance)
+{
+ vendor = std::string(attrMap["_vendor"]);
+ product = std::string(attrMap["_product"]);
+ instance = std::string(attrMap["_instance"]);
+}
+
+
+const std::string& ManagementAgentImpl::getAddress()
+{
+ return name_address;
+}
+
+
+void ManagementAgentImpl::init(const string& brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread,
+ const string& _storeFile,
+ const string& uid,
+ const string& pwd,
+ const string& mech,
+ const string& proto)
+{
+ management::ConnectionSettings settings;
+ settings.protocol = proto;
+ settings.host = brokerHost;
+ settings.port = brokerPort;
+ settings.username = uid;
+ settings.password = pwd;
+ settings.mechanism = mech;
+ settings.heartbeat = 10;
+ init(settings, intervalSeconds, useExternalThread, _storeFile);
+}
+
+void ManagementAgentImpl::init(const qpid::management::ConnectionSettings& settings,
+ uint16_t intervalSeconds,
+ bool useExternalThread,
+ const string& _storeFile)
+{
+ std::string cfgVendor, cfgProduct, cfgInstance;
+
+ interval = intervalSeconds;
+ extThread = useExternalThread;
+ storeFile = _storeFile;
+ nextObjectId = 1;
+
+ //
+ // Convert from management::ConnectionSettings to client::ConnectionSettings
+ //
+ connectionSettings.protocol = settings.protocol;
+ connectionSettings.host = settings.host;
+ connectionSettings.port = settings.port;
+ connectionSettings.virtualhost = settings.virtualhost;
+ connectionSettings.username = settings.username;
+ connectionSettings.password = settings.password;
+ connectionSettings.mechanism = settings.mechanism;
+ connectionSettings.locale = settings.locale;
+ connectionSettings.heartbeat = settings.heartbeat;
+ connectionSettings.maxChannels = settings.maxChannels;
+ connectionSettings.maxFrameSize = settings.maxFrameSize;
+ connectionSettings.bounds = settings.bounds;
+ connectionSettings.tcpNoDelay = settings.tcpNoDelay;
+ connectionSettings.service = settings.service;
+ connectionSettings.minSsf = settings.minSsf;
+ connectionSettings.maxSsf = settings.maxSsf;
+
+ retrieveData(cfgVendor, cfgProduct, cfgInstance);
+
+ bootSequence++;
+ if ((bootSequence & 0xF000) != 0)
+ bootSequence = 1;
+
+ // setup the agent's name. The name may be set via a call to setName(). If setName()
+ // has not been called, the name can be read from the configuration file. If there is
+ // no name in the configuration file, a unique default name is provided.
+ if (attrMap.empty()) {
+ // setName() never called by application, so use names retrieved from config, otherwise defaults.
+ setName(cfgVendor.empty() ? defaultVendorName : cfgVendor,
+ cfgProduct.empty() ? defaultProductName : cfgProduct,
+ cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance);
+ } else if (attrMap.find("_instance") == attrMap.end()) {
+ // setName() called, but instance was not specified, use config or generate a uuid
+ setName(attrMap["_vendor"].asString(), attrMap["_product"].asString(),
+ cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance);
+ }
+
+ name_address = attrMap["_vendor"].asString() + ":" + attrMap["_product"].asString() + ":" + attrMap["_instance"].asString();
+ vendorNameKey = keyifyNameStr(attrMap["_vendor"].asString());
+ productNameKey = keyifyNameStr(attrMap["_product"].asString());
+ instanceNameKey = keyifyNameStr(attrMap["_instance"].asString());
+ attrMap["_name"] = name_address;
+
+ storeData(true);
+
+ QPID_LOG(info, "QMF Agent Initialized: broker=" << settings.host << ":" << settings.port <<
+ " interval=" << intervalSeconds << " storeFile=" << _storeFile << " name=" << name_address);
+
+ initialized = true;
+}
+
+void ManagementAgentImpl::registerClass(const string& packageName,
+ const string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ sys::Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementAgentImpl::registerEvent(const string& packageName,
+ const string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ sys::Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
+}
+
+// old-style add object: 64bit id - deprecated
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+ uint64_t persistId)
+{
+ std::string key;
+ if (persistId) {
+ key = boost::lexical_cast<std::string>(persistId);
+ }
+ return addObject(object, key, persistId != 0);
+}
+
+
+// new style add object - use this approach!
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+ const std::string& key,
+ bool persistent)
+{
+ sys::Mutex::ScopedLock lock(addLock);
+
+ uint16_t sequence = persistent ? 0 : bootSequence;
+
+ ObjectId objectId(&attachment, 0, sequence);
+ if (key.empty())
+ objectId.setV2Key(*object); // let object generate the key
+ else
+ objectId.setV2Key(key);
+ objectId.setAgentName(name_address);
+
+ object->setObjectId(objectId);
+ newManagementObjects[objectId] = boost::shared_ptr<ManagementObject>(object);
+ return objectId;
+}
+
+
+void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity)
+{
+ static const std::string severityStr[] = {
+ "emerg", "alert", "crit", "error", "warn",
+ "note", "info", "debug"
+ };
+ string content;
+ stringstream key;
+ Variant::Map headers;
+
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+
+ // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+ // event.getPackageName() << "." << event.getEventName();
+ key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
+ << "." << keyifyNameStr(event.getEventName())
+ << "." << severityStr[sev]
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ Variant::Map map_;
+ Variant::Map schemaId;
+ Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ event.getMd5Sum(),
+ ManagementItem::CLASS_KIND_EVENT);
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
+
+ Variant::List list;
+ list.push_back(map_);
+ ListCodec::encode(list, content);
+ }
+
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
+}
+
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
+{
+ sys::Mutex::ScopedLock lock(agentLock);
+
+ if (inCallback) {
+ QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!");
+ return 0;
+ }
+
+ for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
+ if (methodQueue.empty())
+ break;
+
+ QueuedMethod* item = methodQueue.front();
+ methodQueue.pop_front();
+ {
+ sys::Mutex::ScopedUnlock unlock(agentLock);
+ invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId);
+ delete item;
+ }
+ }
+
+ if (pipeHandle != 0) {
+ char rbuf[100];
+ while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
+ }
+ return methodQueue.size();
+}
+
+int ManagementAgentImpl::getSignalFd()
+{
+ if (extThread) {
+ if (pipeHandle == 0)
+ pipeHandle = new PipeHandle(true);
+ return pipeHandle->getReadHandle();
+ }
+
+ return -1;
+}
+
+void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context)
+{
+ sys::Mutex::ScopedLock lock(agentLock);
+ notifyCallback = callback;
+ notifyContext = context;
+}
+
+void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
+{
+ sys::Mutex::ScopedLock lock(agentLock);
+ notifyable = &_notifyable;
+}
+
+void ManagementAgentImpl::startProtocol()
+{
+ sendHeartbeat();
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ publishAllData = true;
+ }
+}
+
+void ManagementAgentImpl::storeData(bool requested)
+{
+ if (!storeFile.empty()) {
+ ofstream outFile(storeFile.c_str());
+ uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank;
+ uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank;
+
+ if (outFile.good()) {
+ outFile << storeMagicNumber << " " << brokerBankToWrite << " " <<
+ agentBankToWrite << " " << bootSequence << endl;
+
+ if (attrMap.find("_vendor") != attrMap.end())
+ outFile << "vendor=" << attrMap["_vendor"] << endl;
+ if (attrMap.find("_product") != attrMap.end())
+ outFile << "product=" << attrMap["_product"] << endl;
+ if (attrMap.find("_instance") != attrMap.end())
+ outFile << "instance=" << attrMap["_instance"] << endl;
+
+ outFile.close();
+ }
+ }
+}
+
+void ManagementAgentImpl::retrieveData(std::string& vendor, std::string& product, std::string& inst)
+{
+ vendor.clear();
+ product.clear();
+ inst.clear();
+
+ if (!storeFile.empty()) {
+ ifstream inFile(storeFile.c_str());
+ string mn;
+
+ if (inFile.good()) {
+ inFile >> mn;
+ if (mn == storeMagicNumber) {
+ std::string inText;
+
+ inFile >> requestedBrokerBank;
+ inFile >> requestedAgentBank;
+ inFile >> bootSequence;
+
+ while (inFile.good()) {
+ std::getline(inFile, inText);
+ if (!inText.compare(0, 7, "vendor=")) {
+ vendor = inText.substr(7);
+ QPID_LOG(debug, "read vendor name [" << vendor << "] from configuration file.");
+ } else if (!inText.compare(0, 8, "product=")) {
+ product = inText.substr(8);
+ QPID_LOG(debug, "read product name [" << product << "] from configuration file.");
+ } else if (!inText.compare(0, 9, "instance=")) {
+ inst = inText.substr(9);
+ QPID_LOG(debug, "read instance name [" << inst << "] from configuration file.");
+ }
+ }
+ }
+ inFile.close();
+ }
+ }
+}
+
+void ManagementAgentImpl::sendHeartbeat()
+{
+ static const string addr_key_base("agent.ind.heartbeat.");
+
+ Variant::Map map;
+ Variant::Map headers;
+ string content;
+ std::stringstream addr_key;
+
+ addr_key << addr_key_base << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ getHeartbeatContent(map);
+ MapCodec::encode(map, content);
+
+ // Set TTL (in msecs) on outgoing heartbeat indications based on the interval
+ // time to prevent stale heartbeats from getting to the consoles.
+
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(),
+ "amqp/map", interval * 2 * 1000);
+
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+}
+
+void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid,
+ const string& text, uint32_t code)
+{
+ Variant::Map map;
+ Variant::Map headers;
+ Variant::Map values;
+ string content;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_exception";
+ headers["qmf.agent"] = name_address;
+
+ values["error_code"] = code;
+ values["error_text"] = text;
+ map["_values"] = values;
+
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
+
+ QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
+}
+
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
+{
+ string packageName;
+ SchemaClassKey key;
+ uint32_t outLen(0);
+ char localBuffer[MA_BUFFER_SIZE];
+ Buffer outBuffer(localBuffer, MA_BUFFER_SIZE);
+ bool found(false);
+
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
+
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap& cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end()) {
+ SchemaClass& schema = cIter->second;
+ string body;
+
+ encodeHeader(outBuffer, 's', sequence);
+ schema.writeSchemaCall(body);
+ outBuffer.putRawData(body);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ found = true;
+ }
+ }
+ }
+
+ if (found) {
+ connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
+ QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+ }
+}
+
+void ManagementAgentImpl::handleConsoleAddedIndication()
+{
+ sys::Mutex::ScopedLock lock(agentLock);
+ publishAllData = true;
+
+ QPID_LOG(trace, "RCVD ConsoleAddedInd");
+}
+
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
+{
+ string methodName;
+ bool failed = false;
+ Variant::Map inMap;
+ Variant::Map outMap;
+ Variant::Map::const_iterator oid, mid;
+ string content;
+
+ MapCodec::decode(body, inMap);
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ Manageable::STATUS_PARAMETER_INVALID);
+ failed = true;
+ } else {
+ string methodName;
+ ObjectId objId;
+ Variant::Map inArgs;
+ Variant::Map callMap;
+
+ try {
+ // conversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+
+ mid = inMap.find("_arguments");
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
+ }
+
+ QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs);
+
+ boost::shared_ptr<ManagementObject> oPtr;
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ ObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end() && !iter->second->isDeleted())
+ oPtr = iter->second;
+ }
+
+ if (oPtr.get() == 0) {
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
+ Manageable::STATUS_UNKNOWN_OBJECT);
+ failed = true;
+ } else {
+ oPtr->doMethod(methodName, inArgs, callMap, userId);
+
+ if (callMap["_status_code"].asUint32() == 0) {
+ outMap["_arguments"] = Variant::Map();
+ for (Variant::Map::const_iterator iter = callMap.begin();
+ iter != callMap.end(); iter++)
+ if (iter->first != "_status_code" && iter->first != "_status_text")
+ outMap["_arguments"].asMap()[iter->first] = iter->second;
+ } else {
+ sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]);
+ failed = true;
+ }
+ }
+
+ } catch(types::InvalidConversion& e) {
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION);
+ failed = true;
+ }
+ }
+
+ if (!failed) {
+ Variant::Map headers;
+ headers["method"] = "response";
+ headers["qmf.agent"] = name_address;
+ headers["qmf.opcode"] = "_method_response";
+ QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
+ MapCodec::encode(outMap, content);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
+ }
+}
+
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk)
+{
+ moveNewObjectsLH();
+
+ Variant::Map inMap;
+ Variant::Map::const_iterator i;
+ Variant::Map headers;
+
+ MapCodec::decode(body, inMap);
+ QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.agent"] = name_address;
+ headers["partial"] = Variant();
+
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oidMap;
+ string content;
+
+ /*
+ * Unpack the _what element of the query. Currently we only support OBJECT queries.
+ */
+ i = inMap.find("_what");
+ if (i == inMap.end()) {
+ sendException(rte, rtk, cid, "_what element missing in Query");
+ return;
+ }
+
+ if (i->second.getType() != qpid::types::VAR_STRING) {
+ sendException(rte, rtk, cid, "_what element is not a string");
+ return;
+ }
+
+ if (i->second.asString() == "OBJECT") {
+ headers["qmf.content"] = "_data";
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ ObjectId objId(i->second.asMap());
+ boost::shared_ptr<ManagementObject> object;
+
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ ObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (object.get() != 0) {
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ objId.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ object->writeTimestamps(map_);
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+ headers.erase("partial");
+
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
+ return;
+ }
+ } else { // match using schema_id, if supplied
+
+ string className;
+ string packageName;
+
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
+
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ className = s_iter->second.asString();
+
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ packageName = s_iter->second.asString();
+
+ typedef list<boost::shared_ptr<ManagementObject> > StageList;
+ StageList staging;
+
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ for (ObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second.get();
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName))
+ staging.push_back(iter->second);
+ }
+ }
+
+ unsigned int objCount = 0;
+ for (StageList::iterator iter = staging.begin(); iter != staging.end(); iter++) {
+ ManagementObject* object = iter->get();
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
+
+ values.clear();
+ oidMap.clear();
+ map_.clear();
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ object->getObjectId().mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ object->writeTimestamps(map_);
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+
+ if (++objCount >= maxV2ReplyObjs) {
+ objCount = 0;
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk);
+ content.clear();
+ list_.clear();
+ }
+ }
+ }
+ }
+ }
+
+ // Send last "non-partial" message to indicate CommandComplete
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk);
+
+ } else if (i->second.asString() == "SCHEMA_ID") {
+ headers["qmf.content"] = "_schema_id";
+ /**
+ * @todo - support for a predicate. For now, send a list of all known schema class keys.
+ */
+ for (PackageMap::iterator pIter = packages.begin();
+ pIter != packages.end(); pIter++) {
+ for (ClassMap::iterator cIter = pIter->second.begin();
+ cIter != pIter->second.end(); cIter++) {
+
+ list_.push_back(mapEncodeSchemaId( pIter->first,
+ cIter->first.name,
+ cIter->first.hash,
+ cIter->second.kind ));
+ }
+ }
+
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk);
+
+ } else {
+ // Unknown query target
+ sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ }
+}
+
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest");
+
+ Variant::Map map;
+ Variant::Map headers;
+ string content;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
+
+ getHeartbeatContent(map);
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
+
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
+
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ publishAllData = true;
+ }
+}
+
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
+{
+ if (extThread) {
+ sys::Mutex::ScopedLock lock(agentLock);
+
+ methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId));
+ if (pipeHandle != 0) {
+ pipeHandle->write("X", 1);
+ } else if (notifyable != 0) {
+ inCallback = true;
+ {
+ sys::Mutex::ScopedUnlock unlock(agentLock);
+ notifyable->notify();
+ }
+ inCallback = false;
+ } else if (notifyCallback != 0) {
+ inCallback = true;
+ {
+ sys::Mutex::ScopedUnlock unlock(agentLock);
+ notifyCallback(notifyContext);
+ }
+ inCallback = false;
+ }
+ } else {
+ invokeMethodRequest(body, cid, rte, rtk, userId);
+ }
+
+ QPID_LOG(trace, "RCVD MethodRequest");
+}
+
+void ManagementAgentImpl::received(Message& msg)
+{
+ string replyToExchange;
+ string replyToKey;
+ framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const framing::ReplyTo& rt = mp.getReplyTo();
+ replyToExchange = rt.getExchange();
+ replyToKey = rt.getRoutingKey();
+ }
+
+ string userId;
+ if (mp.hasUserId())
+ userId = mp.getUserId();
+
+ if (mp.hasAppId() && mp.getAppId() == "qmf2")
+ {
+ string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
+ string cid = msg.getMessageProperties().getCorrelationId();
+
+ if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey);
+ else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey);
+ else {
+ QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
+ }
+ return;
+ }
+
+ // old preV2 binary messages
+
+ uint32_t sequence;
+ string data = msg.getData();
+ Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
+ uint8_t opcode;
+
+
+ if (checkHeader(inBuffer, &opcode, &sequence))
+ {
+ if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
+ else if (opcode == 'x') handleConsoleAddedIndication();
+ else
+ QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
+ }
+}
+
+
+void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('2');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
+}
+
+Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+ const string& cname,
+ const uint8_t *md5Sum,
+ uint8_t type)
+{
+ Variant::Map map_;
+
+ map_["_package_name"] = pname;
+ map_["_class_name"] = cname;
+ map_["_hash"] = types::Uuid(md5Sum);
+ if (type == ManagementItem::CLASS_KIND_EVENT)
+ map_["_type"] = "_event";
+ else
+ map_["_type"] = "_data";
+
+ return map_;
+}
+
+
+bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ if (buf.getSize() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
+
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
+}
+
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name)
+{
+ PackageMap::iterator pIter = packages.find(name);
+ if (pIter != packages.end())
+ return pIter;
+
+ // No such package found, create a new map entry.
+ pair<PackageMap::iterator, bool> result =
+ packages.insert(pair<string, ClassMap>(name, ClassMap()));
+
+ return result.first;
+}
+
+void ManagementAgentImpl::moveNewObjectsLH()
+{
+ sys::Mutex::ScopedLock lock(addLock);
+ for (ObjectMap::iterator iter = newManagementObjects.begin();
+ iter != newManagementObjects.end();
+ iter++)
+ managementObjects[iter->first] = iter->second;
+ newManagementObjects.clear();
+}
+
+void ManagementAgentImpl::addClassLocal(uint8_t classKind,
+ PackageMap::iterator pIter,
+ const string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ SchemaClassKey key;
+ ClassMap& cMap = pIter->second;
+
+ key.name = className;
+ memcpy(&key.hash, md5Sum, 16);
+
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
+ return;
+
+ // No such class found, create a new class with local information.
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind)));
+ schemaTimestamp = Duration(EPOCH, now());
+ QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp));
+}
+
+void ManagementAgentImpl::encodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
+{
+ buf.putShortString((*pIter).first);
+
+ QPID_LOG(trace, "SENT PackageInd: package=" << (*pIter).first);
+}
+
+void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
+{
+ SchemaClassKey key = (*cIter).first;
+
+ buf.putOctet((*cIter).second.kind);
+ buf.putShortString((*pIter).first);
+ buf.putShortString(key.name);
+ buf.putBin128(key.hash);
+
+ QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name);
+}
+
+struct MessageItem {
+ string content;
+ Variant::Map headers;
+ string key;
+ MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {}
+};
+
+void ManagementAgentImpl::periodicProcessing()
+{
+ string addr_key_base = "agent.ind.data.";
+ list<ObjectId> deleteList;
+ list<boost::shared_ptr<MessageItem> > message_list;
+
+ sendHeartbeat();
+
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+
+ if (!connected)
+ return;
+
+ moveNewObjectsLH();
+
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
+ for (ObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second.get();
+ object->setFlags(0);
+ if (publishAllData) {
+ object->setForcePublish(true);
+ }
+ }
+
+ publishAllData = false;
+
+ //
+ // Process the entire object map.
+ //
+ uint32_t v2Objs = 0;
+
+ for (ObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second.get();
+
+ //
+ // Skip until we find a base object requiring a sent message.
+ //
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->getForcePublish() &&
+ !baseObject->isDeleted()))
+ continue;
+
+ std::string packageName = baseObject->getPackageName();
+ std::string className = baseObject->getClassName();
+
+ Variant::List list_;
+ std::stringstream addr_key;
+ Variant::Map headers;
+
+ addr_key << addr_key_base;
+ addr_key << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ for (ObjectMap::iterator iter = baseIter;
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second.get();
+ bool send_stats, send_props;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_stats || send_props) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->writeTimestamps(map_);
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+ boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+ ListCodec::encode(list_, item->content);
+ message_list.push_back(item);
+ list_.clear();
+ }
+ }
+
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
+ object->setForcePublish(false);
+ }
+ }
+
+ if (!list_.empty()) {
+ boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+ ListCodec::encode(list_, item->content);
+ message_list.push_back(item);
+ }
+ }
+
+ // Delete flagged objects
+ for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++)
+ managementObjects.erase(*iter);
+ }
+
+ while (!message_list.empty()) {
+ boost::shared_ptr<MessageItem> item(message_list.front());
+ message_list.pop_front();
+ connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list");
+ QPID_LOG(trace, "SENT DataIndication");
+ }
+}
+
+
+void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map)
+{
+ map["_values"] = attrMap;
+ map["_values"].asMap()["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+ map["_values"].asMap()["_heartbeat_interval"] = interval;
+ map["_values"].asMap()["_epoch"] = bootSequence;
+ map["_values"].asMap()["_schema_updated"] = uint64_t(schemaTimestamp);
+}
+
+void ManagementAgentImpl::ConnectionThread::run()
+{
+ static const int delayMin(1);
+ static const int delayMax(128);
+ static const int delayFactor(2);
+ int delay(delayMin);
+ string dest("qmfagent");
+ ConnectionThread::shared_ptr tmp;
+
+ sessionId.generate();
+ queueName << "qmfagent-" << sessionId;
+
+ while (true) {
+ try {
+ if (agent.initialized) {
+ QPID_LOG(debug, "QMF Agent attempting to connect to the broker...");
+ connection.open(agent.connectionSettings);
+ session = connection.newSession(queueName.str());
+ subscriptions.reset(new client::SubscriptionManager(session));
+
+ session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
+ arg::exclusive=true);
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str());
+ session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(),
+ arg::bindingKey=agent.name_address);
+ session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(),
+ arg::bindingKey="console.#");
+
+ subscriptions->subscribe(agent, queueName.str(), dest);
+ QPID_LOG(info, "Connection established with broker");
+ {
+ sys::Mutex::ScopedLock _lock(connLock);
+ if (shutdown)
+ return;
+ operational = true;
+ agent.connected = true;
+ agent.startProtocol();
+ try {
+ sys::Mutex::ScopedUnlock _unlock(connLock);
+ subscriptions->run();
+ } catch (exception) {}
+
+ QPID_LOG(warning, "Connection to the broker has been lost");
+
+ operational = false;
+ agent.connected = false;
+ tmp = subscriptions;
+ subscriptions.reset();
+ }
+ tmp.reset(); // frees the subscription outside the lock
+ delay = delayMin;
+ connection.close();
+ }
+ } catch (exception &e) {
+ if (delay < delayMax)
+ delay *= delayFactor;
+ QPID_LOG(debug, "Connection failed: exception=" << e.what());
+ }
+
+ {
+ // sleep for "delay" seconds, but peridically check if the
+ // agent is shutting down so we don't hang for up to delayMax
+ // seconds during agent shutdown
+ sys::Mutex::ScopedLock _lock(connLock);
+ if (shutdown)
+ return;
+ sleeping = true;
+ int totalSleep = 0;
+ do {
+ sys::Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delayMin);
+ totalSleep += delayMin;
+ } while (totalSleep < delay && !shutdown);
+ sleeping = false;
+ if (shutdown)
+ return;
+ }
+ }
+}
+
+ManagementAgentImpl::ConnectionThread::~ConnectionThread()
+{
+}
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
+ uint32_t length,
+ const string& exchange,
+ const string& routingKey)
+{
+ Message msg;
+ string data;
+
+ buf.getRawData(data, length);
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map headers,
+ const string& exchange,
+ const string& routingKey,
+ const string& contentType,
+ uint64_t ttl_msec)
+{
+ Message msg;
+ Variant::Map::const_iterator i;
+
+ if (!cid.empty())
+ msg.getMessageProperties().setCorrelationId(cid);
+
+ if (!contentType.empty())
+ msg.getMessageProperties().setContentType(contentType);
+
+ if (ttl_msec)
+ msg.getDeliveryProperties().setTtl(ttl_msec);
+
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg.getHeaders().setString(i->first, i->second.asString());
+ }
+
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
+ const string& exchange,
+ const string& routingKey)
+{
+ ConnectionThread::shared_ptr s;
+ {
+ sys::Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ s = subscriptions;
+ }
+
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address);
+ msg.getMessageProperties().setAppId("qmf2");
+ try {
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ } catch(exception& e) {
+ QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
+ // Bounce the connection
+ if (s)
+ s->stop();
+ }
+}
+
+
+
+void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
+{
+ stringstream key;
+ key << "agent." << brokerBank << "." << agentBank;
+ session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
+ arg::bindingKey=key.str());
+}
+
+void ManagementAgentImpl::ConnectionThread::close()
+{
+ ConnectionThread::shared_ptr s;
+ {
+ sys::Mutex::ScopedLock _lock(connLock);
+ shutdown = true;
+ s = subscriptions;
+ }
+ if (s)
+ s->stop();
+}
+
+bool ManagementAgentImpl::ConnectionThread::isSleeping() const
+{
+ sys::Mutex::ScopedLock _lock(connLock);
+ return sleeping;
+}
+
+
+void ManagementAgentImpl::PublishThread::run()
+{
+ uint16_t totalSleep;
+
+ while (!shutdown) {
+ agent.periodicProcessing();
+ totalSleep = 0;
+ while (totalSleep++ < agent.getInterval() && !shutdown) {
+ ::sleep(1);
+ }
+ }
+}