diff options
-rw-r--r-- | qpid/cpp/include/qpid/management/ManagementEvent.h | 2 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/management/ManagementObject.h | 33 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/management-types.xml | 18 | ||||
-rwxr-xr-x | qpid/cpp/managementgen/qmfgen/schema.py | 27 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Class.cpp | 136 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Class.h | 8 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Event.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Event.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 664 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.cpp | 133 | ||||
-rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 2 |
14 files changed, 767 insertions, 299 deletions
diff --git a/qpid/cpp/include/qpid/management/ManagementEvent.h b/qpid/cpp/include/qpid/management/ManagementEvent.h index ce2f28b943..b809001b1b 100644 --- a/qpid/cpp/include/qpid/management/ManagementEvent.h +++ b/qpid/cpp/include/qpid/management/ManagementEvent.h @@ -45,7 +45,7 @@ class ManagementEvent : public ManagementItem { virtual std::string& getPackageName() const = 0; virtual uint8_t* getMd5Sum() const = 0; virtual uint8_t getSeverity() const = 0; - //virtual void encode(qpid::framing::Buffer&) const = 0; + virtual void encode(std::string&) const = 0; virtual void mapEncode(qpid::messaging::VariantMap&) const = 0; }; diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h index 1bb3c57ed2..f580dda575 100644 --- a/qpid/cpp/include/qpid/management/ManagementObject.h +++ b/qpid/cpp/include/qpid/management/ManagementObject.h @@ -53,14 +53,15 @@ class ObjectId { protected: const AgentAttachment* agent; uint64_t first; + uint64_t second; uint64_t agentEpoch; std::string v2Key; std::string agentName; void fromString(const std::string&); public: - QPID_COMMON_EXTERN ObjectId() : agent(0), first(0) {} + QPID_COMMON_EXTERN ObjectId() : agent(0), first(0), second(0) {} QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) : - agent(0), first(0), agentEpoch(0) { mapDecode(map.asMap()); } + agent(0), first(0), second(0), agentEpoch(0) { mapDecode(map.asMap()); } QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker); QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq); QPID_COMMON_EXTERN ObjectId(std::istream&); @@ -70,6 +71,10 @@ public: QPID_COMMON_EXTERN void mapEncode(messaging::VariantMap& map) const; QPID_COMMON_EXTERN void mapDecode(const messaging::VariantMap& map); QPID_COMMON_EXTERN operator messaging::VariantMap() const; + QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; }; + QPID_COMMON_EXTERN void encode(std::string& buffer) const; + QPID_COMMON_EXTERN void decode(const std::string& buffer); + QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const; QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; } QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object); QPID_COMMON_EXTERN void setAgentName(const std::string& _name) { agentName = _name; } @@ -140,11 +145,11 @@ protected: bool forcePublish; QPID_COMMON_EXTERN int getThreadIndex(); - //QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf) const; + QPID_COMMON_EXTERN void writeTimestamps(std::string& buf) const; QPID_COMMON_EXTERN void writeTimestamps(messaging::VariantMap& map) const; - //QPID_COMMON_EXTERN void readTimestamps(qpid::framing::Buffer& buf); + QPID_COMMON_EXTERN void readTimestamps(const std::string& buf); QPID_COMMON_EXTERN void readTimestamps(const messaging::VariantMap& buf); - //QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const; + QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const; public: QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16; @@ -160,15 +165,6 @@ protected: virtual ~ManagementObject() {} virtual writeSchemaCall_t getWriteSchemaCall() = 0; - //virtual mapEncodeSchemaCall_t getMapEncodeSchemaCall() = 0; - //virtual void readProperties(qpid::framing::Buffer& buf) = 0; - //virtual uint32_t writePropertiesBufSize() const = 0; - //virtual void writeProperties(qpid::framing::Buffer& buf) const = 0; - //virtual void writeStatistics(qpid::framing::Buffer& buf, - // bool skipHeaders = false) = 0; - //virtual void doMethod(std::string& methodName, - // qpid::framing::Buffer& inBuf, - // qpid::framing::Buffer& outBuf) = 0; virtual std::string getKey() const = 0; // Encode & Decode the property and statistics values @@ -180,7 +176,14 @@ protected: virtual void doMethod(std::string& methodName, const messaging::VariantMap& inMap, messaging::VariantMap& outMap) = 0; - + virtual uint32_t writePropertiesSize() const = 0; + virtual void readProperties(const std::string& buf) = 0; + virtual void writeProperties(std::string& buf) const = 0; + virtual void writeStatistics(std::string& buf, + bool skipHeaders = false) = 0; + virtual void doMethod(std::string& methodName, + const std::string& inBuf, + std::string& outBuf) = 0; QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId); virtual std::string& getClassName() const = 0; diff --git a/qpid/cpp/managementgen/qmfgen/management-types.xml b/qpid/cpp/managementgen/qmfgen/management-types.xml index 95434a278b..f00958c22d 100644 --- a/qpid/cpp/managementgen/qmfgen/management-types.xml +++ b/qpid/cpp/managementgen/qmfgen/management-types.xml @@ -23,7 +23,10 @@ "map": cast to convert from native type to Variant constructor parameter --> -<type name="objId" base="REF" cpp="::qpid::management::ObjectId" encode="#.encode(@)" decode="#.decode(@)" stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/> +<type name="objId" base="REF" cpp="::qpid::management::ObjectId" + encode="{std::string _s; #.encode(_s); @.putRawData(_s);}" + decode="{std::string _s; @.getRawData(_s, #.encodedSize()); #.decode(_s);}" +stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/> <type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" stream="#" size="1" accessor="direct" init="0"/> <type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" stream="#" size="2" accessor="direct" init="0"/> <type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" stream="#" size="4" accessor="direct" init="0"/> @@ -39,9 +42,18 @@ <type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" stream="#" size="8" accessor="direct" init="0"/> <type name="float" base="FLOAT" cpp="float" encode="@.putFloat(#)" decode="# = @.getFloat()" stream="#" size="4" accessor="direct" init="0."/> <type name="double" base="DOUBLE" cpp="double" encode="@.putDouble(#)" decode="# = @.getDouble()" stream="#" size="8" accessor="direct" init="0."/> -<type name="uuid" base="UUID" cpp="::qpid::messaging::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" /> -<type name="map" base="FTABLE" cpp="::qpid::messaging::VariantMap" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/"/> +<type name="uuid" base="UUID" cpp="::qpid::messaging::Uuid" + encode="{::qpid::framing::Uuid _u(#.data()); _u.encode(@); }" + decode="{::qpid::framing::Uuid _u; _u.decode(@); # = ::qpid::messaging::Uuid(_u.data());}" + stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" /> +<type name="map" base="FTABLE" cpp="::qpid::messaging::VariantMap" + encode="{::qpid::framing::FieldTable _f = ManagementAgent::fromMap(#); _f.encode(@);}" + decode="{::qpid::framing::FieldTable _f; _f.decode(@); # = ManagementAgent::toMap(_f);}" + size="::qpid::framing::FieldTable(ManagementAgent::fromMap(#)).encodedSize()" +stream="#" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/"/> +<!-- not supported in V1 <type name="list" base="LIST" cpp="::qpid::messaging::Variant::List" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::Variant::List()" byRef="y" unmap="::qpid::messaging::Variant::List(); assert(false); /*TBD*/"/> +--> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" style="wm" stream="#" size="1" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" style="wm" stream="#" size="2" accessor="counter" init="0"/> diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index a827799b4b..30d7ba872d 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -1228,12 +1228,12 @@ class SchemaClass: inArgCount = inArgCount + 1 if methodCount == 0: - stream.write ("string&, Buffer&, Buffer& outBuf") + stream.write ("string&, const string&, string& outStr") else: if inArgCount == 0: - stream.write ("string& methodName, Buffer&, Buffer& outBuf") + stream.write ("string& methodName, const string&, string& outStr") else: - stream.write ("string& methodName, Buffer& inBuf, Buffer& outBuf") + stream.write ("string& methodName, const string& inStr, string& outStr") def genDoMapMethodArgs (self, stream, variables): @@ -1338,8 +1338,22 @@ class SchemaClass: stream.write ("%d" % len (self.methods)) def genMethodHandlers (self, stream, variables): + inArgs = False + for method in self.methods: + for arg in method.args: + if arg.getDir () == "I" or arg.getDir () == "IO": + inArgs = True; + break + + if inArgs: + stream.write("\n") + stream.write(" char *_tmpBuf = new char[inStr.length()];\n") + stream.write(" memcpy(_tmpBuf, inStr.data(), inStr.length());\n") + stream.write(" ::qpid::framing::Buffer inBuf(_tmpBuf, inStr.length());\n") + for method in self.methods: stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n") + stream.write (" _matched = true;\n") if method.getArgCount () == 0: stream.write (" ::qpid::management::ArgsNone ioArgs;\n") else: @@ -1360,8 +1374,11 @@ class SchemaClass: stream.write (" " +\ arg.type.type.getWriteCode ("ioArgs." +\ arg.dir.lower () + "_" +\ - arg.name, "outBuf") + ";\n") - stream.write (" return;\n }\n") + arg.name, "outBuf") + ";\n") + stream.write(" }\n") + + if inArgs: + stream.write ("\n delete [] _tmpBuf;\n") diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp index 84a6e0b450..0302e5eba2 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp @@ -93,9 +93,9 @@ void /*MGEN:Class.NameCap*/::registerSelf(ManagementAgent* agent) void /*MGEN:Class.NameCap*/::writeSchema (std::string& schema) { -#define BUFSIZE 65536 - char _msgChars[BUFSIZE]; - ::qpid::framing::Buffer buf(_msgChars, BUFSIZE); + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); ::qpid::framing::FieldTable ft; // Schema class header: @@ -134,6 +134,136 @@ void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* tota /*MGEN:ENDIF*/ +uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const +{ + uint32_t size = writeTimestampsSize(); +/*MGEN:IF(Class.ExistOptionals)*/ + size += /*MGEN:Class.PresenceMaskBytes*/; +/*MGEN:ENDIF*/ +/*MGEN:Class.SizeProperties*/ + return size; +} + +void /*MGEN:Class.NameCap*/::readProperties (const std::string& _sBuf) +{ + char *_tmpBuf = new char[_sBuf.length()]; + memcpy(_tmpBuf, _sBuf.data(), _sBuf.length()); + ::qpid::framing::Buffer buf(_tmpBuf, _sBuf.length()); + ::qpid::sys::Mutex::ScopedLock mutex(accessLock); + + { + std::string _tbuf; + buf.getRawData(_tbuf, writeTimestampsSize()); + readTimestamps(_tbuf); + } + +/*MGEN:IF(Class.ExistOptionals)*/ + for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) + presenceMask[idx] = buf.getOctet(); +/*MGEN:ENDIF*/ +/*MGEN:Class.ReadProperties*/ + + delete [] _tmpBuf; +} + +void /*MGEN:Class.NameCap*/::writeProperties (std::string& _sBuf) const +{ + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + + ::qpid::sys::Mutex::ScopedLock mutex(accessLock); + configChanged = false; + + { + std::string _tbuf; + writeTimestamps(_tbuf); + buf.putRawData(_tbuf); + } + + +/*MGEN:IF(Class.ExistOptionals)*/ + for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) + buf.putOctet(presenceMask[idx]); +/*MGEN:ENDIF*/ +/*MGEN:Class.WriteProperties*/ + + uint32_t _bufLen = buf.getPosition(); + buf.reset(); + + buf.getRawData(_sBuf, _bufLen); +} + +void /*MGEN:Class.NameCap*/::writeStatistics (std::string& _sBuf, bool skipHeaders) +{ + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + + ::qpid::sys::Mutex::ScopedLock mutex(accessLock); + instChanged = false; +/*MGEN:IF(Class.ExistPerThreadAssign)*/ + for (int idx = 0; idx < maxThreads; idx++) { + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; + if (threadStats != 0) { +/*MGEN:Class.PerThreadAssign*/ + } + } +/*MGEN:ENDIF*/ +/*MGEN:IF(Class.ExistPerThreadStats)*/ + struct PerThreadStats totals; + aggregatePerThreadStats(&totals); +/*MGEN:ENDIF*/ +/*MGEN:Class.Assign*/ + if (!skipHeaders) { + std::string _tbuf; + writeTimestamps (_tbuf); + buf.putRawData(_tbuf); + } + +/*MGEN:Class.WriteStatistics*/ + + // Maintenance of hi-lo statistics +/*MGEN:Class.HiLoStatResets*/ +/*MGEN:IF(Class.ExistPerThreadResets)*/ + for (int idx = 0; idx < maxThreads; idx++) { + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; + if (threadStats != 0) { +/*MGEN:Class.PerThreadHiLoStatResets*/ + } + } +/*MGEN:ENDIF*/ + + uint32_t _bufLen = buf.getPosition(); + buf.reset(); + + buf.getRawData(_sBuf, _bufLen); +} + +void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + std::string text; + + bool _matched = false; + + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer outBuf(_msgChars, _bufSize); + +/*MGEN:Class.MethodHandlers*/ + + if (!_matched) { + outBuf.putLong(status); + outBuf.putShortString(Manageable::StatusText(status, text)); + } + + uint32_t _bufLen = outBuf.getPosition(); + outBuf.reset(); + + outBuf.getRawData(outStr, _bufLen); +} + std::string /*MGEN:Class.NameCap*/::getKey() const { std::stringstream key; diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h index 2f057dc9d2..026b1d79eb 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h @@ -83,6 +83,14 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject const ::qpid::messaging::VariantMap& inMap, ::qpid::messaging::VariantMap& outMap); std::string getKey() const; + uint32_t writePropertiesSize() const; + void readProperties(const std::string& buf); + void writeProperties(std::string& buf) const; + void writeStatistics(std::string& buf, bool skipHeaders = false); + void doMethod(std::string& methodName, + const std::string& inBuf, + std::string& outBuf); + writeSchemaCall_t getWriteSchemaCall() { return writeSchema; } /*MGEN:IF(Class.NoStatistics)*/ // Stub for getInstChanged. There are no statistics in this class. diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp index 0289d678ef..dea02fd545 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp @@ -58,9 +58,9 @@ void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent) void Event/*MGEN:Event.NameCap*/::writeSchema (std::string& schema) { -#define BUFSIZE 65536 - char _msgChars[BUFSIZE]; - ::qpid::framing::Buffer buf(_msgChars, BUFSIZE); + const int _bufSize = 65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); ::qpid::framing::FieldTable ft; // Schema class header: @@ -80,6 +80,20 @@ void Event/*MGEN:Event.NameCap*/::writeSchema (std::string& schema) } } +void Event/*MGEN:Event.NameCap*/::encode(std::string& _sBuf) const +{ + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + +/*MGEN:Event.ArgEncodes*/ + + uint32_t _bufLen = buf.getPosition(); + buf.reset(); + + buf.getRawData(_sBuf, _bufLen); +} + void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::messaging::VariantMap& map) const { using namespace ::qpid::messaging; diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.h b/qpid/cpp/managementgen/qmfgen/templates/Event.h index 93def35ece..ab783c1698 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.h @@ -51,6 +51,7 @@ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent std::string& getEventName() const { return eventName; } uint8_t* getMd5Sum() const { return md5Sum; } uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; } + void encode(std::string& buffer) const; void mapEncode(::qpid::messaging::VariantMap& map) const; }; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index d94f228734..c5a7bb1ea9 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -93,7 +93,8 @@ Broker::Options::Options(const std::string& name) : tcpNoDelay(false), requireEncrypted(false), maxSessionRate(0), - asyncQueueEvents(false) // Must be false in a cluster. + asyncQueueEvents(false), // Must be false in a cluster. + qmf2Support(false) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -114,6 +115,7 @@ Broker::Options::Options(const std::string& name) : ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") @@ -138,7 +140,9 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), + managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support, + conf.qmf2Support) + : 0), store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 465a17f4eb..f9be992f0c 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -113,6 +113,7 @@ public: std::string knownHosts; uint32_t maxSessionRate; bool asyncQueueEvents; + bool qmf2Support; private: std::string getHome(); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 7e8dd7e764..127c43a701 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -47,6 +47,7 @@ using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; @@ -76,10 +77,11 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } } -ManagementAgent::ManagementAgent () : +ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : threadPoolSize(1), interval(10), broker(0), timer(0), startTime(uint64_t(Duration(now()))), - suppressed(false), agentName("") + suppressed(false), agentName(""), + qmf1Support(qmfV1), qmf2Support(qmfV2) { nextObjectId = 1; brokerBank = 1; @@ -268,32 +270,56 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi Mutex::ScopedLock lock (userLock); uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; - ::qpid::messaging::Message msg; - ::qpid::messaging::MapContent content(msg); - ::qpid::messaging::VariantMap &map_ = content.asMap(); - ::qpid::messaging::VariantMap schemaId; - ::qpid::messaging::VariantMap values; - ::qpid::messaging::VariantMap headers; - - map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), - event.getEventName(), - "_event", - event.getMd5Sum()); - event.mapEncode(values); - map_["_values"] = values; - map_["_timestamp"] = uint64_t(Duration(now())); - map_["_severity"] = sev; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_event"; - headers["qmf.agent"] = std::string(agentName); + if (qmf1Support) { + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; - content.encode(); + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putOctet(sev); + std::string sBuf; + event.encode(sBuf); + outBuffer.putRawData(sBuf); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, + "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); + } + + if (qmf2Support) { + ::qpid::messaging::Message msg; + ::qpid::messaging::MapContent content(msg); + ::qpid::messaging::VariantMap &map_ = content.asMap(); + ::qpid::messaging::VariantMap schemaId; + ::qpid::messaging::VariantMap values; + ::qpid::messaging::VariantMap headers; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + "_event", + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = std::string(agentName); + + stringstream key; + key << "agent.ind.event." << sev << "." << std::string(agentName) << "." << event.getEventName(); + + content.encode(); + sendBuffer(msg.getContent(), "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); + } - sendBuffer(msg.getContent(), "", headers, mExchange, - "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); - QPID_LOG(trace, "SEND raiseEvent class=" << event.getPackageName() << "." << event.getEventName()); } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -488,10 +514,15 @@ void ManagementAgent::moveNewObjectsLH() void ManagementAgent::periodicProcessing (void) { +#define BUFSIZE 65536 +#define HEADROOM 4096 QPID_LOG(trace, "Management agent periodic processing"); Mutex::ScopedLock lock (userLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; + std::string sBuf; uint64_t uptime = uint64_t(Duration(now())) - startTime; static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); @@ -533,6 +564,7 @@ void ManagementAgent::periodicProcessing (void) !baseObject->isDeleted())) continue; + Buffer msgBuffer(msgChars, BUFSIZE); ::qpid::messaging::Message m; ::qpid::messaging::ListContent content(m); ::qpid::messaging::Variant::List &list_ = content.asList(); @@ -550,7 +582,21 @@ void ManagementAgent::periodicProcessing (void) send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); - if (send_stats || send_props) { + if (send_props && qmf1Support) { + encodeHeader(msgBuffer, 'c'); + sBuf.clear(); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + } + + if (send_stats && qmf1Support) { + encodeHeader(msgBuffer, 'i'); + sBuf.clear(); + object->writeStatistics(sBuf); + msgBuffer.putRawData(sBuf); + } + + if ((send_stats || send_props) && qmf2Support) { ::qpid::messaging::Variant::Map map_; ::qpid::messaging::Variant::Map values; @@ -562,30 +608,49 @@ void ManagementAgent::periodicProcessing (void) map_["_values"] = values; list_.push_back(map_); - if (send_props) pcount++; - if (send_stats) scount++; } + if (send_props) pcount++; + if (send_stats) scount++; + if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); + + if (qmf1Support && (msgBuffer.available() < HEADROOM)) + break; } } - content.encode(); - const std::string &body = m.getContent(); - if (body.length()) { - stringstream key; - ::qpid::messaging::Variant::Map headers; - key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); - - sendBuffer(body, "", headers, mExchange, key.str()); - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + if (pcount || scount) { + if (qmf1Support) { + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } + + if (qmf2Support) { + content.encode(); + const std::string &body = m.getContent(); + if (body.length()) { + stringstream key; + ::qpid::messaging::Variant::Map headers; + key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName(); + // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + sendBuffer(body, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } } } @@ -603,32 +668,48 @@ void ManagementAgent::periodicProcessing (void) cdIter != deletedManagementObjects.end(); cdIter++) { collisionDeletions = true; { - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - ::qpid::messaging::Variant::Map headers; - - map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(), - (*cdIter)->getClassName(), - "_data", - (*cdIter)->getMd5Sum()); - (*cdIter)->mapEncodeValues(values, true, false); - map_["_values"] = values; - list_.push_back(map_); - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); - - content.encode(); - - stringstream key; - key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); - sendBuffer(m.getContent(), "", headers, mExchange, key.str()); - QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + if (qmf1Support) { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'c'); + sBuf.clear(); + (*cdIter)->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + stringstream key; + key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + + if (qmf2Support) { + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + ::qpid::messaging::Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(), + (*cdIter)->getClassName(), + "_data", + (*cdIter)->getMd5Sum()); + (*cdIter)->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + + stringstream key; + key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + + content.encode(); + sendBuffer(m.getContent(), "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } } } @@ -663,33 +744,52 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) if (!object->isDeleted()) return; - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - "_data", - object->getMd5Sum()); - object->mapEncodeValues(values, true, false); - map_["_values"] = values; - list_.push_back(map_); - - stringstream key; - key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + if (qmf1Support) { +#define DNOW_BUFSIZE 2048 + char msgChars[DNOW_BUFSIZE]; + uint32_t contentSize; + Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + std::string sBuf; + + encodeHeader(msgBuffer, 'c'); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = msgBuffer.getPosition(); + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } + + if (qmf2Support) { + ::qpid::messaging::Message m; + ::qpid::messaging::ListContent content(m); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); - content.encode(); + stringstream key; + key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName(); - ::qpid::messaging::Variant::Map headers; - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); + ::qpid::messaging::Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); - sendBuffer(m.getContent(), "", headers, mExchange, key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + content.encode(); + sendBuffer(m.getContent(), "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } managementObjects.erase(oid); } @@ -749,14 +849,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { -#if 1 // deprecated - QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!"); - // prevent "unused param" warnings - (void)inBuffer; - (void)replyToKey; - (void)sequence; - (void)connToken; -#else string methodName; string packageName; string className; @@ -764,14 +856,19 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; AclModule* acl = broker->getAcl(); + std::string inArgs; - ObjectId objId(inBuffer); + std::string sBuf; + inBuffer.getRawData(sBuf, 16); + ObjectId objId; + objId.decode(sBuf); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); inBuffer.getShortString(methodName); + inBuffer.getRawData(inArgs, inBuffer.available()); - QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << methodName << " replyTo=" << replyToKey); encodeHeader(outBuffer, 'm', sequence); @@ -783,8 +880,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + return; } if (acl != 0) { @@ -799,12 +896,12 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + return; } } - ManagementObjectMap::iterator iter = managementObjects.find(objId); + ManagementObjectMap::iterator iter = numericFind(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); @@ -818,7 +915,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey try { outBuffer.record(); Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inBuffer, outBuffer); + std::string outBuf; + iter->second->doMethod(methodName, inArgs, outBuf); + outBuffer.putRawData(outBuf); } catch(exception& e) { outBuffer.restore(); outBuffer.putLong(Manageable::STATUS_EXCEPTION); @@ -829,8 +928,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); -#endif + QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } @@ -857,7 +955,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); outMap.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid); return; } @@ -879,7 +977,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; (outMap["_values"].asMap())["_status_text"] = e.what(); outMap.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid); return; } @@ -891,7 +989,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); outMap.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid); return; } @@ -906,7 +1004,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; (outMap["_values"].asMap())["_status_text"] = i->second; outMap.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid); return; } @@ -922,7 +1020,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); outMap.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid); return; } @@ -930,6 +1028,10 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep // invoke the method + QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() + << ":" << iter->second->getClassName() << " method=" << + methodName << " replyTo=" << replyTo); + try { iter->second->doMethod(methodName, inArgs, outMap.asMap()); } catch(exception& e) { @@ -938,14 +1040,14 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; (outMap["_values"].asMap())["_status_text"] = e.what(); outMap.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid); return; } outMap.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse to=" << replyTo << " seq=" << cid); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid); } @@ -1286,7 +1388,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin ft.decode(inBuffer); - QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence); + QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo<string>()) { @@ -1295,34 +1397,27 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin return; ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); + ManagementObjectMap::iterator iter = numericFind(selector); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - ::qpid::messaging::Variant::Map headers; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); if (!object->isDeleted()) { - object->mapEncodeValues(values, true, true); // write both stats and properties - map_["_values"] = values; - list_.push_back(map_); - - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); - - content.encode(); - - sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence), - headers, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + std::string sBuf; + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } sendCommandComplete(replyToKey, sequence); @@ -1336,31 +1431,24 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin iter++) { ManagementObject* object = iter->second; if (object->getClassName () == className) { - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - ::qpid::messaging::Variant::Map headers; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); if (!object->isDeleted()) { - object->mapEncodeValues(values, true, true); // write both stats and properties - map_["_values"] = values; - list_.push_back(map_); - - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = std::string(agentName); - - content.encode(); - - sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence), - headers, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + std::string sBuf; + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } } @@ -1386,7 +1474,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep qpid::messaging::MapView::const_iterator i; ::qpid::messaging::Variant::Map headers; - QPID_LOG(trace, "RECV GetQuery: map=" << inMap << " seq=" << cid); + QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; @@ -1435,7 +1523,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep list_.push_back(map_); content.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); } } } @@ -1458,7 +1546,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep list_.push_back(map_); content.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); } } } @@ -1468,8 +1556,8 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep list_.clear(); headers.erase("partial"); content.encode(); - sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND GetResponse to=" << replyTo << " seq=" << cid); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid); } bool ManagementAgent::authorizeAgentMessageLH(Message& msg) @@ -1481,6 +1569,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) string packageName; string className; string methodName; + std::string cid; if (msg.encodedSize() > MA_BUFFER_SIZE) return false; @@ -1499,16 +1588,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) mapMsg = true; if (p && p->hasCorrelationId()) { - std::string cid = p->getCorrelationId(); - if (!cid.empty()) { - try { - sequence = boost::lexical_cast<uint32_t>(cid); - } catch(const boost::bad_lexical_cast&) { - QPID_LOG(warning, - "Bad correlation Id for received QMF authorize req: [" << cid << "]"); - return false; - } - } + cid = p->getCorrelationId(); } if (headers->getAsString("qmf.opcode") == "_method_request") @@ -1613,8 +1693,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); outMap.encode(); - sendBuffer(outMsg.getContent(), boost::lexical_cast<std::string>(sequence), - headers, dExchange, replyToKey); + sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyToKey); } else { @@ -1690,7 +1769,6 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) // old preV2 binary messages - while (inBuffer.getPosition() < bufferLen) { uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) @@ -1877,6 +1955,17 @@ size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) return end - start; } +ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) +{ + ManagementObjectMap::iterator iter = managementObjects.begin(); + for (; iter != managementObjects.end(); iter++) { + if (oid.equalV1(iter->first)) + break; + } + + return iter; +} + void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (userLock); @@ -2125,41 +2214,192 @@ qpid::messaging::Variant::Map ManagementAgent::toMap(const FieldTable& from) const string& key(iter->first); const FieldTable::ValuePtr& val(iter->second); - if (typeid(val.get()) == typeid(Str8Value) || typeid(val.get()) == typeid(Str16Value)) { - map[key] = Variant(val->get<string>()); - } else if (typeid(val.get()) == typeid(FloatValue)) { - map[key] = Variant(val->get<float>()); - } else if (typeid(val.get()) == typeid(DoubleValue)) { - map[key] = Variant(val->get<double>()); - } else if (typeid(val.get()) == typeid(IntegerValue)) { - map[key] = Variant(val->get<int>()); - } else if (typeid(val.get()) == typeid(TimeValue)) { - map[key] = Variant(val->get<int64_t>()); - } else if (typeid(val.get()) == typeid(Integer64Value)) { - map[key] = Variant(val->get<int64_t>()); - } else if (typeid(val.get()) == typeid(Unsigned64Value)) { - map[key] = Variant(val->get<uint64_t>()); - } else if (typeid(val.get()) == typeid(FieldTableValue)) { - map[key] = Variant(toMap(val->get<FieldTable>())); - } else if (typeid(val.get()) == typeid(VoidValue)) { - map[key] = Variant(); - } else if (typeid(val.get()) == typeid(BoolValue)) { - map[key] = Variant(val->get<bool>()); - } else if (typeid(val.get()) == typeid(Unsigned8Value)) { - map[key] = Variant(val->get<uint8_t>()); - } else if (typeid(val.get()) == typeid(Unsigned16Value)) { - map[key] = Variant(val->get<uint16_t>()); - } else if (typeid(val.get()) == typeid(Unsigned32Value)) { - map[key] = Variant(val->get<uint32_t>()); - } else if (typeid(val.get()) == typeid(Integer8Value)) { - map[key] = Variant(val->get<int8_t>()); - } else if (typeid(val.get()) == typeid(Integer16Value)) { - map[key] = Variant(val->get<int16_t>()); - } else if (typeid(val.get()) == typeid(UuidValue)) { - map[key] = Variant(messaging::Uuid(val->get<framing::Uuid>().c_array())); - } + map[key] = toVariant(val); } return map; } +// qpid::messaging::Variant::List ManagementAgent::toList(const qpid::framing::Array& from) +// { +// qpid::messaging::Variant::List _list; + +// for (qpid::framing::Array::const_iterator iter = from.begin(); iter != from.end(); iter++) { +// const qpid::framing::Array::ValuePtr& val(*iter); + +// _list.push_back(toVariant(val)); +// } + +// return _list; +// } + +qpid::framing::FieldTable ManagementAgent::fromMap(const qpid::messaging::Variant::Map& from) +{ + qpid::framing::FieldTable ft; + + for (qpid::messaging::Variant::Map::const_iterator iter = from.begin(); + iter != from.end(); + iter++) { + const string& key(iter->first); + const qpid::messaging::Variant& val(iter->second); + + ft.set(key, toFieldValue(val)); + } + + return ft; +} + + +// qpid::framing::Array ManagementAgent::fromList(const qpid::messaging::Variant::List& from) +// { +// qpid::framing::Array fa; + +// for (qpid::messaging::Variant::List::const_iterator iter = from.begin(); +// iter != from.end(); +// iter++) { +// const qpid::messaging::Variant& val(*iter); + +// fa.push_back(toFieldValue(val)); +// } + +// return fa; +// } + + +boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in) +{ + + switch(in.getType()) { + + case messaging::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue()); + case messaging::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); + case messaging::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); + case messaging::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); + case messaging::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); + case messaging::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); + case messaging::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); + case messaging::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); + case messaging::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); + case messaging::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); + case messaging::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); + case messaging::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); + case messaging::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); + case messaging::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); + case messaging::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap()))); + default: + break; + //case messaging::VAR_LIST: return boost::shared_ptr<FieldValue>(new ArrayValue(ManagementAgent::fromList(in.asList()))); + } + + QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]"); + return boost::shared_ptr<FieldValue>(new VoidValue()); +} + +// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup. +qpid::messaging::Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) +{ + const std::string iso885915("iso-8859-15"); + const std::string utf8("utf8"); + const std::string utf16("utf16"); + //const std::string binary("binary"); + const std::string amqp0_10_binary("amqp0-10:binary"); + //const std::string amqp0_10_bit("amqp0-10:bit"); + const std::string amqp0_10_datetime("amqp0-10:datetime"); + const std::string amqp0_10_struct("amqp0-10:struct"); + Variant out; + + //based on AMQP 0-10 typecode, pick most appropriate variant type + switch (in->getType()) { + //Fixed Width types: + case 0x00: //bin8 + case 0x01: out.setEncoding(amqp0_10_binary); // int8 + case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8 + case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; // + // case 0x04: break; //TODO: iso-8859-15 char // char + case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8 + + case 0x10: out.setEncoding(amqp0_10_binary); // bin16 + case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16 + case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16 + + case 0x20: out.setEncoding(amqp0_10_binary); // bin32 + case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32 + case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32 + + case 0x23: out = in->get<float>(); break; // float(32) + + // case 0x27: break; //TODO: utf-32 char + + case 0x30: out.setEncoding(amqp0_10_binary); // bin64 + case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64 + + case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding + case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64 + case 0x33: out = in->get<double>(); break; // double + + case 0x48: // uuid + { + unsigned char data[16]; + in->getFixedWidthValue<16>(data); + out = qpid::messaging::Uuid(data); + } break; + + //TODO: figure out whether and how to map values with codes 0x40-0xd8 + + case 0xf0: break;//void, which is the default value for Variant + // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant + + //Variable Width types: + //strings: + case 0x80: // str8 + case 0x90: // str16 + case 0xa0: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_binary); + break; + + case 0x84: // str8 + case 0x94: // str16 + out = in->get<std::string>(); + out.setEncoding(iso885915); + break; + + case 0x85: // str8 + case 0x95: // str16 + out = in->get<std::string>(); + out.setEncoding(utf8); + break; + + case 0x86: // str8 + case 0x96: // str16 + out = in->get<std::string>(); + out.setEncoding(utf16); + break; + + case 0xab: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_struct); + break; + + case 0xa8: // map + out = ManagementAgent::toMap(in->get<FieldTable>()); + break; + + //case 0xa9: // list of variant types + // out = Variant::List(); + // translate<List>(in, out.asList(), &toVariant); + // break; + //case 0xaa: //convert amqp0-10 array (uniform type) into variant list + // out = Variant::List(); + // translate<Array>(in, out.asList(), &toVariant); + // break; + + default: + //error? + QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]"); + break; + } + + return out; +} + diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 675bcb7774..44d3961d52 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -34,6 +34,7 @@ #include "qmf/org/apache/qpid/broker/Agent.h" #include "qpid/messaging/Variant.h" #include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FieldValue.h> #include <memory> #include <string> #include <map> @@ -63,7 +64,7 @@ public: } severity_t; - ManagementAgent (); + ManagementAgent (const bool qmfV1, const bool qmfV2); virtual ~ManagementAgent (); /** Called before plugins are initialized */ @@ -132,7 +133,14 @@ public: uint16_t getBootSequence(void) { return bootSequence; } void setBootSequence(uint16_t b) { bootSequence = b; } + // TODO: remove these when Variant API moved into common library. static messaging::Variant::Map toMap(const framing::FieldTable& from); + static framing::FieldTable fromMap(const messaging::Variant::Map& from); + //static messaging::Variant::List toList(const framing::Array& from); + //static framing::Array fromList(const messaging::Variant::List& from); + static boost::shared_ptr<framing::FieldValue> toFieldValue(const messaging::Variant& in); + static messaging::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); + private: struct Periodic : public qpid::sys::TimerTask @@ -270,6 +278,8 @@ private: typedef std::map<MethodName, std::string> DisallowedMethods; DisallowedMethods disallowed; std::string agentName; // KAG TODO FIX + bool qmf1Support; + bool qmf2Support; # define MA_BUFFER_SIZE 65536 @@ -329,6 +339,7 @@ private: size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); size_t validateEventSchema(framing::Buffer&); + ManagementObjectMap::iterator numericFind(const ObjectId& oid); std::string debugSnapshot(); }; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 6184ed0697..b37c8df731 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -21,7 +21,8 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" -//#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Buffer.h" #include "qpid/sys/Thread.h" #include <stdlib.h> @@ -131,28 +132,40 @@ bool ObjectId::operator<(const ObjectId &other) const return v2Key < other.v2Key; } -#if 0 bool ObjectId::equalV1(const ObjectId &other) const { uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; return first == otherFirst && second == other.second; } -#endif -// void ObjectId::encode(framing::Buffer& buffer) const -// { -// if (agent == 0) -// buffer.putLongLong(first); -// else -// buffer.putLongLong(first | agent->first); -// buffer.putLongLong(second); -// } - -// void ObjectId::decode(framing::Buffer& buffer) -// { -// first = buffer.getLongLong(); -// second = buffer.getLongLong(); -// } +void ObjectId::encode(std::string& buffer) const +{ + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + if (agent == 0) + body.putLongLong(first); + else + body.putLongLong(first | agent->first); + body.putLongLong(second); + + body.reset(); + body.getRawData(buffer, len); +} + +void ObjectId::decode(const std::string& buffer) +{ + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + body.checkAvailable(buffer.length()); + body.putRawData(buffer); + body.reset(); + first = body.getLongLong(); + second = body.getLongLong(); +} void ObjectId::setV2Key(const ManagementObject& object) { @@ -223,42 +236,56 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; -// void ManagementObject::writeTimestamps (framing::Buffer& buf) const -// { -// buf.putShortString (getPackageName ()); -// buf.putShortString (getClassName ()); -// buf.putBin128 (getMd5Sum ()); -// buf.putLongLong (updateTime); -// buf.putLongLong (createTime); -// buf.putLongLong (destroyTime); -// objectId.encode(buf); -// } - -// void ManagementObject::readTimestamps (framing::Buffer& buf) -// { -// std::string unused; -// uint8_t unusedUuid[16]; -// ObjectId unusedObjectId; - -// buf.getShortString(unused); -// buf.getShortString(unused); -// buf.getBin128(unusedUuid); -// updateTime = buf.getLongLong(); -// createTime = buf.getLongLong(); -// destroyTime = buf.getLongLong(); -// unusedObjectId.decode(buf); -// } - -// uint32_t ManagementObject::writeTimestampsSize() const -// { -// return 1 + getPackageName().length() + // str8 -// 1 + getClassName().length() + // str8 -// 16 + // bin128 -// 8 + // uint64 -// 8 + // uint64 -// 8 + // uint64 -// objectId.encodedSize(); // objectId -// } +void ManagementObject::writeTimestamps (std::string& buf) const +{ + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + + body.putShortString (getPackageName ()); + body.putShortString (getClassName ()); + body.putBin128 (getMd5Sum ()); + body.putLongLong (updateTime); + body.putLongLong (createTime); + body.putLongLong (destroyTime); + + uint32_t len = body.getPosition(); + body.reset(); + body.getRawData(buf, len); + + std::string oid; + objectId.encode(oid); + buf += oid; +} + +void ManagementObject::readTimestamps (const std::string& buf) +{ + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + std::string unused; + uint8_t unusedUuid[16]; + + body.checkAvailable(buf.length()); + body.putRawData(buf); + body.reset(); + + body.getShortString(unused); + body.getShortString(unused); + body.getBin128(unusedUuid); + updateTime = body.getLongLong(); + createTime = body.getLongLong(); + destroyTime = body.getLongLong(); +} + +uint32_t ManagementObject::writeTimestampsSize() const +{ + return 1 + getPackageName().length() + // str8 + 1 + getClassName().length() + // str8 + 16 + // bin128 + 8 + // uint64 + 8 + // uint64 + 8 + // uint64 + objectId.encodedSize(); // objectId +} void ManagementObject::writeTimestamps (messaging::VariantMap& map) const diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index d3cbf76dc7..8d12577deb 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -341,7 +341,7 @@ class Object(object): ttl = timeWait * 1000 else: ttl = None - smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" % (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), ttl=ttl) if synchronous: |