summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp138
1 files changed, 84 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 5b2148a850..f7a4fbe6d0 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -23,6 +23,8 @@
#include "qpid/log/Statement.h"
#include "qpid/agent/ManagementAgentImpl.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ListContent.h"
+#include "qpid/messaging/MapContent.h"
#include <list>
#include <string.h>
#include <stdlib.h>
@@ -198,7 +200,6 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
{
Mutex::ScopedLock lock(agentLock);
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
stringstream key;
@@ -210,8 +211,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
::qpid::messaging::VariantMap &map_ = content.asMap();
::qpid::messaging::VariantMap schemaId;
::qpid::messaging::VariantMap values;
-
- mapEncodeHeader(map_, 'e');
+ ::qpid::messaging::VariantMap headers;
map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
event.getEventName(),
@@ -221,8 +221,15 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
map_["_timestamp"] = uint64_t(Duration(now()));
map_["_severity"] = sev;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = std::string(agentName);
+
content.encode();
- connThreadBody.sendBuffer(msg.getContent(), "qpid.management", key.str());
+ connThreadBody.sendBuffer(msg.getContent(), 0,
+ headers,
+ "qpid.management", key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -416,12 +423,13 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- SchemaClass& schema = cIter->second;
+ //SchemaClass& schema = cIter->second;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
encodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(outBuffer);
+ //schema.writeSchemaCall(outBuffer);
+ assert(false); // TODO FIX ABOVE
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
@@ -448,7 +456,9 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- ObjectId objId(inBuffer);
+ assert(false); // TODO FIX OBJ ID!!
+ //ObjectId objId(inBuffer);
+ ObjectId objId(std::string("foobag?"));
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
@@ -469,7 +479,8 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
else
try {
outBuffer.record();
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ //iter->second->doMethod(methodName, inBuffer, outBuffer);
+ assert(false); // TODO: fix above
} catch(exception& e) {
outBuffer.restore();
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -508,17 +519,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
::qpid::messaging::Variant::List &list_ = content.asList();
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- mapEncodeHeader(map_, 'g', sequence);
object->mapEncodeValues(values, true, true); // write both stats and properties
map_["_values"] = values;
- list.push_back(map_);
+ list_.push_back(map_);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
content.encode();
- connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
@@ -538,17 +554,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
::qpid::messaging::Variant::List &list_ = content.asList();
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- mapEncodeHeader(map_, 'g', sequence);
object->mapEncodeValues(values, true, true); // write both stats and properties
map_["_values"] = values;
- list.push_back(map_);
+ list_.push_back(map_);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
content.encode();
- connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo);
+ connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
QPID_LOG(trace, "SENT ObjectInd");
}
@@ -623,22 +644,6 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
buf.putLong (seq);
}
-void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq)
-{
- map_["_version"] = "AM2";
- map_["_opcode"] = opcode;
- map_["_sequence"] = seq;
-}
-
-
-void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq)
-{
- map_["_version"] = "AM2";
- map_["_opcode"] = opcode;
- map_["_sequence"] = seq;
-}
-
-
qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname,
const std::string& cname,
const uint8_t *md5Sum)
@@ -648,7 +653,7 @@ qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::
map_["_package_name"] = pname;
map_["_class_name"] = cname;
map_["_hash_str"] = std::string((const char *)md5Sum,
- qpid::managment::ManagmentObject::MD5_LEN);
+ qpid::management::ManagementObject::MD5_LEN);
return map_;
}
@@ -797,32 +802,25 @@ void ManagementAgentImpl::periodicProcessing()
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+ bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- mapEncodeHeader(map_, 'c');
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
- object->getPackageName();
- object->getClassName();
- (object->getMd5Sum(), MD5_LEN);
-
- object->mapEncodeValues(values, true, false); // encode properties only
- map_["_values"] = values;
- list.push_back(map_);
- }
-
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+ if (send_stats || send_props) {
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
- mapEncodeHeader(map_, 'i');
- object->mapEncodeValues(values, false, true); // encode statistics only
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->mapEncodeValues(values, send_props, send_stats);
map_["_values"] = values;
- list.push_back(map_);
+ list_.push_back(map_);
}
if (object->isDeleted())
@@ -835,9 +833,15 @@ void ManagementAgentImpl::periodicProcessing()
const std::string &str = m.getContent();
if (str.length()) {
stringstream key;
+ ::qpid::messaging::Variant::Map headers;
key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
baseObject->getPackageName() << "." << baseObject->getClassName();
- connThreadBody.sendBuffer(str, "qpid.management", key.str());
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str());
}
}
@@ -951,18 +955,47 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
const string& exchange,
const string& routingKey)
{
+ Message msg;
string data;
buf.getRawData(data, length);
- sendBuffer(data, exchange, routingKey);
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
}
void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
+ uint32_t sequence,
+ const qpid::messaging::VariantMap headers,
const string& exchange,
const string& routingKey)
{
+ Message msg;
+ qpid::messaging::VariantMap::const_iterator i;
+
+ if (sequence) {
+ std::stringstream seqstr;
+ seqstr << sequence;
+ msg.getMessageProperties().setCorrelationId(seqstr.str());
+ }
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg.getHeaders().setString(i->first, i->second.asString());
+ }
+ msg.getHeaders().setString("app_id", "qmf2");
+
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
+ const string& exchange,
+ const string& routingKey)
+{
ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
@@ -971,15 +1004,12 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
s = subscriptions;
}
- Message msg;
-
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData(data);
try {
session.messageTransfer(arg::content=msg, arg::destination=exchange);
} catch(exception& e) {
- QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
+ QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
// Bounce the connection
if (s)
s->stop();