summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-09-03 18:01:44 +0000
committerTed Ross <tross@apache.org>2008-09-03 18:01:44 +0000
commitbf45f1241b9f801b55ede16d77c3dbbe505f0f89 (patch)
tree419d953ae460ce8e3a7607c9e749baf3c2829e6c
parent0da3229c29d5948c3a48631b83a2484dc349a974 (diff)
downloadqpid-python-bf45f1241b9f801b55ede16d77c3dbbe505f0f89.tar.gz
QPID-1174 Updates to the management framework
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@691700 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/qmf-agent/example.cpp18
-rw-r--r--qpid/cpp/examples/qmf-agent/schema.xml8
-rwxr-xr-xqpid/cpp/managementgen/schema.py229
-rw-r--r--qpid/cpp/managementgen/templates/Class.cpp24
-rw-r--r--qpid/cpp/managementgen/templates/Class.h9
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgent.h17
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp585
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h101
-rw-r--r--qpid/cpp/src/qpid/broker/AclModule.h25
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/System.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Vhost.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/FieldTable.h1
-rw-r--r--qpid/cpp/src/qpid/management/Manageable.cpp1
-rw-r--r--qpid/cpp/src/qpid/management/Manageable.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.cpp322
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.h33
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp74
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h45
-rw-r--r--qpid/python/qpid/management.py181
-rw-r--r--qpid/python/qpid/managementdata.py80
-rw-r--r--qpid/specs/management-schema.xml33
-rw-r--r--qpid/specs/management-types.xml2
25 files changed, 1315 insertions, 496 deletions
diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp
index 35ea97d4c0..c8d63a62b9 100644
--- a/qpid/cpp/examples/qmf-agent/example.cpp
+++ b/qpid/cpp/examples/qmf-agent/example.cpp
@@ -89,9 +89,10 @@ public:
CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent)
{
+ static uint64_t persistId = 0x111222333444555LL;
mgmtObject = new Parent(agent, this, name);
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject, persistId++);
mgmtObject->set_state("IDLE");
}
@@ -128,6 +129,8 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args)
children.push_back(child);
+ mgmtObject->event_childCreated(ioArgs.i_name);
+
return STATUS_OK;
}
@@ -145,7 +148,8 @@ ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name)
//==============================================================
// Main program
//==============================================================
-int main(int argc, char** argv) {
+int main_int(int argc, char** argv)
+{
ManagementAgent::Singleton singleton;
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
@@ -158,7 +162,7 @@ int main(int argc, char** argv) {
// Start the agent. It will attempt to make a connection to the
// management broker
- agent->init(string(host), port);
+ agent->init(string(host), port, 5, false, ".magentdata");
// Allocate some core objects
CoreClass core1(agent, "Example Core Object #1");
@@ -168,4 +172,12 @@ int main(int argc, char** argv) {
core1.doLoop();
}
+int main(int argc, char** argv)
+{
+ try {
+ return main_int(argc, argv);
+ } catch(std::exception& e) {
+ cout << "Top Level Exception: " << e.what() << endl;
+ }
+}
diff --git a/qpid/cpp/examples/qmf-agent/schema.xml b/qpid/cpp/examples/qmf-agent/schema.xml
index de8776c818..b0f25fcbed 100644
--- a/qpid/cpp/examples/qmf-agent/schema.xml
+++ b/qpid/cpp/examples/qmf-agent/schema.xml
@@ -37,6 +37,14 @@
<arg name="name" dir="I" type="sstr"/>
<arg name="childRef" dir="O" type="objId"/>
</method>
+
+ <event name="childCreated">
+ <arg name="name" type="sstr"/>
+ </event>
+
+ <event name="childDestroyed">
+ <arg name="name" type="sstr"/>
+ </event>
</class>
diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py
index 2ee61fff80..f911c28db3 100755
--- a/qpid/cpp/managementgen/schema.py
+++ b/qpid/cpp/managementgen/schema.py
@@ -79,7 +79,7 @@ class SchemaType:
def getName (self):
return self.name
- def genAccessor (self, stream, varName, changeFlag = None):
+ def genAccessor (self, stream, varName, changeFlag = None, optional = False):
if self.perThread:
prefix = "getThreadStats()->"
if self.style == "wm":
@@ -87,11 +87,13 @@ class SchemaType:
else:
prefix = ""
if self.accessor == "direct":
- stream.write (" inline void set_" + varName + " (" + self.cpp + " val){\n");
+ stream.write (" inline void set_" + varName + " (" + self.cpp + " val) {\n");
if not self.perThread:
stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
if self.style != "mma":
- stream.write (" " + prefix + varName + " = val;\n");
+ stream.write (" " + prefix + varName + " = val;\n")
+ if optional:
+ stream.write (" presenceMask[presenceByte_%s] |= presenceMask_%s;\n" % (varName, varName))
if self.style == "wm":
stream.write (" if (" + varName + "Low > val)\n")
stream.write (" " + varName + "Low = val;\n")
@@ -106,9 +108,24 @@ class SchemaType:
stream.write (" " + prefix + varName + "Max = val;\n")
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
- stream.write (" }\n");
+ stream.write (" }\n")
+ if self.style != "mma":
+ stream.write (" inline " + self.cpp + "& get_" + varName + "() {\n");
+ if not self.perThread:
+ stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
+ stream.write (" return " + prefix + varName + ";\n")
+ stream.write (" }\n")
+ if optional:
+ stream.write (" inline void clr_" + varName + "() {\n")
+ stream.write (" presenceMask[presenceByte_%s] &= ~presenceMask_%s;\n" % (varName, varName))
+ if changeFlag != None:
+ stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" }\n")
+ stream.write (" inline bool isSet_" + varName + "() {\n")
+ stream.write (" return presenceMask[presenceByte_%s] & presenceMask_%s != 0;\n" % (varName, varName))
+ stream.write (" }\n")
elif self.accessor == "counter":
- stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n");
+ stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1) {\n");
if not self.perThread:
stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
stream.write (" " + prefix + varName + " += by;\n")
@@ -118,7 +135,7 @@ class SchemaType:
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
stream.write (" }\n");
- stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n");
+ stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1) {\n");
if not self.perThread:
stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n")
stream.write (" " + prefix + varName + " -= by;\n")
@@ -146,22 +163,22 @@ class SchemaType:
stream.write (" threadStats->" + varName + "Min = std::numeric_limits<" + cpptype + ">::max();\n")
stream.write (" threadStats->" + varName + "Max = std::numeric_limits<" + cpptype + ">::min();\n")
- def genWrite (self, stream, varName):
+ def genWrite (self, stream, varName, indent=" "):
if self.style != "mma":
- stream.write (" " + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n")
+ stream.write (indent + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n")
if self.style == "wm":
- stream.write (" " + self.encode.replace ("@", "buf") \
+ stream.write (indent + self.encode.replace ("@", "buf") \
.replace ("#", varName + "High") + ";\n")
- stream.write (" " + self.encode.replace ("@", "buf") \
+ stream.write (indent + self.encode.replace ("@", "buf") \
.replace ("#", varName + "Low") + ";\n")
if self.style == "mma":
- stream.write (" " + self.encode.replace ("@", "buf") \
+ stream.write (indent + self.encode.replace ("@", "buf") \
.replace ("#", varName + "Count") + ";\n")
- stream.write (" " + self.encode.replace ("@", "buf") \
+ stream.write (indent + self.encode.replace ("@", "buf") \
.replace ("#", varName + "Count ? " + varName + "Min : 0") + ";\n")
- stream.write (" " + self.encode.replace ("@", "buf") \
+ stream.write (indent + self.encode.replace ("@", "buf") \
.replace ("#", varName + "Max") + ";\n")
- stream.write (" " + self.encode.replace ("@", "buf") \
+ stream.write (indent + self.encode.replace ("@", "buf") \
.replace ("#", varName + "Count ? " + varName + "Total / " +
varName + "Count : 0") + ";\n")
@@ -207,7 +224,7 @@ class Type:
#=====================================================================================
#
#=====================================================================================
-class SchemaConfig:
+class SchemaProperty:
def __init__ (self, node, typespec):
self.name = None
self.type = None
@@ -216,6 +233,7 @@ class SchemaConfig:
self.isIndex = 0
self.isParentRef = 0
self.isGeneralRef = 0
+ self.isOptional = 0
self.unit = None
self.min = None
self.max = None
@@ -255,6 +273,11 @@ class SchemaConfig:
raise ValueError ("Expected 'y' in isGeneralReference attribute")
self.isGeneralRef = 1
+ elif key == 'optional':
+ if val != 'y':
+ raise ValueError ("Expected 'y' in optional attribute")
+ self.isOptional = 1
+
elif key == 'unit':
self.unit = val
@@ -273,6 +296,9 @@ class SchemaConfig:
else:
raise ValueError ("Unknown attribute in property '%s'" % key)
+ if self.access == "RC" and self.isOptional == 1:
+ raise ValueError ("Properties with ReadCreate access must not be optional (%s)" % self.name)
+
if self.name == None:
raise ValueError ("Missing 'name' attribute in property")
if self.type == None:
@@ -289,18 +315,19 @@ class SchemaConfig:
def genDeclaration (self, stream, prefix=" "):
stream.write (prefix + self.type.type.cpp + " " + self.name + ";\n")
- def genFormalParam (self, stream):
+ def genFormalParam (self, stream, variables):
stream.write (self.type.type.cpp + " _" + self.name)
def genAccessor (self, stream):
- self.type.type.genAccessor (stream, self.name, "configChanged")
+ self.type.type.genAccessor (stream, self.name, "configChanged", self.isOptional == 1)
def genSchema (self, stream):
stream.write (" ft = FieldTable ();\n")
- stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
- stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n")
stream.write (" ft.setInt (ACCESS, ACCESS_" + self.access + ");\n")
- stream.write (" ft.setInt (INDEX, " + str (self.isIndex) + ");\n")
+ stream.write (" ft.setInt (INDEX, " + str (self.isIndex) + ");\n")
+ stream.write (" ft.setInt (OPTIONAL, " + str (self.isOptional) + ");\n")
if self.unit != None:
stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n")
if self.min != None:
@@ -314,13 +341,19 @@ class SchemaConfig:
stream.write (" buf.put (ft);\n\n")
def genWrite (self, stream):
- self.type.type.genWrite (stream, self.name)
+ indent = " "
+ if self.isOptional:
+ stream.write(" if (presenceMask[presenceByte_%s] & presenceMask_%s) {\n" % (self.name, self.name))
+ indent = " "
+ self.type.type.genWrite (stream, self.name, indent)
+ if self.isOptional:
+ stream.write(" }\n")
#=====================================================================================
#
#=====================================================================================
-class SchemaInst:
+class SchemaStatistic:
def __init__ (self, node, typespec):
self.name = None
self.type = None
@@ -523,25 +556,30 @@ class SchemaArg:
def getDir (self):
return self.dir
- def genSchema (self, stream):
+ def genSchema (self, stream, event=False):
stream.write (" ft = FieldTable ();\n")
stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n")
- stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n")
+ if (not event):
+ stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n")
if self.unit != None:
stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n")
- if self.min != None:
- stream.write (" ft.setInt (MIN, " + self.min + ");\n")
- if self.max != None:
- stream.write (" ft.setInt (MAX, " + self.max + ");\n")
- if self.maxLen != None:
- stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n")
+ if not event:
+ if self.min != None:
+ stream.write (" ft.setInt (MIN, " + self.min + ");\n")
+ if self.max != None:
+ stream.write (" ft.setInt (MAX, " + self.max + ");\n")
+ if self.maxLen != None:
+ stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n")
+ if self.default != None:
+ stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n")
if self.desc != None:
stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
- if self.default != None:
- stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n")
stream.write (" buf.put (ft);\n\n")
+ def genFormalParam (self, stream, variables):
+ stream.write ("%s _%s" % (self.type.type.cpp, self.name))
+
#=====================================================================================
#
#=====================================================================================
@@ -649,6 +687,50 @@ class SchemaEvent:
def getArgCount (self):
return len (self.args)
+ def genMethodBody (self, stream, variables, classObject):
+ stream.write("void ")
+ classObject.genNameCap(stream, variables)
+ stream.write("::event_%s(" % self.name)
+ count = 0
+ for arg in self.args:
+ arg.genFormalParam(stream, variables)
+ count += 1
+ if count < len(self.args):
+ stream.write(", ")
+ stream.write(") {\n")
+ stream.write(" sys::Mutex::ScopedLock mutex(getMutex());\n")
+ stream.write(" Buffer* buf = startEventLH();\n")
+ stream.write(" objectId.encode(*buf);\n")
+ stream.write(" buf->putShortString(packageName);\n")
+ stream.write(" buf->putShortString(className);\n")
+ stream.write(" buf->putBin128(md5Sum);\n")
+ stream.write(" buf->putShortString(\"%s\");\n" % self.name)
+ for arg in self.args:
+ stream.write(" %s;\n" % arg.type.type.encode.replace("@", "(*buf)").replace("#", "_" + arg.name))
+ stream.write(" finishEventLH(buf);\n")
+ stream.write("}\n\n")
+
+ def genMethodDecl (self, stream, variables):
+ stream.write(" void event_%s(" % self.name)
+ count = 0
+ for arg in self.args:
+ arg.genFormalParam(stream, variables)
+ count += 1
+ if count < len(self.args):
+ stream.write(", ")
+ stream.write(");\n")
+
+ def genSchema(self, stream, variables):
+ stream.write (" ft = FieldTable ();\n")
+ stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
+ stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n")
+ if self.desc != None:
+ stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
+ stream.write (" buf.put (ft);\n\n")
+ for arg in self.args:
+ arg.genSchema (stream, True)
+
+
class SchemaClass:
def __init__ (self, package, node, typespec, fragments, options):
@@ -669,11 +751,11 @@ class SchemaClass:
for child in children:
if child.nodeType == Node.ELEMENT_NODE:
if child.nodeName == 'property':
- sub = SchemaConfig (child, typespec)
+ sub = SchemaProperty (child, typespec)
self.properties.append (sub)
elif child.nodeName == 'statistic':
- sub = SchemaInst (child, typespec)
+ sub = SchemaStatistic (child, typespec)
self.statistics.append (sub)
elif child.nodeName == 'method':
@@ -758,6 +840,12 @@ class SchemaClass:
# Code Generation Functions. The names of these functions (minus the leading "gen")
# match the substitution keywords in the template files.
#===================================================================================
+ def testExistOptionals (self, variables):
+ for prop in self.properties:
+ if prop.isOptional == 1:
+ return True
+ return False
+
def testExistPerThreadStats (self, variables):
for inst in self.statistics:
if inst.type.type.perThread:
@@ -794,17 +882,13 @@ class SchemaClass:
for element in self.properties:
element.genDeclaration (stream)
- def genConfigElementSchema (self, stream, variables):
- for config in self.properties:
- config.genSchema (stream)
-
def genConstructorArgs (self, stream, variables):
# Constructor args are config elements with read-create access
result = ""
for element in self.properties:
if element.isConstructorArg ():
stream.write (", ")
- element.genFormalParam (stream)
+ element.genFormalParam (stream, variables)
def genConstructorInits (self, stream, variables):
for element in self.properties:
@@ -831,8 +915,17 @@ class SchemaClass:
def genEventCount (self, stream, variables):
stream.write ("%d" % len (self.events))
+ def genEventMethodBodies (self, stream, variables):
+ for event in self.events:
+ event.genMethodBody (stream, variables, self)
+
+ def genEventMethodDecls (self, stream, variables):
+ for event in self.events:
+ event.genMethodDecl (stream, variables)
+
def genEventSchema (self, stream, variables):
- pass ###########################################################################
+ for event in self.events:
+ event.genSchema (stream, variables)
def genHiLoStatResets (self, stream, variables):
for inst in self.statistics:
@@ -884,10 +977,6 @@ class SchemaClass:
if element.type.type.perThread:
element.genDeclaration (stream, " ")
- def genInstElementSchema (self, stream, variables):
- for inst in self.statistics:
- inst.genSchema (stream)
-
def genMethodArgIncludes (self, stream, variables):
for method in self.methods:
if method.getArgCount () > 0:
@@ -898,7 +987,7 @@ class SchemaClass:
def genMethodHandlers (self, stream, variables):
for method in self.methods:
- stream.write ("\n if (methodName == \"" + method.getName () + "\")\n {\n")
+ stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n")
if method.getArgCount () == 0:
stream.write (" ArgsNone ioArgs;\n")
else:
@@ -922,10 +1011,36 @@ class SchemaClass:
arg.name, "outBuf") + ";\n")
stream.write (" return;\n }\n")
+ def genPresenceMaskBytes (self, stream, variables):
+ count = 0
+ for prop in self.properties:
+ if prop.isOptional == 1:
+ count += 1
+ if count == 0:
+ stream.write("0")
+ else:
+ stream.write (str(((count - 1) / 8) + 1))
+
+ def genPresenceMaskConstants (self, stream, variables):
+ count = 0
+ for prop in self.properties:
+ if prop.isOptional == 1:
+ stream.write(" static const uint8_t presenceByte_%s = %d;\n" % (prop.name, count / 8))
+ stream.write(" static const uint8_t presenceMask_%s = %d;\n" % (prop.name, 1 << (count % 8)))
+ count += 1
+
+ def genPropertySchema (self, stream, variables):
+ for prop in self.properties:
+ prop.genSchema (stream)
+
def genSetGeneralReferenceDeclaration (self, stream, variables):
for prop in self.properties:
if prop.isGeneralRef:
- stream.write ("void setReference(uint64_t objectId) { " + prop.name + " = objectId; }\n")
+ stream.write ("void setReference(ObjectId objectId) { " + prop.name + " = objectId; }\n")
+
+ def genStatisticSchema (self, stream, variables):
+ for stat in self.statistics:
+ stat.genSchema (stream)
def genMethodIdDeclarations (self, stream, variables):
number = 1
@@ -983,13 +1098,13 @@ class SchemaClass:
if inst.type.type.perThread:
inst.genAssign (stream)
- def genWriteConfig (self, stream, variables):
- for config in self.properties:
- config.genWrite (stream)
+ def genWriteProperties (self, stream, variables):
+ for prop in self.properties:
+ prop.genWrite (stream)
- def genWriteInst (self, stream, variables):
- for inst in self.statistics:
- inst.genWrite (stream)
+ def genWriteStatistics (self, stream, variables):
+ for stat in self.statistics:
+ stat.genWrite (stream)
@@ -1046,15 +1161,9 @@ class PackageSchema:
def genClassRegisters (self, stream, variables):
for _class in self.classes:
- stream.write ("agent->RegisterClass (")
- _class.genNameCap (stream, variables)
- stream.write ("::packageName, ")
- _class.genNameCap (stream, variables)
- stream.write ("::className, ")
- _class.genNameCap (stream, variables)
- stream.write ("::md5Sum, ")
+ stream.write (" ")
_class.genNameCap (stream, variables)
- stream.write ("::writeSchema);\n")
+ stream.write ("::registerClass(agent);\n")
#=====================================================================================
diff --git a/qpid/cpp/managementgen/templates/Class.cpp b/qpid/cpp/managementgen/templates/Class.cpp
index 289427d742..2a0e55b34d 100644
--- a/qpid/cpp/managementgen/templates/Class.cpp
+++ b/qpid/cpp/managementgen/templates/Class.cpp
@@ -42,6 +42,11 @@ uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] =
{
/*MGEN:Class.ParentRefAssignment*/
/*MGEN:Class.InitializeElements*/
+/*MGEN:IF(Class.ExistOptionals)*/
+ // Optional properties start out not-present
+ for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
+ presenceMask[idx] = 0;
+/*MGEN:ENDIF*/
/*MGEN:IF(Class.ExistPerThreadStats)*/
maxThreads = agent->getMaxThreads();
perThreadStatsArray = new struct PerThreadStats*[maxThreads];
@@ -65,6 +70,7 @@ namespace {
const string TYPE("type");
const string ACCESS("access");
const string INDEX("index");
+ const string OPTIONAL("optional");
const string UNIT("unit");
const string MIN("min");
const string MAX("max");
@@ -76,6 +82,11 @@ namespace {
const string DEFAULT("default");
}
+void /*MGEN:Class.NameCap*/::registerClass(ManagementAgent* agent)
+{
+ agent->RegisterClass(packageName, className, md5Sum, writeSchema);
+}
+
void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
{
FieldTable ft;
@@ -90,9 +101,9 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
buf.putShort (/*MGEN:Class.EventCount*/); // Event Count
// Properties
-/*MGEN:Class.ConfigElementSchema*/
+/*MGEN:Class.PropertySchema*/
// Statistics
-/*MGEN:Class.InstElementSchema*/
+/*MGEN:Class.StatisticSchema*/
// Methods
/*MGEN:Class.MethodSchema*/
// Events
@@ -118,7 +129,11 @@ void /*MGEN:Class.NameCap*/::writeProperties (Buffer& buf)
configChanged = false;
writeTimestamps (buf);
-/*MGEN:Class.WriteConfig*/
+/*MGEN:IF(Class.ExistOptionals)*/
+ for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
+ buf.putOctet(presenceMask[idx]);
+/*MGEN:ENDIF*/
+/*MGEN:Class.WriteProperties*/
}
void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders)
@@ -140,7 +155,7 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders)
/*MGEN:Class.Assign*/
if (!skipHeaders)
writeTimestamps (buf);
-/*MGEN:Class.WriteInst*/
+/*MGEN:Class.WriteStatistics*/
// Maintenance of hi-lo statistics
/*MGEN:Class.HiLoStatResets*/
@@ -162,3 +177,4 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/)
outBuf.putShortString (Manageable::StatusText (status));
}
+/*MGEN:Class.EventMethodBodies*/
diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h
index fac63d5d55..40ad20eb85 100644
--- a/qpid/cpp/managementgen/templates/Class.h
+++ b/qpid/cpp/managementgen/templates/Class.h
@@ -37,6 +37,10 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
static std::string packageName;
static std::string className;
static uint8_t md5Sum[16];
+/*MGEN:IF(Class.ExistOptionals)*/
+ uint8_t presenceMask[/*MGEN:Class.PresenceMaskBytes*/];
+/*MGEN:Class.PresenceMaskConstants*/
+/*MGEN:ENDIF*/
// Properties
/*MGEN:Class.ConfigDeclarations*/
@@ -78,14 +82,13 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
/*MGEN:ENDIF*/
public:
- friend class Package/*MGEN:Class.NamePackageCap*/;
-
/*MGEN:Class.NameCap*/ (ManagementAgent* agent,
Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
~/*MGEN:Class.NameCap*/ (void);
/*MGEN:Class.SetGeneralReferenceDeclaration*/
+ static void registerClass (ManagementAgent* agent);
std::string& getPackageName (void) { return packageName; }
std::string& getClassName (void) { return className; }
uint8_t* getMd5Sum (void) { return md5Sum; }
@@ -94,6 +97,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
/*MGEN:Class.MethodIdDeclarations*/
// Accessor Methods
/*MGEN:Class.AccessorMethods*/
+ // Event Methods
+/*MGEN:Class.EventMethodDecls*/
};
}}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgent.h b/qpid/cpp/src/qpid/agent/ManagementAgent.h
index e7379e6c94..1c219f7463 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgent.h
@@ -65,10 +65,14 @@ class ManagementAgent
// agent's thread. In this case, the callback implementations
// MUST be thread safe.
//
+ // storeFile - File where this process has read and write access. This
+ // file shall be used to store persistent state.
+ //
virtual void init (std::string brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
- bool useExternalThread = false) = 0;
+ bool useExternalThread = false,
+ std::string storeFile = "") = 0;
// Register a schema with the management agent. This is normally called by the
// package initializer generated by the management code generator.
@@ -93,9 +97,8 @@ class ManagementAgent
// pointer. This allows the management agent to report the deletion of the object
// in an orderly way.
//
- virtual uint64_t addObject (ManagementObject* objectPtr,
- uint32_t persistId = 0,
- uint32_t persistBank = 4) = 0;
+ virtual ObjectId addObject (ManagementObject* objectPtr,
+ uint64_t persistId = 0) = 0;
// If "useExternalThread" was set to true in init, this method must
// be called to provide a thread for any pending method calls that have arrived.
@@ -120,6 +123,12 @@ class ManagementAgent
//
virtual int getSignalFd (void) = 0;
+protected:
+ friend class ManagementObject;
+ virtual sys::Mutex& getMutex() = 0;
+ virtual framing::Buffer* startEventLH() = 0;
+ virtual void finishEventLH(framing::Buffer* buf) = 0;
+
};
}}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index ebdc71e3b1..c4108b0ae2 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -24,12 +24,21 @@
#include <list>
#include <unistd.h>
#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <iostream>
+#include <fstream>
+
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::sys;
using std::stringstream;
+using std::ofstream;
+using std::ifstream;
using std::string;
using std::cout;
using std::endl;
@@ -66,128 +75,186 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
return agent;
}
+const string ManagementAgentImpl::storeMagicNumber("MA01");
+
ManagementAgentImpl::ManagementAgentImpl() :
- clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false)
+ extThread(false), writeFd(-1), readFd(-1),
+ clientWasAdded(true), requestedBank(0),
+ assignedBank(0), brokerBank(0), bootSequence(0),
+ connThreadBody(*this), connThread(connThreadBody),
+ pubThreadBody(*this), pubThread(pubThreadBody)
{
// TODO: Establish system ID
}
-void ManagementAgentImpl::init(std::string brokerHost,
- uint16_t brokerPort,
- uint16_t intervalSeconds,
- bool useExternalThread)
+void ManagementAgentImpl::init(string brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread,
+ string _storeFile)
{
- {
- Mutex::ScopedLock lock(agentLock);
- startupWait = true;
- }
-
interval = intervalSeconds;
extThread = useExternalThread;
+ storeFile = _storeFile;
nextObjectId = 1;
+ host = brokerHost;
+ port = brokerPort;
+
+ // TODO: Abstract the socket calls for portability
+ if (extThread) {
+ int pair[2];
+ int result = socketpair(PF_LOCAL, SOCK_STREAM, 0, pair);
+ if (result == -1) {
+ return;
+ }
+ writeFd = pair[0];
+ readFd = pair[1];
- sessionId.generate();
- queueName << "qmfagent-" << sessionId;
- string dest = "qmfagent";
-
- connection.open(brokerHost.c_str(), brokerPort);
- session = connection.newSession (queueName.str());
- dispatcher = new client::Dispatcher(session);
+ // Set the readFd to non-blocking
+ int flags = fcntl(readFd, F_GETFL);
+ fcntl(readFd, F_SETFL, flags | O_NONBLOCK);
+ }
+ retrieveData();
+ bootSequence++;
+ if ((bootSequence & 0xF000) != 0)
+ bootSequence = 1;
+ storeData(true);
+}
- session.queueDeclare (arg::queue=queueName.str());
- session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
- arg::bindingKey=queueName.str());
- session.messageSubscribe (arg::queue=queueName.str(),
- arg::destination=dest);
- session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
- session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
+ManagementAgentImpl::~ManagementAgentImpl()
+{
+}
- Message attachRequest;
- char rawbuffer[512];
- Buffer buffer (rawbuffer, 512);
+void ManagementAgentImpl::RegisterClass(std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = FindOrAddPackage(packageName);
+ AddClassLocal(pIter, className, md5Sum, schemaCall);
+}
- attachRequest.getDeliveryProperties().setRoutingKey("broker");
- attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+ uint64_t persistId)
+{
+ Mutex::ScopedLock lock(addLock);
+ uint16_t sequence = persistId ? 0 : bootSequence;
+ uint64_t objectNum = persistId ? persistId : nextObjectId++;
- EncodeHeader (buffer, 'A');
- buffer.putShortString ("RemoteAgent [C++]");
- systemId.encode (buffer);
- buffer.putLong (11);
+ ObjectId objectId(&attachment, 0, sequence, objectNum);
- size_t length = 512 - buffer.available ();
- string stringBuffer (rawbuffer, length);
- attachRequest.setData (stringBuffer);
+ // TODO: fix object-id handling
+ object->setObjectId(objectId);
+ newManagementObjects[objectId] = object;
+ return objectId;
+}
- session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management");
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
+{
+ Mutex::ScopedLock lock(agentLock);
- dispatcher->listen(dest, this);
- dispatcher->start();
+ for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
+ if (methodQueue.empty())
+ break;
- {
- Mutex::ScopedLock lock(agentLock);
- if (startupWait)
- startupCond.wait(agentLock);
+ QueuedMethod* item = methodQueue.front();
+ methodQueue.pop_front();
+ {
+ Mutex::ScopedUnlock unlock(agentLock);
+ Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
+ invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+ delete item;
+ }
}
+
+ uint8_t rbuf[100];
+ while (read(readFd, rbuf, 100) > 0); // Consume all signaling bytes
+ return methodQueue.size();
}
-ManagementAgentImpl::~ManagementAgentImpl()
+int ManagementAgentImpl::getSignalFd(void)
{
- dispatcher->stop();
- session.close();
- delete dispatcher;
+ return readFd;
}
-void ManagementAgentImpl::RegisterClass (std::string packageName,
- std::string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
-{
- Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
+void ManagementAgentImpl::startProtocol()
+{
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ EncodeHeader(buffer, 'A');
+ buffer.putShortString("RemoteAgent [C++]");
+ systemId.encode (buffer);
+ buffer.putLong(requestedBank);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
}
-uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
- uint32_t /*persistId*/,
- uint32_t /*persistBank*/)
+void ManagementAgentImpl::storeData(bool requested)
{
- Mutex::ScopedLock lock(addLock);
- uint64_t objectId;
+ if (!storeFile.empty()) {
+ ofstream outFile(storeFile.c_str());
+ uint32_t bankToWrite = requested ? requestedBank : assignedBank;
- // TODO: fix object-id handling
- objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF);
- object->setObjectId (objectId);
- newManagementObjects[objectId] = object;
- return objectId;
+ if (outFile.good()) {
+ outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl;
+ outFile.close();
+ }
+ }
}
-uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/)
+void ManagementAgentImpl::retrieveData()
{
- return 0;
+ if (!storeFile.empty()) {
+ ifstream inFile(storeFile.c_str());
+ string mn;
+
+ if (inFile.good()) {
+ inFile >> mn;
+ if (mn == storeMagicNumber) {
+ inFile >> requestedBank;
+ inFile >> bootSequence;
+ }
+ inFile.close();
+ }
+ }
}
-int ManagementAgentImpl::getSignalFd(void)
+void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
+ uint32_t code, string text)
{
- return -1;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'z', sequence);
+ outBuffer.putLong(code);
+ outBuffer.putShortString(text);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
}
void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock lock(agentLock);
- uint32_t assigned;
- stringstream key;
- assigned = inBuffer.getLong();
- objIdPrefix = ((uint64_t) assigned) << 24;
+ brokerBank = inBuffer.getLong();
+ assignedBank = inBuffer.getLong();
+ if (assignedBank != requestedBank) {
+ if (requestedBank == 0)
+ cout << "Initial object-id bank assigned: " << assignedBank << endl;
+ else
+ cout << "Collision in object-id! New bank assigned: " << assignedBank << endl;
+ storeData();
+ }
- startupWait = false;
- startupCond.notify();
+ attachment.setBanks(brokerBank, assignedBank);
// Bind to qpid.management to receive commands
- key << "agent." << assigned;
- session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(),
- arg::bindingKey=key.str());
+ connThreadBody.bindToBank(assignedBank);
// Send package indications for all local packages
for (PackageMap::iterator pIter = packages.begin();
@@ -198,9 +265,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
EncodeHeader(outBuffer, 'p');
EncodePackageIndication(outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
// Send class indications for all local classes
ClassMap cMap = pIter->second;
@@ -208,9 +275,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
outBuffer.reset();
EncodeHeader(outBuffer, 'q');
EncodeClassIndication(outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -236,9 +303,9 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
EncodeHeader(outBuffer, 's', sequence);
schema.writeSchemaCall(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -249,28 +316,93 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
clientWasAdded = true;
}
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
{
string methodName;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- uint64_t objId = inBuffer.getLongLong();
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
EncodeHeader(outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ if ((iter->second->getPackageName() != packageName) ||
+ (iter->second->getClassName() != className)) {
+ outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+ }
+ else
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+}
+
+void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ moveNewObjectsLH();
+
+ ft.decode(inBuffer);
+ value = ft.get("_class");
+ if (value.get() == 0 || !value->convertsTo<string>())
+ {
+ // TODO: Send completion with an error code
+ return;
+ }
+
+ string className(value->get<string>());
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++)
+ {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className)
+ {
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'g', sequence);
+ object->writeProperties(outBuffer);
+ object->writeStatistics(outBuffer, true);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ }
+ }
+
+ sendCommandComplete(replyTo, sequence);
+}
+
+void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ if (extThread) {
+ Mutex::ScopedLock lock(agentLock);
+ string body;
+
+ inBuffer.getRawData(body, inBuffer.available());
+ methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+ write(writeFd, "X", 1);
+ } else {
+ invokeMethodRequest(inBuffer, sequence, replyTo);
+ }
}
void ManagementAgentImpl::received(Message& msg)
@@ -287,103 +419,86 @@ void ManagementAgentImpl::received(Message& msg)
replyToKey = rt.getRoutingKey();
}
- if (CheckHeader (inBuffer, &opcode, &sequence))
+ if (CheckHeader(inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
+ else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
}
}
-void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
{
- buf.putOctet ('A');
- buf.putOctet ('M');
- buf.putOctet ('1');
- buf.putOctet (opcode);
- buf.putLong (seq);
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('1');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
}
-bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
if (buf.getSize() < 8)
return false;
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
return h1 == 'A' && h2 == 'M' && h3 == '1';
}
-void ManagementAgentImpl::SendBuffer (Buffer& buf,
- uint32_t length,
- string exchange,
- string routingKey)
-{
- Message msg;
- string data;
-
- if (objIdPrefix == 0)
- return;
-
- buf.getRawData(data, length);
- msg.getDeliveryProperties().setRoutingKey(routingKey);
- msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData (data);
- session.messageTransfer (arg::content=msg, arg::destination=exchange);
-}
-
-ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name)
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name)
{
- PackageMap::iterator pIter = packages.find (name);
- if (pIter != packages.end ())
+ PackageMap::iterator pIter = packages.find(name);
+ if (pIter != packages.end())
return pIter;
// No such package found, create a new map entry.
std::pair<PackageMap::iterator, bool> result =
- packages.insert (std::pair<string, ClassMap> (name, ClassMap ()));
+ packages.insert(std::pair<string, ClassMap>(name, ClassMap()));
// Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package");
+ EncodeHeader(outBuffer, 'p');
+ EncodePackageIndication(outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package");
return result.first;
}
void ManagementAgentImpl::moveNewObjectsLH()
{
- Mutex::ScopedLock lock (addLock);
- for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
- iter != newManagementObjects.end ();
+ Mutex::ScopedLock lock(addLock);
+ for (ManagementObjectMap::iterator iter = newManagementObjects.begin();
+ iter != newManagementObjects.end();
iter++)
managementObjects[iter->first] = iter->second;
newManagementObjects.clear();
}
-void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
key.name = className;
- memcpy (&key.hash, md5Sum, 16);
+ memcpy(&key.hash, md5Sum, 16);
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
return;
// No such class found, create a new class with local information.
@@ -395,21 +510,21 @@ void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter,
// TODO: Publish a class-indication message
}
-void ManagementAgentImpl::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
+void ManagementAgentImpl::EncodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
{
- buf.putShortString ((*pIter).first);
+ buf.putShortString((*pIter).first);
}
-void ManagementAgentImpl::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
+void ManagementAgentImpl::EncodeClassIndication(Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
+ buf.putShortString((*pIter).first);
+ buf.putShortString(key.name);
+ buf.putBin128 (key.hash);
}
void ManagementAgentImpl::PeriodicProcessing()
@@ -419,17 +534,17 @@ void ManagementAgentImpl::PeriodicProcessing()
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
- std::list<uint64_t> deleteList;
+ std::list<ObjectId> deleteList;
{
Buffer msgBuffer(msgChars, BUFSIZE);
EncodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
routingKey = "mgmt." + systemId.str() + ".heartbeat";
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
moveNewObjectsLH();
@@ -437,65 +552,171 @@ void ManagementAgentImpl::PeriodicProcessing()
if (clientWasAdded)
{
clientWasAdded = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
iter++)
{
ManagementObject* object = iter->second;
- object->setAllChanged ();
+ object->setAllChanged();
}
}
- if (managementObjects.empty ())
+ if (managementObjects.empty())
return;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
iter++)
{
ManagementObject* object = iter->second;
- if (object->getConfigChanged () || object->isDeleted ())
+ if (object->getConfigChanged() || object->isDeleted())
{
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ EncodeHeader(msgBuffer, 'c');
object->writeProperties(msgBuffer);
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
+ routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
- if (object->getInstChanged ())
+ if (object->getInstChanged())
{
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ EncodeHeader(msgBuffer, 'i');
object->writeStatistics(msgBuffer);
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
+ routingKey = "mgmt." + systemId.str() + ".stat." + object->getClassName();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
- if (object->isDeleted ())
- deleteList.push_back (iter->first);
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
}
// Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
- iter != deleteList.rend ();
+ for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
iter++)
- managementObjects.erase (*iter);
+ managementObjects.erase(*iter);
- deleteList.clear ();
+ deleteList.clear();
}
-void ManagementAgentImpl::BackgroundThread::run()
+void ManagementAgentImpl::ConnectionThread::run()
+{
+ static const int delayMin(1);
+ static const int delayMax(128);
+ static const int delayFactor(2);
+ int delay(delayMin);
+ string dest("qmfagent");
+
+ sessionId.generate();
+ queueName << "qmfagent-" << sessionId;
+
+ while (true) {
+ try {
+ if (!agent.host.empty()) {
+ connection.open(agent.host.c_str(), agent.port);
+ session = connection.newSession(queueName.str());
+ subscriptions = new client::SubscriptionManager(session);
+
+ session.queueDeclare(arg::queue=queueName.str());
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str());
+
+ subscriptions->subscribe(agent, queueName.str(), dest);
+ {
+ Mutex::ScopedLock _lock(connLock);
+ operational = true;
+ agent.startProtocol();
+ try {
+ Mutex::ScopedUnlock _unlock(connLock);
+ subscriptions->run();
+ } catch (std::exception) {}
+
+ operational = false;
+ }
+ delay = delayMin;
+ delete subscriptions;
+ subscriptions = 0;
+ session.close();
+ }
+ } catch (std::exception &e) {
+ if (delay < delayMax)
+ delay *= delayFactor;
+ }
+
+ ::sleep(delay);
+ }
+}
+
+ManagementAgentImpl::ConnectionThread::~ConnectionThread()
+{
+ if (subscriptions != 0) {
+ delete subscriptions;
+ }
+}
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
+ uint32_t length,
+ string exchange,
+ string routingKey)
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ }
+
+ Message msg;
+ string data;
+
+ buf.getRawData(data, length);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.setData(data);
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+}
+
+void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank)
+{
+ stringstream key;
+ key << "agent." << agentBank;
+ session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
+ arg::bindingKey=key.str());
+}
+
+
+void ManagementAgentImpl::PublishThread::run()
{
while (true) {
::sleep(5);
agent.PeriodicProcessing();
}
}
+
+Mutex& ManagementAgentImpl::getMutex()
+{
+ return agentLock;
+}
+
+Buffer* ManagementAgentImpl::startEventLH()
+{
+ Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
+ EncodeHeader(*outBuffer, 'e');
+ outBuffer->putLongLong(uint64_t(Duration(now())));
+ return outBuffer;
+}
+
+void ManagementAgentImpl::finishEventLH(Buffer* outBuffer)
+{
+ uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
+ outBuffer->reset();
+ connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event");
+ delete outBuffer;
+}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index f7f19e145d..7d9be6daf9 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -22,7 +22,7 @@
#include "ManagementAgent.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Session.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/client/Message.h"
@@ -30,10 +30,10 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
+#include <deque>
namespace qpid {
namespace management {
@@ -49,14 +49,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void init(std::string brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
- bool useExternalThread = false);
+ bool useExternalThread = false,
+ std::string storeFile = "");
void RegisterClass(std::string packageName,
std::string className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
- uint64_t addObject (management::ManagementObject* objectPtr,
- uint32_t persistId = 0,
- uint32_t persistBank = 4);
+ ObjectId addObject (management::ManagementObject* objectPtr,
+ uint64_t persistId = 0);
uint32_t pollCallbacks (uint32_t callLimit = 0);
int getSignalFd (void);
@@ -64,14 +64,12 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
private:
- struct SchemaClassKey
- {
+ struct SchemaClassKey {
std::string name;
uint8_t hash[16];
};
- struct SchemaClassKeyComp
- {
+ struct SchemaClassKeyComp {
bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
{
if (lhs.name != rhs.name)
@@ -84,53 +82,95 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
}
};
- struct SchemaClass
- {
+ struct SchemaClass {
management::ManagementObject::writeSchemaCall_t writeSchemaCall;
SchemaClass () : writeSchemaCall(0) {}
};
+ struct QueuedMethod {
+ QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
+ sequence(_seq), replyTo(_reply), body(_body) {}
+
+ uint32_t sequence;
+ std::string replyTo;
+ std::string body;
+ };
+
+ typedef std::deque<QueuedMethod*> MethodQueue;
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
PackageMap packages;
+ AgentAttachment attachment;
management::ManagementObjectMap managementObjects;
management::ManagementObjectMap newManagementObjects;
+ MethodQueue methodQueue;
void received (client::Message& msg);
uint16_t interval;
bool extThread;
+ int writeFd;
+ int readFd;
uint64_t nextObjectId;
+ std::string storeFile;
sys::Mutex agentLock;
sys::Mutex addLock;
- framing::Uuid sessionId;
framing::Uuid systemId;
+ std::string host;
+ uint16_t port;
- int signalFdIn, signalFdOut;
- client::Connection connection;
- client::Session session;
- client::Dispatcher* dispatcher;
bool clientWasAdded;
- uint64_t objIdPrefix;
- std::stringstream queueName;
+ uint32_t requestedBank;
+ uint32_t assignedBank;
+ uint32_t brokerBank;
+ uint16_t bootSequence;
# define MA_BUFFER_SIZE 65536
char outputBuffer[MA_BUFFER_SIZE];
+ char eventBuffer[MA_BUFFER_SIZE];
- class BackgroundThread : public sys::Runnable
+ friend class ConnectionThread;
+ class ConnectionThread : public sys::Runnable
{
+ bool operational;
ManagementAgentImpl& agent;
+ framing::Uuid sessionId;
+ client::Connection connection;
+ client::Session session;
+ client::SubscriptionManager* subscriptions;
+ std::stringstream queueName;
+ sys::Mutex connLock;
void run();
public:
- BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+ ConnectionThread(ManagementAgentImpl& _agent) :
+ operational(false), agent(_agent), subscriptions(0) {}
+ ~ConnectionThread();
+ void sendBuffer(qpid::framing::Buffer& buf,
+ uint32_t length,
+ std::string exchange,
+ std::string routingKey);
+ void bindToBank(uint32_t agentBank);
};
- BackgroundThread bgThread;
- sys::Thread thread;
- sys::Condition startupCond;
- bool startupWait;
+ class PublishThread : public sys::Runnable
+ {
+ ManagementAgentImpl& agent;
+ void run();
+ public:
+ PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+ };
+
+ ConnectionThread connThreadBody;
+ sys::Thread connThread;
+ PublishThread pubThreadBody;
+ sys::Thread pubThread;
+
+ static const std::string storeMagicNumber;
+ void startProtocol();
+ void storeData(bool requested=false);
+ void retrieveData();
PackageMap::iterator FindOrAddPackage (std::string name);
void moveNewObjectsLH();
void AddClassLocal (PackageMap::iterator pIter,
@@ -144,16 +184,19 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
ClassMap::iterator cIter);
void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void SendBuffer (qpid::framing::Buffer& buf,
- uint32_t length,
- std::string exchange,
- std::string routingKey);
+ void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ uint32_t code = 0, std::string text = std::string("OK"));
void handleAttachResponse (qpid::framing::Buffer& inBuffer);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+ void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+ void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
+ sys::Mutex& getMutex();
+ framing::Buffer* startEventLH();
+ void finishEventLH(framing::Buffer* outBuffer);
};
}}
diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h
index ec832daf22..36a3f0baab 100644
--- a/qpid/cpp/src/qpid/broker/AclModule.h
+++ b/qpid/cpp/src/qpid/broker/AclModule.h
@@ -33,9 +33,11 @@ namespace qpid {
namespace acl {
-enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE,OBJECTSIZE};
-enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE, UPDATE, ACTIONSIZE};
-enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE, QUEUENAME};
+enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE, METHOD, OBJECTSIZE}; // OBJECTSIZE must be last in list
+enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE,
+ UPDATE, ACTIONSIZE}; // ACTIONSIZE must be last in list
+enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE,
+ QUEUENAME, SCHEMAPACKAGE, SCHEMACLASS};
enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG};
} // namespace acl
@@ -74,6 +76,7 @@ class AclHelper {
if (str.compare("broker") == 0) return BROKER;
if (str.compare("link") == 0) return LINK;
if (str.compare("route") == 0) return ROUTE;
+ if (str.compare("method") == 0) return METHOD;
throw str;
}
static inline std::string getObjectTypeStr(const ObjectType o) {
@@ -83,6 +86,7 @@ class AclHelper {
case BROKER: return "broker";
case LINK: return "link";
case ROUTE: return "route";
+ case METHOD: return "method";
default: assert(false); // should never get here
}
}
@@ -123,6 +127,8 @@ class AclHelper {
if (str.compare("type") == 0) return TYPE;
if (str.compare("alternate") == 0) return ALTERNATE;
if (str.compare("queuename") == 0) return QUEUENAME;
+ if (str.compare("schemapackage") == 0) return SCHEMAPACKAGE;
+ if (str.compare("schemaclass") == 0) return SCHEMACLASS;
throw str;
}
static inline std::string getPropertyStr(const Property p) {
@@ -137,6 +143,8 @@ class AclHelper {
case TYPE: return "type";
case ALTERNATE: return "alternate";
case QUEUENAME: return "queuename";
+ case SCHEMAPACKAGE: return "schemapackage";
+ case SCHEMACLASS: return "schemaclass";
default: assert(false); // should never get here
}
}
@@ -231,6 +239,17 @@ class AclHelper {
a3->insert(actionPair(DELETE, p0));
map->insert(objectPair(ROUTE, a3));
+
+ // == Method ==
+
+ propSetPtr p5(new propSet);
+ p5->insert(SCHEMAPACKAGE);
+ p5->insert(SCHEMACLASS);
+
+ actionMapPtr a4(new actionMap);
+ a4->insert(actionPair(ACCESS, p5));
+
+ map->insert(objectPair(METHOD, a4));
}
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index e983aee5c9..94a392b921 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -158,10 +158,12 @@ Broker::Broker(const Broker::Options& conf) :
mgmtObject->set_stagingThreshold (conf.stagingThreshold);
mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval);
mgmtObject->set_version (PACKAGE_VERSION);
- mgmtObject->set_dataDirEnabled (dataDir.isEnabled ());
- mgmtObject->set_dataDir (dataDir.getPath ());
+ if (dataDir.isEnabled())
+ mgmtObject->set_dataDir(dataDir.getPath());
+ else
+ mgmtObject->clr_dataDir();
- managementAgent->addObject (mgmtObject, 2, 1);
+ managementAgent->addObject (mgmtObject, 0x1000000000000002LL);
// Since there is currently no support for virtual hosts, a placeholder object
// representing the implied single virtual host is added here to keep the
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index fbfcaede82..6416e2fc73 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -57,9 +57,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange = new management::Exchange (agent, this, parent, _name, durable);
if (!durable) {
if (name == "")
- agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID
+ agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID
else if (name == "qpid.management")
- agent->addObject (mgmtExchange, 5, 1); // Special management exchange ID
+ agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID
else
agent->addObject (mgmtExchange);
}
@@ -78,7 +78,7 @@ void Exchange::setPersistenceId(uint64_t id) const
if (mgmtExchange != 0 && persistenceId == 0)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- agent->addObject (mgmtExchange, id, 2);
+ agent->addObject (mgmtExchange, 0x2000000000000000LL + id);
}
persistenceId = id;
}
@@ -130,7 +130,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
ManagementObject* mo = queue->GetManagementObject();
if (mo != 0)
{
- uint64_t queueId = mo->getObjectId();
+ management::ObjectId queueId = mo->getObjectId();
mgmtBinding = new management::Binding (agent, this, (Manageable*) parent, queueId, key, args);
agent->addObject (mgmtBinding);
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 40dfb80da2..090c4b4bca 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -618,7 +618,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
if (mgmtObject != 0 && persistenceId == 0)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- agent->addObject (mgmtObject, _persistenceId, 3);
+ agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId);
if (externalQueueStore) {
ManagementObject* childObj = externalQueueStore->GetManagementObject();
diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp
index 6c58339432..d562c43069 100644
--- a/qpid/cpp/src/qpid/broker/System.cpp
+++ b/qpid/cpp/src/qpid/broker/System.cpp
@@ -73,7 +73,7 @@ System::System (string _dataDir) : mgmtObject(0)
mgmtObject->set_machine (std::string (_uname.machine));
}
- agent->addObject (mgmtObject, 1, 1);
+ agent->addObject (mgmtObject, 0x1000000000000001LL);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp
index 23203ec13e..c0eb6f03ed 100644
--- a/qpid/cpp/src/qpid/broker/Vhost.cpp
+++ b/qpid/cpp/src/qpid/broker/Vhost.cpp
@@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0)
if (agent != 0)
{
mgmtObject = new management::Vhost (agent, this, parentBroker, "/");
- agent->addObject (mgmtObject, 3, 1);
+ agent->addObject (mgmtObject, 0x1000000000000003LL);
}
}
}
diff --git a/qpid/cpp/src/qpid/framing/FieldTable.h b/qpid/cpp/src/qpid/framing/FieldTable.h
index 3c65d31aee..ed27f3fef6 100644
--- a/qpid/cpp/src/qpid/framing/FieldTable.h
+++ b/qpid/cpp/src/qpid/framing/FieldTable.h
@@ -58,6 +58,7 @@ class FieldTable
int count() const;
void set(const std::string& name, const ValuePtr& value);
ValuePtr get(const std::string& name) const;
+ bool isSet(const std::string& name) const { return get(name).get() != 0; }
void setString(const std::string& name, const std::string& value);
void setInt(const std::string& name, int value);
diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp
index 0f3fbab55c..19fba87fde 100644
--- a/qpid/cpp/src/qpid/management/Manageable.cpp
+++ b/qpid/cpp/src/qpid/management/Manageable.cpp
@@ -31,6 +31,7 @@ std::string Manageable::StatusText (status_t status)
case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
case STATUS_INVALID_PARAMETER : return "InvalidParameter";
case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
+ case STATUS_FORBIDDEN : return "Forbidden";
}
return "??";
diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h
index e2b8980465..4e2f33b625 100644
--- a/qpid/cpp/src/qpid/management/Manageable.h
+++ b/qpid/cpp/src/qpid/management/Manageable.h
@@ -44,6 +44,7 @@ class Manageable
static const status_t STATUS_NOT_IMPLEMENTED = 3;
static const status_t STATUS_INVALID_PARAMETER = 4;
static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
+ static const status_t STATUS_FORBIDDEN = 6;
// Every "Manageable" object must hold a reference to exactly one
// management object. This object is always of a class derived from
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
index 1bdd8ab836..17f5c14592 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
@@ -27,6 +27,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/AclModule.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -80,8 +81,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent ()
ManagementBroker::ManagementBroker () :
threadPoolSize(1), interval(10), broker(0)
{
- localBank = 5;
nextObjectId = 1;
+ brokerBank = 1;
bootSequence = 1;
nextRemoteBank = 10;
nextRequestSequence = 1;
@@ -112,7 +113,7 @@ ManagementBroker::~ManagementBroker ()
}
}
-void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads)
+void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
interval = _interval;
@@ -140,7 +141,10 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable
inFile.close();
QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
+ // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
bootSequence++;
+ if (bootSequence & 0xF000)
+ bootSequence = 1;
writeData();
}
else
@@ -183,29 +187,26 @@ void ManagementBroker::RegisterClass (string packageName,
AddClass(pIter, className, md5Sum, schemaCall);
}
-uint64_t ManagementBroker::addObject (ManagementObject* object,
- uint32_t persistId,
- uint32_t persistBank)
+ObjectId ManagementBroker::addObject (ManagementObject* object,
+ uint64_t persistId)
{
Mutex::ScopedLock lock (addLock);
- uint64_t objectId;
+ uint16_t sequence;
+ uint64_t objectNum;
- if (persistId == 0)
- {
- objectId = ((uint64_t) bootSequence) << 48 |
- ((uint64_t) localBank) << 24 | nextObjectId++;
- if ((nextObjectId & 0xFF000000) != 0)
- {
- nextObjectId = 1;
- localBank++;
- }
+ if (persistId == 0) {
+ sequence = bootSequence;
+ objectNum = nextObjectId++;
+ } else {
+ sequence = 0;
+ objectNum = persistId;
}
- else
- objectId = ((uint64_t) persistBank) << 24 | persistId;
- object->setObjectId (objectId);
- newManagementObjects[objectId] = object;
- return objectId;
+ ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
+
+ object->setObjectId(objId);
+ newManagementObjects[objId] = object;
+ return objId;
}
ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
@@ -308,7 +309,7 @@ void ManagementBroker::PeriodicProcessing (void)
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
- std::list<uint64_t> deleteList;
+ std::list<ObjectId> deleteList;
{
Buffer msgBuffer(msgChars, BUFSIZE);
@@ -373,7 +374,7 @@ void ManagementBroker::PeriodicProcessing (void)
}
// Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin ();
iter != deleteList.rend ();
iter++)
managementObjects.erase (*iter);
@@ -408,48 +409,72 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
// Parse the routing key. This management broker should act as though it
// is bound to the exchange to match the following keys:
//
- // agent.<X>.#
- // broker.#
- //
- // where <X> is any non-negative decimal integer less than the lowest remote
- // object-id bank.
+ // agent.0.#
+ // broker
if (routingKey == "broker") {
- dispatchAgentCommandLH (msg);
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+
+ else if (routingKey.compare(0, 7, "agent.0") == 0) {
+ dispatchAgentCommandLH(msg);
return false;
}
else if (routingKey.compare(0, 6, "agent.") == 0) {
- std::string::size_type delim = routingKey.find('.', 6);
- if (delim == string::npos)
- delim = routingKey.length();
- string bank = routingKey.substr(6, delim - 6);
- if ((uint32_t) atoi(bank.c_str()) <= localBank) {
- dispatchAgentCommandLH (msg);
- return false;
- }
+ return authorizeAgentMessageLH(msg);
}
return true;
}
-void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
+ uint32_t sequence, const ConnectionToken* connToken)
{
string methodName;
+ string packageName;
+ string className;
+ uint8_t hash[16];
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ AclModule* acl = broker->getAcl();
- uint64_t objId = inBuffer.getLongLong();
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
-
EncodeHeader(outBuffer, 'm', sequence);
+ if (acl != 0) {
+ string userId = ((const broker::ConnectionState*) connToken)->getUserId();
+ std::map<acl::Property, string> params;
+ params[acl::SCHEMAPACKAGE] = packageName;
+ params[acl::SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params)) {
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ return;
+ }
+ }
+
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ if ((iter->second->getPackageName() != packageName) ||
+ (iter->second->getClassName() != className)) {
+ outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+ }
+ else
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -497,34 +522,33 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey
FindOrAddPackageLH(packageName);
}
-void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
std::string packageName;
- inBuffer.getShortString (packageName);
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
+ inBuffer.getShortString(packageName);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end())
{
ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin ();
- cIter != cMap.end ();
+ for (ClassMap::iterator cIter = cMap.begin();
+ cIter != cMap.end();
cIter++)
{
- if (cIter->second->hasSchema ())
+ if (cIter->second.hasSchema())
{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q', sequence);
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ EncodeHeader(outBuffer, 'q', sequence);
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
}
}
-
- sendCommandComplete (replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
@@ -551,9 +575,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- SchemaClass* newSchema = new SchemaClass;
- newSchema->pendingSequence = sequence;
- pIter->second[key] = newSchema;
+ pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
}
}
@@ -569,7 +591,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
buf.putRawData(buffer, bufferLen);
}
-void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -578,33 +600,33 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getShortString (key.name);
inBuffer.getBin128 (key.hash);
- PackageMap::iterator pIter = packages.find (packageName);
+ PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find (key);
+ ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- SchemaClass* classInfo = cIter->second;
+ SchemaClass& classInfo = cIter->second;
- if (classInfo->hasSchema()) {
+ if (classInfo.hasSchema()) {
EncodeHeader(outBuffer, 's', sequence);
- classInfo->appendSchema (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ classInfo.appendSchema(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Schema not available");
+ sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Class key not found");
+ sendCommandComplete(replyToKey, sequence, 1, "Class key not found");
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Package not found");
+ sendCommandComplete(replyToKey, sequence, 1, "Package not found");
}
-void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -619,24 +641,26 @@ void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyT
if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+ if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
size_t length = ValidateSchema(inBuffer);
- if (length == 0)
+ if (length == 0) {
+ QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
+ }
else {
- cIter->second->buffer = (uint8_t*) malloc(length);
- cIter->second->bufferLen = length;
- inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+ cIter->second.buffer = (uint8_t*) malloc(length);
+ cIter->second.bufferLen = length;
+ inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen);
// Publish a class-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q');
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ EncodeHeader(outBuffer, 'q');
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
}
}
}
@@ -671,14 +695,14 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
void ManagementBroker::deleteOrphanedAgentsLH()
{
- vector<uint64_t> deleteList;
+ vector<ObjectId> deleteList;
for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
- uint64_t connectionRef = aIter->first;
+ ObjectId connectionRef = aIter->first;
bool found = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
iter++) {
if (iter->first == connectionRef && !iter->second->isDeleted()) {
found = true;
@@ -692,10 +716,8 @@ void ManagementBroker::deleteOrphanedAgentsLH()
}
}
- for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
-
+ for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++)
remoteAgents.erase(*dIter);
- }
deleteList.clear();
}
@@ -705,7 +727,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+ ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
moveNewObjectsLH();
@@ -741,6 +763,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
uint32_t outLen;
EncodeHeader (outBuffer, 'a', sequence);
+ outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
@@ -786,13 +809,77 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::dispatchAgentCommandLH (Message& msg)
+bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
uint32_t sequence;
string replyToKey;
+ if (msg.encodedSize() > MA_BUFFER_SIZE)
+ return false;
+
+ msg.encodeContent(inBuffer);
+ inBuffer.reset();
+
+ if (!CheckHeader(inBuffer, &opcode, &sequence))
+ return false;
+
+ if (opcode == 'M') {
+ // TODO: check method call against ACL list.
+ AclModule* acl = broker->getAcl();
+ if (acl == 0)
+ return true;
+
+ string userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId();
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ string methodName;
+
+ std::map<acl::Property, string> params;
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+
+ params[acl::SCHEMAPACKAGE] = packageName;
+ params[acl::SCHEMACLASS] = className;
+
+ if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params))
+ return true;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ replyToKey = rt.getRoutingKey();
+
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'm', sequence);
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ return false;
+ }
+
+ return true;
+}
+
+void ManagementBroker::dispatchAgentCommandLH(Message& msg)
+{
+ Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToKey;
+
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
@@ -823,7 +910,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
}
ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
@@ -834,7 +921,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std:
// No such package found, create a new map entry.
pair<PackageMap::iterator, bool> result =
- packages.insert (pair<string, ClassMap> (name, ClassMap ()));
+ packages.insert(pair<string, ClassMap>(name, ClassMap()));
QPID_LOG (debug, "ManagementBroker added package " << name);
// Publish a package-indication message
@@ -859,20 +946,18 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter,
ClassMap& cMap = pIter->second;
key.name = className;
- memcpy (&key.hash, md5Sum, 16);
+ memcpy(&key.hash, md5Sum, 16);
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
return;
// No such class found, create a new class with local information.
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- SchemaClass* classInfo = new SchemaClass;
- classInfo->writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
- cIter = cMap.find (key);
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+ cIter = cMap.find(key);
}
void ManagementBroker::EncodePackageIndication (Buffer& buf,
@@ -917,6 +1002,8 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
for (uint16_t idx = 0; idx < methCount; idx++) {
FieldTable ft;
ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
int argCount = ft.getInt("argCount");
for (int mIdx = 0; mIdx < argCount; mIdx++) {
FieldTable aft;
@@ -924,10 +1011,41 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
}
}
- if (evntCount != 0)
- return 0;
+ for (uint16_t idx = 0; idx < evntCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
end = inBuffer.getPosition();
inBuffer.restore(); // restore original position
return end - start;
}
+
+Mutex& ManagementBroker::getMutex()
+{
+ return userLock;
+}
+
+Buffer* ManagementBroker::startEventLH()
+{
+ Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
+ EncodeHeader(*outBuffer, 'e');
+ outBuffer->putLongLong(uint64_t(Duration(now())));
+ return outBuffer;
+}
+
+void ManagementBroker::finishEventLH(Buffer* outBuffer)
+{
+ uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
+ outBuffer->reset();
+ SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
+ delete outBuffer;
+}
+
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h
index 151926f526..e3b5504752 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.h
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.h
@@ -47,7 +47,7 @@ class ManagementBroker : public ManagementAgent
ManagementBroker ();
virtual ~ManagementBroker ();
- void configure (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
+ void configure (std::string dataDir, uint16_t interval, broker::Broker* broker, int threadPoolSize);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
broker::Exchange::shared_ptr directExchange);
@@ -56,16 +56,15 @@ class ManagementBroker : public ManagementAgent
std::string className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
- uint64_t addObject (ManagementObject* object,
- uint32_t persistId = 0,
- uint32_t persistBank = 4);
+ ObjectId addObject (ManagementObject* object,
+ uint64_t persistId = 0);
void clientAdded (void);
bool dispatchCommand (broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
// Stubs for remote management agent calls
- void init (std::string, uint16_t, uint16_t, bool) { assert(0); }
+ void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); }
uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
int getSignalFd () { assert(0); return -1; }
@@ -88,7 +87,7 @@ class ManagementBroker : public ManagementAgent
{
uint32_t objIdBank;
std::string routingKey;
- uint64_t connectionRef;
+ ObjectId connectionRef;
Agent* mgmtObject;
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
@@ -97,7 +96,7 @@ class ManagementBroker : public ManagementAgent
// TODO: Eventually replace string with entire reply-to structure. reply-to
// currently assumes that the exchange is "amq.direct" even though it could
// in theory be specified differently.
- typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap;
+ typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap;
typedef std::vector<std::string> ReplyToVector;
// Storage for known schema classes:
@@ -133,12 +132,15 @@ class ManagementBroker : public ManagementAgent
size_t bufferLen;
uint8_t* buffer;
- SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
+ SchemaClass(uint32_t seq) :
+ writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+ SchemaClass(ManagementObject::writeSchemaCall_t call) :
+ writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
void appendSchema (framing::Buffer& buf);
};
- typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
RemoteAgentMap remoteAgents;
@@ -157,10 +159,10 @@ class ManagementBroker : public ManagementAgent
broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
- Manageable* broker;
+ broker::Broker* broker;
uint16_t bootSequence;
- uint32_t localBank;
uint32_t nextObjectId;
+ uint32_t brokerBank;
uint32_t nextRemoteBank;
uint32_t nextRequestSequence;
bool clientWasAdded;
@@ -168,6 +170,7 @@ class ManagementBroker : public ManagementAgent
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
+ char eventBuffer[MA_BUFFER_SIZE];
void writeData ();
void PeriodicProcessing (void);
@@ -179,7 +182,8 @@ class ManagementBroker : public ManagementAgent
std::string routingKey);
void moveNewObjectsLH();
- void dispatchAgentCommandLH (broker::Message& msg);
+ bool authorizeAgentMessageLH(broker::Message& msg);
+ void dispatchAgentCommandLH(broker::Message& msg);
PackageMap::iterator FindOrAddPackageLH(std::string name);
void AddClass(PackageMap::iterator pIter,
@@ -206,9 +210,12 @@ class ManagementBroker : public ManagementAgent
void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
size_t ValidateSchema(framing::Buffer&);
+ sys::Mutex& getMutex();
+ framing::Buffer* startEventLH();
+ void finishEventLH(framing::Buffer* outBuffer);
};
}}
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 74d9571d10..e0386ee057 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -28,6 +28,62 @@ using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::sys;
+void AgentAttachment::setBanks(uint32_t broker, uint32_t bank)
+{
+ first =
+ ((uint64_t) (broker & 0x000fffff)) << 28 |
+ ((uint64_t) (bank & 0x0fffffff));
+}
+
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object)
+ : agent(0)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48 |
+ ((uint64_t) (broker & 0x000fffff)) << 28 |
+ ((uint64_t) (bank & 0x0fffffff));
+ second = object;
+}
+
+ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object)
+ : agent(_agent)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48;
+ second = object;
+}
+
+bool ObjectId::operator==(const ObjectId &other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return first == otherFirst && second == other.second;
+}
+
+bool ObjectId::operator<(const ObjectId &other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return (first < otherFirst) || ((first == otherFirst) && (second < other.second));
+}
+
+void ObjectId::encode(framing::Buffer& buffer)
+{
+ if (agent == 0)
+ buffer.putLongLong(first);
+ else
+ buffer.putLongLong(first | agent->first);
+ buffer.putLongLong(second);
+}
+
+void ObjectId::decode(framing::Buffer& buffer)
+{
+ first = buffer.getLongLong();
+ second = buffer.getLongLong();
+}
+
int ManagementObject::nextThreadIndex = 0;
void ManagementObject::writeTimestamps (Buffer& buf)
@@ -38,10 +94,10 @@ void ManagementObject::writeTimestamps (Buffer& buf)
buf.putLongLong (uint64_t (Duration (now ())));
buf.putLongLong (createTime);
buf.putLongLong (destroyTime);
- buf.putLongLong (objectId);
+ objectId.encode(buf);
}
-void ManagementObject::setReference(uint64_t) {}
+void ManagementObject::setReference(ObjectId) {}
int ManagementObject::getThreadIndex() {
static __thread int thisIndex = -1;
@@ -54,3 +110,17 @@ int ManagementObject::getThreadIndex() {
return thisIndex;
}
+Mutex& ManagementObject::getMutex()
+{
+ return agent->getMutex();
+}
+
+Buffer* ManagementObject::startEventLH()
+{
+ return agent->startEventLH();
+}
+
+void ManagementObject::finishEventLH(Buffer* buf)
+{
+ agent->finishEventLH(buf);
+}
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index 78d065aac2..1b809f5125 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -32,6 +32,34 @@ namespace management {
class Manageable;
class ManagementAgent;
+class ObjectId;
+
+
+class AgentAttachment {
+ friend class ObjectId;
+private:
+ uint64_t first;
+public:
+ AgentAttachment() : first(0) {}
+ void setBanks(uint32_t broker, uint32_t bank);
+};
+
+
+class ObjectId {
+private:
+ const AgentAttachment* agent;
+ uint64_t first;
+ uint64_t second;
+public:
+ ObjectId() : agent(0), first(0), second(0) {}
+ ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); }
+ ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object);
+ ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object);
+ bool operator==(const ObjectId &other) const;
+ bool operator<(const ObjectId &other) const;
+ void encode(framing::Buffer& buffer);
+ void decode(framing::Buffer& buffer);
+};
class ManagementObject
{
@@ -39,7 +67,7 @@ class ManagementObject
uint64_t createTime;
uint64_t destroyTime;
- uint64_t objectId;
+ ObjectId objectId;
bool configChanged;
bool instChanged;
bool deleted;
@@ -84,11 +112,15 @@ class ManagementObject
int getThreadIndex();
void writeTimestamps (qpid::framing::Buffer& buf);
+ sys::Mutex& getMutex();
+ framing::Buffer* startEventLH();
+ void finishEventLH(framing::Buffer* buf);
+
public:
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
ManagementObject (ManagementAgent* _agent, Manageable* _core) :
- destroyTime(0), objectId (0), configChanged(true),
+ destroyTime(0), configChanged(true),
instChanged(true), deleted(false), coreObject(_core), agent(_agent)
{ createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
virtual ~ManagementObject () {}
@@ -100,14 +132,14 @@ class ManagementObject
virtual void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
- virtual void setReference (uint64_t objectId);
+ virtual void setReference (ObjectId objectId);
virtual std::string& getClassName (void) = 0;
virtual std::string& getPackageName (void) = 0;
virtual uint8_t* getMd5Sum (void) = 0;
- void setObjectId (uint64_t oid) { objectId = oid; }
- uint64_t getObjectId (void) { return objectId; }
+ void setObjectId (ObjectId oid) { objectId = oid; }
+ ObjectId getObjectId (void) { return objectId; }
inline bool getConfigChanged (void) { return configChanged; }
virtual bool getInstChanged (void) { return instChanged; }
inline void setAllChanged (void) {
@@ -120,10 +152,9 @@ class ManagementObject
deleted = true;
}
inline bool isDeleted (void) { return deleted; }
- inline sys::Mutex& getLock() { return accessLock; }
};
-typedef std::map<uint64_t,ManagementObject*> ManagementObjectMap;
+typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;
}}
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index 83c29a78a5..99c902ab30 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -69,6 +69,53 @@ class mgmtObject (object):
for cell in row:
setattr (self, cell[0], cell[1])
+class objectId(object):
+ """ Object that represents QMF object identifiers """
+
+ def __init__(self, codec):
+ self.first = codec.read_uint64()
+ self.second = codec.read_uint64()
+
+ def __cmp__(self, other):
+ if other == None:
+ return 1
+ if self.first < other.first:
+ return -1
+ if self.first > other.first:
+ return 1
+ if self.second < other.second:
+ return -1
+ if self.second > other.second:
+ return 1
+ return 0
+
+
+ def index(self):
+ return (self.first, self.second)
+
+ def getFlags(self):
+ return (self.first & 0xF000000000000000) >> 60
+
+ def getSequence(self):
+ return (self.first & 0x0FFF000000000000) >> 48
+
+ def getBroker(self):
+ return (self.first & 0x0000FFFFF0000000) >> 28
+
+ def getBank(self):
+ return self.first & 0x000000000FFFFFFF
+
+ def getObject(self):
+ return self.second
+
+ def isDurable(self):
+ return self.getSequence() == 0
+
+ def encode(self, codec):
+ codec.write_uint64(self.first)
+ codec.write_uint64(self.second)
+
+
class methodResult:
""" Object that contains the result of a method call """
@@ -308,6 +355,8 @@ class managementClient:
self.handleClassInd (ch, codec)
elif hdr[0] == 'h':
self.handleHeartbeat (ch, codec)
+ elif hdr[0] == 'e':
+ self.handleEvent (ch, codec)
else:
self.parse (ch, codec, hdr[0], hdr[1])
ch.accept(msg)
@@ -386,7 +435,7 @@ class managementClient:
elif typecode == 9: # DELTATIME
codec.write_uint64 (long (value))
elif typecode == 10: # REF
- codec.write_uint64 (long (value))
+ value.encode(codec)
elif typecode == 11: # BOOL
codec.write_uint8 (int (value))
elif typecode == 12: # FLOAT
@@ -429,7 +478,7 @@ class managementClient:
elif typecode == 9: # DELTATIME
data = codec.read_uint64 ()
elif typecode == 10: # REF
- data = codec.read_uint64 ()
+ data = objectId(codec)
elif typecode == 11: # BOOL
data = codec.read_uint8 ()
elif typecode == 12: # FLOAT
@@ -551,9 +600,9 @@ class managementClient:
ch.send ("qpid.management", smsg)
def handleClassInd (self, ch, codec):
- pname = str (codec.read_str8 ())
- cname = str (codec.read_str8 ())
- hash = codec.read_bin128 ()
+ pname = str (codec.read_str8())
+ cname = str (codec.read_str8())
+ hash = codec.read_bin128()
if pname not in self.packages:
return
@@ -574,6 +623,32 @@ class managementClient:
if self.ctrlCb != None:
self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp)
+ def handleEvent (self, ch, codec):
+ if self.eventCb == None:
+ return
+ timestamp = codec.read_uint64()
+ objId = objectId(codec)
+ packageName = str(codec.read_str8())
+ className = str(codec.read_str8())
+ hash = codec.read_bin128()
+ name = str(codec.read_str8())
+ classKey = (packageName, className, hash)
+ if classKey not in self.schema:
+ return;
+ schemaClass = self.schema[classKey]
+ row = []
+ es = schemaClass['E']
+ arglist = None
+ for ename in es:
+ (edesc, eargs) = es[ename]
+ if ename == name:
+ arglist = eargs
+ if arglist == None:
+ return
+ for arg in arglist:
+ row.append((arg[0], self.decodeValue(codec, arg[1])))
+ self.eventCb(ch.context, classKey, objId, name, row)
+
def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
self.decOutstanding (ch)
@@ -597,22 +672,23 @@ class managementClient:
configs = []
insts = []
methods = {}
- events = []
+ events = {}
configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
insts.append (("id", 4, None, None))
for idx in range (configCount):
ft = codec.read_map ()
- name = str (ft["name"])
- type = ft["type"]
- access = ft["access"]
- index = ft["index"]
- unit = None
- min = None
- max = None
- maxlen = None
- desc = None
+ name = str (ft["name"])
+ type = ft["type"]
+ access = ft["access"]
+ index = ft["index"]
+ optional = ft["optional"]
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
for key, value in ft.items ():
if key == "unit":
@@ -626,7 +702,7 @@ class managementClient:
elif key == "desc":
desc = str (value)
- config = (name, type, unit, desc, access, index, min, max, maxlen)
+ config = (name, type, unit, desc, access, index, min, max, maxlen, optional)
configs.append (config)
for idx in range (instCount):
@@ -685,6 +761,33 @@ class managementClient:
args.append (arg)
methods[mname] = (mdesc, args)
+ for idx in range (eventCount):
+ ft = codec.read_map ()
+ ename = str (ft["name"])
+ argCount = ft["argCount"]
+ if "desc" in ft:
+ edesc = str (ft["desc"])
+ else:
+ edesc = None
+
+ args = []
+ for aidx in range (argCount):
+ ft = codec.read_map ()
+ name = str (ft["name"])
+ type = ft["type"]
+ unit = None
+ desc = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = str (value)
+ elif key == "desc":
+ desc = str (value)
+
+ arg = (name, type, unit, desc)
+ args.append (arg)
+ events[ename] = (edesc, args)
+
schemaClass = {}
schemaClass['C'] = configs
schemaClass['I'] = insts
@@ -695,6 +798,22 @@ class managementClient:
if self.schemaCb != None:
self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+ def parsePresenceMasks(self, codec, schemaClass):
+ """ Generate a list of not-present properties """
+ excludeList = []
+ bit = 0
+ for element in schemaClass['C'][1:]:
+ if element[9] == 1:
+ if bit == 0:
+ mask = codec.read_uint8()
+ bit = 1
+ if (mask & bit) == 0:
+ excludeList.append(element[0])
+ bit = bit * 2
+ if bit == 256:
+ bit = 0
+ return excludeList
+
def parseContent (self, ch, cls, codec, seq=0):
""" Parse a received content message. """
if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None:
@@ -716,21 +835,26 @@ class managementClient:
timestamps.append (codec.read_uint64 ()) # Current Time
timestamps.append (codec.read_uint64 ()) # Create Time
timestamps.append (codec.read_uint64 ()) # Delete Time
-
+ objId = objectId(codec)
schemaClass = self.schema[classKey]
if cls == 'C' or cls == 'B':
- for element in schemaClass['C'][:]:
+ notPresent = self.parsePresenceMasks(codec, schemaClass)
+
+ if cls == 'C' or cls == 'B':
+ row.append(("id", objId))
+ for element in schemaClass['C'][1:]:
tc = element[1]
name = element[0]
- data = self.decodeValue (codec, tc)
- row.append ((name, data))
+ if name in notPresent:
+ row.append((name, None))
+ else:
+ data = self.decodeValue(codec, tc)
+ row.append((name, data))
if cls == 'I' or cls == 'B':
- if cls == 'B':
- start = 1
- else:
- start = 0
- for element in schemaClass['I'][start:]:
+ if cls == 'I':
+ row.append(("id", objId))
+ for element in schemaClass['I'][1:]:
tc = element[1]
name = element[0]
data = self.decodeValue (codec, tc)
@@ -763,9 +887,12 @@ class managementClient:
codec = Codec (self.spec)
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'), sequence)
- codec.write_uint64 (objId) # ID of object
+ objId.encode(codec)
+ codec.write_str8 (classId[0])
+ codec.write_str8 (classId[1])
+ codec.write_bin128 (classId[2])
codec.write_str8 (methodName)
- bank = (objId & 0x0000FFFFFF000000) >> 24
+ bank = objId.getBank()
# Encode args according to schema
if classId not in self.schema:
diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py
index fc9eb391b7..5a8b9cdf9d 100644
--- a/qpid/python/qpid/managementdata.py
+++ b/qpid/python/qpid/managementdata.py
@@ -71,14 +71,14 @@ class ManagementData:
#
def registerObjId (self, objId):
- if not objId in self.idBackMap:
- self.idBackMap[objId] = self.nextId
+ if not objId.index() in self.idBackMap:
+ self.idBackMap[objId.index()] = self.nextId
self.idMap[self.nextId] = objId
self.nextId += 1
- def displayObjId (self, objId):
- if objId in self.idBackMap:
- return self.idBackMap[objId]
+ def displayObjId (self, objIdIndex):
+ if objIdIndex in self.idBackMap:
+ return self.idBackMap[objIdIndex]
else:
return 0
@@ -86,7 +86,7 @@ class ManagementData:
if displayId in self.idMap:
return self.idMap[displayId]
else:
- return 0
+ return None
def displayClassName (self, cls):
(packageName, className, hash) = cls
@@ -102,19 +102,20 @@ class ManagementData:
self.tables[className] = {}
# Register the ID so a more friendly presentation can be displayed
- id = long (list[0][1])
- self.registerObjId (id)
+ objId = list[0][1]
+ oidx = objId.index()
+ self.registerObjId (objId)
# If this object hasn't been seen before, create a new object record with
# the timestamps and empty lists for configuration and instrumentation data.
- if id not in self.tables[className]:
- self.tables[className][id] = (timestamps, [], [])
+ if oidx not in self.tables[className]:
+ self.tables[className][oidx] = (timestamps, [], [])
- (unused, oldConf, oldInst) = self.tables[className][id]
+ (unused, oldConf, oldInst) = self.tables[className][oidx]
# For config updates, simply replace old config list with the new one.
if context == 0: #config
- self.tables[className][id] = (timestamps, list, oldInst)
+ self.tables[className][oidx] = (timestamps, list, oldInst)
# For instrumentation updates, carry the minimum and maximum values for
# "hi-lo" stats forward.
@@ -132,7 +133,7 @@ class ManagementData:
if oldInst[idx][1] < value:
value = oldInst[idx][1]
newInst.append ((key, value))
- self.tables[className][id] = (timestamps, oldConf, newInst)
+ self.tables[className][oidx] = (timestamps, oldConf, newInst)
finally:
self.lock.release ()
@@ -211,11 +212,13 @@ class ManagementData:
pass
def refName (self, oid):
- if oid == 0:
+ if oid == None:
return "NULL"
- return str (self.displayObjId (oid))
+ return str (self.displayObjId (oid.index()))
def valueDisplay (self, classKey, key, value):
+ if value == None:
+ return "<NULL>"
for kind in range (2):
schema = self.schema[classKey][kind]
for item in schema:
@@ -437,7 +440,7 @@ class ManagementData:
if classKey in self.tables:
ids = self.listOfIds(classKey, tokens[1:])
for objId in ids:
- (ts, config, inst) = self.tables[classKey][self.rawObjId(objId)]
+ (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()]
createTime = self.disp.timestamp (ts[1])
destroyTime = "-"
if ts[2] > 0:
@@ -486,32 +489,32 @@ class ManagementData:
rows = []
timestamp = None
- config = self.tables[classKey][ids[0]][1]
+ config = self.tables[classKey][ids[0].index()][1]
for eIdx in range (len (config)):
key = config[eIdx][0]
if key != "id":
row = ("property", key)
for id in ids:
if timestamp == None or \
- timestamp < self.tables[classKey][id][0][0]:
- timestamp = self.tables[classKey][id][0][0]
- (key, value) = self.tables[classKey][id][1][eIdx]
+ timestamp < self.tables[classKey][id.index()][0][0]:
+ timestamp = self.tables[classKey][id.index()][0][0]
+ (key, value) = self.tables[classKey][id.index()][1][eIdx]
row = row + (self.valueDisplay (classKey, key, value),)
rows.append (row)
- inst = self.tables[classKey][ids[0]][2]
+ inst = self.tables[classKey][ids[0].index()][2]
for eIdx in range (len (inst)):
key = inst[eIdx][0]
if key != "id":
row = ("statistic", key)
for id in ids:
- (key, value) = self.tables[classKey][id][2][eIdx]
+ (key, value) = self.tables[classKey][id.index()][2][eIdx]
row = row + (self.valueDisplay (classKey, key, value),)
rows.append (row)
titleRow = ("Type", "Element")
for id in ids:
- titleRow = titleRow + (self.refName (id),)
+ titleRow = titleRow + (self.refName(id),)
caption = "Object of type %s.%s:" % (classKey[0], classKey[1])
if timestamp != None:
caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")"
@@ -563,13 +566,15 @@ class ManagementData:
access = self.accessName (config[4])
extra = ""
if config[5] == 1:
- extra = extra + "index "
+ extra += "index "
if config[6] != None:
- extra = extra + "Min: " + str (config[6])
+ extra += "Min: " + str(config[6]) + " "
if config[7] != None:
- extra = extra + "Max: " + str (config[7])
+ extra += "Max: " + str(config[7]) + " "
if config[8] != None:
- extra = extra + "MaxLen: " + str (config[8])
+ extra += "MaxLen: " + str(config[8]) + " "
+ if config[9] == 1:
+ extra += "optional "
rows.append ((name, typename, unit, access, extra, desc))
for config in self.schema[classKey][1]:
@@ -613,7 +618,7 @@ class ManagementData:
def getClassForId (self, objId):
""" Given an object ID, return the class key for the referenced object """
for classKey in self.tables:
- if objId in self.tables[classKey]:
+ if objId.index() in self.tables[classKey]:
return classKey
return None
@@ -659,14 +664,19 @@ class ManagementData:
def makeIdRow (self, displayId):
if displayId in self.idMap:
- rawId = self.idMap[displayId]
+ objId = self.idMap[displayId]
else:
return None
- return (displayId,
- rawId,
- (rawId & 0x7FFF000000000000) >> 48,
- (rawId & 0x0000FFFFFF000000) >> 24,
- (rawId & 0x0000000000FFFFFF))
+ if objId.getFlags() == 0:
+ flags = ""
+ else:
+ flags = str(objId.getFlags())
+ seq = objId.getSequence()
+ if seq == 0:
+ seqText = "<durable>"
+ else:
+ seqText = str(seq)
+ return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject()))
def listIds (self, select):
rows = []
@@ -683,7 +693,7 @@ class ManagementData:
return
rows.append(row)
self.disp.table("Translation of Display IDs:",
- ("DisplayID", "RawID", "BootSequence", "Bank", "Object"),
+ ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"),
rows)
def do_list (self, data):
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 850f9c62e6..5952595997 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -61,18 +61,16 @@
===============================================================
-->
<class name="Broker">
- <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/>
- <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
- <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/>
- <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/>
- <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
- <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
- <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
- <property name="clusterName" type="sstr" access="RO"
- desc="Name of cluster this server is a member of"/>
- <property name="version" type="sstr" access="RO" desc="Running software version"/>
- <property name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/>
- <property name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/>
+ <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/>
+ <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
+ <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/>
+ <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
+ <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
+ <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
+ <property name="clusterName" type="sstr" access="RO" desc="Name of cluster this server is a member of"/>
+ <property name="version" type="sstr" access="RO" desc="Running software version"/>
+ <property name="dataDir" type="sstr" access="RO" optional="y" desc="Persistent configuration storage location"/>
<method name="joinCluster">
<arg name="clusterName" dir="I" type="sstr"/>
@@ -94,6 +92,17 @@
<arg name="username" dir="I" type="sstr"/>
<arg name="password" dir="I" type="sstr"/>
</method>
+
+ <event name="agentConnect" desc="QMF Management Agent has connected to the broker">
+ <arg name="remoteAddress" type="sstr"/>
+ <arg name="label" type="sstr"/>
+ <arg name="brokerBank" type="uint32"/>
+ <arg name="agentBank" type="uint32"/>
+ </event>
+
+ <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
+ <arg name="remoteAddress" type="sstr"/>
+ </event>
</class>
<!--
diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml
index 309c94c98b..1d3e9f900b 100644
--- a/qpid/specs/management-types.xml
+++ b/qpid/specs/management-types.xml
@@ -19,7 +19,7 @@
under the License.
-->
-<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/>
+<type name="objId" base="REF" cpp="ObjectId" encode="#.encode(@)" decode="#.decode(@)" accessor="direct" init="0"/>
<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" accessor="direct" init="0"/>
<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" accessor="direct" init="0"/>
<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" accessor="direct" init="0"/>