summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-07-11 20:14:07 +0000
committerTed Ross <tross@apache.org>2008-07-11 20:14:07 +0000
commit2ebe3bcb668151cfd9a860e4416fe4478d9a56f4 (patch)
tree806288ff720f5b6bd73709e008e4f63c7e838896 /qpid/cpp
parent525081bf3d3e9cb04cd9c7d3be030b4a2153be23 (diff)
downloadqpid-python-2ebe3bcb668151cfd9a860e4416fe4478d9a56f4.tar.gz
QPID-1174 Remote Management Agent for management of external components
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@676067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/examples/qmf-agent/Makefile85
-rw-r--r--qpid/cpp/examples/qmf-agent/example.cpp97
-rw-r--r--qpid/cpp/examples/qmf-agent/schema.xml57
-rwxr-xr-xqpid/cpp/managementgen/main.py6
-rwxr-xr-xqpid/cpp/managementgen/schema.py5
-rw-r--r--qpid/cpp/managementgen/templates/Package.cpp2
-rw-r--r--qpid/cpp/src/Makefile.am6
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp426
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h157
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.cpp201
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.h44
11 files changed, 1017 insertions, 69 deletions
diff --git a/qpid/cpp/examples/qmf-agent/Makefile b/qpid/cpp/examples/qmf-agent/Makefile
new file mode 100644
index 0000000000..cc33dd1dc6
--- /dev/null
+++ b/qpid/cpp/examples/qmf-agent/Makefile
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+SRC_DIR = .
+QPID_DIR = ../../..
+SCHEMA_FILE = $(SRC_DIR)/schema.xml
+GEN_DIR = $(SRC_DIR)/gen
+OUT_FILE = $(SRC_DIR)/qmf-agent
+
+CC = gcc
+LIB_DIR = $(QPID_DIR)/cpp/src/.libs
+CC_INCLUDES = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR)
+CC_FLAGS = -g -O2
+LD_FLAGS = -lqpidclient -lqpidcommon -L$(LIB_DIR)
+SPEC_DIR = $(QPID_DIR)/specs
+MGEN_DIR = $(QPID_DIR)/cpp/managementgen
+TEMPLATE_DIR = $(MGEN_DIR)/templates
+MGEN = $(MGEN_DIR)/main.py
+OBJ_DIR = $(SRC_DIR)/.libs
+
+vpath %.cpp $(SRC_DIR):$(GEN_DIR)
+vpath %.d $(OBJ_DIR)
+vpath %.o $(OBJ_DIR)
+
+cpps = $(wildcard $(SRC_DIR)/*.cpp)
+cpps += $(wildcard $(GEN_DIR)/*.cpp)
+deps = $(addsuffix .d, $(basename $(cpps)))
+objects = $(addsuffix .o, $(basename $(cpps)))
+
+.PHONY: all clean
+
+#==========================================================
+# Pass 0: generate source files from schema
+ifeq ($(MAKELEVEL), 0)
+
+all:
+ $(MGEN) $(SCHEMA_FILE) $(SPEC_DIR)/management-types.xml $(TEMPLATE_DIR) $(GEN_DIR)
+ $(MAKE)
+
+clean:
+ rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o
+
+
+#==========================================================
+# Pass 1: generate dependencies
+else ifeq ($(MAKELEVEL), 1)
+
+all: $(deps)
+ $(MAKE)
+
+%.d : %.cpp
+ $(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@
+
+
+#==========================================================
+# Pass 2: build project
+else ifeq ($(MAKELEVEL), 2)
+
+$(OUT_FILE) : $(objects)
+ $(CC) -o $(OUT_FILE) $(CC_FLAGS) $(LD_FLAGS) $(objects)
+
+include $(deps)
+
+%.o : %.cpp
+ $(CC) -c $(CC_FLAGS) $(CC_INCLUDES) -o $@ $<
+
+endif
+
+
diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp
new file mode 100644
index 0000000000..4113d22cac
--- /dev/null
+++ b/qpid/cpp/examples/qmf-agent/example.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/management/Manageable.h>
+#include <qpid/management/ManagementObject.h>
+#include <qpid/agent/ManagementAgent.h>
+#include <qpid/agent/ManagementAgentImpl.h>
+#include "Parent.h"
+#include "PackageQmf_example.h"
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::management;
+using namespace std;
+
+//==============================================================
+// CoreClass is the operational class that corresponds to the
+// "Parent" class in the management schema.
+//==============================================================
+class CoreClass : public Manageable
+{
+ string name;
+ Parent* mgmtObject;
+
+public:
+
+ CoreClass(ManagementAgent* agent, string _name);
+ ~CoreClass() {}
+
+ void bumpCounter() { mgmtObject->inc_count(); }
+
+ ManagementObject* GetManagementObject(void) const
+ { return mgmtObject; }
+};
+
+CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name)
+{
+ mgmtObject = new Parent(agent, this, name);
+
+ agent->addObject(mgmtObject);
+ mgmtObject->set_state("IDLE");
+}
+
+
+//==============================================================
+// Main program
+//==============================================================
+int main(int argc, char** argv) {
+ const char* host = argc>1 ? argv[1] : "127.0.0.1";
+ int port = argc>2 ? atoi(argv[2]) : 5672;
+
+ // Create the qmf management agent
+ ManagementAgent* agent = new ManagementAgentImpl();
+
+ // Register the Qmf_example schema with the agent
+ PackageQmf_example packageInit(agent);
+
+ // Start the agent. It will attempt to make a connection to the
+ // management broker
+ agent->init (string(host), port);
+
+ // Allocate some core objects
+ CoreClass core1(agent, "Example Core Object #1");
+ CoreClass core2(agent, "Example Core Object #2");
+ CoreClass core3(agent, "Example Core Object #3");
+
+ // Periodically bump a counter in core1 to provide a changing statistical value
+ while (1)
+ {
+ sleep(1);
+ core1.bumpCounter();
+ }
+}
+
+
diff --git a/qpid/cpp/examples/qmf-agent/schema.xml b/qpid/cpp/examples/qmf-agent/schema.xml
new file mode 100644
index 0000000000..de8776c818
--- /dev/null
+++ b/qpid/cpp/examples/qmf-agent/schema.xml
@@ -0,0 +1,57 @@
+<schema package="qmf_example">
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+ <!--
+ ===============================================================
+ Parent
+ ===============================================================
+ -->
+ <class name="Parent">
+
+ This class represents a parent object
+
+ <property name="name" type="sstr" access="RC" index="y"/>
+
+ <statistic name="state" type="sstr" desc="Operational state of the link"/>
+ <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
+
+ <method name="create_child" desc="Create child object">
+ <arg name="name" dir="I" type="sstr"/>
+ <arg name="childRef" dir="O" type="objId"/>
+ </method>
+ </class>
+
+
+ <!--
+ ===============================================================
+ Child
+ ===============================================================
+ -->
+ <class name="Child">
+ <property name="ParentRef" type="objId" references="Parent" access="RC" index="y" parentRef="y"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+
+ <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
+
+ <method name="delete"/>
+ </class>
+</schema>
+
diff --git a/qpid/cpp/managementgen/main.py b/qpid/cpp/managementgen/main.py
index 87ef3d5298..4459177a53 100755
--- a/qpid/cpp/managementgen/main.py
+++ b/qpid/cpp/managementgen/main.py
@@ -28,9 +28,6 @@ usage = "usage: %prog [options] schema-document type-document template-director
parser = OptionParser (usage=usage)
parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE",
help="Makefile fragment")
-parser.add_option ("-i", "--include-prefix", dest="include_prefix", metavar="PATH",
- default="qpid/management/",
- help="Prefix for #include of generated headers in generated source, default: qpid/management/")
(opts, args) = parser.parse_args ()
@@ -42,9 +39,6 @@ typefile = args[1]
templatedir = args[2]
outdir = args[3]
-if opts.include_prefix == ".":
- opts.include_prefix = None
-
gen = Generator (outdir, templatedir)
schema = PackageSchema (typefile, schemafile, opts)
diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py
index 4f5dc216ab..921c1bf01f 100755
--- a/qpid/cpp/managementgen/schema.py
+++ b/qpid/cpp/managementgen/schema.py
@@ -890,8 +890,7 @@ class SchemaClass:
def genMethodArgIncludes (self, stream, variables):
for method in self.methods:
if method.getArgCount () > 0:
- stream.write ("#include \"" + (self.options.include_prefix or "") +\
- "Args" + method.getFullName () + ".h\"\n")
+ stream.write ("#include \"Args" + method.getFullName () + ".h\"\n")
def genMethodCount (self, stream, variables):
stream.write ("%d" % len (self.methods))
@@ -1040,7 +1039,7 @@ class PackageSchema:
def genClassIncludes (self, stream, variables):
for _class in self.classes:
- stream.write ("#include \"qpid/management/")
+ stream.write ("#include \"")
_class.genNameCap (stream, variables)
stream.write (".h\"\n")
diff --git a/qpid/cpp/managementgen/templates/Package.cpp b/qpid/cpp/managementgen/templates/Package.cpp
index 8bb2d42c47..15e7fc15ec 100644
--- a/qpid/cpp/managementgen/templates/Package.cpp
+++ b/qpid/cpp/managementgen/templates/Package.cpp
@@ -20,7 +20,7 @@
/*MGEN:Root.Disclaimer*/
-#include "qpid/management/Package/*MGEN:Schema.PackageNameCap*/.h"
+#include "Package/*MGEN:Schema.PackageNameCap*/.h"
/*MGEN:Schema.ClassIncludes*/
using namespace qpid::management;
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index bfebd4ae88..74aa504e90 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -221,6 +221,8 @@ libqpidcommon_la_SOURCES = \
qpid/Plugin.cpp \
qpid/StringUtils.cpp \
qpid/Url.cpp \
+ qpid/management/Manageable.cpp \
+ qpid/management/ManagementObject.cpp \
qpid/sys/AggregateOutput.cpp \
qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
@@ -304,10 +306,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/TxBuffer.cpp \
qpid/broker/TxPublish.cpp \
qpid/broker/Vhost.cpp \
- qpid/management/Manageable.cpp \
qpid/management/ManagementBroker.cpp \
qpid/management/ManagementExchange.cpp \
- qpid/management/ManagementObject.cpp \
qpid/sys/TCPIOPlugin.cpp
if HAVE_XML
@@ -319,6 +319,7 @@ libqpidclient_la_LIBADD = libqpidcommon.la -luuid
libqpidclient_la_SOURCES = \
$(rgen_client_srcs) \
+ qpid/agent/ManagementAgentImpl.cpp \
qpid/client/AckPolicy.cpp \
qpid/client/Bounds.cpp \
qpid/client/ConnectionImpl.cpp \
@@ -367,6 +368,7 @@ nobase_include_HEADERS = \
qpid/memory.h \
qpid/shared_ptr.h \
qpid/agent/ManagementAgent.h \
+ qpid/agent/ManagementAgentImpl.h \
qpid/broker/Broker.h \
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
new file mode 100644
index 0000000000..3c079a5a0a
--- /dev/null
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -0,0 +1,426 @@
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementObject.h"
+#include "ManagementAgentImpl.h"
+#include <list>
+#include <unistd.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::sys;
+using std::stringstream;
+using std::string;
+using std::cout;
+using std::endl;
+
+ManagementAgent* ManagementAgent::getAgent()
+{
+ //static ManagementAgent* agent = 0;
+
+ //if (agent == 0)
+ // agent = new ManagementAgentImpl();
+ //return agent;
+ return 0;
+}
+
+ManagementAgentImpl::ManagementAgentImpl() :
+ clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread)
+{
+ // TODO: Establish system ID
+}
+
+void ManagementAgentImpl::init (std::string brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread)
+{
+ interval = intervalSeconds;
+ extThread = useExternalThread;
+ nextObjectId = 1;
+
+ sessionId.generate();
+ queueName << "qmfagent-" << sessionId;
+ string dest = "qmfagent";
+
+ connection.open(brokerHost.c_str(), brokerPort);
+ session = connection.newSession (queueName.str());
+ dispatcher = new client::Dispatcher(session);
+
+
+ session.queueDeclare (arg::queue=queueName.str());
+ session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str ());
+ session.messageSubscribe (arg::queue=queueName.str(),
+ arg::destination=dest);
+ session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
+ session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
+
+ Message attachRequest;
+ char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream
+ Buffer buffer (rawbuffer, 512);
+
+ attachRequest.getDeliveryProperties().setRoutingKey("agent");
+ attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+
+ EncodeHeader (buffer, 'A');
+ buffer.putShortString ("RemoteAgent [C++]");
+ buffer.putShortString (queueName.str());
+ systemId.encode (buffer);
+ buffer.putLong (11);
+
+ size_t length = 512 - buffer.available ();
+ string stringBuffer (rawbuffer, length);
+ attachRequest.setData (stringBuffer);
+
+ session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management");
+
+ dispatcher->listen (dest, this);
+ dispatcher->start ();
+}
+
+ManagementAgentImpl::~ManagementAgentImpl ()
+{
+ dispatcher->stop ();
+ delete dispatcher;
+}
+
+void ManagementAgentImpl::RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = FindOrAddPackage (packageName);
+ AddClassLocal (pIter, className, md5Sum, schemaCall);
+}
+
+uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
+ uint32_t /*persistId*/,
+ uint32_t /*persistBank*/)
+{
+ Mutex::ScopedLock lock(addLock);
+ uint64_t objectId;
+
+ // TODO: fix object-id handling
+ objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF);
+ object->setObjectId (objectId);
+ newManagementObjects[objectId] = object;
+ return objectId;
+}
+
+uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/)
+{
+ return 0;
+}
+
+int ManagementAgentImpl::getSignalFd (void)
+{
+ return -1;
+}
+
+void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
+{
+ Mutex::ScopedLock lock(agentLock);
+ uint32_t assigned;
+
+ assigned = inBuffer.getLong();
+ objIdPrefix = ((uint64_t) assigned) << 24;
+
+ // Send package indications for all local packages
+ for (PackageMap::iterator pIter = packages.begin();
+ pIter != packages.end();
+ pIter++) {
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'p');
+ EncodePackageIndication(outBuffer, pIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+
+ // Send class indications for all local classes
+ ClassMap cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
+ outBuffer.reset();
+ EncodeHeader(outBuffer, 'q');
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ }
+ }
+}
+
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+{
+ Mutex::ScopedLock lock(agentLock);
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end()) {
+ SchemaClass schema = cIter->second;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 's', sequence);
+ schema.writeSchemaCall(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ }
+ }
+}
+
+void ManagementAgentImpl::handleConsoleAddedIndication()
+{
+ Mutex::ScopedLock lock(agentLock);
+ clientWasAdded = true;
+}
+
+void ManagementAgentImpl::received (Message& msg)
+{
+ string data = msg.getData ();
+ Buffer inBuffer (const_cast<char*>(data.c_str()), data.size());
+ uint8_t opcode;
+ uint32_t sequence;
+
+ if (CheckHeader (inBuffer, &opcode, &sequence))
+ {
+ if (opcode == 'a') handleAttachResponse(inBuffer);
+ else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
+ else if (opcode == 'x') handleConsoleAddedIndication();
+ }
+}
+
+void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet ('A');
+ buf.putOctet ('M');
+ buf.putOctet ('1');
+ buf.putOctet (opcode);
+ buf.putLong (seq);
+}
+
+bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ if (buf.getSize() < 8)
+ return false;
+
+ uint8_t h1 = buf.getOctet ();
+ uint8_t h2 = buf.getOctet ();
+ uint8_t h3 = buf.getOctet ();
+
+ *opcode = buf.getOctet ();
+ *seq = buf.getLong ();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '1';
+}
+
+void ManagementAgentImpl::SendBuffer (Buffer& buf,
+ uint32_t length,
+ string exchange,
+ string routingKey)
+{
+ Message msg;
+ string data;
+
+ if (objIdPrefix == 0)
+ return;
+
+ buf.getRawData(data, length);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.setData (data);
+ session.messageTransfer (arg::content=msg, arg::destination=exchange);
+}
+
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name)
+{
+ PackageMap::iterator pIter = packages.find (name);
+ if (pIter != packages.end ())
+ return pIter;
+
+ // No such package found, create a new map entry.
+ std::pair<PackageMap::iterator, bool> result =
+ packages.insert (std::pair<string, ClassMap> (name, ClassMap ()));
+
+ // Publish a package-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p');
+ EncodePackageIndication (outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package");
+
+ return result.first;
+}
+
+void ManagementAgentImpl::moveNewObjectsLH()
+{
+ Mutex::ScopedLock lock (addLock);
+ for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
+ iter != newManagementObjects.end ();
+ iter++)
+ managementObjects[iter->first] = iter->second;
+ newManagementObjects.clear();
+}
+
+void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ SchemaClassKey key;
+ ClassMap& cMap = pIter->second;
+
+ key.name = className;
+ memcpy (&key.hash, md5Sum, 16);
+
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ return;
+
+ // No such class found, create a new class with local information.
+ SchemaClass classInfo;
+
+ classInfo.writeSchemaCall = schemaCall;
+ cMap[key] = classInfo;
+
+ // TODO: Publish a class-indication message
+}
+
+void ManagementAgentImpl::EncodePackageIndication (Buffer& buf,
+ PackageMap::iterator pIter)
+{
+ buf.putShortString ((*pIter).first);
+}
+
+void ManagementAgentImpl::EncodeClassIndication (Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
+{
+ SchemaClassKey key = (*cIter).first;
+
+ buf.putShortString ((*pIter).first);
+ buf.putShortString (key.name);
+ buf.putBin128 (key.hash);
+}
+
+void ManagementAgentImpl::PeriodicProcessing()
+{
+#define BUFSIZE 65536
+ Mutex::ScopedLock lock(agentLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
+ string routingKey;
+ std::list<uint64_t> deleteList;
+
+ {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ EncodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(uint64_t(Duration(now())));
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + systemId.str() + ".heartbeat";
+ SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ }
+
+ moveNewObjectsLH();
+
+ if (clientWasAdded)
+ {
+ clientWasAdded = false;
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject* object = iter->second;
+ object->setAllChanged ();
+ }
+ }
+
+ if (managementObjects.empty ())
+ return;
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject* object = iter->second;
+
+ if (object->getConfigChanged () || object->isDeleted ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ }
+
+ if (object->getInstChanged ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'i');
+ object->writeStatistics(msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ }
+
+ if (object->isDeleted ())
+ deleteList.push_back (iter->first);
+ }
+
+ // Delete flagged objects
+ for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ iter != deleteList.rend ();
+ iter++)
+ managementObjects.erase (*iter);
+
+ deleteList.clear ();
+}
+
+void ManagementAgentImpl::BackgroundThread::run()
+{
+ while (true) {
+ ::sleep(5);
+ agent.PeriodicProcessing();
+ }
+}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
new file mode 100644
index 0000000000..b7572fe833
--- /dev/null
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -0,0 +1,157 @@
+#ifndef _qpid_agent_ManagementAgentImpl_
+#define _qpid_agent_ManagementAgentImpl_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "ManagementAgent.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Dispatcher.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/framing/Uuid.h"
+#include <iostream>
+#include <sstream>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
+{
+ public:
+
+ ManagementAgentImpl ();
+ virtual ~ManagementAgentImpl ();
+
+ int getMaxThreads() { return 1; }
+ void init(std::string brokerHost = "localhost",
+ uint16_t brokerPort = 5672,
+ uint16_t intervalSeconds = 10,
+ bool useExternalThread = false);
+ void RegisterClass(std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ uint64_t addObject (management::ManagementObject* objectPtr,
+ uint32_t persistId = 0,
+ uint32_t persistBank = 4);
+ uint32_t pollCallbacks (uint32_t callLimit = 0);
+ int getSignalFd (void);
+
+ void PeriodicProcessing();
+
+ private:
+
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ struct SchemaClass
+ {
+ management::ManagementObject::writeSchemaCall_t writeSchemaCall;
+
+ SchemaClass () : writeSchemaCall(0) {}
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ PackageMap packages;
+ management::ManagementObjectMap managementObjects;
+ management::ManagementObjectMap newManagementObjects;
+
+ void received (client::Message& msg);
+
+ uint16_t interval;
+ bool extThread;
+ uint64_t nextObjectId;
+ sys::Mutex agentLock;
+ sys::Mutex addLock;
+ framing::Uuid sessionId;
+ framing::Uuid systemId;
+
+ int signalFdIn, signalFdOut;
+ client::Connection connection;
+ client::Session session;
+ client::Dispatcher* dispatcher;
+ bool clientWasAdded;
+ uint64_t objIdPrefix;
+ std::stringstream queueName;
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ class BackgroundThread : public sys::Runnable
+ {
+ ManagementAgentImpl& agent;
+ void run();
+ public:
+ BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+ };
+
+ BackgroundThread bgThread;
+ sys::Thread thread;
+
+ PackageMap::iterator FindOrAddPackage (std::string name);
+ void moveNewObjectsLH();
+ void AddClassLocal (PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ void EncodePackageIndication (qpid::framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void EncodeClassIndication (qpid::framing::Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter);
+ void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void SendBuffer (qpid::framing::Buffer& buf,
+ uint32_t length,
+ std::string exchange,
+ std::string routingKey);
+ void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+ void handlePackageRequest (qpid::framing::Buffer& inBuffer);
+ void handleClassQuery (qpid::framing::Buffer& inBuffer);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+ void handleConsoleAddedIndication();
+};
+
+}}
+
+#endif /*!_qpid_agent_ManagementAgentImpl_*/
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
index 106033f76f..84e0c650f2 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
@@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea
nextObjectId = 1;
bootSequence = 1;
nextRemoteBank = 10;
+ nextRequestSequence = 1;
clientWasAdded = false;
// Get from file or generate and save to file.
@@ -155,8 +156,8 @@ void ManagementBroker::RegisterClass (string packageName,
ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock (userLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ AddClass(pIter, className, md5Sum, schemaCall);
}
uint64_t ManagementBroker::addObject (ManagementObject* object,
@@ -200,6 +201,17 @@ void ManagementBroker::clientAdded (void)
Mutex::ScopedLock lock (userLock);
clientWasAdded = true;
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'x');
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+ }
}
void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -512,8 +524,12 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/)
+void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
+ std::string packageName;
+
+ inBuffer.getShortString(packageName);
+ FindOrAddPackageLH(packageName);
}
void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -529,7 +545,7 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
cIter != cMap.end ();
cIter++)
{
- if (cIter->second.hasSchema ())
+ if (cIter->second->hasSchema ())
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -546,16 +562,46 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::SchemaClass::appendSchema (Buffer& buf)
+void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
+{
+ std::string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ ClassMap::iterator cIter = pIter->second.find(key);
+ if (cIter == pIter->second.end()) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ uint32_t sequence = nextRequestSequence++;
+
+ EncodeHeader (outBuffer, 'S', sequence);
+ outBuffer.putShortString(packageName);
+ outBuffer.putShortString(key.name);
+ outBuffer.putBin128(key.hash);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+
+ SchemaClass* newSchema = new SchemaClass;
+ newSchema->pendingSequence = sequence;
+ pIter->second[key] = newSchema;
+ }
+}
+
+void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
{
// If the management package is attached locally (embedded in the broker or
// linked in via plug-in), call the schema handler directly. If the package
// is from a remote management agent, send the stored schema information.
if (writeSchemaCall != 0)
- writeSchemaCall (buf);
+ writeSchemaCall(buf);
else
- buf.putRawData (buffer, bufferLen);
+ buf.putRawData(buffer, bufferLen);
}
void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -568,22 +614,19 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getBin128 (key.hash);
PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
+ if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- {
+ if (cIter != cMap.end()) {
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- SchemaClass classInfo = cIter->second;
+ SchemaClass* classInfo = cIter->second;
- if (classInfo.hasSchema())
- {
- EncodeHeader (outBuffer, 's', sequence);
- classInfo.appendSchema (outBuffer);
+ if (classInfo->hasSchema()) {
+ EncodeHeader(outBuffer, 's', sequence);
+ classInfo->appendSchema (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
+ outBuffer.reset();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
else
@@ -596,6 +639,44 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
sendCommandComplete (replyToKey, sequence, 1, "Package not found");
}
+void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.record();
+ inBuffer.getShortString (packageName);
+ inBuffer.getShortString (key.name);
+ inBuffer.getBin128 (key.hash);
+ inBuffer.restore();
+
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+ size_t length = ValidateSchema(inBuffer);
+ if (length == 0)
+ cMap.erase(key);
+ else {
+ cIter->second->buffer = (uint8_t*) malloc(length);
+ cIter->second->bufferLen = length;
+ inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+
+ // Publish a class-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'q');
+ EncodeClassIndication (outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ }
+ }
+ }
+}
+
bool ManagementBroker::bankInUse (uint32_t bank)
{
for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
@@ -628,16 +709,16 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- Uuid sessionId;
+ string sessionName;
Uuid systemId;
inBuffer.getShortString (label);
- sessionId.decode (inBuffer);
+ inBuffer.getShortString (sessionName);
systemId.decode (inBuffer);
requestedBank = inBuffer.getLong ();
assignedBank = assignBankLH (requestedBank);
- RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId);
+ RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
if (aIter != remoteAgents.end())
{
// There already exists an agent on this session. Reject the request.
@@ -645,17 +726,21 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
return;
}
+ // TODO: Reject requests for which the session name does not match an existing session.
+
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
+ agent->routingKey = replyToKey;
agent->mgmtObject = new management::Agent (this, agent);
- agent->mgmtObject->set_sessionId (sessionId);
+ agent->mgmtObject->set_sessionName (sessionName);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
addObject (agent->mgmtObject);
- remoteAgents[sessionId] = agent;
+ remoteAgents[sessionName] = agent;
+ // Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -734,16 +819,18 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
if (!CheckHeader (inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
- //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
}
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -767,10 +854,10 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::
return result.first;
}
-void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::AddClass(PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -785,12 +872,11 @@ void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
// No such class found, create a new class with local information.
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- SchemaClass classInfo;
+ SchemaClass* classInfo = new SchemaClass;
- classInfo.writeSchemaCall = schemaCall;
+ classInfo->writeSchemaCall = schemaCall;
cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
+ cIter = cMap.find (key);
}
void ManagementBroker::EncodePackageIndication (Buffer& buf,
@@ -810,3 +896,42 @@ void ManagementBroker::EncodeClassIndication (Buffer& buf,
buf.putBin128 (key.hash);
}
+size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+{
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
+
+ inBuffer.record();
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
+ uint16_t evntCount = inBuffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
+
+ if (evntCount != 0)
+ return 0;
+
+ end = inBuffer.getPosition();
+ inBuffer.restore(); // restore original position
+ return end - start;
+}
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h
index 5e9114c3f4..685b7db977 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.h
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.h
@@ -89,6 +89,7 @@ class ManagementBroker : public ManagementAgent
struct RemoteAgent : public Manageable
{
uint32_t objIdBank;
+ std::string routingKey;
Agent* mgmtObject;
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
@@ -97,8 +98,8 @@ class ManagementBroker : public ManagementAgent
// TODO: Eventually replace string with entire reply-to structure. reply-to
// currently assumes that the exchange is "amq.direct" even though it could
// in theory be specified differently.
- typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap;
- typedef std::vector<std::string> ReplyToVector;
+ typedef std::map<std::string, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
// Storage for known schema classes:
//
@@ -129,16 +130,16 @@ class ManagementBroker : public ManagementAgent
struct SchemaClass
{
ManagementObject::writeSchemaCall_t writeSchemaCall;
- ReplyToVector remoteAgents;
- size_t bufferLen;
- uint8_t* buffer;
+ uint32_t pendingSequence;
+ size_t bufferLen;
+ uint8_t* buffer;
- SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {}
+ SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
void appendSchema (framing::Buffer& buf);
};
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
RemoteAgentMap remoteAgents;
@@ -162,6 +163,7 @@ class ManagementBroker : public ManagementAgent
uint32_t localBank;
uint32_t nextObjectId;
uint32_t nextRemoteBank;
+ uint32_t nextRequestSequence;
bool clientWasAdded;
# define MA_BUFFER_SIZE 65536
@@ -183,11 +185,11 @@ class ManagementBroker : public ManagementAgent
size_t first);
void dispatchAgentCommandLH (broker::Message& msg);
- PackageMap::iterator FindOrAddPackage (std::string name);
- void AddClassLocal (PackageMap::iterator pIter,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
+ PackageMap::iterator FindOrAddPackageLH(std::string name);
+ void AddClass(PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
void EncodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
void EncodeClassIndication (framing::Buffer& buf,
@@ -198,13 +200,17 @@ class ManagementBroker : public ManagementAgent
uint32_t assignBankLH (uint32_t requestedPrefix);
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
- void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+
+ size_t ValidateSchema(framing::Buffer&);
};
}}