diff options
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 1390 |
1 files changed, 1390 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp new file mode 100644 index 0000000000..633401ef5b --- /dev/null +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -0,0 +1,1390 @@ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/log/Statement.h" +#include "qpid/agent/ManagementAgentImpl.h" +#include "qpid/amqp_0_10/Codecs.h" +#include <list> +#include <string.h> +#include <stdlib.h> +#include <sys/types.h> +#include <iostream> +#include <fstream> +#include <boost/lexical_cast.hpp> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::sys; +using namespace std; +using std::stringstream; +using std::ofstream; +using std::ifstream; +using std::string; +using std::endl; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; + +namespace { + qpid::sys::Mutex lock; + bool disabled = false; + ManagementAgent* agent = 0; + int refCount = 0; + + const string defaultVendorName("vendor"); + const string defaultProductName("product"); + + // Create a valid binding key substring by + // replacing all '.' chars with '_' + const string keyifyNameStr(const string& name) + { + string n2 = name; + + size_t pos = n2.find('.'); + while (pos != n2.npos) { + n2.replace(pos, 1, "_"); + pos = n2.find('.', pos); + } + return n2; + } +} + +ManagementAgent::Singleton::Singleton(bool disableManagement) +{ + sys::Mutex::ScopedLock _lock(lock); + if (disableManagement && !disabled) { + disabled = true; + assert(refCount == 0); // can't disable after agent has been allocated + } + if (refCount == 0 && !disabled) + agent = new ManagementAgentImpl(); + refCount++; +} + +ManagementAgent::Singleton::~Singleton() +{ + sys::Mutex::ScopedLock _lock(lock); + refCount--; + if (refCount == 0 && !disabled) { + delete agent; + agent = 0; + } +} + +ManagementAgent* ManagementAgent::Singleton::getInstance() +{ + return agent; +} + +const string ManagementAgentImpl::storeMagicNumber("MA02"); + +ManagementAgentImpl::ManagementAgentImpl() : + interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), + notifyable(0), inCallback(false), + initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), + topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"), + schemaTimestamp(Duration(EPOCH, now())), + publishAllData(true), requestedBrokerBank(0), requestedAgentBank(0), + assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), + maxV2ReplyObjs(10), // KAG todo: make this a tuneable parameter + connThreadBody(*this), connThread(connThreadBody), + pubThreadBody(*this), pubThread(pubThreadBody) +{ +} + +ManagementAgentImpl::~ManagementAgentImpl() +{ + // shutdown & cleanup all threads + connThreadBody.close(); + pubThreadBody.close(); + + connThread.join(); + pubThread.join(); + + if (pipeHandle) { + delete pipeHandle; + pipeHandle = 0; + } +} + +void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance) +{ + if (vendor.find(':') != vendor.npos) { + throw Exception("vendor string cannot contain a ':' character."); + } + if (product.find(':') != product.npos) { + throw Exception("product string cannot contain a ':' character."); + } + + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + if (!instance.empty()) { + attrMap["_instance"] = instance; + } +} + + +void ManagementAgentImpl::getName(string& vendor, string& product, string& instance) +{ + vendor = std::string(attrMap["_vendor"]); + product = std::string(attrMap["_product"]); + instance = std::string(attrMap["_instance"]); +} + + +const std::string& ManagementAgentImpl::getAddress() +{ + return name_address; +} + + +void ManagementAgentImpl::init(const string& brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread, + const string& _storeFile, + const string& uid, + const string& pwd, + const string& mech, + const string& proto) +{ + management::ConnectionSettings settings; + settings.protocol = proto; + settings.host = brokerHost; + settings.port = brokerPort; + settings.username = uid; + settings.password = pwd; + settings.mechanism = mech; + settings.heartbeat = 10; + init(settings, intervalSeconds, useExternalThread, _storeFile); +} + +void ManagementAgentImpl::init(const qpid::management::ConnectionSettings& settings, + uint16_t intervalSeconds, + bool useExternalThread, + const string& _storeFile) +{ + std::string cfgVendor, cfgProduct, cfgInstance; + + interval = intervalSeconds; + extThread = useExternalThread; + storeFile = _storeFile; + nextObjectId = 1; + + // + // Convert from management::ConnectionSettings to client::ConnectionSettings + // + connectionSettings.protocol = settings.protocol; + connectionSettings.host = settings.host; + connectionSettings.port = settings.port; + connectionSettings.virtualhost = settings.virtualhost; + connectionSettings.username = settings.username; + connectionSettings.password = settings.password; + connectionSettings.mechanism = settings.mechanism; + connectionSettings.locale = settings.locale; + connectionSettings.heartbeat = settings.heartbeat; + connectionSettings.maxChannels = settings.maxChannels; + connectionSettings.maxFrameSize = settings.maxFrameSize; + connectionSettings.bounds = settings.bounds; + connectionSettings.tcpNoDelay = settings.tcpNoDelay; + connectionSettings.service = settings.service; + connectionSettings.minSsf = settings.minSsf; + connectionSettings.maxSsf = settings.maxSsf; + + retrieveData(cfgVendor, cfgProduct, cfgInstance); + + bootSequence++; + if ((bootSequence & 0xF000) != 0) + bootSequence = 1; + + // setup the agent's name. The name may be set via a call to setName(). If setName() + // has not been called, the name can be read from the configuration file. If there is + // no name in the configuration file, a unique default name is provided. + if (attrMap.empty()) { + // setName() never called by application, so use names retrieved from config, otherwise defaults. + setName(cfgVendor.empty() ? defaultVendorName : cfgVendor, + cfgProduct.empty() ? defaultProductName : cfgProduct, + cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance); + } else if (attrMap.find("_instance") == attrMap.end()) { + // setName() called, but instance was not specified, use config or generate a uuid + setName(attrMap["_vendor"].asString(), attrMap["_product"].asString(), + cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance); + } + + name_address = attrMap["_vendor"].asString() + ":" + attrMap["_product"].asString() + ":" + attrMap["_instance"].asString(); + vendorNameKey = keyifyNameStr(attrMap["_vendor"].asString()); + productNameKey = keyifyNameStr(attrMap["_product"].asString()); + instanceNameKey = keyifyNameStr(attrMap["_instance"].asString()); + attrMap["_name"] = name_address; + + storeData(true); + + QPID_LOG(info, "QMF Agent Initialized: broker=" << settings.host << ":" << settings.port << + " interval=" << intervalSeconds << " storeFile=" << _storeFile << " name=" << name_address); + + initialized = true; +} + +void ManagementAgentImpl::registerClass(const string& packageName, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + sys::Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} + +void ManagementAgentImpl::registerEvent(const string& packageName, + const string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + sys::Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); +} + +// old-style add object: 64bit id - deprecated +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + uint64_t persistId) +{ + std::string key; + if (persistId) { + key = boost::lexical_cast<std::string>(persistId); + } + return addObject(object, key, persistId != 0); +} + + +// new style add object - use this approach! +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + const std::string& key, + bool persistent) +{ + sys::Mutex::ScopedLock lock(addLock); + + uint16_t sequence = persistent ? 0 : bootSequence; + + ObjectId objectId(&attachment, 0, sequence); + if (key.empty()) + objectId.setV2Key(*object); // let object generate the key + else + objectId.setV2Key(key); + objectId.setAgentName(name_address); + + object->setObjectId(objectId); + newManagementObjects[objectId] = boost::shared_ptr<ManagementObject>(object); + return objectId; +} + + +void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity) +{ + static const std::string severityStr[] = { + "emerg", "alert", "crit", "error", "warn", + "note", "info", "debug" + }; + string content; + stringstream key; + Variant::Map headers; + + { + sys::Mutex::ScopedLock lock(agentLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; + + // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << + // event.getPackageName() << "." << event.getEventName(); + key << "agent.ind.event." << keyifyNameStr(event.getPackageName()) + << "." << keyifyNameStr(event.getEventName()) + << "." << severityStr[sev] + << "." << vendorNameKey + << "." << productNameKey + << "." << instanceNameKey; + + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + event.getMd5Sum(), + ManagementItem::CLASS_KIND_EVENT); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(EPOCH, now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + Variant::List list; + list.push_back(map_); + ListCodec::encode(list, content); + } + + connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list"); +} + +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) +{ + sys::Mutex::ScopedLock lock(agentLock); + + if (inCallback) { + QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!"); + return 0; + } + + for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) { + if (methodQueue.empty()) + break; + + QueuedMethod* item = methodQueue.front(); + methodQueue.pop_front(); + { + sys::Mutex::ScopedUnlock unlock(agentLock); + invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId); + delete item; + } + } + + if (pipeHandle != 0) { + char rbuf[100]; + while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes + } + return methodQueue.size(); +} + +int ManagementAgentImpl::getSignalFd() +{ + if (extThread) { + if (pipeHandle == 0) + pipeHandle = new PipeHandle(true); + return pipeHandle->getReadHandle(); + } + + return -1; +} + +void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context) +{ + sys::Mutex::ScopedLock lock(agentLock); + notifyCallback = callback; + notifyContext = context; +} + +void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) +{ + sys::Mutex::ScopedLock lock(agentLock); + notifyable = &_notifyable; +} + +void ManagementAgentImpl::startProtocol() +{ + sendHeartbeat(); + { + sys::Mutex::ScopedLock lock(agentLock); + publishAllData = true; + } +} + +void ManagementAgentImpl::storeData(bool requested) +{ + if (!storeFile.empty()) { + ofstream outFile(storeFile.c_str()); + uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank; + uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank; + + if (outFile.good()) { + outFile << storeMagicNumber << " " << brokerBankToWrite << " " << + agentBankToWrite << " " << bootSequence << endl; + + if (attrMap.find("_vendor") != attrMap.end()) + outFile << "vendor=" << attrMap["_vendor"] << endl; + if (attrMap.find("_product") != attrMap.end()) + outFile << "product=" << attrMap["_product"] << endl; + if (attrMap.find("_instance") != attrMap.end()) + outFile << "instance=" << attrMap["_instance"] << endl; + + outFile.close(); + } + } +} + +void ManagementAgentImpl::retrieveData(std::string& vendor, std::string& product, std::string& inst) +{ + vendor.clear(); + product.clear(); + inst.clear(); + + if (!storeFile.empty()) { + ifstream inFile(storeFile.c_str()); + string mn; + + if (inFile.good()) { + inFile >> mn; + if (mn == storeMagicNumber) { + std::string inText; + + inFile >> requestedBrokerBank; + inFile >> requestedAgentBank; + inFile >> bootSequence; + + while (inFile.good()) { + std::getline(inFile, inText); + if (!inText.compare(0, 7, "vendor=")) { + vendor = inText.substr(7); + QPID_LOG(debug, "read vendor name [" << vendor << "] from configuration file."); + } else if (!inText.compare(0, 8, "product=")) { + product = inText.substr(8); + QPID_LOG(debug, "read product name [" << product << "] from configuration file."); + } else if (!inText.compare(0, 9, "instance=")) { + inst = inText.substr(9); + QPID_LOG(debug, "read instance name [" << inst << "] from configuration file."); + } + } + } + inFile.close(); + } + } +} + +void ManagementAgentImpl::sendHeartbeat() +{ + static const string addr_key_base("agent.ind.heartbeat."); + + Variant::Map map; + Variant::Map headers; + string content; + std::stringstream addr_key; + + addr_key << addr_key_base << vendorNameKey + << "." << productNameKey + << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + getHeartbeatContent(map); + MapCodec::encode(map, content); + + // Set TTL (in msecs) on outgoing heartbeat indications based on the interval + // time to prevent stale heartbeats from getting to the consoles. + + connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), + "amqp/map", interval * 2 * 1000); + + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); +} + +void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid, + const string& text, uint32_t code) +{ + Variant::Map map; + Variant::Map headers; + Variant::Map values; + string content; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.agent"] = name_address; + + values["error_code"] = code; + values["error_text"] = text; + map["_values"] = values; + + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk); + + QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); +} + +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk) +{ + string packageName; + SchemaClassKey key; + uint32_t outLen(0); + char localBuffer[MA_BUFFER_SIZE]; + Buffer outBuffer(localBuffer, MA_BUFFER_SIZE); + bool found(false); + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); + + { + sys::Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + SchemaClass& schema = cIter->second; + string body; + + encodeHeader(outBuffer, 's', sequence); + schema.writeSchemaCall(body); + outBuffer.putRawData(body); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + found = true; + } + } + } + + if (found) { + connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk); + QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); + } +} + +void ManagementAgentImpl::handleConsoleAddedIndication() +{ + sys::Mutex::ScopedLock lock(agentLock); + publishAllData = true; + + QPID_LOG(trace, "RCVD ConsoleAddedInd"); +} + +void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId) +{ + string methodName; + bool failed = false; + Variant::Map inMap; + Variant::Map outMap; + Variant::Map::const_iterator oid, mid; + string content; + + MapCodec::decode(body, inMap); + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), + Manageable::STATUS_PARAMETER_INVALID); + failed = true; + } else { + string methodName; + ObjectId objId; + Variant::Map inArgs; + Variant::Map callMap; + + try { + // conversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } + + QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs); + + boost::shared_ptr<ManagementObject> oPtr; + { + sys::Mutex::ScopedLock lock(agentLock); + ObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end() && !iter->second->isDeleted()) + oPtr = iter->second; + } + + if (oPtr.get() == 0) { + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT), + Manageable::STATUS_UNKNOWN_OBJECT); + failed = true; + } else { + oPtr->doMethod(methodName, inArgs, callMap, userId); + + if (callMap["_status_code"].asUint32() == 0) { + outMap["_arguments"] = Variant::Map(); + for (Variant::Map::const_iterator iter = callMap.begin(); + iter != callMap.end(); iter++) + if (iter->first != "_status_code" && iter->first != "_status_text") + outMap["_arguments"].asMap()[iter->first] = iter->second; + } else { + sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]); + failed = true; + } + } + + } catch(types::InvalidConversion& e) { + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION); + failed = true; + } + } + + if (!failed) { + Variant::Map headers; + headers["method"] = "response"; + headers["qmf.agent"] = name_address; + headers["qmf.opcode"] = "_method_response"; + QPID_LOG(trace, "SENT MethodResponse map=" << outMap); + MapCodec::encode(outMap, content); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk); + } +} + +void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk) +{ + moveNewObjectsLH(); + + Variant::Map inMap; + Variant::Map::const_iterator i; + Variant::Map headers; + + MapCodec::decode(body, inMap); + QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.agent"] = name_address; + headers["partial"] = Variant(); + + Variant::List list_; + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + string content; + + /* + * Unpack the _what element of the query. Currently we only support OBJECT queries. + */ + i = inMap.find("_what"); + if (i == inMap.end()) { + sendException(rte, rtk, cid, "_what element missing in Query"); + return; + } + + if (i->second.getType() != qpid::types::VAR_STRING) { + sendException(rte, rtk, cid, "_what element is not a string"); + return; + } + + if (i->second.asString() == "OBJECT") { + headers["qmf.content"] = "_data"; + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + ObjectId objId(i->second.asMap()); + boost::shared_ptr<ManagementObject> object; + + { + sys::Mutex::ScopedLock lock(agentLock); + ObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (object.get() != 0) { + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->mapEncodeValues(values, true, true); // write both stats and properties + objId.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + object->writeTimestamps(map_); + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + headers.erase("partial"); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); + return; + } + } else { // match using schema_id, if supplied + + string className; + string packageName; + + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); + + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + className = s_iter->second.asString(); + + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + packageName = s_iter->second.asString(); + + typedef list<boost::shared_ptr<ManagementObject> > StageList; + StageList staging; + + { + sys::Mutex::ScopedLock lock(agentLock); + for (ObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second.get(); + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) + staging.push_back(iter->second); + } + } + + unsigned int objCount = 0; + for (StageList::iterator iter = staging.begin(); iter != staging.end(); iter++) { + ManagementObject* object = iter->get(); + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { + + values.clear(); + oidMap.clear(); + map_.clear(); + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->mapEncodeValues(values, true, true); // write both stats and properties + object->getObjectId().mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + object->writeTimestamps(map_); + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + + if (++objCount >= maxV2ReplyObjs) { + objCount = 0; + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk); + content.clear(); + list_.clear(); + } + } + } + } + } + + // Send last "non-partial" message to indicate CommandComplete + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk); + + } else if (i->second.asString() == "SCHEMA_ID") { + headers["qmf.content"] = "_schema_id"; + /** + * @todo - support for a predicate. For now, send a list of all known schema class keys. + */ + for (PackageMap::iterator pIter = packages.begin(); + pIter != packages.end(); pIter++) { + for (ClassMap::iterator cIter = pIter->second.begin(); + cIter != pIter->second.end(); cIter++) { + + list_.push_back(mapEncodeSchemaId( pIter->first, + cIter->first.name, + cIter->first.hash, + cIter->second.kind )); + } + } + + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk); + + } else { + // Unknown query target + sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + } +} + +void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + + Variant::Map map; + Variant::Map headers; + string content; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; + + getHeartbeatContent(map); + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); + + { + sys::Mutex::ScopedLock lock(agentLock); + publishAllData = true; + } +} + +void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId) +{ + if (extThread) { + sys::Mutex::ScopedLock lock(agentLock); + + methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId)); + if (pipeHandle != 0) { + pipeHandle->write("X", 1); + } else if (notifyable != 0) { + inCallback = true; + { + sys::Mutex::ScopedUnlock unlock(agentLock); + notifyable->notify(); + } + inCallback = false; + } else if (notifyCallback != 0) { + inCallback = true; + { + sys::Mutex::ScopedUnlock unlock(agentLock); + notifyCallback(notifyContext); + } + inCallback = false; + } + } else { + invokeMethodRequest(body, cid, rte, rtk, userId); + } + + QPID_LOG(trace, "RCVD MethodRequest"); +} + +void ManagementAgentImpl::received(Message& msg) +{ + string replyToExchange; + string replyToKey; + framing::MessageProperties mp = msg.getMessageProperties(); + if (mp.hasReplyTo()) { + const framing::ReplyTo& rt = mp.getReplyTo(); + replyToExchange = rt.getExchange(); + replyToKey = rt.getRoutingKey(); + } + + string userId; + if (mp.hasUserId()) + userId = mp.getUserId(); + + if (mp.hasAppId() && mp.getAppId() == "qmf2") + { + string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode"); + string cid = msg.getMessageProperties().getCorrelationId(); + + if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey); + else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey); + else { + QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); + } + return; + } + + // old preV2 binary messages + + uint32_t sequence; + string data = msg.getData(); + Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); + uint8_t opcode; + + + if (checkHeader(inBuffer, &opcode, &sequence)) + { + if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); + else if (opcode == 'x') handleConsoleAddedIndication(); + else + QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); + } +} + + +void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('2'); + buf.putOctet(opcode); + buf.putLong (seq); +} + +Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, + const string& cname, + const uint8_t *md5Sum, + uint8_t type) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_hash"] = types::Uuid(md5Sum); + if (type == ManagementItem::CLASS_KIND_EVENT) + map_["_type"] = "_event"; + else + map_["_type"] = "_data"; + + return map_; +} + + +bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + if (buf.getSize() < 8) + return false; + + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '2'; +} + +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name) +{ + PackageMap::iterator pIter = packages.find(name); + if (pIter != packages.end()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert(pair<string, ClassMap>(name, ClassMap())); + + return result.first; +} + +void ManagementAgentImpl::moveNewObjectsLH() +{ + sys::Mutex::ScopedLock lock(addLock); + for (ObjectMap::iterator iter = newManagementObjects.begin(); + iter != newManagementObjects.end(); + iter++) + managementObjects[iter->first] = iter->second; + newManagementObjects.clear(); +} + +void ManagementAgentImpl::addClassLocal(uint8_t classKind, + PackageMap::iterator pIter, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy(&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) + return; + + // No such class found, create a new class with local information. + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind))); + schemaTimestamp = Duration(EPOCH, now()); + QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp)); +} + +void ManagementAgentImpl::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString((*pIter).first); + + QPID_LOG(trace, "SENT PackageInd: package=" << (*pIter).first); +} + +void ManagementAgentImpl::encodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putOctet((*cIter).second.kind); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128(key.hash); + + QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name); +} + +struct MessageItem { + string content; + Variant::Map headers; + string key; + MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {} +}; + +void ManagementAgentImpl::periodicProcessing() +{ + string addr_key_base = "agent.ind.data."; + list<ObjectId> deleteList; + list<boost::shared_ptr<MessageItem> > message_list; + + sendHeartbeat(); + + { + sys::Mutex::ScopedLock lock(agentLock); + + if (!connected) + return; + + moveNewObjectsLH(); + + // + // Clear the been-here flag on all objects in the map. + // + for (ObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second.get(); + object->setFlags(0); + if (publishAllData) { + object->setForcePublish(true); + } + } + + publishAllData = false; + + // + // Process the entire object map. + // + uint32_t v2Objs = 0; + + for (ObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second.get(); + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->getForcePublish() && + !baseObject->isDeleted())) + continue; + + std::string packageName = baseObject->getPackageName(); + std::string className = baseObject->getClassName(); + + Variant::List list_; + std::stringstream addr_key; + Variant::Map headers; + + addr_key << addr_key_base; + addr_key << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey + << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + for (ObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second.get(); + bool send_stats, send_props; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_stats || send_props) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->writeTimestamps(map_); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); + + if (++v2Objs >= maxV2ReplyObjs) { + v2Objs = 0; + boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str())); + ListCodec::encode(list_, item->content); + message_list.push_back(item); + list_.clear(); + } + } + + if (object->isDeleted()) + deleteList.push_back(iter->first); + object->setForcePublish(false); + } + } + + if (!list_.empty()) { + boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str())); + ListCodec::encode(list_, item->content); + message_list.push_back(item); + } + } + + // Delete flagged objects + for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) + managementObjects.erase(*iter); + } + + while (!message_list.empty()) { + boost::shared_ptr<MessageItem> item(message_list.front()); + message_list.pop_front(); + connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list"); + QPID_LOG(trace, "SENT DataIndication"); + } +} + + +void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map) +{ + map["_values"] = attrMap; + map["_values"].asMap()["_timestamp"] = uint64_t(Duration(EPOCH, now())); + map["_values"].asMap()["_heartbeat_interval"] = interval; + map["_values"].asMap()["_epoch"] = bootSequence; + map["_values"].asMap()["_schema_updated"] = uint64_t(schemaTimestamp); +} + +void ManagementAgentImpl::ConnectionThread::run() +{ + static const int delayMin(1); + static const int delayMax(128); + static const int delayFactor(2); + int delay(delayMin); + string dest("qmfagent"); + ConnectionThread::shared_ptr tmp; + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + + while (true) { + try { + if (agent.initialized) { + QPID_LOG(debug, "QMF Agent attempting to connect to the broker..."); + connection.open(agent.connectionSettings); + session = connection.newSession(queueName.str()); + subscriptions.reset(new client::SubscriptionManager(session)); + + session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true, + arg::exclusive=true); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str()); + session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(), + arg::bindingKey=agent.name_address); + session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(), + arg::bindingKey="console.#"); + + subscriptions->subscribe(agent, queueName.str(), dest); + QPID_LOG(info, "Connection established with broker"); + { + sys::Mutex::ScopedLock _lock(connLock); + if (shutdown) + return; + operational = true; + agent.connected = true; + agent.startProtocol(); + try { + sys::Mutex::ScopedUnlock _unlock(connLock); + subscriptions->run(); + } catch (exception) {} + + QPID_LOG(warning, "Connection to the broker has been lost"); + + operational = false; + agent.connected = false; + tmp = subscriptions; + subscriptions.reset(); + } + tmp.reset(); // frees the subscription outside the lock + delay = delayMin; + connection.close(); + } + } catch (exception &e) { + if (delay < delayMax) + delay *= delayFactor; + QPID_LOG(debug, "Connection failed: exception=" << e.what()); + } + + { + // sleep for "delay" seconds, but peridically check if the + // agent is shutting down so we don't hang for up to delayMax + // seconds during agent shutdown + sys::Mutex::ScopedLock _lock(connLock); + if (shutdown) + return; + sleeping = true; + int totalSleep = 0; + do { + sys::Mutex::ScopedUnlock _unlock(connLock); + ::sleep(delayMin); + totalSleep += delayMin; + } while (totalSleep < delay && !shutdown); + sleeping = false; + if (shutdown) + return; + } + } +} + +ManagementAgentImpl::ConnectionThread::~ConnectionThread() +{ +} + +void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, + uint32_t length, + const string& exchange, + const string& routingKey) +{ + Message msg; + string data; + + buf.getRawData(data, length); + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + +void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, + const string& cid, + const Variant::Map headers, + const string& exchange, + const string& routingKey, + const string& contentType, + uint64_t ttl_msec) +{ + Message msg; + Variant::Map::const_iterator i; + + if (!cid.empty()) + msg.getMessageProperties().setCorrelationId(cid); + + if (!contentType.empty()) + msg.getMessageProperties().setContentType(contentType); + + if (ttl_msec) + msg.getDeliveryProperties().setTtl(ttl_msec); + + for (i = headers.begin(); i != headers.end(); ++i) { + msg.getHeaders().setString(i->first, i->second.asString()); + } + + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + + + +void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, + const string& exchange, + const string& routingKey) +{ + ConnectionThread::shared_ptr s; + { + sys::Mutex::ScopedLock _lock(connLock); + if (!operational) + return; + s = subscriptions; + } + + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address); + msg.getMessageProperties().setAppId("qmf2"); + try { + session.messageTransfer(arg::content=msg, arg::destination=exchange); + } catch(exception& e) { + QPID_LOG(error, "Exception caught in sendMessage: " << e.what()); + // Bounce the connection + if (s) + s->stop(); + } +} + + + +void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) +{ + stringstream key; + key << "agent." << brokerBank << "." << agentBank; + session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); +} + +void ManagementAgentImpl::ConnectionThread::close() +{ + ConnectionThread::shared_ptr s; + { + sys::Mutex::ScopedLock _lock(connLock); + shutdown = true; + s = subscriptions; + } + if (s) + s->stop(); +} + +bool ManagementAgentImpl::ConnectionThread::isSleeping() const +{ + sys::Mutex::ScopedLock _lock(connLock); + return sleeping; +} + + +void ManagementAgentImpl::PublishThread::run() +{ + uint16_t totalSleep; + + while (!shutdown) { + agent.periodicProcessing(); + totalSleep = 0; + while (totalSleep++ < agent.getInterval() && !shutdown) { + ::sleep(1); + } + } +} |