diff options
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 76 |
1 files changed, 43 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index f7a4fbe6d0..344dd4f43b 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -118,6 +118,21 @@ ManagementAgentImpl::~ManagementAgentImpl() } } +void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + inst = qpid::messaging::Uuid(true).str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + void ManagementAgentImpl::init(const string& brokerHost, uint16_t brokerPort, uint16_t intervalSeconds, @@ -224,7 +239,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_event"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; content.encode(); connThreadBody.sendBuffer(msg.getContent(), 0, @@ -288,20 +303,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) void ManagementAgentImpl::startProtocol() { - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - connected = true; - encodeHeader(buffer, 'A'); - buffer.putShortString("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - uint32_t length = buffer.getPosition(); - buffer.reset(); - connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); + sendHeartbeat(); } void ManagementAgentImpl::storeData(bool requested) @@ -337,6 +339,27 @@ void ManagementAgentImpl::retrieveData() } } +void ManagementAgentImpl::sendHeartbeat() +{ + static const string addr_exchange("qmf.default.topic"); + static const string addr_key("agent.ind.heartbeat"); + + messaging::Message msg; + messaging::MapContent content(msg); + messaging::Variant::Map& map(content.asMap()); + messaging::Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + content.encode(); + connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key); + + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); +} + void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, uint32_t code, string text) { @@ -531,7 +554,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; content.encode(); connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); @@ -566,7 +589,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; content.encode(); connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); @@ -753,7 +776,6 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, void ManagementAgentImpl::periodicProcessing() { Mutex::ScopedLock lock(agentLock); - uint32_t contentSize; list<pair<ObjectId, ManagementObject*> > deleteList; if (!connected) @@ -839,7 +861,7 @@ void ManagementAgentImpl::periodicProcessing() headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str()); } @@ -854,20 +876,7 @@ void ManagementAgentImpl::periodicProcessing() } deleteList.clear(); - - { -#define BUFSIZE 65536 - char msgChars[BUFSIZE]; - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); - } + sendHeartbeat(); } void ManagementAgentImpl::ConnectionThread::run() @@ -902,6 +911,7 @@ void ManagementAgentImpl::ConnectionThread::run() if (shutdown) return; operational = true; + agent.connected = true; agent.startProtocol(); try { Mutex::ScopedUnlock _unlock(connLock); |