diff options
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 138 |
1 files changed, 84 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 5b2148a850..f7a4fbe6d0 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -23,6 +23,8 @@ #include "qpid/log/Statement.h" #include "qpid/agent/ManagementAgentImpl.h" #include "qpid/messaging/Message.h" +#include "qpid/messaging/ListContent.h" +#include "qpid/messaging/MapContent.h" #include <list> #include <string.h> #include <stdlib.h> @@ -198,7 +200,6 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se { Mutex::ScopedLock lock(agentLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; stringstream key; @@ -210,8 +211,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se ::qpid::messaging::VariantMap &map_ = content.asMap(); ::qpid::messaging::VariantMap schemaId; ::qpid::messaging::VariantMap values; - - mapEncodeHeader(map_, 'e'); + ::qpid::messaging::VariantMap headers; map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), event.getEventName(), @@ -221,8 +221,15 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se map_["_timestamp"] = uint64_t(Duration(now())); map_["_severity"] = sev; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = std::string(agentName); + content.encode(); - connThreadBody.sendBuffer(msg.getContent(), "qpid.management", key.str()); + connThreadBody.sendBuffer(msg.getContent(), 0, + headers, + "qpid.management", key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -416,12 +423,13 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - SchemaClass& schema = cIter->second; + //SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; encodeHeader(outBuffer, 's', sequence); - schema.writeSchemaCall(outBuffer); + //schema.writeSchemaCall(outBuffer); + assert(false); // TODO FIX ABOVE outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); @@ -448,7 +456,9 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - ObjectId objId(inBuffer); + assert(false); // TODO FIX OBJ ID!! + //ObjectId objId(inBuffer); + ObjectId objId(std::string("foobag?")); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); @@ -469,7 +479,8 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc else try { outBuffer.record(); - iter->second->doMethod(methodName, inBuffer, outBuffer); + //iter->second->doMethod(methodName, inBuffer, outBuffer); + assert(false); // TODO: fix above } catch(exception& e) { outBuffer.restore(); outBuffer.putLong(Manageable::STATUS_EXCEPTION); @@ -508,17 +519,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st ::qpid::messaging::Variant::List &list_ = content.asList(); ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - mapEncodeHeader(map_, 'g', sequence); object->mapEncodeValues(values, true, true); // write both stats and properties map_["_values"] = values; - list.push_back(map_); + list_.push_back(map_); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); content.encode(); - connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -538,17 +554,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st ::qpid::messaging::Variant::List &list_ = content.asList(); ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - mapEncodeHeader(map_, 'g', sequence); object->mapEncodeValues(values, true, true); // write both stats and properties map_["_values"] = values; - list.push_back(map_); + list_.push_back(map_); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); content.encode(); - connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -623,22 +644,6 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } -void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq) -{ - map_["_version"] = "AM2"; - map_["_opcode"] = opcode; - map_["_sequence"] = seq; -} - - -void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq) -{ - map_["_version"] = "AM2"; - map_["_opcode"] = opcode; - map_["_sequence"] = seq; -} - - qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname, const std::string& cname, const uint8_t *md5Sum) @@ -648,7 +653,7 @@ qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std:: map_["_package_name"] = pname; map_["_class_name"] = cname; map_["_hash_str"] = std::string((const char *)md5Sum, - qpid::managment::ManagmentObject::MD5_LEN); + qpid::management::ManagementObject::MD5_LEN); return map_; } @@ -797,32 +802,25 @@ void ManagementAgentImpl::periodicProcessing() iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - mapEncodeHeader(map_, 'c'); + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); - object->getPackageName(); - object->getClassName(); - (object->getMd5Sum(), MD5_LEN); - - object->mapEncodeValues(values, true, false); // encode properties only - map_["_values"] = values; - list.push_back(map_); - } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + if (send_stats || send_props) { ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; - mapEncodeHeader(map_, 'i'); - object->mapEncodeValues(values, false, true); // encode statistics only + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); map_["_values"] = values; - list.push_back(map_); + list_.push_back(map_); } if (object->isDeleted()) @@ -835,9 +833,15 @@ void ManagementAgentImpl::periodicProcessing() const std::string &str = m.getContent(); if (str.length()) { stringstream key; + ::qpid::messaging::Variant::Map headers; key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << baseObject->getPackageName() << "." << baseObject->getClassName(); - connThreadBody.sendBuffer(str, "qpid.management", key.str()); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str()); } } @@ -951,18 +955,47 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, const string& exchange, const string& routingKey) { + Message msg; string data; buf.getRawData(data, length); - sendBuffer(data, exchange, routingKey); + msg.setData(data); + sendMessage(msg, exchange, routingKey); } void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, + uint32_t sequence, + const qpid::messaging::VariantMap headers, const string& exchange, const string& routingKey) { + Message msg; + qpid::messaging::VariantMap::const_iterator i; + + if (sequence) { + std::stringstream seqstr; + seqstr << sequence; + msg.getMessageProperties().setCorrelationId(seqstr.str()); + } + for (i = headers.begin(); i != headers.end(); ++i) { + msg.getHeaders().setString(i->first, i->second.asString()); + } + msg.getHeaders().setString("app_id", "qmf2"); + + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + + + +void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, + const string& exchange, + const string& routingKey) +{ ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); @@ -971,15 +1004,12 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, s = subscriptions; } - Message msg; - msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData(data); try { session.messageTransfer(arg::content=msg, arg::destination=exchange); } catch(exception& e) { - QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); + QPID_LOG(error, "Exception caught in sendMessage: " << e.what()); // Bounce the connection if (s) s->stop(); |