summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-07-11 20:14:07 +0000
committerTed Ross <tross@apache.org>2008-07-11 20:14:07 +0000
commit2fd1b08b605d2664394ff5708c3cbaebd1dc21ef (patch)
treed0b5c7cfa8f31a1fc721fb45d7ca77a027875b7d /cpp/src
parent13e2db2a3d0d14881da3c088f084385740df0731 (diff)
downloadqpid-python-2fd1b08b605d2664394ff5708c3cbaebd1dc21ef.tar.gz
QPID-1174 Remote Management Agent for management of external components
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@676067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am6
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp426
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h157
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp201
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h44
5 files changed, 775 insertions, 59 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index bfebd4ae88..74aa504e90 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -221,6 +221,8 @@ libqpidcommon_la_SOURCES = \
qpid/Plugin.cpp \
qpid/StringUtils.cpp \
qpid/Url.cpp \
+ qpid/management/Manageable.cpp \
+ qpid/management/ManagementObject.cpp \
qpid/sys/AggregateOutput.cpp \
qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
@@ -304,10 +306,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TxBuffer.cpp \
qpid/broker/TxPublish.cpp \
qpid/broker/Vhost.cpp \
- qpid/management/Manageable.cpp \
qpid/management/ManagementBroker.cpp \
qpid/management/ManagementExchange.cpp \
- qpid/management/ManagementObject.cpp \
qpid/sys/TCPIOPlugin.cpp
if HAVE_XML
@@ -319,6 +319,7 @@ libqpidclient_la_LIBADD = libqpidcommon.la -luuid
libqpidclient_la_SOURCES = \
$(rgen_client_srcs) \
+ qpid/agent/ManagementAgentImpl.cpp \
qpid/client/AckPolicy.cpp \
qpid/client/Bounds.cpp \
qpid/client/ConnectionImpl.cpp \
@@ -367,6 +368,7 @@ nobase_include_HEADERS = \
qpid/memory.h \
qpid/shared_ptr.h \
qpid/agent/ManagementAgent.h \
+ qpid/agent/ManagementAgentImpl.h \
qpid/broker/Broker.h \
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
new file mode 100644
index 0000000000..3c079a5a0a
--- /dev/null
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -0,0 +1,426 @@
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementObject.h"
+#include "ManagementAgentImpl.h"
+#include <list>
+#include <unistd.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::sys;
+using std::stringstream;
+using std::string;
+using std::cout;
+using std::endl;
+
+ManagementAgent* ManagementAgent::getAgent()
+{
+ //static ManagementAgent* agent = 0;
+
+ //if (agent == 0)
+ // agent = new ManagementAgentImpl();
+ //return agent;
+ return 0;
+}
+
+ManagementAgentImpl::ManagementAgentImpl() :
+ clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread)
+{
+ // TODO: Establish system ID
+}
+
+void ManagementAgentImpl::init (std::string brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread)
+{
+ interval = intervalSeconds;
+ extThread = useExternalThread;
+ nextObjectId = 1;
+
+ sessionId.generate();
+ queueName << "qmfagent-" << sessionId;
+ string dest = "qmfagent";
+
+ connection.open(brokerHost.c_str(), brokerPort);
+ session = connection.newSession (queueName.str());
+ dispatcher = new client::Dispatcher(session);
+
+
+ session.queueDeclare (arg::queue=queueName.str());
+ session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str ());
+ session.messageSubscribe (arg::queue=queueName.str(),
+ arg::destination=dest);
+ session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
+ session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
+
+ Message attachRequest;
+ char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream
+ Buffer buffer (rawbuffer, 512);
+
+ attachRequest.getDeliveryProperties().setRoutingKey("agent");
+ attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+
+ EncodeHeader (buffer, 'A');
+ buffer.putShortString ("RemoteAgent [C++]");
+ buffer.putShortString (queueName.str());
+ systemId.encode (buffer);
+ buffer.putLong (11);
+
+ size_t length = 512 - buffer.available ();
+ string stringBuffer (rawbuffer, length);
+ attachRequest.setData (stringBuffer);
+
+ session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management");
+
+ dispatcher->listen (dest, this);
+ dispatcher->start ();
+}
+
+ManagementAgentImpl::~ManagementAgentImpl ()
+{
+ dispatcher->stop ();
+ delete dispatcher;
+}
+
+void ManagementAgentImpl::RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = FindOrAddPackage (packageName);
+ AddClassLocal (pIter, className, md5Sum, schemaCall);
+}
+
+uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
+ uint32_t /*persistId*/,
+ uint32_t /*persistBank*/)
+{
+ Mutex::ScopedLock lock(addLock);
+ uint64_t objectId;
+
+ // TODO: fix object-id handling
+ objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF);
+ object->setObjectId (objectId);
+ newManagementObjects[objectId] = object;
+ return objectId;
+}
+
+uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/)
+{
+ return 0;
+}
+
+int ManagementAgentImpl::getSignalFd (void)
+{
+ return -1;
+}
+
+void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
+{
+ Mutex::ScopedLock lock(agentLock);
+ uint32_t assigned;
+
+ assigned = inBuffer.getLong();
+ objIdPrefix = ((uint64_t) assigned) << 24;
+
+ // Send package indications for all local packages
+ for (PackageMap::iterator pIter = packages.begin();
+ pIter != packages.end();
+ pIter++) {
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'p');
+ EncodePackageIndication(outBuffer, pIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+
+ // Send class indications for all local classes
+ ClassMap cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
+ outBuffer.reset();
+ EncodeHeader(outBuffer, 'q');
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ }
+ }
+}
+
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+{
+ Mutex::ScopedLock lock(agentLock);
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end()) {
+ SchemaClass schema = cIter->second;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 's', sequence);
+ schema.writeSchemaCall(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ }
+ }
+}
+
+void ManagementAgentImpl::handleConsoleAddedIndication()
+{
+ Mutex::ScopedLock lock(agentLock);
+ clientWasAdded = true;
+}
+
+void ManagementAgentImpl::received (Message& msg)
+{
+ string data = msg.getData ();
+ Buffer inBuffer (const_cast<char*>(data.c_str()), data.size());
+ uint8_t opcode;
+ uint32_t sequence;
+
+ if (CheckHeader (inBuffer, &opcode, &sequence))
+ {
+ if (opcode == 'a') handleAttachResponse(inBuffer);
+ else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
+ else if (opcode == 'x') handleConsoleAddedIndication();
+ }
+}
+
+void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet ('A');
+ buf.putOctet ('M');
+ buf.putOctet ('1');
+ buf.putOctet (opcode);
+ buf.putLong (seq);
+}
+
+bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ if (buf.getSize() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet ();
+ uint8_t h2 = buf.getOctet ();
+ uint8_t h3 = buf.getOctet ();
+
+ *opcode = buf.getOctet ();
+ *seq = buf.getLong ();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '1';
+}
+
+void ManagementAgentImpl::SendBuffer (Buffer& buf,
+ uint32_t length,
+ string exchange,
+ string routingKey)
+{
+ Message msg;
+ string data;
+
+ if (objIdPrefix == 0)
+ return;
+
+ buf.getRawData(data, length);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.setData (data);
+ session.messageTransfer (arg::content=msg, arg::destination=exchange);
+}
+
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name)
+{
+ PackageMap::iterator pIter = packages.find (name);
+ if (pIter != packages.end ())
+ return pIter;
+
+ // No such package found, create a new map entry.
+ std::pair<PackageMap::iterator, bool> result =
+ packages.insert (std::pair<string, ClassMap> (name, ClassMap ()));
+
+ // Publish a package-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p');
+ EncodePackageIndication (outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package");
+
+ return result.first;
+}
+
+void ManagementAgentImpl::moveNewObjectsLH()
+{
+ Mutex::ScopedLock lock (addLock);
+ for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
+ iter != newManagementObjects.end ();
+ iter++)
+ managementObjects[iter->first] = iter->second;
+ newManagementObjects.clear();
+}
+
+void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ SchemaClassKey key;
+ ClassMap& cMap = pIter->second;
+
+ key.name = className;
+ memcpy (&key.hash, md5Sum, 16);
+
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ return;
+
+ // No such class found, create a new class with local information.
+ SchemaClass classInfo;
+
+ classInfo.writeSchemaCall = schemaCall;
+ cMap[key] = classInfo;
+
+ // TODO: Publish a class-indication message
+}
+
+void ManagementAgentImpl::EncodePackageIndication (Buffer& buf,
+ PackageMap::iterator pIter)
+{
+ buf.putShortString ((*pIter).first);
+}
+
+void ManagementAgentImpl::EncodeClassIndication (Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
+{
+ SchemaClassKey key = (*cIter).first;
+
+ buf.putShortString ((*pIter).first);
+ buf.putShortString (key.name);
+ buf.putBin128 (key.hash);
+}
+
+void ManagementAgentImpl::PeriodicProcessing()
+{
+#define BUFSIZE 65536
+ Mutex::ScopedLock lock(agentLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
+ string routingKey;
+ std::list<uint64_t> deleteList;
+
+ {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ EncodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(uint64_t(Duration(now())));
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + systemId.str() + ".heartbeat";
+ SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ }
+
+ moveNewObjectsLH();
+
+ if (clientWasAdded)
+ {
+ clientWasAdded = false;
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject* object = iter->second;
+ object->setAllChanged ();
+ }
+ }
+
+ if (managementObjects.empty ())
+ return;
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject* object = iter->second;
+
+ if (object->getConfigChanged () || object->isDeleted ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ }
+
+ if (object->getInstChanged ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'i');
+ object->writeStatistics(msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ }
+
+ if (object->isDeleted ())
+ deleteList.push_back (iter->first);
+ }
+
+ // Delete flagged objects
+ for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ iter != deleteList.rend ();
+ iter++)
+ managementObjects.erase (*iter);
+
+ deleteList.clear ();
+}
+
+void ManagementAgentImpl::BackgroundThread::run()
+{
+ while (true) {
+ ::sleep(5);
+ agent.PeriodicProcessing();
+ }
+}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
new file mode 100644
index 0000000000..b7572fe833
--- /dev/null
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -0,0 +1,157 @@
+#ifndef _qpid_agent_ManagementAgentImpl_
+#define _qpid_agent_ManagementAgentImpl_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "ManagementAgent.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Dispatcher.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/framing/Uuid.h"
+#include <iostream>
+#include <sstream>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
+{
+ public:
+
+ ManagementAgentImpl ();
+ virtual ~ManagementAgentImpl ();
+
+ int getMaxThreads() { return 1; }
+ void init(std::string brokerHost = "localhost",
+ uint16_t brokerPort = 5672,
+ uint16_t intervalSeconds = 10,
+ bool useExternalThread = false);
+ void RegisterClass(std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ uint64_t addObject (management::ManagementObject* objectPtr,
+ uint32_t persistId = 0,
+ uint32_t persistBank = 4);
+ uint32_t pollCallbacks (uint32_t callLimit = 0);
+ int getSignalFd (void);
+
+ void PeriodicProcessing();
+
+ private:
+
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ struct SchemaClass
+ {
+ management::ManagementObject::writeSchemaCall_t writeSchemaCall;
+
+ SchemaClass () : writeSchemaCall(0) {}
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ PackageMap packages;
+ management::ManagementObjectMap managementObjects;
+ management::ManagementObjectMap newManagementObjects;
+
+ void received (client::Message& msg);
+
+ uint16_t interval;
+ bool extThread;
+ uint64_t nextObjectId;
+ sys::Mutex agentLock;
+ sys::Mutex addLock;
+ framing::Uuid sessionId;
+ framing::Uuid systemId;
+
+ int signalFdIn, signalFdOut;
+ client::Connection connection;
+ client::Session session;
+ client::Dispatcher* dispatcher;
+ bool clientWasAdded;
+ uint64_t objIdPrefix;
+ std::stringstream queueName;
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ class BackgroundThread : public sys::Runnable
+ {
+ ManagementAgentImpl& agent;
+ void run();
+ public:
+ BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+ };
+
+ BackgroundThread bgThread;
+ sys::Thread thread;
+
+ PackageMap::iterator FindOrAddPackage (std::string name);
+ void moveNewObjectsLH();
+ void AddClassLocal (PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ void EncodePackageIndication (qpid::framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void EncodeClassIndication (qpid::framing::Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter);
+ void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void SendBuffer (qpid::framing::Buffer& buf,
+ uint32_t length,
+ std::string exchange,
+ std::string routingKey);
+ void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+ void handlePackageRequest (qpid::framing::Buffer& inBuffer);
+ void handleClassQuery (qpid::framing::Buffer& inBuffer);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+ void handleConsoleAddedIndication();
+};
+
+}}
+
+#endif /*!_qpid_agent_ManagementAgentImpl_*/
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 106033f76f..84e0c650f2 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea
nextObjectId = 1;
bootSequence = 1;
nextRemoteBank = 10;
+ nextRequestSequence = 1;
clientWasAdded = false;
// Get from file or generate and save to file.
@@ -155,8 +156,8 @@ void ManagementBroker::RegisterClass (string packageName,
ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock (userLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ AddClass(pIter, className, md5Sum, schemaCall);
}
uint64_t ManagementBroker::addObject (ManagementObject* object,
@@ -200,6 +201,17 @@ void ManagementBroker::clientAdded (void)
Mutex::ScopedLock lock (userLock);
clientWasAdded = true;
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'x');
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+ }
}
void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -512,8 +524,12 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/)
+void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
+ std::string packageName;
+
+ inBuffer.getShortString(packageName);
+ FindOrAddPackageLH(packageName);
}
void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -529,7 +545,7 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
cIter != cMap.end ();
cIter++)
{
- if (cIter->second.hasSchema ())
+ if (cIter->second->hasSchema ())
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -546,16 +562,46 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::SchemaClass::appendSchema (Buffer& buf)
+void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
+{
+ std::string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ ClassMap::iterator cIter = pIter->second.find(key);
+ if (cIter == pIter->second.end()) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ uint32_t sequence = nextRequestSequence++;
+
+ EncodeHeader (outBuffer, 'S', sequence);
+ outBuffer.putShortString(packageName);
+ outBuffer.putShortString(key.name);
+ outBuffer.putBin128(key.hash);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+
+ SchemaClass* newSchema = new SchemaClass;
+ newSchema->pendingSequence = sequence;
+ pIter->second[key] = newSchema;
+ }
+}
+
+void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
{
// If the management package is attached locally (embedded in the broker or
// linked in via plug-in), call the schema handler directly. If the package
// is from a remote management agent, send the stored schema information.
if (writeSchemaCall != 0)
- writeSchemaCall (buf);
+ writeSchemaCall(buf);
else
- buf.putRawData (buffer, bufferLen);
+ buf.putRawData(buffer, bufferLen);
}
void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -568,22 +614,19 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getBin128 (key.hash);
PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
+ if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- {
+ if (cIter != cMap.end()) {
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- SchemaClass classInfo = cIter->second;
+ SchemaClass* classInfo = cIter->second;
- if (classInfo.hasSchema())
- {
- EncodeHeader (outBuffer, 's', sequence);
- classInfo.appendSchema (outBuffer);
+ if (classInfo->hasSchema()) {
+ EncodeHeader(outBuffer, 's', sequence);
+ classInfo->appendSchema (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
+ outBuffer.reset();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
else
@@ -596,6 +639,44 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
sendCommandComplete (replyToKey, sequence, 1, "Package not found");
}
+void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.record();
+ inBuffer.getShortString (packageName);
+ inBuffer.getShortString (key.name);
+ inBuffer.getBin128 (key.hash);
+ inBuffer.restore();
+
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+ size_t length = ValidateSchema(inBuffer);
+ if (length == 0)
+ cMap.erase(key);
+ else {
+ cIter->second->buffer = (uint8_t*) malloc(length);
+ cIter->second->bufferLen = length;
+ inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+
+ // Publish a class-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'q');
+ EncodeClassIndication (outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ }
+ }
+ }
+}
+
bool ManagementBroker::bankInUse (uint32_t bank)
{
for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
@@ -628,16 +709,16 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- Uuid sessionId;
+ string sessionName;
Uuid systemId;
inBuffer.getShortString (label);
- sessionId.decode (inBuffer);
+ inBuffer.getShortString (sessionName);
systemId.decode (inBuffer);
requestedBank = inBuffer.getLong ();
assignedBank = assignBankLH (requestedBank);
- RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId);
+ RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
if (aIter != remoteAgents.end())
{
// There already exists an agent on this session. Reject the request.
@@ -645,17 +726,21 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
return;
}
+ // TODO: Reject requests for which the session name does not match an existing session.
+
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
+ agent->routingKey = replyToKey;
agent->mgmtObject = new management::Agent (this, agent);
- agent->mgmtObject->set_sessionId (sessionId);
+ agent->mgmtObject->set_sessionName (sessionName);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
addObject (agent->mgmtObject);
- remoteAgents[sessionId] = agent;
+ remoteAgents[sessionName] = agent;
+ // Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -734,16 +819,18 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
if (!CheckHeader (inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
- //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
}
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -767,10 +854,10 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::
return result.first;
}
-void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::AddClass(PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -785,12 +872,11 @@ void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
// No such class found, create a new class with local information.
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- SchemaClass classInfo;
+ SchemaClass* classInfo = new SchemaClass;
- classInfo.writeSchemaCall = schemaCall;
+ classInfo->writeSchemaCall = schemaCall;
cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
+ cIter = cMap.find (key);
}
void ManagementBroker::EncodePackageIndication (Buffer& buf,
@@ -810,3 +896,42 @@ void ManagementBroker::EncodeClassIndication (Buffer& buf,
buf.putBin128 (key.hash);
}
+size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+{
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
+
+ inBuffer.record();
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
+ uint16_t evntCount = inBuffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
+
+ if (evntCount != 0)
+ return 0;
+
+ end = inBuffer.getPosition();
+ inBuffer.restore(); // restore original position
+ return end - start;
+}
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 5e9114c3f4..685b7db977 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -89,6 +89,7 @@ class ManagementBroker : public ManagementAgent
struct RemoteAgent : public Manageable
{
uint32_t objIdBank;
+ std::string routingKey;
Agent* mgmtObject;
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
@@ -97,8 +98,8 @@ class ManagementBroker : public ManagementAgent
// TODO: Eventually replace string with entire reply-to structure. reply-to
// currently assumes that the exchange is "amq.direct" even though it could
// in theory be specified differently.
- typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap;
- typedef std::vector<std::string> ReplyToVector;
+ typedef std::map<std::string, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
// Storage for known schema classes:
//
@@ -129,16 +130,16 @@ class ManagementBroker : public ManagementAgent
struct SchemaClass
{
ManagementObject::writeSchemaCall_t writeSchemaCall;
- ReplyToVector remoteAgents;
- size_t bufferLen;
- uint8_t* buffer;
+ uint32_t pendingSequence;
+ size_t bufferLen;
+ uint8_t* buffer;
- SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {}
+ SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
void appendSchema (framing::Buffer& buf);
};
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
RemoteAgentMap remoteAgents;
@@ -162,6 +163,7 @@ class ManagementBroker : public ManagementAgent
uint32_t localBank;
uint32_t nextObjectId;
uint32_t nextRemoteBank;
+ uint32_t nextRequestSequence;
bool clientWasAdded;
# define MA_BUFFER_SIZE 65536
@@ -183,11 +185,11 @@ class ManagementBroker : public ManagementAgent
size_t first);
void dispatchAgentCommandLH (broker::Message& msg);
- PackageMap::iterator FindOrAddPackage (std::string name);
- void AddClassLocal (PackageMap::iterator pIter,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
+ PackageMap::iterator FindOrAddPackageLH(std::string name);
+ void AddClass(PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
void EncodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
void EncodeClassIndication (framing::Buffer& buf,
@@ -198,13 +200,17 @@ class ManagementBroker : public ManagementAgent
uint32_t assignBankLH (uint32_t requestedPrefix);
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
- void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+
+ size_t ValidateSchema(framing::Buffer&);
};
}}