diff options
author | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
commit | 2fd1b08b605d2664394ff5708c3cbaebd1dc21ef (patch) | |
tree | d0b5c7cfa8f31a1fc721fb45d7ca77a027875b7d /cpp/src | |
parent | 13e2db2a3d0d14881da3c088f084385740df0731 (diff) | |
download | qpid-python-2fd1b08b605d2664394ff5708c3cbaebd1dc21ef.tar.gz |
QPID-1174 Remote Management Agent for management of external components
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@676067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 6 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 426 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 157 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 201 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 44 |
5 files changed, 775 insertions, 59 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index bfebd4ae88..74aa504e90 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -221,6 +221,8 @@ libqpidcommon_la_SOURCES = \ qpid/Plugin.cpp \ qpid/StringUtils.cpp \ qpid/Url.cpp \ + qpid/management/Manageable.cpp \ + qpid/management/ManagementObject.cpp \ qpid/sys/AggregateOutput.cpp \ qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ @@ -304,10 +306,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TxBuffer.cpp \ qpid/broker/TxPublish.cpp \ qpid/broker/Vhost.cpp \ - qpid/management/Manageable.cpp \ qpid/management/ManagementBroker.cpp \ qpid/management/ManagementExchange.cpp \ - qpid/management/ManagementObject.cpp \ qpid/sys/TCPIOPlugin.cpp if HAVE_XML @@ -319,6 +319,7 @@ libqpidclient_la_LIBADD = libqpidcommon.la -luuid libqpidclient_la_SOURCES = \ $(rgen_client_srcs) \ + qpid/agent/ManagementAgentImpl.cpp \ qpid/client/AckPolicy.cpp \ qpid/client/Bounds.cpp \ qpid/client/ConnectionImpl.cpp \ @@ -367,6 +368,7 @@ nobase_include_HEADERS = \ qpid/memory.h \ qpid/shared_ptr.h \ qpid/agent/ManagementAgent.h \ + qpid/agent/ManagementAgentImpl.h \ qpid/broker/Broker.h \ qpid/broker/SessionAdapter.h \ qpid/broker/Exchange.h \ diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp new file mode 100644 index 0000000000..3c079a5a0a --- /dev/null +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -0,0 +1,426 @@ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/ManagementObject.h" +#include "ManagementAgentImpl.h" +#include <list> +#include <unistd.h> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::sys; +using std::stringstream; +using std::string; +using std::cout; +using std::endl; + +ManagementAgent* ManagementAgent::getAgent() +{ + //static ManagementAgent* agent = 0; + + //if (agent == 0) + // agent = new ManagementAgentImpl(); + //return agent; + return 0; +} + +ManagementAgentImpl::ManagementAgentImpl() : + clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread) +{ + // TODO: Establish system ID +} + +void ManagementAgentImpl::init (std::string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread) +{ + interval = intervalSeconds; + extThread = useExternalThread; + nextObjectId = 1; + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + string dest = "qmfagent"; + + connection.open(brokerHost.c_str(), brokerPort); + session = connection.newSession (queueName.str()); + dispatcher = new client::Dispatcher(session); + + + session.queueDeclare (arg::queue=queueName.str()); + session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str ()); + session.messageSubscribe (arg::queue=queueName.str(), + arg::destination=dest); + session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); + session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); + + Message attachRequest; + char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream + Buffer buffer (rawbuffer, 512); + + attachRequest.getDeliveryProperties().setRoutingKey("agent"); + attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + + EncodeHeader (buffer, 'A'); + buffer.putShortString ("RemoteAgent [C++]"); + buffer.putShortString (queueName.str()); + systemId.encode (buffer); + buffer.putLong (11); + + size_t length = 512 - buffer.available (); + string stringBuffer (rawbuffer, length); + attachRequest.setData (stringBuffer); + + session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management"); + + dispatcher->listen (dest, this); + dispatcher->start (); +} + +ManagementAgentImpl::~ManagementAgentImpl () +{ + dispatcher->stop (); + delete dispatcher; +} + +void ManagementAgentImpl::RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = FindOrAddPackage (packageName); + AddClassLocal (pIter, className, md5Sum, schemaCall); +} + +uint64_t ManagementAgentImpl::addObject (ManagementObject* object, + uint32_t /*persistId*/, + uint32_t /*persistBank*/) +{ + Mutex::ScopedLock lock(addLock); + uint64_t objectId; + + // TODO: fix object-id handling + objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF); + object->setObjectId (objectId); + newManagementObjects[objectId] = object; + return objectId; +} + +uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/) +{ + return 0; +} + +int ManagementAgentImpl::getSignalFd (void) +{ + return -1; +} + +void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) +{ + Mutex::ScopedLock lock(agentLock); + uint32_t assigned; + + assigned = inBuffer.getLong(); + objIdPrefix = ((uint64_t) assigned) << 24; + + // Send package indications for all local packages + for (PackageMap::iterator pIter = packages.begin(); + pIter != packages.end(); + pIter++) { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'p'); + EncodePackageIndication(outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + + // Send class indications for all local classes + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { + outBuffer.reset(); + EncodeHeader(outBuffer, 'q'); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + } + } +} + +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence) +{ + Mutex::ScopedLock lock(agentLock); + string packageName; + SchemaClassKey key; + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + SchemaClass schema = cIter->second; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 's', sequence); + schema.writeSchemaCall(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + } + } +} + +void ManagementAgentImpl::handleConsoleAddedIndication() +{ + Mutex::ScopedLock lock(agentLock); + clientWasAdded = true; +} + +void ManagementAgentImpl::received (Message& msg) +{ + string data = msg.getData (); + Buffer inBuffer (const_cast<char*>(data.c_str()), data.size()); + uint8_t opcode; + uint32_t sequence; + + if (CheckHeader (inBuffer, &opcode, &sequence)) + { + if (opcode == 'a') handleAttachResponse(inBuffer); + else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); + else if (opcode == 'x') handleConsoleAddedIndication(); + } +} + +void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + if (buf.getSize() < 8) + return false; + + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + + *opcode = buf.getOctet (); + *seq = buf.getLong (); + + return h1 == 'A' && h2 == 'M' && h3 == '1'; +} + +void ManagementAgentImpl::SendBuffer (Buffer& buf, + uint32_t length, + string exchange, + string routingKey) +{ + Message msg; + string data; + + if (objIdPrefix == 0) + return; + + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData (data); + session.messageTransfer (arg::content=msg, arg::destination=exchange); +} + +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + std::pair<PackageMap::iterator, bool> result = + packages.insert (std::pair<string, ClassMap> (name, ClassMap ())); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package"); + + return result.first; +} + +void ManagementAgentImpl::moveNewObjectsLH() +{ + Mutex::ScopedLock lock (addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); + iter != newManagementObjects.end (); + iter++) + managementObjects[iter->first] = iter->second; + newManagementObjects.clear(); +} + +void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy (&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + SchemaClass classInfo; + + classInfo.writeSchemaCall = schemaCall; + cMap[key] = classInfo; + + // TODO: Publish a class-indication message +} + +void ManagementAgentImpl::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementAgentImpl::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); +} + +void ManagementAgentImpl::PeriodicProcessing() +{ +#define BUFSIZE 65536 + Mutex::ScopedLock lock(agentLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + std::list<uint64_t> deleteList; + + { + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str() + ".heartbeat"; + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + moveNewObjectsLH(); + + if (clientWasAdded) + { + clientWasAdded = false; + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + object->setAllChanged (); + } + } + + if (managementObjects.empty ()) + return; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + + if (object->getConfigChanged () || object->isDeleted ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'c'); + object->writeProperties(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + if (object->getInstChanged ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + } + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + managementObjects.erase (*iter); + + deleteList.clear (); +} + +void ManagementAgentImpl::BackgroundThread::run() +{ + while (true) { + ::sleep(5); + agent.PeriodicProcessing(); + } +} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h new file mode 100644 index 0000000000..b7572fe833 --- /dev/null +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -0,0 +1,157 @@ +#ifndef _qpid_agent_ManagementAgentImpl_ +#define _qpid_agent_ManagementAgentImpl_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "ManagementAgent.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Dispatcher.h" +#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/framing/Uuid.h" +#include <iostream> +#include <sstream> + +namespace qpid { +namespace management { + +class ManagementAgentImpl : public ManagementAgent, public client::MessageListener +{ + public: + + ManagementAgentImpl (); + virtual ~ManagementAgentImpl (); + + int getMaxThreads() { return 1; } + void init(std::string brokerHost = "localhost", + uint16_t brokerPort = 5672, + uint16_t intervalSeconds = 10, + bool useExternalThread = false); + void RegisterClass(std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + uint64_t addObject (management::ManagementObject* objectPtr, + uint32_t persistId = 0, + uint32_t persistBank = 4); + uint32_t pollCallbacks (uint32_t callLimit = 0); + int getSignalFd (void); + + void PeriodicProcessing(); + + private: + + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + management::ManagementObject::writeSchemaCall_t writeSchemaCall; + + SchemaClass () : writeSchemaCall(0) {} + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + PackageMap packages; + management::ManagementObjectMap managementObjects; + management::ManagementObjectMap newManagementObjects; + + void received (client::Message& msg); + + uint16_t interval; + bool extThread; + uint64_t nextObjectId; + sys::Mutex agentLock; + sys::Mutex addLock; + framing::Uuid sessionId; + framing::Uuid systemId; + + int signalFdIn, signalFdOut; + client::Connection connection; + client::Session session; + client::Dispatcher* dispatcher; + bool clientWasAdded; + uint64_t objIdPrefix; + std::stringstream queueName; +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + + class BackgroundThread : public sys::Runnable + { + ManagementAgentImpl& agent; + void run(); + public: + BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {} + }; + + BackgroundThread bgThread; + sys::Thread thread; + + PackageMap::iterator FindOrAddPackage (std::string name); + void moveNewObjectsLH(); + void AddClassLocal (PackageMap::iterator pIter, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + void EncodePackageIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter); + void EncodeClassIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void SendBuffer (qpid::framing::Buffer& buf, + uint32_t length, + std::string exchange, + std::string routingKey); + void handleAttachResponse (qpid::framing::Buffer& inBuffer); + void handlePackageRequest (qpid::framing::Buffer& inBuffer); + void handleClassQuery (qpid::framing::Buffer& inBuffer); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleConsoleAddedIndication(); +}; + +}} + +#endif /*!_qpid_agent_ManagementAgentImpl_*/ diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 106033f76f..84e0c650f2 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea nextObjectId = 1; bootSequence = 1; nextRemoteBank = 10; + nextRequestSequence = 1; clientWasAdded = false; // Get from file or generate and save to file. @@ -155,8 +156,8 @@ void ManagementBroker::RegisterClass (string packageName, ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock (userLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + AddClass(pIter, className, md5Sum, schemaCall); } uint64_t ManagementBroker::addObject (ManagementObject* object, @@ -200,6 +201,17 @@ void ManagementBroker::clientAdded (void) Mutex::ScopedLock lock (userLock); clientWasAdded = true; + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'x'); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); + } } void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) @@ -512,8 +524,12 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/) +void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { + std::string packageName; + + inBuffer.getShortString(packageName); + FindOrAddPackageLH(packageName); } void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -529,7 +545,7 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, cIter != cMap.end (); cIter++) { - if (cIter->second.hasSchema ()) + if (cIter->second->hasSchema ()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -546,16 +562,46 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::SchemaClass::appendSchema (Buffer& buf) +void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +{ + std::string packageName; + SchemaClassKey key; + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + ClassMap::iterator cIter = pIter->second.find(key); + if (cIter == pIter->second.end()) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint32_t sequence = nextRequestSequence++; + + EncodeHeader (outBuffer, 'S', sequence); + outBuffer.putShortString(packageName); + outBuffer.putShortString(key.name); + outBuffer.putBin128(key.hash); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + + SchemaClass* newSchema = new SchemaClass; + newSchema->pendingSequence = sequence; + pIter->second[key] = newSchema; + } +} + +void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) { // If the management package is attached locally (embedded in the broker or // linked in via plug-in), call the schema handler directly. If the package // is from a remote management agent, send the stored schema information. if (writeSchemaCall != 0) - writeSchemaCall (buf); + writeSchemaCall(buf); else - buf.putRawData (buffer, bufferLen); + buf.putRawData(buffer, bufferLen); } void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -568,22 +614,19 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getBin128 (key.hash); PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) - { + if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) - { + if (cIter != cMap.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - SchemaClass classInfo = cIter->second; + SchemaClass* classInfo = cIter->second; - if (classInfo.hasSchema()) - { - EncodeHeader (outBuffer, 's', sequence); - classInfo.appendSchema (outBuffer); + if (classInfo->hasSchema()) { + EncodeHeader(outBuffer, 's', sequence); + classInfo->appendSchema (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); + outBuffer.reset(); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } else @@ -596,6 +639,44 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe sendCommandComplete (replyToKey, sequence, 1, "Package not found"); } +void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.record(); + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + inBuffer.restore(); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + size_t length = ValidateSchema(inBuffer); + if (length == 0) + cMap.erase(key); + else { + cIter->second->buffer = (uint8_t*) malloc(length); + cIter->second->bufferLen = length; + inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + + // Publish a class-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q'); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + } + } + } +} + bool ManagementBroker::bankInUse (uint32_t bank) { for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); @@ -628,16 +709,16 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe string label; uint32_t requestedBank; uint32_t assignedBank; - Uuid sessionId; + string sessionName; Uuid systemId; inBuffer.getShortString (label); - sessionId.decode (inBuffer); + inBuffer.getShortString (sessionName); systemId.decode (inBuffer); requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); - RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId); + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. @@ -645,17 +726,21 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe return; } + // TODO: Reject requests for which the session name does not match an existing session. + RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; + agent->routingKey = replyToKey; agent->mgmtObject = new management::Agent (this, agent); - agent->mgmtObject->set_sessionId (sessionId); + agent->mgmtObject->set_sessionName (sessionName); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); addObject (agent->mgmtObject); - remoteAgents[sessionId] = agent; + remoteAgents[sessionName] = agent; + // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -734,16 +819,18 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) if (!CheckHeader (inBuffer, &opcode, &sequence)) return; - if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); - //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); } -ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name) +ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -767,10 +854,10 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std:: return result.first; } -void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) +void ManagementBroker::AddClass(PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -785,12 +872,11 @@ void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - SchemaClass classInfo; + SchemaClass* classInfo = new SchemaClass; - classInfo.writeSchemaCall = schemaCall; + classInfo->writeSchemaCall = schemaCall; cMap[key] = classInfo; - - // TODO: Publish a class-indication message + cIter = cMap.find (key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, @@ -810,3 +896,42 @@ void ManagementBroker::EncodeClassIndication (Buffer& buf, buf.putBin128 (key.hash); } +size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + inBuffer.record(); + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); + uint16_t evntCount = inBuffer.getShort(); + + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } + + if (evntCount != 0) + return 0; + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 5e9114c3f4..685b7db977 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -89,6 +89,7 @@ class ManagementBroker : public ManagementAgent struct RemoteAgent : public Manageable { uint32_t objIdBank; + std::string routingKey; Agent* mgmtObject; ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); @@ -97,8 +98,8 @@ class ManagementBroker : public ManagementAgent // TODO: Eventually replace string with entire reply-to structure. reply-to // currently assumes that the exchange is "amq.direct" even though it could // in theory be specified differently. - typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap; - typedef std::vector<std::string> ReplyToVector; + typedef std::map<std::string, RemoteAgent*> RemoteAgentMap; + typedef std::vector<std::string> ReplyToVector; // Storage for known schema classes: // @@ -129,16 +130,16 @@ class ManagementBroker : public ManagementAgent struct SchemaClass { ManagementObject::writeSchemaCall_t writeSchemaCall; - ReplyToVector remoteAgents; - size_t bufferLen; - uint8_t* buffer; + uint32_t pendingSequence; + size_t bufferLen; + uint8_t* buffer; - SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {} + SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {} bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } void appendSchema (framing::Buffer& buf); }; - typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; RemoteAgentMap remoteAgents; @@ -162,6 +163,7 @@ class ManagementBroker : public ManagementAgent uint32_t localBank; uint32_t nextObjectId; uint32_t nextRemoteBank; + uint32_t nextRequestSequence; bool clientWasAdded; # define MA_BUFFER_SIZE 65536 @@ -183,11 +185,11 @@ class ManagementBroker : public ManagementAgent size_t first); void dispatchAgentCommandLH (broker::Message& msg); - PackageMap::iterator FindOrAddPackage (std::string name); - void AddClassLocal (PackageMap::iterator pIter, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); + PackageMap::iterator FindOrAddPackageLH(std::string name); + void AddClass(PackageMap::iterator pIter, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); void EncodePackageIndication (framing::Buffer& buf, PackageMap::iterator pIter); void EncodeClassIndication (framing::Buffer& buf, @@ -198,13 +200,17 @@ class ManagementBroker : public ManagementAgent uint32_t assignBankLH (uint32_t requestedPrefix); void sendCommandComplete (std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); - void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + + size_t ValidateSchema(framing::Buffer&); }; }} |