diff options
37 files changed, 4035 insertions, 928 deletions
diff --git a/cpp/bindings/qmf/tests/python_console.py b/cpp/bindings/qmf/tests/python_console.py index efd25a1d26..883aa8da1a 100755 --- a/cpp/bindings/qmf/tests/python_console.py +++ b/cpp/bindings/qmf/tests/python_console.py @@ -48,7 +48,7 @@ class QmfInteropTests(TestBase010): self.assertEqual(len(parents), 1) parent = parents[0] for seq in range(10): - result = parent.echo(seq) + result = parent.echo(seq, _timeout=5) self.assertEqual(result.status, 0) self.assertEqual(result.text, "OK") self.assertEqual(result.sequence, seq) diff --git a/cpp/examples/qmf-agent/example.cpp b/cpp/examples/qmf-agent/example.cpp index 5ab9c10c91..18f1bf6a90 100644 --- a/cpp/examples/qmf-agent/example.cpp +++ b/cpp/examples/qmf-agent/example.cpp @@ -24,6 +24,7 @@ #include <qpid/agent/ManagementAgent.h> #include <qpid/sys/Mutex.h> #include <qpid/sys/Time.h> +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/agent/example/Parent.h" #include "qmf/org/apache/qpid/agent/example/Child.h" #include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h" @@ -44,6 +45,7 @@ using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::Mutex; +using qpid::types::Variant; namespace _qmf = qmf::org::apache::qpid::agent::example; class ChildClass; @@ -96,8 +98,22 @@ CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent static uint64_t persistId = 0x111222333444555LL; mgmtObject = new _qmf::Parent(agent, this, name); - agent->addObject(mgmtObject, persistId++); + agent->addObject(mgmtObject); mgmtObject->set_state("IDLE"); + + Variant::Map args; + Variant::Map subMap; + args["first"] = "String data"; + args["second"] = 34; + subMap["string-data"] = "Text"; + subMap["numeric-data"] = 10000; + args["map-data"] = subMap; + mgmtObject->set_args(args); + + Variant::List list; + list.push_back(20000); + list.push_back("string-item"); + mgmtObject->set_list(list); } void CoreClass::doLoop() @@ -178,6 +194,9 @@ int main_int(int argc, char** argv) // Register the Qmf_example schema with the agent _qmf::Package packageInit(agent); + // Name the agent. + agent->setName("apache.org", "qmf-example"); + // Start the agent. It will attempt to make a connection to the // management broker agent->init(settings, 5, false, ".magentdata"); diff --git a/cpp/examples/qmf-agent/schema.xml b/cpp/examples/qmf-agent/schema.xml index 1bf701a655..3c7755fe83 100644 --- a/cpp/examples/qmf-agent/schema.xml +++ b/cpp/examples/qmf-agent/schema.xml @@ -29,6 +29,8 @@ This class represents a parent object <property name="name" type="sstr" access="RC" index="y"/> + <property name="args" type="map" access="RO"/> + <property name="list" type="list" access="RO"/> <statistic name="state" type="sstr" desc="Operational state of the link"/> <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> diff --git a/cpp/include/qpid/agent/ManagementAgent.h b/cpp/include/qpid/agent/ManagementAgent.h index b6ec82862c..aeb5585e61 100644 --- a/cpp/include/qpid/agent/ManagementAgent.h +++ b/cpp/include/qpid/agent/ManagementAgent.h @@ -63,6 +63,17 @@ class ManagementAgent virtual int getMaxThreads() = 0; + // Set the name of the agent + // + // vendor - Vendor name or domain (i.e. "apache.org") + // product - Product name (i.e. "qpid") + // instance - A unique identifier for this instance of the agent. + // If empty, the agent will create a GUID for the instance. + // + virtual void setName(const std::string& vendor, + const std::string& product, + const std::string& instance="") = 0; + // Connect to a management broker // // brokerHost - Hostname or IP address (dotted-quad) of broker. @@ -128,6 +139,9 @@ class ManagementAgent // in an orderly way. // virtual ObjectId addObject(ManagementObject* objectPtr, uint64_t persistId = 0) = 0; + virtual ObjectId addObject(ManagementObject* objectPtr, + const std::string& key, + bool persistent = true) = 0; // // diff --git a/cpp/include/qpid/framing/FieldTable.h b/cpp/include/qpid/framing/FieldTable.h index 085f7ed110..fdb1a28b9d 100644 --- a/cpp/include/qpid/framing/FieldTable.h +++ b/cpp/include/qpid/framing/FieldTable.h @@ -51,6 +51,7 @@ class FieldTable typedef boost::shared_ptr<FieldValue> ValuePtr; typedef std::map<std::string, ValuePtr> ValueMap; typedef ValueMap::iterator iterator; + typedef ValueMap::const_iterator const_iterator; typedef ValueMap::const_reference const_reference; typedef ValueMap::reference reference; typedef ValueMap::value_type value_type; diff --git a/cpp/include/qpid/management/ManagementEvent.h b/cpp/include/qpid/management/ManagementEvent.h index 01b9ae49ec..e80175096f 100644 --- a/cpp/include/qpid/management/ManagementEvent.h +++ b/cpp/include/qpid/management/ManagementEvent.h @@ -23,7 +23,7 @@ */ #include "qpid/management/ManagementObject.h" -#include <qpid/framing/Buffer.h> +#include "qpid/types/Variant.h" #include <string> namespace qpid { @@ -32,16 +32,20 @@ namespace management { class ManagementAgent; class ManagementEvent : public ManagementItem { -public: - typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&); + public: + static const uint8_t MD5_LEN = 16; + //typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&); + typedef void (*writeSchemaCall_t)(std::string&); virtual ~ManagementEvent() {} virtual writeSchemaCall_t getWriteSchemaCall(void) = 0; + //virtual mapEncodeSchemaCall_t getMapEncodeSchemaCall(void) = 0; virtual std::string& getEventName() const = 0; virtual std::string& getPackageName() const = 0; virtual uint8_t* getMd5Sum() const = 0; virtual uint8_t getSeverity() const = 0; - virtual void encode(qpid::framing::Buffer&) const = 0; + virtual void encode(std::string&) const = 0; + virtual void mapEncode(qpid::types::Variant::Map&) const = 0; }; }} diff --git a/cpp/include/qpid/management/ManagementObject.h b/cpp/include/qpid/management/ManagementObject.h index b1c70f64d6..0e9c7f0a0b 100644 --- a/cpp/include/qpid/management/ManagementObject.h +++ b/cpp/include/qpid/management/ManagementObject.h @@ -24,8 +24,8 @@ #include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" -#include <qpid/framing/Buffer.h> #include "qpid/CommonImportExport.h" +#include "qpid/types/Variant.h" #include <map> #include <vector> @@ -53,23 +53,33 @@ protected: const AgentAttachment* agent; uint64_t first; uint64_t second; + uint64_t agentEpoch; std::string v2Key; + std::string agentName; void fromString(const std::string&); public: - QPID_COMMON_EXTERN ObjectId() : agent(0), first(0), second(0) {} - QPID_COMMON_EXTERN ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); } - QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object); - QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object); + QPID_COMMON_EXTERN ObjectId() : agent(0), first(0), second(0), agentEpoch(0) {} + QPID_COMMON_EXTERN ObjectId(const types::Variant& map) : + agent(0), first(0), second(0), agentEpoch(0) { mapDecode(map.asMap()); } + QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker); + QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq); QPID_COMMON_EXTERN ObjectId(std::istream&); QPID_COMMON_EXTERN ObjectId(const std::string&); + // Deprecated: + QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object); QPID_COMMON_EXTERN bool operator==(const ObjectId &other) const; QPID_COMMON_EXTERN bool operator<(const ObjectId &other) const; + QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map) const; + QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map); + QPID_COMMON_EXTERN operator types::Variant::Map() const; QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; }; - QPID_COMMON_EXTERN void encode(framing::Buffer& buffer) const; - QPID_COMMON_EXTERN void decode(framing::Buffer& buffer); + QPID_COMMON_EXTERN void encode(std::string& buffer) const; + QPID_COMMON_EXTERN void decode(const std::string& buffer); + QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const; QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; } QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object); - QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const; + QPID_COMMON_EXTERN void setAgentName(const std::string& _name) { agentName = _name; } + QPID_COMMON_EXTERN const std::string& getAgentName() const { return agentName; } QPID_COMMON_EXTERN const std::string& getV2Key() const { return v2Key; } friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const ObjectId&); }; @@ -94,6 +104,7 @@ public: static const uint8_t TYPE_S16 = 17; static const uint8_t TYPE_S32 = 18; static const uint8_t TYPE_S64 = 19; + static const uint8_t TYPE_LIST = 21; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; @@ -125,7 +136,7 @@ protected: uint64_t updateTime; ObjectId objectId; mutable bool configChanged; - bool instChanged; + mutable bool instChanged; bool deleted; Manageable* coreObject; mutable sys::Mutex accessLock; @@ -135,13 +146,17 @@ protected: bool forcePublish; QPID_COMMON_EXTERN int getThreadIndex(); - QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf) const; - QPID_COMMON_EXTERN void readTimestamps(qpid::framing::Buffer& buf); + QPID_COMMON_EXTERN void writeTimestamps(std::string& buf) const; + QPID_COMMON_EXTERN void writeTimestamps(types::Variant::Map& map) const; + QPID_COMMON_EXTERN void readTimestamps(const std::string& buf); + QPID_COMMON_EXTERN void readTimestamps(const types::Variant::Map& buf); QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const; public: + QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16; QPID_COMMON_EXTERN static int maxThreads; - typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); + //typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); + typedef void (*writeSchemaCall_t) (std::string&); ManagementObject(Manageable* _core) : createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))), @@ -151,15 +166,28 @@ protected: virtual ~ManagementObject() {} virtual writeSchemaCall_t getWriteSchemaCall() = 0; - virtual void readProperties(qpid::framing::Buffer& buf) = 0; - virtual uint32_t writePropertiesSize() const = 0; - virtual void writeProperties(qpid::framing::Buffer& buf) const = 0; - virtual void writeStatistics(qpid::framing::Buffer& buf, - bool skipHeaders = false) = 0; - virtual void doMethod(std::string& methodName, - qpid::framing::Buffer& inBuf, - qpid::framing::Buffer& outBuf) = 0; virtual std::string getKey() const = 0; + + // Encode & Decode the property and statistics values + // for this object. + virtual void mapEncodeValues(types::Variant::Map& map, + bool includeProperties, + bool includeStatistics) = 0; + virtual void mapDecodeValues(const types::Variant::Map& map) = 0; + virtual void doMethod(std::string& methodName, + const types::Variant::Map& inMap, + types::Variant::Map& outMap) = 0; + + /** + * The following five methods are not pure-virtual because they will only + * be overridden in cases where QMFv1 is to be supported. + */ + virtual uint32_t writePropertiesSize() const { return 0; } + virtual void readProperties(const std::string&) {} + virtual void writeProperties(std::string&) const {} + virtual void writeStatistics(std::string&, bool = false) {} + virtual void doMethod(std::string&, const std::string&, std::string&) {} + QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId); virtual std::string& getClassName() const = 0; @@ -183,16 +211,23 @@ protected: inline void setFlags(uint32_t f) { flags = f; } inline uint32_t getFlags() { return flags; } bool isSameClass(ManagementObject& other) { - for (int idx = 0; idx < 16; idx++) + for (int idx = 0; idx < MD5_LEN; idx++) if (other.getMd5Sum()[idx] != getMd5Sum()[idx]) return false; return other.getClassName() == getClassName() && other.getPackageName() == getPackageName(); } - QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); } - QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); } - QPID_COMMON_EXTERN uint32_t encodedSize() const { return writePropertiesSize(); } + // QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); } + // QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); } + //QPID_COMMON_EXTERN uint32_t encodedSize() const { return writePropertiesSize(); } + + // Encode/Decode the entire object as a map + QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map, + bool includeProperties=true, + bool includeStatistics=true); + + QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map); }; typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; diff --git a/cpp/managementgen/qmf-gen b/cpp/managementgen/qmf-gen index ebc07137ae..667aa1ba2d 100755 --- a/cpp/managementgen/qmf-gen +++ b/cpp/managementgen/qmf-gen @@ -62,8 +62,10 @@ if len(args) == 0: vargs = {} if opts.brokerplugin: vargs["agentHeaderDir"] = "management" + vargs["genQmfV1"] = True else: vargs["agentHeaderDir"] = "agent" + vargs["genQmfV1"] = None for schemafile in args: package = SchemaPackage(typefile, schemafile, opts) diff --git a/cpp/managementgen/qmfgen/generate.py b/cpp/managementgen/qmfgen/generate.py index 4052b8c853..8a00b69761 100755 --- a/cpp/managementgen/qmfgen/generate.py +++ b/cpp/managementgen/qmfgen/generate.py @@ -38,39 +38,43 @@ class Template: self.filename = filename self.handler = handler self.handler.initExpansion () - self.writing = True + self.writing = 0 # 0 => write output lines; >0 => recursive depth of conditional regions def expandLine (self, line, stream, object): cursor = 0 while 1: sub = line.find ("/*MGEN:", cursor) if sub == -1: - if self.writing: + if self.writing == 0: stream.write (line[cursor:len (line)]) return subend = line.find("*/", sub) - if self.writing: + if self.writing == 0: stream.write (line[cursor:sub]) cursor = subend + 2 tag = line[sub:subend] if tag[7:10] == "IF(": - close = tag.find(")") - if close == -1: - raise ValueError ("Missing ')' on condition") - cond = tag[10:close] - dotPos = cond.find (".") - if dotPos == -1: - raise ValueError ("Invalid condition tag: %s" % cond) - tagObject = cond[0:dotPos] - tagName = cond[dotPos + 1 : len(cond)] - if not self.handler.testCondition(object, tagObject, tagName): - self.writing = False + if self.writing == 0: + close = tag.find(")") + if close == -1: + raise ValueError ("Missing ')' on condition") + cond = tag[10:close] + dotPos = cond.find (".") + if dotPos == -1: + raise ValueError ("Invalid condition tag: %s" % cond) + tagObject = cond[0:dotPos] + tagName = cond[dotPos + 1 : len(cond)] + if not self.handler.testCondition(object, tagObject, tagName): + self.writing += 1 + else: + self.writing += 1 elif tag[7:12] == "ENDIF": - self.writing = True + if self.writing > 0: + self.writing -= 1 else: equalPos = tag.find ("=") @@ -80,12 +84,12 @@ class Template: raise ValueError ("Invalid tag: %s" % tag) tagObject = tag[7:dotPos] tagName = tag[dotPos + 1:len (tag)] - if self.writing: + if self.writing == 0: self.handler.substHandler (object, stream, tagObject, tagName) else: tagKey = tag[7:equalPos] tagVal = tag[equalPos + 1:len (tag)] - if self.writing: + if self.writing == 0: self.handler.setVariable (tagKey, tagVal) def expand (self, object): @@ -297,6 +301,9 @@ class Generator: self.packagelist.append(path) self.packagePath = self.normalize(self.dest + path) + def testGenQMFv1 (self, variables): + return variables["genQmfV1"] + def genDisclaimer (self, stream, variables): prefix = variables["commentPrefix"] stream.write (prefix + " This source file was created by a code generator.\n") diff --git a/cpp/managementgen/qmfgen/management-types.xml b/cpp/managementgen/qmfgen/management-types.xml index 6dbabc90ff..857f8af212 100644 --- a/cpp/managementgen/qmfgen/management-types.xml +++ b/cpp/managementgen/qmfgen/management-types.xml @@ -19,7 +19,14 @@ under the License. --> -<type name="objId" base="REF" cpp="::qpid::management::ObjectId" encode="#.encode(@)" decode="#.decode(@)" stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/> +<!-- "unmap": cast to convert from Variant to native type constructor, + "map": cast to convert from native type to Variant constructor parameter +--> + +<type name="objId" base="REF" cpp="::qpid::management::ObjectId" + encode="{std::string _s; #.encode(_s); @.putRawData(_s);}" + decode="{std::string _s; @.getRawData(_s, #.encodedSize()); #.decode(_s);}" +stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/> <type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" stream="#" size="1" accessor="direct" init="0"/> <type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" stream="#" size="2" accessor="direct" init="0"/> <type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" stream="#" size="4" accessor="direct" init="0"/> @@ -29,14 +36,25 @@ <type name="int32" base="S32" cpp="int32_t" encode="@.putInt32(#)" decode="# = @.getInt32()" stream="#" size="4" accessor="direct" init="0"/> <type name="int64" base="S64" cpp="int64_t" encode="@.putInt64(#)" decode="# = @.getInt64()" stream="#" size="8" accessor="direct" init="0"/> <type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet(#?1:0)" decode="# = @.getOctet()==1" stream="#" size="1" accessor="direct" init="0"/> -<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString(#)" decode="@.getShortString(#)" stream="#" size="(1 + #.length())" accessor="direct" init='""' byRef="y"/> -<type name="lstr" base="LSTR" cpp="std::string" encode="@.putMediumString(#)" decode="@.getMediumString(#)" stream="#" size="(2 + #.length())" accessor="direct" init='""' byRef="y"/> +<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString(#)" decode="@.getShortString(#)" stream="#" size="(1 + #.length())" accessor="direct" init='""' byRef="y" unmap="(#).getString()"/> +<type name="lstr" base="LSTR" cpp="std::string" encode="@.putMediumString(#)" decode="@.getMediumString(#)" stream="#" size="(2 + #.length())" accessor="direct" init='""' byRef="y" unmap="(#).getString()"/> <type name="absTime" base="ABSTIME" cpp="int64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" stream="#" size="8" accessor="direct" init="0"/> <type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" stream="#" size="8" accessor="direct" init="0"/> <type name="float" base="FLOAT" cpp="float" encode="@.putFloat(#)" decode="# = @.getFloat()" stream="#" size="4" accessor="direct" init="0."/> <type name="double" base="DOUBLE" cpp="double" encode="@.putDouble(#)" decode="# = @.getDouble()" stream="#" size="8" accessor="direct" init="0."/> -<type name="uuid" base="UUID" cpp="::qpid::framing::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::framing::Uuid()" byRef="y"/> -<type name="map" base="FTABLE" cpp="::qpid::framing::FieldTable" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::framing::FieldTable()" byRef="y"/> +<type name="uuid" base="UUID" cpp="::qpid::types::Uuid" + encode="{::qpid::framing::Uuid _u(#.data()); _u.encode(@); }" + decode="{::qpid::framing::Uuid _u; _u.decode(@); # = ::qpid::types::Uuid(_u.data());}" + stream="#" size="16" accessor="direct" init="::qpid::types::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::types::Uuid((#).data())" /> +<type name="map" base="FTABLE" cpp="::qpid::types::Variant::Map" + encode="{::qpid::framing::FieldTable _f = ManagementAgent::fromMap(#); _f.encode(@);}" + decode="{::qpid::framing::FieldTable _f; _f.decode(@); # = ManagementAgent::toMap(_f);}" + size="::qpid::framing::FieldTable(ManagementAgent::fromMap(#)).encodedSize()" +stream="#" accessor="direct" init="::qpid::types::Variant::Map()" byRef="y" unmap="::qpid::types::Variant::Map(); assert(false); /*TBD*/"/> +<type name="list" base="LIST" cpp="::qpid::types::Variant::List" + encode="{::qpid::framing::List _l = ManagementAgent::fromList(#); _l.encode(@);}" + decode="{::qpid::framing::List _l; _l.decode(@); # = ManagementAgent::toList(_l);}" +stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::types::Variant::List()" byRef="y" unmap="::qpid::types::Variant::List(); assert(false); /*TBD*/"/> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" style="wm" stream="#" size="1" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" style="wm" stream="#" size="2" accessor="counter" init="0"/> diff --git a/cpp/managementgen/qmfgen/schema.py b/cpp/managementgen/qmfgen/schema.py index d80567687e..c1bb507d84 100755 --- a/cpp/managementgen/qmfgen/schema.py +++ b/cpp/managementgen/qmfgen/schema.py @@ -19,12 +19,18 @@ from xml.dom.minidom import parse, parseString, Node from cStringIO import StringIO -import md5 +#import md5 +try: + import hashlib + _md5Obj = hashlib.md5 +except ImportError: + import md5 + _md5Obj = md5.new class Hash: """ Manage the hash of an XML sub-tree """ def __init__(self, node): - self.md5Sum = md5.new() + self.md5Sum = _md5Obj() self._compute(node) def addSubHash(self, hash): @@ -64,6 +70,8 @@ class SchemaType: self.init = "0" self.perThread = False self.byRef = False + self.unmap = "#" + self.map = "#" attrs = node.attributes for idx in range (attrs.length): @@ -109,6 +117,12 @@ class SchemaType: raise ValueError ("Expected 'y' in byRef attribute") self.byRef = True + elif key == 'unmap': + self.unmap = val + + elif key == 'map': + self.map = val + else: raise ValueError ("Unknown attribute in type '%s'" % key) @@ -211,6 +225,17 @@ class SchemaType: def genRead (self, stream, varName, indent=" "): stream.write(indent + self.decode.replace("@", "buf").replace("#", varName) + ";\n") + def genUnmap (self, stream, varName, indent=" ", key=None, mapName="_map", + _optional=False): + if key is None: + key = varName + stream.write(indent + "if ((_i = " + mapName + ".find(\"" + key + "\")) != " + mapName + ".end()) {\n") + stream.write(indent + " " + varName + " = " + + self.unmap.replace("#", "_i->second") + ";\n") + if _optional: + stream.write(indent + " _found = true;\n") + stream.write(indent + "}\n") + def genWrite (self, stream, varName, indent=" "): if self.style != "mma": stream.write (indent + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n") @@ -230,6 +255,31 @@ class SchemaType: .replace ("#", varName + "Count ? " + varName + "Total / " + varName + "Count : 0") + ";\n") + def genMap (self, stream, varName, indent=" ", key=None, mapName="_map"): + if key is None: + key = varName + if self.style != "mma": + var_cast = self.map.replace("#", varName) + stream.write(indent + mapName + "[\"" + key + "\"] = ::qpid::types::Variant(" + var_cast + ");\n") + if self.style == "wm": + var_cast_hi = self.map.replace("#", varName + "High") + var_cast_lo = self.map.replace("#", varName + "Low") + stream.write(indent + mapName + "[\"" + key + "High\"] = " + + "::qpid::types::Variant(" + var_cast_hi + ");\n") + stream.write(indent + mapName + "[\"" + key + "Low\"] = " + + "::qpid::types::Variant(" + var_cast_lo + ");\n") + if self.style == "mma": + var_cast = self.map.replace("#", varName + "Count") + stream.write(indent + mapName + "[\"" + key + "Count\"] = " + "::qpid::types::Variant(" + var_cast + ");\n") + var_cast = self.map.replace("#", varName + "Min") + stream.write(indent + mapName + "[\"" + key + "Min\"] = " + + "(" + varName + "Count ? ::qpid::types::Variant(" + var_cast + ") : ::qpid::types::Variant(0));\n") + var_cast = self.map.replace("#", varName + "Max") + stream.write(indent + mapName + "[\"" + key + "Max\"] = " + "::qpid::types::Variant(" + var_cast + ");\n") + + var_cast = self.map.replace("#", "(" + varName + "Total / " + varName + "Count)") + stream.write(indent + mapName + "[\"" + key + "Avg\"] = " + + "(" + varName + "Count ? ::qpid::types::Variant(" + var_cast + ") : ::qpid::types::Variant(0));\n") def getReadCode (self, varName, bufName): result = self.decode.replace ("@", bufName).replace ("#", varName) @@ -392,6 +442,29 @@ class SchemaProperty: stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") stream.write (" buf.put (ft);\n\n") + + def genSchemaMap(self, stream): + stream.write (" {\n") + stream.write (" ::qpid::types::Variant::Map _value;\n") + stream.write (" _value[TYPE] = TYPE_" + self.type.type.base +";\n") + stream.write (" _value[ACCESS] = ACCESS_" + self.access + ";\n") + stream.write (" _value[IS_INDEX] = " + str (self.isIndex) + ";\n") + stream.write (" _value[IS_OPTIONAL] = " + str (self.isOptional) + ";\n") + if self.unit != None: + stream.write (" _value[UNIT] = \"" + self.unit + "\";\n") + if self.min != None: + stream.write (" _value[MIN] = " + self.min + ";\n") + if self.max != None: + stream.write (" _value[MAX] = " + self.max + ";\n") + if self.maxLen != None: + stream.write (" _value[MAXLEN] = " + self.maxLen + ";\n") + if self.desc != None: + stream.write (" _value[DESC] = \"" + self.desc + "\";\n") + stream.write (" _props[\"" + self.name + "\"] = _value;\n") + stream.write (" }\n\n") + + + def genSize (self, stream): indent = " " if self.isOptional: @@ -419,6 +492,43 @@ class SchemaProperty: if self.isOptional: stream.write(" }\n") + def genUnmap (self, stream): + indent = " " + if self.isOptional: + stream.write(" _found = false;\n") + self.type.type.genUnmap (stream, self.name, indent, _optional=self.isOptional) + if self.isOptional: + stream.write(" if (_found) {\n") + stream.write(" presenceMask[presenceByte_%s] |= presenceMask_%s;\n" % + (self.name, self.name)) + stream.write(" }\n") + + def genMap (self, stream): + indent = " " + if self.isOptional: + stream.write(" if (presenceMask[presenceByte_%s] & presenceMask_%s) {\n" % (self.name, self.name)) + indent = " " + self.type.type.genMap (stream, self.name, indent) + if self.isOptional: + stream.write(" }\n") + + + def __repr__(self): + m = {} + m["name"] = self.name + m["type"] = self.type + m["ref"] = self.ref + m["access"] = self.access + m["isIndex"] = self.isIndex + m["isParentRef"] = self.isParentRef + m["isGeneralRef"] = self.isGeneralRef + m["isOptional"] = self.isOptional + m["unit"] = self.unit + m["min"] = self.min + m["max"] = self.max + m["maxLen"] = self.maxLen + m["desc"] = self.desc + return str(m) #===================================================================================== # @@ -492,6 +602,17 @@ class SchemaStatistic: stream.write (" ft.setString (DESC, \"" + desc + "\");\n") stream.write (" buf.put (ft);\n\n") + def genSchemaTextMap(self, stream, name, desc): + stream.write (" {\n") + stream.write (" ::qpid::types::Variant::Map _value;\n") + stream.write (" _value[TYPE] = TYPE_" + self.type.type.base +";\n") + if self.unit != None: + stream.write (" _value[UNIT] = \"" + self.unit + "\";\n") + if desc != None: + stream.write (" _value[DESC] = \"" + desc + "\";\n") + stream.write (" _stats[\"" + self.name + "\"] = _value;\n") + stream.write (" }\n\n") + def genSchema (self, stream): if self.type.type.style != "mma": self.genSchemaText (stream, self.name, self.desc) @@ -518,6 +639,32 @@ class SchemaStatistic: self.genSchemaText (stream, self.name + "Max", descMax) self.genSchemaText (stream, self.name + "Average", descAverage) + def genSchemaMap (self, stream): + if self.type.type.style != "mma": + self.genSchemaTextMap (stream, self.name, self.desc) + if self.type.type.style == "wm": + descHigh = self.desc + descLow = self.desc + if self.desc != None: + descHigh = descHigh + " (High)" + descLow = descLow + " (Low)" + self.genSchemaTextMap (stream, self.name + "High", descHigh) + self.genSchemaTextMap (stream, self.name + "Low", descLow) + if self.type.type.style == "mma": + descCount = self.desc + descMin = self.desc + descMax = self.desc + descAverage = self.desc + if self.desc != None: + descCount = descCount + " (Samples)" + descMin = descMin + " (Min)" + descMax = descMax + " (Max)" + descAverage = descAverage + " (Average)" + self.genSchemaTextMap (stream, self.name + "Samples", descCount) + self.genSchemaTextMap (stream, self.name + "Min", descMin) + self.genSchemaTextMap (stream, self.name + "Max", descMax) + self.genSchemaTextMap (stream, self.name + "Average", descAverage) + def genAssign (self, stream): if self.assign != None: if self.type.type.perThread: @@ -533,6 +680,12 @@ class SchemaStatistic: else: self.type.type.genWrite (stream, self.name) + def genMap (self, stream): + if self.type.type.perThread: + self.type.type.genMap(stream, "totals." + self.name, key=self.name) + else: + self.type.type.genMap(stream, self.name) + def genInitialize (self, stream, prefix="", indent=" "): val = self.type.type.init if self.type.type.style != "mma": @@ -648,6 +801,30 @@ class SchemaArg: stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") stream.write (" buf.put (ft);\n\n") + def genSchemaMap (self, stream, event=False): + stream.write (" {\n") + stream.write (" ::qpid::types::Variant::Map _avalue;\n") + stream.write (" _avalue[TYPE] = TYPE_" + self.type.type.base +";\n") + if (not event): + stream.write (" _avalue[DIR] = \"" + self.dir + "\";\n") + if self.unit != None: + stream.write (" _avalue[UNIT] = \"" + self.unit + "\";\n") + if not event: + if self.min != None: + stream.write (" _avalue[MIN] = " + self.min + ";\n") + if self.max != None: + stream.write (" _avalue[MAX] = " + self.max + ";\n") + if self.maxLen != None: + stream.write (" _avalue[MAXLEN] = " + self.maxLen + ";\n") + if self.default != None: + stream.write (" _avalue[DEFAULT] = \"" + self.default + "\";\n") + if self.desc != None: + stream.write (" _avalue[DESC] = \"" + self.desc + "\";\n") + stream.write (" _args[\"" + self.name + "\"] = _avalue;\n") + stream.write (" }\n") + + + def genFormalParam (self, stream, variables): stream.write ("%s _%s" % (self.type.type.asArg, self.name)) @@ -727,6 +904,24 @@ class SchemaMethod: for arg in self.args: arg.genSchema (stream) + def genSchemaMap (self, stream, variables): + stream.write (" {\n") + stream.write (" ::qpid::types::Variant::Map _value;\n") + stream.write (" ::qpid::types::Variant::Map _args;\n") + stream.write (" _value[ARGCOUNT] = " + str(len(self.args)) + ";\n") + if self.desc != None: + stream.write (" _value[DESC] = \"" + self.desc + "\";\n") + + for arg in self.args: + arg.genSchemaMap (stream) + + stream.write (" if (!_args.empty())\n") + stream.write (" _value[ARGS] = _args;\n") + + + stream.write (" _methods[\"" + self.name + "\"] = _value;\n") + stream.write (" }\n\n") + #===================================================================================== # #===================================================================================== @@ -849,10 +1044,20 @@ class SchemaEvent: for arg in self.args: stream.write(" " + arg.type.type.encode.replace("@", "buf").replace("#", arg.name) + ";\n") + def genArgMap(self, stream, variables): + for arg in self.args: + arg.type.type.genMap(stream, arg.name, " ", mapName="map") + #stream.write(" " + arg.type.type.encode.replace("@", "buf").replace("#", arg.name) + ";\n") + + def genArgSchema(self, stream, variables): for arg in self.args: arg.genSchema(stream, True) + def genArgSchemaMap(self, stream, variables): + for arg in self.args: + arg.genSchemaMap(stream, True) + def genSchemaMD5(self, stream, variables): sum = self.hash.getDigest() for idx in range (len (sum)): @@ -1023,12 +1228,36 @@ class SchemaClass: inArgCount = inArgCount + 1 if methodCount == 0: - stream.write ("string&, Buffer&, Buffer& outBuf") + stream.write ("string&, const string&, string& outStr") + else: + if inArgCount == 0: + stream.write ("string& methodName, const string&, string& outStr") + else: + stream.write ("string& methodName, const string& inStr, string& outStr") + + + def genDoMapMethodArgs (self, stream, variables): + methodCount = 0 + inArgCount = 0 + for method in self.methods: + methodCount = methodCount + 1 + for arg in method.args: + if arg.getDir () == "I" or arg.getDir () == "IO": + inArgCount = inArgCount + 1 + + if methodCount == 0: + stream.write ("string&," + + " const ::qpid::types::Variant::Map&," + + " ::qpid::types::Variant::Map& outMap") else: if inArgCount == 0: - stream.write ("string& methodName, Buffer&, Buffer& outBuf") + stream.write ("string& methodName," + + " const ::qpid::types::Variant::Map&," + + " ::qpid::types::Variant::Map& outMap") else: - stream.write ("string& methodName, Buffer& inBuf, Buffer& outBuf") + stream.write ("string& methodName," + + " const ::qpid::types::Variant::Map& inMap," + + " ::qpid::types::Variant::Map& outMap") def genHiLoStatResets (self, stream, variables): for inst in self.statistics: @@ -1109,8 +1338,22 @@ class SchemaClass: stream.write ("%d" % len (self.methods)) def genMethodHandlers (self, stream, variables): + inArgs = False + for method in self.methods: + for arg in method.args: + if arg.getDir () == "I" or arg.getDir () == "IO": + inArgs = True; + break + + if inArgs: + stream.write("\n") + stream.write(" char *_tmpBuf = new char[inStr.length()];\n") + stream.write(" memcpy(_tmpBuf, inStr.data(), inStr.length());\n") + stream.write(" ::qpid::framing::Buffer inBuf(_tmpBuf, inStr.length());\n") + for method in self.methods: stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n") + stream.write (" _matched = true;\n") if method.getArgCount () == 0: stream.write (" ::qpid::management::ArgsNone ioArgs;\n") else: @@ -1131,7 +1374,43 @@ class SchemaClass: stream.write (" " +\ arg.type.type.getWriteCode ("ioArgs." +\ arg.dir.lower () + "_" +\ - arg.name, "outBuf") + ";\n") + arg.name, "outBuf") + ";\n") + stream.write(" }\n") + + if inArgs: + stream.write ("\n delete [] _tmpBuf;\n") + + + + def genMapMethodHandlers (self, stream, variables): + for method in self.methods: + stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n") + if method.getArgCount () == 0: + stream.write (" ::qpid::management::ArgsNone ioArgs;\n") + else: + stream.write (" Args" + method.getFullName () + " ioArgs;\n") + stream.write (" ::qpid::types::Variant::Map::const_iterator _i;\n") + + # decode each input argument from the input map + for arg in method.args: + if arg.getDir () == "I" or arg.getDir () == "IO": + arg.type.type.genUnmap(stream, + "ioArgs." + arg.dir.lower () + "_" + arg.name, + " ", + arg.name, + "inMap") + + stream.write (" status = coreObject->ManagementMethod (METHOD_" +\ + method.getName().upper() + ", ioArgs, text);\n") + stream.write (" outMap[\"_status_code\"] = (uint32_t) status;\n") + stream.write (" outMap[\"_status_text\"] = ::qpid::management::Manageable::StatusText(status, text);\n") + for arg in method.args: + if arg.getDir () == "O" or arg.getDir () == "IO": + arg.type.type.genMap(stream, + "ioArgs." + arg.dir.lower () + "_" + arg.name, + " ", + arg.name, + "outMap") stream.write (" return;\n }\n") def genOpenNamespaces (self, stream, variables): @@ -1160,6 +1439,10 @@ class SchemaClass: for prop in self.properties: prop.genSchema (stream) + def genPropertySchemaMap (self, stream, variables): + for prop in self.properties: + prop.genSchemaMap(stream) + def genSetGeneralReferenceDeclaration (self, stream, variables): for prop in self.properties: if prop.isGeneralRef: @@ -1169,6 +1452,10 @@ class SchemaClass: for stat in self.statistics: stat.genSchema (stream) + def genStatisticSchemaMap (self, stream, variables): + for stat in self.statistics: + stat.genSchemaMap(stream) + def genMethodIdDeclarations (self, stream, variables): number = 1 for method in self.methods: @@ -1180,6 +1467,10 @@ class SchemaClass: for method in self.methods: method.genSchema (stream, variables) + def genMethodSchemaMap(self, stream, variables): + for method in self.methods: + method.genSchemaMap(stream, variables) + def genNameCap (self, stream, variables): stream.write (capitalize(self.name)) @@ -1241,6 +1532,17 @@ class SchemaClass: for stat in self.statistics: stat.genWrite (stream) + def genMapEncodeProperties(self, stream, variables): + for prop in self.properties: + prop.genMap (stream) + + def genMapEncodeStatistics (self, stream, variables): + for stat in self.statistics: + stat.genMap (stream) + + def genMapDecodeProperties (self, stream, variables): + for prop in self.properties: + prop.genUnmap (stream) class SchemaEventArgs: def __init__(self, package, node, typespec, fragments, options): diff --git a/cpp/managementgen/qmfgen/templates/Args.h b/cpp/managementgen/qmfgen/templates/Args.h index 074ccf9940..20681ab477 100644 --- a/cpp/managementgen/qmfgen/templates/Args.h +++ b/cpp/managementgen/qmfgen/templates/Args.h @@ -24,8 +24,8 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/Args.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/Uuid.h" +//#include "qpid/framing/FieldTable.h" +//#include "qpid/framing/Uuid.h" #include <string> namespace qmf { diff --git a/cpp/managementgen/qmfgen/templates/Class.cpp b/cpp/managementgen/qmfgen/templates/Class.cpp index e6362758ba..ed4e17720f 100644 --- a/cpp/managementgen/qmfgen/templates/Class.cpp +++ b/cpp/managementgen/qmfgen/templates/Class.cpp @@ -21,15 +21,15 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/log/Statement.h" +#include "qpid/management/Manageable.h" #include "qpid/framing/FieldTable.h" -#include "qpid/management/Manageable.h" +#include "qpid/framing/Buffer.h" #include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h" #include "/*MGEN:Class.NameCap*/.h" /*MGEN:Class.MethodArgIncludes*/ #include <iostream> using namespace qmf::/*MGEN:Class.Namespace*/; -using namespace qpid::framing; using qpid::management::ManagementAgent; using qpid::management::Manageable; using qpid::management::ManagementObject; @@ -38,7 +38,7 @@ using std::string; string /*MGEN:Class.NameCap*/::packageName = string ("/*MGEN:Class.NamePackageLower*/"); string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/"); -uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] = +uint8_t /*MGEN:Class.NameCap*/::md5Sum[MD5_LEN] = {/*MGEN:Class.SchemaMD5*/}; /*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent*, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) : @@ -90,9 +90,12 @@ void /*MGEN:Class.NameCap*/::registerSelf(ManagementAgent* agent) agent->registerClass(packageName, className, md5Sum, writeSchema); } -void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) +void /*MGEN:Class.NameCap*/::writeSchema (std::string& schema) { - FieldTable ft; + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + ::qpid::framing::FieldTable ft; // Schema class header: buf.putOctet (CLASS_KIND_TABLE); @@ -109,10 +112,15 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) /*MGEN:Class.StatisticSchema*/ // Methods /*MGEN:Class.MethodSchema*/ + { + uint32_t _len = buf.getPosition(); + buf.reset(); + buf.getRawData(schema, _len); + } } /*MGEN:IF(Class.ExistPerThreadStats)*/ -void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* totals) +void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* totals) const { /*MGEN:Class.InitializeTotalPerThreadStats*/ for (int idx = 0; idx < maxThreads; idx++) { @@ -124,6 +132,7 @@ void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* tota } /*MGEN:ENDIF*/ +/*MGEN:IF(Root.GenQMFv1)*/ uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const { uint32_t size = writeTimestampsSize(); @@ -134,32 +143,62 @@ uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const return size; } -void /*MGEN:Class.NameCap*/::readProperties (Buffer& buf) +void /*MGEN:Class.NameCap*/::readProperties (const std::string& _sBuf) { + char *_tmpBuf = new char[_sBuf.length()]; + memcpy(_tmpBuf, _sBuf.data(), _sBuf.length()); + ::qpid::framing::Buffer buf(_tmpBuf, _sBuf.length()); ::qpid::sys::Mutex::ScopedLock mutex(accessLock); - readTimestamps(buf); + + { + std::string _tbuf; + buf.getRawData(_tbuf, writeTimestampsSize()); + readTimestamps(_tbuf); + } + /*MGEN:IF(Class.ExistOptionals)*/ for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) presenceMask[idx] = buf.getOctet(); /*MGEN:ENDIF*/ /*MGEN:Class.ReadProperties*/ + + delete [] _tmpBuf; } -void /*MGEN:Class.NameCap*/::writeProperties (Buffer& buf) const +void /*MGEN:Class.NameCap*/::writeProperties (std::string& _sBuf) const { + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + ::qpid::sys::Mutex::ScopedLock mutex(accessLock); configChanged = false; - writeTimestamps (buf); + { + std::string _tbuf; + writeTimestamps(_tbuf); + buf.putRawData(_tbuf); + } + + /*MGEN:IF(Class.ExistOptionals)*/ for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++) buf.putOctet(presenceMask[idx]); /*MGEN:ENDIF*/ /*MGEN:Class.WriteProperties*/ + + uint32_t _bufLen = buf.getPosition(); + buf.reset(); + + buf.getRawData(_sBuf, _bufLen); } -void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders) +void /*MGEN:Class.NameCap*/::writeStatistics (std::string& _sBuf, bool skipHeaders) { + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + ::qpid::sys::Mutex::ScopedLock mutex(accessLock); instChanged = false; /*MGEN:IF(Class.ExistPerThreadAssign)*/ @@ -175,8 +214,12 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders) aggregatePerThreadStats(&totals); /*MGEN:ENDIF*/ /*MGEN:Class.Assign*/ - if (!skipHeaders) - writeTimestamps (buf); + if (!skipHeaders) { + std::string _tbuf; + writeTimestamps (_tbuf); + buf.putRawData(_tbuf); + } + /*MGEN:Class.WriteStatistics*/ // Maintenance of hi-lo statistics @@ -189,6 +232,11 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders) } } /*MGEN:ENDIF*/ + + uint32_t _bufLen = buf.getPosition(); + buf.reset(); + + buf.getRawData(_sBuf, _bufLen); } void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/) @@ -196,11 +244,25 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/) Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; std::string text; + bool _matched = false; + + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer outBuf(_msgChars, _bufSize); + /*MGEN:Class.MethodHandlers*/ - outBuf.putLong(status); - outBuf.putShortString(Manageable::StatusText(status, text)); -} + if (!_matched) { + outBuf.putLong(status); + outBuf.putShortString(Manageable::StatusText(status, text)); + } + + uint32_t _bufLen = outBuf.getPosition(); + outBuf.reset(); + + outBuf.getRawData(outStr, _bufLen); +} +/*MGEN:ENDIF*/ std::string /*MGEN:Class.NameCap*/::getKey() const { std::stringstream key; @@ -209,3 +271,67 @@ std::string /*MGEN:Class.NameCap*/::getKey() const return key.str(); } + + +void /*MGEN:Class.NameCap*/::mapEncodeValues (::qpid::types::Variant::Map& _map, + bool includeProperties, + bool includeStatistics) +{ + using namespace ::qpid::types; + ::qpid::sys::Mutex::ScopedLock mutex(accessLock); + + if (includeProperties) { + configChanged = false; +/*MGEN:Class.MapEncodeProperties*/ + } + + if (includeStatistics) { + instChanged = false; +/*MGEN:IF(Class.ExistPerThreadAssign)*/ + for (int idx = 0; idx < maxThreads; idx++) { + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; + if (threadStats != 0) { +/*MGEN:Class.PerThreadAssign*/ + } + } +/*MGEN:ENDIF*/ +/*MGEN:IF(Class.ExistPerThreadStats)*/ + struct PerThreadStats totals; + aggregatePerThreadStats(&totals); +/*MGEN:ENDIF*/ +/*MGEN:Class.Assign*/ + +/*MGEN:Class.MapEncodeStatistics*/ + + // Maintenance of hi-lo statistics +/*MGEN:Class.HiLoStatResets*/ +/*MGEN:IF(Class.ExistPerThreadResets)*/ + for (int idx = 0; idx < maxThreads; idx++) { + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; + if (threadStats != 0) { +/*MGEN:Class.PerThreadHiLoStatResets*/ + } + } +/*MGEN:ENDIF*/ + } +} + +void /*MGEN:Class.NameCap*/::mapDecodeValues (const ::qpid::types::Variant::Map& _map) +{ + ::qpid::types::Variant::Map::const_iterator _i; + ::qpid::sys::Mutex::ScopedLock mutex(accessLock); +/*MGEN:IF(Class.ExistOptionals)*/ + bool _found; +/*MGEN:ENDIF*/ +/*MGEN:Class.MapDecodeProperties*/ +} + +void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMapMethodArgs*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + std::string text; + +/*MGEN:Class.MapMethodHandlers*/ + outMap["_status_code"] = (uint32_t) status; + outMap["_status_text"] = Manageable::StatusText(status, text); +} diff --git a/cpp/managementgen/qmfgen/templates/Class.h b/cpp/managementgen/qmfgen/templates/Class.h index 8efacd650f..cdb31c4c9e 100644 --- a/cpp/managementgen/qmfgen/templates/Class.h +++ b/cpp/managementgen/qmfgen/templates/Class.h @@ -24,8 +24,6 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/ManagementObject.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/Uuid.h" namespace qpid { namespace management { @@ -42,7 +40,7 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject static std::string packageName; static std::string className; - static uint8_t md5Sum[16]; + static uint8_t md5Sum[MD5_LEN]; /*MGEN:IF(Class.ExistOptionals)*/ uint8_t presenceMask[/*MGEN:Class.PresenceMaskBytes*/]; /*MGEN:Class.PresenceMaskConstants*/ @@ -71,18 +69,28 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject return threadStats; } - void aggregatePerThreadStats(struct PerThreadStats*); + void aggregatePerThreadStats(struct PerThreadStats*) const; /*MGEN:ENDIF*/ public: - static void writeSchema(::qpid::framing::Buffer& buf); + static void writeSchema(std::string& schema); + void mapEncodeValues(::qpid::types::Variant::Map& map, + bool includeProperties=true, + bool includeStatistics=true); + void mapDecodeValues(const ::qpid::types::Variant::Map& map); + void doMethod(std::string& methodName, + const ::qpid::types::Variant::Map& inMap, + ::qpid::types::Variant::Map& outMap); + std::string getKey() const; +/*MGEN:IF(Root.GenQMFv1)*/ uint32_t writePropertiesSize() const; - void readProperties(::qpid::framing::Buffer& buf); - void writeProperties(::qpid::framing::Buffer& buf) const; - void writeStatistics(::qpid::framing::Buffer& buf, bool skipHeaders = false); + void readProperties(const std::string& buf); + void writeProperties(std::string& buf) const; + void writeStatistics(std::string& buf, bool skipHeaders = false); void doMethod(std::string& methodName, - ::qpid::framing::Buffer& inBuf, - ::qpid::framing::Buffer& outBuf); - std::string getKey() const; + const std::string& inBuf, + std::string& outBuf); +/*MGEN:ENDIF*/ + writeSchemaCall_t getWriteSchemaCall() { return writeSchema; } /*MGEN:IF(Class.NoStatistics)*/ // Stub for getInstChanged. There are no statistics in this class. diff --git a/cpp/managementgen/qmfgen/templates/Event.cpp b/cpp/managementgen/qmfgen/templates/Event.cpp index a4fc28990d..d760fd9014 100644 --- a/cpp/managementgen/qmfgen/templates/Event.cpp +++ b/cpp/managementgen/qmfgen/templates/Event.cpp @@ -21,13 +21,13 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/log/Statement.h" -#include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Buffer.h" #include "qpid//*MGEN:Event.AgentHeaderLocation*//ManagementAgent.h" #include "Event/*MGEN:Event.NameCap*/.h" using namespace qmf::/*MGEN:Event.Namespace*/; -using namespace qpid::framing; using qpid::management::ManagementAgent; using qpid::management::Manageable; using qpid::management::ManagementObject; @@ -56,23 +56,45 @@ void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent) agent->registerEvent(packageName, eventName, md5Sum, writeSchema); } -void Event/*MGEN:Event.NameCap*/::writeSchema (Buffer& buf) +void Event/*MGEN:Event.NameCap*/::writeSchema (std::string& schema) { - FieldTable ft; + const int _bufSize = 65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + ::qpid::framing::FieldTable ft; // Schema class header: buf.putOctet (CLASS_KIND_EVENT); buf.putShortString (packageName); // Package Name buf.putShortString (eventName); // Event Name buf.putBin128 (md5Sum); // Schema Hash - buf.putOctet (0); // No Superclass buf.putShort (/*MGEN:Event.ArgCount*/); // Argument Count // Arguments /*MGEN:Event.ArgSchema*/ + { + uint32_t _len = buf.getPosition(); + buf.reset(); + buf.getRawData(schema, _len); + } } -void Event/*MGEN:Event.NameCap*/::encode(::qpid::framing::Buffer& buf) const +void Event/*MGEN:Event.NameCap*/::encode(std::string& _sBuf) const { + const int _bufSize=65536; + char _msgChars[_bufSize]; + ::qpid::framing::Buffer buf(_msgChars, _bufSize); + /*MGEN:Event.ArgEncodes*/ + + uint32_t _bufLen = buf.getPosition(); + buf.reset(); + + buf.getRawData(_sBuf, _bufLen); +} + +void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::types::Variant::Map& map) const +{ + using namespace ::qpid::types; +/*MGEN:Event.ArgMap*/ } diff --git a/cpp/managementgen/qmfgen/templates/Event.h b/cpp/managementgen/qmfgen/templates/Event.h index b5c2a211d1..4f912cf220 100644 --- a/cpp/managementgen/qmfgen/templates/Event.h +++ b/cpp/managementgen/qmfgen/templates/Event.h @@ -24,8 +24,6 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/ManagementEvent.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/Uuid.h" namespace qmf { /*MGEN:Event.OpenNamespaces*/ @@ -33,10 +31,10 @@ namespace qmf { class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent { private: - static void writeSchema (::qpid::framing::Buffer& buf); + static void writeSchema (std::string& schema); static std::string packageName; static std::string eventName; - static uint8_t md5Sum[16]; + static uint8_t md5Sum[MD5_LEN]; /*MGEN:Event.ArgDeclarations*/ @@ -51,7 +49,8 @@ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent std::string& getEventName() const { return eventName; } uint8_t* getMd5Sum() const { return md5Sum; } uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; } - void encode(::qpid::framing::Buffer& buffer) const; + void encode(std::string& buffer) const; + void mapEncode(::qpid::types::Variant::Map& map) const; }; }/*MGEN:Event.CloseNamespaces*/ diff --git a/cpp/src/qpid/acl/Acl.cpp b/cpp/src/qpid/acl/Acl.cpp index 21a9e2055e..e510920f6c 100644 --- a/cpp/src/qpid/acl/Acl.cpp +++ b/cpp/src/qpid/acl/Acl.cpp @@ -23,6 +23,7 @@ #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/log/Logger.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/acl/Package.h" #include "qmf/org/apache/qpid/acl/EventAllow.h" #include "qmf/org/apache/qpid/acl/EventDeny.h" @@ -94,7 +95,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name ); agent->raiseEvent(_qmf::EventAllow(id, AclHelper::getActionStr(action), AclHelper::getObjectTypeStr(objType), - name, framing::FieldTable())); + name, types::Variant::Map())); case ALLOW: return true; case DENY: @@ -106,7 +107,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name); agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action), AclHelper::getObjectTypeStr(objType), - name, framing::FieldTable())); + name, types::Variant::Map())); return false; } return false; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 637645bb04..42bc36c4b8 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -22,7 +22,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/log/Statement.h" #include "qpid/agent/ManagementAgentImpl.h" -#include "qpid/sys/Mutex.h" +#include "qpid/amqp_0_10/Codecs.h" #include <list> #include <string.h> #include <stdlib.h> @@ -41,6 +41,9 @@ using std::ofstream; using std::ifstream; using std::string; using std::endl; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; namespace { Mutex lock; @@ -81,7 +84,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), notifyable(0), inCallback(false), - initialized(false), connected(false), lastFailure("never connected"), + initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), @@ -117,6 +120,21 @@ ManagementAgentImpl::~ManagementAgentImpl() } } +void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + inst = qpid::types::Uuid(true).str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + void ManagementAgentImpl::init(const string& brokerHost, uint16_t brokerPort, uint16_t intervalSeconds, @@ -140,7 +158,7 @@ void ManagementAgentImpl::init(const string& brokerHost, void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, uint16_t intervalSeconds, bool useExternalThread, - const std::string& _storeFile) + const string& _storeFile) { interval = intervalSeconds; extThread = useExternalThread; @@ -157,13 +175,16 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, bootSequence = 1; storeData(true); + if (attrMap.empty()) + setName("vendor", "product"); + initialized = true; } void ManagementAgentImpl::registerClass(const string& packageName, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -173,49 +194,77 @@ void ManagementAgentImpl::registerClass(const string& packageName, void ManagementAgentImpl::registerEvent(const string& packageName, const string& eventName, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } +// old-style add object: 64bit id - deprecated ObjectId ManagementAgentImpl::addObject(ManagementObject* object, uint64_t persistId) { + std::string key; + if (persistId) { + key = boost::lexical_cast<std::string>(persistId); + } + return addObject(object, key, persistId != 0); +} + + +// new style add object - use this approach! +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + const std::string& key, + bool persistent) +{ Mutex::ScopedLock lock(addLock); - uint16_t sequence = persistId ? 0 : bootSequence; - uint64_t objectNum = persistId ? persistId : nextObjectId++; - ObjectId objectId(&attachment, 0, sequence, objectNum); + uint16_t sequence = persistent ? 0 : bootSequence; + + ObjectId objectId(&attachment, 0, sequence); + if (key.empty()) + objectId.setV2Key(*object); // let object generate the key + else + objectId.setV2Key(key); - // TODO: fix object-id handling object->setObjectId(objectId); newManagementObjects[objectId] = object; return objectId; } + void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock(agentLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; stringstream key; key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << event.getPackageName() << "." << event.getEventName(); - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str()); + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; + string content; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + MapCodec::encode(map_, content); + connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -235,8 +284,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) methodQueue.pop_front(); { Mutex::ScopedUnlock unlock(agentLock); - Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size()); - invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + invokeMethodRequest(item->body, item->cid, item->replyTo); delete item; } } @@ -274,20 +322,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) void ManagementAgentImpl::startProtocol() { - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - connected = true; - encodeHeader(buffer, 'A'); - buffer.putShortString("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - uint32_t length = buffer.getPosition(); - buffer.reset(); - connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); + sendHeartbeat(); } void ManagementAgentImpl::storeData(bool requested) @@ -323,76 +358,54 @@ void ManagementAgentImpl::retrieveData() } } -void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, - uint32_t code, string text) +void ManagementAgentImpl::sendHeartbeat() { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + static const string addr_exchange("qmf.default.topic"); + static const string addr_key("agent.ind.heartbeat"); - encodeHeader(outBuffer, 'z', sequence); - outBuffer.putLong(code); - outBuffer.putShortString(text); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); - QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); -} + Variant::Map map; + Variant::Map headers; + string content; -void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) -{ - Mutex::ScopedLock lock(agentLock); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key); - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - storeData(); - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); +} - attachment.setBanks(assignedBrokerBank, assignedAgentBank); +void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, + const string& text, uint32_t code) +{ + static const string addr_exchange("qmf.default.direct"); - // Bind to qpid.management to receive commands - connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); + Variant::Map map; + Variant::Map headers; + Variant::Map values; + string content; - // Send package indications for all local packages - for (PackageMap::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.agent"] = name_address; - encodeHeader(outBuffer, 'p'); - encodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + values["error_code"] = code; + values["error_text"] = text; + map["_values"] = values; - // Send class indications for all local classes - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { - outBuffer.reset(); - encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); - } - } + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey); + + QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } -void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence) +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) { Mutex::ScopedLock lock(agentLock); string packageName; @@ -412,12 +425,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + string body; encodeHeader(outBuffer, 's', sequence); - schema.writeSchemaCall(outBuffer); + schema.writeSchemaCall(body); + outBuffer.putRawData(body); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); } @@ -432,124 +447,250 @@ void ManagementAgentImpl::handleConsoleAddedIndication() QPID_LOG(trace, "RCVD ConsoleAddedInd"); } -void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo) { - string methodName; - string packageName; - string className; - uint8_t hash[16]; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + string methodName; + bool failed = false; + Variant::Map inMap; + Variant::Map outMap; + Variant::Map::const_iterator oid, mid; + string content; + + MapCodec::decode(body, inMap); + + outMap["_values"] = Variant::Map(); + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + failed = true; + } else { + string methodName; + ObjectId objId; + Variant::Map inArgs; + Variant::Map callMap; - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); + try { + // conversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); - encodeHeader(outBuffer, 'm', sequence); + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); - } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); - } - else - try { - outBuffer.record(); - iter->second->doMethod(methodName, inBuffer, outBuffer); - } catch(exception& e) { - outBuffer.restore(); - outBuffer.putLong(Manageable::STATUS_EXCEPTION); - outBuffer.putMediumString(e.what()); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + failed = true; + } else { + iter->second->doMethod(methodName, inArgs, callMap); } + + if (callMap["_status_code"].asUint32() == 0) { + outMap["_arguments"] = Variant::Map(); + for (Variant::Map::const_iterator iter = callMap.begin(); + iter != callMap.end(); iter++) + if (iter->first != "_status_code" && iter->first != "_status_text") + outMap["_arguments"].asMap()[iter->first] = iter->second; + } else { + (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"]; + (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"]; + failed = true; + } + + } catch(types::InvalidConversion& e) { + outMap.clear(); + outMap["_values"] = Variant::Map(); + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + failed = true; + } + } + + Variant::Map headers; + headers["method"] = "response"; + headers["qmf.agent"] = name_address; + if (failed) { + headers["qmf.opcode"] = "_exception"; + QPID_LOG(trace, "SENT Exception map=" << outMap); + } else { + headers["qmf.opcode"] = "_method_response"; + QPID_LOG(trace, "SENT MethodResponse map=" << outMap); } - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + MapCodec::encode(outMap, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo); } -void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo) { - FieldTable ft; - FieldTable::ValuePtr value; - moveNewObjectsLH(); - ft.decode(inBuffer); + Variant::Map inMap; + Variant::Map::const_iterator i; + Variant::Map headers; + + MapCodec::decode(body, inMap); + QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + headers["partial"] = Variant(); + + Variant::List list_; + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + string content; + + /* + * Unpack the _what element of the query. Currently we only support OBJECT queries. + */ + i = inMap.find("_what"); + if (i == inMap.end()) { + sendException(replyTo, cid, "_what element missing in Query"); + return; + } - QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + if (i->second.getType() != qpid::types::VAR_STRING) { + sendException(replyTo, cid, "_what element is not a string"); + return; + } - value = ft.get("_class"); - if (value.get() == 0 || !value->convertsTo<string>()) { - value = ft.get("_objectid"); - if (value.get() == 0 || !value->convertsTo<string>()) - return; + if (i->second.asString() != "OBJECT") { + sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + return; + } + + string className; + string packageName; + + /* + * Handle the _schema_id element, if supplied. + */ + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); + + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + className = s_iter->second.asString(); - ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + packageName = s_iter->second.asString(); + } + + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + ObjectId objId(i->second.asMap()); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + object->mapEncodeValues(values, true, true); // write both stats and properties + objId.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + headers.erase("partial"); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + return; + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { + + // @todo support multiple object reply per message + values.clear(); + list_.clear(); + oidMap.clear(); + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); - QPID_LOG(trace, "SENT ObjectInd"); + object->mapEncodeValues(values, true, true); // write both stats and properties + iter->first.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + } } - sendCommandComplete(replyTo, sequence); - return; } - string className(value->get<string>()); + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); +} - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName() == className) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; +void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + static const string addr_exchange("qmf.default.direct"); - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); + Variant::Map map; + Variant::Map headers; + string content; - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; - QPID_LOG(trace, "SENT ObjectInd"); - } - } + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); - sendCommandComplete(replyTo, sequence); + { + Mutex::ScopedLock lock(agentLock); + clientWasAdded = true; + } } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo) { if (extThread) { Mutex::ScopedLock lock(agentLock); - string body; - inBuffer.getRawData(body, inBuffer.available()); - methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + methodQueue.push_back(new QueuedMethod(cid, replyTo, body)); if (pipeHandle != 0) { pipeHandle->write("X", 1); } else if (notifyable != 0) { @@ -568,7 +709,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc inCallback = false; } } else { - invokeMethodRequest(inBuffer, sequence, replyTo); + invokeMethodRequest(body, cid, replyTo); } QPID_LOG(trace, "RCVD MethodRequest"); @@ -576,28 +717,45 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc void ManagementAgentImpl::received(Message& msg) { + string replyToKey; + framing::MessageProperties mp = msg.getMessageProperties(); + if (mp.hasReplyTo()) { + const framing::ReplyTo& rt = mp.getReplyTo(); + replyToKey = rt.getRoutingKey(); + } + + if (mp.hasAppId() && mp.getAppId() == "qmf2") + { + string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode"); + string cid = msg.getMessageProperties().getCorrelationId(); + + if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey); + else { + QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); + } + return; + } + + // old preV2 binary messages + + uint32_t sequence; string data = msg.getData(); Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; - uint32_t sequence; - string replyToKey; - framing::MessageProperties p = msg.getMessageProperties(); - if (p.hasReplyTo()) { - const framing::ReplyTo& rt = p.getReplyTo(); - replyToKey = rt.getRoutingKey(); - } if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); + if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); + else + QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); } } + void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet('A'); @@ -607,6 +765,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } +Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, + const string& cname, + const uint8_t *md5Sum) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_hash"] = types::Uuid(md5Sum); + return map_; +} + + bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) @@ -661,7 +832,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -701,10 +872,7 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, void ManagementAgentImpl::periodicProcessing() { -#define BUFSIZE 65536 Mutex::ScopedLock lock(agentLock); - char msgChars[BUFSIZE]; - uint32_t contentSize; list<pair<ObjectId, ManagementObject*> > deleteList; if (!connected) @@ -745,42 +913,53 @@ void ManagementAgentImpl::periodicProcessing() !baseObject->isDeleted())) continue; - Buffer msgBuffer(msgChars, BUFSIZE); + Variant::List list_; + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_stats || send_props) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); } if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - - if (msgBuffer.available() < (BUFSIZE / 2)) - break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); - stringstream key; - key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << - baseObject->getPackageName() << "." << baseObject->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); + string content; + ListCodec::encode(list_, content); + if (content.length()) { + Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list"); + QPID_LOG(trace, "SENT DataIndication"); } } @@ -793,18 +972,7 @@ void ManagementAgentImpl::periodicProcessing() } deleteList.clear(); - - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); - } + sendHeartbeat(); } void ManagementAgentImpl::ConnectionThread::run() @@ -831,6 +999,10 @@ void ManagementAgentImpl::ConnectionThread::run() arg::exclusive=true); session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), arg::bindingKey=queueName.str()); + session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(), + arg::bindingKey=agent.name_address); + session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(), + arg::bindingKey="console.#"); subscriptions->subscribe(agent, queueName.str(), dest); QPID_LOG(info, "Connection established with broker"); @@ -839,6 +1011,7 @@ void ManagementAgentImpl::ConnectionThread::run() if (shutdown) return; operational = true; + agent.connected = true; agent.startProtocol(); try { Mutex::ScopedUnlock _unlock(connLock); @@ -892,6 +1065,48 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, const string& exchange, const string& routingKey) { + Message msg; + string data; + + buf.getRawData(data, length); + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + +void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, + const string& cid, + const Variant::Map headers, + const string& exchange, + const string& routingKey, + const string& contentType) +{ + Message msg; + Variant::Map::const_iterator i; + + if (!cid.empty()) + msg.getMessageProperties().setCorrelationId(cid); + + if (!contentType.empty()) + msg.getMessageProperties().setContentType(contentType); + for (i = headers.begin(); i != headers.end(); ++i) { + msg.getHeaders().setString(i->first, i->second.asString()); + } + msg.getHeaders().setString("app_id", "qmf2"); + + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + + + +void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, + const string& exchange, + const string& routingKey) +{ ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); @@ -900,23 +1115,21 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, s = subscriptions; } - Message msg; - string data; - - buf.getRawData(data, length); msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData(data); + msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address); try { session.messageTransfer(arg::content=msg, arg::destination=exchange); } catch(exception& e) { - QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); + QPID_LOG(error, "Exception caught in sendMessage: " << e.what()); // Bounce the connection if (s) s->stop(); } } + + void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) { stringstream key; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index affaa45d2d..d1609341be 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -51,6 +51,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen // Methods from ManagementAgent // int getMaxThreads() { return 1; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void init(const std::string& brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, @@ -75,6 +78,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0); + ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key, + bool persistent); void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT); uint32_t pollCallbacks(uint32_t callLimit = 0); int getSignalFd(); @@ -120,10 +125,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen }; struct QueuedMethod { - QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) : - sequence(_seq), replyTo(_reply), body(_body) {} + QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) : + cid(_cid), replyTo(_reply), body(_body) {} - uint32_t sequence; + std::string cid; std::string replyTo; std::string body; }; @@ -140,6 +145,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void received (client::Message& msg); + qpid::types::Variant::Map attrMap; + std::string name_address; uint16_t interval; bool extThread; sys::PipeHandle* pipeHandle; @@ -155,6 +162,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen client::ConnectionSettings connectionSettings; bool initialized; bool connected; + bool useMapMsg; std::string lastFailure; bool clientWasAdded; @@ -198,6 +206,15 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint32_t length, const std::string& exchange, const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map headers, + const std::string& exchange, + const std::string& routingKey, + const std::string& contentType="amqp/map"); + void sendMessage(qpid::client::Message msg, + const std::string& exchange, + const std::string& routingKey); void bindToBank(uint32_t brokerBank, uint32_t agentBank); void close(); bool isSleeping() const; @@ -237,16 +254,21 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen PackageMap::iterator pIter, ClassMap::iterator cIter); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const uint8_t *md5Sum); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleAttachResponse (qpid::framing::Buffer& inBuffer); + void sendHeartbeat(); + void sendException(const std::string& replyToKey, const std::string& cid, + const std::string& text, uint32_t code=1); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); - void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); - void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo); + void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo); + + void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo); + void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); + void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d94f228734..24c5a0c049 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -93,7 +93,8 @@ Broker::Options::Options(const std::string& name) : tcpNoDelay(false), requireEncrypted(false), maxSessionRate(0), - asyncQueueEvents(false) // Must be false in a cluster. + asyncQueueEvents(false), // Must be false in a cluster. + qmf2Support(false) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -114,6 +115,7 @@ Broker::Options::Options(const std::string& name) : ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") @@ -138,7 +140,9 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), + managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support, + conf.qmf2Support) + : 0), store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), @@ -164,6 +168,7 @@ Broker::Broker(const Broker::Options& conf) : QPID_LOG(info, "Management enabled"); managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPubInterval, this, conf.workerThreads + 3); + managementAgent->setName("apache.org", "qpidd"); _qmf::Package packageInitializer(managementAgent.get()); System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 465a17f4eb..f9be992f0c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -113,6 +113,7 @@ public: std::string knownHosts; uint32_t maxSessionRate; bool asyncQueueEvents; + bool qmf2Support; private: std::string getHome(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 7bb70ed24a..1d3da982d8 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -149,7 +149,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); - mgmtExchange->set_arguments(args); + mgmtExchange->set_arguments(ManagementAgent::toMap(args)); if (!durable) { if (name.empty()) { agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID @@ -336,7 +336,7 @@ void Exchange::Binding::startManagement() { management::ObjectId queueId = mo->getObjectId(); mgmtBinding = new _qmf::Binding - (agent, this, (Manageable*) parent, queueId, key, args); + (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); if (!origin.empty()) mgmtBinding->set_origin(origin); agent->addObject (mgmtBinding, agent->allocateId(this)); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9e379dfc49..8d9248212f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -873,7 +873,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); if (mgmtObject != 0) - mgmtObject->set_arguments (_settings); + mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 73ef807a0a..5148d88e72 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -280,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, - !acquire, ackExpected, exclusive ,arguments); + !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject, agent->allocateId(this)); mgmtObject->set_creditMode("WINDOW"); } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c3b6f697fd..10eddc6045 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -106,7 +106,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, - alternateExchange, durable, false, args, + alternateExchange, durable, false, ManagementAgent::toMap(args), response.second ? "created" : "existing")); }catch(UnknownExchangeTypeException& /*e*/){ @@ -194,7 +194,8 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) - agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments)); + agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, + queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments))); } }else{ throw NotFoundException("Bind failed. No such exchange: " + exchangeName); @@ -389,7 +390,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, arguments, + name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), queue_created.second ? "created" : "existing")); } @@ -499,7 +500,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), - queueName, destination, exclusive, arguments)); + queueName, destination, exclusive, ManagementAgent::toMap(arguments))); } void diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index 455ad11cf2..90c6b13cd3 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -22,6 +22,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/types/Uuid.h" #include <iostream> #include <fstream> @@ -64,7 +65,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) } } - mgmtObject = new _qmf::System (agent, this, systemId); + mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); std::string sysname, nodename, release, version, machine; qpid::sys::SystemInfo::getSystemId (sysname, nodename, diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 4454d70427..bc62588f5d 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -29,20 +29,46 @@ #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" +#include "qpid/types/Variant.h" +#include "qpid/types/Uuid.h" +#include "qpid/framing/List.h" +#include "qpid/amqp_0_10/Codecs.h" #include <list> #include <iostream> #include <fstream> #include <sstream> +#include <typeinfo> using boost::intrusive_ptr; using qpid::framing::Uuid; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; + + +static Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const std::string& type, + const uint8_t *md5Sum) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_type"] = type; + map_["_hash"] = qpid::types::Uuid(md5Sum); + return map_; +} + + ManagementAgent::RemoteAgent::~RemoteAgent () { QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); @@ -52,10 +78,11 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } } -ManagementAgent::ManagementAgent () : +ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : threadPoolSize(1), interval(10), broker(0), timer(0), startTime(uint64_t(Duration(now()))), - suppressed(false) + suppressed(false), + qmf1Support(qmfV1), qmf2Support(qmfV2) { nextObjectId = 1; brokerBank = 1; @@ -148,6 +175,27 @@ void ManagementAgent::pluginsInitialized() { timer->add(new Periodic(*this, interval)); } + +void ManagementAgent::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + if (uuid.isNull()) + { + throw Exception("ManagementAgent::configure() must be called if default name is used."); + } + inst = uuid.str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + + void ManagementAgent::writeData () { string filename (dataDir + "/.mbrokerdata"); @@ -194,6 +242,7 @@ void ManagementAgent::registerEvent (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } +// Deprecated: V1 objects ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId) { uint16_t sequence; @@ -207,8 +256,47 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId objectNum = persistId; } - ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); - objId.setV2Key(*object); + ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum); + objId.setV2Key(*object); // let object generate the v2 key + + object->setObjectId(objId); + + { + Mutex::ScopedLock lock (addLock); + ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); + if (destIter != newManagementObjects.end()) { + if (destIter->second->isDeleted()) { + newDeletedManagementObjects.push_back(destIter->second); + newManagementObjects.erase(destIter); + } else { + QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() << + " key=" << objId.getV2Key()); + return objId; + } + } + newManagementObjects[objId] = object; + } + + return objId; +} + + + +ObjectId ManagementAgent::addObject(ManagementObject* object, + const std::string& key, + bool persistent) +{ + uint16_t sequence; + + sequence = persistent ? 0 : bootSequence; + + ObjectId objId(0 /*flags*/, sequence, brokerBank); + if (key.empty()) { + objId.setV2Key(*object); // let object generate the key + } else { + objId.setV2Key(key); + } + object->setObjectId(objId); { @@ -233,21 +321,57 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock (userLock); - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, - "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + if (qmf1Support) { + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putOctet(sev); + std::string sBuf; + event.encode(sBuf); + outBuffer.putRawData(sBuf); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, + "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + "_event", + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + stringstream key; + key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName(); + + string content; + MapCodec::encode(map_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); + } + } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -355,6 +479,59 @@ void ManagementAgent::sendBuffer(Buffer& buf, } catch(exception&) {} } + +void ManagementAgent::sendBuffer(const std::string& data, + const std::string& cid, + const Variant::Map& headers, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey) +{ + Variant::Map::const_iterator i; + + if (suppressed) { + QPID_LOG(trace, "Suppressed management message to " << routingKey); + return; + } + if (exchange.get() == 0) return; + + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody(data))); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(data.length()); + if (!cid.empty()) { + props->setCorrelationId(cid); + } + + for (i = headers.begin(); i != headers.end(); ++i) { + msg->getOrInsertHeaders().setString(i->first, i->second.asString()); + } + msg->getOrInsertHeaders().setString("app_id", "qmf2"); + + DeliveryProperties* dp = + msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + dp->setRoutingKey(routingKey); + + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} +} + + void ManagementAgent::moveNewObjectsLH() { Mutex::ScopedLock lock (addLock); @@ -391,12 +568,13 @@ void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 #define HEADROOM 4096 - QPID_LOG(trace, "Management agent periodic processing") - Mutex::ScopedLock lock (userLock); + QPID_LOG(trace, "Management agent periodic processing"); + Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; + std::string sBuf; uint64_t uptime = uint64_t(Duration(now())) - startTime; static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); @@ -439,43 +617,90 @@ void ManagementAgent::periodicProcessing (void) continue; Buffer msgBuffer(msgChars, BUFSIZE); + Variant::List list_; + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_props && qmf1Support) { encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - pcount++; + sBuf.clear(); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + + if (send_stats && qmf1Support) { encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); - scount++; + sBuf.clear(); + object->writeStatistics(sBuf); + msgBuffer.putRawData(sBuf); } + if ((send_stats || send_props) && qmf2Support) { + Variant::Map map_; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); + + } + + if (send_props) pcount++; + if (send_stats) scount++; + if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - if (msgBuffer.available() < HEADROOM) + if (qmf1Support && (msgBuffer.available() < HEADROOM)) break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); - stringstream key; - key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + if (pcount || scount) { + if (qmf1Support) { + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } + + if (qmf2Support) { + string content; + ListCodec::encode(list_, content); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName(); + // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } } } @@ -492,15 +717,49 @@ void ManagementAgent::periodicProcessing (void) for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin(); cdIter != deletedManagementObjects.end(); cdIter++) { collisionDeletions = true; - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - (*cdIter)->writeProperties(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - stringstream key; - key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + { + if (qmf1Support) { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'c'); + sBuf.clear(); + (*cdIter)->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + stringstream key; + key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + + if (qmf2Support) { + Variant::List list_; + Variant::Map map_; + Variant::Map values; + Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(), + (*cdIter)->getClassName(), + "_data", + (*cdIter)->getMd5Sum()); + (*cdIter)->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + stringstream key; + key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + + string content; + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + } } if (!deleteList.empty() || collisionDeletions) { @@ -508,7 +767,12 @@ void ManagementAgent::periodicProcessing (void) deleteOrphanedAgentsLH(); } - { + // heartbeat generation + + if (qmf1Support) { +#define BUFSIZE 65536 + uint32_t contentSize; + char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); @@ -519,6 +783,27 @@ void ManagementAgent::periodicProcessing (void) sendBuffer (msgBuffer, contentSize, mExchange, routingKey); QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); } + + if (qmf2Support) { + static const string addr_key("agent.ind.heartbeat"); + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + string content; + MapCodec::encode(map, content); + sendBuffer(content, "", headers, v2Topic, addr_key); + + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); + } QPID_LOG(debug, "periodic update " << debugSnapshot()); } @@ -531,19 +816,51 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) if (!object->isDeleted()) return; + if (qmf1Support) { #define DNOW_BUFSIZE 2048 - char msgChars[DNOW_BUFSIZE]; - uint32_t contentSize; - Buffer msgBuffer(msgChars, DNOW_BUFSIZE); - - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - stringstream key; - key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + char msgChars[DNOW_BUFSIZE]; + uint32_t contentSize; + Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + std::string sBuf; + + encodeHeader(msgBuffer, 'c'); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = msgBuffer.getPosition(); + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } + + if (qmf2Support) { + Variant::List list_; + Variant::Map map_; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + stringstream key; + key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName(); + + Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + string content; + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } managementObjects.erase(oid); } @@ -566,35 +883,68 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, - const FieldTable* /*args*/) + const FieldTable* /*args*/, + const bool topic) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - // Parse the routing key. This management broker should act as though it - // is bound to the exchange to match the following keys: - // - // agent.1.0.# - // broker - // schema.# + if (qmf1Support && topic) { - if (routingKey == "broker") { - dispatchAgentCommandLH(msg); - return false; - } + // qmf1 is bound only to the topic management exchange. + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.1.0.# + // broker + // schema.# - else if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); - return false; - } + if (routingKey == "broker") { + dispatchAgentCommandLH(msg); + return false; + } + + if (routingKey.length() > 6) { + + if (routingKey.compare(0, 9, "agent.1.0") == 0) { + dispatchAgentCommandLH(msg); + return false; + } + + if (routingKey.compare(0, 8, "agent.1.") == 0) { + return authorizeAgentMessageLH(msg); + } - else if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); + if (routingKey.compare(0, 7, "schema.") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + } } - else if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); - return true; + if (qmf2Support) { + + if (topic) { + + // Intercept messages bound to: + // "console.ind.locate.# - process these messages, and also allow them to be forwarded. + + if (routingKey.compare(0, 18, "console.ind.locate") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + + } else { // direct exchange + + // Intercept messages bound to: + // "broker" - generic alias for the local broker + // "<name_address>" - the broker agent's proper name + // and do not forward them futher + if (routingKey == "broker" || routingKey == name_address) { + dispatchAgentCommandLH(msg); + return false; + } + } } return true; @@ -610,14 +960,19 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; AclModule* acl = broker->getAcl(); + std::string inArgs; - ObjectId objId(inBuffer); + std::string sBuf; + inBuffer.getRawData(sBuf, 16); + ObjectId objId; + objId.decode(sBuf); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); inBuffer.getShortString(methodName); + inBuffer.getRawData(inArgs, inBuffer.available()); - QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << methodName << " replyTo=" << replyToKey); encodeHeader(outBuffer, 'm', sequence); @@ -629,8 +984,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + return; } if (acl != 0) { @@ -645,8 +1000,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + return; } } @@ -664,7 +1019,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey try { outBuffer.record(); Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inBuffer, outBuffer); + std::string outBuf; + iter->second->doMethod(methodName, inArgs, outBuf); + outBuffer.putRawData(outBuf); } catch(exception& e) { outBuffer.restore(); outBuffer.putLong(Manageable::STATUS_EXCEPTION); @@ -675,9 +1032,135 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } + +void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo, + const std::string& cid, const ConnectionToken* connToken) +{ + string methodName; + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + string content; + + Variant::Map outMap; + Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = name_address; + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) + { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid); + return; + } + + ObjectId objId; + Variant::Map inArgs; + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } + } catch(exception& e) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid); + return; + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid); + return; + } + + // validate + AclModule* acl = broker->getAcl(); + DisallowedMethods::const_iterator i; + + i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName)); + if (i != disallowed.end()) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + (outMap["_values"].asMap())["_status_text"] = i->second; + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid); + return; + } + + if (acl != 0) { + string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + map<acl::Property, string> params; + params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); + params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid); + return; + } + } + + // invoke the method + + QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() + << ":" << iter->second->getClassName() << " method=" << + methodName << " replyTo=" << replyTo); + + try { + iter->second->doMethod(methodName, inArgs, outMap); + } catch(exception& e) { + outMap.clear(); + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid); + return; + } + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid); +} + + void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -781,6 +1264,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin uint32_t outLen; uint32_t sequence = nextRequestSequence++; + // Schema Request encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); key.encode(outBuffer); @@ -803,9 +1287,11 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) // linked in via plug-in), call the schema handler directly. If the package // is from a remote management agent, send the stored schema information. - if (writeSchemaCall != 0) - writeSchemaCall(buf); - else + if (writeSchemaCall != 0) { + std::string schema; + writeSchemaCall(schema); + buf.putRawData(schema); + } else buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } @@ -981,7 +1467,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject, 0); @@ -1012,7 +1498,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin ft.decode(inBuffer); - QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence); + QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo<string>()) { @@ -1031,13 +1517,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin object->setUpdateTime(); if (!object->isDeleted()) { + std::string sBuf; encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } sendCommandComplete(replyToKey, sequence); @@ -1058,13 +1548,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin object->setUpdateTime(); if (!object->isDeleted()) { + std::string sBuf; encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } } @@ -1072,64 +1566,285 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin sendCommandComplete(replyToKey, sequence); } + +void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + if (contentType != "_query_v1") { + QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!"); + return; + } + + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator i; + Variant::Map headers; + + QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + headers["partial"]; + + Variant::List list_; + Variant::Map map_; + Variant::Map values; + string className; + string content; + + i = inMap.find("_class"); + if (i != inMap.end()) + try { + className = i->second.asString(); + } catch(exception& e) { + className.clear(); + QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored."); + } + + if (className.empty()) { + ObjectId objId; + i = inMap.find("_object_id"); + if (i != inMap.end()) { + + try { + objId = ObjectId(i->second.asMap()); + } catch (exception &e) { + objId = ObjectId(); // empty object id - won't find a match (I hope). + QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid); + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + } + } + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName () == className) { + + // @todo: support multiple objects per message reply + values.clear(); + list_.clear(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + } + } + } + } + + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid); +} + + +void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo, + const string& cid) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + string content; + MapCodec::encode(map, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); +} + + bool ManagementAgent::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; - string replyToKey; + uint32_t sequence = 0; + bool methodReq = false; + bool mapMsg = false; + string packageName; + string className; + string methodName; + std::string cid; if (msg.encodedSize() > MA_BUFFER_SIZE) return false; msg.encodeContent(inBuffer); + uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); - if (!checkHeader(inBuffer, &opcode, &sequence)) - return false; + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + + const framing::FieldTable *headers = msg.getApplicationHeaders(); + + if (headers && headers->getAsString("app_id") == "qmf2") + { + mapMsg = true; + + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (headers->getAsString("qmf.opcode") == "_method_request") + { + methodReq = true; + + // extract object id and method name + + std::string body; + inBuffer.getRawData(body, bufferLen); + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + + ObjectId objId; - if (opcode == 'M') { + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + QPID_LOG(warning, + "Missing fields in QMF authorize req received."); + return false; + } + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + } catch(exception& e) { + QPID_LOG(warning, + "Badly formatted QMF authorize req received."); + return false; + } + + // look up schema for object to get package and class name + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " << + objId); + return false; + } + + packageName = iter->second->getPackageName(); + className = iter->second->getClassName(); + } + } else { // old style binary message format + + uint8_t opcode; + + if (!checkHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + methodReq = true; + + // extract method name & schema package and class name + + uint8_t hash[16]; + inBuffer.getLongLong(); // skip over object id + inBuffer.getLongLong(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + } + } + + if (methodReq) { // TODO: check method call against ACL list. + map<acl::Property, string> params; AclModule* acl = broker->getAcl(); if (acl == 0) return true; string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); - string packageName; - string className; - uint8_t hash[16]; - string methodName; - - map<acl::Property, string> params; - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); - params[acl::PROP_SCHEMAPACKAGE] = packageName; params[acl::PROP_SCHEMACLASS] = className; if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) return true; + // authorization failed, send reply if replyTo present + const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); - replyToKey = rt.getRoutingKey(); + string replyToKey = rt.getRoutingKey(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + if (mapMsg) { - encodeHeader(outBuffer, 'm', sequence); - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - } + Variant::Map outMap; + Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = name_address; + + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); + + string content; + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyToKey); + + } else { + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + } return false; } @@ -1139,9 +1854,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) void ManagementAgent::dispatchAgentCommandLH(Message& msg) { - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; string replyToKey; const framing::MessageProperties* p = @@ -1153,6 +1865,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) else return; + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + if (msg.encodedSize() > MA_BUFFER_SIZE) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.encodedSize()); @@ -1163,7 +1878,36 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + const framing::FieldTable *headers = msg.getApplicationHeaders(); + + if (headers && headers->getAsString("app_id") == "qmf2") + { + std::string opcode = headers->getAsString("qmf.opcode"); + std::string contentType = headers->getAsString("qmf.content"); + std::string body; + std::string cid; + + inBuffer.getRawData(body, bufferLen); + + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (opcode == "_method_request") + return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher()); + else if (opcode == "_query_request") + return handleGetQueryLH(body, replyToKey, cid, contentType); + else if (opcode == "_agent_locate_request") + return handleLocateRequestLH(body, replyToKey, cid); + + QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); + return; + } + + // old preV2 binary messages + while (inBuffer.getPosition() < bufferLen) { + uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) return; @@ -1359,7 +2103,6 @@ ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) return iter; } - void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (userLock); @@ -1377,42 +2120,64 @@ void ManagementAgent::disallow(const std::string& className, const std::string& disallowed[std::make_pair(className, methodName)] = message; } +void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const { + _map["_cname"] = name; + _map["_hash"] = qpid::types::Uuid(hash); +} + +void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; + + if ((i = _map.find("_cname")) != _map.end()) { + name = i->second.asString(); + } + + if ((i = _map.find("_hash")) != _map.end()) { + const qpid::types::Uuid& uuid = i->second.asUuid(); + memcpy(hash, uuid.data(), uuid.size()); + } +} + void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const { - buffer.checkAvailable(encodedSize()); + buffer.checkAvailable(encodedBufSize()); buffer.putShortString(name); buffer.putBin128(hash); } void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) { - buffer.checkAvailable(encodedSize()); + buffer.checkAvailable(encodedBufSize()); buffer.getShortString(name); buffer.getBin128(hash); } -uint32_t ManagementAgent::SchemaClassKey::encodedSize() const { +uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const { return 1 + name.size() + 16 /* bin128 */; } -void ManagementAgent::SchemaClass::encode(qpid::framing::Buffer& outBuf) const { - outBuf.checkAvailable(encodedSize()); - outBuf.putOctet(kind); - outBuf.putLong(pendingSequence); - outBuf.putLongString(data); +void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const { + _map["_type"] = kind; + _map["_pending_sequence"] = pendingSequence; + _map["_data"] = data; } -void ManagementAgent::SchemaClass::decode(qpid::framing::Buffer& inBuf) { - inBuf.checkAvailable(encodedSize()); - kind = inBuf.getOctet(); - pendingSequence = inBuf.getLong(); - inBuf.getLongString(data); -} +void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; -uint32_t ManagementAgent::SchemaClass::encodedSize() const { - return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size(); + if ((i = _map.find("_type")) != _map.end()) { + kind = i->second; + } + if ((i = _map.find("_pending_sequence")) != _map.end()) { + pendingSequence = i->second; + } + if ((i = _map.find("_data")) != _map.end()) { + data = i->second.asString(); + } } void ManagementAgent::exportSchemas(std::string& out) { - out.clear(); + Variant::List list_; + Variant::Map map_, kmap, cmap; + for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) { string name = i->first; const ClassMap& classes = i ->second; @@ -1421,90 +2186,143 @@ void ManagementAgent::exportSchemas(std::string& out) { const SchemaClass& klass = j->second; if (klass.writeSchemaCall == 0) { // Ignore built-in schemas. // Encode name, schema-key, schema-class - size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize(); - size_t end = out.size(); - out.resize(end + encodedSize); - framing::Buffer outBuf(&out[end], encodedSize); - outBuf.putShortString(name); - key.encode(outBuf); - klass.encode(outBuf); + + map_.clear(); + kmap.clear(); + cmap.clear(); + + key.mapEncode(kmap); + klass.mapEncode(cmap); + + map_["_pname"] = name; + map_["_key"] = kmap; + map_["_class"] = cmap; + list_.push_back(map_); } } } + + ListCodec::encode(list_, out); } void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) { - while (inBuf.available()) { + + string buf(inBuf.getPointer(), inBuf.available()); + Variant::List content; + ListCodec::decode(buf, content); + Variant::List::const_iterator l; + + + for (l = content.begin(); l != content.end(); l++) { string package; SchemaClassKey key; SchemaClass klass; - inBuf.getShortString(package); - key.decode(inBuf); - klass.decode(inBuf); - packages[package][key] = klass; + Variant::Map map_, kmap, cmap; + Variant::Map::const_iterator i; + + map_ = l->asMap(); + + if ((i = map_.find("_pname")) != map_.end()) { + package = i->second.asString(); + + if ((i = map_.find("_key")) != map_.end()) { + key.mapDecode(i->second.asMap()); + + if ((i = map_.find("_class")) != map_.end()) { + klass.mapDecode(i->second.asMap()); + + packages[package][key] = klass; + } + } + } } } -void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const { - outBuf.checkAvailable(encodedSize()); - outBuf.putLong(brokerBank); - outBuf.putLong(agentBank); - outBuf.putShortString(routingKey); - // TODO aconway 2010-03-04: we send the v2Key instead of the - // ObjectId because that has the same meaning on different - // brokers. ObjectId::encode doesn't currently encode the v2Key, - // this can be cleaned up when it does. - outBuf.putMediumString(connectionRef.getV2Key()); - mgmtObject->writeProperties(outBuf); +void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const { + Variant::Map _objId, _values; + + map_["_brokerBank"] = brokerBank; + map_["_agentBank"] = agentBank; + map_["_routingKey"] = routingKey; + + connectionRef.mapEncode(_objId); + map_["_object_id"] = _objId; + + mgmtObject->mapEncodeValues(_values, true, false); + map_["_values"] = _values; } -void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) { - brokerBank = inBuf.getLong(); - agentBank = inBuf.getLong(); - inBuf.getShortString(routingKey); +void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { + Variant::Map::const_iterator i; + + if ((i = map_.find("_brokerBank")) != map_.end()) { + brokerBank = i->second; + } + + if ((i = map_.find("_agentBank")) != map_.end()) { + agentBank = i->second; + } + + if ((i = map_.find("_routingKey")) != map_.end()) { + routingKey = i->second.getString(); + } - // TODO aconway 2010-03-04: see comment in encode() - string connectionKey; - inBuf.getMediumString(connectionKey); - connectionRef = ObjectId(); // Clear out any existing value. - connectionRef.setV2Key(connectionKey); + if ((i = map_.find("_object_id")) != map_.end()) { + connectionRef.mapDecode(i->second.asMap()); + } mgmtObject = new _qmf::Agent(&agent, this); - mgmtObject->readProperties(inBuf); + + if ((i = map_.find("_values")) != map_.end()) { + mgmtObject->mapDecodeValues(i->second.asMap()); + } + // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key. mgmtObject->set_connectionRef(connectionRef); } -uint32_t ManagementAgent::RemoteAgent::encodedSize() const { - // TODO aconway 2010-03-04: see comment in encode() - return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long - + routingKey.size() + sizeof(uint8_t) // ShortString - + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string - + mgmtObject->writePropertiesSize(); -} - void ManagementAgent::exportAgents(std::string& out) { - out.clear(); + Variant::List list_; + Variant::Map map_, omap, amap; + for (RemoteAgentMap::const_iterator i = remoteAgents.begin(); i != remoteAgents.end(); ++i) { // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode RemoteAgent* agent = i->second; - size_t encodedSize = agent->encodedSize(); - size_t end = out.size(); - out.resize(end + encodedSize); - framing::Buffer outBuf(&out[end], encodedSize); - agent->encode(outBuf); + + map_.clear(); + amap.clear(); + + agent->mapEncode(amap); + map_["_remote_agent"] = amap; + list_.push_back(map_); } + + ListCodec::encode(list_, out); } void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { - while (inBuf.available()) { + string buf(inBuf.getPointer(), inBuf.available()); + Variant::List content; + ListCodec::decode(buf, content); + Variant::List::const_iterator l; + + for (l = content.begin(); l != content.end(); l++) { std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this)); - agent->decode(inBuf); - addObject(agent->mgmtObject, 0); - remoteAgents[agent->connectionRef] = agent.release(); + Variant::Map map_; + Variant::Map::const_iterator i; + + map_ = l->asMap(); + + if ((i = map_.find("_remote_agent")) != map_.end()) { + + agent->mapDecode(i->second.asMap()); + + addObject (agent->mgmtObject, 0, false); + remoteAgents[agent->connectionRef] = agent.release(); + } } } @@ -1519,3 +2337,198 @@ std::string ManagementAgent::debugSnapshot() { msg << " new objects: " << newManagementObjects.size(); return msg.str(); } + +Variant::Map ManagementAgent::toMap(const FieldTable& from) +{ + Variant::Map map; + + for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) { + const string& key(iter->first); + const FieldTable::ValuePtr& val(iter->second); + + map[key] = toVariant(val); + } + + return map; +} + +Variant::List ManagementAgent::toList(const List& from) +{ + Variant::List _list; + + for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) { + const List::ValuePtr& val(*iter); + + _list.push_back(toVariant(val)); + } + + return _list; +} + +qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from) +{ + qpid::framing::FieldTable ft; + + for (Variant::Map::const_iterator iter = from.begin(); + iter != from.end(); + iter++) { + const string& key(iter->first); + const Variant& val(iter->second); + + ft.set(key, toFieldValue(val)); + } + + return ft; +} + + +List ManagementAgent::fromList(const Variant::List& from) +{ + List fa; + + for (Variant::List::const_iterator iter = from.begin(); + iter != from.end(); + iter++) { + const Variant& val(*iter); + + fa.push_back(toFieldValue(val)); + } + + return fa; +} + + +boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in) +{ + + switch(in.getType()) { + + case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue()); + case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); + case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); + case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); + case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); + case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); + case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); + case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); + case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); + case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); + case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); + case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); + case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); + case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); + case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap()))); + case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList()))); + } + + QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]"); + return boost::shared_ptr<FieldValue>(new VoidValue()); +} + +// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup. +Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) +{ + const std::string iso885915("iso-8859-15"); + const std::string utf8("utf8"); + const std::string utf16("utf16"); + //const std::string binary("binary"); + const std::string amqp0_10_binary("amqp0-10:binary"); + //const std::string amqp0_10_bit("amqp0-10:bit"); + const std::string amqp0_10_datetime("amqp0-10:datetime"); + const std::string amqp0_10_struct("amqp0-10:struct"); + Variant out; + + //based on AMQP 0-10 typecode, pick most appropriate variant type + switch (in->getType()) { + //Fixed Width types: + case 0x00: //bin8 + case 0x01: out.setEncoding(amqp0_10_binary); // int8 + case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8 + case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; // + // case 0x04: break; //TODO: iso-8859-15 char // char + case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8 + + case 0x10: out.setEncoding(amqp0_10_binary); // bin16 + case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16 + case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16 + + case 0x20: out.setEncoding(amqp0_10_binary); // bin32 + case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32 + case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32 + + case 0x23: out = in->get<float>(); break; // float(32) + + // case 0x27: break; //TODO: utf-32 char + + case 0x30: out.setEncoding(amqp0_10_binary); // bin64 + case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64 + + case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding + case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64 + case 0x33: out = in->get<double>(); break; // double + + case 0x48: // uuid + { + unsigned char data[16]; + in->getFixedWidthValue<16>(data); + out = qpid::types::Uuid(data); + } break; + + //TODO: figure out whether and how to map values with codes 0x40-0xd8 + + case 0xf0: break;//void, which is the default value for Variant + // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant + + //Variable Width types: + //strings: + case 0x80: // str8 + case 0x90: // str16 + case 0xa0: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_binary); + break; + + case 0x84: // str8 + case 0x94: // str16 + out = in->get<std::string>(); + out.setEncoding(iso885915); + break; + + case 0x85: // str8 + case 0x95: // str16 + out = in->get<std::string>(); + out.setEncoding(utf8); + break; + + case 0x86: // str8 + case 0x96: // str16 + out = in->get<std::string>(); + out.setEncoding(utf16); + break; + + case 0xab: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_struct); + break; + + case 0xa8: // map + out = ManagementAgent::toMap(in->get<FieldTable>()); + break; + + case 0xa9: // list of variant types + out = ManagementAgent::toList(in->get<List>()); + break; + //case 0xaa: //convert amqp0-10 array (uniform type) into variant list + // out = Variant::List(); + // translate<Array>(in, out.asList(), &toVariant); + // break; + + default: + //error? + QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]"); + break; + } + + return out; +} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 5b2c54f1b8..0250f39dd6 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -32,7 +32,9 @@ #include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" +#include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FieldValue.h> #include <memory> #include <string> #include <map> @@ -62,7 +64,7 @@ public: } severity_t; - ManagementAgent (); + ManagementAgent (const bool qmfV1, const bool qmfV2); virtual ~ManagementAgent (); /** Called before plugins are initialized */ @@ -74,6 +76,9 @@ public: /** Called by cluster to suppress management output during update. */ void suppress(bool s) { suppressed = s; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void setInterval(uint16_t _interval) { interval = _interval; } void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); @@ -91,6 +96,9 @@ public: ManagementObject::writeSchemaCall_t schemaCall); QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, uint64_t persistId = 0); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + const std::string& key, + bool persistent = true); QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); @@ -99,7 +107,8 @@ public: bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, - const framing::FieldTable* args); + const framing::FieldTable* args, + const bool topic); const framing::Uuid& getUuid() const { return uuid; } @@ -128,6 +137,15 @@ public: uint16_t getBootSequence(void) { return bootSequence; } void setBootSequence(uint16_t b) { bootSequence = b; } + // TODO: remove these when Variant API moved into common library. + static types::Variant::Map toMap(const framing::FieldTable& from); + static framing::FieldTable fromMap(const types::Variant::Map& from); + static types::Variant::List toList(const framing::List& from); + static framing::List fromList(const types::Variant::List& from); + static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); + static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); + + private: struct Periodic : public qpid::sys::TimerTask { @@ -153,9 +171,8 @@ private: ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); - void encode(framing::Buffer& buffer) const; - void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); }; // TODO: Eventually replace string with entire reply-to structure. reply-to @@ -175,9 +192,11 @@ private: std::string name; uint8_t hash[16]; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); void encode(framing::Buffer& buffer) const; void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + uint32_t encodedBufSize() const; }; struct SchemaClassKeyComp @@ -209,9 +228,8 @@ private: bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } void appendSchema (framing::Buffer& buf); - void encode(framing::Buffer& buffer) const; - void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); }; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; @@ -264,6 +282,14 @@ private: typedef std::map<MethodName, std::string> DisallowedMethods; DisallowedMethods disallowed; + // Agent name and address + qpid::types::Variant::Map attrMap; + std::string name_address; + + // supported management protocol + bool qmf1Support; + bool qmf2Support; + # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; @@ -279,6 +305,11 @@ private: uint32_t length, qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); @@ -311,6 +342,10 @@ private: void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQueryLH (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType); + void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken); + void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid); + size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp index 0813e30891..6dc41ef073 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -29,13 +29,16 @@ using namespace qpid::framing; using namespace qpid::sys; ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {} + Exchange (_name, _parent, b), + DirectExchange(_name, _parent, b), + managementAgent(0) {} ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : Exchange (_name, _durable, _args, _parent, b), - DirectExchange(_name, _durable, _args, _parent, b) {} + DirectExchange(_name, _durable, _args, _parent, b), + managementAgent(0) {} void ManagementDirectExchange::route(Deliverable& msg, const string& routingKey, @@ -43,7 +46,8 @@ void ManagementDirectExchange::route(Deliverable& msg, { bool routeIt = true; - // TODO: Intercept messages directed to the embedded agent and send them to the management agent. + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false /*direct*/); if (routeIt) DirectExchange::route(msg, routingKey, args); diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 4b87800174..46fc67d07f 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -22,7 +22,10 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/Buffer.h" #include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> #include <stdlib.h> @@ -36,26 +39,37 @@ void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) ((uint64_t) (bank & 0x0fffffff)); } -ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object) - : agent(0) +// Deprecated +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object) + : agent(0), agentEpoch(seq) { first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48 | - ((uint64_t) (broker & 0x000fffff)) << 28 | - ((uint64_t) (bank & 0x0fffffff)); + ((uint64_t) (broker & 0x000fffff)) << 28; second = object; } -ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object) - : agent(_agent) + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker) + : agent(0), second(0), agentEpoch(seq) { first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq) + : agent(_agent), second(0), agentEpoch(seq) +{ + + first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48; - second = object; } + ObjectId::ObjectId(std::istream& in) : agent(0) { std::string text; @@ -75,6 +89,10 @@ void ObjectId::fromString(const std::string& text) # define atoll(X) _atoi64(X) #endif + // format: + // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id> + // V2: Not used + std::string copy(text.c_str()); char* cText; char* field[FIELDS]; @@ -99,10 +117,13 @@ void ObjectId::fromString(const std::string& text) if (idx != FIELDS) throw Exception("Invalid ObjectId format"); + agentEpoch = atoll(field[1]); + first = (atoll(field[0]) << 60) + (atoll(field[1]) << 48) + - (atoll(field[2]) << 28) + - atoll(field[3]); + (atoll(field[2]) << 28); + + agentName = std::string(field[3]); second = atoll(field[4]); } @@ -123,21 +144,40 @@ bool ObjectId::equalV1(const ObjectId &other) const return first == otherFirst && second == other.second; } -void ObjectId::encode(framing::Buffer& buffer) const +// encode as V1-format binary +void ObjectId::encode(std::string& buffer) const { + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + if (agent == 0) - buffer.putLongLong(first); + body.putLongLong(first); else - buffer.putLongLong(first | agent->first); - buffer.putLongLong(second); + body.putLongLong(first | agent->first); + body.putLongLong(second); + + body.reset(); + body.getRawData(buffer, len); } -void ObjectId::decode(framing::Buffer& buffer) +// decode as V1-format binary +void ObjectId::decode(const std::string& buffer) { - first = buffer.getLongLong(); - second = buffer.getLongLong(); + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + body.checkAvailable(buffer.length()); + body.putRawData(buffer); + body.reset(); + first = body.getLongLong(); + second = body.getLongLong(); + v2Key = boost::lexical_cast<std::string>(second); } +// generate the V2 key from the index fields defined +// in the schema. void ObjectId::setV2Key(const ManagementObject& object) { std::stringstream oname; @@ -145,6 +185,42 @@ void ObjectId::setV2Key(const ManagementObject& object) v2Key = oname.str(); } +// encode as V2-format map +void ObjectId::mapEncode(types::Variant::Map& map) const +{ + map["_object_name"] = v2Key; + if (!agentName.empty()) + map["_agent_name"] = agentName; + if (agentEpoch) + map["_agent_epoch"] = agentEpoch; +} + +// decode as v2-format map +void ObjectId::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_object_name")) != map.end()) + v2Key = i->second.asString(); + else + throw Exception("Required _object_name field missing."); + + if ((i = map.find("_agent_name")) != map.end()) + agentName = i->second.asString(); + + if ((i = map.find("_agent_epoch")) != map.end()) + agentEpoch = i->second.asInt64(); +} + + +ObjectId::operator types::Variant::Map() const +{ + types::Variant::Map m; + mapEncode(m); + return m; +} + + namespace qpid { namespace management { @@ -158,7 +234,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) out << ((virtFirst & 0xF000000000000000LL) >> 60) << "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) << "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) << - "-" << (virtFirst & 0x000000000FFFFFFFLL) << + "-" << i.agentName << "-" << i.second; return out; } @@ -168,43 +244,88 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; -void ManagementObject::writeTimestamps (framing::Buffer& buf) const +void ManagementObject::writeTimestamps (std::string& buf) const { - buf.putShortString (getPackageName ()); - buf.putShortString (getClassName ()); - buf.putBin128 (getMd5Sum ()); - buf.putLongLong (updateTime); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - objectId.encode(buf); + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + + body.putShortString (getPackageName ()); + body.putShortString (getClassName ()); + body.putBin128 (getMd5Sum ()); + body.putLongLong (updateTime); + body.putLongLong (createTime); + body.putLongLong (destroyTime); + + uint32_t len = body.getPosition(); + body.reset(); + body.getRawData(buf, len); + + std::string oid; + objectId.encode(oid); + buf += oid; } -void ManagementObject::readTimestamps (framing::Buffer& buf) +void ManagementObject::readTimestamps (const std::string& buf) { + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); std::string unused; uint8_t unusedUuid[16]; - ObjectId unusedObjectId; - buf.getShortString(unused); - buf.getShortString(unused); - buf.getBin128(unusedUuid); - updateTime = buf.getLongLong(); - createTime = buf.getLongLong(); - destroyTime = buf.getLongLong(); - unusedObjectId.decode(buf); + body.checkAvailable(buf.length()); + body.putRawData(buf); + body.reset(); + + body.getShortString(unused); + body.getShortString(unused); + body.getBin128(unusedUuid); + updateTime = body.getLongLong(); + createTime = body.getLongLong(); + destroyTime = body.getLongLong(); } uint32_t ManagementObject::writeTimestampsSize() const { return 1 + getPackageName().length() + // str8 - 1 + getClassName().length() + // str8 - 16 + // bin128 - 8 + // uint64 - 8 + // uint64 - 8 + // uint64 - objectId.encodedSize(); // objectId + 1 + getClassName().length() + // str8 + 16 + // bin128 + 8 + // uint64 + 8 + // uint64 + 8 + // uint64 + objectId.encodedSize(); // objectId +} + + +void ManagementObject::writeTimestamps (types::Variant::Map& map) const +{ + types::Variant::Map oid, sid; + + sid["_package_name"] = getPackageName(); + sid["_class_name"] = getClassName(); + sid["_hash"] = qpid::types::Uuid(getMd5Sum()); + map["_schema_id"] = sid; + + objectId.mapEncode(oid); + map["_object_id"] = oid; + + map["_update_ts"] = updateTime; + map["_create_ts"] = createTime; + map["_delete_ts"] = destroyTime; +} + +void ManagementObject::readTimestamps (const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_update_ts")) != map.end()) + updateTime = i->second.asUint64(); + if ((i = map.find("_create_ts")) != map.end()) + createTime = i->second.asUint64(); + if ((i = map.find("_delete_ts")) != map.end()) + destroyTime = i->second.asUint64(); } + void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { @@ -217,3 +338,26 @@ int ManagementObject::getThreadIndex() { } return thisIndex; } + + +void ManagementObject::mapEncode(types::Variant::Map& map, + bool includeProperties, + bool includeStatistics) +{ + types::Variant::Map values; + + writeTimestamps(map); + + mapEncodeValues(values, includeProperties, includeStatistics); + map["_values"] = values; +} + +void ManagementObject::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + readTimestamps(map); + + if ((i = map.find("_values")) != map.end()) + mapDecodeValues(i->second.asMap()); +} diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp index 98650b3adf..7fdce133e5 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -28,13 +28,16 @@ using namespace qpid::framing; using namespace qpid::sys; ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} + Exchange (_name, _parent, b), + TopicExchange(_name, _parent, b), + managementAgent(0) {} ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : Exchange (_name, _durable, _args, _parent, b), - TopicExchange(_name, _durable, _args, _parent, b) {} + TopicExchange(_name, _durable, _args, _parent, b), + managementAgent(0) {} void ManagementTopicExchange::route(Deliverable& msg, const string& routingKey, @@ -43,12 +46,8 @@ void ManagementTopicExchange::route(Deliverable& msg, bool routeIt = true; // Intercept management agent commands - if (qmfVersion == 1) { - if ((routingKey.length() > 6 && - routingKey.substr(0, 6).compare("agent.") == 0) || - (routingKey == "broker")) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - } + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true /* topic */); if (routeIt) TopicExchange::route(msg, routingKey, args); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 7377edc3bb..9c1a761062 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -267,15 +267,15 @@ txjob_LDADD=$(lib_client) check_PROGRAMS+=PollerTest PollerTest_SOURCES=PollerTest.cpp -PollerTest_LDADD=$(lib_common) $(SOCKLIBS) +PollerTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS) check_PROGRAMS+=DispatcherTest DispatcherTest_SOURCES=DispatcherTest.cpp -DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS) +DispatcherTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS) check_PROGRAMS+=datagen datagen_SOURCES=datagen.cpp -datagen_LDADD=$(lib_common) +datagen_LDADD=$(lib_common) $(lib_client) check_PROGRAMS+=qrsh_server qrsh_server_SOURCES=qrsh_server.cpp diff --git a/cpp/src/tests/ManagementTest.cpp b/cpp/src/tests/ManagementTest.cpp index d05b4676ba..99b9c1f03e 100644 --- a/cpp/src/tests/ManagementTest.cpp +++ b/cpp/src/tests/ManagementTest.cpp @@ -56,32 +56,34 @@ QPID_AUTO_TEST_CASE(testObjectIdSerializeString) { } QPID_AUTO_TEST_CASE(testObjectIdEncode) { - char buffer[100]; - Buffer msgBuf(buffer, 100); - msgBuf.putLongLong(0x1002000030000004LL); - msgBuf.putLongLong(0x0000000000000005LL); - msgBuf.reset(); + qpid::types::Variant::Map oidMap; - ObjectId oid(msgBuf); + ObjectId oid(1, 2, 3, 9999); + oid.setV2Key("testkey"); + oid.setAgentName("myAgent"); std::stringstream out1; out1 << oid; - BOOST_CHECK_EQUAL(out1.str(), "1-2-3-4-5"); + BOOST_CHECK_EQUAL(out1.str(), "1-2-3-myAgent-9999"); } QPID_AUTO_TEST_CASE(testObjectIdAttach) { AgentAttachment agent; - ObjectId oid(&agent, 10, 20, 50); + ObjectId oid(&agent, 10, 20); + oid.setV2Key("GabbaGabbaHey"); + oid.setAgentName("MrSmith"); std::stringstream out1; out1 << oid; - BOOST_CHECK_EQUAL(out1.str(), "10-20-0-0-50"); + + BOOST_CHECK_EQUAL(out1.str(), "10-20-0-MrSmith-0"); agent.setBanks(30, 40); std::stringstream out2; out2 << oid; - BOOST_CHECK_EQUAL(out2.str(), "10-20-30-40-50"); + + BOOST_CHECK_EQUAL(out2.str(), "10-20-30-MrSmith-0"); } QPID_AUTO_TEST_CASE(testConsoleObjectId) { diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index 7bda233b9a..3d32ae36f7 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -41,6 +41,9 @@ from cStringIO import StringIO #import qpid.log #qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) +#=================================================================================================== +# CONSOLE +#=================================================================================================== class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ @@ -94,6 +97,10 @@ class Console: """ Invoked when a method response from an asynchronous method call is received. """ pass + +#=================================================================================================== +# BrokerURL +#=================================================================================================== class BrokerURL(URL): def __init__(self, text): URL.__init__(self, text) @@ -115,16 +122,30 @@ class BrokerURL(URL): def match(self, host, port): return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4] + +#=================================================================================================== +# Object +#=================================================================================================== class Object(object): - """ This class defines a 'proxy' object representing a real managed object on an agent. - Actions taken on this proxy are remotely affected on the real managed object. """ - def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}): - self._session = session - self._broker = broker + This class defines a 'proxy' object representing a real managed object on an agent. + Actions taken on this proxy are remotely affected on the real managed object. + """ + def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}): + self._agent = agent + self._session = None + self._broker = None + if agent: + self._session = agent.session + self._broker = agent.broker self._schema = schema - self._managed = managed - if self._managed: + self._properties = [] + self._statistics = [] + if v2Map: + self.v2Init(v2Map, agentName) + return + + if self._agent: self._currentTime = codec.read_uint64() self._createTime = codec.read_uint64() self._deleteTime = codec.read_uint64() @@ -134,8 +155,6 @@ class Object(object): self._createTime = None self._deleteTime = None self._objectId = None - self._properties = [] - self._statistics = [] if codec: if prop: notPresent = self._parsePresenceMasks(codec, schema) @@ -143,18 +162,38 @@ class Object(object): if property.name in notPresent: self._properties.append((property, None)) else: - self._properties.append((property, self._session._decodeValue(codec, property.type, broker))) + self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker))) if stat: for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker))) + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker))) else: for property in schema.getProperties(): if property.optional: self._properties.append((property, None)) else: - self._properties.append((property, self._session._defaultValue(property, broker, kwargs))) + self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs))) for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs))) + self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs))) + + def v2Init(self, omap, agentName): + if omap.__class__ != dict: + raise Exception("QMFv2 object data must be a map/dict") + if '_values' not in omap: + raise Exception("QMFv2 object must have '_values' element") + + values = omap['_values'] + for prop in self._schema.getProperties(): + if prop.name in values: + self._properties.append((prop, values[prop.name])) + for stat in self._schema.getStatistics(): + if stat.name in values: + self._statistics.append((stat, values[stat.name])) + if '_subtypes' in omap: + self._subtypes = omap['_subtypes'] + if '_object_id' in omap: + self._objectId = ObjectId(omap['_object_id'], agentName=agentName) + else: + self._objectId = None def getBroker(self): """ Return the broker from which this object was sent """ @@ -186,7 +225,7 @@ class Object(object): def isManaged(self): """ Return True iff this object is a proxy for a managed object on an agent. """ - return self._managed + return self._objectId and self._agent def getIndex(self): """ Return a string describing this object's primary key. """ @@ -225,7 +264,7 @@ class Object(object): """ Contact the agent and retrieve the lastest property and statistic values for this object. """ if not self.isManaged(): raise Exception("Object is not managed") - obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) + obj = self._agent.getObjects(_objectId=self._objectId) if obj: self.mergeUpdate(obj[0]) else: @@ -244,17 +283,17 @@ class Object(object): for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self._properties: - if name == property.name: + for prop, value in self._properties: + if name == prop.name: return value - if name == "_" + property.name + "_" and property.type == 10: # Dereference references - deref = self._session.getObjects(_objectId=value, _broker=self._broker) + if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references + deref = self._agent.getObjects(_objectId=value) if len(deref) != 1: return None else: return deref[0] - for statistic, value in self._statistics: - if name == statistic.name: + for stat, value in self._statistics: + if name == stat.name: return value raise Exception("Type Object has no attribute '%s'" % name) @@ -282,10 +321,6 @@ class Object(object): aIdx = 0 sendCodec = Codec() seq = self._session.seqMgr._reserve((method, synchronous)) - self._broker._setHeader(sendCodec, 'M', seq) - self._objectId.encode(sendCodec) - self._schema.getKey().encode(sendCodec) - sendCodec.write_str8(name) count = 0 for arg in method.arguments: @@ -294,24 +329,64 @@ class Object(object): if count != len(args): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._session._encodeValue(sendCodec, args[aIdx], arg.type) - aIdx += 1 - if timeWait: - ttl = timeWait * 1000 + if self._agent.isV2: + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = self._objectId.asMap() + call['_method_name'] = name + argMap = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + argMap[arg.name] = args[aIdx] + aIdx += 1 + call['_arguments'] = argMap + + dp = self._broker.amqpSession.delivery_properties() + dp.routing_key = self._objectId.getAgentBank() + mp = self._broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self._broker.authUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + smsg = Message(dp, mp, sendCodec.encoded) + exchange = "qmf.default.direct" + else: - ttl = None - smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), - ttl=ttl) + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank()) + self._broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + self._schema.getKey().encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._session._encodeValue(sendCodec, args[aIdx], arg.type) + aIdx += 1 + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" % + (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + exchange = "qpid.management" + if synchronous: try: self._broker.cv.acquire() self._broker.syncInFlight = True finally: self._broker.cv.release() - self._broker._send(smsg) + self._broker._send(smsg, exchange) return seq return None @@ -352,7 +427,6 @@ class Object(object): raise Exception("Invalid Method (software defect) [%s]" % name) def _encodeUnmanaged(self, codec): - codec.write_uint8(20) codec.write_str8(self._schema.getKey().getPackageName()) codec.write_str8(self._schema.getKey().getClassName()) @@ -399,6 +473,10 @@ class Object(object): bit = 0 return excludeList + +#=================================================================================================== +# Session +#=================================================================================================== class Session: """ An instance of the Session class represents a console session running @@ -423,6 +501,7 @@ class Session: list: 21 } + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): """ @@ -450,7 +529,7 @@ class Session: """ self.console = console self.brokers = [] - self.packages = {} + self.schemaCache = SchemaCache() self.seqMgr = SequenceManager() self.cv = Condition() self.syncSequenceList = [] @@ -471,9 +550,35 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + + def _getBrokerForAgentAddr(self, agent_addr): + try: + self.cv.acquire() + key = (1, agent_addr) + for b in self.brokers: + if key in b.agents: + return b + finally: + self.cv.release() + return None + + + def _getAgentForAgentAddr(self, agent_addr): + try: + self.cv.acquire() + key = agent_addr + for b in self.brokers: + if key in b.agents: + return b.agents[key] + finally: + self.cv.release() + return None + + def __repr__(self): return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) + def addBroker(self, target="localhost", timeout=None, mechanisms=None): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) @@ -482,9 +587,12 @@ class Session: self.brokers.append(broker) if not self.manageConnections: - self.getObjects(broker=broker, _class="agent") + agent = broker.getAgent(1,0) + if agent: + agent.getObjects(_class="agent") return broker + def delBroker(self, broker): """ Disconnect from a broker. The 'broker' argument is the object returned from the addBroker call """ @@ -495,34 +603,27 @@ class Session: self.brokers.remove(broker) del broker + def getPackages(self): """ Get the list of known QMF packages """ for broker in self.brokers: broker._waitForStable() - list = [] - for package in self.packages: - list.append(package) - return list + return self.schemaCache.getPackages() + def getClasses(self, packageName): """ Get the list of known classes within a QMF package """ for broker in self.brokers: broker._waitForStable() - list = [] - if packageName in self.packages: - for pkey in self.packages[packageName]: - list.append(self.packages[packageName][pkey].getKey()) - return list + return self.schemaCache.getClasses(packageName) + def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() - pname = classKey.getPackageName() - pkey = classKey.getPackageKey() - if pname in self.packages: - if pkey in self.packages[pname]: - return self.packages[pname][pkey] + return self.schemaCache.getSchema(classKey) + def bindPackage(self, packageName): """ Request object updates for all table classes within a package. """ @@ -535,6 +636,7 @@ class Session: broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) + def bindClass(self, pname, cname): """ Request object updates for a particular table class by package and class name. """ if not self.userBindings or not self.rcvObjects: @@ -545,6 +647,7 @@ class Session: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) + def bindClassKey(self, classKey): """ Request object updates for a particular table class by class key. """ @@ -552,6 +655,7 @@ class Session: cname = classKey.getClassName() self.bindClass(pname, cname) + def getAgents(self, broker=None): """ Get a list of currently known agents """ brokerList = [] @@ -569,12 +673,14 @@ class Session: agentList.append(a) return agentList - def makeObject(self, classKey, broker=None, **kwargs): + + def makeObject(self, classKey, **kwargs): """ Create a new, unmanaged object of the schema indicated by classKey """ schema = self.getSchema(classKey) if schema == None: raise Exception("Schema not found for classKey") - return Object(self, broker, schema, None, True, True, False, kwargs) + return Object(None, schema, None, True, True, kwargs) + def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. @@ -644,81 +750,24 @@ class Session: if len(agentList) == 0: return [] - pname = None - cname = None - hash = None - classKey = None - if "_schema" in kwargs: classKey = kwargs["_schema"].getKey() - elif "_key" in kwargs: classKey = kwargs["_key"] - elif "_class" in kwargs: - cname = kwargs["_class"] - if "_package" in kwargs: - pname = kwargs["_package"] - if cname == None and classKey == None and "_objectId" not in kwargs: - raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") - - map = {} - self.getSelect = [] - if "_objectId" in kwargs: - map["_objectid"] = kwargs["_objectId"].__repr__() - else: - if cname == None: - cname = classKey.getClassName() - pname = classKey.getPackageName() - hash = classKey.getHash() - map["_class"] = cname - if pname != None: map["_package"] = pname - if hash != None: map["_hash"] = hash - for item in kwargs: - if item[0] != '_': - self.getSelect.append((item, kwargs[item])) - - self.getResult = [] + # + # We now have a list of agents to query, start the queries and gather the results. + # + request = SessionGetRequest(len(agentList)) for agent in agentList: - broker = agent.broker - sendCodec = Codec() - try: - self.cv.acquire() - seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) - self.syncSequenceList.append(seq) - finally: - self.cv.release() - broker._setHeader(sendCodec, 'G', seq) - sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank)) - broker._send(smsg) + agent.getObjects(request, **kwargs) + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + request.wait(timeout) + return request.result - starttime = time() - timeout = False - if "_timeout" in kwargs: - waitTime = kwargs["_timeout"] - else: - waitTime = self.DEFAULT_GET_WAIT_TIME - try: - self.cv.acquire() - while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(waitTime) - if time() - starttime > waitTime: - for pendingSeq in self.syncSequenceList: - self.seqMgr._release(pendingSeq) - self.syncSequenceList = [] - timeout = True - finally: - self.cv.release() - - if self.error: - errorText = self.error - self.error = None - raise Exception(errorText) - - if len(self.getResult) == 0 and timeout: - raise RuntimeError("No agent responded within timeout period") - return self.getResult def setEventFilter(self, **kwargs): """ """ pass + def _bindingKeys(self): keyList = [] keyList.append("schema.#") @@ -735,18 +784,21 @@ class Session: keyList.append("console.heartbeat.#") return keyList + def _handleBrokerConnect(self, broker): if self.console: for agent in broker.getAgents(): self.console.newAgent(agent) self.console.brokerConnected(broker) + def _handleBrokerDisconnect(self, broker): if self.console: for agent in broker.getAgents(): self.console.delAgent(agent) self.console.brokerDisconnected(broker) + def _handleBrokerResp(self, broker, codec, seq): broker.brokerId = codec.read_uuid() if self.console != None: @@ -760,16 +812,10 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) + def _handlePackageInd(self, broker, codec, seq): pname = str(codec.read_str8()) - notify = False - try: - self.cv.acquire() - if pname not in self.packages: - self.packages[pname] = {} - notify = True - finally: - self.cv.release() + notify = self.schemaCache.declarePackage(pname) if notify and self.console != None: self.console.newPackage(pname) @@ -782,7 +828,8 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) - def _handleCommandComplete(self, broker, codec, seq): + + def _handleCommandComplete(self, broker, codec, seq, agent): code = codec.read_uint32() text = codec.read_str8() context = self.seqMgr._release(seq) @@ -804,20 +851,16 @@ class Session: finally: self.cv.release() + if agent: + agent._handleV1Completion(seq, code, text) + + def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) - unknown = False + schema = self.schemaCache.getSchema(classKey) - try: - self.cv.acquire() - if classKey.getPackageName() in self.packages: - if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]: - unknown = True - finally: - self.cv.release() - - if unknown: + if not schema: # Send a schema request for the unknown class broker._incOutstanding() sendCodec = Codec() @@ -827,30 +870,6 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) - def _handleMethodResp(self, broker, codec, seq): - code = codec.read_uint32() - text = codec.read_str16() - outArgs = {} - pair = self.seqMgr._release(seq) - if pair == None: - return - method, synchronous = pair - if code == 0: - for arg in method.arguments: - if arg.dir.find("O") != -1: - outArgs[arg.name] = self._decodeValue(codec, arg.type, broker) - result = MethodResult(code, text, outArgs) - if synchronous: - try: - broker.cv.acquire() - broker.syncResult = result - broker.syncInFlight = False - broker.cv.notify() - finally: - broker.cv.release() - else: - if self.console: - self.console.methodResponse(broker, seq, result) def _handleHeartbeatInd(self, broker, codec, seq, msg): brokerBank = 1 @@ -873,59 +892,49 @@ class Session: timestamp = codec.read_uint64() if self.console != None and agent != None: self.console.heartbeat(agent, timestamp) + broker._ageAgents() - def _handleEventInd(self, broker, codec, seq): - if self.console != None: - event = Event(self, broker, codec) - self.console.event(broker, event) - def _handleSchemaResp(self, broker, codec, seq): + def _handleSchemaResp(self, broker, codec, seq, agent_addr): kind = codec.read_uint8() classKey = ClassKey(codec) _class = SchemaClass(kind, classKey, codec, self) - try: - self.cv.acquire() - self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class - finally: - self.cv.release() - - self.seqMgr._release(seq) - broker._decOutstanding() + self.schemaCache.declareClass(classKey, _class) + ctx = self.seqMgr._release(seq) + if ctx: + broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) - def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): - classKey = ClassKey(codec) - try: - self.cv.acquire() - pname = classKey.getPackageName() - if pname not in self.packages: - return - pkey = classKey.getPackageKey() - if pkey not in self.packages[pname]: - return - schema = self.packages[pname][pkey] - finally: - self.cv.release() + if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): + agent = self._getAgentForAgentAddr(agent_addr) + if agent: + agent._schemaInfoFromV2Agent() - object = Object(self, broker, schema, codec, prop, stat) - if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: - broker._updateAgent(object) + def _v2HandleHeartbeatInd(self, broker, mp, ah, content): try: - self.cv.acquire() - if seq in self.syncSequenceList: - if object.getTimestamps()[2] == 0 and self._selectMatch(object): - self.getResult.append(object) - return - finally: - self.cv.release() + agentName = ah["qmf.agent"] + values = content["_values"] + timestamp = values["timestamp"] + interval = values["heartbeat_interval"] + except: + return + + agent = broker.getAgent(1, agentName) + if agent == None: + agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) + broker._addAgent(agentName, agent) + else: + agent.touch() + if self.console and agent: + self.console.heartbeat(agent, timestamp) + broker._ageAgents() + + + def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): + self._v2HandleHeartbeatInd(broker, mp, ah, content) - if self.console and self.rcvObjects: - if prop: - self.console.objectProps(broker, object) - if stat: - self.console.objectStats(broker, object) def _handleError(self, error): try: @@ -937,6 +946,7 @@ class Session: finally: self.cv.release() + def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ for key, value in self.getSelect: @@ -945,6 +955,7 @@ class Session: return False return True + def _decodeValue(self, codec, typecode, broker=None): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.read_uint8() # U8 @@ -971,17 +982,9 @@ class Session: inner_type_code = codec.read_uint8() if inner_type_code == 20: classKey = ClassKey(codec) - try: - self.cv.acquire() - pname = classKey.getPackageName() - if pname not in self.packages: - return None - pkey = classKey.getPackageKey() - if pkey not in self.packages[pname]: - return None - schema = self.packages[pname][pkey] - finally: - self.cv.release() + schema = self.schemaCache.getSchema(classKey) + if not schema: + return None data = Object(self, broker, schema, codec, True, True, False) else: data = self._decodeValue(codec, inner_type_code, broker) @@ -999,6 +1002,7 @@ class Session: raise ValueError("Invalid type code: %d" % typecode) return data + def _encodeValue(self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.write_uint8 (int(value)) # U8 @@ -1033,9 +1037,11 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def encoding(self, value): return self._encoding(value.__class__) + def _encoding(self, klass): if Session.ENCODINGS.has_key(klass): return self.ENCODINGS[klass] @@ -1044,6 +1050,7 @@ class Session: if result != None: return result + def _displayValue(self, value, typecode): """ """ if typecode == 1: return unicode(value) @@ -1072,6 +1079,7 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def _defaultValue(self, stype, broker=None, kwargs={}): """ """ typecode = stype.type @@ -1110,6 +1118,7 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def _bestClassKey(self, pname, cname, preferredList): """ """ if pname == None or cname == None: @@ -1125,6 +1134,7 @@ class Session: return c return None + def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): """ This function can be used to send a method request to an object given only the broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are @@ -1133,14 +1143,10 @@ class Session: schema = self.getSchema(schemaKey) for method in schema.getMethods(): if name == method.name: - aIdx = 0 - sendCodec = Codec() - seq = self.seqMgr._reserve((method, False)) - broker._setHeader(sendCodec, 'M', seq) - objectId.encode(sendCodec) - schemaKey.encode(sendCodec) - sendCodec.write_str8(name) - + # + # Count the arguments supplied and validate that the number is what is expected + # based on the schema. + # count = 0 for arg in method.arguments: if arg.dir.find("I") != -1: @@ -1148,25 +1154,192 @@ class Session: if count != len(argList): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._encodeValue(sendCodec, argList[aIdx], arg.type) - aIdx += 1 - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % - (objectId.getBrokerBank(), objectId.getAgentBank())) - broker._send(smsg) + aIdx = 0 + sendCodec = Codec() + seq = self.seqMgr._reserve((method, False)) + + if objectId.isV2(): + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = objectId.asMap() + call['_method_name'] = name + args = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + args[arg.name] = argList[aIdx] + aIdx += 1 + call['_arguments'] = args + + dp = broker.amqpSession.delivery_properties() + dp.routing_key = objectId.getAgentBank() + mp = broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = broker.authUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + msg = Message(dp, mp, sendCodec.encoded) + broker._send(msg, "qmf.default.direct") + + else: + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank()) + broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + broker._setHeader(sendCodec, 'M', seq) + objectId.encode(sendCodec) + schemaKey.encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._encodeValue(sendCodec, argList[aIdx], arg.type) + aIdx += 1 + smsg = broker._message(sendCodec.encoded, "agent.%d.%s" % + (objectId.getBrokerBank(), objectId.getAgentBank())) + broker._send(smsg) return seq return None -class Package: - """ """ - def __init__(self, name): - self.name = name +#=================================================================================================== +# SessionGetRequest +#=================================================================================================== +class SessionGetRequest(object): + """ + This class is used to track get-object queries at the Session level. + """ + def __init__(self, agentCount): + self.agentCount = agentCount + self.result = [] + self.cv = Condition() + self.waiting = True + + def __call__(self, **kwargs): + """ + Callable entry point for gathering collected objects. + """ + try: + self.cv.acquire() + if 'qmf_object' in kwargs: + self.result.append(kwargs['qmf_object']) + elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs: + self.agentCount -= 1 + if self.agentCount == 0: + self.waiting = None + self.cv.notify() + finally: + self.cv.release() + + def wait(self, timeout): + starttime = time() + try: + self.cv.acquire() + while self.waiting: + if (time() - starttime) > timeout: + raise Exception("Timed out after %d seconds" % timeout) + self.cv.wait(1) + finally: + self.cv.release() + + +#=================================================================================================== +# SchemaCache +#=================================================================================================== +class SchemaCache(object): + """ + The SchemaCache is a data structure that stores learned schema information. + """ + def __init__(self): + """ + Create a map of schema packages and a lock to protect this data structure. + Note that this lock is at the bottom of any lock hierarchy. If it is held, no other + lock in the system should attempt to be acquired. + """ + self.packages = {} + self.lock = Lock() + + def getPackages(self): + """ Get the list of known QMF packages """ + list = [] + try: + self.lock.acquire() + for package in self.packages: + list.append(package) + finally: + self.lock.release() + return list + + def getClasses(self, packageName): + """ Get the list of known classes within a QMF package """ + list = [] + try: + self.lock.acquire() + if packageName in self.packages: + for pkey in self.packages[packageName]: + list.append(self.packages[packageName][pkey].getKey()) + finally: + self.lock.release() + return list + + def getSchema(self, classKey): + """ Get the schema for a QMF class """ + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname in self.packages: + if pkey in self.packages[pname]: + return self.packages[pname][pkey] + finally: + self.lock.release() + return None + + def declarePackage(self, pname): + """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """ + try: + self.lock.acquire() + if pname in self.packages: + return None + self.packages[pname] = {} + finally: + self.lock.release() + return True + + def declareClass(self, classKey, classDef): + """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """ + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname not in self.packages: + self.packages[pname] = {} + packageMap = self.packages[pname] + if pkey in packageMap: + return None + packageMap[pkey] = classDef + finally: + self.lock.release() + return True + + +#=================================================================================================== +# ClassKey +#=================================================================================================== class ClassKey: """ A ClassKey uniquely identifies a class from the schema. """ def __init__(self, constructor): - if type(constructor) == str: + if constructor.__class__ == str: # construct from __repr__ string try: self.pname, cls = constructor.split(":") @@ -1177,20 +1350,33 @@ class ClassKey: h1 = int(hexValues[1], 16) h2 = int(hexValues[2], 16) h3 = int(hexValues[3], 16) - self.hash = struct.pack("!LLLL", h0, h1, h2, h3) + h4 = int(hexValues[4][0:4], 16) + h5 = int(hexValues[4][4:12], 16) + self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5)) except: raise Exception("Invalid ClassKey format") + elif constructor.__class__ == dict: + # construct from QMFv2 map + try: + self.pname = constructor['_package_name'] + self.cname = constructor['_class_name'] + self.hash = constructor['_hash'] + except: + raise Exception("Invalid ClassKey map format") else: # construct from codec codec = constructor self.pname = str(codec.read_str8()) self.cname = str(codec.read_str8()) - self.hash = codec.read_bin128() + self.hash = UUID(codec.read_bin128()) def encode(self, codec): codec.write_str8(self.pname) codec.write_str8(self.cname) - codec.write_bin128(self.hash) + codec.write_bin128(self.hash.bytes) + + def asMap(self): + return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash} def getPackageName(self): return self.pname @@ -1202,7 +1388,7 @@ class ClassKey: return self.hash def getHashString(self): - return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash) + return str(self.hash) def getPackageKey(self): return (self.cname, self.hash) @@ -1210,6 +1396,10 @@ class ClassKey: def __repr__(self): return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + +#=================================================================================================== +# SchemaClass +#=================================================================================================== class SchemaClass: """ """ CLASS_KIND_TABLE = 1 @@ -1292,6 +1482,10 @@ class SchemaClass: else: return self.arguments + self.session.getSchema(self.superTypeKey).getArguments() + +#=================================================================================================== +# SchemaProperty +#=================================================================================================== class SchemaProperty: """ """ def __init__(self, codec): @@ -1321,6 +1515,10 @@ class SchemaProperty: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaStatistic +#=================================================================================================== class SchemaStatistic: """ """ def __init__(self, codec): @@ -1337,6 +1535,10 @@ class SchemaStatistic: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaMethod +#=================================================================================================== class SchemaMethod: """ """ def __init__(self, codec): @@ -1365,6 +1567,10 @@ class SchemaMethod: result += ")" return result + +#=================================================================================================== +# SchemaArgument +#=================================================================================================== class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -1392,64 +1598,113 @@ class SchemaArgument: elif key == "refPackage" : self.refPackage = value elif key == "refClass" : self.refClass = value + +#=================================================================================================== +# ObjectId +#=================================================================================================== class ObjectId: """ Object that represents QMF object identifiers """ - def __init__(self, codec, first=0, second=0): - if codec: - self.first = codec.read_uint64() - self.second = codec.read_uint64() + def __init__(self, constructor, first=0, second=0, agentName=None): + if constructor.__class__ == dict: + self.agentName = agentName + self.agentEpoch = 0 + if '_agent_name' in constructor: self.agentName = constructor['_agent_name'] + if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch'] + if '_object_name' not in constructor: + raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.") + self.objectName = constructor['_object_name'] else: - self.first = first - self.second = second + if not constructor: + first = first + second = second + else: + first = constructor.read_uint64() + second = constructor.read_uint64() + self.agentName = str(first & 0x000000000FFFFFFF) + self.agentEpoch = (first & 0x0FFF000000000000) >> 48 + self.objectName = str(second) def __cmp__(self, other): if other == None or not isinstance(other, ObjectId) : return 1 - if self.first < other.first: + + if self.objectName < other.objectName: + return -1 + if self.objectName > other.objectName: + return 1 + + if self.agentName < other.agentName: return -1 - if self.first > other.first: + if self.agentName > other.agentName: return 1 - if self.second < other.second: + + if self.agentEpoch < other.agentEpoch: return -1 - if self.second > other.second: + if self.agentEpoch > other.agentEpoch: return 1 return 0 def __repr__(self): - return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(), + return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(), self.getBrokerBank(), self.getAgentBank(), self.getObject()) + def isV2(self): + return not self.agentName.isdigit() + def index(self): - return (self.first, self.second) + return self.__repr__() def getFlags(self): - return (self.first & 0xF000000000000000) >> 60 + return 0 def getSequence(self): - return (self.first & 0x0FFF000000000000) >> 48 + return self.agentEpoch def getBrokerBank(self): - return (self.first & 0x0000FFFFF0000000) >> 28 + return 1 def getAgentBank(self): - return self.first & 0x000000000FFFFFFF + return self.agentName def getObject(self): - return self.second + return self.objectName def isDurable(self): return self.getSequence() == 0 def encode(self, codec): - codec.write_uint64(self.first) - codec.write_uint64(self.second) + first = (self.agentEpoch << 48) + (1 << 28) + second = 0 + + try: + first += int(self.agentName) + except: + pass + + try: + second = int(self.objectName) + except: + pass + + codec.write_uint64(first) + codec.write_uint64(second) + + def asMap(self): + omap = {'_agent_name': self.agentName, '_object_name': self.objectName} + if self.agentEpoch != 0: + omap['_agent_epoch'] = self.agentEpoch + return omap def __hash__(self): - return (self.first, self.second).__hash__() + return self.__repr__().__hash__() def __eq__(self, other): - return (self.first, self.second).__eq__(other) + return self.__repr__().__eq__(other) + +#=================================================================================================== +# MethodResult +#=================================================================================================== class MethodResult(object): """ """ def __init__(self, status, text, outArgs): @@ -1465,6 +1720,10 @@ class MethodResult(object): def __repr__(self): return "%s (%d) - %s" % (self.text, self.status, self.outArgs) + +#=================================================================================================== +# ManagedConnection +#=================================================================================================== class ManagedConnection(Thread): """ Thread class for managing a connection. """ DELAY_MIN = 1 @@ -1527,6 +1786,10 @@ class ManagedConnection(Thread): finally: self.cv.release() + +#=================================================================================================== +# Broker +#=================================================================================================== class Broker: """ This object represents a connection (or potential connection) to a QMF broker. """ SYNC_TIME = 60 @@ -1542,6 +1805,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.cv = Condition() + self.seqToAgentMap = {} self.error = None self.brokerId = None self.connected = False @@ -1574,9 +1838,13 @@ class Broker: def getAgent(self, brokerBank, agentBank): """ Return the agent object associated with a particular broker and agent bank value.""" - bankKey = (brokerBank, agentBank) - if bankKey in self.agents: - return self.agents[bankKey] + bankKey = str(agentBank) + try: + self.cv.acquire() + if bankKey in self.agents: + return self.agents[bankKey] + finally: + self.cv.release() return None def getSessionId(self): @@ -1585,7 +1853,11 @@ class Broker: def getAgents(self): """ Get the list of agents reachable via this broker """ - return self.agents.values() + try: + self.cv.acquire() + return self.agents.values() + finally: + self.cv.release() def getAmqpSession(self): """ Get the AMQP session object for this connected broker. """ @@ -1612,10 +1884,29 @@ class Broker: else: return "Disconnected Broker" + def _setSequence(self, sequence, agent): + try: + self.cv.acquire() + self.seqToAgentMap[sequence] = agent + finally: + self.cv.release() + + def _clearSequence(self, sequence): + try: + self.cv.acquire() + self.seqToAgentMap.pop(sequence) + finally: + self.cv.release() + def _tryToConnect(self): try: - self.agents = {} - self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") + try: + self.cv.acquire() + self.agents = {} + self.agents['0'] = Agent(self, 0, "BrokerAgent") + finally: + self.cv.release() + self.topicBound = False self.syncInFlight = False self.syncRequest = 0 @@ -1649,7 +1940,7 @@ class Broker: self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) + self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL) @@ -1659,11 +1950,29 @@ class Broker: self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("tdest").listen(self._replyCb) + self.amqpSession.incoming("tdest").listen(self._v1Cb) self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL) + ## + ## Set up connectivity for QMFv2 + ## + self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True) + self.amqpSession.exchange_bind(exchange="qmf.default.direct", + queue=self.v2_queue_name, binding_key=self.v2_queue_name) + self.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=self.v2_queue_name, binding_key="agent.#") + ## Other bindings here... + self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1) + self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL) + self.connected = True self.session._handleBrokerConnect(self) @@ -1671,6 +1980,7 @@ class Broker: self._setHeader(codec, 'B') msg = self._message(codec.encoded) self._send(msg) + self._v2SendAgentLocate() except socket.error, e: self.error = "Socket Error %s - %s" % (e.__class__.__name__, e) @@ -1683,18 +1993,73 @@ class Broker: raise def _updateAgent(self, obj): - bankKey = (obj.brokerBank, obj.agentBank) + bankKey = str(obj.agentBank) + agent = None if obj._deleteTime == 0: - if bankKey not in self.agents: - agent = Agent(self, obj.agentBank, obj.label) - self.agents[bankKey] = agent - if self.session.console != None: - self.session.console.newAgent(agent) + try: + self.cv.acquire() + if bankKey not in self.agents: + agent = Agent(self, obj.agentBank, obj.label) + self.agents[bankKey] = agent + finally: + self.cv.release() + if agent and self.session.console: + self.session.console.newAgent(agent) else: - agent = self.agents.pop(bankKey, None) - if agent != None and self.session.console != None: + try: + self.cv.acquire() + agent = self.agents.pop(bankKey, None) + if agent: + agent.close() + finally: + self.cv.release() + if agent and self.session.console: self.session.console.delAgent(agent) + def _addAgent(self, name, agent): + try: + self.cv.acquire() + self.agents[name] = agent + finally: + self.cv.release() + if self.session.console: + self.session.console.newAgent(agent) + + def _ageAgents(self): + try: + self.cv.acquire() + to_delete = [] + to_notify = [] + for key in self.agents: + if self.agents[key].isOld(): + to_delete.append(key) + for key in to_delete: + agent = self.agents.pop(key) + agent.close() + to_notify.append(agent) + finally: + self.cv.release() + if self.session.console: + for agent in to_notify: + self.session.console.delAgent(agent) + + def _v2SendAgentLocate(self, predicate={}): + """ + Broadcast an agent-locate request to cause all agents in the domain to tell us who they are. + """ + dp = self.amqpSession.delivery_properties() + dp.routing_key = "console.request.agent_locate" + mp = self.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self.authUser + mp.app_id = "qmf2" + mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_agent_locate_request'} + sendCodec = Codec() + sendCodec.write_map(predicate) + msg = Message(dp, mp, sendCodec.encoded) + self._send(msg, "qmf.default.topic") + def _setHeader(self, codec, opcode, seq=0): """ Compose the header of a management message. """ codec.write_uint8(ord('A')) @@ -1785,24 +2150,105 @@ class Broker: finally: self.cv.release() - def _replyCb(self, msg): + def _v1Cb(self, msg): + try: + self._v1CbProtected(msg) + except Exception, e: + print "EXCEPTION in Broker._v1Cb:", e + + def _v1CbProtected(self, msg): + """ + This is the general message handler for messages received via the QMFv1 exchanges. + """ + agent = None + agent_addr = None + mp = msg.get("message_properties") + ah = mp.application_headers + if ah and 'qmf.agent' in ah: + agent_addr = ah['qmf.agent'] + + if not agent_addr: + # + # See if we can determine the agent identity from the routing key + # + dp = msg.get("delivery_properties") + rkey = None + if dp.routing_key: + rkey = dp.routing_key + items = rkey.split('.') + if len(items) >= 4: + if items[0] == 'console' and items[3].isdigit(): + agent_addr = str(items[3]) # The QMFv1 Agent Bank + if agent_addr != None and agent_addr in self.agents: + agent = self.agents[agent_addr] + codec = Codec(msg.body) + alreadyTried = None while True: opcode, seq = self._checkHeader(codec) + + if not agent and not alreadyTried: + alreadyTried = True + try: + self.cv.acquire() + if seq in self.seqToAgentMap: + agent = self.seqToAgentMap[seq] + finally: + self.cv.release() + if opcode == None: return if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) + elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'e': self.session._handleEventInd (self, codec, seq) - elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) - elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) - elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) - elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) - self.session.receiver._completed.add(msg.id) - self.session.channel.session_completed(self.session.receiver._completed) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent) + elif agent: + agent._handleQmfV1Message(opcode, seq, mp, ah, codec) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) + + def _v2Cb(self, msg): + """ + This is the general message handler for messages received via QMFv2 exchanges. + """ + mp = msg.get("message_properties") + ah = mp["application_headers"] + codec = Codec(msg.body) + + if 'qmf.opcode' in ah: + opcode = ah['qmf.opcode'] + if mp.content_type == "amqp/list": + content = codec.read_list() + if not content: + content = [] + elif mp.content_type == "amqp/map": + content = codec.read_map() + if not content: + content = {} + else: + content = None + + if content != None: + ## + ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are + ## used to maintain the broker's list of agent proxies. + ## + if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) + elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + else: + ## + ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender + ## of the message. + ## + agent_addr = ah['qmf.agent'] + if agent_addr in self.agents: + agent = self.agents[agent_addr] + agent._handleQmfV2Message(opcode, mp, ah, content) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) def _exceptionCb(self, data): self.connected = False @@ -1818,43 +2264,697 @@ class Broker: if self.thread: self.thread.disconnected() + +#=================================================================================================== +# Agent +#=================================================================================================== class Agent: - """ """ - def __init__(self, broker, agentBank, label): + """ + This class represents a proxy for a remote agent being managed + """ + def __init__(self, broker, agentBank, label, isV2=False, interval=0): self.broker = broker + self.session = broker.session + self.schemaCache = self.session.schemaCache self.brokerBank = broker.getBrokerBank() - self.agentBank = agentBank + self.agentBank = str(agentBank) self.label = label + self.isV2 = isV2 + self.heartbeatInterval = interval + self.lock = Lock() + self.seqMgr = self.session.seqMgr + self.contextMap = {} + self.unsolicitedContext = RequestContext(self, self) + self.lastSeenTime = time() + self.closed = None + + + def _checkClosed(self): + if self.closed: + raise Exception("Agent is disconnected") + + + def __call__(self, **kwargs): + """ + This is the handler for unsolicited stuff received from the agent + """ + if 'qmf_object' in kwargs: + if self.session.console: + self.session.console.objectProps(self.broker, kwargs['qmf_object']) + elif 'qmf_object_stats' in kwargs: + if self.session.console: + self.session.console.objectStats(self.broker, kwargs['qmf_object_stats']) + elif 'qmf_event' in kwargs: + if self.session.console: + self.session.console.event(self.broker, kwargs['qmf_event']) + + + def touch(self): + self.lastSeenTime = time() + + + def isOld(self): + if self.heartbeatInterval == 0: + return None + if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval): + return True + return None + + + def close(self): + self.closed = True + copy = {} + try: + self.lock.acquire() + for seq in self.contextMap: + copy[seq] = self.contextMap[seq] + finally: + self.lock.release() + + for seq in copy: + context = copy[seq] + context.cancel("Agent disconnected") + def __repr__(self): - return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label) + if self.isV2: + ver = "v2" + else: + ver = "v1" + return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label) + def getBroker(self): + self._checkClosed() return self.broker + def getBrokerBank(self): + self._checkClosed() return self.brokerBank + def getAgentBank(self): + self._checkClosed() return self.agentBank + + def getObjects(self, notifiable=None, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + If 'notifiable' is None (default), this call will block until completion or timeout. + If supplied, notifiable is assumed to be a callable object that will be called when the + list of queried objects arrives. The single argument to the call shall be a list of + the returned objects. + + The class for queried objects may be specified in one of the following ways: + + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = <id> - get the object referenced by the object-id + + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + + If additional arguments are supplied, they are used as property selectors. For example, + if the argument name="test" is supplied, only objects whose "name" property is "test" + will be returned in the result. + """ + self._checkClosed() + if notifiable: + if not callable(notifiable): + raise Exception("notifiable object must be callable") + + # + # Isolate the selectors from the kwargs + # + selectors = {} + for key in kwargs: + value = kwargs[key] + if key[0] != '_': + selectors[key] = value + + # + # Allocate a context to track this asynchronous request. + # + context = RequestContext(self, notifiable, selectors) + sequence = self.seqMgr._reserve(context) + try: + self.lock.acquire() + self.contextMap[sequence] = context + context.setSequence(sequence) + finally: + self.lock.release() + + # + # Compose and send the query message to the agent using the appropriate protocol for the + # agent's QMF version. + # + if self.isV2: + self._v2SendGetQuery(sequence, kwargs) + else: + self.broker._setSequence(sequence, self) + self._v1SendGetQuery(sequence, kwargs) + + # + # If this is a synchronous call, block and wait for completion. + # + if not notifiable: + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + context.waitForSignal(timeout) + if context.exception: + raise Exception(context.exception) + result = context.queryResults + return result + + + def _clearContext(self, sequence): + try: + self.lock.acquire() + self.contextMap.pop(sequence) + finally: + self.lock.release() + + + def _schemaInfoFromV2Agent(self): + """ + We have just received new schema information from this agent. Check to see if there's + more work that can now be done. + """ + try: + self.lock.acquire() + copy_of_map = {} + for item in self.contextMap: + copy_of_map[item] = self.contextMap[item] + finally: + self.lock.release() + + self.unsolicitedContext.reprocess() + for context in copy_of_map: + copy_of_map[context].reprocess() + + + def _handleV1Completion(self, sequence, code, text): + """ + Called if one of this agent's V1 commands completed + """ + context = None + try: + self.lock.acquire() + if sequence in self.contextMap: + context = self.contextMap[sequence] + finally: + self.lock.release() + + if context: + if code != 0: + ex = "Error %d: %s" % (code, text) + context.setException(ex) + context.signal() + self.broker._clearSequence(sequence) + + + def _v1HandleMethodResp(self, codec, seq): + """ + Handle a QMFv1 method response + """ + code = codec.read_uint32() + text = codec.read_str16() + outArgs = {} + self.broker._clearSequence(seq) + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + if code == 0: + for arg in method.arguments: + if arg.dir.find("O") != -1: + outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker) + result = MethodResult(code, text, outArgs) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + + def _v1HandleEventInd(self, codec, seq): + """ + Handle a QMFv1 event indication + """ + event = Event(self, codec) + self.unsolicitedContext.doEvent(event) + + + def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False): + """ + Handle a QMFv1 content indication + """ + classKey = ClassKey(codec) + schema = self.schemaCache.getSchema(classKey) + if not schema: + return + + obj = Object(self, schema, codec, prop, stat) + if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: + self.broker._updateAgent(obj) + + context = self.unsolicitedContext + try: + self.lock.acquire() + if sequence in self.contextMap: + context = self.contextMap[sequence] + finally: + self.lock.release() + + context.addV1QueryResult(obj) + + + def _v2HandleDataInd(self, mp, ah, content): + """ + Handle a QMFv2 data indication from the agent + """ + if mp.correlation_id: + try: + self.lock.acquire() + sequence = int(mp.correlation_id) + if sequence not in self.contextMap: + return + context = self.contextMap[sequence] + finally: + self.lock.release() + else: + context = self.unsolicitedContext + + kind = "_data" + if "qmf.content" in ah: + kind = ah["qmf.content"] + if kind == "_data": + if content.__class__ != list: + return + for omap in content: + context.addV2QueryResult(omap) + context.processV2Data() + + if 'partial' not in ah: + context.signal() + + + def _v2HandleMethodResp(self, mp, ah, content): + """ + Handle a QMFv2 method response from the agent + """ + context = None + sequence = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + + result = MethodResult(0, 'OK', content['_arguments']) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + def _v2HandleException(self, mp, ah, content): + """ + Handle a QMFv2 exception + """ + context = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + + code = 7 + text = "" + if '_status_code' in content: + code = content['_status_code'] + if '_status_text' in content: + text = content['_status_text'] + else: + text = content + + result = MethodResult(code, text, {}) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + + def _v1SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv1 agent. + """ + # + # Build the query map + # + query = {} + if '_class' in kwargs: + query['_class'] = kwargs['_class'] + if '_package' in kwargs: + query['_package'] = kwargs['_package'] + elif '_key' in kwargs: + key = kwargs['_key'] + query['_class'] = key.getClassName() + query['_package'] = key.getPackageName() + elif '_objectId' in kwargs: + query['_objectid'] = kwargs['_objectId'].__repr__() + + # + # Construct and transmit the message + # + sendCodec = Codec() + self.broker._setHeader(sendCodec, 'G', sequence) + sendCodec.write_map(query) + smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank)) + self.broker._send(smsg) + + + def _v2SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv2 agent. + """ + # + # Build the query map + # + query = {'_what': 'OBJECT'} + if '_class' in kwargs: + schemaMap = {'_class_name': kwargs['_class']} + if '_package' in kwargs: + schemaMap['_package_name'] = kwargs['_package'] + query['_schema_id'] = schemaMap + elif '_key' in kwargs: + query['_schema_id'] = kwargs['_key'].asMap() + elif '_objectId' in kwargs: + query['_object_id'] = kwargs['_objectId'].asMap() + + # + # Construct and transmit the message + # + dp = self.broker.amqpSession.delivery_properties() + dp.routing_key = self.agentBank + mp = self.broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self.broker.authUser + mp.correlation_id = str(sequence) + mp.app_id = "qmf2" + mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_query_request'} + sendCodec = Codec() + sendCodec.write_map(query) + msg = Message(dp, mp, sendCodec.encoded) + self.broker._send(msg, "qmf.default.direct") + + + def _v2SendSchemaRequest(self, schemaId): + """ + Send a query to an agent to request details on a particular schema class. + IMPORTANT: This function currently sends a QMFv1 schema-request to the address of + the agent. The agent will send its response to amq.direct/<our-key>. + Eventually, this will be converted to a proper QMFv2 schema query. + """ + sendCodec = Codec() + seq = self.seqMgr._reserve(None) + self.broker._setHeader(sendCodec, 'S', seq) + schemaId.encode(sendCodec) + smsg = self.broker._message(sendCodec.encoded, self.agentBank) + self.broker._send(smsg, "qmf.default.direct") + + + def _handleQmfV1Message(self, opcode, seq, mp, ah, codec): + """ + Process QMFv1 messages arriving from an agent. + """ + if opcode == 'm': self._v1HandleMethodResp(codec, seq) + elif opcode == 'e': self._v1HandleEventInd(codec, seq) + elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True) + elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True) + elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True) + + + def _handleQmfV2Message(self, opcode, mp, ah, content): + """ + Process QMFv2 messages arriving from an agent. + """ + if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content) + elif opcode == '_exception': self._v2HandleException(mp, ah, content) + + +#=================================================================================================== +# RequestContext +#=================================================================================================== +class RequestContext(object): + """ + This class tracks an asynchronous request sent to an agent. + TODO: Add logic for client-side selection and filtering deleted objects from get-queries + """ + def __init__(self, agent, notifiable, selectors={}): + self.sequence = None + self.agent = agent + self.schemaCache = self.agent.schemaCache + self.notifiable = notifiable + self.selectors = selectors + self.startTime = time() + self.rawQueryResults = [] + self.queryResults = [] + self.exception = None + self.waitingForSchema = None + self.pendingSignal = None + self.cv = Condition() + self.blocked = notifiable == None + + + def setSequence(self, sequence): + self.sequence = sequence + + + def addV1QueryResult(self, data): + values = {} + for prop, val in data.getProperties(): + values[prop.name] = val + for stat, val in data.getStatistics(): + values[stat.name] = val + for key in values: + val = values[key] + if key in self.selectors and val != self.selectors[key]: + return + + if self.notifiable: + self.notifiable(qmf_object=data) + else: + self.queryResults.append(data) + + + def addV2QueryResult(self, data): + values = data['_values'] + for key in values: + val = values[key] + if key in self.selectors and val != self.selectors[key]: + return + self.rawQueryResults.append(data) + + + def doEvent(self, data): + if self.notifiable: + self.notifiable(qmf_event=data) + + + def setException(self, ex): + self.exception = ex + + + def getAge(self): + return time() - self.startTime + + + def cancel(self, exception): + self.setException(exception) + try: + self.cv.acquire() + self.blocked = None + self.waitingForSchema = None + self.cv.notify() + finally: + self.cv.release() + self._complete() + + + def waitForSignal(self, timeout): + try: + self.cv.acquire() + while self.blocked: + if (time() - self.startTime) > timeout: + self.exception = "Request timed out after %d seconds" % timeout + return + self.cv.wait(1) + finally: + self.cv.release() + + + def signal(self): + try: + self.cv.acquire() + if self.waitingForSchema: + self.pendingSignal = True + return + else: + self.blocked = None + self.cv.notify() + finally: + self.cv.release() + self._complete() + + + def _complete(self): + if self.notifiable: + if self.exception: + self.notifiable(qmf_exception=self.exception) + else: + self.notifiable(qmf_complete=True) + + if self.sequence: + self.agent._clearContext(self.sequence) + + + def processV2Data(self): + """ + Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema + that is in our schema cache, process it. Otherwise, send a request for the schema information + to the agent that manages the object. + """ + schemaId = None + queryResults = [] + try: + self.cv.acquire() + if self.waitingForSchema: + return + while (not self.waitingForSchema) and len(self.rawQueryResults) > 0: + head = self.rawQueryResults[0] + schemaId = self._getSchemaIdforV2ObjectLH(head) + schema = self.schemaCache.getSchema(schemaId) + if schema: + obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank) + queryResults.append(obj) + self.rawQueryResults.pop(0) + else: + self.waitingForSchema = True + finally: + self.cv.release() + + if self.waitingForSchema: + self.agent._v2SendSchemaRequest(schemaId) + + for result in queryResults: + if self.notifiable: + self.notifiable(qmf_object=result) + else: + self.queryResults.append(result) + + complete = None + try: + self.cv.acquire() + if not self.waitingForSchema and self.pendingSignal: + self.blocked = None + self.cv.notify() + complete = True + finally: + self.cv.release() + + if complete: + self._complete() + + + def reprocess(self): + """ + New schema information has been added to the schema-cache. Clear our 'waiting' status + and see if we can make more progress on the raw query list. + """ + try: + self.cv.acquire() + self.waitingForSchema = None + finally: + self.cv.release() + self.processV2Data() + + + def _getSchemaIdforV2ObjectLH(self, data): + """ + Given a data map, extract the schema-identifier. + """ + if data.__class__ != dict: + return None + if '_schema_id' in data: + return ClassKey(data['_schema_id']) + return None + + +#=================================================================================================== +# Event +#=================================================================================================== class Event: """ """ - def __init__(self, session, broker, codec): - self.session = session - self.broker = broker + def __init__(self, agent, codec): + self.agent = agent + self.session = agent.session + self.broker = agent.broker self.classKey = ClassKey(codec) self.timestamp = codec.read_int64() self.severity = codec.read_uint8() - self.schema = None - pname = self.classKey.getPackageName() - pkey = self.classKey.getPackageKey() - if pname in session.packages: - if pkey in session.packages[pname]: - self.schema = session.packages[pname][pkey] - self.arguments = {} - for arg in self.schema.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker) + self.arguments = {} + self.schema = self.session.schemaCache.getSchema(self.classKey) + if not self.schema: + return + for arg in self.schema.arguments: + self.arguments[arg.name] = self.session._decodeValue(codec, arg.type, self.broker) def __repr__(self): if self.schema == None: @@ -1895,6 +2995,10 @@ class Event: def getSchema(self): return self.schema + +#=================================================================================================== +# SequenceManager +#=================================================================================================== class SequenceManager: """ Manage sequence numbers for asynchronous method calls """ def __init__(self): @@ -1926,6 +3030,9 @@ class SequenceManager: return data +#=================================================================================================== +# DebugConsole +#=================================================================================================== class DebugConsole(Console): """ """ def brokerConnected(self, broker): diff --git a/tools/src/py/qpid-cluster b/tools/src/py/qpid-cluster index 6d64765184..7e608e4f2b 100755 --- a/tools/src/py/qpid-cluster +++ b/tools/src/py/qpid-cluster @@ -94,7 +94,7 @@ class BrokerManager: self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout) agents = self.qmf.getAgents() for a in agents: - if a.getAgentBank() == 0: + if a.getAgentBank() == '0': self.brokerAgent = a def Disconnect(self): diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config index 0db42bc6c7..24a20b0431 100755 --- a/tools/src/py/qpid-config +++ b/tools/src/py/qpid-config @@ -189,7 +189,7 @@ class BrokerManager: self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) agents = self.qmf.getAgents() for a in agents: - if a.getAgentBank() == 0: + if a.getAgentBank() == '0': self.brokerAgent = a def Disconnect(self): diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat index 1541d6ec9f..e3bfe288dd 100755 --- a/tools/src/py/qpid-stat +++ b/tools/src/py/qpid-stat @@ -91,7 +91,7 @@ class Broker(object): agents = qmf.getAgents() for a in agents: - if a.getAgentBank() == 0: + if a.getAgentBank() == '0': self.brokerAgent = a bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] |