diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-16 19:19:21 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-16 19:19:21 +0000 |
commit | eac621b6c39cdfb5f6bbaa9661ebcb9630062b01 (patch) | |
tree | b1c941062fee2ccecc0c84674508b48080a674b5 | |
parent | f540a63aae900898bfb08399e43b13cde6dfec91 (diff) | |
download | qpid-python-eac621b6c39cdfb5f6bbaa9661ebcb9630062b01.tar.gz |
code gen changes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@923942 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/include/qpid/management/ManagementEvent.h | 15 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/management/ManagementObject.h | 52 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/management-types.xml | 4 | ||||
-rwxr-xr-x | qpid/cpp/managementgen/qmfgen/schema.py | 57 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Args.h | 4 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Class.cpp | 118 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Class.h | 13 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Event.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Event.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 161 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.cpp | 102 |
12 files changed, 309 insertions, 265 deletions
diff --git a/qpid/cpp/include/qpid/management/ManagementEvent.h b/qpid/cpp/include/qpid/management/ManagementEvent.h index 9327a2b710..d6c0b7078c 100644 --- a/qpid/cpp/include/qpid/management/ManagementEvent.h +++ b/qpid/cpp/include/qpid/management/ManagementEvent.h @@ -23,7 +23,9 @@ */ #include "qpid/management/ManagementObject.h" -#include <qpid/framing/Buffer.h> +//#include <qpid/framing/Buffer.h> +#include "qpid/messaging/MapContent.h" +#include "qpid/messaging/MapView.h" #include <string> namespace qpid { @@ -32,9 +34,10 @@ namespace management { class ManagementAgent; class ManagementEvent : public ManagementItem { -public: - typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&); - //typedef void (*mapEncodeSchemaCall_t)(qpid::messaging::MapContent&); + public: + static const uint8_t MD5_LEN = 16; + //typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&); + typedef void (*writeSchemaCall_t)(qpid::messaging::VariantMap&); virtual ~ManagementEvent() {} virtual writeSchemaCall_t getWriteSchemaCall(void) = 0; @@ -43,8 +46,8 @@ public: virtual std::string& getPackageName() const = 0; virtual uint8_t* getMd5Sum() const = 0; virtual uint8_t getSeverity() const = 0; - virtual void encode(qpid::framing::Buffer&) const = 0; - virtual void mapEncode(qpid::messaging::MapContent&) const = 0; + //virtual void encode(qpid::framing::Buffer&) const = 0; + virtual void mapEncode(qpid::messaging::VariantMap&) const = 0; }; }} diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h index 9e4c227e1c..7463fee495 100644 --- a/qpid/cpp/include/qpid/management/ManagementObject.h +++ b/qpid/cpp/include/qpid/management/ManagementObject.h @@ -24,7 +24,7 @@ #include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" -#include <qpid/framing/Buffer.h> +//#include <qpid/framing/Buffer.h> #include "qpid/CommonImportExport.h" #include "qpid/messaging/MapContent.h" #include "qpid/messaging/MapView.h" @@ -59,7 +59,7 @@ protected: void fromString(const std::string&); public: QPID_COMMON_EXTERN ObjectId() : agent(0), first(0), second(0) {} - QPID_COMMON_EXTERN ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); } + //QPID_COMMON_EXTERN ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); } QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) : agent(0) { mapDecode(map.asMap()); } QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object); QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object); @@ -67,9 +67,9 @@ public: QPID_COMMON_EXTERN ObjectId(const std::string&); QPID_COMMON_EXTERN bool operator==(const ObjectId &other) const; QPID_COMMON_EXTERN bool operator<(const ObjectId &other) const; - QPID_COMMON_EXTERN uint32_t encodedBufSize() const { return 16; }; - QPID_COMMON_EXTERN void encode(framing::Buffer& buffer) const; - QPID_COMMON_EXTERN void decode(framing::Buffer& buffer); + //QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; }; + //QPID_COMMON_EXTERN void encode(framing::Buffer& buffer) const; + //QPID_COMMON_EXTERN void decode(framing::Buffer& buffer); QPID_COMMON_EXTERN void mapEncode(messaging::VariantMap& map) const; QPID_COMMON_EXTERN void mapDecode(const messaging::VariantMap& map); QPID_COMMON_EXTERN operator messaging::VariantMap() const; @@ -141,18 +141,17 @@ protected: bool forcePublish; QPID_COMMON_EXTERN int getThreadIndex(); - QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf) const; + //QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf) const; QPID_COMMON_EXTERN void writeTimestamps(messaging::VariantMap& map) const; - QPID_COMMON_EXTERN void readTimestamps(qpid::framing::Buffer& buf); + //QPID_COMMON_EXTERN void readTimestamps(qpid::framing::Buffer& buf); QPID_COMMON_EXTERN void readTimestamps(const messaging::VariantMap& buf); - QPID_COMMON_EXTERN uint32_t writeTimestampsBufSize() const; - - static const uint8_t MD5_LEN = 16; + //QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const; public: + QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16; QPID_COMMON_EXTERN static int maxThreads; - typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - //typedef void (*mapEncodeSchemaCall_t) (messaging::MapContent&); + //typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); + typedef void (*writeSchemaCall_t) (qpid::messaging::VariantMap&); ManagementObject(Manageable* _core) : createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))), @@ -163,14 +162,14 @@ protected: virtual writeSchemaCall_t getWriteSchemaCall() = 0; //virtual mapEncodeSchemaCall_t getMapEncodeSchemaCall() = 0; - virtual void readProperties(qpid::framing::Buffer& buf) = 0; - virtual uint32_t writePropertiesBufSize() const = 0; - virtual void writeProperties(qpid::framing::Buffer& buf) const = 0; - virtual void writeStatistics(qpid::framing::Buffer& buf, - bool skipHeaders = false) = 0; - virtual void doMethod(std::string& methodName, - qpid::framing::Buffer& inBuf, - qpid::framing::Buffer& outBuf) = 0; + //virtual void readProperties(qpid::framing::Buffer& buf) = 0; + //virtual uint32_t writePropertiesBufSize() const = 0; + //virtual void writeProperties(qpid::framing::Buffer& buf) const = 0; + //virtual void writeStatistics(qpid::framing::Buffer& buf, + // bool skipHeaders = false) = 0; + //virtual void doMethod(std::string& methodName, + // qpid::framing::Buffer& inBuf, + // qpid::framing::Buffer& outBuf) = 0; virtual std::string getKey() const = 0; // Encode & Decode the property and statistics values @@ -179,10 +178,9 @@ protected: bool includeProperties, bool includeStatistics) = 0; virtual void mapDecodeValues(const messaging::VariantMap& map) = 0; - // @TODO KAG: fix this - virtual void KAGdoMethod(std::string& methodName, - const messaging::VariantMap& inMap, - messaging::VariantMap& outMap) = 0; + virtual void doMethod(std::string& methodName, + const messaging::VariantMap& inMap, + messaging::VariantMap& outMap) = 0; QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId); @@ -214,9 +212,9 @@ protected: other.getPackageName() == getPackageName(); } - QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); } - QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); } - QPID_COMMON_EXTERN uint32_t encodedBufSize() const { return writePropertiesBufSize(); } + // QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); } + // QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); } + //QPID_COMMON_EXTERN uint32_t encodedSize() const { return writePropertiesSize(); } // Encode/Decode the entire object as a map QPID_COMMON_EXTERN void mapEncode(messaging::VariantMap& map, diff --git a/qpid/cpp/managementgen/qmfgen/management-types.xml b/qpid/cpp/managementgen/qmfgen/management-types.xml index e5994ff5b4..d2c1bdc9e0 100644 --- a/qpid/cpp/managementgen/qmfgen/management-types.xml +++ b/qpid/cpp/managementgen/qmfgen/management-types.xml @@ -39,8 +39,8 @@ <type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" stream="#" size="8" accessor="direct" init="0"/> <type name="float" base="FLOAT" cpp="float" encode="@.putFloat(#)" decode="# = @.getFloat()" stream="#" size="4" accessor="direct" init="0."/> <type name="double" base="DOUBLE" cpp="double" encode="@.putDouble(#)" decode="# = @.getDouble()" stream="#" size="8" accessor="direct" init="0."/> -<type name="uuid" base="UUID" cpp="::qpid::framing::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::framing::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" /> -<type name="map" base="FTABLE" cpp="::qpid::framing::FieldTable" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::framing::FieldTable()" byRef="y" unmap="::qpid::framing::FieldTable(); assert(false); /*TBD*/" map='(assert(false),"TBD: unsupported type")' /> +<type name="uuid" base="UUID" cpp="::qpid::messaging::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" /> +<type name="map" base="FTABLE" cpp="::qpid::messaging::VariantMap" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/" map='(assert(false),"TBD: unsupported type")' /> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" style="wm" stream="#" size="1" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" style="wm" stream="#" size="2" accessor="counter" init="0"/> diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index 2f6bde78bc..5ce57bc268 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -443,7 +443,7 @@ class SchemaProperty: stream.write (" buf.put (ft);\n\n") - def genMapSchema(self, stream): + def genSchemaMap(self, stream): stream.write (" {\n") stream.write (" ::qpid::messaging::VariantMap _value;\n") stream.write (" _value[TYPE] = TYPE_" + self.type.type.base +";\n") @@ -602,7 +602,7 @@ class SchemaStatistic: stream.write (" ft.setString (DESC, \"" + desc + "\");\n") stream.write (" buf.put (ft);\n\n") - def genMapSchemaText(self, stream, name, desc): + def genSchemaTextMap(self, stream, name, desc): stream.write (" {\n") stream.write (" ::qpid::messaging::VariantMap _value;\n") stream.write (" _value[TYPE] = TYPE_" + self.type.type.base +";\n") @@ -639,6 +639,32 @@ class SchemaStatistic: self.genSchemaText (stream, self.name + "Max", descMax) self.genSchemaText (stream, self.name + "Average", descAverage) + def genSchemaMap (self, stream): + if self.type.type.style != "mma": + self.genSchemaTextMap (stream, self.name, self.desc) + if self.type.type.style == "wm": + descHigh = self.desc + descLow = self.desc + if self.desc != None: + descHigh = descHigh + " (High)" + descLow = descLow + " (Low)" + self.genSchemaTextMap (stream, self.name + "High", descHigh) + self.genSchemaTextMap (stream, self.name + "Low", descLow) + if self.type.type.style == "mma": + descCount = self.desc + descMin = self.desc + descMax = self.desc + descAverage = self.desc + if self.desc != None: + descCount = descCount + " (Samples)" + descMin = descMin + " (Min)" + descMax = descMax + " (Max)" + descAverage = descAverage + " (Average)" + self.genSchemaTextMap (stream, self.name + "Samples", descCount) + self.genSchemaTextMap (stream, self.name + "Min", descMin) + self.genSchemaTextMap (stream, self.name + "Max", descMax) + self.genSchemaTextMap (stream, self.name + "Average", descAverage) + def genAssign (self, stream): if self.assign != None: if self.type.type.perThread: @@ -775,7 +801,7 @@ class SchemaArg: stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") stream.write (" buf.put (ft);\n\n") - def genMapSchema (self, stream, event=False): + def genSchemaMap (self, stream, event=False): stream.write (" {\n") stream.write (" ::qpid::messaging::VariantMap _avalue;\n") stream.write (" _avalue[TYPE] = TYPE_" + self.type.type.base +";\n") @@ -878,7 +904,7 @@ class SchemaMethod: for arg in self.args: arg.genSchema (stream) - def genMapSchema (self, stream, variables): + def genSchemaMap (self, stream, variables): stream.write (" {\n") stream.write (" ::qpid::messaging::VariantMap _value;\n") stream.write (" ::qpid::messaging::VariantMap _args;\n") @@ -887,9 +913,12 @@ class SchemaMethod: stream.write (" _value[DESC] = \"" + self.desc + "\";\n") for arg in self.args: - arg.genSchema (stream) + arg.genSchemaMap (stream) + + stream.write (" if (!_args.empty())\n") + stream.write (" _value[ARGS] = _args;\n") + - stream.write (" _value[ARGS] = _args;\n") stream.write (" _methods[\"" + self.name + "\"] = _value;\n") stream.write (" }\n\n") @@ -1025,6 +1054,10 @@ class SchemaEvent: for arg in self.args: arg.genSchema(stream, True) + def genArgSchemaMap(self, stream, variables): + for arg in self.args: + arg.genSchemaMap(stream, True) + def genSchemaMD5(self, stream, variables): sum = self.hash.getDigest() for idx in range (len (sum)): @@ -1388,6 +1421,10 @@ class SchemaClass: for prop in self.properties: prop.genSchema (stream) + def genPropertySchemaMap (self, stream, variables): + for prop in self.properties: + prop.genSchemaMap(stream) + def genSetGeneralReferenceDeclaration (self, stream, variables): for prop in self.properties: if prop.isGeneralRef: @@ -1397,6 +1434,10 @@ class SchemaClass: for stat in self.statistics: stat.genSchema (stream) + def genStatisticSchemaMap (self, stream, variables): + for stat in self.statistics: + stat.genSchemaMap(stream) + def genMethodIdDeclarations (self, stream, variables): number = 1 for method in self.methods: @@ -1408,6 +1449,10 @@ class SchemaClass: for method in self.methods: method.genSchema (stream, variables) + def genMethodSchemaMap(self, stream, variables): + for method in self.methods: + method.genSchemaMap(stream, variables) + def genNameCap (self, stream, variables): stream.write (capitalize(self.name)) diff --git a/qpid/cpp/managementgen/qmfgen/templates/Args.h b/qpid/cpp/managementgen/qmfgen/templates/Args.h index 074ccf9940..20681ab477 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Args.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Args.h @@ -24,8 +24,8 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/Args.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/Uuid.h" +//#include "qpid/framing/FieldTable.h" +//#include "qpid/framing/Uuid.h" #include <string> namespace qmf { diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp index 15f493bfb3..19142a3238 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp @@ -21,15 +21,13 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/log/Statement.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/management/Manageable.h" +#include "qpid/management/Manageable.h" #include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h" #include "/*MGEN:Class.NameCap*/.h" /*MGEN:Class.MethodArgIncludes*/ #include <iostream> using namespace qmf::/*MGEN:Class.Namespace*/; -using namespace qpid::framing; using namespace qpid::messaging; using qpid::management::ManagementAgent; using qpid::management::Manageable; @@ -91,27 +89,37 @@ void /*MGEN:Class.NameCap*/::registerSelf(ManagementAgent* agent) agent->registerClass(packageName, className, md5Sum, writeSchema); } -void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) +void /*MGEN:Class.NameCap*/::writeSchema (::qpid::messaging::VariantMap& map) { - FieldTable ft; + ::qpid::messaging::Variant::Map _sid; + ::qpid::messaging::Variant::Map _props; + ::qpid::messaging::Variant::Map _stats; + ::qpid::messaging::Variant::Map _methods; - // Schema class header: - buf.putOctet (CLASS_KIND_TABLE); - buf.putShortString (packageName); // Package Name - buf.putShortString (className); // Class Name - buf.putBin128 (md5Sum); // Schema Hash - buf.putShort (/*MGEN:Class.ConfigCount*/); // Config Element Count - buf.putShort (/*MGEN:Class.InstCount*/); // Inst Element Count - buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count + _sid["_type"] = CLASS_KIND_TABLE; + _sid["_package_name"] = packageName; + _sid["_class_name"] = className; + _sid["_hash_str"] = std::string((const char *)md5Sum, sizeof(md5Sum)); + map["_schema_id"] = _sid; + + map["_config_ct"] = /*MGEN:Class.ConfigCount*/; + map["_inst_ct"] = /*MGEN:Class.InstCount*/; + map["_method_ct"] = /*MGEN:Class.MethodCount*/; // Properties -/*MGEN:Class.PropertySchema*/ +/*MGEN:Class.PropertySchemaMap*/ + if (!_props.empty()) + map["_properties"] = _props; // Statistics -/*MGEN:Class.StatisticSchema*/ +/*MGEN:Class.StatisticSchemaMap*/ + if (!_stats.empty()) + map["_statistics"] = _stats; // Methods -/*MGEN:Class.MethodSchema*/ +/*MGEN:Class.MethodSchemaMap*/ + if (!_methods.empty()) + map["_methods"] = _methods; } /*MGEN:IF(Class.ExistPerThreadStats)*/ @@ -127,82 +135,6 @@ void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* tota } /*MGEN:ENDIF*/ -uint32_t /*MGEN:Class.NameCap*/::writePropertiesBufSize() const -{ - uint32_t size = writeTimestampsBufSize(); -/*MGEN:IF(Class.ExistOptionals)*/ - size += /*MGEN:Class.PresenceMaskBytes*/; -/*MGEN:ENDIF*/ -/*MGEN:Class.SizeProperties*/ - return size; -} - -void /*MGEN:Class.NameCap*/::readProperties (Buffer& buf) -{ - ::qpid::sys::Mutex::ScopedLock mutex(accessLock); - readTimestamps(buf); -/*MGEN:IF(Class.ExistOptionals)*/ - for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) - presenceMask[idx] = buf.getOctet(); -/*MGEN:ENDIF*/ -/*MGEN:Class.ReadProperties*/ -} - -void /*MGEN:Class.NameCap*/::writeProperties (Buffer& buf) const -{ - ::qpid::sys::Mutex::ScopedLock mutex(accessLock); - configChanged = false; - - writeTimestamps (buf); -/*MGEN:IF(Class.ExistOptionals)*/ - for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) - buf.putOctet(presenceMask[idx]); -/*MGEN:ENDIF*/ -/*MGEN:Class.WriteProperties*/ -} - -void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders) -{ - ::qpid::sys::Mutex::ScopedLock mutex(accessLock); - instChanged = false; -/*MGEN:IF(Class.ExistPerThreadAssign)*/ - for (int idx = 0; idx < maxThreads; idx++) { - struct PerThreadStats* threadStats = perThreadStatsArray[idx]; - if (threadStats != 0) { -/*MGEN:Class.PerThreadAssign*/ - } - } -/*MGEN:ENDIF*/ -/*MGEN:IF(Class.ExistPerThreadStats)*/ - struct PerThreadStats totals; - aggregatePerThreadStats(&totals); -/*MGEN:ENDIF*/ -/*MGEN:Class.Assign*/ - if (!skipHeaders) - writeTimestamps (buf); -/*MGEN:Class.WriteStatistics*/ - - // Maintenance of hi-lo statistics -/*MGEN:Class.HiLoStatResets*/ -/*MGEN:IF(Class.ExistPerThreadResets)*/ - for (int idx = 0; idx < maxThreads; idx++) { - struct PerThreadStats* threadStats = perThreadStatsArray[idx]; - if (threadStats != 0) { -/*MGEN:Class.PerThreadHiLoStatResets*/ - } - } -/*MGEN:ENDIF*/ -} - -void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/) -{ - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - std::string text; - -/*MGEN:Class.MethodHandlers*/ - outBuf.putLong(status); - outBuf.putShortString(Manageable::StatusText(status, text)); -} std::string /*MGEN:Class.NameCap*/::getKey() const { @@ -267,7 +199,7 @@ void /*MGEN:Class.NameCap*/::mapDecodeValues (const ::qpid::messaging::VariantMa /*MGEN:Class.MapDecodeProperties*/ } -void /*MGEN:Class.NameCap*/::KAGdoMethod (/*MGEN:Class.DoMapMethodArgs*/) +void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMapMethodArgs*/) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; std::string text; diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h index 61dc395013..54096437e1 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h @@ -24,8 +24,6 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/ManagementObject.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/Uuid.h" #include "qpid/messaging/MapContent.h" #include "qpid/messaging/MapView.h" @@ -76,19 +74,12 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject void aggregatePerThreadStats(struct PerThreadStats*) const; /*MGEN:ENDIF*/ public: - static void writeSchema(::qpid::framing::Buffer& buf); - uint32_t writePropertiesBufSize() const; - void readProperties(::qpid::framing::Buffer& buf); - void writeProperties(::qpid::framing::Buffer& buf) const; - void writeStatistics(::qpid::framing::Buffer& buf, bool skipHeaders = false); - void doMethod(std::string& methodName, - ::qpid::framing::Buffer& inBuf, - ::qpid::framing::Buffer& outBuf); + static void writeSchema(::qpid::messaging::VariantMap& map); void mapEncodeValues(::qpid::messaging::VariantMap& map, bool includeProperties=true, bool includeStatistics=true); void mapDecodeValues(const ::qpid::messaging::VariantMap& map); - void KAGdoMethod(std::string& methodName, + void doMethod(std::string& methodName, const ::qpid::messaging::VariantMap& inMap, ::qpid::messaging::VariantMap& outMap); std::string getKey() const; diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp index 0db0421f75..67eac4262b 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp @@ -21,13 +21,11 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/log/Statement.h" -#include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" #include "qpid//*MGEN:Event.AgentHeaderLocation*//ManagementAgent.h" #include "Event/*MGEN:Event.NameCap*/.h" using namespace qmf::/*MGEN:Event.Namespace*/; -using namespace qpid::framing; using qpid::management::ManagementAgent; using qpid::management::Manageable; using qpid::management::ManagementObject; @@ -56,28 +54,27 @@ void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent) agent->registerEvent(packageName, eventName, md5Sum, writeSchema); } -void Event/*MGEN:Event.NameCap*/::writeSchema (Buffer& buf) +void Event/*MGEN:Event.NameCap*/::writeSchema (::qpid::messaging::VariantMap& map) { - FieldTable ft; + ::qpid::messaging::Variant::Map _sid; + ::qpid::messaging::Variant::Map _args; // Schema class header: - buf.putOctet (CLASS_KIND_EVENT); - buf.putShortString (packageName); // Package Name - buf.putShortString (eventName); // Event Name - buf.putBin128 (md5Sum); // Schema Hash - buf.putOctet (0); // No Superclass - buf.putShort (/*MGEN:Event.ArgCount*/); // Argument Count - // Arguments -/*MGEN:Event.ArgSchema*/ -} + _sid["_type"] = CLASS_KIND_EVENT; + _sid["_package_name"] = packageName; + _sid["_class_name"] = eventName; + _sid["_hash_str"] = std::string((const char *)md5Sum, sizeof(md5Sum)); + map["_schema_id"] = _sid; -void Event/*MGEN:Event.NameCap*/::encode(::qpid::framing::Buffer& buf) const -{ -/*MGEN:Event.ArgEncodes*/ + + // Arguments +/*MGEN:Event.ArgSchemaMap*/ + if (!_args.empty()) + map["_arguments"] = _args; } -void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::messaging::MapContent& map) const +void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::messaging::VariantMap& map) const { using namespace ::qpid::messaging; /*MGEN:Event.ArgMap*/ diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.h b/qpid/cpp/managementgen/qmfgen/templates/Event.h index 20d51a9f82..a29605adba 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.h @@ -24,8 +24,6 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/ManagementEvent.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/Uuid.h" #include "qpid/messaging/MapContent.h" #include "qpid/messaging/MapView.h" @@ -35,10 +33,10 @@ namespace qmf { class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent { private: - static void writeSchema (::qpid::framing::Buffer& buf); + static void writeSchema (::qpid::messaging::VariantMap& map); static std::string packageName; static std::string eventName; - static uint8_t md5Sum[16]; + static uint8_t md5Sum[MD5_LEN]; /*MGEN:Event.ArgDeclarations*/ @@ -53,8 +51,7 @@ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent std::string& getEventName() const { return eventName; } uint8_t* getMd5Sum() const { return md5Sum; } uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; } - void encode(::qpid::framing::Buffer& buffer) const; - void mapEncode(::qpid::messaging::MapContent& map) const; + void mapEncode(::qpid::messaging::VariantMap& map) const; }; }/*MGEN:Event.CloseNamespaces*/ diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 71899561a9..5b2148a850 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -22,6 +22,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/log/Statement.h" #include "qpid/agent/ManagementAgentImpl.h" +#include "qpid/messaging/Message.h" #include <list> #include <string.h> #include <stdlib.h> @@ -79,7 +80,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), notifyable(0), inCallback(false), - initialized(false), connected(false), lastFailure("never connected"), + initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), @@ -204,16 +205,24 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << event.getPackageName() << "." << event.getEventName(); - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str()); + ::qpid::messaging::Message msg; + ::qpid::messaging::MapContent content(msg); + ::qpid::messaging::VariantMap &map_ = content.asMap(); + ::qpid::messaging::VariantMap schemaId; + ::qpid::messaging::VariantMap values; + + mapEncodeHeader(map_, 'e'); + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + content.encode(); + connThreadBody.sendBuffer(msg.getContent(), "qpid.management", key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -494,18 +503,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st ManagementObjectMap::iterator iter = managementObjects.find(selector); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + mapEncodeHeader(map_, 'g', sequence); + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list.push_back(map_); + + content.encode(); + connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -520,18 +533,22 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st iter++) { ManagementObject* object = iter->second; if (object->getClassName() == className) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + mapEncodeHeader(map_, 'g', sequence); + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list.push_back(map_); + + content.encode(); + connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo); QPID_LOG(trace, "SENT ObjectInd"); } @@ -596,6 +613,7 @@ void ManagementAgentImpl::received(Message& msg) } } + void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet('A'); @@ -605,6 +623,36 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } +void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq) +{ + map_["_version"] = "AM2"; + map_["_opcode"] = opcode; + map_["_sequence"] = seq; +} + + +void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq) +{ + map_["_version"] = "AM2"; + map_["_opcode"] = opcode; + map_["_sequence"] = seq; +} + + +qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const uint8_t *md5Sum) +{ + qpid::messaging::Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_hash_str"] = std::string((const char *)md5Sum, + qpid::managment::ManagmentObject::MD5_LEN); + return map_; +} + + bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) @@ -699,9 +747,7 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, void ManagementAgentImpl::periodicProcessing() { -#define BUFSIZE 65536 Mutex::ScopedLock lock(agentLock); - char msgChars[BUFSIZE]; uint32_t contentSize; list<pair<ObjectId, ManagementObject*> > deleteList; @@ -743,7 +789,10 @@ void ManagementAgentImpl::periodicProcessing() !baseObject->isDeleted())) continue; - Buffer msgBuffer(msgChars, BUFSIZE); + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { @@ -754,31 +803,41 @@ void ManagementAgentImpl::periodicProcessing() object->setUpdateTime(); if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + mapEncodeHeader(map_, 'c'); + + object->getPackageName(); + object->getClassName(); + (object->getMd5Sum(), MD5_LEN); + + object->mapEncodeValues(values, true, false); // encode properties only + map_["_values"] = values; + list.push_back(map_); } - + if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + mapEncodeHeader(map_, 'i'); + object->mapEncodeValues(values, false, true); // encode statistics only + map_["_values"] = values; + list.push_back(map_); } if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - - if (msgBuffer.available() < (BUFSIZE / 2)) - break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); + content.encode(); + const std::string &str = m.getContent(); + if (str.length()) { stringstream key; key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << baseObject->getPackageName() << "." << baseObject->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); + connThreadBody.sendBuffer(str, "qpid.management", key.str()); } } @@ -793,6 +852,8 @@ void ManagementAgentImpl::periodicProcessing() deleteList.clear(); { +#define BUFSIZE 65536 + char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); @@ -890,6 +951,18 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, const string& exchange, const string& routingKey) { + string data; + + buf.getRawData(data, length); + sendBuffer(data, exchange, routingKey); +} + + + +void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, + const string& exchange, + const string& routingKey) +{ ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); @@ -899,9 +972,7 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, } Message msg; - string data; - buf.getRawData(data, length); msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); msg.setData(data); @@ -915,6 +986,8 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, } } + + void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) { stringstream key; diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index affaa45d2d..b3130154df 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -155,6 +155,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen client::ConnectionSettings connectionSettings; bool initialized; bool connected; + bool useMapMsg; std::string lastFailure; bool clientWasAdded; @@ -198,6 +199,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint32_t length, const std::string& exchange, const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& exchange, + const std::string& routingKey); void bindToBank(uint32_t brokerBank, uint32_t agentBank); void close(); bool isSleeping() const; @@ -237,6 +241,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen PackageMap::iterator pIter, ClassMap::iterator cIter); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + void mapEncodeHeader (::qpid::messaging::VariantMap& map_, uint8_t opcode, uint32_t seq = 0); + qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const uint8_t *md5Sum); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void sendCommandComplete (std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index a72767ea93..0434c59194 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -21,7 +21,7 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" -#include "qpid/framing/FieldTable.h" +//#include "qpid/framing/FieldTable.h" #include "qpid/sys/Thread.h" #include <stdlib.h> @@ -123,20 +123,20 @@ bool ObjectId::equalV1(const ObjectId &other) const return first == otherFirst && second == other.second; } -void ObjectId::encode(framing::Buffer& buffer) const -{ - if (agent == 0) - buffer.putLongLong(first); - else - buffer.putLongLong(first | agent->first); - buffer.putLongLong(second); -} - -void ObjectId::decode(framing::Buffer& buffer) -{ - first = buffer.getLongLong(); - second = buffer.getLongLong(); -} +// void ObjectId::encode(framing::Buffer& buffer) const +// { +// if (agent == 0) +// buffer.putLongLong(first); +// else +// buffer.putLongLong(first | agent->first); +// buffer.putLongLong(second); +// } + +// void ObjectId::decode(framing::Buffer& buffer) +// { +// first = buffer.getLongLong(); +// second = buffer.getLongLong(); +// } void ObjectId::setV2Key(const ManagementObject& object) { @@ -201,42 +201,42 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; -void ManagementObject::writeTimestamps (framing::Buffer& buf) const -{ - buf.putShortString (getPackageName ()); - buf.putShortString (getClassName ()); - buf.putBin128 (getMd5Sum ()); - buf.putLongLong (updateTime); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - objectId.encode(buf); -} - -void ManagementObject::readTimestamps (framing::Buffer& buf) -{ - std::string unused; - uint8_t unusedUuid[16]; - ObjectId unusedObjectId; - - buf.getShortString(unused); - buf.getShortString(unused); - buf.getBin128(unusedUuid); - updateTime = buf.getLongLong(); - createTime = buf.getLongLong(); - destroyTime = buf.getLongLong(); - unusedObjectId.decode(buf); -} - -uint32_t ManagementObject::writeTimestampsBufSize() const -{ - return 1 + getPackageName().length() + // str8 - 1 + getClassName().length() + // str8 - 16 + // bin128 - 8 + // uint64 - 8 + // uint64 - 8 + // uint64 - objectId.encodedBufSize(); // objectId -} +// void ManagementObject::writeTimestamps (framing::Buffer& buf) const +// { +// buf.putShortString (getPackageName ()); +// buf.putShortString (getClassName ()); +// buf.putBin128 (getMd5Sum ()); +// buf.putLongLong (updateTime); +// buf.putLongLong (createTime); +// buf.putLongLong (destroyTime); +// objectId.encode(buf); +// } + +// void ManagementObject::readTimestamps (framing::Buffer& buf) +// { +// std::string unused; +// uint8_t unusedUuid[16]; +// ObjectId unusedObjectId; + +// buf.getShortString(unused); +// buf.getShortString(unused); +// buf.getBin128(unusedUuid); +// updateTime = buf.getLongLong(); +// createTime = buf.getLongLong(); +// destroyTime = buf.getLongLong(); +// unusedObjectId.decode(buf); +// } + +// uint32_t ManagementObject::writeTimestampsSize() const +// { +// return 1 + getPackageName().length() + // str8 +// 1 + getClassName().length() + // str8 +// 16 + // bin128 +// 8 + // uint64 +// 8 + // uint64 +// 8 + // uint64 +// objectId.encodedSize(); // objectId +// } void ManagementObject::writeTimestamps (messaging::VariantMap& map) const |