diff options
author | Ted Ross <tross@apache.org> | 2010-03-17 17:26:29 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-03-17 17:26:29 +0000 |
commit | abd00458baef20580c487567ab402aa27c7b3e3a (patch) | |
tree | 7c416bf0ecebcdb79fe174e308dcca8b5a790d9e | |
parent | 54556026728f891c61a72048e141a1e6a1d85c20 (diff) | |
download | qpid-python-abd00458baef20580c487567ab402aa27c7b3e3a.tar.gz |
Added agent-naming, agent-heartbeat.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@924376 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/examples/qmf-agent/example.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/agent/ManagementAgent.h | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 76 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 9 |
5 files changed, 66 insertions, 56 deletions
diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp index 5ab9c10c91..e4420788c2 100644 --- a/qpid/cpp/examples/qmf-agent/example.cpp +++ b/qpid/cpp/examples/qmf-agent/example.cpp @@ -178,6 +178,9 @@ int main_int(int argc, char** argv) // Register the Qmf_example schema with the agent _qmf::Package packageInit(agent); + // Name the agent. + agent->setName("apache.org", "qmf-example"); + // Start the agent. It will attempt to make a connection to the // management broker agent->init(settings, 5, false, ".magentdata"); diff --git a/qpid/cpp/include/qpid/agent/ManagementAgent.h b/qpid/cpp/include/qpid/agent/ManagementAgent.h index d786ec9ec6..f0bf443bf5 100644 --- a/qpid/cpp/include/qpid/agent/ManagementAgent.h +++ b/qpid/cpp/include/qpid/agent/ManagementAgent.h @@ -52,21 +52,6 @@ class ManagementAgent static ManagementAgent* agent; }; - class Name { - public: - QMF_AGENT_EXTERN Name(const std::string &vendor, - const std::string &product, - const std::string &name); - QMF_AGENT_EXTERN Name(const std::string &fullName); - QMF_AGENT_EXTERN Name(); - QMF_AGENT_EXTERN operator std::string() const; - - private: - std::string vendor; - std::string product; - std::string name; - }; - typedef enum { SEV_EMERG = 0, SEV_ALERT = 1, @@ -84,6 +69,17 @@ class ManagementAgent virtual int getMaxThreads() = 0; + // Set the name of the agent + // + // vendor - Vendor name or domain (i.e. "apache.org") + // product - Product name (i.e. "qpid") + // instance - A unique identifier for this instance of the agent. + // If empty, the agent will create a GUID for the instance. + // + virtual void setName(const std::string& vendor, + const std::string& product, + const std::string& instance="") = 0; + // Connect to a management broker // // brokerHost - Hostname or IP address (dotted-quad) of broker. @@ -119,9 +115,6 @@ class ManagementAgent bool useExternalThread = false, const std::string& storeFile = "") = 0; - // Extract the unique name for this agent - virtual const Name& getName() = 0; - // Register a schema with the management agent. This is normally called by the // package initializer generated by the management code generator. // diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index 21a9e2055e..32287a42af 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -23,6 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/log/Logger.h" +#include "qpid/messaging/Variant.h" #include "qmf/org/apache/qpid/acl/Package.h" #include "qmf/org/apache/qpid/acl/EventAllow.h" #include "qmf/org/apache/qpid/acl/EventDeny.h" @@ -94,7 +95,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name ); agent->raiseEvent(_qmf::EventAllow(id, AclHelper::getActionStr(action), AclHelper::getObjectTypeStr(objType), - name, framing::FieldTable())); + name, messaging::Variant::Map())); case ALLOW: return true; case DENY: @@ -106,7 +107,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name); agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action), AclHelper::getObjectTypeStr(objType), - name, framing::FieldTable())); + name, messaging::Variant::Map())); return false; } return false; diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index f7a4fbe6d0..344dd4f43b 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -118,6 +118,21 @@ ManagementAgentImpl::~ManagementAgentImpl() } } +void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + inst = qpid::messaging::Uuid(true).str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + void ManagementAgentImpl::init(const string& brokerHost, uint16_t brokerPort, uint16_t intervalSeconds, @@ -224,7 +239,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_event"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; content.encode(); connThreadBody.sendBuffer(msg.getContent(), 0, @@ -288,20 +303,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) void ManagementAgentImpl::startProtocol() { - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - connected = true; - encodeHeader(buffer, 'A'); - buffer.putShortString("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - uint32_t length = buffer.getPosition(); - buffer.reset(); - connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); + sendHeartbeat(); } void ManagementAgentImpl::storeData(bool requested) @@ -337,6 +339,27 @@ void ManagementAgentImpl::retrieveData() } } +void ManagementAgentImpl::sendHeartbeat() +{ + static const string addr_exchange("qmf.default.topic"); + static const string addr_key("agent.ind.heartbeat"); + + messaging::Message msg; + messaging::MapContent content(msg); + messaging::Variant::Map& map(content.asMap()); + messaging::Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + content.encode(); + connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key); + + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); +} + void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, uint32_t code, string text) { @@ -531,7 +554,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; content.encode(); connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); @@ -566,7 +589,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; content.encode(); connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo); @@ -753,7 +776,6 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, void ManagementAgentImpl::periodicProcessing() { Mutex::ScopedLock lock(agentLock); - uint32_t contentSize; list<pair<ObjectId, ManagementObject*> > deleteList; if (!connected) @@ -839,7 +861,7 @@ void ManagementAgentImpl::periodicProcessing() headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + headers["qmf.agent"] = name_address; connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str()); } @@ -854,20 +876,7 @@ void ManagementAgentImpl::periodicProcessing() } deleteList.clear(); - - { -#define BUFSIZE 65536 - char msgChars[BUFSIZE]; - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); - } + sendHeartbeat(); } void ManagementAgentImpl::ConnectionThread::run() @@ -902,6 +911,7 @@ void ManagementAgentImpl::ConnectionThread::run() if (shutdown) return; operational = true; + agent.connected = true; agent.startProtocol(); try { Mutex::ScopedUnlock _unlock(connLock); diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index e778d43a13..b69a5e5843 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -51,6 +51,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen // Methods from ManagementAgent // int getMaxThreads() { return 1; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void init(const std::string& brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, @@ -64,7 +67,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint16_t intervalSeconds = 10, bool useExternalThread = false, const std::string& storeFile = ""); - const Name& getName(); bool isConnected() { return connected; } std::string& getLastFailure() { return lastFailure; } void registerClass(const std::string& packageName, @@ -141,6 +143,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void received (client::Message& msg); + qpid::messaging::Variant::Map attrMap; + std::string name_address; uint16_t interval; bool extThread; sys::PipeHandle* pipeHandle; @@ -231,8 +235,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen static const std::string storeMagicNumber; - Name agentName; - void startProtocol(); void storeData(bool requested=false); void retrieveData(); @@ -253,6 +255,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen const std::string& cname, const uint8_t *md5Sum); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void sendHeartbeat(); void sendCommandComplete (std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); void handleAttachResponse (qpid::framing::Buffer& inBuffer); |