summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-29 15:48:33 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-29 15:48:33 +0000
commit31fd7b3308b31117ae918c55a12b3183889d0b07 (patch)
treeea3b222eb671e55b8b8a589d5b76484d197a0ba3
parentb4ec63cfb66c4c709d9755c5ab5f051cdb2718ed (diff)
downloadqpid-python-31fd7b3308b31117ae918c55a12b3183889d0b07.tar.gz
broker agent qmf2 heartbeat and naming support
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@928808 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp95
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h16
3 files changed, 97 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index c5a7bb1ea9..24c5a0c049 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -168,6 +168,7 @@ Broker::Broker(const Broker::Options& conf) :
QPID_LOG(info, "Management enabled");
managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
conf.mgmtPubInterval, this, conf.workerThreads + 3);
+ managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 1ee5671c6d..c8efae9f99 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -81,7 +81,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
threadPoolSize(1), interval(10), broker(0), timer(0),
startTime(uint64_t(Duration(now()))),
- suppressed(false), agentName(""),
+ suppressed(false),
qmf1Support(qmfV1), qmf2Support(qmfV2)
{
nextObjectId = 1;
@@ -175,6 +175,27 @@ void ManagementAgent::pluginsInitialized() {
timer->add(new Periodic(*this, interval));
}
+
+void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
+{
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ if (uuid.isNull())
+ {
+ throw Exception("ManagementAgent::configure() must be called if default name is used.");
+ }
+ inst = uuid.str();
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+}
+
+
void ManagementAgent::writeData ()
{
string filename (dataDir + "/.mbrokerdata");
@@ -342,10 +363,10 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_event";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
stringstream key;
- key << "agent.ind.event." << sev << "." << std::string(agentName) << "." << event.getEventName();
+ key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName();
content.encode();
sendBuffer(msg.getContent(), "", headers, v2Topic, key.str());
@@ -464,7 +485,7 @@ void ManagementAgent::sendBuffer(const std::string& data,
const std::string& cid,
const qpid::messaging::VariantMap& headers,
qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+ const std::string& routingKey)
{
qpid::messaging::VariantMap::const_iterator i;
@@ -677,7 +698,7 @@ void ManagementAgent::periodicProcessing (void)
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
sendBuffer(body, "", headers, v2Topic, key.str());
QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
@@ -733,7 +754,7 @@ void ManagementAgent::periodicProcessing (void)
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
stringstream key;
key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
@@ -750,7 +771,9 @@ void ManagementAgent::periodicProcessing (void)
deleteOrphanedAgentsLH();
}
- {
+ // heartbeat generation
+
+ if (qmf1Support) {
#define BUFSIZE 65536
uint32_t contentSize;
char msgChars[BUFSIZE];
@@ -764,6 +787,27 @@ void ManagementAgent::periodicProcessing (void)
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
+
+ if (qmf2Support) {
+ static const string addr_key("agent.ind.heartbeat");
+
+ messaging::Message msg;
+ messaging::MapContent content(msg);
+ messaging::Variant::Map& map(content.asMap());
+ messaging::Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+ content.encode();
+ sendBuffer(msg.getContent(), "", headers, v2Topic, addr_key);
+
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+ }
QPID_LOG(debug, "periodic update " << debugSnapshot());
}
@@ -816,7 +860,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
content.encode();
sendBuffer(m.getContent(), "", headers, v2Topic, key.str());
@@ -978,7 +1022,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
headers["method"] = "response";
headers["qmf.opcode"] = "_method_response";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end())
@@ -1489,7 +1533,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
}
-void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& replyTo, const std::string& cid, const std::string& contentType)
+void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType)
{
FieldTable ft;
FieldTable::ValuePtr value;
@@ -1511,7 +1555,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
headers["partial"];
::qpid::messaging::Message outMsg;
@@ -1592,6 +1636,31 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep
QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
}
+
+void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
+ const string& cid)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest");
+
+ messaging::Message msg;
+ messaging::MapContent content(msg);
+ messaging::Variant::Map& map(content.asMap());
+ messaging::Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+ content.encode();
+ sendBuffer(msg.getContent(), cid, headers, v2Direct, replyTo);
+
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+}
+
+
bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
@@ -1720,7 +1789,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
headers["method"] = "response";
headers["qmf.opcode"] = "_method_response";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
@@ -1794,6 +1863,8 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher());
else if (opcode == "_query_request")
return handleGetQueryLH(body, replyToKey, cid, contentType);
+ else if (opcode == "_agent_locate_request")
+ return handleLocateRequestLH(body, replyToKey, cid);
QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 73760940ab..2f83a66476 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -76,6 +76,9 @@ public:
/** Called by cluster to suppress management output during update. */
void suppress(bool s) { suppressed = s; }
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
void setInterval(uint16_t _interval) { interval = _interval; }
void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
@@ -277,7 +280,12 @@ private:
typedef std::pair<std::string,std::string> MethodName;
typedef std::map<MethodName, std::string> DisallowedMethods;
DisallowedMethods disallowed;
- std::string agentName; // KAG TODO FIX
+
+ // Agent name and address
+ qpid::messaging::Variant::Map attrMap;
+ std::string name_address;
+
+ // supported management protocol
bool qmf1Support;
bool qmf2Support;
@@ -300,7 +308,7 @@ private:
const std::string& cid,
const qpid::messaging::VariantMap& headers,
qpid::broker::Exchange::shared_ptr exchange,
- std::string routingKey);
+ const std::string& routingKey);
void moveNewObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);
@@ -333,8 +341,10 @@ private:
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (const std::string& body, std::string& replyToKey, const std::string& cid, const std::string& contentType);
+ void handleGetQueryLH (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType);
void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken);
+ void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid);
+
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);