diff options
author | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
commit | 2ebe3bcb668151cfd9a860e4416fe4478d9a56f4 (patch) | |
tree | 806288ff720f5b6bd73709e008e4f63c7e838896 /qpid/cpp/src/qpid/agent | |
parent | 525081bf3d3e9cb04cd9c7d3be030b4a2153be23 (diff) | |
download | qpid-python-2ebe3bcb668151cfd9a860e4416fe4478d9a56f4.tar.gz |
QPID-1174 Remote Management Agent for management of external components
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@676067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/agent')
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 426 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 157 |
2 files changed, 583 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp new file mode 100644 index 0000000000..3c079a5a0a --- /dev/null +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -0,0 +1,426 @@ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/ManagementObject.h" +#include "ManagementAgentImpl.h" +#include <list> +#include <unistd.h> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::sys; +using std::stringstream; +using std::string; +using std::cout; +using std::endl; + +ManagementAgent* ManagementAgent::getAgent() +{ + //static ManagementAgent* agent = 0; + + //if (agent == 0) + // agent = new ManagementAgentImpl(); + //return agent; + return 0; +} + +ManagementAgentImpl::ManagementAgentImpl() : + clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread) +{ + // TODO: Establish system ID +} + +void ManagementAgentImpl::init (std::string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread) +{ + interval = intervalSeconds; + extThread = useExternalThread; + nextObjectId = 1; + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + string dest = "qmfagent"; + + connection.open(brokerHost.c_str(), brokerPort); + session = connection.newSession (queueName.str()); + dispatcher = new client::Dispatcher(session); + + + session.queueDeclare (arg::queue=queueName.str()); + session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str ()); + session.messageSubscribe (arg::queue=queueName.str(), + arg::destination=dest); + session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); + session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); + + Message attachRequest; + char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream + Buffer buffer (rawbuffer, 512); + + attachRequest.getDeliveryProperties().setRoutingKey("agent"); + attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + + EncodeHeader (buffer, 'A'); + buffer.putShortString ("RemoteAgent [C++]"); + buffer.putShortString (queueName.str()); + systemId.encode (buffer); + buffer.putLong (11); + + size_t length = 512 - buffer.available (); + string stringBuffer (rawbuffer, length); + attachRequest.setData (stringBuffer); + + session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management"); + + dispatcher->listen (dest, this); + dispatcher->start (); +} + +ManagementAgentImpl::~ManagementAgentImpl () +{ + dispatcher->stop (); + delete dispatcher; +} + +void ManagementAgentImpl::RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = FindOrAddPackage (packageName); + AddClassLocal (pIter, className, md5Sum, schemaCall); +} + +uint64_t ManagementAgentImpl::addObject (ManagementObject* object, + uint32_t /*persistId*/, + uint32_t /*persistBank*/) +{ + Mutex::ScopedLock lock(addLock); + uint64_t objectId; + + // TODO: fix object-id handling + objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF); + object->setObjectId (objectId); + newManagementObjects[objectId] = object; + return objectId; +} + +uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/) +{ + return 0; +} + +int ManagementAgentImpl::getSignalFd (void) +{ + return -1; +} + +void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) +{ + Mutex::ScopedLock lock(agentLock); + uint32_t assigned; + + assigned = inBuffer.getLong(); + objIdPrefix = ((uint64_t) assigned) << 24; + + // Send package indications for all local packages + for (PackageMap::iterator pIter = packages.begin(); + pIter != packages.end(); + pIter++) { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'p'); + EncodePackageIndication(outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + + // Send class indications for all local classes + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { + outBuffer.reset(); + EncodeHeader(outBuffer, 'q'); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + } + } +} + +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence) +{ + Mutex::ScopedLock lock(agentLock); + string packageName; + SchemaClassKey key; + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + SchemaClass schema = cIter->second; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 's', sequence); + schema.writeSchemaCall(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + } + } +} + +void ManagementAgentImpl::handleConsoleAddedIndication() +{ + Mutex::ScopedLock lock(agentLock); + clientWasAdded = true; +} + +void ManagementAgentImpl::received (Message& msg) +{ + string data = msg.getData (); + Buffer inBuffer (const_cast<char*>(data.c_str()), data.size()); + uint8_t opcode; + uint32_t sequence; + + if (CheckHeader (inBuffer, &opcode, &sequence)) + { + if (opcode == 'a') handleAttachResponse(inBuffer); + else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); + else if (opcode == 'x') handleConsoleAddedIndication(); + } +} + +void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + if (buf.getSize() < 8) + return false; + + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + + *opcode = buf.getOctet (); + *seq = buf.getLong (); + + return h1 == 'A' && h2 == 'M' && h3 == '1'; +} + +void ManagementAgentImpl::SendBuffer (Buffer& buf, + uint32_t length, + string exchange, + string routingKey) +{ + Message msg; + string data; + + if (objIdPrefix == 0) + return; + + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData (data); + session.messageTransfer (arg::content=msg, arg::destination=exchange); +} + +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + std::pair<PackageMap::iterator, bool> result = + packages.insert (std::pair<string, ClassMap> (name, ClassMap ())); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package"); + + return result.first; +} + +void ManagementAgentImpl::moveNewObjectsLH() +{ + Mutex::ScopedLock lock (addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); + iter != newManagementObjects.end (); + iter++) + managementObjects[iter->first] = iter->second; + newManagementObjects.clear(); +} + +void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy (&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + SchemaClass classInfo; + + classInfo.writeSchemaCall = schemaCall; + cMap[key] = classInfo; + + // TODO: Publish a class-indication message +} + +void ManagementAgentImpl::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementAgentImpl::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); +} + +void ManagementAgentImpl::PeriodicProcessing() +{ +#define BUFSIZE 65536 + Mutex::ScopedLock lock(agentLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + std::list<uint64_t> deleteList; + + { + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str() + ".heartbeat"; + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + moveNewObjectsLH(); + + if (clientWasAdded) + { + clientWasAdded = false; + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + object->setAllChanged (); + } + } + + if (managementObjects.empty ()) + return; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + + if (object->getConfigChanged () || object->isDeleted ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'c'); + object->writeProperties(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + if (object->getInstChanged ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + } + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + managementObjects.erase (*iter); + + deleteList.clear (); +} + +void ManagementAgentImpl::BackgroundThread::run() +{ + while (true) { + ::sleep(5); + agent.PeriodicProcessing(); + } +} diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h new file mode 100644 index 0000000000..b7572fe833 --- /dev/null +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -0,0 +1,157 @@ +#ifndef _qpid_agent_ManagementAgentImpl_ +#define _qpid_agent_ManagementAgentImpl_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "ManagementAgent.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Dispatcher.h" +#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/framing/Uuid.h" +#include <iostream> +#include <sstream> + +namespace qpid { +namespace management { + +class ManagementAgentImpl : public ManagementAgent, public client::MessageListener +{ + public: + + ManagementAgentImpl (); + virtual ~ManagementAgentImpl (); + + int getMaxThreads() { return 1; } + void init(std::string brokerHost = "localhost", + uint16_t brokerPort = 5672, + uint16_t intervalSeconds = 10, + bool useExternalThread = false); + void RegisterClass(std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + uint64_t addObject (management::ManagementObject* objectPtr, + uint32_t persistId = 0, + uint32_t persistBank = 4); + uint32_t pollCallbacks (uint32_t callLimit = 0); + int getSignalFd (void); + + void PeriodicProcessing(); + + private: + + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + management::ManagementObject::writeSchemaCall_t writeSchemaCall; + + SchemaClass () : writeSchemaCall(0) {} + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + PackageMap packages; + management::ManagementObjectMap managementObjects; + management::ManagementObjectMap newManagementObjects; + + void received (client::Message& msg); + + uint16_t interval; + bool extThread; + uint64_t nextObjectId; + sys::Mutex agentLock; + sys::Mutex addLock; + framing::Uuid sessionId; + framing::Uuid systemId; + + int signalFdIn, signalFdOut; + client::Connection connection; + client::Session session; + client::Dispatcher* dispatcher; + bool clientWasAdded; + uint64_t objIdPrefix; + std::stringstream queueName; +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + + class BackgroundThread : public sys::Runnable + { + ManagementAgentImpl& agent; + void run(); + public: + BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {} + }; + + BackgroundThread bgThread; + sys::Thread thread; + + PackageMap::iterator FindOrAddPackage (std::string name); + void moveNewObjectsLH(); + void AddClassLocal (PackageMap::iterator pIter, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + void EncodePackageIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter); + void EncodeClassIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void SendBuffer (qpid::framing::Buffer& buf, + uint32_t length, + std::string exchange, + std::string routingKey); + void handleAttachResponse (qpid::framing::Buffer& inBuffer); + void handlePackageRequest (qpid::framing::Buffer& inBuffer); + void handleClassQuery (qpid::framing::Buffer& inBuffer); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleConsoleAddedIndication(); +}; + +}} + +#endif /*!_qpid_agent_ManagementAgentImpl_*/ |