diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/acl/Acl.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 673 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/System.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 1405 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 224 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.cpp | 15 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 6 | ||||
-rw-r--r-- | cpp/src/tests/ManagementTest.cpp | 22 |
17 files changed, 1963 insertions, 522 deletions
diff --git a/cpp/src/qpid/acl/Acl.cpp b/cpp/src/qpid/acl/Acl.cpp index 21a9e2055e..e510920f6c 100644 --- a/cpp/src/qpid/acl/Acl.cpp +++ b/cpp/src/qpid/acl/Acl.cpp @@ -23,6 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/log/Logger.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/acl/Package.h" #include "qmf/org/apache/qpid/acl/EventAllow.h" #include "qmf/org/apache/qpid/acl/EventDeny.h" @@ -94,7 +95,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name ); agent->raiseEvent(_qmf::EventAllow(id, AclHelper::getActionStr(action), AclHelper::getObjectTypeStr(objType), - name, framing::FieldTable())); + name, types::Variant::Map())); case ALLOW: return true; case DENY: @@ -106,7 +107,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name); agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action), AclHelper::getObjectTypeStr(objType), - name, framing::FieldTable())); + name, types::Variant::Map())); return false; } return false; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 637645bb04..42bc36c4b8 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -22,7 +22,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/log/Statement.h" #include "qpid/agent/ManagementAgentImpl.h" -#include "qpid/sys/Mutex.h" +#include "qpid/amqp_0_10/Codecs.h" #include <list> #include <string.h> #include <stdlib.h> @@ -41,6 +41,9 @@ using std::ofstream; using std::ifstream; using std::string; using std::endl; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; namespace { Mutex lock; @@ -81,7 +84,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), notifyable(0), inCallback(false), - initialized(false), connected(false), lastFailure("never connected"), + initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), @@ -117,6 +120,21 @@ ManagementAgentImpl::~ManagementAgentImpl() } } +void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + inst = qpid::types::Uuid(true).str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + void ManagementAgentImpl::init(const string& brokerHost, uint16_t brokerPort, uint16_t intervalSeconds, @@ -140,7 +158,7 @@ void ManagementAgentImpl::init(const string& brokerHost, void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, uint16_t intervalSeconds, bool useExternalThread, - const std::string& _storeFile) + const string& _storeFile) { interval = intervalSeconds; extThread = useExternalThread; @@ -157,13 +175,16 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, bootSequence = 1; storeData(true); + if (attrMap.empty()) + setName("vendor", "product"); + initialized = true; } void ManagementAgentImpl::registerClass(const string& packageName, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -173,49 +194,77 @@ void ManagementAgentImpl::registerClass(const string& packageName, void ManagementAgentImpl::registerEvent(const string& packageName, const string& eventName, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } +// old-style add object: 64bit id - deprecated ObjectId ManagementAgentImpl::addObject(ManagementObject* object, uint64_t persistId) { + std::string key; + if (persistId) { + key = boost::lexical_cast<std::string>(persistId); + } + return addObject(object, key, persistId != 0); +} + + +// new style add object - use this approach! +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + const std::string& key, + bool persistent) +{ Mutex::ScopedLock lock(addLock); - uint16_t sequence = persistId ? 0 : bootSequence; - uint64_t objectNum = persistId ? persistId : nextObjectId++; - ObjectId objectId(&attachment, 0, sequence, objectNum); + uint16_t sequence = persistent ? 0 : bootSequence; + + ObjectId objectId(&attachment, 0, sequence); + if (key.empty()) + objectId.setV2Key(*object); // let object generate the key + else + objectId.setV2Key(key); - // TODO: fix object-id handling object->setObjectId(objectId); newManagementObjects[objectId] = object; return objectId; } + void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock(agentLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; stringstream key; key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << event.getPackageName() << "." << event.getEventName(); - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str()); + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; + string content; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + MapCodec::encode(map_, content); + connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -235,8 +284,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) methodQueue.pop_front(); { Mutex::ScopedUnlock unlock(agentLock); - Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size()); - invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + invokeMethodRequest(item->body, item->cid, item->replyTo); delete item; } } @@ -274,20 +322,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) void ManagementAgentImpl::startProtocol() { - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - connected = true; - encodeHeader(buffer, 'A'); - buffer.putShortString("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - uint32_t length = buffer.getPosition(); - buffer.reset(); - connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); + sendHeartbeat(); } void ManagementAgentImpl::storeData(bool requested) @@ -323,76 +358,54 @@ void ManagementAgentImpl::retrieveData() } } -void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, - uint32_t code, string text) +void ManagementAgentImpl::sendHeartbeat() { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + static const string addr_exchange("qmf.default.topic"); + static const string addr_key("agent.ind.heartbeat"); - encodeHeader(outBuffer, 'z', sequence); - outBuffer.putLong(code); - outBuffer.putShortString(text); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); - QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); -} + Variant::Map map; + Variant::Map headers; + string content; -void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) -{ - Mutex::ScopedLock lock(agentLock); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key); - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - storeData(); - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); +} - attachment.setBanks(assignedBrokerBank, assignedAgentBank); +void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, + const string& text, uint32_t code) +{ + static const string addr_exchange("qmf.default.direct"); - // Bind to qpid.management to receive commands - connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); + Variant::Map map; + Variant::Map headers; + Variant::Map values; + string content; - // Send package indications for all local packages - for (PackageMap::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.agent"] = name_address; - encodeHeader(outBuffer, 'p'); - encodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + values["error_code"] = code; + values["error_text"] = text; + map["_values"] = values; - // Send class indications for all local classes - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { - outBuffer.reset(); - encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); - } - } + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey); + + QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } -void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence) +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) { Mutex::ScopedLock lock(agentLock); string packageName; @@ -412,12 +425,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + string body; encodeHeader(outBuffer, 's', sequence); - schema.writeSchemaCall(outBuffer); + schema.writeSchemaCall(body); + outBuffer.putRawData(body); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); } @@ -432,124 +447,250 @@ void ManagementAgentImpl::handleConsoleAddedIndication() QPID_LOG(trace, "RCVD ConsoleAddedInd"); } -void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo) { - string methodName; - string packageName; - string className; - uint8_t hash[16]; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + string methodName; + bool failed = false; + Variant::Map inMap; + Variant::Map outMap; + Variant::Map::const_iterator oid, mid; + string content; + + MapCodec::decode(body, inMap); + + outMap["_values"] = Variant::Map(); + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + failed = true; + } else { + string methodName; + ObjectId objId; + Variant::Map inArgs; + Variant::Map callMap; - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); + try { + // conversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); - encodeHeader(outBuffer, 'm', sequence); + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); - } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); - } - else - try { - outBuffer.record(); - iter->second->doMethod(methodName, inBuffer, outBuffer); - } catch(exception& e) { - outBuffer.restore(); - outBuffer.putLong(Manageable::STATUS_EXCEPTION); - outBuffer.putMediumString(e.what()); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + failed = true; + } else { + iter->second->doMethod(methodName, inArgs, callMap); } + + if (callMap["_status_code"].asUint32() == 0) { + outMap["_arguments"] = Variant::Map(); + for (Variant::Map::const_iterator iter = callMap.begin(); + iter != callMap.end(); iter++) + if (iter->first != "_status_code" && iter->first != "_status_text") + outMap["_arguments"].asMap()[iter->first] = iter->second; + } else { + (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"]; + (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"]; + failed = true; + } + + } catch(types::InvalidConversion& e) { + outMap.clear(); + outMap["_values"] = Variant::Map(); + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + failed = true; + } + } + + Variant::Map headers; + headers["method"] = "response"; + headers["qmf.agent"] = name_address; + if (failed) { + headers["qmf.opcode"] = "_exception"; + QPID_LOG(trace, "SENT Exception map=" << outMap); + } else { + headers["qmf.opcode"] = "_method_response"; + QPID_LOG(trace, "SENT MethodResponse map=" << outMap); } - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + MapCodec::encode(outMap, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo); } -void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo) { - FieldTable ft; - FieldTable::ValuePtr value; - moveNewObjectsLH(); - ft.decode(inBuffer); + Variant::Map inMap; + Variant::Map::const_iterator i; + Variant::Map headers; + + MapCodec::decode(body, inMap); + QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + headers["partial"] = Variant(); + + Variant::List list_; + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + string content; + + /* + * Unpack the _what element of the query. Currently we only support OBJECT queries. + */ + i = inMap.find("_what"); + if (i == inMap.end()) { + sendException(replyTo, cid, "_what element missing in Query"); + return; + } - QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + if (i->second.getType() != qpid::types::VAR_STRING) { + sendException(replyTo, cid, "_what element is not a string"); + return; + } - value = ft.get("_class"); - if (value.get() == 0 || !value->convertsTo<string>()) { - value = ft.get("_objectid"); - if (value.get() == 0 || !value->convertsTo<string>()) - return; + if (i->second.asString() != "OBJECT") { + sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + return; + } + + string className; + string packageName; + + /* + * Handle the _schema_id element, if supplied. + */ + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); + + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + className = s_iter->second.asString(); - ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + packageName = s_iter->second.asString(); + } + + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + ObjectId objId(i->second.asMap()); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + object->mapEncodeValues(values, true, true); // write both stats and properties + objId.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + headers.erase("partial"); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + return; + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { + + // @todo support multiple object reply per message + values.clear(); + list_.clear(); + oidMap.clear(); + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); - QPID_LOG(trace, "SENT ObjectInd"); + object->mapEncodeValues(values, true, true); // write both stats and properties + iter->first.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + } } - sendCommandComplete(replyTo, sequence); - return; } - string className(value->get<string>()); + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); +} - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName() == className) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; +void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + static const string addr_exchange("qmf.default.direct"); - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); + Variant::Map map; + Variant::Map headers; + string content; - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; - QPID_LOG(trace, "SENT ObjectInd"); - } - } + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); - sendCommandComplete(replyTo, sequence); + { + Mutex::ScopedLock lock(agentLock); + clientWasAdded = true; + } } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo) { if (extThread) { Mutex::ScopedLock lock(agentLock); - string body; - inBuffer.getRawData(body, inBuffer.available()); - methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + methodQueue.push_back(new QueuedMethod(cid, replyTo, body)); if (pipeHandle != 0) { pipeHandle->write("X", 1); } else if (notifyable != 0) { @@ -568,7 +709,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc inCallback = false; } } else { - invokeMethodRequest(inBuffer, sequence, replyTo); + invokeMethodRequest(body, cid, replyTo); } QPID_LOG(trace, "RCVD MethodRequest"); @@ -576,28 +717,45 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc void ManagementAgentImpl::received(Message& msg) { + string replyToKey; + framing::MessageProperties mp = msg.getMessageProperties(); + if (mp.hasReplyTo()) { + const framing::ReplyTo& rt = mp.getReplyTo(); + replyToKey = rt.getRoutingKey(); + } + + if (mp.hasAppId() && mp.getAppId() == "qmf2") + { + string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode"); + string cid = msg.getMessageProperties().getCorrelationId(); + + if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey); + else { + QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); + } + return; + } + + // old preV2 binary messages + + uint32_t sequence; string data = msg.getData(); Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; - uint32_t sequence; - string replyToKey; - framing::MessageProperties p = msg.getMessageProperties(); - if (p.hasReplyTo()) { - const framing::ReplyTo& rt = p.getReplyTo(); - replyToKey = rt.getRoutingKey(); - } if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); + if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); + else + QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); } } + void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet('A'); @@ -607,6 +765,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } +Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, + const string& cname, + const uint8_t *md5Sum) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_hash"] = types::Uuid(md5Sum); + return map_; +} + + bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) @@ -661,7 +832,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -701,10 +872,7 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, void ManagementAgentImpl::periodicProcessing() { -#define BUFSIZE 65536 Mutex::ScopedLock lock(agentLock); - char msgChars[BUFSIZE]; - uint32_t contentSize; list<pair<ObjectId, ManagementObject*> > deleteList; if (!connected) @@ -745,42 +913,53 @@ void ManagementAgentImpl::periodicProcessing() !baseObject->isDeleted())) continue; - Buffer msgBuffer(msgChars, BUFSIZE); + Variant::List list_; + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_stats || send_props) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); } if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - - if (msgBuffer.available() < (BUFSIZE / 2)) - break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); - stringstream key; - key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << - baseObject->getPackageName() << "." << baseObject->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); + string content; + ListCodec::encode(list_, content); + if (content.length()) { + Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list"); + QPID_LOG(trace, "SENT DataIndication"); } } @@ -793,18 +972,7 @@ void ManagementAgentImpl::periodicProcessing() } deleteList.clear(); - - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); - } + sendHeartbeat(); } void ManagementAgentImpl::ConnectionThread::run() @@ -831,6 +999,10 @@ void ManagementAgentImpl::ConnectionThread::run() arg::exclusive=true); session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), arg::bindingKey=queueName.str()); + session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(), + arg::bindingKey=agent.name_address); + session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(), + arg::bindingKey="console.#"); subscriptions->subscribe(agent, queueName.str(), dest); QPID_LOG(info, "Connection established with broker"); @@ -839,6 +1011,7 @@ void ManagementAgentImpl::ConnectionThread::run() if (shutdown) return; operational = true; + agent.connected = true; agent.startProtocol(); try { Mutex::ScopedUnlock _unlock(connLock); @@ -892,6 +1065,48 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, const string& exchange, const string& routingKey) { + Message msg; + string data; + + buf.getRawData(data, length); + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + +void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, + const string& cid, + const Variant::Map headers, + const string& exchange, + const string& routingKey, + const string& contentType) +{ + Message msg; + Variant::Map::const_iterator i; + + if (!cid.empty()) + msg.getMessageProperties().setCorrelationId(cid); + + if (!contentType.empty()) + msg.getMessageProperties().setContentType(contentType); + for (i = headers.begin(); i != headers.end(); ++i) { + msg.getHeaders().setString(i->first, i->second.asString()); + } + msg.getHeaders().setString("app_id", "qmf2"); + + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + + + +void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, + const string& exchange, + const string& routingKey) +{ ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); @@ -900,23 +1115,21 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, s = subscriptions; } - Message msg; - string data; - - buf.getRawData(data, length); msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData(data); + msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address); try { session.messageTransfer(arg::content=msg, arg::destination=exchange); } catch(exception& e) { - QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); + QPID_LOG(error, "Exception caught in sendMessage: " << e.what()); // Bounce the connection if (s) s->stop(); } } + + void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) { stringstream key; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index affaa45d2d..d1609341be 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -51,6 +51,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen // Methods from ManagementAgent // int getMaxThreads() { return 1; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void init(const std::string& brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, @@ -75,6 +78,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0); + ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key, + bool persistent); void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT); uint32_t pollCallbacks(uint32_t callLimit = 0); int getSignalFd(); @@ -120,10 +125,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen }; struct QueuedMethod { - QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) : - sequence(_seq), replyTo(_reply), body(_body) {} + QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) : + cid(_cid), replyTo(_reply), body(_body) {} - uint32_t sequence; + std::string cid; std::string replyTo; std::string body; }; @@ -140,6 +145,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void received (client::Message& msg); + qpid::types::Variant::Map attrMap; + std::string name_address; uint16_t interval; bool extThread; sys::PipeHandle* pipeHandle; @@ -155,6 +162,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen client::ConnectionSettings connectionSettings; bool initialized; bool connected; + bool useMapMsg; std::string lastFailure; bool clientWasAdded; @@ -198,6 +206,15 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint32_t length, const std::string& exchange, const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map headers, + const std::string& exchange, + const std::string& routingKey, + const std::string& contentType="amqp/map"); + void sendMessage(qpid::client::Message msg, + const std::string& exchange, + const std::string& routingKey); void bindToBank(uint32_t brokerBank, uint32_t agentBank); void close(); bool isSleeping() const; @@ -237,16 +254,21 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen PackageMap::iterator pIter, ClassMap::iterator cIter); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const uint8_t *md5Sum); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleAttachResponse (qpid::framing::Buffer& inBuffer); + void sendHeartbeat(); + void sendException(const std::string& replyToKey, const std::string& cid, + const std::string& text, uint32_t code=1); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); - void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); - void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo); + void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo); + + void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo); + void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); + void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d94f228734..24c5a0c049 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -93,7 +93,8 @@ Broker::Options::Options(const std::string& name) : tcpNoDelay(false), requireEncrypted(false), maxSessionRate(0), - asyncQueueEvents(false) // Must be false in a cluster. + asyncQueueEvents(false), // Must be false in a cluster. + qmf2Support(false) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -114,6 +115,7 @@ Broker::Options::Options(const std::string& name) : ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") @@ -138,7 +140,9 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), + managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support, + conf.qmf2Support) + : 0), store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), @@ -164,6 +168,7 @@ Broker::Broker(const Broker::Options& conf) : QPID_LOG(info, "Management enabled"); managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPubInterval, this, conf.workerThreads + 3); + managementAgent->setName("apache.org", "qpidd"); _qmf::Package packageInitializer(managementAgent.get()); System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 465a17f4eb..f9be992f0c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -113,6 +113,7 @@ public: std::string knownHosts; uint32_t maxSessionRate; bool asyncQueueEvents; + bool qmf2Support; private: std::string getHome(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 7bb70ed24a..1d3da982d8 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -149,7 +149,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); - mgmtExchange->set_arguments(args); + mgmtExchange->set_arguments(ManagementAgent::toMap(args)); if (!durable) { if (name.empty()) { agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID @@ -336,7 +336,7 @@ void Exchange::Binding::startManagement() { management::ObjectId queueId = mo->getObjectId(); mgmtBinding = new _qmf::Binding - (agent, this, (Manageable*) parent, queueId, key, args); + (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); if (!origin.empty()) mgmtBinding->set_origin(origin); agent->addObject (mgmtBinding, agent->allocateId(this)); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9e379dfc49..8d9248212f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -873,7 +873,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); if (mgmtObject != 0) - mgmtObject->set_arguments (_settings); + mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 73ef807a0a..5148d88e72 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -280,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, - !acquire, ackExpected, exclusive ,arguments); + !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject, agent->allocateId(this)); mgmtObject->set_creditMode("WINDOW"); } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c3b6f697fd..10eddc6045 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -106,7 +106,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, - alternateExchange, durable, false, args, + alternateExchange, durable, false, ManagementAgent::toMap(args), response.second ? "created" : "existing")); }catch(UnknownExchangeTypeException& /*e*/){ @@ -194,7 +194,8 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) - agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments)); + agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, + queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments))); } }else{ throw NotFoundException("Bind failed. No such exchange: " + exchangeName); @@ -389,7 +390,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, arguments, + name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), queue_created.second ? "created" : "existing")); } @@ -499,7 +500,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), - queueName, destination, exclusive, arguments)); + queueName, destination, exclusive, ManagementAgent::toMap(arguments))); } void diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index 455ad11cf2..90c6b13cd3 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -22,6 +22,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/types/Uuid.h" #include <iostream> #include <fstream> @@ -64,7 +65,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) } } - mgmtObject = new _qmf::System (agent, this, systemId); + mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); std::string sysname, nodename, release, version, machine; qpid::sys::SystemInfo::getSystemId (sysname, nodename, diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 4454d70427..bc62588f5d 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -29,20 +29,46 @@ #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" +#include "qpid/types/Variant.h" +#include "qpid/types/Uuid.h" +#include "qpid/framing/List.h" +#include "qpid/amqp_0_10/Codecs.h" #include <list> #include <iostream> #include <fstream> #include <sstream> +#include <typeinfo> using boost::intrusive_ptr; using qpid::framing::Uuid; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; + + +static Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const std::string& type, + const uint8_t *md5Sum) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_type"] = type; + map_["_hash"] = qpid::types::Uuid(md5Sum); + return map_; +} + + ManagementAgent::RemoteAgent::~RemoteAgent () { QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); @@ -52,10 +78,11 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } } -ManagementAgent::ManagementAgent () : +ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : threadPoolSize(1), interval(10), broker(0), timer(0), startTime(uint64_t(Duration(now()))), - suppressed(false) + suppressed(false), + qmf1Support(qmfV1), qmf2Support(qmfV2) { nextObjectId = 1; brokerBank = 1; @@ -148,6 +175,27 @@ void ManagementAgent::pluginsInitialized() { timer->add(new Periodic(*this, interval)); } + +void ManagementAgent::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + if (uuid.isNull()) + { + throw Exception("ManagementAgent::configure() must be called if default name is used."); + } + inst = uuid.str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + + void ManagementAgent::writeData () { string filename (dataDir + "/.mbrokerdata"); @@ -194,6 +242,7 @@ void ManagementAgent::registerEvent (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } +// Deprecated: V1 objects ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId) { uint16_t sequence; @@ -207,8 +256,47 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId objectNum = persistId; } - ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); - objId.setV2Key(*object); + ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum); + objId.setV2Key(*object); // let object generate the v2 key + + object->setObjectId(objId); + + { + Mutex::ScopedLock lock (addLock); + ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); + if (destIter != newManagementObjects.end()) { + if (destIter->second->isDeleted()) { + newDeletedManagementObjects.push_back(destIter->second); + newManagementObjects.erase(destIter); + } else { + QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() << + " key=" << objId.getV2Key()); + return objId; + } + } + newManagementObjects[objId] = object; + } + + return objId; +} + + + +ObjectId ManagementAgent::addObject(ManagementObject* object, + const std::string& key, + bool persistent) +{ + uint16_t sequence; + + sequence = persistent ? 0 : bootSequence; + + ObjectId objId(0 /*flags*/, sequence, brokerBank); + if (key.empty()) { + objId.setV2Key(*object); // let object generate the key + } else { + objId.setV2Key(key); + } + object->setObjectId(objId); { @@ -233,21 +321,57 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock (userLock); - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, - "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + if (qmf1Support) { + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putOctet(sev); + std::string sBuf; + event.encode(sBuf); + outBuffer.putRawData(sBuf); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, + "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + "_event", + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + stringstream key; + key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName(); + + string content; + MapCodec::encode(map_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); + } + } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -355,6 +479,59 @@ void ManagementAgent::sendBuffer(Buffer& buf, } catch(exception&) {} } + +void ManagementAgent::sendBuffer(const std::string& data, + const std::string& cid, + const Variant::Map& headers, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey) +{ + Variant::Map::const_iterator i; + + if (suppressed) { + QPID_LOG(trace, "Suppressed management message to " << routingKey); + return; + } + if (exchange.get() == 0) return; + + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody(data))); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(data.length()); + if (!cid.empty()) { + props->setCorrelationId(cid); + } + + for (i = headers.begin(); i != headers.end(); ++i) { + msg->getOrInsertHeaders().setString(i->first, i->second.asString()); + } + msg->getOrInsertHeaders().setString("app_id", "qmf2"); + + DeliveryProperties* dp = + msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + dp->setRoutingKey(routingKey); + + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} +} + + void ManagementAgent::moveNewObjectsLH() { Mutex::ScopedLock lock (addLock); @@ -391,12 +568,13 @@ void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 #define HEADROOM 4096 - QPID_LOG(trace, "Management agent periodic processing") - Mutex::ScopedLock lock (userLock); + QPID_LOG(trace, "Management agent periodic processing"); + Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; + std::string sBuf; uint64_t uptime = uint64_t(Duration(now())) - startTime; static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); @@ -439,43 +617,90 @@ void ManagementAgent::periodicProcessing (void) continue; Buffer msgBuffer(msgChars, BUFSIZE); + Variant::List list_; + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_props && qmf1Support) { encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - pcount++; + sBuf.clear(); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + + if (send_stats && qmf1Support) { encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); - scount++; + sBuf.clear(); + object->writeStatistics(sBuf); + msgBuffer.putRawData(sBuf); } + if ((send_stats || send_props) && qmf2Support) { + Variant::Map map_; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); + + } + + if (send_props) pcount++; + if (send_stats) scount++; + if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - if (msgBuffer.available() < HEADROOM) + if (qmf1Support && (msgBuffer.available() < HEADROOM)) break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); - stringstream key; - key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + if (pcount || scount) { + if (qmf1Support) { + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } + + if (qmf2Support) { + string content; + ListCodec::encode(list_, content); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName(); + // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } } } @@ -492,15 +717,49 @@ void ManagementAgent::periodicProcessing (void) for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin(); cdIter != deletedManagementObjects.end(); cdIter++) { collisionDeletions = true; - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - (*cdIter)->writeProperties(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - stringstream key; - key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + { + if (qmf1Support) { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'c'); + sBuf.clear(); + (*cdIter)->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + stringstream key; + key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + + if (qmf2Support) { + Variant::List list_; + Variant::Map map_; + Variant::Map values; + Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(), + (*cdIter)->getClassName(), + "_data", + (*cdIter)->getMd5Sum()); + (*cdIter)->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + stringstream key; + key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + + string content; + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + } } if (!deleteList.empty() || collisionDeletions) { @@ -508,7 +767,12 @@ void ManagementAgent::periodicProcessing (void) deleteOrphanedAgentsLH(); } - { + // heartbeat generation + + if (qmf1Support) { +#define BUFSIZE 65536 + uint32_t contentSize; + char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); @@ -519,6 +783,27 @@ void ManagementAgent::periodicProcessing (void) sendBuffer (msgBuffer, contentSize, mExchange, routingKey); QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); } + + if (qmf2Support) { + static const string addr_key("agent.ind.heartbeat"); + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + string content; + MapCodec::encode(map, content); + sendBuffer(content, "", headers, v2Topic, addr_key); + + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); + } QPID_LOG(debug, "periodic update " << debugSnapshot()); } @@ -531,19 +816,51 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) if (!object->isDeleted()) return; + if (qmf1Support) { #define DNOW_BUFSIZE 2048 - char msgChars[DNOW_BUFSIZE]; - uint32_t contentSize; - Buffer msgBuffer(msgChars, DNOW_BUFSIZE); - - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - stringstream key; - key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + char msgChars[DNOW_BUFSIZE]; + uint32_t contentSize; + Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + std::string sBuf; + + encodeHeader(msgBuffer, 'c'); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = msgBuffer.getPosition(); + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } + + if (qmf2Support) { + Variant::List list_; + Variant::Map map_; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + stringstream key; + key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName(); + + Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + string content; + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } managementObjects.erase(oid); } @@ -566,35 +883,68 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, - const FieldTable* /*args*/) + const FieldTable* /*args*/, + const bool topic) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - // Parse the routing key. This management broker should act as though it - // is bound to the exchange to match the following keys: - // - // agent.1.0.# - // broker - // schema.# + if (qmf1Support && topic) { - if (routingKey == "broker") { - dispatchAgentCommandLH(msg); - return false; - } + // qmf1 is bound only to the topic management exchange. + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.1.0.# + // broker + // schema.# - else if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); - return false; - } + if (routingKey == "broker") { + dispatchAgentCommandLH(msg); + return false; + } + + if (routingKey.length() > 6) { + + if (routingKey.compare(0, 9, "agent.1.0") == 0) { + dispatchAgentCommandLH(msg); + return false; + } + + if (routingKey.compare(0, 8, "agent.1.") == 0) { + return authorizeAgentMessageLH(msg); + } - else if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); + if (routingKey.compare(0, 7, "schema.") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + } } - else if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); - return true; + if (qmf2Support) { + + if (topic) { + + // Intercept messages bound to: + // "console.ind.locate.# - process these messages, and also allow them to be forwarded. + + if (routingKey.compare(0, 18, "console.ind.locate") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + + } else { // direct exchange + + // Intercept messages bound to: + // "broker" - generic alias for the local broker + // "<name_address>" - the broker agent's proper name + // and do not forward them futher + if (routingKey == "broker" || routingKey == name_address) { + dispatchAgentCommandLH(msg); + return false; + } + } } return true; @@ -610,14 +960,19 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; AclModule* acl = broker->getAcl(); + std::string inArgs; - ObjectId objId(inBuffer); + std::string sBuf; + inBuffer.getRawData(sBuf, 16); + ObjectId objId; + objId.decode(sBuf); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); inBuffer.getShortString(methodName); + inBuffer.getRawData(inArgs, inBuffer.available()); - QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << methodName << " replyTo=" << replyToKey); encodeHeader(outBuffer, 'm', sequence); @@ -629,8 +984,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + return; } if (acl != 0) { @@ -645,8 +1000,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + return; } } @@ -664,7 +1019,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey try { outBuffer.record(); Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inBuffer, outBuffer); + std::string outBuf; + iter->second->doMethod(methodName, inArgs, outBuf); + outBuffer.putRawData(outBuf); } catch(exception& e) { outBuffer.restore(); outBuffer.putLong(Manageable::STATUS_EXCEPTION); @@ -675,9 +1032,135 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } + +void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo, + const std::string& cid, const ConnectionToken* connToken) +{ + string methodName; + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + string content; + + Variant::Map outMap; + Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = name_address; + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) + { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid); + return; + } + + ObjectId objId; + Variant::Map inArgs; + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } + } catch(exception& e) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid); + return; + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid); + return; + } + + // validate + AclModule* acl = broker->getAcl(); + DisallowedMethods::const_iterator i; + + i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName)); + if (i != disallowed.end()) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + (outMap["_values"].asMap())["_status_text"] = i->second; + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid); + return; + } + + if (acl != 0) { + string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + map<acl::Property, string> params; + params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); + params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid); + return; + } + } + + // invoke the method + + QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() + << ":" << iter->second->getClassName() << " method=" << + methodName << " replyTo=" << replyTo); + + try { + iter->second->doMethod(methodName, inArgs, outMap); + } catch(exception& e) { + outMap.clear(); + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid); + return; + } + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid); +} + + void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -781,6 +1264,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin uint32_t outLen; uint32_t sequence = nextRequestSequence++; + // Schema Request encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); key.encode(outBuffer); @@ -803,9 +1287,11 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) // linked in via plug-in), call the schema handler directly. If the package // is from a remote management agent, send the stored schema information. - if (writeSchemaCall != 0) - writeSchemaCall(buf); - else + if (writeSchemaCall != 0) { + std::string schema; + writeSchemaCall(schema); + buf.putRawData(schema); + } else buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } @@ -981,7 +1467,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject, 0); @@ -1012,7 +1498,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin ft.decode(inBuffer); - QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence); + QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo<string>()) { @@ -1031,13 +1517,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin object->setUpdateTime(); if (!object->isDeleted()) { + std::string sBuf; encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } sendCommandComplete(replyToKey, sequence); @@ -1058,13 +1548,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin object->setUpdateTime(); if (!object->isDeleted()) { + std::string sBuf; encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } } @@ -1072,64 +1566,285 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin sendCommandComplete(replyToKey, sequence); } + +void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + if (contentType != "_query_v1") { + QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!"); + return; + } + + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator i; + Variant::Map headers; + + QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + headers["partial"]; + + Variant::List list_; + Variant::Map map_; + Variant::Map values; + string className; + string content; + + i = inMap.find("_class"); + if (i != inMap.end()) + try { + className = i->second.asString(); + } catch(exception& e) { + className.clear(); + QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored."); + } + + if (className.empty()) { + ObjectId objId; + i = inMap.find("_object_id"); + if (i != inMap.end()) { + + try { + objId = ObjectId(i->second.asMap()); + } catch (exception &e) { + objId = ObjectId(); // empty object id - won't find a match (I hope). + QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid); + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + } + } + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName () == className) { + + // @todo: support multiple objects per message reply + values.clear(); + list_.clear(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + } + } + } + } + + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid); +} + + +void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo, + const string& cid) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + string content; + MapCodec::encode(map, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); +} + + bool ManagementAgent::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; - string replyToKey; + uint32_t sequence = 0; + bool methodReq = false; + bool mapMsg = false; + string packageName; + string className; + string methodName; + std::string cid; if (msg.encodedSize() > MA_BUFFER_SIZE) return false; msg.encodeContent(inBuffer); + uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); - if (!checkHeader(inBuffer, &opcode, &sequence)) - return false; + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + + const framing::FieldTable *headers = msg.getApplicationHeaders(); + + if (headers && headers->getAsString("app_id") == "qmf2") + { + mapMsg = true; + + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (headers->getAsString("qmf.opcode") == "_method_request") + { + methodReq = true; + + // extract object id and method name + + std::string body; + inBuffer.getRawData(body, bufferLen); + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + + ObjectId objId; - if (opcode == 'M') { + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + QPID_LOG(warning, + "Missing fields in QMF authorize req received."); + return false; + } + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + } catch(exception& e) { + QPID_LOG(warning, + "Badly formatted QMF authorize req received."); + return false; + } + + // look up schema for object to get package and class name + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " << + objId); + return false; + } + + packageName = iter->second->getPackageName(); + className = iter->second->getClassName(); + } + } else { // old style binary message format + + uint8_t opcode; + + if (!checkHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + methodReq = true; + + // extract method name & schema package and class name + + uint8_t hash[16]; + inBuffer.getLongLong(); // skip over object id + inBuffer.getLongLong(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + } + } + + if (methodReq) { // TODO: check method call against ACL list. + map<acl::Property, string> params; AclModule* acl = broker->getAcl(); if (acl == 0) return true; string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); - string packageName; - string className; - uint8_t hash[16]; - string methodName; - - map<acl::Property, string> params; - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); - params[acl::PROP_SCHEMAPACKAGE] = packageName; params[acl::PROP_SCHEMACLASS] = className; if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) return true; + // authorization failed, send reply if replyTo present + const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); - replyToKey = rt.getRoutingKey(); + string replyToKey = rt.getRoutingKey(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + if (mapMsg) { - encodeHeader(outBuffer, 'm', sequence); - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - } + Variant::Map outMap; + Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = name_address; + + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); + + string content; + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyToKey); + + } else { + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + } return false; } @@ -1139,9 +1854,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) void ManagementAgent::dispatchAgentCommandLH(Message& msg) { - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; string replyToKey; const framing::MessageProperties* p = @@ -1153,6 +1865,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) else return; + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + if (msg.encodedSize() > MA_BUFFER_SIZE) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.encodedSize()); @@ -1163,7 +1878,36 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + const framing::FieldTable *headers = msg.getApplicationHeaders(); + + if (headers && headers->getAsString("app_id") == "qmf2") + { + std::string opcode = headers->getAsString("qmf.opcode"); + std::string contentType = headers->getAsString("qmf.content"); + std::string body; + std::string cid; + + inBuffer.getRawData(body, bufferLen); + + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (opcode == "_method_request") + return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher()); + else if (opcode == "_query_request") + return handleGetQueryLH(body, replyToKey, cid, contentType); + else if (opcode == "_agent_locate_request") + return handleLocateRequestLH(body, replyToKey, cid); + + QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); + return; + } + + // old preV2 binary messages + while (inBuffer.getPosition() < bufferLen) { + uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) return; @@ -1359,7 +2103,6 @@ ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) return iter; } - void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (userLock); @@ -1377,42 +2120,64 @@ void ManagementAgent::disallow(const std::string& className, const std::string& disallowed[std::make_pair(className, methodName)] = message; } +void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const { + _map["_cname"] = name; + _map["_hash"] = qpid::types::Uuid(hash); +} + +void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; + + if ((i = _map.find("_cname")) != _map.end()) { + name = i->second.asString(); + } + + if ((i = _map.find("_hash")) != _map.end()) { + const qpid::types::Uuid& uuid = i->second.asUuid(); + memcpy(hash, uuid.data(), uuid.size()); + } +} + void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const { - buffer.checkAvailable(encodedSize()); + buffer.checkAvailable(encodedBufSize()); buffer.putShortString(name); buffer.putBin128(hash); } void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) { - buffer.checkAvailable(encodedSize()); + buffer.checkAvailable(encodedBufSize()); buffer.getShortString(name); buffer.getBin128(hash); } -uint32_t ManagementAgent::SchemaClassKey::encodedSize() const { +uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const { return 1 + name.size() + 16 /* bin128 */; } -void ManagementAgent::SchemaClass::encode(qpid::framing::Buffer& outBuf) const { - outBuf.checkAvailable(encodedSize()); - outBuf.putOctet(kind); - outBuf.putLong(pendingSequence); - outBuf.putLongString(data); +void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const { + _map["_type"] = kind; + _map["_pending_sequence"] = pendingSequence; + _map["_data"] = data; } -void ManagementAgent::SchemaClass::decode(qpid::framing::Buffer& inBuf) { - inBuf.checkAvailable(encodedSize()); - kind = inBuf.getOctet(); - pendingSequence = inBuf.getLong(); - inBuf.getLongString(data); -} +void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; -uint32_t ManagementAgent::SchemaClass::encodedSize() const { - return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size(); + if ((i = _map.find("_type")) != _map.end()) { + kind = i->second; + } + if ((i = _map.find("_pending_sequence")) != _map.end()) { + pendingSequence = i->second; + } + if ((i = _map.find("_data")) != _map.end()) { + data = i->second.asString(); + } } void ManagementAgent::exportSchemas(std::string& out) { - out.clear(); + Variant::List list_; + Variant::Map map_, kmap, cmap; + for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) { string name = i->first; const ClassMap& classes = i ->second; @@ -1421,90 +2186,143 @@ void ManagementAgent::exportSchemas(std::string& out) { const SchemaClass& klass = j->second; if (klass.writeSchemaCall == 0) { // Ignore built-in schemas. // Encode name, schema-key, schema-class - size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize(); - size_t end = out.size(); - out.resize(end + encodedSize); - framing::Buffer outBuf(&out[end], encodedSize); - outBuf.putShortString(name); - key.encode(outBuf); - klass.encode(outBuf); + + map_.clear(); + kmap.clear(); + cmap.clear(); + + key.mapEncode(kmap); + klass.mapEncode(cmap); + + map_["_pname"] = name; + map_["_key"] = kmap; + map_["_class"] = cmap; + list_.push_back(map_); } } } + + ListCodec::encode(list_, out); } void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) { - while (inBuf.available()) { + + string buf(inBuf.getPointer(), inBuf.available()); + Variant::List content; + ListCodec::decode(buf, content); + Variant::List::const_iterator l; + + + for (l = content.begin(); l != content.end(); l++) { string package; SchemaClassKey key; SchemaClass klass; - inBuf.getShortString(package); - key.decode(inBuf); - klass.decode(inBuf); - packages[package][key] = klass; + Variant::Map map_, kmap, cmap; + Variant::Map::const_iterator i; + + map_ = l->asMap(); + + if ((i = map_.find("_pname")) != map_.end()) { + package = i->second.asString(); + + if ((i = map_.find("_key")) != map_.end()) { + key.mapDecode(i->second.asMap()); + + if ((i = map_.find("_class")) != map_.end()) { + klass.mapDecode(i->second.asMap()); + + packages[package][key] = klass; + } + } + } } } -void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const { - outBuf.checkAvailable(encodedSize()); - outBuf.putLong(brokerBank); - outBuf.putLong(agentBank); - outBuf.putShortString(routingKey); - // TODO aconway 2010-03-04: we send the v2Key instead of the - // ObjectId because that has the same meaning on different - // brokers. ObjectId::encode doesn't currently encode the v2Key, - // this can be cleaned up when it does. - outBuf.putMediumString(connectionRef.getV2Key()); - mgmtObject->writeProperties(outBuf); +void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const { + Variant::Map _objId, _values; + + map_["_brokerBank"] = brokerBank; + map_["_agentBank"] = agentBank; + map_["_routingKey"] = routingKey; + + connectionRef.mapEncode(_objId); + map_["_object_id"] = _objId; + + mgmtObject->mapEncodeValues(_values, true, false); + map_["_values"] = _values; } -void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) { - brokerBank = inBuf.getLong(); - agentBank = inBuf.getLong(); - inBuf.getShortString(routingKey); +void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { + Variant::Map::const_iterator i; + + if ((i = map_.find("_brokerBank")) != map_.end()) { + brokerBank = i->second; + } + + if ((i = map_.find("_agentBank")) != map_.end()) { + agentBank = i->second; + } + + if ((i = map_.find("_routingKey")) != map_.end()) { + routingKey = i->second.getString(); + } - // TODO aconway 2010-03-04: see comment in encode() - string connectionKey; - inBuf.getMediumString(connectionKey); - connectionRef = ObjectId(); // Clear out any existing value. - connectionRef.setV2Key(connectionKey); + if ((i = map_.find("_object_id")) != map_.end()) { + connectionRef.mapDecode(i->second.asMap()); + } mgmtObject = new _qmf::Agent(&agent, this); - mgmtObject->readProperties(inBuf); + + if ((i = map_.find("_values")) != map_.end()) { + mgmtObject->mapDecodeValues(i->second.asMap()); + } + // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key. mgmtObject->set_connectionRef(connectionRef); } -uint32_t ManagementAgent::RemoteAgent::encodedSize() const { - // TODO aconway 2010-03-04: see comment in encode() - return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long - + routingKey.size() + sizeof(uint8_t) // ShortString - + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string - + mgmtObject->writePropertiesSize(); -} - void ManagementAgent::exportAgents(std::string& out) { - out.clear(); + Variant::List list_; + Variant::Map map_, omap, amap; + for (RemoteAgentMap::const_iterator i = remoteAgents.begin(); i != remoteAgents.end(); ++i) { // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode RemoteAgent* agent = i->second; - size_t encodedSize = agent->encodedSize(); - size_t end = out.size(); - out.resize(end + encodedSize); - framing::Buffer outBuf(&out[end], encodedSize); - agent->encode(outBuf); + + map_.clear(); + amap.clear(); + + agent->mapEncode(amap); + map_["_remote_agent"] = amap; + list_.push_back(map_); } + + ListCodec::encode(list_, out); } void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { - while (inBuf.available()) { + string buf(inBuf.getPointer(), inBuf.available()); + Variant::List content; + ListCodec::decode(buf, content); + Variant::List::const_iterator l; + + for (l = content.begin(); l != content.end(); l++) { std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this)); - agent->decode(inBuf); - addObject(agent->mgmtObject, 0); - remoteAgents[agent->connectionRef] = agent.release(); + Variant::Map map_; + Variant::Map::const_iterator i; + + map_ = l->asMap(); + + if ((i = map_.find("_remote_agent")) != map_.end()) { + + agent->mapDecode(i->second.asMap()); + + addObject (agent->mgmtObject, 0, false); + remoteAgents[agent->connectionRef] = agent.release(); + } } } @@ -1519,3 +2337,198 @@ std::string ManagementAgent::debugSnapshot() { msg << " new objects: " << newManagementObjects.size(); return msg.str(); } + +Variant::Map ManagementAgent::toMap(const FieldTable& from) +{ + Variant::Map map; + + for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) { + const string& key(iter->first); + const FieldTable::ValuePtr& val(iter->second); + + map[key] = toVariant(val); + } + + return map; +} + +Variant::List ManagementAgent::toList(const List& from) +{ + Variant::List _list; + + for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) { + const List::ValuePtr& val(*iter); + + _list.push_back(toVariant(val)); + } + + return _list; +} + +qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from) +{ + qpid::framing::FieldTable ft; + + for (Variant::Map::const_iterator iter = from.begin(); + iter != from.end(); + iter++) { + const string& key(iter->first); + const Variant& val(iter->second); + + ft.set(key, toFieldValue(val)); + } + + return ft; +} + + +List ManagementAgent::fromList(const Variant::List& from) +{ + List fa; + + for (Variant::List::const_iterator iter = from.begin(); + iter != from.end(); + iter++) { + const Variant& val(*iter); + + fa.push_back(toFieldValue(val)); + } + + return fa; +} + + +boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in) +{ + + switch(in.getType()) { + + case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue()); + case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); + case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); + case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); + case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); + case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); + case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); + case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); + case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); + case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); + case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); + case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); + case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); + case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); + case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap()))); + case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList()))); + } + + QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]"); + return boost::shared_ptr<FieldValue>(new VoidValue()); +} + +// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup. +Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) +{ + const std::string iso885915("iso-8859-15"); + const std::string utf8("utf8"); + const std::string utf16("utf16"); + //const std::string binary("binary"); + const std::string amqp0_10_binary("amqp0-10:binary"); + //const std::string amqp0_10_bit("amqp0-10:bit"); + const std::string amqp0_10_datetime("amqp0-10:datetime"); + const std::string amqp0_10_struct("amqp0-10:struct"); + Variant out; + + //based on AMQP 0-10 typecode, pick most appropriate variant type + switch (in->getType()) { + //Fixed Width types: + case 0x00: //bin8 + case 0x01: out.setEncoding(amqp0_10_binary); // int8 + case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8 + case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; // + // case 0x04: break; //TODO: iso-8859-15 char // char + case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8 + + case 0x10: out.setEncoding(amqp0_10_binary); // bin16 + case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16 + case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16 + + case 0x20: out.setEncoding(amqp0_10_binary); // bin32 + case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32 + case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32 + + case 0x23: out = in->get<float>(); break; // float(32) + + // case 0x27: break; //TODO: utf-32 char + + case 0x30: out.setEncoding(amqp0_10_binary); // bin64 + case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64 + + case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding + case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64 + case 0x33: out = in->get<double>(); break; // double + + case 0x48: // uuid + { + unsigned char data[16]; + in->getFixedWidthValue<16>(data); + out = qpid::types::Uuid(data); + } break; + + //TODO: figure out whether and how to map values with codes 0x40-0xd8 + + case 0xf0: break;//void, which is the default value for Variant + // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant + + //Variable Width types: + //strings: + case 0x80: // str8 + case 0x90: // str16 + case 0xa0: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_binary); + break; + + case 0x84: // str8 + case 0x94: // str16 + out = in->get<std::string>(); + out.setEncoding(iso885915); + break; + + case 0x85: // str8 + case 0x95: // str16 + out = in->get<std::string>(); + out.setEncoding(utf8); + break; + + case 0x86: // str8 + case 0x96: // str16 + out = in->get<std::string>(); + out.setEncoding(utf16); + break; + + case 0xab: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_struct); + break; + + case 0xa8: // map + out = ManagementAgent::toMap(in->get<FieldTable>()); + break; + + case 0xa9: // list of variant types + out = ManagementAgent::toList(in->get<List>()); + break; + //case 0xaa: //convert amqp0-10 array (uniform type) into variant list + // out = Variant::List(); + // translate<Array>(in, out.asList(), &toVariant); + // break; + + default: + //error? + QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]"); + break; + } + + return out; +} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 5b2c54f1b8..0250f39dd6 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -32,7 +32,9 @@ #include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" +#include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FieldValue.h> #include <memory> #include <string> #include <map> @@ -62,7 +64,7 @@ public: } severity_t; - ManagementAgent (); + ManagementAgent (const bool qmfV1, const bool qmfV2); virtual ~ManagementAgent (); /** Called before plugins are initialized */ @@ -74,6 +76,9 @@ public: /** Called by cluster to suppress management output during update. */ void suppress(bool s) { suppressed = s; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void setInterval(uint16_t _interval) { interval = _interval; } void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); @@ -91,6 +96,9 @@ public: ManagementObject::writeSchemaCall_t schemaCall); QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, uint64_t persistId = 0); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + const std::string& key, + bool persistent = true); QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); @@ -99,7 +107,8 @@ public: bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, - const framing::FieldTable* args); + const framing::FieldTable* args, + const bool topic); const framing::Uuid& getUuid() const { return uuid; } @@ -128,6 +137,15 @@ public: uint16_t getBootSequence(void) { return bootSequence; } void setBootSequence(uint16_t b) { bootSequence = b; } + // TODO: remove these when Variant API moved into common library. + static types::Variant::Map toMap(const framing::FieldTable& from); + static framing::FieldTable fromMap(const types::Variant::Map& from); + static types::Variant::List toList(const framing::List& from); + static framing::List fromList(const types::Variant::List& from); + static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); + static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); + + private: struct Periodic : public qpid::sys::TimerTask { @@ -153,9 +171,8 @@ private: ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); - void encode(framing::Buffer& buffer) const; - void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); }; // TODO: Eventually replace string with entire reply-to structure. reply-to @@ -175,9 +192,11 @@ private: std::string name; uint8_t hash[16]; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); void encode(framing::Buffer& buffer) const; void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + uint32_t encodedBufSize() const; }; struct SchemaClassKeyComp @@ -209,9 +228,8 @@ private: bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } void appendSchema (framing::Buffer& buf); - void encode(framing::Buffer& buffer) const; - void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); }; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; @@ -264,6 +282,14 @@ private: typedef std::map<MethodName, std::string> DisallowedMethods; DisallowedMethods disallowed; + // Agent name and address + qpid::types::Variant::Map attrMap; + std::string name_address; + + // supported management protocol + bool qmf1Support; + bool qmf2Support; + # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; @@ -279,6 +305,11 @@ private: uint32_t length, qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); @@ -311,6 +342,10 @@ private: void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQueryLH (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType); + void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken); + void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid); + size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp index 0813e30891..6dc41ef073 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -29,13 +29,16 @@ using namespace qpid::framing; using namespace qpid::sys; ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {} + Exchange (_name, _parent, b), + DirectExchange(_name, _parent, b), + managementAgent(0) {} ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : Exchange (_name, _durable, _args, _parent, b), - DirectExchange(_name, _durable, _args, _parent, b) {} + DirectExchange(_name, _durable, _args, _parent, b), + managementAgent(0) {} void ManagementDirectExchange::route(Deliverable& msg, const string& routingKey, @@ -43,7 +46,8 @@ void ManagementDirectExchange::route(Deliverable& msg, { bool routeIt = true; - // TODO: Intercept messages directed to the embedded agent and send them to the management agent. + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false /*direct*/); if (routeIt) DirectExchange::route(msg, routingKey, args); diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 4b87800174..46fc67d07f 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -22,7 +22,10 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/Buffer.h" #include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> #include <stdlib.h> @@ -36,26 +39,37 @@ void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) ((uint64_t) (bank & 0x0fffffff)); } -ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object) - : agent(0) +// Deprecated +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object) + : agent(0), agentEpoch(seq) { first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48 | - ((uint64_t) (broker & 0x000fffff)) << 28 | - ((uint64_t) (bank & 0x0fffffff)); + ((uint64_t) (broker & 0x000fffff)) << 28; second = object; } -ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object) - : agent(_agent) + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker) + : agent(0), second(0), agentEpoch(seq) { first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq) + : agent(_agent), second(0), agentEpoch(seq) +{ + + first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48; - second = object; } + ObjectId::ObjectId(std::istream& in) : agent(0) { std::string text; @@ -75,6 +89,10 @@ void ObjectId::fromString(const std::string& text) # define atoll(X) _atoi64(X) #endif + // format: + // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id> + // V2: Not used + std::string copy(text.c_str()); char* cText; char* field[FIELDS]; @@ -99,10 +117,13 @@ void ObjectId::fromString(const std::string& text) if (idx != FIELDS) throw Exception("Invalid ObjectId format"); + agentEpoch = atoll(field[1]); + first = (atoll(field[0]) << 60) + (atoll(field[1]) << 48) + - (atoll(field[2]) << 28) + - atoll(field[3]); + (atoll(field[2]) << 28); + + agentName = std::string(field[3]); second = atoll(field[4]); } @@ -123,21 +144,40 @@ bool ObjectId::equalV1(const ObjectId &other) const return first == otherFirst && second == other.second; } -void ObjectId::encode(framing::Buffer& buffer) const +// encode as V1-format binary +void ObjectId::encode(std::string& buffer) const { + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + if (agent == 0) - buffer.putLongLong(first); + body.putLongLong(first); else - buffer.putLongLong(first | agent->first); - buffer.putLongLong(second); + body.putLongLong(first | agent->first); + body.putLongLong(second); + + body.reset(); + body.getRawData(buffer, len); } -void ObjectId::decode(framing::Buffer& buffer) +// decode as V1-format binary +void ObjectId::decode(const std::string& buffer) { - first = buffer.getLongLong(); - second = buffer.getLongLong(); + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + body.checkAvailable(buffer.length()); + body.putRawData(buffer); + body.reset(); + first = body.getLongLong(); + second = body.getLongLong(); + v2Key = boost::lexical_cast<std::string>(second); } +// generate the V2 key from the index fields defined +// in the schema. void ObjectId::setV2Key(const ManagementObject& object) { std::stringstream oname; @@ -145,6 +185,42 @@ void ObjectId::setV2Key(const ManagementObject& object) v2Key = oname.str(); } +// encode as V2-format map +void ObjectId::mapEncode(types::Variant::Map& map) const +{ + map["_object_name"] = v2Key; + if (!agentName.empty()) + map["_agent_name"] = agentName; + if (agentEpoch) + map["_agent_epoch"] = agentEpoch; +} + +// decode as v2-format map +void ObjectId::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_object_name")) != map.end()) + v2Key = i->second.asString(); + else + throw Exception("Required _object_name field missing."); + + if ((i = map.find("_agent_name")) != map.end()) + agentName = i->second.asString(); + + if ((i = map.find("_agent_epoch")) != map.end()) + agentEpoch = i->second.asInt64(); +} + + +ObjectId::operator types::Variant::Map() const +{ + types::Variant::Map m; + mapEncode(m); + return m; +} + + namespace qpid { namespace management { @@ -158,7 +234,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) out << ((virtFirst & 0xF000000000000000LL) >> 60) << "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) << "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) << - "-" << (virtFirst & 0x000000000FFFFFFFLL) << + "-" << i.agentName << "-" << i.second; return out; } @@ -168,43 +244,88 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; -void ManagementObject::writeTimestamps (framing::Buffer& buf) const +void ManagementObject::writeTimestamps (std::string& buf) const { - buf.putShortString (getPackageName ()); - buf.putShortString (getClassName ()); - buf.putBin128 (getMd5Sum ()); - buf.putLongLong (updateTime); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - objectId.encode(buf); + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + + body.putShortString (getPackageName ()); + body.putShortString (getClassName ()); + body.putBin128 (getMd5Sum ()); + body.putLongLong (updateTime); + body.putLongLong (createTime); + body.putLongLong (destroyTime); + + uint32_t len = body.getPosition(); + body.reset(); + body.getRawData(buf, len); + + std::string oid; + objectId.encode(oid); + buf += oid; } -void ManagementObject::readTimestamps (framing::Buffer& buf) +void ManagementObject::readTimestamps (const std::string& buf) { + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); std::string unused; uint8_t unusedUuid[16]; - ObjectId unusedObjectId; - buf.getShortString(unused); - buf.getShortString(unused); - buf.getBin128(unusedUuid); - updateTime = buf.getLongLong(); - createTime = buf.getLongLong(); - destroyTime = buf.getLongLong(); - unusedObjectId.decode(buf); + body.checkAvailable(buf.length()); + body.putRawData(buf); + body.reset(); + + body.getShortString(unused); + body.getShortString(unused); + body.getBin128(unusedUuid); + updateTime = body.getLongLong(); + createTime = body.getLongLong(); + destroyTime = body.getLongLong(); } uint32_t ManagementObject::writeTimestampsSize() const { return 1 + getPackageName().length() + // str8 - 1 + getClassName().length() + // str8 - 16 + // bin128 - 8 + // uint64 - 8 + // uint64 - 8 + // uint64 - objectId.encodedSize(); // objectId + 1 + getClassName().length() + // str8 + 16 + // bin128 + 8 + // uint64 + 8 + // uint64 + 8 + // uint64 + objectId.encodedSize(); // objectId +} + + +void ManagementObject::writeTimestamps (types::Variant::Map& map) const +{ + types::Variant::Map oid, sid; + + sid["_package_name"] = getPackageName(); + sid["_class_name"] = getClassName(); + sid["_hash"] = qpid::types::Uuid(getMd5Sum()); + map["_schema_id"] = sid; + + objectId.mapEncode(oid); + map["_object_id"] = oid; + + map["_update_ts"] = updateTime; + map["_create_ts"] = createTime; + map["_delete_ts"] = destroyTime; +} + +void ManagementObject::readTimestamps (const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_update_ts")) != map.end()) + updateTime = i->second.asUint64(); + if ((i = map.find("_create_ts")) != map.end()) + createTime = i->second.asUint64(); + if ((i = map.find("_delete_ts")) != map.end()) + destroyTime = i->second.asUint64(); } + void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { @@ -217,3 +338,26 @@ int ManagementObject::getThreadIndex() { } return thisIndex; } + + +void ManagementObject::mapEncode(types::Variant::Map& map, + bool includeProperties, + bool includeStatistics) +{ + types::Variant::Map values; + + writeTimestamps(map); + + mapEncodeValues(values, includeProperties, includeStatistics); + map["_values"] = values; +} + +void ManagementObject::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + readTimestamps(map); + + if ((i = map.find("_values")) != map.end()) + mapDecodeValues(i->second.asMap()); +} diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp index 98650b3adf..7fdce133e5 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -28,13 +28,16 @@ using namespace qpid::framing; using namespace qpid::sys; ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} + Exchange (_name, _parent, b), + TopicExchange(_name, _parent, b), + managementAgent(0) {} ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : Exchange (_name, _durable, _args, _parent, b), - TopicExchange(_name, _durable, _args, _parent, b) {} + TopicExchange(_name, _durable, _args, _parent, b), + managementAgent(0) {} void ManagementTopicExchange::route(Deliverable& msg, const string& routingKey, @@ -43,12 +46,8 @@ void ManagementTopicExchange::route(Deliverable& msg, bool routeIt = true; // Intercept management agent commands - if (qmfVersion == 1) { - if ((routingKey.length() > 6 && - routingKey.substr(0, 6).compare("agent.") == 0) || - (routingKey == "broker")) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - } + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true /* topic */); if (routeIt) TopicExchange::route(msg, routingKey, args); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 7377edc3bb..9c1a761062 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -267,15 +267,15 @@ txjob_LDADD=$(lib_client) check_PROGRAMS+=PollerTest PollerTest_SOURCES=PollerTest.cpp -PollerTest_LDADD=$(lib_common) $(SOCKLIBS) +PollerTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS) check_PROGRAMS+=DispatcherTest DispatcherTest_SOURCES=DispatcherTest.cpp -DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS) +DispatcherTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS) check_PROGRAMS+=datagen datagen_SOURCES=datagen.cpp -datagen_LDADD=$(lib_common) +datagen_LDADD=$(lib_common) $(lib_client) check_PROGRAMS+=qrsh_server qrsh_server_SOURCES=qrsh_server.cpp diff --git a/cpp/src/tests/ManagementTest.cpp b/cpp/src/tests/ManagementTest.cpp index d05b4676ba..99b9c1f03e 100644 --- a/cpp/src/tests/ManagementTest.cpp +++ b/cpp/src/tests/ManagementTest.cpp @@ -56,32 +56,34 @@ QPID_AUTO_TEST_CASE(testObjectIdSerializeString) { } QPID_AUTO_TEST_CASE(testObjectIdEncode) { - char buffer[100]; - Buffer msgBuf(buffer, 100); - msgBuf.putLongLong(0x1002000030000004LL); - msgBuf.putLongLong(0x0000000000000005LL); - msgBuf.reset(); + qpid::types::Variant::Map oidMap; - ObjectId oid(msgBuf); + ObjectId oid(1, 2, 3, 9999); + oid.setV2Key("testkey"); + oid.setAgentName("myAgent"); std::stringstream out1; out1 << oid; - BOOST_CHECK_EQUAL(out1.str(), "1-2-3-4-5"); + BOOST_CHECK_EQUAL(out1.str(), "1-2-3-myAgent-9999"); } QPID_AUTO_TEST_CASE(testObjectIdAttach) { AgentAttachment agent; - ObjectId oid(&agent, 10, 20, 50); + ObjectId oid(&agent, 10, 20); + oid.setV2Key("GabbaGabbaHey"); + oid.setAgentName("MrSmith"); std::stringstream out1; out1 << oid; - BOOST_CHECK_EQUAL(out1.str(), "10-20-0-0-50"); + + BOOST_CHECK_EQUAL(out1.str(), "10-20-0-MrSmith-0"); agent.setBanks(30, 40); std::stringstream out2; out2 << oid; - BOOST_CHECK_EQUAL(out2.str(), "10-20-30-40-50"); + + BOOST_CHECK_EQUAL(out2.str(), "10-20-30-MrSmith-0"); } QPID_AUTO_TEST_CASE(testConsoleObjectId) { |