diff options
author | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
commit | 2ebe3bcb668151cfd9a860e4416fe4478d9a56f4 (patch) | |
tree | 806288ff720f5b6bd73709e008e4f63c7e838896 /qpid/cpp | |
parent | 525081bf3d3e9cb04cd9c7d3be030b4a2153be23 (diff) | |
download | qpid-python-2ebe3bcb668151cfd9a860e4416fe4478d9a56f4.tar.gz |
QPID-1174 Remote Management Agent for management of external components
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@676067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/examples/qmf-agent/Makefile | 85 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-agent/example.cpp | 97 | ||||
-rw-r--r-- | qpid/cpp/examples/qmf-agent/schema.xml | 57 | ||||
-rwxr-xr-x | qpid/cpp/managementgen/main.py | 6 | ||||
-rwxr-xr-x | qpid/cpp/managementgen/schema.py | 5 | ||||
-rw-r--r-- | qpid/cpp/managementgen/templates/Package.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/Makefile.am | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 426 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 157 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.cpp | 201 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.h | 44 |
11 files changed, 1017 insertions, 69 deletions
diff --git a/qpid/cpp/examples/qmf-agent/Makefile b/qpid/cpp/examples/qmf-agent/Makefile new file mode 100644 index 0000000000..cc33dd1dc6 --- /dev/null +++ b/qpid/cpp/examples/qmf-agent/Makefile @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +SRC_DIR = . +QPID_DIR = ../../.. +SCHEMA_FILE = $(SRC_DIR)/schema.xml +GEN_DIR = $(SRC_DIR)/gen +OUT_FILE = $(SRC_DIR)/qmf-agent + +CC = gcc +LIB_DIR = $(QPID_DIR)/cpp/src/.libs +CC_INCLUDES = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR) +CC_FLAGS = -g -O2 +LD_FLAGS = -lqpidclient -lqpidcommon -L$(LIB_DIR) +SPEC_DIR = $(QPID_DIR)/specs +MGEN_DIR = $(QPID_DIR)/cpp/managementgen +TEMPLATE_DIR = $(MGEN_DIR)/templates +MGEN = $(MGEN_DIR)/main.py +OBJ_DIR = $(SRC_DIR)/.libs + +vpath %.cpp $(SRC_DIR):$(GEN_DIR) +vpath %.d $(OBJ_DIR) +vpath %.o $(OBJ_DIR) + +cpps = $(wildcard $(SRC_DIR)/*.cpp) +cpps += $(wildcard $(GEN_DIR)/*.cpp) +deps = $(addsuffix .d, $(basename $(cpps))) +objects = $(addsuffix .o, $(basename $(cpps))) + +.PHONY: all clean + +#========================================================== +# Pass 0: generate source files from schema +ifeq ($(MAKELEVEL), 0) + +all: + $(MGEN) $(SCHEMA_FILE) $(SPEC_DIR)/management-types.xml $(TEMPLATE_DIR) $(GEN_DIR) + $(MAKE) + +clean: + rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o + + +#========================================================== +# Pass 1: generate dependencies +else ifeq ($(MAKELEVEL), 1) + +all: $(deps) + $(MAKE) + +%.d : %.cpp + $(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@ + + +#========================================================== +# Pass 2: build project +else ifeq ($(MAKELEVEL), 2) + +$(OUT_FILE) : $(objects) + $(CC) -o $(OUT_FILE) $(CC_FLAGS) $(LD_FLAGS) $(objects) + +include $(deps) + +%.o : %.cpp + $(CC) -c $(CC_FLAGS) $(CC_INCLUDES) -o $@ $< + +endif + + diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp new file mode 100644 index 0000000000..4113d22cac --- /dev/null +++ b/qpid/cpp/examples/qmf-agent/example.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/management/Manageable.h> +#include <qpid/management/ManagementObject.h> +#include <qpid/agent/ManagementAgent.h> +#include <qpid/agent/ManagementAgentImpl.h> +#include "Parent.h" +#include "PackageQmf_example.h" + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::management; +using namespace std; + +//============================================================== +// CoreClass is the operational class that corresponds to the +// "Parent" class in the management schema. +//============================================================== +class CoreClass : public Manageable +{ + string name; + Parent* mgmtObject; + +public: + + CoreClass(ManagementAgent* agent, string _name); + ~CoreClass() {} + + void bumpCounter() { mgmtObject->inc_count(); } + + ManagementObject* GetManagementObject(void) const + { return mgmtObject; } +}; + +CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name) +{ + mgmtObject = new Parent(agent, this, name); + + agent->addObject(mgmtObject); + mgmtObject->set_state("IDLE"); +} + + +//============================================================== +// Main program +//============================================================== +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + + // Create the qmf management agent + ManagementAgent* agent = new ManagementAgentImpl(); + + // Register the Qmf_example schema with the agent + PackageQmf_example packageInit(agent); + + // Start the agent. It will attempt to make a connection to the + // management broker + agent->init (string(host), port); + + // Allocate some core objects + CoreClass core1(agent, "Example Core Object #1"); + CoreClass core2(agent, "Example Core Object #2"); + CoreClass core3(agent, "Example Core Object #3"); + + // Periodically bump a counter in core1 to provide a changing statistical value + while (1) + { + sleep(1); + core1.bumpCounter(); + } +} + + diff --git a/qpid/cpp/examples/qmf-agent/schema.xml b/qpid/cpp/examples/qmf-agent/schema.xml new file mode 100644 index 0000000000..de8776c818 --- /dev/null +++ b/qpid/cpp/examples/qmf-agent/schema.xml @@ -0,0 +1,57 @@ +<schema package="qmf_example"> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + + <!-- + =============================================================== + Parent + =============================================================== + --> + <class name="Parent"> + + This class represents a parent object + + <property name="name" type="sstr" access="RC" index="y"/> + + <statistic name="state" type="sstr" desc="Operational state of the link"/> + <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> + + <method name="create_child" desc="Create child object"> + <arg name="name" dir="I" type="sstr"/> + <arg name="childRef" dir="O" type="objId"/> + </method> + </class> + + + <!-- + =============================================================== + Child + =============================================================== + --> + <class name="Child"> + <property name="ParentRef" type="objId" references="Parent" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + + <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> + + <method name="delete"/> + </class> +</schema> + diff --git a/qpid/cpp/managementgen/main.py b/qpid/cpp/managementgen/main.py index 87ef3d5298..4459177a53 100755 --- a/qpid/cpp/managementgen/main.py +++ b/qpid/cpp/managementgen/main.py @@ -28,9 +28,6 @@ usage = "usage: %prog [options] schema-document type-document template-director parser = OptionParser (usage=usage) parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE", help="Makefile fragment") -parser.add_option ("-i", "--include-prefix", dest="include_prefix", metavar="PATH", - default="qpid/management/", - help="Prefix for #include of generated headers in generated source, default: qpid/management/") (opts, args) = parser.parse_args () @@ -42,9 +39,6 @@ typefile = args[1] templatedir = args[2] outdir = args[3] -if opts.include_prefix == ".": - opts.include_prefix = None - gen = Generator (outdir, templatedir) schema = PackageSchema (typefile, schemafile, opts) diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py index 4f5dc216ab..921c1bf01f 100755 --- a/qpid/cpp/managementgen/schema.py +++ b/qpid/cpp/managementgen/schema.py @@ -890,8 +890,7 @@ class SchemaClass: def genMethodArgIncludes (self, stream, variables): for method in self.methods: if method.getArgCount () > 0: - stream.write ("#include \"" + (self.options.include_prefix or "") +\ - "Args" + method.getFullName () + ".h\"\n") + stream.write ("#include \"Args" + method.getFullName () + ".h\"\n") def genMethodCount (self, stream, variables): stream.write ("%d" % len (self.methods)) @@ -1040,7 +1039,7 @@ class PackageSchema: def genClassIncludes (self, stream, variables): for _class in self.classes: - stream.write ("#include \"qpid/management/") + stream.write ("#include \"") _class.genNameCap (stream, variables) stream.write (".h\"\n") diff --git a/qpid/cpp/managementgen/templates/Package.cpp b/qpid/cpp/managementgen/templates/Package.cpp index 8bb2d42c47..15e7fc15ec 100644 --- a/qpid/cpp/managementgen/templates/Package.cpp +++ b/qpid/cpp/managementgen/templates/Package.cpp @@ -20,7 +20,7 @@ /*MGEN:Root.Disclaimer*/ -#include "qpid/management/Package/*MGEN:Schema.PackageNameCap*/.h" +#include "Package/*MGEN:Schema.PackageNameCap*/.h" /*MGEN:Schema.ClassIncludes*/ using namespace qpid::management; diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index bfebd4ae88..74aa504e90 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -221,6 +221,8 @@ libqpidcommon_la_SOURCES = \ qpid/Plugin.cpp \ qpid/StringUtils.cpp \ qpid/Url.cpp \ + qpid/management/Manageable.cpp \ + qpid/management/ManagementObject.cpp \ qpid/sys/AggregateOutput.cpp \ qpid/sys/AsynchIOHandler.cpp \ qpid/sys/Dispatcher.cpp \ @@ -304,10 +306,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TxBuffer.cpp \ qpid/broker/TxPublish.cpp \ qpid/broker/Vhost.cpp \ - qpid/management/Manageable.cpp \ qpid/management/ManagementBroker.cpp \ qpid/management/ManagementExchange.cpp \ - qpid/management/ManagementObject.cpp \ qpid/sys/TCPIOPlugin.cpp if HAVE_XML @@ -319,6 +319,7 @@ libqpidclient_la_LIBADD = libqpidcommon.la -luuid libqpidclient_la_SOURCES = \ $(rgen_client_srcs) \ + qpid/agent/ManagementAgentImpl.cpp \ qpid/client/AckPolicy.cpp \ qpid/client/Bounds.cpp \ qpid/client/ConnectionImpl.cpp \ @@ -367,6 +368,7 @@ nobase_include_HEADERS = \ qpid/memory.h \ qpid/shared_ptr.h \ qpid/agent/ManagementAgent.h \ + qpid/agent/ManagementAgentImpl.h \ qpid/broker/Broker.h \ qpid/broker/SessionAdapter.h \ qpid/broker/Exchange.h \ diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp new file mode 100644 index 0000000000..3c079a5a0a --- /dev/null +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -0,0 +1,426 @@ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/ManagementObject.h" +#include "ManagementAgentImpl.h" +#include <list> +#include <unistd.h> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::sys; +using std::stringstream; +using std::string; +using std::cout; +using std::endl; + +ManagementAgent* ManagementAgent::getAgent() +{ + //static ManagementAgent* agent = 0; + + //if (agent == 0) + // agent = new ManagementAgentImpl(); + //return agent; + return 0; +} + +ManagementAgentImpl::ManagementAgentImpl() : + clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread) +{ + // TODO: Establish system ID +} + +void ManagementAgentImpl::init (std::string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread) +{ + interval = intervalSeconds; + extThread = useExternalThread; + nextObjectId = 1; + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + string dest = "qmfagent"; + + connection.open(brokerHost.c_str(), brokerPort); + session = connection.newSession (queueName.str()); + dispatcher = new client::Dispatcher(session); + + + session.queueDeclare (arg::queue=queueName.str()); + session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str ()); + session.messageSubscribe (arg::queue=queueName.str(), + arg::destination=dest); + session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); + session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); + + Message attachRequest; + char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream + Buffer buffer (rawbuffer, 512); + + attachRequest.getDeliveryProperties().setRoutingKey("agent"); + attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + + EncodeHeader (buffer, 'A'); + buffer.putShortString ("RemoteAgent [C++]"); + buffer.putShortString (queueName.str()); + systemId.encode (buffer); + buffer.putLong (11); + + size_t length = 512 - buffer.available (); + string stringBuffer (rawbuffer, length); + attachRequest.setData (stringBuffer); + + session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management"); + + dispatcher->listen (dest, this); + dispatcher->start (); +} + +ManagementAgentImpl::~ManagementAgentImpl () +{ + dispatcher->stop (); + delete dispatcher; +} + +void ManagementAgentImpl::RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = FindOrAddPackage (packageName); + AddClassLocal (pIter, className, md5Sum, schemaCall); +} + +uint64_t ManagementAgentImpl::addObject (ManagementObject* object, + uint32_t /*persistId*/, + uint32_t /*persistBank*/) +{ + Mutex::ScopedLock lock(addLock); + uint64_t objectId; + + // TODO: fix object-id handling + objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF); + object->setObjectId (objectId); + newManagementObjects[objectId] = object; + return objectId; +} + +uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/) +{ + return 0; +} + +int ManagementAgentImpl::getSignalFd (void) +{ + return -1; +} + +void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) +{ + Mutex::ScopedLock lock(agentLock); + uint32_t assigned; + + assigned = inBuffer.getLong(); + objIdPrefix = ((uint64_t) assigned) << 24; + + // Send package indications for all local packages + for (PackageMap::iterator pIter = packages.begin(); + pIter != packages.end(); + pIter++) { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'p'); + EncodePackageIndication(outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + + // Send class indications for all local classes + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { + outBuffer.reset(); + EncodeHeader(outBuffer, 'q'); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + } + } +} + +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence) +{ + Mutex::ScopedLock lock(agentLock); + string packageName; + SchemaClassKey key; + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + SchemaClass schema = cIter->second; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 's', sequence); + schema.writeSchemaCall(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + } + } +} + +void ManagementAgentImpl::handleConsoleAddedIndication() +{ + Mutex::ScopedLock lock(agentLock); + clientWasAdded = true; +} + +void ManagementAgentImpl::received (Message& msg) +{ + string data = msg.getData (); + Buffer inBuffer (const_cast<char*>(data.c_str()), data.size()); + uint8_t opcode; + uint32_t sequence; + + if (CheckHeader (inBuffer, &opcode, &sequence)) + { + if (opcode == 'a') handleAttachResponse(inBuffer); + else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); + else if (opcode == 'x') handleConsoleAddedIndication(); + } +} + +void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + if (buf.getSize() < 8) + return false; + + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + + *opcode = buf.getOctet (); + *seq = buf.getLong (); + + return h1 == 'A' && h2 == 'M' && h3 == '1'; +} + +void ManagementAgentImpl::SendBuffer (Buffer& buf, + uint32_t length, + string exchange, + string routingKey) +{ + Message msg; + string data; + + if (objIdPrefix == 0) + return; + + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData (data); + session.messageTransfer (arg::content=msg, arg::destination=exchange); +} + +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + std::pair<PackageMap::iterator, bool> result = + packages.insert (std::pair<string, ClassMap> (name, ClassMap ())); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package"); + + return result.first; +} + +void ManagementAgentImpl::moveNewObjectsLH() +{ + Mutex::ScopedLock lock (addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); + iter != newManagementObjects.end (); + iter++) + managementObjects[iter->first] = iter->second; + newManagementObjects.clear(); +} + +void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy (&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + SchemaClass classInfo; + + classInfo.writeSchemaCall = schemaCall; + cMap[key] = classInfo; + + // TODO: Publish a class-indication message +} + +void ManagementAgentImpl::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementAgentImpl::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); +} + +void ManagementAgentImpl::PeriodicProcessing() +{ +#define BUFSIZE 65536 + Mutex::ScopedLock lock(agentLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + std::list<uint64_t> deleteList; + + { + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str() + ".heartbeat"; + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + moveNewObjectsLH(); + + if (clientWasAdded) + { + clientWasAdded = false; + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + object->setAllChanged (); + } + } + + if (managementObjects.empty ()) + return; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject* object = iter->second; + + if (object->getConfigChanged () || object->isDeleted ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'c'); + object->writeProperties(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + if (object->getInstChanged ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + } + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + managementObjects.erase (*iter); + + deleteList.clear (); +} + +void ManagementAgentImpl::BackgroundThread::run() +{ + while (true) { + ::sleep(5); + agent.PeriodicProcessing(); + } +} diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h new file mode 100644 index 0000000000..b7572fe833 --- /dev/null +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -0,0 +1,157 @@ +#ifndef _qpid_agent_ManagementAgentImpl_ +#define _qpid_agent_ManagementAgentImpl_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "ManagementAgent.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Dispatcher.h" +#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/framing/Uuid.h" +#include <iostream> +#include <sstream> + +namespace qpid { +namespace management { + +class ManagementAgentImpl : public ManagementAgent, public client::MessageListener +{ + public: + + ManagementAgentImpl (); + virtual ~ManagementAgentImpl (); + + int getMaxThreads() { return 1; } + void init(std::string brokerHost = "localhost", + uint16_t brokerPort = 5672, + uint16_t intervalSeconds = 10, + bool useExternalThread = false); + void RegisterClass(std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + uint64_t addObject (management::ManagementObject* objectPtr, + uint32_t persistId = 0, + uint32_t persistBank = 4); + uint32_t pollCallbacks (uint32_t callLimit = 0); + int getSignalFd (void); + + void PeriodicProcessing(); + + private: + + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + management::ManagementObject::writeSchemaCall_t writeSchemaCall; + + SchemaClass () : writeSchemaCall(0) {} + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + PackageMap packages; + management::ManagementObjectMap managementObjects; + management::ManagementObjectMap newManagementObjects; + + void received (client::Message& msg); + + uint16_t interval; + bool extThread; + uint64_t nextObjectId; + sys::Mutex agentLock; + sys::Mutex addLock; + framing::Uuid sessionId; + framing::Uuid systemId; + + int signalFdIn, signalFdOut; + client::Connection connection; + client::Session session; + client::Dispatcher* dispatcher; + bool clientWasAdded; + uint64_t objIdPrefix; + std::stringstream queueName; +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + + class BackgroundThread : public sys::Runnable + { + ManagementAgentImpl& agent; + void run(); + public: + BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {} + }; + + BackgroundThread bgThread; + sys::Thread thread; + + PackageMap::iterator FindOrAddPackage (std::string name); + void moveNewObjectsLH(); + void AddClassLocal (PackageMap::iterator pIter, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + void EncodePackageIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter); + void EncodeClassIndication (qpid::framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void SendBuffer (qpid::framing::Buffer& buf, + uint32_t length, + std::string exchange, + std::string routingKey); + void handleAttachResponse (qpid::framing::Buffer& inBuffer); + void handlePackageRequest (qpid::framing::Buffer& inBuffer); + void handleClassQuery (qpid::framing::Buffer& inBuffer); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleConsoleAddedIndication(); +}; + +}} + +#endif /*!_qpid_agent_ManagementAgentImpl_*/ diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 106033f76f..84e0c650f2 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea nextObjectId = 1; bootSequence = 1; nextRemoteBank = 10; + nextRequestSequence = 1; clientWasAdded = false; // Get from file or generate and save to file. @@ -155,8 +156,8 @@ void ManagementBroker::RegisterClass (string packageName, ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock (userLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + AddClass(pIter, className, md5Sum, schemaCall); } uint64_t ManagementBroker::addObject (ManagementObject* object, @@ -200,6 +201,17 @@ void ManagementBroker::clientAdded (void) Mutex::ScopedLock lock (userLock); clientWasAdded = true; + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'x'); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); + } } void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) @@ -512,8 +524,12 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/) +void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { + std::string packageName; + + inBuffer.getShortString(packageName); + FindOrAddPackageLH(packageName); } void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -529,7 +545,7 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, cIter != cMap.end (); cIter++) { - if (cIter->second.hasSchema ()) + if (cIter->second->hasSchema ()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -546,16 +562,46 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::SchemaClass::appendSchema (Buffer& buf) +void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +{ + std::string packageName; + SchemaClassKey key; + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + ClassMap::iterator cIter = pIter->second.find(key); + if (cIter == pIter->second.end()) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint32_t sequence = nextRequestSequence++; + + EncodeHeader (outBuffer, 'S', sequence); + outBuffer.putShortString(packageName); + outBuffer.putShortString(key.name); + outBuffer.putBin128(key.hash); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + + SchemaClass* newSchema = new SchemaClass; + newSchema->pendingSequence = sequence; + pIter->second[key] = newSchema; + } +} + +void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) { // If the management package is attached locally (embedded in the broker or // linked in via plug-in), call the schema handler directly. If the package // is from a remote management agent, send the stored schema information. if (writeSchemaCall != 0) - writeSchemaCall (buf); + writeSchemaCall(buf); else - buf.putRawData (buffer, bufferLen); + buf.putRawData(buffer, bufferLen); } void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -568,22 +614,19 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getBin128 (key.hash); PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) - { + if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) - { + if (cIter != cMap.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - SchemaClass classInfo = cIter->second; + SchemaClass* classInfo = cIter->second; - if (classInfo.hasSchema()) - { - EncodeHeader (outBuffer, 's', sequence); - classInfo.appendSchema (outBuffer); + if (classInfo->hasSchema()) { + EncodeHeader(outBuffer, 's', sequence); + classInfo->appendSchema (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); + outBuffer.reset(); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } else @@ -596,6 +639,44 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe sendCommandComplete (replyToKey, sequence, 1, "Package not found"); } +void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.record(); + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + inBuffer.restore(); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + size_t length = ValidateSchema(inBuffer); + if (length == 0) + cMap.erase(key); + else { + cIter->second->buffer = (uint8_t*) malloc(length); + cIter->second->bufferLen = length; + inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + + // Publish a class-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q'); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + } + } + } +} + bool ManagementBroker::bankInUse (uint32_t bank) { for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); @@ -628,16 +709,16 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe string label; uint32_t requestedBank; uint32_t assignedBank; - Uuid sessionId; + string sessionName; Uuid systemId; inBuffer.getShortString (label); - sessionId.decode (inBuffer); + inBuffer.getShortString (sessionName); systemId.decode (inBuffer); requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); - RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId); + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. @@ -645,17 +726,21 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe return; } + // TODO: Reject requests for which the session name does not match an existing session. + RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; + agent->routingKey = replyToKey; agent->mgmtObject = new management::Agent (this, agent); - agent->mgmtObject->set_sessionId (sessionId); + agent->mgmtObject->set_sessionName (sessionName); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); addObject (agent->mgmtObject); - remoteAgents[sessionId] = agent; + remoteAgents[sessionName] = agent; + // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -734,16 +819,18 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) if (!CheckHeader (inBuffer, &opcode, &sequence)) return; - if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); - //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); } -ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name) +ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -767,10 +854,10 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std:: return result.first; } -void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) +void ManagementBroker::AddClass(PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -785,12 +872,11 @@ void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - SchemaClass classInfo; + SchemaClass* classInfo = new SchemaClass; - classInfo.writeSchemaCall = schemaCall; + classInfo->writeSchemaCall = schemaCall; cMap[key] = classInfo; - - // TODO: Publish a class-indication message + cIter = cMap.find (key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, @@ -810,3 +896,42 @@ void ManagementBroker::EncodeClassIndication (Buffer& buf, buf.putBin128 (key.hash); } +size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + inBuffer.record(); + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); + uint16_t evntCount = inBuffer.getShort(); + + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } + + if (evntCount != 0) + return 0; + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 5e9114c3f4..685b7db977 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -89,6 +89,7 @@ class ManagementBroker : public ManagementAgent struct RemoteAgent : public Manageable { uint32_t objIdBank; + std::string routingKey; Agent* mgmtObject; ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); @@ -97,8 +98,8 @@ class ManagementBroker : public ManagementAgent // TODO: Eventually replace string with entire reply-to structure. reply-to // currently assumes that the exchange is "amq.direct" even though it could // in theory be specified differently. - typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap; - typedef std::vector<std::string> ReplyToVector; + typedef std::map<std::string, RemoteAgent*> RemoteAgentMap; + typedef std::vector<std::string> ReplyToVector; // Storage for known schema classes: // @@ -129,16 +130,16 @@ class ManagementBroker : public ManagementAgent struct SchemaClass { ManagementObject::writeSchemaCall_t writeSchemaCall; - ReplyToVector remoteAgents; - size_t bufferLen; - uint8_t* buffer; + uint32_t pendingSequence; + size_t bufferLen; + uint8_t* buffer; - SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {} + SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {} bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } void appendSchema (framing::Buffer& buf); }; - typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; RemoteAgentMap remoteAgents; @@ -162,6 +163,7 @@ class ManagementBroker : public ManagementAgent uint32_t localBank; uint32_t nextObjectId; uint32_t nextRemoteBank; + uint32_t nextRequestSequence; bool clientWasAdded; # define MA_BUFFER_SIZE 65536 @@ -183,11 +185,11 @@ class ManagementBroker : public ManagementAgent size_t first); void dispatchAgentCommandLH (broker::Message& msg); - PackageMap::iterator FindOrAddPackage (std::string name); - void AddClassLocal (PackageMap::iterator pIter, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); + PackageMap::iterator FindOrAddPackageLH(std::string name); + void AddClass(PackageMap::iterator pIter, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); void EncodePackageIndication (framing::Buffer& buf, PackageMap::iterator pIter); void EncodeClassIndication (framing::Buffer& buf, @@ -198,13 +200,17 @@ class ManagementBroker : public ManagementAgent uint32_t assignBankLH (uint32_t requestedPrefix); void sendCommandComplete (std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); - void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + + size_t ValidateSchema(framing::Buffer&); }; }} |