From 31fd7b3308b31117ae918c55a12b3183889d0b07 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 29 Mar 2010 15:48:33 +0000 Subject: broker agent qmf2 heartbeat and naming support git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@928808 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Broker.cpp | 1 + qpid/cpp/src/qpid/management/ManagementAgent.cpp | 95 +++++++++++++++++++++--- qpid/cpp/src/qpid/management/ManagementAgent.h | 16 +++- 3 files changed, 97 insertions(+), 15 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index c5a7bb1ea9..24c5a0c049 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -168,6 +168,7 @@ Broker::Broker(const Broker::Options& conf) : QPID_LOG(info, "Management enabled"); managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPubInterval, this, conf.workerThreads + 3); + managementAgent->setName("apache.org", "qpidd"); _qmf::Package packageInitializer(managementAgent.get()); System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 1ee5671c6d..c8efae9f99 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -81,7 +81,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : threadPoolSize(1), interval(10), broker(0), timer(0), startTime(uint64_t(Duration(now()))), - suppressed(false), agentName(""), + suppressed(false), qmf1Support(qmfV1), qmf2Support(qmfV2) { nextObjectId = 1; @@ -175,6 +175,27 @@ void ManagementAgent::pluginsInitialized() { timer->add(new Periodic(*this, interval)); } + +void ManagementAgent::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + if (uuid.isNull()) + { + throw Exception("ManagementAgent::configure() must be called if default name is used."); + } + inst = uuid.str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + + void ManagementAgent::writeData () { string filename (dataDir + "/.mbrokerdata"); @@ -342,10 +363,10 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_event"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; stringstream key; - key << "agent.ind.event." << sev << "." << std::string(agentName) << "." << event.getEventName(); + key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName(); content.encode(); sendBuffer(msg.getContent(), "", headers, v2Topic, key.str()); @@ -464,7 +485,7 @@ void ManagementAgent::sendBuffer(const std::string& data, const std::string& cid, const qpid::messaging::VariantMap& headers, qpid::broker::Exchange::shared_ptr exchange, - string routingKey) + const std::string& routingKey) { qpid::messaging::VariantMap::const_iterator i; @@ -677,7 +698,7 @@ void ManagementAgent::periodicProcessing (void) headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; sendBuffer(body, "", headers, v2Topic, key.str()); QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); @@ -733,7 +754,7 @@ void ManagementAgent::periodicProcessing (void) headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; stringstream key; key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); @@ -750,7 +771,9 @@ void ManagementAgent::periodicProcessing (void) deleteOrphanedAgentsLH(); } - { + // heartbeat generation + + if (qmf1Support) { #define BUFSIZE 65536 uint32_t contentSize; char msgChars[BUFSIZE]; @@ -764,6 +787,27 @@ void ManagementAgent::periodicProcessing (void) sendBuffer (msgBuffer, contentSize, mExchange, routingKey); QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); } + + if (qmf2Support) { + static const string addr_key("agent.ind.heartbeat"); + + messaging::Message msg; + messaging::MapContent content(msg); + messaging::Variant::Map& map(content.asMap()); + messaging::Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + content.encode(); + sendBuffer(msg.getContent(), "", headers, v2Topic, addr_key); + + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); + } QPID_LOG(debug, "periodic update " << debugSnapshot()); } @@ -816,7 +860,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; content.encode(); sendBuffer(m.getContent(), "", headers, v2Topic, key.str()); @@ -978,7 +1022,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep headers["method"] = "response"; headers["qmf.opcode"] = "_method_response"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; if ((oid = inMap.find("_object_id")) == inMap.end() || (mid = inMap.find("_method_name")) == inMap.end()) @@ -1489,7 +1533,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin } -void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& replyTo, const std::string& cid, const std::string& contentType) +void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType) { FieldTable ft; FieldTable::ValuePtr value; @@ -1511,7 +1555,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; headers["partial"]; ::qpid::messaging::Message outMsg; @@ -1592,6 +1636,31 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid); } + +void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo, + const string& cid) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + + messaging::Message msg; + messaging::MapContent content(msg); + messaging::Variant::Map& map(content.asMap()); + messaging::Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + content.encode(); + sendBuffer(msg.getContent(), cid, headers, v2Direct, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); +} + + bool ManagementAgent::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); @@ -1720,7 +1789,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) headers["method"] = "response"; headers["qmf.opcode"] = "_method_response"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); @@ -1794,6 +1863,8 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher()); else if (opcode == "_query_request") return handleGetQueryLH(body, replyToKey, cid, contentType); + else if (opcode == "_agent_locate_request") + return handleLocateRequestLH(body, replyToKey, cid); QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); return; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 73760940ab..2f83a66476 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -76,6 +76,9 @@ public: /** Called by cluster to suppress management output during update. */ void suppress(bool s) { suppressed = s; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void setInterval(uint16_t _interval) { interval = _interval; } void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); @@ -277,7 +280,12 @@ private: typedef std::pair MethodName; typedef std::map DisallowedMethods; DisallowedMethods disallowed; - std::string agentName; // KAG TODO FIX + + // Agent name and address + qpid::messaging::Variant::Map attrMap; + std::string name_address; + + // supported management protocol bool qmf1Support; bool qmf2Support; @@ -300,7 +308,7 @@ private: const std::string& cid, const qpid::messaging::VariantMap& headers, qpid::broker::Exchange::shared_ptr exchange, - std::string routingKey); + const std::string& routingKey); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); @@ -333,8 +341,10 @@ private: void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (const std::string& body, std::string& replyToKey, const std::string& cid, const std::string& contentType); + void handleGetQueryLH (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType); void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken); + void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid); + size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); -- cgit v1.2.1