diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-17 14:36:35 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-17 14:36:35 +0000 |
commit | 54556026728f891c61a72048e141a1e6a1d85c20 (patch) | |
tree | 06467b9098a2f58302ced584db3f6384d1641a55 | |
parent | 46eaa1d1dc3be11277a1568ef33608744e5145b5 (diff) | |
download | qpid-python-54556026728f891c61a72048e141a1e6a1d85c20.tar.gz |
checkpoint
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924309 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/include/qpid/agent/ManagementAgent.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 138 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 342 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 6 |
5 files changed, 361 insertions, 144 deletions
diff --git a/qpid/cpp/include/qpid/agent/ManagementAgent.h b/qpid/cpp/include/qpid/agent/ManagementAgent.h index d14fc9a24a..d786ec9ec6 100644 --- a/qpid/cpp/include/qpid/agent/ManagementAgent.h +++ b/qpid/cpp/include/qpid/agent/ManagementAgent.h @@ -54,9 +54,10 @@ class ManagementAgent class Name { public: - QMF_AGENT_EXTERN Name(std::string vendor, - std::string product, - std::string name); + QMF_AGENT_EXTERN Name(const std::string &vendor, + const std::string &product, + const std::string &name); + QMF_AGENT_EXTERN Name(const std::string &fullName); QMF_AGENT_EXTERN Name(); QMF_AGENT_EXTERN operator std::string() const; @@ -118,6 +119,9 @@ class ManagementAgent bool useExternalThread = false, const std::string& storeFile = "") = 0; + // Extract the unique name for this agent + virtual const Name& getName() = 0; + // Register a schema with the management agent. This is normally called by the // package initializer generated by the management code generator. // diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 5b2148a850..f7a4fbe6d0 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -23,6 +23,8 @@ #include "qpid/log/Statement.h" #include "qpid/agent/ManagementAgentImpl.h" #include "qpid/messaging/Message.h" +#include "qpid/messaging/ListContent.h" +#include "qpid/messaging/MapContent.h" #include <list> #include <string.h> #include <stdlib.h> @@ -198,7 +200,6 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se { Mutex::ScopedLock lock(agentLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; stringstream key; @@ -210,8 +211,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se ::qpid::messaging::VariantMap &map_ = content.asMap(); ::qpid::messaging::VariantMap schemaId; ::qpid::messaging::VariantMap values; - - mapEncodeHeader(map_, 'e'); + ::qpid::messaging::VariantMap headers; map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), event.getEventName(), @@ -221,8 +221,15 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se map_["_timestamp"] = uint64_t(Duration(now())); map_["_severity"] = sev; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = std::string(agentName); + content.encode(); - connThreadBody.sendBuffer(msg.getContent(), "qpid.management", key.str()); + connThreadBody.sendBuffer(msg.getContent(), 0, + headers, + "qpid.management", key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -416,12 +423,13 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - SchemaClass& schema = cIter->second; + //SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; encodeHeader(outBuffer, 's', sequence); - schema.writeSchemaCall(outBuffer); + //schema.writeSchemaCall(outBuffer); + assert(false); // TODO FIX ABOVE outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); @@ -448,7 +456,9 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - ObjectId objId(inBuffer); + assert(false); // TODO FIX OBJ ID!! + //ObjectId objId(inBuffer); + ObjectId objId(std::string("foobag?")); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); @@ -469,7 +479,8 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc else try { outBuffer.record(); - iter->second->doMethod(methodName, inBuffer, outBuffer); + //iter->second->doMethod(methodName, inBuffer, outBuffer); + assert(false); // TODO: fix above } catch(exception& e) { outBuffer.restore(); outBuffer.putLong(Manageable::STATUS_EXCEPTION); @@ -508,17 +519,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st ::qpid::messaging::Variant::List &list_ = content.asList(); ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - mapEncodeHeader(map_, 'g', sequence); object->mapEncodeValues(values, true, true); // write both stats and properties map_["_values"] = values; - list.push_back(map_); + list_.push_back(map_); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); content.encode(); - connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -538,17 +554,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st ::qpid::messaging::Variant::List &list_ = content.asList(); ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - mapEncodeHeader(map_, 'g', sequence); object->mapEncodeValues(values, true, true); // write both stats and properties map_["_values"] = values; - list.push_back(map_); + list_.push_back(map_); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); content.encode(); - connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo); + connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -623,22 +644,6 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } -void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq) -{ - map_["_version"] = "AM2"; - map_["_opcode"] = opcode; - map_["_sequence"] = seq; -} - - -void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq) -{ - map_["_version"] = "AM2"; - map_["_opcode"] = opcode; - map_["_sequence"] = seq; -} - - qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname, const std::string& cname, const uint8_t *md5Sum) @@ -648,7 +653,7 @@ qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std:: map_["_package_name"] = pname; map_["_class_name"] = cname; map_["_hash_str"] = std::string((const char *)md5Sum, - qpid::managment::ManagmentObject::MD5_LEN); + qpid::management::ManagementObject::MD5_LEN); return map_; } @@ -797,32 +802,25 @@ void ManagementAgentImpl::periodicProcessing() iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - mapEncodeHeader(map_, 'c'); + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); - object->getPackageName(); - object->getClassName(); - (object->getMd5Sum(), MD5_LEN); - - object->mapEncodeValues(values, true, false); // encode properties only - map_["_values"] = values; - list.push_back(map_); - } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + if (send_stats || send_props) { ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; - mapEncodeHeader(map_, 'i'); - object->mapEncodeValues(values, false, true); // encode statistics only + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); map_["_values"] = values; - list.push_back(map_); + list_.push_back(map_); } if (object->isDeleted()) @@ -835,9 +833,15 @@ void ManagementAgentImpl::periodicProcessing() const std::string &str = m.getContent(); if (str.length()) { stringstream key; + ::qpid::messaging::Variant::Map headers; key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << baseObject->getPackageName() << "." << baseObject->getClassName(); - connThreadBody.sendBuffer(str, "qpid.management", key.str()); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str()); } } @@ -951,18 +955,47 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, const string& exchange, const string& routingKey) { + Message msg; string data; buf.getRawData(data, length); - sendBuffer(data, exchange, routingKey); + msg.setData(data); + sendMessage(msg, exchange, routingKey); } void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, + uint32_t sequence, + const qpid::messaging::VariantMap headers, const string& exchange, const string& routingKey) { + Message msg; + qpid::messaging::VariantMap::const_iterator i; + + if (sequence) { + std::stringstream seqstr; + seqstr << sequence; + msg.getMessageProperties().setCorrelationId(seqstr.str()); + } + for (i = headers.begin(); i != headers.end(); ++i) { + msg.getHeaders().setString(i->first, i->second.asString()); + } + msg.getHeaders().setString("app_id", "qmf2"); + + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + + + +void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, + const string& exchange, + const string& routingKey) +{ ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); @@ -971,15 +1004,12 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, s = subscriptions; } - Message msg; - msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData(data); try { session.messageTransfer(arg::content=msg, arg::destination=exchange); } catch(exception& e) { - QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); + QPID_LOG(error, "Exception caught in sendMessage: " << e.what()); // Bounce the connection if (s) s->stop(); diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index b3130154df..e778d43a13 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -64,6 +64,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint16_t intervalSeconds = 10, bool useExternalThread = false, const std::string& storeFile = ""); + const Name& getName(); bool isConnected() { return connected; } std::string& getLastFailure() { return lastFailure; } void registerClass(const std::string& packageName, @@ -200,8 +201,13 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen const std::string& exchange, const std::string& routingKey); void sendBuffer(const std::string& data, + const uint32_t sequence, + const qpid::messaging::VariantMap headers, const std::string& exchange, const std::string& routingKey); + void sendMessage(qpid::client::Message msg, + const std::string& exchange, + const std::string& routingKey); void bindToBank(uint32_t brokerBank, uint32_t agentBank); void close(); bool isSleeping() const; @@ -225,6 +231,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen static const std::string storeMagicNumber; + Name agentName; + void startProtocol(); void storeData(bool requested=false); void retrieveData(); @@ -241,7 +249,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen PackageMap::iterator pIter, ClassMap::iterator cIter); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - void mapEncodeHeader (::qpid::messaging::VariantMap& map_, uint8_t opcode, uint32_t seq = 0); qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname, const std::string& cname, const uint8_t *md5Sum); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 4254961f5e..c48d235aa2 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/AclModule.h" #include "qpid/messaging/Variant.h" #include "qpid/messaging/Uuid.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/ListContent.h" #include <list> #include <iostream> #include <fstream> @@ -47,6 +49,22 @@ using namespace qpid::sys; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; + + +static qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const uint8_t *md5Sum) +{ + qpid::messaging::Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_hash_str"] = std::string((const char *)md5Sum, + qpid::management::ManagementObject::MD5_LEN); + return map_; +} + + ManagementAgent::RemoteAgent::~RemoteAgent () { QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); @@ -59,7 +77,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () ManagementAgent::ManagementAgent () : threadPoolSize(1), interval(10), broker(0), timer(0), startTime(uint64_t(Duration(now()))), - suppressed(false) + suppressed(false), agentName("") { nextObjectId = 1; brokerBank = 1; @@ -223,17 +241,29 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, newManagementObjects[objId] = object; if (publishNow) { -#define IMM_BUFSIZE 65536 - char rawBuf[IMM_BUFSIZE]; - Buffer msgBuffer(rawBuf, IMM_BUFSIZE); - - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - uint32_t contentSize = msgBuffer.getPosition(); + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, true, false); // send props only + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + content.encode(); stringstream key; key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - msgBuffer.reset(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + sendBuffer(m.getContent(), 0, headers, mExchange, key.str()); QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str()); } @@ -243,20 +273,31 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock (userLock); - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, + ::qpid::messaging::Message msg; + ::qpid::messaging::MapContent content(msg); + ::qpid::messaging::VariantMap &map_ = content.asMap(); + ::qpid::messaging::VariantMap schemaId; + ::qpid::messaging::VariantMap values; + ::qpid::messaging::VariantMap headers; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = std::string(agentName); + + content.encode(); + + sendBuffer(msg.getContent(), 0, headers, mExchange, "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); } @@ -365,6 +406,61 @@ void ManagementAgent::sendBuffer(Buffer& buf, } catch(exception&) {} } + +void ManagementAgent::sendBuffer(const std::string& data, + const uint32_t sequence, + const qpid::messaging::VariantMap headers, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) +{ + qpid::messaging::VariantMap::const_iterator i; + + if (suppressed) { + QPID_LOG(trace, "Suppressed management message to " << routingKey); + return; + } + if (exchange.get() == 0) return; + + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody(data))); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(data.length()); + if (sequence) { + std::stringstream seqstr; + seqstr << sequence; + props->setCorrelationId(seqstr.str()); + } + + for (i = headers.begin(); i != headers.end(); ++i) { + msg->getOrInsertHeaders().setString(i->first, i->second.asString()); + } + msg->getOrInsertHeaders().setString("app_id", "qmf2"); + + DeliveryProperties* dp = + msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + dp->setRoutingKey(routingKey); + + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} +} + + void ManagementAgent::moveNewObjectsLH() { Mutex::ScopedLock lock (addLock); @@ -399,12 +495,8 @@ void ManagementAgent::moveNewObjectsLH() void ManagementAgent::periodicProcessing (void) { -#define BUFSIZE 65536 -#define HEADROOM 4096 - QPID_LOG(trace, "Management agent periodic processing") - Mutex::ScopedLock lock (userLock); - char msgChars[BUFSIZE]; - uint32_t contentSize; + QPID_LOG(trace, "Management agent periodic processing"); + Mutex::ScopedLock lock (userLock); string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; @@ -448,43 +540,57 @@ void ManagementAgent::periodicProcessing (void) !baseObject->isDeleted())) continue; - Buffer msgBuffer(msgChars, BUFSIZE); + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - pcount++; - } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); - scount++; + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_stats || send_props) { + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); + + if (send_props) pcount++; + if (send_stats) scount++; } if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - - if (msgBuffer.available() < HEADROOM) - break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); + content.encode(); + const std::string &str = m.getContent(); + if (str.length()) { stringstream key; + ::qpid::messaging::Variant::Map headers; key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + sendBuffer(str, 0, headers, mExchange, key.str()); QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); } } @@ -502,15 +608,33 @@ void ManagementAgent::periodicProcessing (void) for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin(); cdIter != deletedManagementObjects.end(); cdIter++) { collisionDeletions = true; - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - (*cdIter)->writeProperties(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - stringstream key; - key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + { + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(), + (*cdIter)->getClassName(), + (*cdIter)->getMd5Sum()); + (*cdIter)->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + content.encode(); + + stringstream key; + key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + sendBuffer(m.getContent(), 0, headers, mExchange, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } } if (!deleteList.empty() || collisionDeletions) { @@ -519,6 +643,9 @@ void ManagementAgent::periodicProcessing (void) } { +#define BUFSIZE 65536 + uint32_t contentSize; + char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); @@ -541,18 +668,31 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) if (!object->isDeleted()) return; -#define DNOW_BUFSIZE 2048 - char msgChars[DNOW_BUFSIZE]; - uint32_t contentSize; - Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); stringstream key; key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + + content.encode(); + + ::qpid::messaging::Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + sendBuffer(m.getContent(), 0, headers, mExchange, key.str()); QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); managementObjects.erase(oid); @@ -621,7 +761,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey uint32_t outLen; AclModule* acl = broker->getAcl(); - ObjectId objId(inBuffer); + //ObjectId objId(inBuffer); + assert(false); // KAG TODO FIXME + ObjectId objId(std::string("fleabag???")); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); @@ -674,7 +816,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey try { outBuffer.record(); Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inBuffer, outBuffer); + //iter->second->doMethod(methodName, inBuffer, outBuffer); + assert(false); // KAG TODO FIX } catch(exception& e) { outBuffer.restore(); outBuffer.putLong(Manageable::STATUS_EXCEPTION); @@ -814,7 +957,8 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) // is from a remote management agent, send the stored schema information. if (writeSchemaCall != 0) - writeSchemaCall(buf); + //writeSchemaCall(buf); + assert(false); // KAG TODO FIX else buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } @@ -991,7 +1135,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject, 0, true); @@ -1034,19 +1178,29 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin ManagementObjectMap::iterator iter = numericFind(selector); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); if (!object->isDeleted()) { - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + content.encode(); + + sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey); QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); } } @@ -1061,19 +1215,29 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin iter++) { ManagementObject* object = iter->second; if (object->getClassName () == className) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); if (!object->isDeleted()) { - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + content.encode(); + + sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey); QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); } } @@ -1096,7 +1260,9 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) inBuffer.reset(); if (!checkHeader(inBuffer, &opcode, &sequence)) - return false; + // KAG TODO: handle new map style messages also! + //return false; + assert(false); if (opcode == 'M') { // TODO: check method call against ACL list. @@ -1111,7 +1277,9 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) string methodName; map<acl::Property, string> params; - ObjectId objId(inBuffer); + //ObjectId objId(inBuffer); + inBuffer.getLongLong(); + inBuffer.getLongLong(); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); @@ -1132,6 +1300,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + // KAG TODO: old-style response encodeHeader(outBuffer, 'm', sequence); outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); @@ -1173,6 +1342,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + // KAG TODO: need to handle map style method requests while (inBuffer.getPosition() < bufferLen) { if (!checkHeader(inBuffer, &opcode, &sequence)) return; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 356710cb95..893ff947a4 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -256,6 +256,7 @@ private: typedef std::pair<std::string,std::string> MethodName; typedef std::map<MethodName, std::string> DisallowedMethods; DisallowedMethods disallowed; + std::string agentName; // KAG TODO FIX # define MA_BUFFER_SIZE 65536 @@ -272,6 +273,11 @@ private: uint32_t length, qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); + void sendBuffer(const std::string& data, + const uint32_t sequence, + const qpid::messaging::VariantMap headers, + qpid::broker::Exchange::shared_ptr exchange, + std::string routingKey); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); |