summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-17 17:26:29 +0000
committerTed Ross <tross@apache.org>2010-03-17 17:26:29 +0000
commitabd00458baef20580c487567ab402aa27c7b3e3a (patch)
tree7c416bf0ecebcdb79fe174e308dcca8b5a790d9e
parent54556026728f891c61a72048e141a1e6a1d85c20 (diff)
downloadqpid-python-abd00458baef20580c487567ab402aa27c7b3e3a.tar.gz
Added agent-naming, agent-heartbeat.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924376 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/qmf-agent/example.cpp3
-rw-r--r--qpid/cpp/include/qpid/agent/ManagementAgent.h29
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.cpp5
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp76
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h9
5 files changed, 66 insertions, 56 deletions
diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp
index 5ab9c10c91..e4420788c2 100644
--- a/qpid/cpp/examples/qmf-agent/example.cpp
+++ b/qpid/cpp/examples/qmf-agent/example.cpp
@@ -178,6 +178,9 @@ int main_int(int argc, char** argv)
// Register the Qmf_example schema with the agent
_qmf::Package packageInit(agent);
+ // Name the agent.
+ agent->setName("apache.org", "qmf-example");
+
// Start the agent. It will attempt to make a connection to the
// management broker
agent->init(settings, 5, false, ".magentdata");
diff --git a/qpid/cpp/include/qpid/agent/ManagementAgent.h b/qpid/cpp/include/qpid/agent/ManagementAgent.h
index d786ec9ec6..f0bf443bf5 100644
--- a/qpid/cpp/include/qpid/agent/ManagementAgent.h
+++ b/qpid/cpp/include/qpid/agent/ManagementAgent.h
@@ -52,21 +52,6 @@ class ManagementAgent
static ManagementAgent* agent;
};
- class Name {
- public:
- QMF_AGENT_EXTERN Name(const std::string &vendor,
- const std::string &product,
- const std::string &name);
- QMF_AGENT_EXTERN Name(const std::string &fullName);
- QMF_AGENT_EXTERN Name();
- QMF_AGENT_EXTERN operator std::string() const;
-
- private:
- std::string vendor;
- std::string product;
- std::string name;
- };
-
typedef enum {
SEV_EMERG = 0,
SEV_ALERT = 1,
@@ -84,6 +69,17 @@ class ManagementAgent
virtual int getMaxThreads() = 0;
+ // Set the name of the agent
+ //
+ // vendor - Vendor name or domain (i.e. "apache.org")
+ // product - Product name (i.e. "qpid")
+ // instance - A unique identifier for this instance of the agent.
+ // If empty, the agent will create a GUID for the instance.
+ //
+ virtual void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="") = 0;
+
// Connect to a management broker
//
// brokerHost - Hostname or IP address (dotted-quad) of broker.
@@ -119,9 +115,6 @@ class ManagementAgent
bool useExternalThread = false,
const std::string& storeFile = "") = 0;
- // Extract the unique name for this agent
- virtual const Name& getName() = 0;
-
// Register a schema with the management agent. This is normally called by the
// package initializer generated by the management code generator.
//
diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp
index 21a9e2055e..32287a42af 100644
--- a/qpid/cpp/src/qpid/acl/Acl.cpp
+++ b/qpid/cpp/src/qpid/acl/Acl.cpp
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/log/Logger.h"
+#include "qpid/messaging/Variant.h"
#include "qmf/org/apache/qpid/acl/Package.h"
#include "qmf/org/apache/qpid/acl/EventAllow.h"
#include "qmf/org/apache/qpid/acl/EventDeny.h"
@@ -94,7 +95,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals
" ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name );
agent->raiseEvent(_qmf::EventAllow(id, AclHelper::getActionStr(action),
AclHelper::getObjectTypeStr(objType),
- name, framing::FieldTable()));
+ name, messaging::Variant::Map()));
case ALLOW:
return true;
case DENY:
@@ -106,7 +107,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals
QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name);
agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action),
AclHelper::getObjectTypeStr(objType),
- name, framing::FieldTable()));
+ name, messaging::Variant::Map()));
return false;
}
return false;
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index f7a4fbe6d0..344dd4f43b 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -118,6 +118,21 @@ ManagementAgentImpl::~ManagementAgentImpl()
}
}
+void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
+{
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ inst = qpid::messaging::Uuid(true).str();
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+}
+
void ManagementAgentImpl::init(const string& brokerHost,
uint16_t brokerPort,
uint16_t intervalSeconds,
@@ -224,7 +239,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_event";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
content.encode();
connThreadBody.sendBuffer(msg.getContent(), 0,
@@ -288,20 +303,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
void ManagementAgentImpl::startProtocol()
{
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- connected = true;
- encodeHeader(buffer, 'A');
- buffer.putShortString("RemoteAgent [C++]");
- systemId.encode (buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- uint32_t length = buffer.getPosition();
- buffer.reset();
- connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
+ sendHeartbeat();
}
void ManagementAgentImpl::storeData(bool requested)
@@ -337,6 +339,27 @@ void ManagementAgentImpl::retrieveData()
}
}
+void ManagementAgentImpl::sendHeartbeat()
+{
+ static const string addr_exchange("qmf.default.topic");
+ static const string addr_key("agent.ind.heartbeat");
+
+ messaging::Message msg;
+ messaging::MapContent content(msg);
+ messaging::Variant::Map& map(content.asMap());
+ messaging::Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ content.encode();
+ connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key);
+
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+}
+
void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
uint32_t code, string text)
{
@@ -531,7 +554,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
content.encode();
connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
@@ -566,7 +589,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
content.encode();
connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
@@ -753,7 +776,6 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
void ManagementAgentImpl::periodicProcessing()
{
Mutex::ScopedLock lock(agentLock);
- uint32_t contentSize;
list<pair<ObjectId, ManagementObject*> > deleteList;
if (!connected)
@@ -839,7 +861,7 @@ void ManagementAgentImpl::periodicProcessing()
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ headers["qmf.agent"] = name_address;
connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str());
}
@@ -854,20 +876,7 @@ void ManagementAgentImpl::periodicProcessing()
}
deleteList.clear();
-
- {
-#define BUFSIZE 65536
- char msgChars[BUFSIZE];
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
- }
+ sendHeartbeat();
}
void ManagementAgentImpl::ConnectionThread::run()
@@ -902,6 +911,7 @@ void ManagementAgentImpl::ConnectionThread::run()
if (shutdown)
return;
operational = true;
+ agent.connected = true;
agent.startProtocol();
try {
Mutex::ScopedUnlock _unlock(connLock);
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index e778d43a13..b69a5e5843 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -51,6 +51,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
// Methods from ManagementAgent
//
int getMaxThreads() { return 1; }
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
void init(const std::string& brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
@@ -64,7 +67,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
uint16_t intervalSeconds = 10,
bool useExternalThread = false,
const std::string& storeFile = "");
- const Name& getName();
bool isConnected() { return connected; }
std::string& getLastFailure() { return lastFailure; }
void registerClass(const std::string& packageName,
@@ -141,6 +143,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void received (client::Message& msg);
+ qpid::messaging::Variant::Map attrMap;
+ std::string name_address;
uint16_t interval;
bool extThread;
sys::PipeHandle* pipeHandle;
@@ -231,8 +235,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
static const std::string storeMagicNumber;
- Name agentName;
-
void startProtocol();
void storeData(bool requested=false);
void retrieveData();
@@ -253,6 +255,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
const std::string& cname,
const uint8_t *md5Sum);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void sendHeartbeat();
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
void handleAttachResponse (qpid::framing::Buffer& inBuffer);