summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp76
1 files changed, 43 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index f7a4fbe6d0..344dd4f43b 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -118,6 +118,21 @@ ManagementAgentImpl::~ManagementAgentImpl()
}
}
+void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
+{
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ inst = qpid::messaging::Uuid(true).str();
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+}
+
void ManagementAgentImpl::init(const string& brokerHost,
uint16_t brokerPort,
uint16_t intervalSeconds,
@@ -224,7 +239,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_event";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
content.encode();
connThreadBody.sendBuffer(msg.getContent(), 0,
@@ -288,20 +303,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
void ManagementAgentImpl::startProtocol()
{
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- connected = true;
- encodeHeader(buffer, 'A');
- buffer.putShortString("RemoteAgent [C++]");
- systemId.encode (buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- uint32_t length = buffer.getPosition();
- buffer.reset();
- connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
+ sendHeartbeat();
}
void ManagementAgentImpl::storeData(bool requested)
@@ -337,6 +339,27 @@ void ManagementAgentImpl::retrieveData()
}
}
+void ManagementAgentImpl::sendHeartbeat()
+{
+ static const string addr_exchange("qmf.default.topic");
+ static const string addr_key("agent.ind.heartbeat");
+
+ messaging::Message msg;
+ messaging::MapContent content(msg);
+ messaging::Variant::Map& map(content.asMap());
+ messaging::Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ content.encode();
+ connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key);
+
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+}
+
void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
uint32_t code, string text)
{
@@ -531,7 +554,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
content.encode();
connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
@@ -566,7 +589,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
content.encode();
connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
@@ -753,7 +776,6 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
void ManagementAgentImpl::periodicProcessing()
{
Mutex::ScopedLock lock(agentLock);
- uint32_t contentSize;
list<pair<ObjectId, ManagementObject*> > deleteList;
if (!connected)
@@ -839,7 +861,7 @@ void ManagementAgentImpl::periodicProcessing()
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str());
}
@@ -854,20 +876,7 @@ void ManagementAgentImpl::periodicProcessing()
}
deleteList.clear();
-
- {
-#define BUFSIZE 65536
- char msgChars[BUFSIZE];
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
- }
+ sendHeartbeat();
}
void ManagementAgentImpl::ConnectionThread::run()
@@ -902,6 +911,7 @@ void ManagementAgentImpl::ConnectionThread::run()
if (shutdown)
return;
operational = true;
+ agent.connected = true;
agent.startProtocol();
try {
Mutex::ScopedUnlock _unlock(connLock);