summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-17 14:36:35 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-17 14:36:35 +0000
commit54556026728f891c61a72048e141a1e6a1d85c20 (patch)
tree06467b9098a2f58302ced584db3f6384d1641a55
parent46eaa1d1dc3be11277a1568ef33608744e5145b5 (diff)
downloadqpid-python-54556026728f891c61a72048e141a1e6a1d85c20.tar.gz
checkpoint
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924309 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/agent/ManagementAgent.h10
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp138
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h9
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp342
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h6
5 files changed, 361 insertions, 144 deletions
diff --git a/qpid/cpp/include/qpid/agent/ManagementAgent.h b/qpid/cpp/include/qpid/agent/ManagementAgent.h
index d14fc9a24a..d786ec9ec6 100644
--- a/qpid/cpp/include/qpid/agent/ManagementAgent.h
+++ b/qpid/cpp/include/qpid/agent/ManagementAgent.h
@@ -54,9 +54,10 @@ class ManagementAgent
class Name {
public:
- QMF_AGENT_EXTERN Name(std::string vendor,
- std::string product,
- std::string name);
+ QMF_AGENT_EXTERN Name(const std::string &vendor,
+ const std::string &product,
+ const std::string &name);
+ QMF_AGENT_EXTERN Name(const std::string &fullName);
QMF_AGENT_EXTERN Name();
QMF_AGENT_EXTERN operator std::string() const;
@@ -118,6 +119,9 @@ class ManagementAgent
bool useExternalThread = false,
const std::string& storeFile = "") = 0;
+ // Extract the unique name for this agent
+ virtual const Name& getName() = 0;
+
// Register a schema with the management agent. This is normally called by the
// package initializer generated by the management code generator.
//
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 5b2148a850..f7a4fbe6d0 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -23,6 +23,8 @@
#include "qpid/log/Statement.h"
#include "qpid/agent/ManagementAgentImpl.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ListContent.h"
+#include "qpid/messaging/MapContent.h"
#include <list>
#include <string.h>
#include <stdlib.h>
@@ -198,7 +200,6 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
{
Mutex::ScopedLock lock(agentLock);
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
stringstream key;
@@ -210,8 +211,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
::qpid::messaging::VariantMap &map_ = content.asMap();
::qpid::messaging::VariantMap schemaId;
::qpid::messaging::VariantMap values;
-
- mapEncodeHeader(map_, 'e');
+ ::qpid::messaging::VariantMap headers;
map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
event.getEventName(),
@@ -221,8 +221,15 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
map_["_timestamp"] = uint64_t(Duration(now()));
map_["_severity"] = sev;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = std::string(agentName);
+
content.encode();
- connThreadBody.sendBuffer(msg.getContent(), "qpid.management", key.str());
+ connThreadBody.sendBuffer(msg.getContent(), 0,
+ headers,
+ "qpid.management", key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -416,12 +423,13 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- SchemaClass& schema = cIter->second;
+ //SchemaClass& schema = cIter->second;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
encodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(outBuffer);
+ //schema.writeSchemaCall(outBuffer);
+ assert(false); // TODO FIX ABOVE
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
@@ -448,7 +456,9 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- ObjectId objId(inBuffer);
+ assert(false); // TODO FIX OBJ ID!!
+ //ObjectId objId(inBuffer);
+ ObjectId objId(std::string("foobag?"));
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
@@ -469,7 +479,8 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
else
try {
outBuffer.record();
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ //iter->second->doMethod(methodName, inBuffer, outBuffer);
+ assert(false); // TODO: fix above
} catch(exception& e) {
outBuffer.restore();
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -508,17 +519,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
::qpid::messaging::Variant::List &list_ = content.asList();
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- mapEncodeHeader(map_, 'g', sequence);
object->mapEncodeValues(values, true, true); // write both stats and properties
map_["_values"] = values;
- list.push_back(map_);
+ list_.push_back(map_);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
content.encode();
- connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
@@ -538,17 +554,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
::qpid::messaging::Variant::List &list_ = content.asList();
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- mapEncodeHeader(map_, 'g', sequence);
object->mapEncodeValues(values, true, true); // write both stats and properties
map_["_values"] = values;
- list.push_back(map_);
+ list_.push_back(map_);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
content.encode();
- connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
@@ -623,22 +644,6 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
buf.putLong (seq);
}
-void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq)
-{
- map_["_version"] = "AM2";
- map_["_opcode"] = opcode;
- map_["_sequence"] = seq;
-}
-
-
-void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq)
-{
- map_["_version"] = "AM2";
- map_["_opcode"] = opcode;
- map_["_sequence"] = seq;
-}
-
-
qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname,
const std::string& cname,
const uint8_t *md5Sum)
@@ -648,7 +653,7 @@ qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::
map_["_package_name"] = pname;
map_["_class_name"] = cname;
map_["_hash_str"] = std::string((const char *)md5Sum,
- qpid::managment::ManagmentObject::MD5_LEN);
+ qpid::management::ManagementObject::MD5_LEN);
return map_;
}
@@ -797,32 +802,25 @@ void ManagementAgentImpl::periodicProcessing()
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+ bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- mapEncodeHeader(map_, 'c');
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
- object->getPackageName();
- object->getClassName();
- (object->getMd5Sum(), MD5_LEN);
-
- object->mapEncodeValues(values, true, false); // encode properties only
- map_["_values"] = values;
- list.push_back(map_);
- }
-
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+ if (send_stats || send_props) {
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
- mapEncodeHeader(map_, 'i');
- object->mapEncodeValues(values, false, true); // encode statistics only
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->mapEncodeValues(values, send_props, send_stats);
map_["_values"] = values;
- list.push_back(map_);
+ list_.push_back(map_);
}
if (object->isDeleted())
@@ -835,9 +833,15 @@ void ManagementAgentImpl::periodicProcessing()
const std::string &str = m.getContent();
if (str.length()) {
stringstream key;
+ ::qpid::messaging::Variant::Map headers;
key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
baseObject->getPackageName() << "." << baseObject->getClassName();
- connThreadBody.sendBuffer(str, "qpid.management", key.str());
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str());
}
}
@@ -951,18 +955,47 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
const string& exchange,
const string& routingKey)
{
+ Message msg;
string data;
buf.getRawData(data, length);
- sendBuffer(data, exchange, routingKey);
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
}
void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
+ uint32_t sequence,
+ const qpid::messaging::VariantMap headers,
const string& exchange,
const string& routingKey)
{
+ Message msg;
+ qpid::messaging::VariantMap::const_iterator i;
+
+ if (sequence) {
+ std::stringstream seqstr;
+ seqstr << sequence;
+ msg.getMessageProperties().setCorrelationId(seqstr.str());
+ }
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg.getHeaders().setString(i->first, i->second.asString());
+ }
+ msg.getHeaders().setString("app_id", "qmf2");
+
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
+ const string& exchange,
+ const string& routingKey)
+{
ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
@@ -971,15 +1004,12 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
s = subscriptions;
}
- Message msg;
-
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData(data);
try {
session.messageTransfer(arg::content=msg, arg::destination=exchange);
} catch(exception& e) {
- QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
+ QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
// Bounce the connection
if (s)
s->stop();
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index b3130154df..e778d43a13 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -64,6 +64,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
uint16_t intervalSeconds = 10,
bool useExternalThread = false,
const std::string& storeFile = "");
+ const Name& getName();
bool isConnected() { return connected; }
std::string& getLastFailure() { return lastFailure; }
void registerClass(const std::string& packageName,
@@ -200,8 +201,13 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
const std::string& exchange,
const std::string& routingKey);
void sendBuffer(const std::string& data,
+ const uint32_t sequence,
+ const qpid::messaging::VariantMap headers,
const std::string& exchange,
const std::string& routingKey);
+ void sendMessage(qpid::client::Message msg,
+ const std::string& exchange,
+ const std::string& routingKey);
void bindToBank(uint32_t brokerBank, uint32_t agentBank);
void close();
bool isSleeping() const;
@@ -225,6 +231,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
static const std::string storeMagicNumber;
+ Name agentName;
+
void startProtocol();
void storeData(bool requested=false);
void retrieveData();
@@ -241,7 +249,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
PackageMap::iterator pIter,
ClassMap::iterator cIter);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- void mapEncodeHeader (::qpid::messaging::VariantMap& map_, uint8_t opcode, uint32_t seq = 0);
qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname,
const std::string& cname,
const uint8_t *md5Sum);
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 4254961f5e..c48d235aa2 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/AclModule.h"
#include "qpid/messaging/Variant.h"
#include "qpid/messaging/Uuid.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ListContent.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -47,6 +49,22 @@ using namespace qpid::sys;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
+
+
+static qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname,
+ const std::string& cname,
+ const uint8_t *md5Sum)
+{
+ qpid::messaging::Variant::Map map_;
+
+ map_["_package_name"] = pname;
+ map_["_class_name"] = cname;
+ map_["_hash_str"] = std::string((const char *)md5Sum,
+ qpid::management::ManagementObject::MD5_LEN);
+ return map_;
+}
+
+
ManagementAgent::RemoteAgent::~RemoteAgent ()
{
QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
@@ -59,7 +77,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
ManagementAgent::ManagementAgent () :
threadPoolSize(1), interval(10), broker(0), timer(0),
startTime(uint64_t(Duration(now()))),
- suppressed(false)
+ suppressed(false), agentName("")
{
nextObjectId = 1;
brokerBank = 1;
@@ -223,17 +241,29 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
newManagementObjects[objId] = object;
if (publishNow) {
-#define IMM_BUFSIZE 65536
- char rawBuf[IMM_BUFSIZE];
- Buffer msgBuffer(rawBuf, IMM_BUFSIZE);
-
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- uint32_t contentSize = msgBuffer.getPosition();
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->mapEncodeValues(values, true, false); // send props only
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ content.encode();
stringstream key;
key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
- msgBuffer.reset();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str());
}
@@ -243,20 +273,31 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
{
Mutex::ScopedLock lock (userLock);
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
- encodeHeader(outBuffer, 'e');
- outBuffer.putShortString(event.getPackageName());
- outBuffer.putShortString(event.getEventName());
- outBuffer.putBin128(event.getMd5Sum());
- outBuffer.putLongLong(uint64_t(Duration(now())));
- outBuffer.putOctet(sev);
- event.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBuffer(outBuffer, outLen, mExchange,
+ ::qpid::messaging::Message msg;
+ ::qpid::messaging::MapContent content(msg);
+ ::qpid::messaging::VariantMap &map_ = content.asMap();
+ ::qpid::messaging::VariantMap schemaId;
+ ::qpid::messaging::VariantMap values;
+ ::qpid::messaging::VariantMap headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ event.getMd5Sum());
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = std::string(agentName);
+
+ content.encode();
+
+ sendBuffer(msg.getContent(), 0, headers, mExchange,
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
}
@@ -365,6 +406,61 @@ void ManagementAgent::sendBuffer(Buffer& buf,
} catch(exception&) {}
}
+
+void ManagementAgent::sendBuffer(const std::string& data,
+ const uint32_t sequence,
+ const qpid::messaging::VariantMap headers,
+ qpid::broker::Exchange::shared_ptr exchange,
+ string routingKey)
+{
+ qpid::messaging::VariantMap::const_iterator i;
+
+ if (suppressed) {
+ QPID_LOG(trace, "Suppressed management message to " << routingKey);
+ return;
+ }
+ if (exchange.get() == 0) return;
+
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody(data)));
+
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
+ MessageProperties* props =
+ msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(data.length());
+ if (sequence) {
+ std::stringstream seqstr;
+ seqstr << sequence;
+ props->setCorrelationId(seqstr.str());
+ }
+
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+ }
+ msg->getOrInsertHeaders().setString("app_id", "qmf2");
+
+ DeliveryProperties* dp =
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ dp->setRoutingKey(routingKey);
+
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ try {
+ exchange->route(deliverable, routingKey, 0);
+ } catch(exception&) {}
+}
+
+
void ManagementAgent::moveNewObjectsLH()
{
Mutex::ScopedLock lock (addLock);
@@ -399,12 +495,8 @@ void ManagementAgent::moveNewObjectsLH()
void ManagementAgent::periodicProcessing (void)
{
-#define BUFSIZE 65536
-#define HEADROOM 4096
- QPID_LOG(trace, "Management agent periodic processing")
- Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
- uint32_t contentSize;
+ QPID_LOG(trace, "Management agent periodic processing");
+ Mutex::ScopedLock lock (userLock);
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
@@ -448,43 +540,57 @@ void ManagementAgent::periodicProcessing (void)
!baseObject->isDeleted()))
continue;
- Buffer msgBuffer(msgChars, BUFSIZE);
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+ bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- pcount++;
- }
-
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
- encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
- scount++;
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_stats || send_props) {
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ if (send_props) pcount++;
+ if (send_stats) scount++;
}
if (object->isDeleted())
deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
-
- if (msgBuffer.available() < HEADROOM)
- break;
}
}
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
- msgBuffer.reset();
+ content.encode();
+ const std::string &str = m.getContent();
+ if (str.length()) {
stringstream key;
+ ::qpid::messaging::Variant::Map headers;
key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ sendBuffer(str, 0, headers, mExchange, key.str());
QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
}
}
@@ -502,15 +608,33 @@ void ManagementAgent::periodicProcessing (void)
for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin();
cdIter != deletedManagementObjects.end(); cdIter++) {
collisionDeletions = true;
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'c');
- (*cdIter)->writeProperties(msgBuffer);
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- stringstream key;
- key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ {
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
+ (*cdIter)->getClassName(),
+ (*cdIter)->getMd5Sum());
+ (*cdIter)->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ content.encode();
+
+ stringstream key;
+ key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+ sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
}
if (!deleteList.empty() || collisionDeletions) {
@@ -519,6 +643,9 @@ void ManagementAgent::periodicProcessing (void)
}
{
+#define BUFSIZE 65536
+ uint32_t contentSize;
+ char msgChars[BUFSIZE];
Buffer msgBuffer(msgChars, BUFSIZE);
encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
@@ -541,18 +668,31 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
if (!object->isDeleted())
return;
-#define DNOW_BUFSIZE 2048
- char msgChars[DNOW_BUFSIZE];
- uint32_t contentSize;
- Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- contentSize = msgBuffer.getPosition();
- msgBuffer.reset();
stringstream key;
key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+
+ content.encode();
+
+ ::qpid::messaging::Variant::Map headers;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
managementObjects.erase(oid);
@@ -621,7 +761,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
uint32_t outLen;
AclModule* acl = broker->getAcl();
- ObjectId objId(inBuffer);
+ //ObjectId objId(inBuffer);
+ assert(false); // KAG TODO FIXME
+ ObjectId objId(std::string("fleabag???"));
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
@@ -674,7 +816,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
try {
outBuffer.record();
Mutex::ScopedUnlock u(userLock);
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ //iter->second->doMethod(methodName, inBuffer, outBuffer);
+ assert(false); // KAG TODO FIX
} catch(exception& e) {
outBuffer.restore();
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -814,7 +957,8 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
// is from a remote management agent, send the stored schema information.
if (writeSchemaCall != 0)
- writeSchemaCall(buf);
+ //writeSchemaCall(buf);
+ assert(false); // KAG TODO FIX
else
buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
@@ -991,7 +1135,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
- agent->mgmtObject->set_systemId (systemId);
+ agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
addObject (agent->mgmtObject, 0, true);
@@ -1034,19 +1178,29 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
ManagementObjectMap::iterator iter = numericFind(selector);
if (iter != managementObjects.end()) {
ManagementObject* object = iter->second;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
if (!object->isDeleted()) {
- encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ content.encode();
+
+ sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey);
QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
@@ -1061,19 +1215,29 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
iter++) {
ManagementObject* object = iter->second;
if (object->getClassName () == className) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
if (!object->isDeleted()) {
- encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ content.encode();
+
+ sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey);
QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
@@ -1096,7 +1260,9 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
inBuffer.reset();
if (!checkHeader(inBuffer, &opcode, &sequence))
- return false;
+ // KAG TODO: handle new map style messages also!
+ //return false;
+ assert(false);
if (opcode == 'M') {
// TODO: check method call against ACL list.
@@ -1111,7 +1277,9 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
string methodName;
map<acl::Property, string> params;
- ObjectId objId(inBuffer);
+ //ObjectId objId(inBuffer);
+ inBuffer.getLongLong();
+ inBuffer.getLongLong();
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
@@ -1132,6 +1300,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ // KAG TODO: old-style response
encodeHeader(outBuffer, 'm', sequence);
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
@@ -1173,6 +1342,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
+ // KAG TODO: need to handle map style method requests
while (inBuffer.getPosition() < bufferLen) {
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 356710cb95..893ff947a4 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -256,6 +256,7 @@ private:
typedef std::pair<std::string,std::string> MethodName;
typedef std::map<MethodName, std::string> DisallowedMethods;
DisallowedMethods disallowed;
+ std::string agentName; // KAG TODO FIX
# define MA_BUFFER_SIZE 65536
@@ -272,6 +273,11 @@ private:
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
std::string routingKey);
+ void sendBuffer(const std::string& data,
+ const uint32_t sequence,
+ const qpid::messaging::VariantMap headers,
+ qpid::broker::Exchange::shared_ptr exchange,
+ std::string routingKey);
void moveNewObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);