diff options
author | Ted Ross <tross@apache.org> | 2008-09-03 18:01:44 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-09-03 18:01:44 +0000 |
commit | bf45f1241b9f801b55ede16d77c3dbbe505f0f89 (patch) | |
tree | 419d953ae460ce8e3a7607c9e749baf3c2829e6c | |
parent | 0da3229c29d5948c3a48631b83a2484dc349a974 (diff) | |
download | qpid-python-bf45f1241b9f801b55ede16d77c3dbbe505f0f89.tar.gz |
QPID-1174 Updates to the management framework
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@691700 13f79535-47bb-0310-9956-ffa450edef68
25 files changed, 1315 insertions, 496 deletions
diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp index 35ea97d4c0..c8d63a62b9 100644 --- a/qpid/cpp/examples/qmf-agent/example.cpp +++ b/qpid/cpp/examples/qmf-agent/example.cpp @@ -89,9 +89,10 @@ public: CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) { + static uint64_t persistId = 0x111222333444555LL; mgmtObject = new Parent(agent, this, name); - agent->addObject(mgmtObject); + agent->addObject(mgmtObject, persistId++); mgmtObject->set_state("IDLE"); } @@ -128,6 +129,8 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args) children.push_back(child); + mgmtObject->event_childCreated(ioArgs.i_name); + return STATUS_OK; } @@ -145,7 +148,8 @@ ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) //============================================================== // Main program //============================================================== -int main(int argc, char** argv) { +int main_int(int argc, char** argv) +{ ManagementAgent::Singleton singleton; const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; @@ -158,7 +162,7 @@ int main(int argc, char** argv) { // Start the agent. It will attempt to make a connection to the // management broker - agent->init(string(host), port); + agent->init(string(host), port, 5, false, ".magentdata"); // Allocate some core objects CoreClass core1(agent, "Example Core Object #1"); @@ -168,4 +172,12 @@ int main(int argc, char** argv) { core1.doLoop(); } +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} diff --git a/qpid/cpp/examples/qmf-agent/schema.xml b/qpid/cpp/examples/qmf-agent/schema.xml index de8776c818..b0f25fcbed 100644 --- a/qpid/cpp/examples/qmf-agent/schema.xml +++ b/qpid/cpp/examples/qmf-agent/schema.xml @@ -37,6 +37,14 @@ <arg name="name" dir="I" type="sstr"/> <arg name="childRef" dir="O" type="objId"/> </method> + + <event name="childCreated"> + <arg name="name" type="sstr"/> + </event> + + <event name="childDestroyed"> + <arg name="name" type="sstr"/> + </event> </class> diff --git a/qpid/cpp/managementgen/schema.py b/qpid/cpp/managementgen/schema.py index 2ee61fff80..f911c28db3 100755 --- a/qpid/cpp/managementgen/schema.py +++ b/qpid/cpp/managementgen/schema.py @@ -79,7 +79,7 @@ class SchemaType: def getName (self): return self.name - def genAccessor (self, stream, varName, changeFlag = None): + def genAccessor (self, stream, varName, changeFlag = None, optional = False): if self.perThread: prefix = "getThreadStats()->" if self.style == "wm": @@ -87,11 +87,13 @@ class SchemaType: else: prefix = "" if self.accessor == "direct": - stream.write (" inline void set_" + varName + " (" + self.cpp + " val){\n"); + stream.write (" inline void set_" + varName + " (" + self.cpp + " val) {\n"); if not self.perThread: stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") if self.style != "mma": - stream.write (" " + prefix + varName + " = val;\n"); + stream.write (" " + prefix + varName + " = val;\n") + if optional: + stream.write (" presenceMask[presenceByte_%s] |= presenceMask_%s;\n" % (varName, varName)) if self.style == "wm": stream.write (" if (" + varName + "Low > val)\n") stream.write (" " + varName + "Low = val;\n") @@ -106,9 +108,24 @@ class SchemaType: stream.write (" " + prefix + varName + "Max = val;\n") if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") - stream.write (" }\n"); + stream.write (" }\n") + if self.style != "mma": + stream.write (" inline " + self.cpp + "& get_" + varName + "() {\n"); + if not self.perThread: + stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") + stream.write (" return " + prefix + varName + ";\n") + stream.write (" }\n") + if optional: + stream.write (" inline void clr_" + varName + "() {\n") + stream.write (" presenceMask[presenceByte_%s] &= ~presenceMask_%s;\n" % (varName, varName)) + if changeFlag != None: + stream.write (" " + changeFlag + " = true;\n") + stream.write (" }\n") + stream.write (" inline bool isSet_" + varName + "() {\n") + stream.write (" return presenceMask[presenceByte_%s] & presenceMask_%s != 0;\n" % (varName, varName)) + stream.write (" }\n") elif self.accessor == "counter": - stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n"); + stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1) {\n"); if not self.perThread: stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") stream.write (" " + prefix + varName + " += by;\n") @@ -118,7 +135,7 @@ class SchemaType: if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); - stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n"); + stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1) {\n"); if not self.perThread: stream.write (" sys::Mutex::ScopedLock mutex(accessLock);\n") stream.write (" " + prefix + varName + " -= by;\n") @@ -146,22 +163,22 @@ class SchemaType: stream.write (" threadStats->" + varName + "Min = std::numeric_limits<" + cpptype + ">::max();\n") stream.write (" threadStats->" + varName + "Max = std::numeric_limits<" + cpptype + ">::min();\n") - def genWrite (self, stream, varName): + def genWrite (self, stream, varName, indent=" "): if self.style != "mma": - stream.write (" " + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n") + stream.write (indent + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n") if self.style == "wm": - stream.write (" " + self.encode.replace ("@", "buf") \ + stream.write (indent + self.encode.replace ("@", "buf") \ .replace ("#", varName + "High") + ";\n") - stream.write (" " + self.encode.replace ("@", "buf") \ + stream.write (indent + self.encode.replace ("@", "buf") \ .replace ("#", varName + "Low") + ";\n") if self.style == "mma": - stream.write (" " + self.encode.replace ("@", "buf") \ + stream.write (indent + self.encode.replace ("@", "buf") \ .replace ("#", varName + "Count") + ";\n") - stream.write (" " + self.encode.replace ("@", "buf") \ + stream.write (indent + self.encode.replace ("@", "buf") \ .replace ("#", varName + "Count ? " + varName + "Min : 0") + ";\n") - stream.write (" " + self.encode.replace ("@", "buf") \ + stream.write (indent + self.encode.replace ("@", "buf") \ .replace ("#", varName + "Max") + ";\n") - stream.write (" " + self.encode.replace ("@", "buf") \ + stream.write (indent + self.encode.replace ("@", "buf") \ .replace ("#", varName + "Count ? " + varName + "Total / " + varName + "Count : 0") + ";\n") @@ -207,7 +224,7 @@ class Type: #===================================================================================== # #===================================================================================== -class SchemaConfig: +class SchemaProperty: def __init__ (self, node, typespec): self.name = None self.type = None @@ -216,6 +233,7 @@ class SchemaConfig: self.isIndex = 0 self.isParentRef = 0 self.isGeneralRef = 0 + self.isOptional = 0 self.unit = None self.min = None self.max = None @@ -255,6 +273,11 @@ class SchemaConfig: raise ValueError ("Expected 'y' in isGeneralReference attribute") self.isGeneralRef = 1 + elif key == 'optional': + if val != 'y': + raise ValueError ("Expected 'y' in optional attribute") + self.isOptional = 1 + elif key == 'unit': self.unit = val @@ -273,6 +296,9 @@ class SchemaConfig: else: raise ValueError ("Unknown attribute in property '%s'" % key) + if self.access == "RC" and self.isOptional == 1: + raise ValueError ("Properties with ReadCreate access must not be optional (%s)" % self.name) + if self.name == None: raise ValueError ("Missing 'name' attribute in property") if self.type == None: @@ -289,18 +315,19 @@ class SchemaConfig: def genDeclaration (self, stream, prefix=" "): stream.write (prefix + self.type.type.cpp + " " + self.name + ";\n") - def genFormalParam (self, stream): + def genFormalParam (self, stream, variables): stream.write (self.type.type.cpp + " _" + self.name) def genAccessor (self, stream): - self.type.type.genAccessor (stream, self.name, "configChanged") + self.type.type.genAccessor (stream, self.name, "configChanged", self.isOptional == 1) def genSchema (self, stream): stream.write (" ft = FieldTable ();\n") - stream.write (" ft.setString (NAME, \"" + self.name + "\");\n") - stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n") + stream.write (" ft.setString (NAME, \"" + self.name + "\");\n") + stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n") stream.write (" ft.setInt (ACCESS, ACCESS_" + self.access + ");\n") - stream.write (" ft.setInt (INDEX, " + str (self.isIndex) + ");\n") + stream.write (" ft.setInt (INDEX, " + str (self.isIndex) + ");\n") + stream.write (" ft.setInt (OPTIONAL, " + str (self.isOptional) + ");\n") if self.unit != None: stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n") if self.min != None: @@ -314,13 +341,19 @@ class SchemaConfig: stream.write (" buf.put (ft);\n\n") def genWrite (self, stream): - self.type.type.genWrite (stream, self.name) + indent = " " + if self.isOptional: + stream.write(" if (presenceMask[presenceByte_%s] & presenceMask_%s) {\n" % (self.name, self.name)) + indent = " " + self.type.type.genWrite (stream, self.name, indent) + if self.isOptional: + stream.write(" }\n") #===================================================================================== # #===================================================================================== -class SchemaInst: +class SchemaStatistic: def __init__ (self, node, typespec): self.name = None self.type = None @@ -523,25 +556,30 @@ class SchemaArg: def getDir (self): return self.dir - def genSchema (self, stream): + def genSchema (self, stream, event=False): stream.write (" ft = FieldTable ();\n") stream.write (" ft.setString (NAME, \"" + self.name + "\");\n") stream.write (" ft.setInt (TYPE, TYPE_" + self.type.type.base +");\n") - stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n") + if (not event): + stream.write (" ft.setString (DIR, \"" + self.dir + "\");\n") if self.unit != None: stream.write (" ft.setString (UNIT, \"" + self.unit + "\");\n") - if self.min != None: - stream.write (" ft.setInt (MIN, " + self.min + ");\n") - if self.max != None: - stream.write (" ft.setInt (MAX, " + self.max + ");\n") - if self.maxLen != None: - stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n") + if not event: + if self.min != None: + stream.write (" ft.setInt (MIN, " + self.min + ");\n") + if self.max != None: + stream.write (" ft.setInt (MAX, " + self.max + ");\n") + if self.maxLen != None: + stream.write (" ft.setInt (MAXLEN, " + self.maxLen + ");\n") + if self.default != None: + stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n") if self.desc != None: stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") - if self.default != None: - stream.write (" ft.setString (DEFAULT, \"" + self.default + "\");\n") stream.write (" buf.put (ft);\n\n") + def genFormalParam (self, stream, variables): + stream.write ("%s _%s" % (self.type.type.cpp, self.name)) + #===================================================================================== # #===================================================================================== @@ -649,6 +687,50 @@ class SchemaEvent: def getArgCount (self): return len (self.args) + def genMethodBody (self, stream, variables, classObject): + stream.write("void ") + classObject.genNameCap(stream, variables) + stream.write("::event_%s(" % self.name) + count = 0 + for arg in self.args: + arg.genFormalParam(stream, variables) + count += 1 + if count < len(self.args): + stream.write(", ") + stream.write(") {\n") + stream.write(" sys::Mutex::ScopedLock mutex(getMutex());\n") + stream.write(" Buffer* buf = startEventLH();\n") + stream.write(" objectId.encode(*buf);\n") + stream.write(" buf->putShortString(packageName);\n") + stream.write(" buf->putShortString(className);\n") + stream.write(" buf->putBin128(md5Sum);\n") + stream.write(" buf->putShortString(\"%s\");\n" % self.name) + for arg in self.args: + stream.write(" %s;\n" % arg.type.type.encode.replace("@", "(*buf)").replace("#", "_" + arg.name)) + stream.write(" finishEventLH(buf);\n") + stream.write("}\n\n") + + def genMethodDecl (self, stream, variables): + stream.write(" void event_%s(" % self.name) + count = 0 + for arg in self.args: + arg.genFormalParam(stream, variables) + count += 1 + if count < len(self.args): + stream.write(", ") + stream.write(");\n") + + def genSchema(self, stream, variables): + stream.write (" ft = FieldTable ();\n") + stream.write (" ft.setString (NAME, \"" + self.name + "\");\n") + stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n") + if self.desc != None: + stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") + stream.write (" buf.put (ft);\n\n") + for arg in self.args: + arg.genSchema (stream, True) + + class SchemaClass: def __init__ (self, package, node, typespec, fragments, options): @@ -669,11 +751,11 @@ class SchemaClass: for child in children: if child.nodeType == Node.ELEMENT_NODE: if child.nodeName == 'property': - sub = SchemaConfig (child, typespec) + sub = SchemaProperty (child, typespec) self.properties.append (sub) elif child.nodeName == 'statistic': - sub = SchemaInst (child, typespec) + sub = SchemaStatistic (child, typespec) self.statistics.append (sub) elif child.nodeName == 'method': @@ -758,6 +840,12 @@ class SchemaClass: # Code Generation Functions. The names of these functions (minus the leading "gen") # match the substitution keywords in the template files. #=================================================================================== + def testExistOptionals (self, variables): + for prop in self.properties: + if prop.isOptional == 1: + return True + return False + def testExistPerThreadStats (self, variables): for inst in self.statistics: if inst.type.type.perThread: @@ -794,17 +882,13 @@ class SchemaClass: for element in self.properties: element.genDeclaration (stream) - def genConfigElementSchema (self, stream, variables): - for config in self.properties: - config.genSchema (stream) - def genConstructorArgs (self, stream, variables): # Constructor args are config elements with read-create access result = "" for element in self.properties: if element.isConstructorArg (): stream.write (", ") - element.genFormalParam (stream) + element.genFormalParam (stream, variables) def genConstructorInits (self, stream, variables): for element in self.properties: @@ -831,8 +915,17 @@ class SchemaClass: def genEventCount (self, stream, variables): stream.write ("%d" % len (self.events)) + def genEventMethodBodies (self, stream, variables): + for event in self.events: + event.genMethodBody (stream, variables, self) + + def genEventMethodDecls (self, stream, variables): + for event in self.events: + event.genMethodDecl (stream, variables) + def genEventSchema (self, stream, variables): - pass ########################################################################### + for event in self.events: + event.genSchema (stream, variables) def genHiLoStatResets (self, stream, variables): for inst in self.statistics: @@ -884,10 +977,6 @@ class SchemaClass: if element.type.type.perThread: element.genDeclaration (stream, " ") - def genInstElementSchema (self, stream, variables): - for inst in self.statistics: - inst.genSchema (stream) - def genMethodArgIncludes (self, stream, variables): for method in self.methods: if method.getArgCount () > 0: @@ -898,7 +987,7 @@ class SchemaClass: def genMethodHandlers (self, stream, variables): for method in self.methods: - stream.write ("\n if (methodName == \"" + method.getName () + "\")\n {\n") + stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n") if method.getArgCount () == 0: stream.write (" ArgsNone ioArgs;\n") else: @@ -922,10 +1011,36 @@ class SchemaClass: arg.name, "outBuf") + ";\n") stream.write (" return;\n }\n") + def genPresenceMaskBytes (self, stream, variables): + count = 0 + for prop in self.properties: + if prop.isOptional == 1: + count += 1 + if count == 0: + stream.write("0") + else: + stream.write (str(((count - 1) / 8) + 1)) + + def genPresenceMaskConstants (self, stream, variables): + count = 0 + for prop in self.properties: + if prop.isOptional == 1: + stream.write(" static const uint8_t presenceByte_%s = %d;\n" % (prop.name, count / 8)) + stream.write(" static const uint8_t presenceMask_%s = %d;\n" % (prop.name, 1 << (count % 8))) + count += 1 + + def genPropertySchema (self, stream, variables): + for prop in self.properties: + prop.genSchema (stream) + def genSetGeneralReferenceDeclaration (self, stream, variables): for prop in self.properties: if prop.isGeneralRef: - stream.write ("void setReference(uint64_t objectId) { " + prop.name + " = objectId; }\n") + stream.write ("void setReference(ObjectId objectId) { " + prop.name + " = objectId; }\n") + + def genStatisticSchema (self, stream, variables): + for stat in self.statistics: + stat.genSchema (stream) def genMethodIdDeclarations (self, stream, variables): number = 1 @@ -983,13 +1098,13 @@ class SchemaClass: if inst.type.type.perThread: inst.genAssign (stream) - def genWriteConfig (self, stream, variables): - for config in self.properties: - config.genWrite (stream) + def genWriteProperties (self, stream, variables): + for prop in self.properties: + prop.genWrite (stream) - def genWriteInst (self, stream, variables): - for inst in self.statistics: - inst.genWrite (stream) + def genWriteStatistics (self, stream, variables): + for stat in self.statistics: + stat.genWrite (stream) @@ -1046,15 +1161,9 @@ class PackageSchema: def genClassRegisters (self, stream, variables): for _class in self.classes: - stream.write ("agent->RegisterClass (") - _class.genNameCap (stream, variables) - stream.write ("::packageName, ") - _class.genNameCap (stream, variables) - stream.write ("::className, ") - _class.genNameCap (stream, variables) - stream.write ("::md5Sum, ") + stream.write (" ") _class.genNameCap (stream, variables) - stream.write ("::writeSchema);\n") + stream.write ("::registerClass(agent);\n") #===================================================================================== diff --git a/qpid/cpp/managementgen/templates/Class.cpp b/qpid/cpp/managementgen/templates/Class.cpp index 289427d742..2a0e55b34d 100644 --- a/qpid/cpp/managementgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/templates/Class.cpp @@ -42,6 +42,11 @@ uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = { /*MGEN:Class.ParentRefAssignment*/ /*MGEN:Class.InitializeElements*/ +/*MGEN:IF(Class.ExistOptionals)*/ + // Optional properties start out not-present + for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) + presenceMask[idx] = 0; +/*MGEN:ENDIF*/ /*MGEN:IF(Class.ExistPerThreadStats)*/ maxThreads = agent->getMaxThreads(); perThreadStatsArray = new struct PerThreadStats*[maxThreads]; @@ -65,6 +70,7 @@ namespace { const string TYPE("type"); const string ACCESS("access"); const string INDEX("index"); + const string OPTIONAL("optional"); const string UNIT("unit"); const string MIN("min"); const string MAX("max"); @@ -76,6 +82,11 @@ namespace { const string DEFAULT("default"); } +void /*MGEN:Class.NameCap*/::registerClass(ManagementAgent* agent) +{ + agent->RegisterClass(packageName, className, md5Sum, writeSchema); +} + void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) { FieldTable ft; @@ -90,9 +101,9 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) buf.putShort (/*MGEN:Class.EventCount*/); // Event Count // Properties -/*MGEN:Class.ConfigElementSchema*/ +/*MGEN:Class.PropertySchema*/ // Statistics -/*MGEN:Class.InstElementSchema*/ +/*MGEN:Class.StatisticSchema*/ // Methods /*MGEN:Class.MethodSchema*/ // Events @@ -118,7 +129,11 @@ void /*MGEN:Class.NameCap*/::writeProperties (Buffer& buf) configChanged = false; writeTimestamps (buf); -/*MGEN:Class.WriteConfig*/ +/*MGEN:IF(Class.ExistOptionals)*/ + for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) + buf.putOctet(presenceMask[idx]); +/*MGEN:ENDIF*/ +/*MGEN:Class.WriteProperties*/ } void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders) @@ -140,7 +155,7 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders) /*MGEN:Class.Assign*/ if (!skipHeaders) writeTimestamps (buf); -/*MGEN:Class.WriteInst*/ +/*MGEN:Class.WriteStatistics*/ // Maintenance of hi-lo statistics /*MGEN:Class.HiLoStatResets*/ @@ -162,3 +177,4 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/) outBuf.putShortString (Manageable::StatusText (status)); } +/*MGEN:Class.EventMethodBodies*/ diff --git a/qpid/cpp/managementgen/templates/Class.h b/qpid/cpp/managementgen/templates/Class.h index fac63d5d55..40ad20eb85 100644 --- a/qpid/cpp/managementgen/templates/Class.h +++ b/qpid/cpp/managementgen/templates/Class.h @@ -37,6 +37,10 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject static std::string packageName; static std::string className; static uint8_t md5Sum[16]; +/*MGEN:IF(Class.ExistOptionals)*/ + uint8_t presenceMask[/*MGEN:Class.PresenceMaskBytes*/]; +/*MGEN:Class.PresenceMaskConstants*/ +/*MGEN:ENDIF*/ // Properties /*MGEN:Class.ConfigDeclarations*/ @@ -78,14 +82,13 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject /*MGEN:ENDIF*/ public: - friend class Package/*MGEN:Class.NamePackageCap*/; - /*MGEN:Class.NameCap*/ (ManagementAgent* agent, Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); ~/*MGEN:Class.NameCap*/ (void); /*MGEN:Class.SetGeneralReferenceDeclaration*/ + static void registerClass (ManagementAgent* agent); std::string& getPackageName (void) { return packageName; } std::string& getClassName (void) { return className; } uint8_t* getMd5Sum (void) { return md5Sum; } @@ -94,6 +97,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject /*MGEN:Class.MethodIdDeclarations*/ // Accessor Methods /*MGEN:Class.AccessorMethods*/ + // Event Methods +/*MGEN:Class.EventMethodDecls*/ }; }} diff --git a/qpid/cpp/src/qpid/agent/ManagementAgent.h b/qpid/cpp/src/qpid/agent/ManagementAgent.h index e7379e6c94..1c219f7463 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgent.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgent.h @@ -65,10 +65,14 @@ class ManagementAgent // agent's thread. In this case, the callback implementations // MUST be thread safe. // + // storeFile - File where this process has read and write access. This + // file shall be used to store persistent state. + // virtual void init (std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, - bool useExternalThread = false) = 0; + bool useExternalThread = false, + std::string storeFile = "") = 0; // Register a schema with the management agent. This is normally called by the // package initializer generated by the management code generator. @@ -93,9 +97,8 @@ class ManagementAgent // pointer. This allows the management agent to report the deletion of the object // in an orderly way. // - virtual uint64_t addObject (ManagementObject* objectPtr, - uint32_t persistId = 0, - uint32_t persistBank = 4) = 0; + virtual ObjectId addObject (ManagementObject* objectPtr, + uint64_t persistId = 0) = 0; // If "useExternalThread" was set to true in init, this method must // be called to provide a thread for any pending method calls that have arrived. @@ -120,6 +123,12 @@ class ManagementAgent // virtual int getSignalFd (void) = 0; +protected: + friend class ManagementObject; + virtual sys::Mutex& getMutex() = 0; + virtual framing::Buffer* startEventLH() = 0; + virtual void finishEventLH(framing::Buffer* buf) = 0; + }; }} diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index ebdc71e3b1..c4108b0ae2 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -24,12 +24,21 @@ #include <list> #include <unistd.h> #include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <fcntl.h> +#include <iostream> +#include <fstream> + using namespace qpid::client; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; using std::stringstream; +using std::ofstream; +using std::ifstream; using std::string; using std::cout; using std::endl; @@ -66,128 +75,186 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() return agent; } +const string ManagementAgentImpl::storeMagicNumber("MA01"); + ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) + extThread(false), writeFd(-1), readFd(-1), + clientWasAdded(true), requestedBank(0), + assignedBank(0), brokerBank(0), bootSequence(0), + connThreadBody(*this), connThread(connThreadBody), + pubThreadBody(*this), pubThread(pubThreadBody) { // TODO: Establish system ID } -void ManagementAgentImpl::init(std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +void ManagementAgentImpl::init(string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread, + string _storeFile) { - { - Mutex::ScopedLock lock(agentLock); - startupWait = true; - } - interval = intervalSeconds; extThread = useExternalThread; + storeFile = _storeFile; nextObjectId = 1; + host = brokerHost; + port = brokerPort; + + // TODO: Abstract the socket calls for portability + if (extThread) { + int pair[2]; + int result = socketpair(PF_LOCAL, SOCK_STREAM, 0, pair); + if (result == -1) { + return; + } + writeFd = pair[0]; + readFd = pair[1]; - sessionId.generate(); - queueName << "qmfagent-" << sessionId; - string dest = "qmfagent"; - - connection.open(brokerHost.c_str(), brokerPort); - session = connection.newSession (queueName.str()); - dispatcher = new client::Dispatcher(session); + // Set the readFd to non-blocking + int flags = fcntl(readFd, F_GETFL); + fcntl(readFd, F_SETFL, flags | O_NONBLOCK); + } + retrieveData(); + bootSequence++; + if ((bootSequence & 0xF000) != 0) + bootSequence = 1; + storeData(true); +} - session.queueDeclare (arg::queue=queueName.str()); - session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str()); - session.messageSubscribe (arg::queue=queueName.str(), - arg::destination=dest); - session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); - session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); +ManagementAgentImpl::~ManagementAgentImpl() +{ +} - Message attachRequest; - char rawbuffer[512]; - Buffer buffer (rawbuffer, 512); +void ManagementAgentImpl::RegisterClass(std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = FindOrAddPackage(packageName); + AddClassLocal(pIter, className, md5Sum, schemaCall); +} - attachRequest.getDeliveryProperties().setRoutingKey("broker"); - attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + uint64_t persistId) +{ + Mutex::ScopedLock lock(addLock); + uint16_t sequence = persistId ? 0 : bootSequence; + uint64_t objectNum = persistId ? persistId : nextObjectId++; - EncodeHeader (buffer, 'A'); - buffer.putShortString ("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong (11); + ObjectId objectId(&attachment, 0, sequence, objectNum); - size_t length = 512 - buffer.available (); - string stringBuffer (rawbuffer, length); - attachRequest.setData (stringBuffer); + // TODO: fix object-id handling + object->setObjectId(objectId); + newManagementObjects[objectId] = object; + return objectId; +} - session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) +{ + Mutex::ScopedLock lock(agentLock); - dispatcher->listen(dest, this); - dispatcher->start(); + for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) { + if (methodQueue.empty()) + break; - { - Mutex::ScopedLock lock(agentLock); - if (startupWait) - startupCond.wait(agentLock); + QueuedMethod* item = methodQueue.front(); + methodQueue.pop_front(); + { + Mutex::ScopedUnlock unlock(agentLock); + Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size()); + invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + delete item; + } } + + uint8_t rbuf[100]; + while (read(readFd, rbuf, 100) > 0); // Consume all signaling bytes + return methodQueue.size(); } -ManagementAgentImpl::~ManagementAgentImpl() +int ManagementAgentImpl::getSignalFd(void) { - dispatcher->stop(); - session.close(); - delete dispatcher; + return readFd; } -void ManagementAgentImpl::RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock(agentLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); +void ManagementAgentImpl::startProtocol() +{ + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + EncodeHeader(buffer, 'A'); + buffer.putShortString("RemoteAgent [C++]"); + systemId.encode (buffer); + buffer.putLong(requestedBank); + uint32_t length = 512 - buffer.available(); + buffer.reset(); + connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); } -uint64_t ManagementAgentImpl::addObject (ManagementObject* object, - uint32_t /*persistId*/, - uint32_t /*persistBank*/) +void ManagementAgentImpl::storeData(bool requested) { - Mutex::ScopedLock lock(addLock); - uint64_t objectId; + if (!storeFile.empty()) { + ofstream outFile(storeFile.c_str()); + uint32_t bankToWrite = requested ? requestedBank : assignedBank; - // TODO: fix object-id handling - objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF); - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; + if (outFile.good()) { + outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl; + outFile.close(); + } + } } -uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) +void ManagementAgentImpl::retrieveData() { - return 0; + if (!storeFile.empty()) { + ifstream inFile(storeFile.c_str()); + string mn; + + if (inFile.good()) { + inFile >> mn; + if (mn == storeMagicNumber) { + inFile >> requestedBank; + inFile >> bootSequence; + } + inFile.close(); + } + } } -int ManagementAgentImpl::getSignalFd(void) +void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, + uint32_t code, string text) { - return -1; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'z', sequence); + outBuffer.putLong(code); + outBuffer.putShortString(text); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); } void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); - uint32_t assigned; - stringstream key; - assigned = inBuffer.getLong(); - objIdPrefix = ((uint64_t) assigned) << 24; + brokerBank = inBuffer.getLong(); + assignedBank = inBuffer.getLong(); + if (assignedBank != requestedBank) { + if (requestedBank == 0) + cout << "Initial object-id bank assigned: " << assignedBank << endl; + else + cout << "Collision in object-id! New bank assigned: " << assignedBank << endl; + storeData(); + } - startupWait = false; - startupCond.notify(); + attachment.setBanks(brokerBank, assignedBank); // Bind to qpid.management to receive commands - key << "agent." << assigned; - session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), - arg::bindingKey=key.str()); + connThreadBody.bindToBank(assignedBank); // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); @@ -198,9 +265,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) EncodeHeader(outBuffer, 'p'); EncodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; @@ -208,9 +275,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) outBuffer.reset(); EncodeHeader(outBuffer, 'q'); EncodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -236,9 +303,9 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc EncodeHeader(outBuffer, 's', sequence); schema.writeSchemaCall(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -249,28 +316,93 @@ void ManagementAgentImpl::handleConsoleAddedIndication() clientWasAdded = true; } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) { string methodName; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + string packageName; + string className; + uint8_t hash[16]; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); EncodeHeader(outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "amq.direct", replyTo); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + ft.decode(inBuffer); + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo<string>()) + { + // TODO: Send completion with an error code + return; + } + + string className(value->get<string>()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) + { + ManagementObject* object = iter->second; + if (object->getClassName() == className) + { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + } + } + + sendCommandComplete(replyTo, sequence); +} + +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + if (extThread) { + Mutex::ScopedLock lock(agentLock); + string body; + + inBuffer.getRawData(body, inBuffer.available()); + methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + write(writeFd, "X", 1); + } else { + invokeMethodRequest(inBuffer, sequence, replyTo); + } } void ManagementAgentImpl::received(Message& msg) @@ -287,103 +419,86 @@ void ManagementAgentImpl::received(Message& msg) replyToKey = rt.getRoutingKey(); } - if (CheckHeader (inBuffer, &opcode, &sequence)) + if (CheckHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } -void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('1'); - buf.putOctet (opcode); - buf.putLong (seq); + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('1'); + buf.putOctet(opcode); + buf.putLong (seq); } -bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) return false; - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); - *opcode = buf.getOctet (); - *seq = buf.getLong (); + *opcode = buf.getOctet(); + *seq = buf.getLong(); return h1 == 'A' && h2 == 'M' && h3 == '1'; } -void ManagementAgentImpl::SendBuffer (Buffer& buf, - uint32_t length, - string exchange, - string routingKey) -{ - Message msg; - string data; - - if (objIdPrefix == 0) - return; - - buf.getRawData(data, length); - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData (data); - session.messageTransfer (arg::content=msg, arg::destination=exchange); -} - -ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name) +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name) { - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) + PackageMap::iterator pIter = packages.find(name); + if (pIter != packages.end()) return pIter; // No such package found, create a new map entry. std::pair<PackageMap::iterator, bool> result = - packages.insert (std::pair<string, ClassMap> (name, ClassMap ())); + packages.insert(std::pair<string, ClassMap>(name, ClassMap())); // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package"); + EncodeHeader(outBuffer, 'p'); + EncodePackageIndication(outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package"); return result.first; } void ManagementAgentImpl::moveNewObjectsLH() { - Mutex::ScopedLock lock (addLock); - for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); - iter != newManagementObjects.end (); + Mutex::ScopedLock lock(addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin(); + iter != newManagementObjects.end(); iter++) managementObjects[iter->first] = iter->second; newManagementObjects.clear(); } -void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) +void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. @@ -395,21 +510,21 @@ void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, // TODO: Publish a class-indication message } -void ManagementAgentImpl::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) +void ManagementAgentImpl::EncodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) { - buf.putShortString ((*pIter).first); + buf.putShortString((*pIter).first); } -void ManagementAgentImpl::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) +void ManagementAgentImpl::EncodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128 (key.hash); } void ManagementAgentImpl::PeriodicProcessing() @@ -419,17 +534,17 @@ void ManagementAgentImpl::PeriodicProcessing() char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; - std::list<uint64_t> deleteList; + std::list<ObjectId> deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); EncodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); routingKey = "mgmt." + systemId.str() + ".heartbeat"; - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } moveNewObjectsLH(); @@ -437,65 +552,171 @@ void ManagementAgentImpl::PeriodicProcessing() if (clientWasAdded) { clientWasAdded = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - object->setAllChanged (); + object->setAllChanged(); } } - if (managementObjects.empty ()) + if (managementObjects.empty()) return; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - if (object->getConfigChanged () || object->isDeleted ()) + if (object->getConfigChanged() || object->isDeleted()) { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'c'); object->writeProperties(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - if (object->getInstChanged ()) + if (object->getInstChanged()) { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'i'); object->writeStatistics(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + routingKey = "mgmt." + systemId.str() + ".stat." + object->getClassName(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - if (object->isDeleted ()) - deleteList.push_back (iter->first); + if (object->isDeleted()) + deleteList.push_back(iter->first); } // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); + for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); iter++) - managementObjects.erase (*iter); + managementObjects.erase(*iter); - deleteList.clear (); + deleteList.clear(); } -void ManagementAgentImpl::BackgroundThread::run() +void ManagementAgentImpl::ConnectionThread::run() +{ + static const int delayMin(1); + static const int delayMax(128); + static const int delayFactor(2); + int delay(delayMin); + string dest("qmfagent"); + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + + while (true) { + try { + if (!agent.host.empty()) { + connection.open(agent.host.c_str(), agent.port); + session = connection.newSession(queueName.str()); + subscriptions = new client::SubscriptionManager(session); + + session.queueDeclare(arg::queue=queueName.str()); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str()); + + subscriptions->subscribe(agent, queueName.str(), dest); + { + Mutex::ScopedLock _lock(connLock); + operational = true; + agent.startProtocol(); + try { + Mutex::ScopedUnlock _unlock(connLock); + subscriptions->run(); + } catch (std::exception) {} + + operational = false; + } + delay = delayMin; + delete subscriptions; + subscriptions = 0; + session.close(); + } + } catch (std::exception &e) { + if (delay < delayMax) + delay *= delayFactor; + } + + ::sleep(delay); + } +} + +ManagementAgentImpl::ConnectionThread::~ConnectionThread() +{ + if (subscriptions != 0) { + delete subscriptions; + } +} + +void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, + uint32_t length, + string exchange, + string routingKey) +{ + { + Mutex::ScopedLock _lock(connLock); + if (!operational) + return; + } + + Message msg; + string data; + + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData(data); + session.messageTransfer(arg::content=msg, arg::destination=exchange); +} + +void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank) +{ + stringstream key; + key << "agent." << agentBank; + session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); +} + + +void ManagementAgentImpl::PublishThread::run() { while (true) { ::sleep(5); agent.PeriodicProcessing(); } } + +Mutex& ManagementAgentImpl::getMutex() +{ + return agentLock; +} + +Buffer* ManagementAgentImpl::startEventLH() +{ + Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); + EncodeHeader(*outBuffer, 'e'); + outBuffer->putLongLong(uint64_t(Duration(now()))); + return outBuffer; +} + +void ManagementAgentImpl::finishEventLH(Buffer* outBuffer) +{ + uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); + outBuffer->reset(); + connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event"); + delete outBuffer; +} diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index f7f19e145d..7d9be6daf9 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -22,7 +22,7 @@ #include "ManagementAgent.h" #include "qpid/client/Connection.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/Session.h" #include "qpid/client/AsyncSession.h" #include "qpid/client/Message.h" @@ -30,10 +30,10 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Condition.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> +#include <deque> namespace qpid { namespace management { @@ -49,14 +49,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void init(std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, - bool useExternalThread = false); + bool useExternalThread = false, + std::string storeFile = ""); void RegisterClass(std::string packageName, std::string className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - uint64_t addObject (management::ManagementObject* objectPtr, - uint32_t persistId = 0, - uint32_t persistBank = 4); + ObjectId addObject (management::ManagementObject* objectPtr, + uint64_t persistId = 0); uint32_t pollCallbacks (uint32_t callLimit = 0); int getSignalFd (void); @@ -64,14 +64,12 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen private: - struct SchemaClassKey - { + struct SchemaClassKey { std::string name; uint8_t hash[16]; }; - struct SchemaClassKeyComp - { + struct SchemaClassKeyComp { bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const { if (lhs.name != rhs.name) @@ -84,53 +82,95 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen } }; - struct SchemaClass - { + struct SchemaClass { management::ManagementObject::writeSchemaCall_t writeSchemaCall; SchemaClass () : writeSchemaCall(0) {} }; + struct QueuedMethod { + QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) : + sequence(_seq), replyTo(_reply), body(_body) {} + + uint32_t sequence; + std::string replyTo; + std::string body; + }; + + typedef std::deque<QueuedMethod*> MethodQueue; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; PackageMap packages; + AgentAttachment attachment; management::ManagementObjectMap managementObjects; management::ManagementObjectMap newManagementObjects; + MethodQueue methodQueue; void received (client::Message& msg); uint16_t interval; bool extThread; + int writeFd; + int readFd; uint64_t nextObjectId; + std::string storeFile; sys::Mutex agentLock; sys::Mutex addLock; - framing::Uuid sessionId; framing::Uuid systemId; + std::string host; + uint16_t port; - int signalFdIn, signalFdOut; - client::Connection connection; - client::Session session; - client::Dispatcher* dispatcher; bool clientWasAdded; - uint64_t objIdPrefix; - std::stringstream queueName; + uint32_t requestedBank; + uint32_t assignedBank; + uint32_t brokerBank; + uint16_t bootSequence; # define MA_BUFFER_SIZE 65536 char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; - class BackgroundThread : public sys::Runnable + friend class ConnectionThread; + class ConnectionThread : public sys::Runnable { + bool operational; ManagementAgentImpl& agent; + framing::Uuid sessionId; + client::Connection connection; + client::Session session; + client::SubscriptionManager* subscriptions; + std::stringstream queueName; + sys::Mutex connLock; void run(); public: - BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {} + ConnectionThread(ManagementAgentImpl& _agent) : + operational(false), agent(_agent), subscriptions(0) {} + ~ConnectionThread(); + void sendBuffer(qpid::framing::Buffer& buf, + uint32_t length, + std::string exchange, + std::string routingKey); + void bindToBank(uint32_t agentBank); }; - BackgroundThread bgThread; - sys::Thread thread; - sys::Condition startupCond; - bool startupWait; + class PublishThread : public sys::Runnable + { + ManagementAgentImpl& agent; + void run(); + public: + PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {} + }; + + ConnectionThread connThreadBody; + sys::Thread connThread; + PublishThread pubThreadBody; + sys::Thread pubThread; + + static const std::string storeMagicNumber; + void startProtocol(); + void storeData(bool requested=false); + void retrieveData(); PackageMap::iterator FindOrAddPackage (std::string name); void moveNewObjectsLH(); void AddClassLocal (PackageMap::iterator pIter, @@ -144,16 +184,19 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen ClassMap::iterator cIter); void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void SendBuffer (qpid::framing::Buffer& buf, - uint32_t length, - std::string exchange, - std::string routingKey); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); void handleAttachResponse (qpid::framing::Buffer& inBuffer); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* outBuffer); }; }} diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h index ec832daf22..36a3f0baab 100644 --- a/qpid/cpp/src/qpid/broker/AclModule.h +++ b/qpid/cpp/src/qpid/broker/AclModule.h @@ -33,9 +33,11 @@ namespace qpid { namespace acl { -enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE,OBJECTSIZE}; -enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE, UPDATE, ACTIONSIZE}; -enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE, QUEUENAME}; +enum ObjectType {QUEUE, EXCHANGE, BROKER, LINK, ROUTE, METHOD, OBJECTSIZE}; // OBJECTSIZE must be last in list +enum Action {CONSUME, PUBLISH, CREATE, ACCESS, BIND, UNBIND, DELETE, PURGE, + UPDATE, ACTIONSIZE}; // ACTIONSIZE must be last in list +enum Property {NAME, DURABLE, OWNER, ROUTINGKEY, PASSIVE, AUTODELETE, EXCLUSIVE, TYPE, ALTERNATE, + QUEUENAME, SCHEMAPACKAGE, SCHEMACLASS}; enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG}; } // namespace acl @@ -74,6 +76,7 @@ class AclHelper { if (str.compare("broker") == 0) return BROKER; if (str.compare("link") == 0) return LINK; if (str.compare("route") == 0) return ROUTE; + if (str.compare("method") == 0) return METHOD; throw str; } static inline std::string getObjectTypeStr(const ObjectType o) { @@ -83,6 +86,7 @@ class AclHelper { case BROKER: return "broker"; case LINK: return "link"; case ROUTE: return "route"; + case METHOD: return "method"; default: assert(false); // should never get here } } @@ -123,6 +127,8 @@ class AclHelper { if (str.compare("type") == 0) return TYPE; if (str.compare("alternate") == 0) return ALTERNATE; if (str.compare("queuename") == 0) return QUEUENAME; + if (str.compare("schemapackage") == 0) return SCHEMAPACKAGE; + if (str.compare("schemaclass") == 0) return SCHEMACLASS; throw str; } static inline std::string getPropertyStr(const Property p) { @@ -137,6 +143,8 @@ class AclHelper { case TYPE: return "type"; case ALTERNATE: return "alternate"; case QUEUENAME: return "queuename"; + case SCHEMAPACKAGE: return "schemapackage"; + case SCHEMACLASS: return "schemaclass"; default: assert(false); // should never get here } } @@ -231,6 +239,17 @@ class AclHelper { a3->insert(actionPair(DELETE, p0)); map->insert(objectPair(ROUTE, a3)); + + // == Method == + + propSetPtr p5(new propSet); + p5->insert(SCHEMAPACKAGE); + p5->insert(SCHEMACLASS); + + actionMapPtr a4(new actionMap); + a4->insert(actionPair(ACCESS, p5)); + + map->insert(objectPair(METHOD, a4)); } }; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index e983aee5c9..94a392b921 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -158,10 +158,12 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_stagingThreshold (conf.stagingThreshold); mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); mgmtObject->set_version (PACKAGE_VERSION); - mgmtObject->set_dataDirEnabled (dataDir.isEnabled ()); - mgmtObject->set_dataDir (dataDir.getPath ()); + if (dataDir.isEnabled()) + mgmtObject->set_dataDir(dataDir.getPath()); + else + mgmtObject->clr_dataDir(); - managementAgent->addObject (mgmtObject, 2, 1); + managementAgent->addObject (mgmtObject, 0x1000000000000002LL); // Since there is currently no support for virtual hosts, a placeholder object // representing the implied single virtual host is added here to keep the diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index fbfcaede82..6416e2fc73 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -57,9 +57,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new management::Exchange (agent, this, parent, _name, durable); if (!durable) { if (name == "") - agent->addObject (mgmtExchange, 4, 1); // Special default exchange ID + agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID else if (name == "qpid.management") - agent->addObject (mgmtExchange, 5, 1); // Special management exchange ID + agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID else agent->addObject (mgmtExchange); } @@ -78,7 +78,7 @@ void Exchange::setPersistenceId(uint64_t id) const if (mgmtExchange != 0 && persistenceId == 0) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - agent->addObject (mgmtExchange, id, 2); + agent->addObject (mgmtExchange, 0x2000000000000000LL + id); } persistenceId = id; } @@ -130,7 +130,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang ManagementObject* mo = queue->GetManagementObject(); if (mo != 0) { - uint64_t queueId = mo->getObjectId(); + management::ObjectId queueId = mo->getObjectId(); mgmtBinding = new management::Binding (agent, this, (Manageable*) parent, queueId, key, args); agent->addObject (mgmtBinding); } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 40dfb80da2..090c4b4bca 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -618,7 +618,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const if (mgmtObject != 0 && persistenceId == 0) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - agent->addObject (mgmtObject, _persistenceId, 3); + agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId); if (externalQueueStore) { ManagementObject* childObj = externalQueueStore->GetManagementObject(); diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index 6c58339432..d562c43069 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -73,7 +73,7 @@ System::System (string _dataDir) : mgmtObject(0) mgmtObject->set_machine (std::string (_uname.machine)); } - agent->addObject (mgmtObject, 1, 1); + agent->addObject (mgmtObject, 0x1000000000000001LL); } } diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index 23203ec13e..c0eb6f03ed 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -32,7 +32,7 @@ Vhost::Vhost (management::Manageable* parentBroker) : mgmtObject(0) if (agent != 0) { mgmtObject = new management::Vhost (agent, this, parentBroker, "/"); - agent->addObject (mgmtObject, 3, 1); + agent->addObject (mgmtObject, 0x1000000000000003LL); } } } diff --git a/qpid/cpp/src/qpid/framing/FieldTable.h b/qpid/cpp/src/qpid/framing/FieldTable.h index 3c65d31aee..ed27f3fef6 100644 --- a/qpid/cpp/src/qpid/framing/FieldTable.h +++ b/qpid/cpp/src/qpid/framing/FieldTable.h @@ -58,6 +58,7 @@ class FieldTable int count() const; void set(const std::string& name, const ValuePtr& value); ValuePtr get(const std::string& name) const; + bool isSet(const std::string& name) const { return get(name).get() != 0; } void setString(const std::string& name, const std::string& value); void setInt(const std::string& name, int value); diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp index 0f3fbab55c..19fba87fde 100644 --- a/qpid/cpp/src/qpid/management/Manageable.cpp +++ b/qpid/cpp/src/qpid/management/Manageable.cpp @@ -31,6 +31,7 @@ std::string Manageable::StatusText (status_t status) case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; case STATUS_INVALID_PARAMETER : return "InvalidParameter"; case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented"; + case STATUS_FORBIDDEN : return "Forbidden"; } return "??"; diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h index e2b8980465..4e2f33b625 100644 --- a/qpid/cpp/src/qpid/management/Manageable.h +++ b/qpid/cpp/src/qpid/management/Manageable.h @@ -44,6 +44,7 @@ class Manageable static const status_t STATUS_NOT_IMPLEMENTED = 3; static const status_t STATUS_INVALID_PARAMETER = 4; static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5; + static const status_t STATUS_FORBIDDEN = 6; // Every "Manageable" object must hold a reference to exactly one // management object. This object is always of a class derived from diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 1bdd8ab836..17f5c14592 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -27,6 +27,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" +#include "qpid/broker/AclModule.h" #include <list> #include <iostream> #include <fstream> @@ -80,8 +81,8 @@ ManagementBroker::RemoteAgent::~RemoteAgent () ManagementBroker::ManagementBroker () : threadPoolSize(1), interval(10), broker(0) { - localBank = 5; nextObjectId = 1; + brokerBank = 1; bootSequence = 1; nextRemoteBank = 10; nextRequestSequence = 1; @@ -112,7 +113,7 @@ ManagementBroker::~ManagementBroker () } } -void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) +void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads) { dataDir = _dataDir; interval = _interval; @@ -140,7 +141,10 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; + if (bootSequence & 0xF000) + bootSequence = 1; writeData(); } else @@ -183,29 +187,26 @@ void ManagementBroker::RegisterClass (string packageName, AddClass(pIter, className, md5Sum, schemaCall); } -uint64_t ManagementBroker::addObject (ManagementObject* object, - uint32_t persistId, - uint32_t persistBank) +ObjectId ManagementBroker::addObject (ManagementObject* object, + uint64_t persistId) { Mutex::ScopedLock lock (addLock); - uint64_t objectId; + uint16_t sequence; + uint64_t objectNum; - if (persistId == 0) - { - objectId = ((uint64_t) bootSequence) << 48 | - ((uint64_t) localBank) << 24 | nextObjectId++; - if ((nextObjectId & 0xFF000000) != 0) - { - nextObjectId = 1; - localBank++; - } + if (persistId == 0) { + sequence = bootSequence; + objectNum = nextObjectId++; + } else { + sequence = 0; + objectNum = persistId; } - else - objectId = ((uint64_t) persistBank) << 24 | persistId; - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; + ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); + + object->setObjectId(objId); + newManagementObjects[objId] = object; + return objId; } ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) @@ -308,7 +309,7 @@ void ManagementBroker::PeriodicProcessing (void) char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; - std::list<uint64_t> deleteList; + std::list<ObjectId> deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); @@ -373,7 +374,7 @@ void ManagementBroker::PeriodicProcessing (void) } // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin (); iter != deleteList.rend (); iter++) managementObjects.erase (*iter); @@ -408,48 +409,72 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // - // agent.<X>.# - // broker.# - // - // where <X> is any non-negative decimal integer less than the lowest remote - // object-id bank. + // agent.0.# + // broker if (routingKey == "broker") { - dispatchAgentCommandLH (msg); + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 7, "agent.0") == 0) { + dispatchAgentCommandLH(msg); return false; } else if (routingKey.compare(0, 6, "agent.") == 0) { - std::string::size_type delim = routingKey.find('.', 6); - if (delim == string::npos) - delim = routingKey.length(); - string bank = routingKey.substr(6, delim - 6); - if ((uint32_t) atoi(bank.c_str()) <= localBank) { - dispatchAgentCommandLH (msg); - return false; - } + return authorizeAgentMessageLH(msg); } return true; } -void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, + uint32_t sequence, const ConnectionToken* connToken) { string methodName; + string packageName; + string className; + uint8_t hash[16]; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + AclModule* acl = broker->getAcl(); - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + if (acl != 0) { + string userId = ((const broker::ConnectionState*) connToken)->getUserId(); + std::map<acl::Property, string> params; + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + return; + } + } + ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -497,34 +522,33 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey FindOrAddPackageLH(packageName); } -void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { std::string packageName; - inBuffer.getShortString (packageName); - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) + inBuffer.getShortString(packageName); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin (); - cIter != cMap.end (); + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); cIter++) { - if (cIter->second->hasSchema ()) + if (cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q', sequence); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + EncodeHeader(outBuffer, 'q', sequence); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } } } - - sendCommandComplete (replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) @@ -551,9 +575,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); - SchemaClass* newSchema = new SchemaClass; - newSchema->pendingSequence = sequence; - pIter->second[key] = newSchema; + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence))); } } @@ -569,7 +591,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(buffer, bufferLen); } -void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -578,33 +600,33 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getShortString (key.name); inBuffer.getBin128 (key.hash); - PackageMap::iterator pIter = packages.find (packageName); + PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap cMap = pIter->second; - ClassMap::iterator cIter = cMap.find (key); + ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - SchemaClass* classInfo = cIter->second; + SchemaClass& classInfo = cIter->second; - if (classInfo->hasSchema()) { + if (classInfo.hasSchema()) { EncodeHeader(outBuffer, 's', sequence); - classInfo->appendSchema (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + classInfo.appendSchema(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } else - sendCommandComplete (replyToKey, sequence, 1, "Schema not available"); + sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); } else - sendCommandComplete (replyToKey, sequence, 1, "Class key not found"); + sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); } else - sendCommandComplete (replyToKey, sequence, 1, "Package not found"); + sendCommandComplete(replyToKey, sequence, 1, "Package not found"); } -void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -619,24 +641,26 @@ void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyT if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { size_t length = ValidateSchema(inBuffer); - if (length == 0) + if (length == 0) { + QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); + } else { - cIter->second->buffer = (uint8_t*) malloc(length); - cIter->second->bufferLen = length; - inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + cIter->second.buffer = (uint8_t*) malloc(length); + cIter->second.bufferLen = length; + inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); // Publish a class-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q'); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + EncodeHeader(outBuffer, 'q'); + EncodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } @@ -671,14 +695,14 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) void ManagementBroker::deleteOrphanedAgentsLH() { - vector<uint64_t> deleteList; + vector<ObjectId> deleteList; for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { - uint64_t connectionRef = aIter->first; + ObjectId connectionRef = aIter->first; bool found = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { if (iter->first == connectionRef && !iter->second->isDeleted()) { found = true; @@ -692,10 +716,8 @@ void ManagementBroker::deleteOrphanedAgentsLH() } } - for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { - + for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) remoteAgents.erase(*dIter); - } deleteList.clear(); } @@ -705,7 +727,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe string label; uint32_t requestedBank; uint32_t assignedBank; - uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; moveNewObjectsLH(); @@ -741,6 +763,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe uint32_t outLen; EncodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); @@ -786,13 +809,77 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::dispatchAgentCommandLH (Message& msg) +bool ManagementBroker::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; uint32_t sequence; string replyToKey; + if (msg.encodedSize() > MA_BUFFER_SIZE) + return false; + + msg.encodeContent(inBuffer); + inBuffer.reset(); + + if (!CheckHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + // TODO: check method call against ACL list. + AclModule* acl = broker->getAcl(); + if (acl == 0) + return true; + + string userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId(); + string packageName; + string className; + uint8_t hash[16]; + string methodName; + + std::map<acl::Property, string> params; + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + params[acl::SCHEMAPACKAGE] = packageName; + params[acl::SCHEMACLASS] = className; + + if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, ¶ms)) + return true; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + return false; + } + + return true; +} + +void ManagementBroker::dispatchAgentCommandLH(Message& msg) +{ + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { @@ -823,7 +910,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) @@ -834,7 +921,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std: // No such package found, create a new map entry. pair<PackageMap::iterator, bool> result = - packages.insert (pair<string, ClassMap> (name, ClassMap ())); + packages.insert(pair<string, ClassMap>(name, ClassMap())); QPID_LOG (debug, "ManagementBroker added package " << name); // Publish a package-indication message @@ -859,20 +946,18 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter, ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - SchemaClass* classInfo = new SchemaClass; - classInfo->writeSchemaCall = schemaCall; - cMap[key] = classInfo; - cIter = cMap.find (key); + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall))); + cIter = cMap.find(key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, @@ -917,6 +1002,8 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) for (uint16_t idx = 0; idx < methCount; idx++) { FieldTable ft; ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; int argCount = ft.getInt("argCount"); for (int mIdx = 0; mIdx < argCount; mIdx++) { FieldTable aft; @@ -924,10 +1011,41 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) } } - if (evntCount != 0) - return 0; + for (uint16_t idx = 0; idx < evntCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } end = inBuffer.getPosition(); inBuffer.restore(); // restore original position return end - start; } + +Mutex& ManagementBroker::getMutex() +{ + return userLock; +} + +Buffer* ManagementBroker::startEventLH() +{ + Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); + EncodeHeader(*outBuffer, 'e'); + outBuffer->putLongLong(uint64_t(Duration(now()))); + return outBuffer; +} + +void ManagementBroker::finishEventLH(Buffer* outBuffer) +{ + uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); + outBuffer->reset(); + SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event"); + delete outBuffer; +} + diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h index 151926f526..e3b5504752 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.h +++ b/qpid/cpp/src/qpid/management/ManagementBroker.h @@ -47,7 +47,7 @@ class ManagementBroker : public ManagementAgent ManagementBroker (); virtual ~ManagementBroker (); - void configure (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize); + void configure (std::string dataDir, uint16_t interval, broker::Broker* broker, int threadPoolSize); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, broker::Exchange::shared_ptr directExchange); @@ -56,16 +56,15 @@ class ManagementBroker : public ManagementAgent std::string className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - uint64_t addObject (ManagementObject* object, - uint32_t persistId = 0, - uint32_t persistBank = 4); + ObjectId addObject (ManagementObject* object, + uint64_t persistId = 0); void clientAdded (void); bool dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); // Stubs for remote management agent calls - void init (std::string, uint16_t, uint16_t, bool) { assert(0); } + void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); } uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } @@ -88,7 +87,7 @@ class ManagementBroker : public ManagementAgent { uint32_t objIdBank; std::string routingKey; - uint64_t connectionRef; + ObjectId connectionRef; Agent* mgmtObject; ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); @@ -97,7 +96,7 @@ class ManagementBroker : public ManagementAgent // TODO: Eventually replace string with entire reply-to structure. reply-to // currently assumes that the exchange is "amq.direct" even though it could // in theory be specified differently. - typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap; + typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap; typedef std::vector<std::string> ReplyToVector; // Storage for known schema classes: @@ -133,12 +132,15 @@ class ManagementBroker : public ManagementAgent size_t bufferLen; uint8_t* buffer; - SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {} + SchemaClass(uint32_t seq) : + writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(ManagementObject::writeSchemaCall_t call) : + writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } void appendSchema (framing::Buffer& buf); }; - typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap; + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; RemoteAgentMap remoteAgents; @@ -157,10 +159,10 @@ class ManagementBroker : public ManagementAgent broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; - Manageable* broker; + broker::Broker* broker; uint16_t bootSequence; - uint32_t localBank; uint32_t nextObjectId; + uint32_t brokerBank; uint32_t nextRemoteBank; uint32_t nextRequestSequence; bool clientWasAdded; @@ -168,6 +170,7 @@ class ManagementBroker : public ManagementAgent # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; void writeData (); void PeriodicProcessing (void); @@ -179,7 +182,8 @@ class ManagementBroker : public ManagementAgent std::string routingKey); void moveNewObjectsLH(); - void dispatchAgentCommandLH (broker::Message& msg); + bool authorizeAgentMessageLH(broker::Message& msg); + void dispatchAgentCommandLH(broker::Message& msg); PackageMap::iterator FindOrAddPackageLH(std::string name); void AddClass(PackageMap::iterator pIter, @@ -206,9 +210,12 @@ class ManagementBroker : public ManagementAgent void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken); size_t ValidateSchema(framing::Buffer&); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* outBuffer); }; }} diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 74d9571d10..e0386ee057 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -28,6 +28,62 @@ using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; +void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) +{ + first = + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (bank & 0x0fffffff)); +} + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object) + : agent(0) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (bank & 0x0fffffff)); + second = object; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object) + : agent(_agent) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48; + second = object; +} + +bool ObjectId::operator==(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return first == otherFirst && second == other.second; +} + +bool ObjectId::operator<(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + + return (first < otherFirst) || ((first == otherFirst) && (second < other.second)); +} + +void ObjectId::encode(framing::Buffer& buffer) +{ + if (agent == 0) + buffer.putLongLong(first); + else + buffer.putLongLong(first | agent->first); + buffer.putLongLong(second); +} + +void ObjectId::decode(framing::Buffer& buffer) +{ + first = buffer.getLongLong(); + second = buffer.getLongLong(); +} + int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (Buffer& buf) @@ -38,10 +94,10 @@ void ManagementObject::writeTimestamps (Buffer& buf) buf.putLongLong (uint64_t (Duration (now ()))); buf.putLongLong (createTime); buf.putLongLong (destroyTime); - buf.putLongLong (objectId); + objectId.encode(buf); } -void ManagementObject::setReference(uint64_t) {} +void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { static __thread int thisIndex = -1; @@ -54,3 +110,17 @@ int ManagementObject::getThreadIndex() { return thisIndex; } +Mutex& ManagementObject::getMutex() +{ + return agent->getMutex(); +} + +Buffer* ManagementObject::startEventLH() +{ + return agent->startEventLH(); +} + +void ManagementObject::finishEventLH(Buffer* buf) +{ + agent->finishEventLH(buf); +} diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 78d065aac2..1b809f5125 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -32,6 +32,34 @@ namespace management { class Manageable; class ManagementAgent; +class ObjectId; + + +class AgentAttachment { + friend class ObjectId; +private: + uint64_t first; +public: + AgentAttachment() : first(0) {} + void setBanks(uint32_t broker, uint32_t bank); +}; + + +class ObjectId { +private: + const AgentAttachment* agent; + uint64_t first; + uint64_t second; +public: + ObjectId() : agent(0), first(0), second(0) {} + ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); } + ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object); + ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object); + bool operator==(const ObjectId &other) const; + bool operator<(const ObjectId &other) const; + void encode(framing::Buffer& buffer); + void decode(framing::Buffer& buffer); +}; class ManagementObject { @@ -39,7 +67,7 @@ class ManagementObject uint64_t createTime; uint64_t destroyTime; - uint64_t objectId; + ObjectId objectId; bool configChanged; bool instChanged; bool deleted; @@ -84,11 +112,15 @@ class ManagementObject int getThreadIndex(); void writeTimestamps (qpid::framing::Buffer& buf); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* buf); + public: typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); ManagementObject (ManagementAgent* _agent, Manageable* _core) : - destroyTime(0), objectId (0), configChanged(true), + destroyTime(0), configChanged(true), instChanged(true), deleted(false), coreObject(_core), agent(_agent) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} @@ -100,14 +132,14 @@ class ManagementObject virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; - virtual void setReference (uint64_t objectId); + virtual void setReference (ObjectId objectId); virtual std::string& getClassName (void) = 0; virtual std::string& getPackageName (void) = 0; virtual uint8_t* getMd5Sum (void) = 0; - void setObjectId (uint64_t oid) { objectId = oid; } - uint64_t getObjectId (void) { return objectId; } + void setObjectId (ObjectId oid) { objectId = oid; } + ObjectId getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } virtual bool getInstChanged (void) { return instChanged; } inline void setAllChanged (void) { @@ -120,10 +152,9 @@ class ManagementObject deleted = true; } inline bool isDeleted (void) { return deleted; } - inline sys::Mutex& getLock() { return accessLock; } }; -typedef std::map<uint64_t,ManagementObject*> ManagementObjectMap; +typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; }} diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index 83c29a78a5..99c902ab30 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -69,6 +69,53 @@ class mgmtObject (object): for cell in row: setattr (self, cell[0], cell[1]) +class objectId(object): + """ Object that represents QMF object identifiers """ + + def __init__(self, codec): + self.first = codec.read_uint64() + self.second = codec.read_uint64() + + def __cmp__(self, other): + if other == None: + return 1 + if self.first < other.first: + return -1 + if self.first > other.first: + return 1 + if self.second < other.second: + return -1 + if self.second > other.second: + return 1 + return 0 + + + def index(self): + return (self.first, self.second) + + def getFlags(self): + return (self.first & 0xF000000000000000) >> 60 + + def getSequence(self): + return (self.first & 0x0FFF000000000000) >> 48 + + def getBroker(self): + return (self.first & 0x0000FFFFF0000000) >> 28 + + def getBank(self): + return self.first & 0x000000000FFFFFFF + + def getObject(self): + return self.second + + def isDurable(self): + return self.getSequence() == 0 + + def encode(self, codec): + codec.write_uint64(self.first) + codec.write_uint64(self.second) + + class methodResult: """ Object that contains the result of a method call """ @@ -308,6 +355,8 @@ class managementClient: self.handleClassInd (ch, codec) elif hdr[0] == 'h': self.handleHeartbeat (ch, codec) + elif hdr[0] == 'e': + self.handleEvent (ch, codec) else: self.parse (ch, codec, hdr[0], hdr[1]) ch.accept(msg) @@ -386,7 +435,7 @@ class managementClient: elif typecode == 9: # DELTATIME codec.write_uint64 (long (value)) elif typecode == 10: # REF - codec.write_uint64 (long (value)) + value.encode(codec) elif typecode == 11: # BOOL codec.write_uint8 (int (value)) elif typecode == 12: # FLOAT @@ -429,7 +478,7 @@ class managementClient: elif typecode == 9: # DELTATIME data = codec.read_uint64 () elif typecode == 10: # REF - data = codec.read_uint64 () + data = objectId(codec) elif typecode == 11: # BOOL data = codec.read_uint8 () elif typecode == 12: # FLOAT @@ -551,9 +600,9 @@ class managementClient: ch.send ("qpid.management", smsg) def handleClassInd (self, ch, codec): - pname = str (codec.read_str8 ()) - cname = str (codec.read_str8 ()) - hash = codec.read_bin128 () + pname = str (codec.read_str8()) + cname = str (codec.read_str8()) + hash = codec.read_bin128() if pname not in self.packages: return @@ -574,6 +623,32 @@ class managementClient: if self.ctrlCb != None: self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp) + def handleEvent (self, ch, codec): + if self.eventCb == None: + return + timestamp = codec.read_uint64() + objId = objectId(codec) + packageName = str(codec.read_str8()) + className = str(codec.read_str8()) + hash = codec.read_bin128() + name = str(codec.read_str8()) + classKey = (packageName, className, hash) + if classKey not in self.schema: + return; + schemaClass = self.schema[classKey] + row = [] + es = schemaClass['E'] + arglist = None + for ename in es: + (edesc, eargs) = es[ename] + if ename == name: + arglist = eargs + if arglist == None: + return + for arg in arglist: + row.append((arg[0], self.decodeValue(codec, arg[1]))) + self.eventCb(ch.context, classKey, objId, name, row) + def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ self.decOutstanding (ch) @@ -597,22 +672,23 @@ class managementClient: configs = [] insts = [] methods = {} - events = [] + events = {} configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) insts.append (("id", 4, None, None)) for idx in range (configCount): ft = codec.read_map () - name = str (ft["name"]) - type = ft["type"] - access = ft["access"] - index = ft["index"] - unit = None - min = None - max = None - maxlen = None - desc = None + name = str (ft["name"]) + type = ft["type"] + access = ft["access"] + index = ft["index"] + optional = ft["optional"] + unit = None + min = None + max = None + maxlen = None + desc = None for key, value in ft.items (): if key == "unit": @@ -626,7 +702,7 @@ class managementClient: elif key == "desc": desc = str (value) - config = (name, type, unit, desc, access, index, min, max, maxlen) + config = (name, type, unit, desc, access, index, min, max, maxlen, optional) configs.append (config) for idx in range (instCount): @@ -685,6 +761,33 @@ class managementClient: args.append (arg) methods[mname] = (mdesc, args) + for idx in range (eventCount): + ft = codec.read_map () + ename = str (ft["name"]) + argCount = ft["argCount"] + if "desc" in ft: + edesc = str (ft["desc"]) + else: + edesc = None + + args = [] + for aidx in range (argCount): + ft = codec.read_map () + name = str (ft["name"]) + type = ft["type"] + unit = None + desc = None + + for key, value in ft.items (): + if key == "unit": + unit = str (value) + elif key == "desc": + desc = str (value) + + arg = (name, type, unit, desc) + args.append (arg) + events[ename] = (edesc, args) + schemaClass = {} schemaClass['C'] = configs schemaClass['I'] = insts @@ -695,6 +798,22 @@ class managementClient: if self.schemaCb != None: self.schemaCb (ch.context, classKey, configs, insts, methods, events) + def parsePresenceMasks(self, codec, schemaClass): + """ Generate a list of not-present properties """ + excludeList = [] + bit = 0 + for element in schemaClass['C'][1:]: + if element[9] == 1: + if bit == 0: + mask = codec.read_uint8() + bit = 1 + if (mask & bit) == 0: + excludeList.append(element[0]) + bit = bit * 2 + if bit == 256: + bit = 0 + return excludeList + def parseContent (self, ch, cls, codec, seq=0): """ Parse a received content message. """ if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None: @@ -716,21 +835,26 @@ class managementClient: timestamps.append (codec.read_uint64 ()) # Current Time timestamps.append (codec.read_uint64 ()) # Create Time timestamps.append (codec.read_uint64 ()) # Delete Time - + objId = objectId(codec) schemaClass = self.schema[classKey] if cls == 'C' or cls == 'B': - for element in schemaClass['C'][:]: + notPresent = self.parsePresenceMasks(codec, schemaClass) + + if cls == 'C' or cls == 'B': + row.append(("id", objId)) + for element in schemaClass['C'][1:]: tc = element[1] name = element[0] - data = self.decodeValue (codec, tc) - row.append ((name, data)) + if name in notPresent: + row.append((name, None)) + else: + data = self.decodeValue(codec, tc) + row.append((name, data)) if cls == 'I' or cls == 'B': - if cls == 'B': - start = 1 - else: - start = 0 - for element in schemaClass['I'][start:]: + if cls == 'I': + row.append(("id", objId)) + for element in schemaClass['I'][1:]: tc = element[1] name = element[0] data = self.decodeValue (codec, tc) @@ -763,9 +887,12 @@ class managementClient: codec = Codec (self.spec) sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) self.setHeader (codec, ord ('M'), sequence) - codec.write_uint64 (objId) # ID of object + objId.encode(codec) + codec.write_str8 (classId[0]) + codec.write_str8 (classId[1]) + codec.write_bin128 (classId[2]) codec.write_str8 (methodName) - bank = (objId & 0x0000FFFFFF000000) >> 24 + bank = objId.getBank() # Encode args according to schema if classId not in self.schema: diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py index fc9eb391b7..5a8b9cdf9d 100644 --- a/qpid/python/qpid/managementdata.py +++ b/qpid/python/qpid/managementdata.py @@ -71,14 +71,14 @@ class ManagementData: # def registerObjId (self, objId): - if not objId in self.idBackMap: - self.idBackMap[objId] = self.nextId + if not objId.index() in self.idBackMap: + self.idBackMap[objId.index()] = self.nextId self.idMap[self.nextId] = objId self.nextId += 1 - def displayObjId (self, objId): - if objId in self.idBackMap: - return self.idBackMap[objId] + def displayObjId (self, objIdIndex): + if objIdIndex in self.idBackMap: + return self.idBackMap[objIdIndex] else: return 0 @@ -86,7 +86,7 @@ class ManagementData: if displayId in self.idMap: return self.idMap[displayId] else: - return 0 + return None def displayClassName (self, cls): (packageName, className, hash) = cls @@ -102,19 +102,20 @@ class ManagementData: self.tables[className] = {} # Register the ID so a more friendly presentation can be displayed - id = long (list[0][1]) - self.registerObjId (id) + objId = list[0][1] + oidx = objId.index() + self.registerObjId (objId) # If this object hasn't been seen before, create a new object record with # the timestamps and empty lists for configuration and instrumentation data. - if id not in self.tables[className]: - self.tables[className][id] = (timestamps, [], []) + if oidx not in self.tables[className]: + self.tables[className][oidx] = (timestamps, [], []) - (unused, oldConf, oldInst) = self.tables[className][id] + (unused, oldConf, oldInst) = self.tables[className][oidx] # For config updates, simply replace old config list with the new one. if context == 0: #config - self.tables[className][id] = (timestamps, list, oldInst) + self.tables[className][oidx] = (timestamps, list, oldInst) # For instrumentation updates, carry the minimum and maximum values for # "hi-lo" stats forward. @@ -132,7 +133,7 @@ class ManagementData: if oldInst[idx][1] < value: value = oldInst[idx][1] newInst.append ((key, value)) - self.tables[className][id] = (timestamps, oldConf, newInst) + self.tables[className][oidx] = (timestamps, oldConf, newInst) finally: self.lock.release () @@ -211,11 +212,13 @@ class ManagementData: pass def refName (self, oid): - if oid == 0: + if oid == None: return "NULL" - return str (self.displayObjId (oid)) + return str (self.displayObjId (oid.index())) def valueDisplay (self, classKey, key, value): + if value == None: + return "<NULL>" for kind in range (2): schema = self.schema[classKey][kind] for item in schema: @@ -437,7 +440,7 @@ class ManagementData: if classKey in self.tables: ids = self.listOfIds(classKey, tokens[1:]) for objId in ids: - (ts, config, inst) = self.tables[classKey][self.rawObjId(objId)] + (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()] createTime = self.disp.timestamp (ts[1]) destroyTime = "-" if ts[2] > 0: @@ -486,32 +489,32 @@ class ManagementData: rows = [] timestamp = None - config = self.tables[classKey][ids[0]][1] + config = self.tables[classKey][ids[0].index()][1] for eIdx in range (len (config)): key = config[eIdx][0] if key != "id": row = ("property", key) for id in ids: if timestamp == None or \ - timestamp < self.tables[classKey][id][0][0]: - timestamp = self.tables[classKey][id][0][0] - (key, value) = self.tables[classKey][id][1][eIdx] + timestamp < self.tables[classKey][id.index()][0][0]: + timestamp = self.tables[classKey][id.index()][0][0] + (key, value) = self.tables[classKey][id.index()][1][eIdx] row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) - inst = self.tables[classKey][ids[0]][2] + inst = self.tables[classKey][ids[0].index()][2] for eIdx in range (len (inst)): key = inst[eIdx][0] if key != "id": row = ("statistic", key) for id in ids: - (key, value) = self.tables[classKey][id][2][eIdx] + (key, value) = self.tables[classKey][id.index()][2][eIdx] row = row + (self.valueDisplay (classKey, key, value),) rows.append (row) titleRow = ("Type", "Element") for id in ids: - titleRow = titleRow + (self.refName (id),) + titleRow = titleRow + (self.refName(id),) caption = "Object of type %s.%s:" % (classKey[0], classKey[1]) if timestamp != None: caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")" @@ -563,13 +566,15 @@ class ManagementData: access = self.accessName (config[4]) extra = "" if config[5] == 1: - extra = extra + "index " + extra += "index " if config[6] != None: - extra = extra + "Min: " + str (config[6]) + extra += "Min: " + str(config[6]) + " " if config[7] != None: - extra = extra + "Max: " + str (config[7]) + extra += "Max: " + str(config[7]) + " " if config[8] != None: - extra = extra + "MaxLen: " + str (config[8]) + extra += "MaxLen: " + str(config[8]) + " " + if config[9] == 1: + extra += "optional " rows.append ((name, typename, unit, access, extra, desc)) for config in self.schema[classKey][1]: @@ -613,7 +618,7 @@ class ManagementData: def getClassForId (self, objId): """ Given an object ID, return the class key for the referenced object """ for classKey in self.tables: - if objId in self.tables[classKey]: + if objId.index() in self.tables[classKey]: return classKey return None @@ -659,14 +664,19 @@ class ManagementData: def makeIdRow (self, displayId): if displayId in self.idMap: - rawId = self.idMap[displayId] + objId = self.idMap[displayId] else: return None - return (displayId, - rawId, - (rawId & 0x7FFF000000000000) >> 48, - (rawId & 0x0000FFFFFF000000) >> 24, - (rawId & 0x0000000000FFFFFF)) + if objId.getFlags() == 0: + flags = "" + else: + flags = str(objId.getFlags()) + seq = objId.getSequence() + if seq == 0: + seqText = "<durable>" + else: + seqText = str(seq) + return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject())) def listIds (self, select): rows = [] @@ -683,7 +693,7 @@ class ManagementData: return rows.append(row) self.disp.table("Translation of Display IDs:", - ("DisplayID", "RawID", "BootSequence", "Bank", "Object"), + ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"), rows) def do_list (self, data): diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 850f9c62e6..5952595997 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -61,18 +61,16 @@ =============================================================== --> <class name="Broker"> - <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/> - <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/> - <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> - <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> - <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> - <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> - <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> - <property name="clusterName" type="sstr" access="RO" - desc="Name of cluster this server is a member of"/> - <property name="version" type="sstr" access="RO" desc="Running software version"/> - <property name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/> - <property name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/> + <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/> + <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/> + <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> + <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> + <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> + <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> + <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> + <property name="clusterName" type="sstr" access="RO" desc="Name of cluster this server is a member of"/> + <property name="version" type="sstr" access="RO" desc="Running software version"/> + <property name="dataDir" type="sstr" access="RO" optional="y" desc="Persistent configuration storage location"/> <method name="joinCluster"> <arg name="clusterName" dir="I" type="sstr"/> @@ -94,6 +92,17 @@ <arg name="username" dir="I" type="sstr"/> <arg name="password" dir="I" type="sstr"/> </method> + + <event name="agentConnect" desc="QMF Management Agent has connected to the broker"> + <arg name="remoteAddress" type="sstr"/> + <arg name="label" type="sstr"/> + <arg name="brokerBank" type="uint32"/> + <arg name="agentBank" type="uint32"/> + </event> + + <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker"> + <arg name="remoteAddress" type="sstr"/> + </event> </class> <!-- diff --git a/qpid/specs/management-types.xml b/qpid/specs/management-types.xml index 309c94c98b..1d3e9f900b 100644 --- a/qpid/specs/management-types.xml +++ b/qpid/specs/management-types.xml @@ -19,7 +19,7 @@ under the License. --> -<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/> +<type name="objId" base="REF" cpp="ObjectId" encode="#.encode(@)" decode="#.decode(@)" accessor="direct" init="0"/> <type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" accessor="direct" init="0"/> <type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" accessor="direct" init="0"/> <type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" accessor="direct" init="0"/> |