summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcpp/bindings/qmf/tests/python_console.py2
-rw-r--r--cpp/examples/qmf-agent/example.cpp21
-rw-r--r--cpp/examples/qmf-agent/schema.xml2
-rw-r--r--cpp/include/qpid/agent/ManagementAgent.h14
-rw-r--r--cpp/include/qpid/framing/FieldTable.h1
-rw-r--r--cpp/include/qpid/management/ManagementEvent.h12
-rw-r--r--cpp/include/qpid/management/ManagementObject.h83
-rwxr-xr-xcpp/managementgen/qmf-gen2
-rwxr-xr-xcpp/managementgen/qmfgen/generate.py41
-rw-r--r--cpp/managementgen/qmfgen/management-types.xml28
-rwxr-xr-xcpp/managementgen/qmfgen/schema.py314
-rw-r--r--cpp/managementgen/qmfgen/templates/Args.h4
-rw-r--r--cpp/managementgen/qmfgen/templates/Class.cpp158
-rw-r--r--cpp/managementgen/qmfgen/templates/Class.h30
-rw-r--r--cpp/managementgen/qmfgen/templates/Event.cpp34
-rw-r--r--cpp/managementgen/qmfgen/templates/Event.h9
-rw-r--r--cpp/src/qpid/acl/Acl.cpp5
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp673
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h42
-rw-r--r--cpp/src/qpid/broker/Broker.cpp9
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp9
-rw-r--r--cpp/src/qpid/broker/System.cpp3
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp1405
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h53
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.cpp10
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp224
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.cpp15
-rw-r--r--cpp/src/tests/Makefile.am6
-rw-r--r--cpp/src/tests/ManagementTest.cpp22
-rw-r--r--extras/qmf/src/py/qmf/console.py1717
-rwxr-xr-xtools/src/py/qpid-cluster2
-rwxr-xr-xtools/src/py/qpid-config2
-rwxr-xr-xtools/src/py/qpid-stat2
37 files changed, 4035 insertions, 928 deletions
diff --git a/cpp/bindings/qmf/tests/python_console.py b/cpp/bindings/qmf/tests/python_console.py
index efd25a1d26..883aa8da1a 100755
--- a/cpp/bindings/qmf/tests/python_console.py
+++ b/cpp/bindings/qmf/tests/python_console.py
@@ -48,7 +48,7 @@ class QmfInteropTests(TestBase010):
self.assertEqual(len(parents), 1)
parent = parents[0]
for seq in range(10):
- result = parent.echo(seq)
+ result = parent.echo(seq, _timeout=5)
self.assertEqual(result.status, 0)
self.assertEqual(result.text, "OK")
self.assertEqual(result.sequence, seq)
diff --git a/cpp/examples/qmf-agent/example.cpp b/cpp/examples/qmf-agent/example.cpp
index 5ab9c10c91..18f1bf6a90 100644
--- a/cpp/examples/qmf-agent/example.cpp
+++ b/cpp/examples/qmf-agent/example.cpp
@@ -24,6 +24,7 @@
#include <qpid/agent/ManagementAgent.h>
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Time.h>
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/agent/example/Parent.h"
#include "qmf/org/apache/qpid/agent/example/Child.h"
#include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h"
@@ -44,6 +45,7 @@ using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::Mutex;
+using qpid::types::Variant;
namespace _qmf = qmf::org::apache::qpid::agent::example;
class ChildClass;
@@ -96,8 +98,22 @@ CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent
static uint64_t persistId = 0x111222333444555LL;
mgmtObject = new _qmf::Parent(agent, this, name);
- agent->addObject(mgmtObject, persistId++);
+ agent->addObject(mgmtObject);
mgmtObject->set_state("IDLE");
+
+ Variant::Map args;
+ Variant::Map subMap;
+ args["first"] = "String data";
+ args["second"] = 34;
+ subMap["string-data"] = "Text";
+ subMap["numeric-data"] = 10000;
+ args["map-data"] = subMap;
+ mgmtObject->set_args(args);
+
+ Variant::List list;
+ list.push_back(20000);
+ list.push_back("string-item");
+ mgmtObject->set_list(list);
}
void CoreClass::doLoop()
@@ -178,6 +194,9 @@ int main_int(int argc, char** argv)
// Register the Qmf_example schema with the agent
_qmf::Package packageInit(agent);
+ // Name the agent.
+ agent->setName("apache.org", "qmf-example");
+
// Start the agent. It will attempt to make a connection to the
// management broker
agent->init(settings, 5, false, ".magentdata");
diff --git a/cpp/examples/qmf-agent/schema.xml b/cpp/examples/qmf-agent/schema.xml
index 1bf701a655..3c7755fe83 100644
--- a/cpp/examples/qmf-agent/schema.xml
+++ b/cpp/examples/qmf-agent/schema.xml
@@ -29,6 +29,8 @@
This class represents a parent object
<property name="name" type="sstr" access="RC" index="y"/>
+ <property name="args" type="map" access="RO"/>
+ <property name="list" type="list" access="RO"/>
<statistic name="state" type="sstr" desc="Operational state of the link"/>
<statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
diff --git a/cpp/include/qpid/agent/ManagementAgent.h b/cpp/include/qpid/agent/ManagementAgent.h
index b6ec82862c..aeb5585e61 100644
--- a/cpp/include/qpid/agent/ManagementAgent.h
+++ b/cpp/include/qpid/agent/ManagementAgent.h
@@ -63,6 +63,17 @@ class ManagementAgent
virtual int getMaxThreads() = 0;
+ // Set the name of the agent
+ //
+ // vendor - Vendor name or domain (i.e. "apache.org")
+ // product - Product name (i.e. "qpid")
+ // instance - A unique identifier for this instance of the agent.
+ // If empty, the agent will create a GUID for the instance.
+ //
+ virtual void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="") = 0;
+
// Connect to a management broker
//
// brokerHost - Hostname or IP address (dotted-quad) of broker.
@@ -128,6 +139,9 @@ class ManagementAgent
// in an orderly way.
//
virtual ObjectId addObject(ManagementObject* objectPtr, uint64_t persistId = 0) = 0;
+ virtual ObjectId addObject(ManagementObject* objectPtr,
+ const std::string& key,
+ bool persistent = true) = 0;
//
//
diff --git a/cpp/include/qpid/framing/FieldTable.h b/cpp/include/qpid/framing/FieldTable.h
index 085f7ed110..fdb1a28b9d 100644
--- a/cpp/include/qpid/framing/FieldTable.h
+++ b/cpp/include/qpid/framing/FieldTable.h
@@ -51,6 +51,7 @@ class FieldTable
typedef boost::shared_ptr<FieldValue> ValuePtr;
typedef std::map<std::string, ValuePtr> ValueMap;
typedef ValueMap::iterator iterator;
+ typedef ValueMap::const_iterator const_iterator;
typedef ValueMap::const_reference const_reference;
typedef ValueMap::reference reference;
typedef ValueMap::value_type value_type;
diff --git a/cpp/include/qpid/management/ManagementEvent.h b/cpp/include/qpid/management/ManagementEvent.h
index 01b9ae49ec..e80175096f 100644
--- a/cpp/include/qpid/management/ManagementEvent.h
+++ b/cpp/include/qpid/management/ManagementEvent.h
@@ -23,7 +23,7 @@
*/
#include "qpid/management/ManagementObject.h"
-#include <qpid/framing/Buffer.h>
+#include "qpid/types/Variant.h"
#include <string>
namespace qpid {
@@ -32,16 +32,20 @@ namespace management {
class ManagementAgent;
class ManagementEvent : public ManagementItem {
-public:
- typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&);
+ public:
+ static const uint8_t MD5_LEN = 16;
+ //typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&);
+ typedef void (*writeSchemaCall_t)(std::string&);
virtual ~ManagementEvent() {}
virtual writeSchemaCall_t getWriteSchemaCall(void) = 0;
+ //virtual mapEncodeSchemaCall_t getMapEncodeSchemaCall(void) = 0;
virtual std::string& getEventName() const = 0;
virtual std::string& getPackageName() const = 0;
virtual uint8_t* getMd5Sum() const = 0;
virtual uint8_t getSeverity() const = 0;
- virtual void encode(qpid::framing::Buffer&) const = 0;
+ virtual void encode(std::string&) const = 0;
+ virtual void mapEncode(qpid::types::Variant::Map&) const = 0;
};
}}
diff --git a/cpp/include/qpid/management/ManagementObject.h b/cpp/include/qpid/management/ManagementObject.h
index b1c70f64d6..0e9c7f0a0b 100644
--- a/cpp/include/qpid/management/ManagementObject.h
+++ b/cpp/include/qpid/management/ManagementObject.h
@@ -24,8 +24,8 @@
#include "qpid/sys/Time.h"
#include "qpid/sys/Mutex.h"
-#include <qpid/framing/Buffer.h>
#include "qpid/CommonImportExport.h"
+#include "qpid/types/Variant.h"
#include <map>
#include <vector>
@@ -53,23 +53,33 @@ protected:
const AgentAttachment* agent;
uint64_t first;
uint64_t second;
+ uint64_t agentEpoch;
std::string v2Key;
+ std::string agentName;
void fromString(const std::string&);
public:
- QPID_COMMON_EXTERN ObjectId() : agent(0), first(0), second(0) {}
- QPID_COMMON_EXTERN ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); }
- QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object);
- QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object);
+ QPID_COMMON_EXTERN ObjectId() : agent(0), first(0), second(0), agentEpoch(0) {}
+ QPID_COMMON_EXTERN ObjectId(const types::Variant& map) :
+ agent(0), first(0), second(0), agentEpoch(0) { mapDecode(map.asMap()); }
+ QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker);
+ QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq);
QPID_COMMON_EXTERN ObjectId(std::istream&);
QPID_COMMON_EXTERN ObjectId(const std::string&);
+ // Deprecated:
+ QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object);
QPID_COMMON_EXTERN bool operator==(const ObjectId &other) const;
QPID_COMMON_EXTERN bool operator<(const ObjectId &other) const;
+ QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map) const;
+ QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map);
+ QPID_COMMON_EXTERN operator types::Variant::Map() const;
QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; };
- QPID_COMMON_EXTERN void encode(framing::Buffer& buffer) const;
- QPID_COMMON_EXTERN void decode(framing::Buffer& buffer);
+ QPID_COMMON_EXTERN void encode(std::string& buffer) const;
+ QPID_COMMON_EXTERN void decode(const std::string& buffer);
+ QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const;
QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; }
QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object);
- QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const;
+ QPID_COMMON_EXTERN void setAgentName(const std::string& _name) { agentName = _name; }
+ QPID_COMMON_EXTERN const std::string& getAgentName() const { return agentName; }
QPID_COMMON_EXTERN const std::string& getV2Key() const { return v2Key; }
friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const ObjectId&);
};
@@ -94,6 +104,7 @@ public:
static const uint8_t TYPE_S16 = 17;
static const uint8_t TYPE_S32 = 18;
static const uint8_t TYPE_S64 = 19;
+ static const uint8_t TYPE_LIST = 21;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;
@@ -125,7 +136,7 @@ protected:
uint64_t updateTime;
ObjectId objectId;
mutable bool configChanged;
- bool instChanged;
+ mutable bool instChanged;
bool deleted;
Manageable* coreObject;
mutable sys::Mutex accessLock;
@@ -135,13 +146,17 @@ protected:
bool forcePublish;
QPID_COMMON_EXTERN int getThreadIndex();
- QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf) const;
- QPID_COMMON_EXTERN void readTimestamps(qpid::framing::Buffer& buf);
+ QPID_COMMON_EXTERN void writeTimestamps(std::string& buf) const;
+ QPID_COMMON_EXTERN void writeTimestamps(types::Variant::Map& map) const;
+ QPID_COMMON_EXTERN void readTimestamps(const std::string& buf);
+ QPID_COMMON_EXTERN void readTimestamps(const types::Variant::Map& buf);
QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const;
public:
+ QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16;
QPID_COMMON_EXTERN static int maxThreads;
- typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
+ //typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
+ typedef void (*writeSchemaCall_t) (std::string&);
ManagementObject(Manageable* _core) :
createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))),
@@ -151,15 +166,28 @@ protected:
virtual ~ManagementObject() {}
virtual writeSchemaCall_t getWriteSchemaCall() = 0;
- virtual void readProperties(qpid::framing::Buffer& buf) = 0;
- virtual uint32_t writePropertiesSize() const = 0;
- virtual void writeProperties(qpid::framing::Buffer& buf) const = 0;
- virtual void writeStatistics(qpid::framing::Buffer& buf,
- bool skipHeaders = false) = 0;
- virtual void doMethod(std::string& methodName,
- qpid::framing::Buffer& inBuf,
- qpid::framing::Buffer& outBuf) = 0;
virtual std::string getKey() const = 0;
+
+ // Encode & Decode the property and statistics values
+ // for this object.
+ virtual void mapEncodeValues(types::Variant::Map& map,
+ bool includeProperties,
+ bool includeStatistics) = 0;
+ virtual void mapDecodeValues(const types::Variant::Map& map) = 0;
+ virtual void doMethod(std::string& methodName,
+ const types::Variant::Map& inMap,
+ types::Variant::Map& outMap) = 0;
+
+ /**
+ * The following five methods are not pure-virtual because they will only
+ * be overridden in cases where QMFv1 is to be supported.
+ */
+ virtual uint32_t writePropertiesSize() const { return 0; }
+ virtual void readProperties(const std::string&) {}
+ virtual void writeProperties(std::string&) const {}
+ virtual void writeStatistics(std::string&, bool = false) {}
+ virtual void doMethod(std::string&, const std::string&, std::string&) {}
+
QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId);
virtual std::string& getClassName() const = 0;
@@ -183,16 +211,23 @@ protected:
inline void setFlags(uint32_t f) { flags = f; }
inline uint32_t getFlags() { return flags; }
bool isSameClass(ManagementObject& other) {
- for (int idx = 0; idx < 16; idx++)
+ for (int idx = 0; idx < MD5_LEN; idx++)
if (other.getMd5Sum()[idx] != getMd5Sum()[idx])
return false;
return other.getClassName() == getClassName() &&
other.getPackageName() == getPackageName();
}
- QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); }
- QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); }
- QPID_COMMON_EXTERN uint32_t encodedSize() const { return writePropertiesSize(); }
+ // QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); }
+ // QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); }
+ //QPID_COMMON_EXTERN uint32_t encodedSize() const { return writePropertiesSize(); }
+
+ // Encode/Decode the entire object as a map
+ QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map,
+ bool includeProperties=true,
+ bool includeStatistics=true);
+
+ QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map);
};
typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;
diff --git a/cpp/managementgen/qmf-gen b/cpp/managementgen/qmf-gen
index ebc07137ae..667aa1ba2d 100755
--- a/cpp/managementgen/qmf-gen
+++ b/cpp/managementgen/qmf-gen
@@ -62,8 +62,10 @@ if len(args) == 0:
vargs = {}
if opts.brokerplugin:
vargs["agentHeaderDir"] = "management"
+ vargs["genQmfV1"] = True
else:
vargs["agentHeaderDir"] = "agent"
+ vargs["genQmfV1"] = None
for schemafile in args:
package = SchemaPackage(typefile, schemafile, opts)
diff --git a/cpp/managementgen/qmfgen/generate.py b/cpp/managementgen/qmfgen/generate.py
index 4052b8c853..8a00b69761 100755
--- a/cpp/managementgen/qmfgen/generate.py
+++ b/cpp/managementgen/qmfgen/generate.py
@@ -38,39 +38,43 @@ class Template:
self.filename = filename
self.handler = handler
self.handler.initExpansion ()
- self.writing = True
+ self.writing = 0 # 0 => write output lines; >0 => recursive depth of conditional regions
def expandLine (self, line, stream, object):
cursor = 0
while 1:
sub = line.find ("/*MGEN:", cursor)
if sub == -1:
- if self.writing:
+ if self.writing == 0:
stream.write (line[cursor:len (line)])
return
subend = line.find("*/", sub)
- if self.writing:
+ if self.writing == 0:
stream.write (line[cursor:sub])
cursor = subend + 2
tag = line[sub:subend]
if tag[7:10] == "IF(":
- close = tag.find(")")
- if close == -1:
- raise ValueError ("Missing ')' on condition")
- cond = tag[10:close]
- dotPos = cond.find (".")
- if dotPos == -1:
- raise ValueError ("Invalid condition tag: %s" % cond)
- tagObject = cond[0:dotPos]
- tagName = cond[dotPos + 1 : len(cond)]
- if not self.handler.testCondition(object, tagObject, tagName):
- self.writing = False
+ if self.writing == 0:
+ close = tag.find(")")
+ if close == -1:
+ raise ValueError ("Missing ')' on condition")
+ cond = tag[10:close]
+ dotPos = cond.find (".")
+ if dotPos == -1:
+ raise ValueError ("Invalid condition tag: %s" % cond)
+ tagObject = cond[0:dotPos]
+ tagName = cond[dotPos + 1 : len(cond)]
+ if not self.handler.testCondition(object, tagObject, tagName):
+ self.writing += 1
+ else:
+ self.writing += 1
elif tag[7:12] == "ENDIF":
- self.writing = True
+ if self.writing > 0:
+ self.writing -= 1
else:
equalPos = tag.find ("=")
@@ -80,12 +84,12 @@ class Template:
raise ValueError ("Invalid tag: %s" % tag)
tagObject = tag[7:dotPos]
tagName = tag[dotPos + 1:len (tag)]
- if self.writing:
+ if self.writing == 0:
self.handler.substHandler (object, stream, tagObject, tagName)
else:
tagKey = tag[7:equalPos]
tagVal = tag[equalPos + 1:len (tag)]
- if self.writing:
+ if self.writing == 0:
self.handler.setVariable (tagKey, tagVal)
def expand (self, object):
@@ -297,6 +301,9 @@ class Generator:
self.packagelist.append(path)
self.packagePath = self.normalize(self.dest + path)
+ def testGenQMFv1 (self, variables):
+ return variables["genQmfV1"]
+
def genDisclaimer (self, stream, variables):
prefix = variables["commentPrefix"]
stream.write (prefix + " This source file was created by a code generator.\n")
diff --git a/cpp/managementgen/qmfgen/management-types.xml b/cpp/managementgen/qmfgen/management-types.xml
index 6dbabc90ff..857f8af212 100644
--- a/cpp/managementgen/qmfgen/management-types.xml
+++ b/cpp/managementgen/qmfgen/management-types.xml
@@ -19,7 +19,14 @@
under the License.
-->
-<type name="objId" base="REF" cpp="::qpid::management::ObjectId" encode="#.encode(@)" decode="#.decode(@)" stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/>
+<!-- "unmap": cast to convert from Variant to native type constructor,
+ "map": cast to convert from native type to Variant constructor parameter
+-->
+
+<type name="objId" base="REF" cpp="::qpid::management::ObjectId"
+ encode="{std::string _s; #.encode(_s); @.putRawData(_s);}"
+ decode="{std::string _s; @.getRawData(_s, #.encodedSize()); #.decode(_s);}"
+stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/>
<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" stream="#" size="1" accessor="direct" init="0"/>
<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" stream="#" size="2" accessor="direct" init="0"/>
<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" stream="#" size="4" accessor="direct" init="0"/>
@@ -29,14 +36,25 @@
<type name="int32" base="S32" cpp="int32_t" encode="@.putInt32(#)" decode="# = @.getInt32()" stream="#" size="4" accessor="direct" init="0"/>
<type name="int64" base="S64" cpp="int64_t" encode="@.putInt64(#)" decode="# = @.getInt64()" stream="#" size="8" accessor="direct" init="0"/>
<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet(#?1:0)" decode="# = @.getOctet()==1" stream="#" size="1" accessor="direct" init="0"/>
-<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString(#)" decode="@.getShortString(#)" stream="#" size="(1 + #.length())" accessor="direct" init='""' byRef="y"/>
-<type name="lstr" base="LSTR" cpp="std::string" encode="@.putMediumString(#)" decode="@.getMediumString(#)" stream="#" size="(2 + #.length())" accessor="direct" init='""' byRef="y"/>
+<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString(#)" decode="@.getShortString(#)" stream="#" size="(1 + #.length())" accessor="direct" init='""' byRef="y" unmap="(#).getString()"/>
+<type name="lstr" base="LSTR" cpp="std::string" encode="@.putMediumString(#)" decode="@.getMediumString(#)" stream="#" size="(2 + #.length())" accessor="direct" init='""' byRef="y" unmap="(#).getString()"/>
<type name="absTime" base="ABSTIME" cpp="int64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" stream="#" size="8" accessor="direct" init="0"/>
<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" stream="#" size="8" accessor="direct" init="0"/>
<type name="float" base="FLOAT" cpp="float" encode="@.putFloat(#)" decode="# = @.getFloat()" stream="#" size="4" accessor="direct" init="0."/>
<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble(#)" decode="# = @.getDouble()" stream="#" size="8" accessor="direct" init="0."/>
-<type name="uuid" base="UUID" cpp="::qpid::framing::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::framing::Uuid()" byRef="y"/>
-<type name="map" base="FTABLE" cpp="::qpid::framing::FieldTable" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::framing::FieldTable()" byRef="y"/>
+<type name="uuid" base="UUID" cpp="::qpid::types::Uuid"
+ encode="{::qpid::framing::Uuid _u(#.data()); _u.encode(@); }"
+ decode="{::qpid::framing::Uuid _u; _u.decode(@); # = ::qpid::types::Uuid(_u.data());}"
+ stream="#" size="16" accessor="direct" init="::qpid::types::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::types::Uuid((#).data())" />
+<type name="map" base="FTABLE" cpp="::qpid::types::Variant::Map"
+ encode="{::qpid::framing::FieldTable _f = ManagementAgent::fromMap(#); _f.encode(@);}"
+ decode="{::qpid::framing::FieldTable _f; _f.decode(@); # = ManagementAgent::toMap(_f);}"
+ size="::qpid::framing::FieldTable(ManagementAgent::fromMap(#)).encodedSize()"
+stream="#" accessor="direct" init="::qpid::types::Variant::Map()" byRef="y" unmap="::qpid::types::Variant::Map(); assert(false); /*TBD*/"/>
+<type name="list" base="LIST" cpp="::qpid::types::Variant::List"
+ encode="{::qpid::framing::List _l = ManagementAgent::fromList(#); _l.encode(@);}"
+ decode="{::qpid::framing::List _l; _l.decode(@); # = ManagementAgent::toList(_l);}"
+stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::types::Variant::List()" byRef="y" unmap="::qpid::types::Variant::List(); assert(false); /*TBD*/"/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" style="wm" stream="#" size="1" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" style="wm" stream="#" size="2" accessor="counter" init="0"/>
diff --git a/cpp/managementgen/qmfgen/schema.py b/cpp/managementgen/qmfgen/schema.py
index d80567687e..c1bb507d84 100755
--- a/cpp/managementgen/qmfgen/schema.py
+++ b/cpp/managementgen/qmfgen/schema.py
@@ -19,12 +19,18 @@
from xml.dom.minidom import parse, parseString, Node
from cStringIO import StringIO
-import md5
+#import md5
+try:
+ import hashlib
+ _md5Obj = hashlib.md5
+except ImportError:
+ import md5
+ _md5Obj = md5.new
class Hash:
""" Manage the hash of an XML sub-tree """
def __init__(self, node):
- self.md5Sum = md5.new()
+ self.md5Sum = _md5Obj()
self._compute(node)
def addSubHash(self, hash):
@@ -64,6 +70,8 @@ class SchemaType:
self.init = "0"
self.perThread = False
self.byRef = False
+ self.unmap = "#"
+ self.map = "#"
attrs = node.attributes
for idx in range (attrs.length):
@@ -109,6 +117,12 @@ class SchemaType:
raise ValueError ("Expected 'y' in byRef attribute")
self.byRef = True
+ elif key == 'unmap':
+ self.unmap = val
+
+ elif key == 'map':
+ self.map = val
+
else:
raise ValueError ("Unknown attribute in type '%s'" % key)
@@ -211,6 +225,17 @@ class SchemaType:
def genRead (self, stream, varName, indent=" "):
stream.write(indent + self.decode.replace("@", "buf").replace("#", varName) + ";\n")
+ def genUnmap (self, stream, varName, indent=" ", key=None, mapName="_map",
+ _optional=False):
+ if key is None:
+ key = varName
+ stream.write(indent + "if ((_i = " + mapName + ".find(\"" + key + "\")) != " + mapName + ".end()) {\n")
+ stream.write(indent + " " + varName + " = " +
+ self.unmap.replace("#", "_i->second") + ";\n")
+ if _optional:
+ stream.write(indent + " _found = true;\n")
+ stream.write(indent + "}\n")
+
def genWrite (self, stream, varName, indent=" "):
if self.style != "mma":
stream.write (indent + self.encode.replace ("@", "buf").replace ("#", varName) + ";\n")
@@ -230,6 +255,31 @@ class SchemaType:
.replace ("#", varName + "Count ? " + varName + "Total / " +
varName + "Count : 0") + ";\n")
+ def genMap (self, stream, varName, indent=" ", key=None, mapName="_map"):
+ if key is None:
+ key = varName
+ if self.style != "mma":
+ var_cast = self.map.replace("#", varName)
+ stream.write(indent + mapName + "[\"" + key + "\"] = ::qpid::types::Variant(" + var_cast + ");\n")
+ if self.style == "wm":
+ var_cast_hi = self.map.replace("#", varName + "High")
+ var_cast_lo = self.map.replace("#", varName + "Low")
+ stream.write(indent + mapName + "[\"" + key + "High\"] = " +
+ "::qpid::types::Variant(" + var_cast_hi + ");\n")
+ stream.write(indent + mapName + "[\"" + key + "Low\"] = " +
+ "::qpid::types::Variant(" + var_cast_lo + ");\n")
+ if self.style == "mma":
+ var_cast = self.map.replace("#", varName + "Count")
+ stream.write(indent + mapName + "[\"" + key + "Count\"] = " + "::qpid::types::Variant(" + var_cast + ");\n")
+ var_cast = self.map.replace("#", varName + "Min")
+ stream.write(indent + mapName + "[\"" + key + "Min\"] = " +
+ "(" + varName + "Count ? ::qpid::types::Variant(" + var_cast + ") : ::qpid::types::Variant(0));\n")
+ var_cast = self.map.replace("#", varName + "Max")
+ stream.write(indent + mapName + "[\"" + key + "Max\"] = " + "::qpid::types::Variant(" + var_cast + ");\n")
+
+ var_cast = self.map.replace("#", "(" + varName + "Total / " + varName + "Count)")
+ stream.write(indent + mapName + "[\"" + key + "Avg\"] = " +
+ "(" + varName + "Count ? ::qpid::types::Variant(" + var_cast + ") : ::qpid::types::Variant(0));\n")
def getReadCode (self, varName, bufName):
result = self.decode.replace ("@", bufName).replace ("#", varName)
@@ -392,6 +442,29 @@ class SchemaProperty:
stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
stream.write (" buf.put (ft);\n\n")
+
+ def genSchemaMap(self, stream):
+ stream.write (" {\n")
+ stream.write (" ::qpid::types::Variant::Map _value;\n")
+ stream.write (" _value[TYPE] = TYPE_" + self.type.type.base +";\n")
+ stream.write (" _value[ACCESS] = ACCESS_" + self.access + ";\n")
+ stream.write (" _value[IS_INDEX] = " + str (self.isIndex) + ";\n")
+ stream.write (" _value[IS_OPTIONAL] = " + str (self.isOptional) + ";\n")
+ if self.unit != None:
+ stream.write (" _value[UNIT] = \"" + self.unit + "\";\n")
+ if self.min != None:
+ stream.write (" _value[MIN] = " + self.min + ";\n")
+ if self.max != None:
+ stream.write (" _value[MAX] = " + self.max + ";\n")
+ if self.maxLen != None:
+ stream.write (" _value[MAXLEN] = " + self.maxLen + ";\n")
+ if self.desc != None:
+ stream.write (" _value[DESC] = \"" + self.desc + "\";\n")
+ stream.write (" _props[\"" + self.name + "\"] = _value;\n")
+ stream.write (" }\n\n")
+
+
+
def genSize (self, stream):
indent = " "
if self.isOptional:
@@ -419,6 +492,43 @@ class SchemaProperty:
if self.isOptional:
stream.write(" }\n")
+ def genUnmap (self, stream):
+ indent = " "
+ if self.isOptional:
+ stream.write(" _found = false;\n")
+ self.type.type.genUnmap (stream, self.name, indent, _optional=self.isOptional)
+ if self.isOptional:
+ stream.write(" if (_found) {\n")
+ stream.write(" presenceMask[presenceByte_%s] |= presenceMask_%s;\n" %
+ (self.name, self.name))
+ stream.write(" }\n")
+
+ def genMap (self, stream):
+ indent = " "
+ if self.isOptional:
+ stream.write(" if (presenceMask[presenceByte_%s] & presenceMask_%s) {\n" % (self.name, self.name))
+ indent = " "
+ self.type.type.genMap (stream, self.name, indent)
+ if self.isOptional:
+ stream.write(" }\n")
+
+
+ def __repr__(self):
+ m = {}
+ m["name"] = self.name
+ m["type"] = self.type
+ m["ref"] = self.ref
+ m["access"] = self.access
+ m["isIndex"] = self.isIndex
+ m["isParentRef"] = self.isParentRef
+ m["isGeneralRef"] = self.isGeneralRef
+ m["isOptional"] = self.isOptional
+ m["unit"] = self.unit
+ m["min"] = self.min
+ m["max"] = self.max
+ m["maxLen"] = self.maxLen
+ m["desc"] = self.desc
+ return str(m)
#=====================================================================================
#
@@ -492,6 +602,17 @@ class SchemaStatistic:
stream.write (" ft.setString (DESC, \"" + desc + "\");\n")
stream.write (" buf.put (ft);\n\n")
+ def genSchemaTextMap(self, stream, name, desc):
+ stream.write (" {\n")
+ stream.write (" ::qpid::types::Variant::Map _value;\n")
+ stream.write (" _value[TYPE] = TYPE_" + self.type.type.base +";\n")
+ if self.unit != None:
+ stream.write (" _value[UNIT] = \"" + self.unit + "\";\n")
+ if desc != None:
+ stream.write (" _value[DESC] = \"" + desc + "\";\n")
+ stream.write (" _stats[\"" + self.name + "\"] = _value;\n")
+ stream.write (" }\n\n")
+
def genSchema (self, stream):
if self.type.type.style != "mma":
self.genSchemaText (stream, self.name, self.desc)
@@ -518,6 +639,32 @@ class SchemaStatistic:
self.genSchemaText (stream, self.name + "Max", descMax)
self.genSchemaText (stream, self.name + "Average", descAverage)
+ def genSchemaMap (self, stream):
+ if self.type.type.style != "mma":
+ self.genSchemaTextMap (stream, self.name, self.desc)
+ if self.type.type.style == "wm":
+ descHigh = self.desc
+ descLow = self.desc
+ if self.desc != None:
+ descHigh = descHigh + " (High)"
+ descLow = descLow + " (Low)"
+ self.genSchemaTextMap (stream, self.name + "High", descHigh)
+ self.genSchemaTextMap (stream, self.name + "Low", descLow)
+ if self.type.type.style == "mma":
+ descCount = self.desc
+ descMin = self.desc
+ descMax = self.desc
+ descAverage = self.desc
+ if self.desc != None:
+ descCount = descCount + " (Samples)"
+ descMin = descMin + " (Min)"
+ descMax = descMax + " (Max)"
+ descAverage = descAverage + " (Average)"
+ self.genSchemaTextMap (stream, self.name + "Samples", descCount)
+ self.genSchemaTextMap (stream, self.name + "Min", descMin)
+ self.genSchemaTextMap (stream, self.name + "Max", descMax)
+ self.genSchemaTextMap (stream, self.name + "Average", descAverage)
+
def genAssign (self, stream):
if self.assign != None:
if self.type.type.perThread:
@@ -533,6 +680,12 @@ class SchemaStatistic:
else:
self.type.type.genWrite (stream, self.name)
+ def genMap (self, stream):
+ if self.type.type.perThread:
+ self.type.type.genMap(stream, "totals." + self.name, key=self.name)
+ else:
+ self.type.type.genMap(stream, self.name)
+
def genInitialize (self, stream, prefix="", indent=" "):
val = self.type.type.init
if self.type.type.style != "mma":
@@ -648,6 +801,30 @@ class SchemaArg:
stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
stream.write (" buf.put (ft);\n\n")
+ def genSchemaMap (self, stream, event=False):
+ stream.write (" {\n")
+ stream.write (" ::qpid::types::Variant::Map _avalue;\n")
+ stream.write (" _avalue[TYPE] = TYPE_" + self.type.type.base +";\n")
+ if (not event):
+ stream.write (" _avalue[DIR] = \"" + self.dir + "\";\n")
+ if self.unit != None:
+ stream.write (" _avalue[UNIT] = \"" + self.unit + "\";\n")
+ if not event:
+ if self.min != None:
+ stream.write (" _avalue[MIN] = " + self.min + ";\n")
+ if self.max != None:
+ stream.write (" _avalue[MAX] = " + self.max + ";\n")
+ if self.maxLen != None:
+ stream.write (" _avalue[MAXLEN] = " + self.maxLen + ";\n")
+ if self.default != None:
+ stream.write (" _avalue[DEFAULT] = \"" + self.default + "\";\n")
+ if self.desc != None:
+ stream.write (" _avalue[DESC] = \"" + self.desc + "\";\n")
+ stream.write (" _args[\"" + self.name + "\"] = _avalue;\n")
+ stream.write (" }\n")
+
+
+
def genFormalParam (self, stream, variables):
stream.write ("%s _%s" % (self.type.type.asArg, self.name))
@@ -727,6 +904,24 @@ class SchemaMethod:
for arg in self.args:
arg.genSchema (stream)
+ def genSchemaMap (self, stream, variables):
+ stream.write (" {\n")
+ stream.write (" ::qpid::types::Variant::Map _value;\n")
+ stream.write (" ::qpid::types::Variant::Map _args;\n")
+ stream.write (" _value[ARGCOUNT] = " + str(len(self.args)) + ";\n")
+ if self.desc != None:
+ stream.write (" _value[DESC] = \"" + self.desc + "\";\n")
+
+ for arg in self.args:
+ arg.genSchemaMap (stream)
+
+ stream.write (" if (!_args.empty())\n")
+ stream.write (" _value[ARGS] = _args;\n")
+
+
+ stream.write (" _methods[\"" + self.name + "\"] = _value;\n")
+ stream.write (" }\n\n")
+
#=====================================================================================
#
#=====================================================================================
@@ -849,10 +1044,20 @@ class SchemaEvent:
for arg in self.args:
stream.write(" " + arg.type.type.encode.replace("@", "buf").replace("#", arg.name) + ";\n")
+ def genArgMap(self, stream, variables):
+ for arg in self.args:
+ arg.type.type.genMap(stream, arg.name, " ", mapName="map")
+ #stream.write(" " + arg.type.type.encode.replace("@", "buf").replace("#", arg.name) + ";\n")
+
+
def genArgSchema(self, stream, variables):
for arg in self.args:
arg.genSchema(stream, True)
+ def genArgSchemaMap(self, stream, variables):
+ for arg in self.args:
+ arg.genSchemaMap(stream, True)
+
def genSchemaMD5(self, stream, variables):
sum = self.hash.getDigest()
for idx in range (len (sum)):
@@ -1023,12 +1228,36 @@ class SchemaClass:
inArgCount = inArgCount + 1
if methodCount == 0:
- stream.write ("string&, Buffer&, Buffer& outBuf")
+ stream.write ("string&, const string&, string& outStr")
+ else:
+ if inArgCount == 0:
+ stream.write ("string& methodName, const string&, string& outStr")
+ else:
+ stream.write ("string& methodName, const string& inStr, string& outStr")
+
+
+ def genDoMapMethodArgs (self, stream, variables):
+ methodCount = 0
+ inArgCount = 0
+ for method in self.methods:
+ methodCount = methodCount + 1
+ for arg in method.args:
+ if arg.getDir () == "I" or arg.getDir () == "IO":
+ inArgCount = inArgCount + 1
+
+ if methodCount == 0:
+ stream.write ("string&," +
+ " const ::qpid::types::Variant::Map&," +
+ " ::qpid::types::Variant::Map& outMap")
else:
if inArgCount == 0:
- stream.write ("string& methodName, Buffer&, Buffer& outBuf")
+ stream.write ("string& methodName," +
+ " const ::qpid::types::Variant::Map&," +
+ " ::qpid::types::Variant::Map& outMap")
else:
- stream.write ("string& methodName, Buffer& inBuf, Buffer& outBuf")
+ stream.write ("string& methodName," +
+ " const ::qpid::types::Variant::Map& inMap," +
+ " ::qpid::types::Variant::Map& outMap")
def genHiLoStatResets (self, stream, variables):
for inst in self.statistics:
@@ -1109,8 +1338,22 @@ class SchemaClass:
stream.write ("%d" % len (self.methods))
def genMethodHandlers (self, stream, variables):
+ inArgs = False
+ for method in self.methods:
+ for arg in method.args:
+ if arg.getDir () == "I" or arg.getDir () == "IO":
+ inArgs = True;
+ break
+
+ if inArgs:
+ stream.write("\n")
+ stream.write(" char *_tmpBuf = new char[inStr.length()];\n")
+ stream.write(" memcpy(_tmpBuf, inStr.data(), inStr.length());\n")
+ stream.write(" ::qpid::framing::Buffer inBuf(_tmpBuf, inStr.length());\n")
+
for method in self.methods:
stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n")
+ stream.write (" _matched = true;\n")
if method.getArgCount () == 0:
stream.write (" ::qpid::management::ArgsNone ioArgs;\n")
else:
@@ -1131,7 +1374,43 @@ class SchemaClass:
stream.write (" " +\
arg.type.type.getWriteCode ("ioArgs." +\
arg.dir.lower () + "_" +\
- arg.name, "outBuf") + ";\n")
+ arg.name, "outBuf") + ";\n")
+ stream.write(" }\n")
+
+ if inArgs:
+ stream.write ("\n delete [] _tmpBuf;\n")
+
+
+
+ def genMapMethodHandlers (self, stream, variables):
+ for method in self.methods:
+ stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n")
+ if method.getArgCount () == 0:
+ stream.write (" ::qpid::management::ArgsNone ioArgs;\n")
+ else:
+ stream.write (" Args" + method.getFullName () + " ioArgs;\n")
+ stream.write (" ::qpid::types::Variant::Map::const_iterator _i;\n")
+
+ # decode each input argument from the input map
+ for arg in method.args:
+ if arg.getDir () == "I" or arg.getDir () == "IO":
+ arg.type.type.genUnmap(stream,
+ "ioArgs." + arg.dir.lower () + "_" + arg.name,
+ " ",
+ arg.name,
+ "inMap")
+
+ stream.write (" status = coreObject->ManagementMethod (METHOD_" +\
+ method.getName().upper() + ", ioArgs, text);\n")
+ stream.write (" outMap[\"_status_code\"] = (uint32_t) status;\n")
+ stream.write (" outMap[\"_status_text\"] = ::qpid::management::Manageable::StatusText(status, text);\n")
+ for arg in method.args:
+ if arg.getDir () == "O" or arg.getDir () == "IO":
+ arg.type.type.genMap(stream,
+ "ioArgs." + arg.dir.lower () + "_" + arg.name,
+ " ",
+ arg.name,
+ "outMap")
stream.write (" return;\n }\n")
def genOpenNamespaces (self, stream, variables):
@@ -1160,6 +1439,10 @@ class SchemaClass:
for prop in self.properties:
prop.genSchema (stream)
+ def genPropertySchemaMap (self, stream, variables):
+ for prop in self.properties:
+ prop.genSchemaMap(stream)
+
def genSetGeneralReferenceDeclaration (self, stream, variables):
for prop in self.properties:
if prop.isGeneralRef:
@@ -1169,6 +1452,10 @@ class SchemaClass:
for stat in self.statistics:
stat.genSchema (stream)
+ def genStatisticSchemaMap (self, stream, variables):
+ for stat in self.statistics:
+ stat.genSchemaMap(stream)
+
def genMethodIdDeclarations (self, stream, variables):
number = 1
for method in self.methods:
@@ -1180,6 +1467,10 @@ class SchemaClass:
for method in self.methods:
method.genSchema (stream, variables)
+ def genMethodSchemaMap(self, stream, variables):
+ for method in self.methods:
+ method.genSchemaMap(stream, variables)
+
def genNameCap (self, stream, variables):
stream.write (capitalize(self.name))
@@ -1241,6 +1532,17 @@ class SchemaClass:
for stat in self.statistics:
stat.genWrite (stream)
+ def genMapEncodeProperties(self, stream, variables):
+ for prop in self.properties:
+ prop.genMap (stream)
+
+ def genMapEncodeStatistics (self, stream, variables):
+ for stat in self.statistics:
+ stat.genMap (stream)
+
+ def genMapDecodeProperties (self, stream, variables):
+ for prop in self.properties:
+ prop.genUnmap (stream)
class SchemaEventArgs:
def __init__(self, package, node, typespec, fragments, options):
diff --git a/cpp/managementgen/qmfgen/templates/Args.h b/cpp/managementgen/qmfgen/templates/Args.h
index 074ccf9940..20681ab477 100644
--- a/cpp/managementgen/qmfgen/templates/Args.h
+++ b/cpp/managementgen/qmfgen/templates/Args.h
@@ -24,8 +24,8 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/management/Args.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/Uuid.h"
+//#include "qpid/framing/FieldTable.h"
+//#include "qpid/framing/Uuid.h"
#include <string>
namespace qmf {
diff --git a/cpp/managementgen/qmfgen/templates/Class.cpp b/cpp/managementgen/qmfgen/templates/Class.cpp
index e6362758ba..ed4e17720f 100644
--- a/cpp/managementgen/qmfgen/templates/Class.cpp
+++ b/cpp/managementgen/qmfgen/templates/Class.cpp
@@ -21,15 +21,15 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/log/Statement.h"
+#include "qpid/management/Manageable.h"
#include "qpid/framing/FieldTable.h"
-#include "qpid/management/Manageable.h"
+#include "qpid/framing/Buffer.h"
#include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h"
#include "/*MGEN:Class.NameCap*/.h"
/*MGEN:Class.MethodArgIncludes*/
#include <iostream>
using namespace qmf::/*MGEN:Class.Namespace*/;
-using namespace qpid::framing;
using qpid::management::ManagementAgent;
using qpid::management::Manageable;
using qpid::management::ManagementObject;
@@ -38,7 +38,7 @@ using std::string;
string /*MGEN:Class.NameCap*/::packageName = string ("/*MGEN:Class.NamePackageLower*/");
string /*MGEN:Class.NameCap*/::className = string ("/*MGEN:Class.NameLower*/");
-uint8_t /*MGEN:Class.NameCap*/::md5Sum[16] =
+uint8_t /*MGEN:Class.NameCap*/::md5Sum[MD5_LEN] =
{/*MGEN:Class.SchemaMD5*/};
/*MGEN:Class.NameCap*/::/*MGEN:Class.NameCap*/ (ManagementAgent*, Manageable* _core/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/) :
@@ -90,9 +90,12 @@ void /*MGEN:Class.NameCap*/::registerSelf(ManagementAgent* agent)
agent->registerClass(packageName, className, md5Sum, writeSchema);
}
-void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
+void /*MGEN:Class.NameCap*/::writeSchema (std::string& schema)
{
- FieldTable ft;
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+ ::qpid::framing::FieldTable ft;
// Schema class header:
buf.putOctet (CLASS_KIND_TABLE);
@@ -109,10 +112,15 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
/*MGEN:Class.StatisticSchema*/
// Methods
/*MGEN:Class.MethodSchema*/
+ {
+ uint32_t _len = buf.getPosition();
+ buf.reset();
+ buf.getRawData(schema, _len);
+ }
}
/*MGEN:IF(Class.ExistPerThreadStats)*/
-void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* totals)
+void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* totals) const
{
/*MGEN:Class.InitializeTotalPerThreadStats*/
for (int idx = 0; idx < maxThreads; idx++) {
@@ -124,6 +132,7 @@ void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* tota
}
/*MGEN:ENDIF*/
+/*MGEN:IF(Root.GenQMFv1)*/
uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const
{
uint32_t size = writeTimestampsSize();
@@ -134,32 +143,62 @@ uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const
return size;
}
-void /*MGEN:Class.NameCap*/::readProperties (Buffer& buf)
+void /*MGEN:Class.NameCap*/::readProperties (const std::string& _sBuf)
{
+ char *_tmpBuf = new char[_sBuf.length()];
+ memcpy(_tmpBuf, _sBuf.data(), _sBuf.length());
+ ::qpid::framing::Buffer buf(_tmpBuf, _sBuf.length());
::qpid::sys::Mutex::ScopedLock mutex(accessLock);
- readTimestamps(buf);
+
+ {
+ std::string _tbuf;
+ buf.getRawData(_tbuf, writeTimestampsSize());
+ readTimestamps(_tbuf);
+ }
+
/*MGEN:IF(Class.ExistOptionals)*/
for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
presenceMask[idx] = buf.getOctet();
/*MGEN:ENDIF*/
/*MGEN:Class.ReadProperties*/
+
+ delete [] _tmpBuf;
}
-void /*MGEN:Class.NameCap*/::writeProperties (Buffer& buf) const
+void /*MGEN:Class.NameCap*/::writeProperties (std::string& _sBuf) const
{
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+
::qpid::sys::Mutex::ScopedLock mutex(accessLock);
configChanged = false;
- writeTimestamps (buf);
+ {
+ std::string _tbuf;
+ writeTimestamps(_tbuf);
+ buf.putRawData(_tbuf);
+ }
+
+
/*MGEN:IF(Class.ExistOptionals)*/
for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
buf.putOctet(presenceMask[idx]);
/*MGEN:ENDIF*/
/*MGEN:Class.WriteProperties*/
+
+ uint32_t _bufLen = buf.getPosition();
+ buf.reset();
+
+ buf.getRawData(_sBuf, _bufLen);
}
-void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders)
+void /*MGEN:Class.NameCap*/::writeStatistics (std::string& _sBuf, bool skipHeaders)
{
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+
::qpid::sys::Mutex::ScopedLock mutex(accessLock);
instChanged = false;
/*MGEN:IF(Class.ExistPerThreadAssign)*/
@@ -175,8 +214,12 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders)
aggregatePerThreadStats(&totals);
/*MGEN:ENDIF*/
/*MGEN:Class.Assign*/
- if (!skipHeaders)
- writeTimestamps (buf);
+ if (!skipHeaders) {
+ std::string _tbuf;
+ writeTimestamps (_tbuf);
+ buf.putRawData(_tbuf);
+ }
+
/*MGEN:Class.WriteStatistics*/
// Maintenance of hi-lo statistics
@@ -189,6 +232,11 @@ void /*MGEN:Class.NameCap*/::writeStatistics (Buffer& buf, bool skipHeaders)
}
}
/*MGEN:ENDIF*/
+
+ uint32_t _bufLen = buf.getPosition();
+ buf.reset();
+
+ buf.getRawData(_sBuf, _bufLen);
}
void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/)
@@ -196,11 +244,25 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/)
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
std::string text;
+ bool _matched = false;
+
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer outBuf(_msgChars, _bufSize);
+
/*MGEN:Class.MethodHandlers*/
- outBuf.putLong(status);
- outBuf.putShortString(Manageable::StatusText(status, text));
-}
+ if (!_matched) {
+ outBuf.putLong(status);
+ outBuf.putShortString(Manageable::StatusText(status, text));
+ }
+
+ uint32_t _bufLen = outBuf.getPosition();
+ outBuf.reset();
+
+ outBuf.getRawData(outStr, _bufLen);
+}
+/*MGEN:ENDIF*/
std::string /*MGEN:Class.NameCap*/::getKey() const
{
std::stringstream key;
@@ -209,3 +271,67 @@ std::string /*MGEN:Class.NameCap*/::getKey() const
return key.str();
}
+
+
+void /*MGEN:Class.NameCap*/::mapEncodeValues (::qpid::types::Variant::Map& _map,
+ bool includeProperties,
+ bool includeStatistics)
+{
+ using namespace ::qpid::types;
+ ::qpid::sys::Mutex::ScopedLock mutex(accessLock);
+
+ if (includeProperties) {
+ configChanged = false;
+/*MGEN:Class.MapEncodeProperties*/
+ }
+
+ if (includeStatistics) {
+ instChanged = false;
+/*MGEN:IF(Class.ExistPerThreadAssign)*/
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+/*MGEN:Class.PerThreadAssign*/
+ }
+ }
+/*MGEN:ENDIF*/
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+ struct PerThreadStats totals;
+ aggregatePerThreadStats(&totals);
+/*MGEN:ENDIF*/
+/*MGEN:Class.Assign*/
+
+/*MGEN:Class.MapEncodeStatistics*/
+
+ // Maintenance of hi-lo statistics
+/*MGEN:Class.HiLoStatResets*/
+/*MGEN:IF(Class.ExistPerThreadResets)*/
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+/*MGEN:Class.PerThreadHiLoStatResets*/
+ }
+ }
+/*MGEN:ENDIF*/
+ }
+}
+
+void /*MGEN:Class.NameCap*/::mapDecodeValues (const ::qpid::types::Variant::Map& _map)
+{
+ ::qpid::types::Variant::Map::const_iterator _i;
+ ::qpid::sys::Mutex::ScopedLock mutex(accessLock);
+/*MGEN:IF(Class.ExistOptionals)*/
+ bool _found;
+/*MGEN:ENDIF*/
+/*MGEN:Class.MapDecodeProperties*/
+}
+
+void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMapMethodArgs*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+ std::string text;
+
+/*MGEN:Class.MapMethodHandlers*/
+ outMap["_status_code"] = (uint32_t) status;
+ outMap["_status_text"] = Manageable::StatusText(status, text);
+}
diff --git a/cpp/managementgen/qmfgen/templates/Class.h b/cpp/managementgen/qmfgen/templates/Class.h
index 8efacd650f..cdb31c4c9e 100644
--- a/cpp/managementgen/qmfgen/templates/Class.h
+++ b/cpp/managementgen/qmfgen/templates/Class.h
@@ -24,8 +24,6 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/management/ManagementObject.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/Uuid.h"
namespace qpid {
namespace management {
@@ -42,7 +40,7 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
static std::string packageName;
static std::string className;
- static uint8_t md5Sum[16];
+ static uint8_t md5Sum[MD5_LEN];
/*MGEN:IF(Class.ExistOptionals)*/
uint8_t presenceMask[/*MGEN:Class.PresenceMaskBytes*/];
/*MGEN:Class.PresenceMaskConstants*/
@@ -71,18 +69,28 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
return threadStats;
}
- void aggregatePerThreadStats(struct PerThreadStats*);
+ void aggregatePerThreadStats(struct PerThreadStats*) const;
/*MGEN:ENDIF*/
public:
- static void writeSchema(::qpid::framing::Buffer& buf);
+ static void writeSchema(std::string& schema);
+ void mapEncodeValues(::qpid::types::Variant::Map& map,
+ bool includeProperties=true,
+ bool includeStatistics=true);
+ void mapDecodeValues(const ::qpid::types::Variant::Map& map);
+ void doMethod(std::string& methodName,
+ const ::qpid::types::Variant::Map& inMap,
+ ::qpid::types::Variant::Map& outMap);
+ std::string getKey() const;
+/*MGEN:IF(Root.GenQMFv1)*/
uint32_t writePropertiesSize() const;
- void readProperties(::qpid::framing::Buffer& buf);
- void writeProperties(::qpid::framing::Buffer& buf) const;
- void writeStatistics(::qpid::framing::Buffer& buf, bool skipHeaders = false);
+ void readProperties(const std::string& buf);
+ void writeProperties(std::string& buf) const;
+ void writeStatistics(std::string& buf, bool skipHeaders = false);
void doMethod(std::string& methodName,
- ::qpid::framing::Buffer& inBuf,
- ::qpid::framing::Buffer& outBuf);
- std::string getKey() const;
+ const std::string& inBuf,
+ std::string& outBuf);
+/*MGEN:ENDIF*/
+
writeSchemaCall_t getWriteSchemaCall() { return writeSchema; }
/*MGEN:IF(Class.NoStatistics)*/
// Stub for getInstChanged. There are no statistics in this class.
diff --git a/cpp/managementgen/qmfgen/templates/Event.cpp b/cpp/managementgen/qmfgen/templates/Event.cpp
index a4fc28990d..d760fd9014 100644
--- a/cpp/managementgen/qmfgen/templates/Event.cpp
+++ b/cpp/managementgen/qmfgen/templates/Event.cpp
@@ -21,13 +21,13 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/log/Statement.h"
-#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/Buffer.h"
#include "qpid//*MGEN:Event.AgentHeaderLocation*//ManagementAgent.h"
#include "Event/*MGEN:Event.NameCap*/.h"
using namespace qmf::/*MGEN:Event.Namespace*/;
-using namespace qpid::framing;
using qpid::management::ManagementAgent;
using qpid::management::Manageable;
using qpid::management::ManagementObject;
@@ -56,23 +56,45 @@ void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent)
agent->registerEvent(packageName, eventName, md5Sum, writeSchema);
}
-void Event/*MGEN:Event.NameCap*/::writeSchema (Buffer& buf)
+void Event/*MGEN:Event.NameCap*/::writeSchema (std::string& schema)
{
- FieldTable ft;
+ const int _bufSize = 65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+ ::qpid::framing::FieldTable ft;
// Schema class header:
buf.putOctet (CLASS_KIND_EVENT);
buf.putShortString (packageName); // Package Name
buf.putShortString (eventName); // Event Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putOctet (0); // No Superclass
buf.putShort (/*MGEN:Event.ArgCount*/); // Argument Count
// Arguments
/*MGEN:Event.ArgSchema*/
+ {
+ uint32_t _len = buf.getPosition();
+ buf.reset();
+ buf.getRawData(schema, _len);
+ }
}
-void Event/*MGEN:Event.NameCap*/::encode(::qpid::framing::Buffer& buf) const
+void Event/*MGEN:Event.NameCap*/::encode(std::string& _sBuf) const
{
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+
/*MGEN:Event.ArgEncodes*/
+
+ uint32_t _bufLen = buf.getPosition();
+ buf.reset();
+
+ buf.getRawData(_sBuf, _bufLen);
+}
+
+void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::types::Variant::Map& map) const
+{
+ using namespace ::qpid::types;
+/*MGEN:Event.ArgMap*/
}
diff --git a/cpp/managementgen/qmfgen/templates/Event.h b/cpp/managementgen/qmfgen/templates/Event.h
index b5c2a211d1..4f912cf220 100644
--- a/cpp/managementgen/qmfgen/templates/Event.h
+++ b/cpp/managementgen/qmfgen/templates/Event.h
@@ -24,8 +24,6 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/management/ManagementEvent.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/Uuid.h"
namespace qmf {
/*MGEN:Event.OpenNamespaces*/
@@ -33,10 +31,10 @@ namespace qmf {
class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
{
private:
- static void writeSchema (::qpid::framing::Buffer& buf);
+ static void writeSchema (std::string& schema);
static std::string packageName;
static std::string eventName;
- static uint8_t md5Sum[16];
+ static uint8_t md5Sum[MD5_LEN];
/*MGEN:Event.ArgDeclarations*/
@@ -51,7 +49,8 @@ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
std::string& getEventName() const { return eventName; }
uint8_t* getMd5Sum() const { return md5Sum; }
uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
- void encode(::qpid::framing::Buffer& buffer) const;
+ void encode(std::string& buffer) const;
+ void mapEncode(::qpid::types::Variant::Map& map) const;
};
}/*MGEN:Event.CloseNamespaces*/
diff --git a/cpp/src/qpid/acl/Acl.cpp b/cpp/src/qpid/acl/Acl.cpp
index 21a9e2055e..e510920f6c 100644
--- a/cpp/src/qpid/acl/Acl.cpp
+++ b/cpp/src/qpid/acl/Acl.cpp
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/log/Logger.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/acl/Package.h"
#include "qmf/org/apache/qpid/acl/EventAllow.h"
#include "qmf/org/apache/qpid/acl/EventDeny.h"
@@ -94,7 +95,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals
" ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name );
agent->raiseEvent(_qmf::EventAllow(id, AclHelper::getActionStr(action),
AclHelper::getObjectTypeStr(objType),
- name, framing::FieldTable()));
+ name, types::Variant::Map()));
case ALLOW:
return true;
case DENY:
@@ -106,7 +107,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals
QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name);
agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action),
AclHelper::getObjectTypeStr(objType),
- name, framing::FieldTable()));
+ name, types::Variant::Map()));
return false;
}
return false;
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 637645bb04..42bc36c4b8 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -22,7 +22,7 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/log/Statement.h"
#include "qpid/agent/ManagementAgentImpl.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include <list>
#include <string.h>
#include <stdlib.h>
@@ -41,6 +41,9 @@ using std::ofstream;
using std::ifstream;
using std::string;
using std::endl;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
namespace {
Mutex lock;
@@ -81,7 +84,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
notifyable(0), inCallback(false),
- initialized(false), connected(false), lastFailure("never connected"),
+ initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
connThreadBody(*this), connThread(connThreadBody),
@@ -117,6 +120,21 @@ ManagementAgentImpl::~ManagementAgentImpl()
}
}
+void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
+{
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ inst = qpid::types::Uuid(true).str();
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+}
+
void ManagementAgentImpl::init(const string& brokerHost,
uint16_t brokerPort,
uint16_t intervalSeconds,
@@ -140,7 +158,7 @@ void ManagementAgentImpl::init(const string& brokerHost,
void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
uint16_t intervalSeconds,
bool useExternalThread,
- const std::string& _storeFile)
+ const string& _storeFile)
{
interval = intervalSeconds;
extThread = useExternalThread;
@@ -157,13 +175,16 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
bootSequence = 1;
storeData(true);
+ if (attrMap.empty())
+ setName("vendor", "product");
+
initialized = true;
}
void ManagementAgentImpl::registerClass(const string& packageName,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -173,49 +194,77 @@ void ManagementAgentImpl::registerClass(const string& packageName,
void ManagementAgentImpl::registerEvent(const string& packageName,
const string& eventName,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
+// old-style add object: 64bit id - deprecated
ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
uint64_t persistId)
{
+ std::string key;
+ if (persistId) {
+ key = boost::lexical_cast<std::string>(persistId);
+ }
+ return addObject(object, key, persistId != 0);
+}
+
+
+// new style add object - use this approach!
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+ const std::string& key,
+ bool persistent)
+{
Mutex::ScopedLock lock(addLock);
- uint16_t sequence = persistId ? 0 : bootSequence;
- uint64_t objectNum = persistId ? persistId : nextObjectId++;
- ObjectId objectId(&attachment, 0, sequence, objectNum);
+ uint16_t sequence = persistent ? 0 : bootSequence;
+
+ ObjectId objectId(&attachment, 0, sequence);
+ if (key.empty())
+ objectId.setV2Key(*object); // let object generate the key
+ else
+ objectId.setV2Key(key);
- // TODO: fix object-id handling
object->setObjectId(objectId);
newManagementObjects[objectId] = object;
return objectId;
}
+
void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity)
{
Mutex::ScopedLock lock(agentLock);
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
stringstream key;
key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
event.getPackageName() << "." << event.getEventName();
- encodeHeader(outBuffer, 'e');
- outBuffer.putShortString(event.getPackageName());
- outBuffer.putShortString(event.getEventName());
- outBuffer.putBin128(event.getMd5Sum());
- outBuffer.putLongLong(uint64_t(Duration(now())));
- outBuffer.putOctet(sev);
- event.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str());
+ Variant::Map map_;
+ Variant::Map schemaId;
+ Variant::Map values;
+ Variant::Map headers;
+ string content;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ event.getMd5Sum());
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
+
+ MapCodec::encode(map_, content);
+ connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -235,8 +284,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
methodQueue.pop_front();
{
Mutex::ScopedUnlock unlock(agentLock);
- Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
- invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+ invokeMethodRequest(item->body, item->cid, item->replyTo);
delete item;
}
}
@@ -274,20 +322,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
void ManagementAgentImpl::startProtocol()
{
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- connected = true;
- encodeHeader(buffer, 'A');
- buffer.putShortString("RemoteAgent [C++]");
- systemId.encode (buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- uint32_t length = buffer.getPosition();
- buffer.reset();
- connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
+ sendHeartbeat();
}
void ManagementAgentImpl::storeData(bool requested)
@@ -323,76 +358,54 @@ void ManagementAgentImpl::retrieveData()
}
}
-void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
- uint32_t code, string text)
+void ManagementAgentImpl::sendHeartbeat()
{
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ static const string addr_exchange("qmf.default.topic");
+ static const string addr_key("agent.ind.heartbeat");
- encodeHeader(outBuffer, 'z', sequence);
- outBuffer.putLong(code);
- outBuffer.putShortString(text);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
- QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
-}
+ Variant::Map map;
+ Variant::Map headers;
+ string content;
-void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
-{
- Mutex::ScopedLock lock(agentLock);
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key);
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- storeData();
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+}
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
+void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+ const string& text, uint32_t code)
+{
+ static const string addr_exchange("qmf.default.direct");
- // Bind to qpid.management to receive commands
- connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
+ Variant::Map map;
+ Variant::Map headers;
+ Variant::Map values;
+ string content;
- // Send package indications for all local packages
- for (PackageMap::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_exception";
+ headers["qmf.agent"] = name_address;
- encodeHeader(outBuffer, 'p');
- encodePackageIndication(outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ values["error_code"] = code;
+ values["error_text"] = text;
+ map["_values"] = values;
- // Send class indications for all local classes
- ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
- outBuffer.reset();
- encodeHeader(outBuffer, 'q');
- encodeClassIndication(outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
- }
- }
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey);
+
+ QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
{
Mutex::ScopedLock lock(agentLock);
string packageName;
@@ -412,12 +425,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
SchemaClass& schema = cIter->second;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ string body;
encodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(outBuffer);
+ schema.writeSchemaCall(body);
+ outBuffer.putRawData(body);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
}
@@ -432,124 +447,250 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
QPID_LOG(trace, "RCVD ConsoleAddedInd");
}
-void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo)
{
- string methodName;
- string packageName;
- string className;
- uint8_t hash[16];
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ string methodName;
+ bool failed = false;
+ Variant::Map inMap;
+ Variant::Map outMap;
+ Variant::Map::const_iterator oid, mid;
+ string content;
+
+ MapCodec::decode(body, inMap);
+
+ outMap["_values"] = Variant::Map();
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+ failed = true;
+ } else {
+ string methodName;
+ ObjectId objId;
+ Variant::Map inArgs;
+ Variant::Map callMap;
- ObjectId objId(inBuffer);
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
+ try {
+ // conversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
- encodeHeader(outBuffer, 'm', sequence);
+ mid = inMap.find("_arguments");
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
+ }
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
- } else {
- if ((iter->second->getPackageName() != packageName) ||
- (iter->second->getClassName() != className)) {
- outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
- outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
- }
- else
- try {
- outBuffer.record();
- iter->second->doMethod(methodName, inBuffer, outBuffer);
- } catch(exception& e) {
- outBuffer.restore();
- outBuffer.putLong(Manageable::STATUS_EXCEPTION);
- outBuffer.putMediumString(e.what());
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+ failed = true;
+ } else {
+ iter->second->doMethod(methodName, inArgs, callMap);
}
+
+ if (callMap["_status_code"].asUint32() == 0) {
+ outMap["_arguments"] = Variant::Map();
+ for (Variant::Map::const_iterator iter = callMap.begin();
+ iter != callMap.end(); iter++)
+ if (iter->first != "_status_code" && iter->first != "_status_text")
+ outMap["_arguments"].asMap()[iter->first] = iter->second;
+ } else {
+ (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"];
+ (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"];
+ failed = true;
+ }
+
+ } catch(types::InvalidConversion& e) {
+ outMap.clear();
+ outMap["_values"] = Variant::Map();
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION;
+ (outMap["_values"].asMap())["_status_text"] = e.what();
+ failed = true;
+ }
+ }
+
+ Variant::Map headers;
+ headers["method"] = "response";
+ headers["qmf.agent"] = name_address;
+ if (failed) {
+ headers["qmf.opcode"] = "_exception";
+ QPID_LOG(trace, "SENT Exception map=" << outMap);
+ } else {
+ headers["qmf.opcode"] = "_method_response";
+ QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
}
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ MapCodec::encode(outMap, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo);
}
-void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
{
- FieldTable ft;
- FieldTable::ValuePtr value;
-
moveNewObjectsLH();
- ft.decode(inBuffer);
+ Variant::Map inMap;
+ Variant::Map::const_iterator i;
+ Variant::Map headers;
+
+ MapCodec::decode(body, inMap);
+ QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+ headers["partial"] = Variant();
+
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oidMap;
+ string content;
+
+ /*
+ * Unpack the _what element of the query. Currently we only support OBJECT queries.
+ */
+ i = inMap.find("_what");
+ if (i == inMap.end()) {
+ sendException(replyTo, cid, "_what element missing in Query");
+ return;
+ }
- QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+ if (i->second.getType() != qpid::types::VAR_STRING) {
+ sendException(replyTo, cid, "_what element is not a string");
+ return;
+ }
- value = ft.get("_class");
- if (value.get() == 0 || !value->convertsTo<string>()) {
- value = ft.get("_objectid");
- if (value.get() == 0 || !value->convertsTo<string>())
- return;
+ if (i->second.asString() != "OBJECT") {
+ sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ return;
+ }
+
+ string className;
+ string packageName;
+
+ /*
+ * Handle the _schema_id element, if supplied.
+ */
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
+
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ className = s_iter->second.asString();
- ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = managementObjects.find(selector);
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ packageName = s_iter->second.asString();
+ }
+
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ ObjectId objId(i->second.asMap());
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter != managementObjects.end()) {
ManagementObject* object = iter->second;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ objId.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+ headers.erase("partial");
+
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ return;
+ }
+ } else {
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
+
+ // @todo support multiple object reply per message
+ values.clear();
+ list_.clear();
+ oidMap.clear();
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
- QPID_LOG(trace, "SENT ObjectInd");
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ iter->first.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ }
}
- sendCommandComplete(replyTo, sequence);
- return;
}
- string className(value->get<string>());
+ // end empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+}
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName() == className) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest");
+ static const string addr_exchange("qmf.default.direct");
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
+ Variant::Map map;
+ Variant::Map headers;
+ string content;
- encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
- QPID_LOG(trace, "SENT ObjectInd");
- }
- }
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo);
+
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
- sendCommandComplete(replyTo, sequence);
+ {
+ Mutex::ScopedLock lock(agentLock);
+ clientWasAdded = true;
+ }
}
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo)
{
if (extThread) {
Mutex::ScopedLock lock(agentLock);
- string body;
- inBuffer.getRawData(body, inBuffer.available());
- methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+ methodQueue.push_back(new QueuedMethod(cid, replyTo, body));
if (pipeHandle != 0) {
pipeHandle->write("X", 1);
} else if (notifyable != 0) {
@@ -568,7 +709,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc
inCallback = false;
}
} else {
- invokeMethodRequest(inBuffer, sequence, replyTo);
+ invokeMethodRequest(body, cid, replyTo);
}
QPID_LOG(trace, "RCVD MethodRequest");
@@ -576,28 +717,45 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc
void ManagementAgentImpl::received(Message& msg)
{
+ string replyToKey;
+ framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const framing::ReplyTo& rt = mp.getReplyTo();
+ replyToKey = rt.getRoutingKey();
+ }
+
+ if (mp.hasAppId() && mp.getAppId() == "qmf2")
+ {
+ string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
+ string cid = msg.getMessageProperties().getCorrelationId();
+
+ if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
+ else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey);
+ else {
+ QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
+ }
+ return;
+ }
+
+ // old preV2 binary messages
+
+ uint32_t sequence;
string data = msg.getData();
Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
- framing::MessageProperties p = msg.getMessageProperties();
- if (p.hasReplyTo()) {
- const framing::ReplyTo& rt = p.getReplyTo();
- replyToKey = rt.getRoutingKey();
- }
if (checkHeader(inBuffer, &opcode, &sequence))
{
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
+ if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
- else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
+ else
+ QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
}
}
+
void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet('A');
@@ -607,6 +765,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
buf.putLong (seq);
}
+Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+ const string& cname,
+ const uint8_t *md5Sum)
+{
+ Variant::Map map_;
+
+ map_["_package_name"] = pname;
+ map_["_class_name"] = cname;
+ map_["_hash"] = types::Uuid(md5Sum);
+ return map_;
+}
+
+
bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
if (buf.getSize() < 8)
@@ -661,7 +832,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind,
PackageMap::iterator pIter,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -701,10 +872,7 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
void ManagementAgentImpl::periodicProcessing()
{
-#define BUFSIZE 65536
Mutex::ScopedLock lock(agentLock);
- char msgChars[BUFSIZE];
- uint32_t contentSize;
list<pair<ObjectId, ManagementObject*> > deleteList;
if (!connected)
@@ -745,42 +913,53 @@ void ManagementAgentImpl::periodicProcessing()
!baseObject->isDeleted()))
continue;
- Buffer msgBuffer(msgChars, BUFSIZE);
+ Variant::List list_;
+
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+ bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- }
-
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
- encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_stats || send_props) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
}
if (object->isDeleted())
deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
-
- if (msgBuffer.available() < (BUFSIZE / 2))
- break;
}
}
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
- msgBuffer.reset();
- stringstream key;
- key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
- baseObject->getPackageName() << "." << baseObject->getClassName();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
+ string content;
+ ListCodec::encode(list_, content);
+ if (content.length()) {
+ Variant::Map headers;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list");
+ QPID_LOG(trace, "SENT DataIndication");
}
}
@@ -793,18 +972,7 @@ void ManagementAgentImpl::periodicProcessing()
}
deleteList.clear();
-
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
- }
+ sendHeartbeat();
}
void ManagementAgentImpl::ConnectionThread::run()
@@ -831,6 +999,10 @@ void ManagementAgentImpl::ConnectionThread::run()
arg::exclusive=true);
session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
arg::bindingKey=queueName.str());
+ session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(),
+ arg::bindingKey=agent.name_address);
+ session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(),
+ arg::bindingKey="console.#");
subscriptions->subscribe(agent, queueName.str(), dest);
QPID_LOG(info, "Connection established with broker");
@@ -839,6 +1011,7 @@ void ManagementAgentImpl::ConnectionThread::run()
if (shutdown)
return;
operational = true;
+ agent.connected = true;
agent.startProtocol();
try {
Mutex::ScopedUnlock _unlock(connLock);
@@ -892,6 +1065,48 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
const string& exchange,
const string& routingKey)
{
+ Message msg;
+ string data;
+
+ buf.getRawData(data, length);
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map headers,
+ const string& exchange,
+ const string& routingKey,
+ const string& contentType)
+{
+ Message msg;
+ Variant::Map::const_iterator i;
+
+ if (!cid.empty())
+ msg.getMessageProperties().setCorrelationId(cid);
+
+ if (!contentType.empty())
+ msg.getMessageProperties().setContentType(contentType);
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg.getHeaders().setString(i->first, i->second.asString());
+ }
+ msg.getHeaders().setString("app_id", "qmf2");
+
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
+ const string& exchange,
+ const string& routingKey)
+{
ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
@@ -900,23 +1115,21 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
s = subscriptions;
}
- Message msg;
- string data;
-
- buf.getRawData(data, length);
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData(data);
+ msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address);
try {
session.messageTransfer(arg::content=msg, arg::destination=exchange);
} catch(exception& e) {
- QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
+ QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
// Bounce the connection
if (s)
s->stop();
}
}
+
+
void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
{
stringstream key;
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index affaa45d2d..d1609341be 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -51,6 +51,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
// Methods from ManagementAgent
//
int getMaxThreads() { return 1; }
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
void init(const std::string& brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
@@ -75,6 +78,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
+ ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key,
+ bool persistent);
void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
uint32_t pollCallbacks(uint32_t callLimit = 0);
int getSignalFd();
@@ -120,10 +125,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
};
struct QueuedMethod {
- QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
- sequence(_seq), replyTo(_reply), body(_body) {}
+ QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) :
+ cid(_cid), replyTo(_reply), body(_body) {}
- uint32_t sequence;
+ std::string cid;
std::string replyTo;
std::string body;
};
@@ -140,6 +145,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void received (client::Message& msg);
+ qpid::types::Variant::Map attrMap;
+ std::string name_address;
uint16_t interval;
bool extThread;
sys::PipeHandle* pipeHandle;
@@ -155,6 +162,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
client::ConnectionSettings connectionSettings;
bool initialized;
bool connected;
+ bool useMapMsg;
std::string lastFailure;
bool clientWasAdded;
@@ -198,6 +206,15 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
uint32_t length,
const std::string& exchange,
const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map headers,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const std::string& contentType="amqp/map");
+ void sendMessage(qpid::client::Message msg,
+ const std::string& exchange,
+ const std::string& routingKey);
void bindToBank(uint32_t brokerBank, uint32_t agentBank);
void close();
bool isSleeping() const;
@@ -237,16 +254,21 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
PackageMap::iterator pIter,
ClassMap::iterator cIter);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname,
+ const std::string& cname,
+ const uint8_t *md5Sum);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendCommandComplete (std::string replyToKey, uint32_t sequence,
- uint32_t code = 0, std::string text = std::string("OK"));
- void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+ void sendHeartbeat();
+ void sendException(const std::string& replyToKey, const std::string& cid,
+ const std::string& text, uint32_t code=1);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
- void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
- void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
+ void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo);
+
+ void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo);
+ void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
+ void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleConsoleAddedIndication();
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index d94f228734..24c5a0c049 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -93,7 +93,8 @@ Broker::Options::Options(const std::string& name) :
tcpNoDelay(false),
requireEncrypted(false),
maxSessionRate(0),
- asyncQueueEvents(false) // Must be false in a cluster.
+ asyncQueueEvents(false), // Must be false in a cluster.
+ qmf2Support(false)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -114,6 +115,7 @@ Broker::Options::Options(const std::string& name) :
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -138,7 +140,9 @@ const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
- managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
+ managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+ conf.qmf2Support)
+ : 0),
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
@@ -164,6 +168,7 @@ Broker::Broker(const Broker::Options& conf) :
QPID_LOG(info, "Management enabled");
managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
conf.mgmtPubInterval, this, conf.workerThreads + 3);
+ managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 465a17f4eb..f9be992f0c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -113,6 +113,7 @@ public:
std::string knownHosts;
uint32_t maxSessionRate;
bool asyncQueueEvents;
+ bool qmf2Support;
private:
std::string getHome();
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 7bb70ed24a..1d3da982d8 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -149,7 +149,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
- mgmtExchange->set_arguments(args);
+ mgmtExchange->set_arguments(ManagementAgent::toMap(args));
if (!durable) {
if (name.empty()) {
agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID
@@ -336,7 +336,7 @@ void Exchange::Binding::startManagement()
{
management::ObjectId queueId = mo->getObjectId();
mgmtBinding = new _qmf::Binding
- (agent, this, (Manageable*) parent, queueId, key, args);
+ (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args));
if (!origin.empty())
mgmtBinding->set_origin(origin);
agent->addObject (mgmtBinding, agent->allocateId(this));
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 9e379dfc49..8d9248212f 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -873,7 +873,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
if (mgmtObject != 0)
- mgmtObject->set_arguments (_settings);
+ mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
if ( isDurable() && ! getPersistenceId() && ! recovering )
store->create(*this, _settings);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 73ef807a0a..5148d88e72 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -280,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
- !acquire, ackExpected, exclusive ,arguments);
+ !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject, agent->allocateId(this));
mgmtObject->set_creditMode("WINDOW");
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index c3b6f697fd..10eddc6045 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -106,7 +106,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
- alternateExchange, durable, false, args,
+ alternateExchange, durable, false, ManagementAgent::toMap(args),
response.second ? "created" : "existing"));
}catch(UnknownExchangeTypeException& /*e*/){
@@ -194,7 +194,8 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
- agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments));
+ agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
+ queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
}
}else{
throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
@@ -389,7 +390,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, arguments,
+ name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
queue_created.second ? "created" : "existing"));
}
@@ -499,7 +500,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
- queueName, destination, exclusive, arguments));
+ queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
}
void
diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp
index 455ad11cf2..90c6b13cd3 100644
--- a/cpp/src/qpid/broker/System.cpp
+++ b/cpp/src/qpid/broker/System.cpp
@@ -22,6 +22,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
#include <iostream>
#include <fstream>
@@ -64,7 +65,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0)
}
}
- mgmtObject = new _qmf::System (agent, this, systemId);
+ mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
std::string sysname, nodename, release, version, machine;
qpid::sys::SystemInfo::getSystemId (sysname,
nodename,
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 4454d70427..bc62588f5d 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -29,20 +29,46 @@
#include "qpid/sys/Time.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/types/Variant.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/List.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include <list>
#include <iostream>
#include <fstream>
#include <sstream>
+#include <typeinfo>
using boost::intrusive_ptr;
using qpid::framing::Uuid;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace qpid;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
+
+
+static Variant::Map mapEncodeSchemaId(const std::string& pname,
+ const std::string& cname,
+ const std::string& type,
+ const uint8_t *md5Sum)
+{
+ Variant::Map map_;
+
+ map_["_package_name"] = pname;
+ map_["_class_name"] = cname;
+ map_["_type"] = type;
+ map_["_hash"] = qpid::types::Uuid(md5Sum);
+ return map_;
+}
+
+
ManagementAgent::RemoteAgent::~RemoteAgent ()
{
QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
@@ -52,10 +78,11 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
}
}
-ManagementAgent::ManagementAgent () :
+ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
threadPoolSize(1), interval(10), broker(0), timer(0),
startTime(uint64_t(Duration(now()))),
- suppressed(false)
+ suppressed(false),
+ qmf1Support(qmfV1), qmf2Support(qmfV2)
{
nextObjectId = 1;
brokerBank = 1;
@@ -148,6 +175,27 @@ void ManagementAgent::pluginsInitialized() {
timer->add(new Periodic(*this, interval));
}
+
+void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
+{
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ if (uuid.isNull())
+ {
+ throw Exception("ManagementAgent::configure() must be called if default name is used.");
+ }
+ inst = uuid.str();
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+}
+
+
void ManagementAgent::writeData ()
{
string filename (dataDir + "/.mbrokerdata");
@@ -194,6 +242,7 @@ void ManagementAgent::registerEvent (const string& packageName,
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
+// Deprecated: V1 objects
ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId)
{
uint16_t sequence;
@@ -207,8 +256,47 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
objectNum = persistId;
}
- ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
- objId.setV2Key(*object);
+ ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum);
+ objId.setV2Key(*object); // let object generate the v2 key
+
+ object->setObjectId(objId);
+
+ {
+ Mutex::ScopedLock lock (addLock);
+ ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
+ if (destIter != newManagementObjects.end()) {
+ if (destIter->second->isDeleted()) {
+ newDeletedManagementObjects.push_back(destIter->second);
+ newManagementObjects.erase(destIter);
+ } else {
+ QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
+ " key=" << objId.getV2Key());
+ return objId;
+ }
+ }
+ newManagementObjects[objId] = object;
+ }
+
+ return objId;
+}
+
+
+
+ObjectId ManagementAgent::addObject(ManagementObject* object,
+ const std::string& key,
+ bool persistent)
+{
+ uint16_t sequence;
+
+ sequence = persistent ? 0 : bootSequence;
+
+ ObjectId objId(0 /*flags*/, sequence, brokerBank);
+ if (key.empty()) {
+ objId.setV2Key(*object); // let object generate the key
+ } else {
+ objId.setV2Key(key);
+ }
+
object->setObjectId(objId);
{
@@ -233,21 +321,57 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
{
Mutex::ScopedLock lock (userLock);
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
- encodeHeader(outBuffer, 'e');
- outBuffer.putShortString(event.getPackageName());
- outBuffer.putShortString(event.getEventName());
- outBuffer.putBin128(event.getMd5Sum());
- outBuffer.putLongLong(uint64_t(Duration(now())));
- outBuffer.putOctet(sev);
- event.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBuffer(outBuffer, outLen, mExchange,
- "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+ if (qmf1Support) {
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ outBuffer.putOctet(sev);
+ std::string sBuf;
+ event.encode(sBuf);
+ outBuffer.putRawData(sBuf);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, mExchange,
+ "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+ QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+ }
+
+ if (qmf2Support) {
+ Variant::Map map_;
+ Variant::Map schemaId;
+ Variant::Map values;
+ Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ "_event",
+ event.getMd5Sum());
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
+
+ stringstream key;
+ key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName();
+
+ string content;
+ MapCodec::encode(map_, content);
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+ }
+
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -355,6 +479,59 @@ void ManagementAgent::sendBuffer(Buffer& buf,
} catch(exception&) {}
}
+
+void ManagementAgent::sendBuffer(const std::string& data,
+ const std::string& cid,
+ const Variant::Map& headers,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey)
+{
+ Variant::Map::const_iterator i;
+
+ if (suppressed) {
+ QPID_LOG(trace, "Suppressed management message to " << routingKey);
+ return;
+ }
+ if (exchange.get() == 0) return;
+
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody(data)));
+
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
+ MessageProperties* props =
+ msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(data.length());
+ if (!cid.empty()) {
+ props->setCorrelationId(cid);
+ }
+
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+ }
+ msg->getOrInsertHeaders().setString("app_id", "qmf2");
+
+ DeliveryProperties* dp =
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ dp->setRoutingKey(routingKey);
+
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ try {
+ exchange->route(deliverable, routingKey, 0);
+ } catch(exception&) {}
+}
+
+
void ManagementAgent::moveNewObjectsLH()
{
Mutex::ScopedLock lock (addLock);
@@ -391,12 +568,13 @@ void ManagementAgent::periodicProcessing (void)
{
#define BUFSIZE 65536
#define HEADROOM 4096
- QPID_LOG(trace, "Management agent periodic processing")
- Mutex::ScopedLock lock (userLock);
+ QPID_LOG(trace, "Management agent periodic processing");
+ Mutex::ScopedLock lock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
+ std::string sBuf;
uint64_t uptime = uint64_t(Duration(now())) - startTime;
static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
@@ -439,43 +617,90 @@ void ManagementAgent::periodicProcessing (void)
continue;
Buffer msgBuffer(msgChars, BUFSIZE);
+ Variant::List list_;
+
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+ bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_props && qmf1Support) {
encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- pcount++;
+ sBuf.clear();
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
}
-
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+
+ if (send_stats && qmf1Support) {
encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
- scount++;
+ sBuf.clear();
+ object->writeStatistics(sBuf);
+ msgBuffer.putRawData(sBuf);
}
+ if ((send_stats || send_props) && qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ }
+
+ if (send_props) pcount++;
+ if (send_stats) scount++;
+
if (object->isDeleted())
deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
- if (msgBuffer.available() < HEADROOM)
+ if (qmf1Support && (msgBuffer.available() < HEADROOM))
break;
}
}
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
- msgBuffer.reset();
- stringstream key;
- key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ if (pcount || scount) {
+ if (qmf1Support) {
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ }
+ }
+
+ if (qmf2Support) {
+ string content;
+ ListCodec::encode(list_, content);
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ }
+ }
}
}
@@ -492,15 +717,49 @@ void ManagementAgent::periodicProcessing (void)
for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin();
cdIter != deletedManagementObjects.end(); cdIter++) {
collisionDeletions = true;
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'c');
- (*cdIter)->writeProperties(msgBuffer);
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- stringstream key;
- key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ {
+ if (qmf1Support) {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'c');
+ sBuf.clear();
+ (*cdIter)->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ stringstream key;
+ key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+ sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
+
+ if (qmf2Support) {
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
+ (*cdIter)->getClassName(),
+ "_data",
+ (*cdIter)->getMd5Sum());
+ (*cdIter)->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ stringstream key;
+ key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+
+ string content;
+ ListCodec::encode(list_, content);
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
+ }
}
if (!deleteList.empty() || collisionDeletions) {
@@ -508,7 +767,12 @@ void ManagementAgent::periodicProcessing (void)
deleteOrphanedAgentsLH();
}
- {
+ // heartbeat generation
+
+ if (qmf1Support) {
+#define BUFSIZE 65536
+ uint32_t contentSize;
+ char msgChars[BUFSIZE];
Buffer msgBuffer(msgChars, BUFSIZE);
encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
@@ -519,6 +783,27 @@ void ManagementAgent::periodicProcessing (void)
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
+
+ if (qmf2Support) {
+ static const string addr_key("agent.ind.heartbeat");
+
+ Variant::Map map;
+ Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+
+ string content;
+ MapCodec::encode(map, content);
+ sendBuffer(content, "", headers, v2Topic, addr_key);
+
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+ }
QPID_LOG(debug, "periodic update " << debugSnapshot());
}
@@ -531,19 +816,51 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
if (!object->isDeleted())
return;
+ if (qmf1Support) {
#define DNOW_BUFSIZE 2048
- char msgChars[DNOW_BUFSIZE];
- uint32_t contentSize;
- Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
-
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- contentSize = msgBuffer.getPosition();
- msgBuffer.reset();
- stringstream key;
- key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ char msgChars[DNOW_BUFSIZE];
+ uint32_t contentSize;
+ Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
+ std::string sBuf;
+
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ contentSize = msgBuffer.getPosition();
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ }
+
+ if (qmf2Support) {
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ object->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ stringstream key;
+ key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName();
+
+ Variant::Map headers;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ string content;
+ ListCodec::encode(list_, content);
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ }
managementObjects.erase(oid);
}
@@ -566,35 +883,68 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
- const FieldTable* /*args*/)
+ const FieldTable* /*args*/,
+ const bool topic)
{
Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
- // Parse the routing key. This management broker should act as though it
- // is bound to the exchange to match the following keys:
- //
- // agent.1.0.#
- // broker
- // schema.#
+ if (qmf1Support && topic) {
- if (routingKey == "broker") {
- dispatchAgentCommandLH(msg);
- return false;
- }
+ // qmf1 is bound only to the topic management exchange.
+ // Parse the routing key. This management broker should act as though it
+ // is bound to the exchange to match the following keys:
+ //
+ // agent.1.0.#
+ // broker
+ // schema.#
- else if (routingKey.compare(0, 9, "agent.1.0") == 0) {
- dispatchAgentCommandLH(msg);
- return false;
- }
+ if (routingKey == "broker") {
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+
+ if (routingKey.length() > 6) {
+
+ if (routingKey.compare(0, 9, "agent.1.0") == 0) {
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+
+ if (routingKey.compare(0, 8, "agent.1.") == 0) {
+ return authorizeAgentMessageLH(msg);
+ }
- else if (routingKey.compare(0, 8, "agent.1.") == 0) {
- return authorizeAgentMessageLH(msg);
+ if (routingKey.compare(0, 7, "schema.") == 0) {
+ dispatchAgentCommandLH(msg);
+ return true;
+ }
+ }
}
- else if (routingKey.compare(0, 7, "schema.") == 0) {
- dispatchAgentCommandLH(msg);
- return true;
+ if (qmf2Support) {
+
+ if (topic) {
+
+ // Intercept messages bound to:
+ // "console.ind.locate.# - process these messages, and also allow them to be forwarded.
+
+ if (routingKey.compare(0, 18, "console.ind.locate") == 0) {
+ dispatchAgentCommandLH(msg);
+ return true;
+ }
+
+ } else { // direct exchange
+
+ // Intercept messages bound to:
+ // "broker" - generic alias for the local broker
+ // "<name_address>" - the broker agent's proper name
+ // and do not forward them futher
+ if (routingKey == "broker" || routingKey == name_address) {
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+ }
}
return true;
@@ -610,14 +960,19 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
AclModule* acl = broker->getAcl();
+ std::string inArgs;
- ObjectId objId(inBuffer);
+ std::string sBuf;
+ inBuffer.getRawData(sBuf, 16);
+ ObjectId objId;
+ objId.decode(sBuf);
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
+ inBuffer.getRawData(inArgs, inBuffer.available());
- QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
methodName << " replyTo=" << replyToKey);
encodeHeader(outBuffer, 'm', sequence);
@@ -629,8 +984,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
- return;
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ return;
}
if (acl != 0) {
@@ -645,8 +1000,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- return;
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ return;
}
}
@@ -664,7 +1019,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
try {
outBuffer.record();
Mutex::ScopedUnlock u(userLock);
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ std::string outBuf;
+ iter->second->doMethod(methodName, inArgs, outBuf);
+ outBuffer.putRawData(outBuf);
} catch(exception& e) {
outBuffer.restore();
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -675,9 +1032,135 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
+
+void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo,
+ const std::string& cid, const ConnectionToken* connToken)
+{
+ string methodName;
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator oid, mid;
+ string content;
+
+ Variant::Map outMap;
+ Variant::Map headers;
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.agent"] = name_address;
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end())
+ {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ ObjectId objId;
+ Variant::Map inArgs;
+
+ try {
+ // coversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+
+ mid = inMap.find("_arguments");
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
+ }
+ } catch(exception& e) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ (outMap["_values"].asMap())["_status_text"] = e.what();
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ // validate
+ AclModule* acl = broker->getAcl();
+ DisallowedMethods::const_iterator i;
+
+ i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName));
+ if (i != disallowed.end()) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ (outMap["_values"].asMap())["_status_text"] = i->second;
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid);
+ return;
+ }
+
+ if (acl != 0) {
+ string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
+ params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid);
+ return;
+ }
+ }
+
+ // invoke the method
+
+ QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
+ << ":" << iter->second->getClassName() << " method=" <<
+ methodName << " replyTo=" << replyTo);
+
+ try {
+ iter->second->doMethod(methodName, inArgs, outMap);
+ } catch(exception& e) {
+ outMap.clear();
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ (outMap["_values"].asMap())["_status_text"] = e.what();
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid);
+}
+
+
void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -781,6 +1264,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin
uint32_t outLen;
uint32_t sequence = nextRequestSequence++;
+ // Schema Request
encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
key.encode(outBuffer);
@@ -803,9 +1287,11 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
// linked in via plug-in), call the schema handler directly. If the package
// is from a remote management agent, send the stored schema information.
- if (writeSchemaCall != 0)
- writeSchemaCall(buf);
- else
+ if (writeSchemaCall != 0) {
+ std::string schema;
+ writeSchemaCall(schema);
+ buf.putRawData(schema);
+ } else
buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
@@ -981,7 +1467,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
- agent->mgmtObject->set_systemId (systemId);
+ agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
addObject (agent->mgmtObject, 0);
@@ -1012,7 +1498,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
ft.decode(inBuffer);
- QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence);
+ QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
value = ft.get("_class");
if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -1031,13 +1517,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
object->setUpdateTime();
if (!object->isDeleted()) {
+ std::string sBuf;
encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
+ object->writeProperties(sBuf);
+ outBuffer.putRawData(sBuf);
+ sBuf.clear();
+ object->writeStatistics(sBuf, true);
+ outBuffer.putRawData(sBuf);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
sendCommandComplete(replyToKey, sequence);
@@ -1058,13 +1548,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
object->setUpdateTime();
if (!object->isDeleted()) {
+ std::string sBuf;
encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
+ object->writeProperties(sBuf);
+ outBuffer.putRawData(sBuf);
+ sBuf.clear();
+ object->writeStatistics(sBuf, true);
+ outBuffer.putRawData(sBuf);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
}
@@ -1072,64 +1566,285 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
sendCommandComplete(replyToKey, sequence);
}
+
+void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ moveNewObjectsLH();
+
+ if (contentType != "_query_v1") {
+ QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
+ return;
+ }
+
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator i;
+ Variant::Map headers;
+
+ QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+ headers["partial"];
+
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+ string className;
+ string content;
+
+ i = inMap.find("_class");
+ if (i != inMap.end())
+ try {
+ className = i->second.asString();
+ } catch(exception& e) {
+ className.clear();
+ QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
+ }
+
+ if (className.empty()) {
+ ObjectId objId;
+ i = inMap.find("_object_id");
+ if (i != inMap.end()) {
+
+ try {
+ objId = ObjectId(i->second.asMap());
+ } catch (exception &e) {
+ objId = ObjectId(); // empty object id - won't find a match (I hope).
+ QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
+ }
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ }
+ }
+ }
+ } else {
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName () == className) {
+
+ // @todo: support multiple objects per message reply
+ values.clear();
+ list_.clear();
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ }
+ }
+ }
+ }
+
+ // end empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
+}
+
+
+void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
+ const string& cid)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest");
+
+ Variant::Map map;
+ Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+
+ string content;
+ MapCodec::encode(map, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+}
+
+
bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
+ uint32_t sequence = 0;
+ bool methodReq = false;
+ bool mapMsg = false;
+ string packageName;
+ string className;
+ string methodName;
+ std::string cid;
if (msg.encodedSize() > MA_BUFFER_SIZE)
return false;
msg.encodeContent(inBuffer);
+ uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- if (!checkHeader(inBuffer, &opcode, &sequence))
- return false;
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+ const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+ if (headers && headers->getAsString("app_id") == "qmf2")
+ {
+ mapMsg = true;
+
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
+
+ if (headers->getAsString("qmf.opcode") == "_method_request")
+ {
+ methodReq = true;
+
+ // extract object id and method name
+
+ std::string body;
+ inBuffer.getRawData(body, bufferLen);
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator oid, mid;
+
+ ObjectId objId;
- if (opcode == 'M') {
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ QPID_LOG(warning,
+ "Missing fields in QMF authorize req received.");
+ return false;
+ }
+
+ try {
+ // coversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+ } catch(exception& e) {
+ QPID_LOG(warning,
+ "Badly formatted QMF authorize req received.");
+ return false;
+ }
+
+ // look up schema for object to get package and class name
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " <<
+ objId);
+ return false;
+ }
+
+ packageName = iter->second->getPackageName();
+ className = iter->second->getClassName();
+ }
+ } else { // old style binary message format
+
+ uint8_t opcode;
+
+ if (!checkHeader(inBuffer, &opcode, &sequence))
+ return false;
+
+ if (opcode == 'M') {
+ methodReq = true;
+
+ // extract method name & schema package and class name
+
+ uint8_t hash[16];
+ inBuffer.getLongLong(); // skip over object id
+ inBuffer.getLongLong();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+
+ }
+ }
+
+ if (methodReq) {
// TODO: check method call against ACL list.
+ map<acl::Property, string> params;
AclModule* acl = broker->getAcl();
if (acl == 0)
return true;
string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId();
- string packageName;
- string className;
- uint8_t hash[16];
- string methodName;
-
- map<acl::Property, string> params;
- ObjectId objId(inBuffer);
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
-
params[acl::PROP_SCHEMAPACKAGE] = packageName;
params[acl::PROP_SCHEMACLASS] = className;
if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params))
return true;
+ // authorization failed, send reply if replyTo present
+
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
- replyToKey = rt.getRoutingKey();
+ string replyToKey = rt.getRoutingKey();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ if (mapMsg) {
- encodeHeader(outBuffer, 'm', sequence);
- outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- }
+ Variant::Map outMap;
+ Variant::Map headers;
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.agent"] = name_address;
+
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+
+ string content;
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyToKey);
+
+ } else {
+
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'm', sequence);
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ }
return false;
}
@@ -1139,9 +1854,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
void ManagementAgent::dispatchAgentCommandLH(Message& msg)
{
- Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
string replyToKey;
const framing::MessageProperties* p =
@@ -1153,6 +1865,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
else
return;
+ Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+
if (msg.encodedSize() > MA_BUFFER_SIZE) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
msg.encodedSize());
@@ -1163,7 +1878,36 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
+ const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+ if (headers && headers->getAsString("app_id") == "qmf2")
+ {
+ std::string opcode = headers->getAsString("qmf.opcode");
+ std::string contentType = headers->getAsString("qmf.content");
+ std::string body;
+ std::string cid;
+
+ inBuffer.getRawData(body, bufferLen);
+
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
+
+ if (opcode == "_method_request")
+ return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher());
+ else if (opcode == "_query_request")
+ return handleGetQueryLH(body, replyToKey, cid, contentType);
+ else if (opcode == "_agent_locate_request")
+ return handleLocateRequestLH(body, replyToKey, cid);
+
+ QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+ return;
+ }
+
+ // old preV2 binary messages
+
while (inBuffer.getPosition() < bufferLen) {
+ uint32_t sequence;
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
@@ -1359,7 +2103,6 @@ ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid)
return iter;
}
-
void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
Mutex::ScopedLock lock (userLock);
@@ -1377,42 +2120,64 @@ void ManagementAgent::disallow(const std::string& className, const std::string&
disallowed[std::make_pair(className, methodName)] = message;
}
+void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const {
+ _map["_cname"] = name;
+ _map["_hash"] = qpid::types::Uuid(hash);
+}
+
+void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) {
+ Variant::Map::const_iterator i;
+
+ if ((i = _map.find("_cname")) != _map.end()) {
+ name = i->second.asString();
+ }
+
+ if ((i = _map.find("_hash")) != _map.end()) {
+ const qpid::types::Uuid& uuid = i->second.asUuid();
+ memcpy(hash, uuid.data(), uuid.size());
+ }
+}
+
void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const {
- buffer.checkAvailable(encodedSize());
+ buffer.checkAvailable(encodedBufSize());
buffer.putShortString(name);
buffer.putBin128(hash);
}
void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) {
- buffer.checkAvailable(encodedSize());
+ buffer.checkAvailable(encodedBufSize());
buffer.getShortString(name);
buffer.getBin128(hash);
}
-uint32_t ManagementAgent::SchemaClassKey::encodedSize() const {
+uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const {
return 1 + name.size() + 16 /* bin128 */;
}
-void ManagementAgent::SchemaClass::encode(qpid::framing::Buffer& outBuf) const {
- outBuf.checkAvailable(encodedSize());
- outBuf.putOctet(kind);
- outBuf.putLong(pendingSequence);
- outBuf.putLongString(data);
+void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const {
+ _map["_type"] = kind;
+ _map["_pending_sequence"] = pendingSequence;
+ _map["_data"] = data;
}
-void ManagementAgent::SchemaClass::decode(qpid::framing::Buffer& inBuf) {
- inBuf.checkAvailable(encodedSize());
- kind = inBuf.getOctet();
- pendingSequence = inBuf.getLong();
- inBuf.getLongString(data);
-}
+void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) {
+ Variant::Map::const_iterator i;
-uint32_t ManagementAgent::SchemaClass::encodedSize() const {
- return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size();
+ if ((i = _map.find("_type")) != _map.end()) {
+ kind = i->second;
+ }
+ if ((i = _map.find("_pending_sequence")) != _map.end()) {
+ pendingSequence = i->second;
+ }
+ if ((i = _map.find("_data")) != _map.end()) {
+ data = i->second.asString();
+ }
}
void ManagementAgent::exportSchemas(std::string& out) {
- out.clear();
+ Variant::List list_;
+ Variant::Map map_, kmap, cmap;
+
for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
string name = i->first;
const ClassMap& classes = i ->second;
@@ -1421,90 +2186,143 @@ void ManagementAgent::exportSchemas(std::string& out) {
const SchemaClass& klass = j->second;
if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
// Encode name, schema-key, schema-class
- size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize();
- size_t end = out.size();
- out.resize(end + encodedSize);
- framing::Buffer outBuf(&out[end], encodedSize);
- outBuf.putShortString(name);
- key.encode(outBuf);
- klass.encode(outBuf);
+
+ map_.clear();
+ kmap.clear();
+ cmap.clear();
+
+ key.mapEncode(kmap);
+ klass.mapEncode(cmap);
+
+ map_["_pname"] = name;
+ map_["_key"] = kmap;
+ map_["_class"] = cmap;
+ list_.push_back(map_);
}
}
}
+
+ ListCodec::encode(list_, out);
}
void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
- while (inBuf.available()) {
+
+ string buf(inBuf.getPointer(), inBuf.available());
+ Variant::List content;
+ ListCodec::decode(buf, content);
+ Variant::List::const_iterator l;
+
+
+ for (l = content.begin(); l != content.end(); l++) {
string package;
SchemaClassKey key;
SchemaClass klass;
- inBuf.getShortString(package);
- key.decode(inBuf);
- klass.decode(inBuf);
- packages[package][key] = klass;
+ Variant::Map map_, kmap, cmap;
+ Variant::Map::const_iterator i;
+
+ map_ = l->asMap();
+
+ if ((i = map_.find("_pname")) != map_.end()) {
+ package = i->second.asString();
+
+ if ((i = map_.find("_key")) != map_.end()) {
+ key.mapDecode(i->second.asMap());
+
+ if ((i = map_.find("_class")) != map_.end()) {
+ klass.mapDecode(i->second.asMap());
+
+ packages[package][key] = klass;
+ }
+ }
+ }
}
}
-void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const {
- outBuf.checkAvailable(encodedSize());
- outBuf.putLong(brokerBank);
- outBuf.putLong(agentBank);
- outBuf.putShortString(routingKey);
- // TODO aconway 2010-03-04: we send the v2Key instead of the
- // ObjectId because that has the same meaning on different
- // brokers. ObjectId::encode doesn't currently encode the v2Key,
- // this can be cleaned up when it does.
- outBuf.putMediumString(connectionRef.getV2Key());
- mgmtObject->writeProperties(outBuf);
+void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const {
+ Variant::Map _objId, _values;
+
+ map_["_brokerBank"] = brokerBank;
+ map_["_agentBank"] = agentBank;
+ map_["_routingKey"] = routingKey;
+
+ connectionRef.mapEncode(_objId);
+ map_["_object_id"] = _objId;
+
+ mgmtObject->mapEncodeValues(_values, true, false);
+ map_["_values"] = _values;
}
-void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) {
- brokerBank = inBuf.getLong();
- agentBank = inBuf.getLong();
- inBuf.getShortString(routingKey);
+void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
+ Variant::Map::const_iterator i;
+
+ if ((i = map_.find("_brokerBank")) != map_.end()) {
+ brokerBank = i->second;
+ }
+
+ if ((i = map_.find("_agentBank")) != map_.end()) {
+ agentBank = i->second;
+ }
+
+ if ((i = map_.find("_routingKey")) != map_.end()) {
+ routingKey = i->second.getString();
+ }
- // TODO aconway 2010-03-04: see comment in encode()
- string connectionKey;
- inBuf.getMediumString(connectionKey);
- connectionRef = ObjectId(); // Clear out any existing value.
- connectionRef.setV2Key(connectionKey);
+ if ((i = map_.find("_object_id")) != map_.end()) {
+ connectionRef.mapDecode(i->second.asMap());
+ }
mgmtObject = new _qmf::Agent(&agent, this);
- mgmtObject->readProperties(inBuf);
+
+ if ((i = map_.find("_values")) != map_.end()) {
+ mgmtObject->mapDecodeValues(i->second.asMap());
+ }
+
// TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key.
mgmtObject->set_connectionRef(connectionRef);
}
-uint32_t ManagementAgent::RemoteAgent::encodedSize() const {
- // TODO aconway 2010-03-04: see comment in encode()
- return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long
- + routingKey.size() + sizeof(uint8_t) // ShortString
- + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string
- + mgmtObject->writePropertiesSize();
-}
-
void ManagementAgent::exportAgents(std::string& out) {
- out.clear();
+ Variant::List list_;
+ Variant::Map map_, omap, amap;
+
for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
i != remoteAgents.end();
++i)
{
// TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
RemoteAgent* agent = i->second;
- size_t encodedSize = agent->encodedSize();
- size_t end = out.size();
- out.resize(end + encodedSize);
- framing::Buffer outBuf(&out[end], encodedSize);
- agent->encode(outBuf);
+
+ map_.clear();
+ amap.clear();
+
+ agent->mapEncode(amap);
+ map_["_remote_agent"] = amap;
+ list_.push_back(map_);
}
+
+ ListCodec::encode(list_, out);
}
void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
- while (inBuf.available()) {
+ string buf(inBuf.getPointer(), inBuf.available());
+ Variant::List content;
+ ListCodec::decode(buf, content);
+ Variant::List::const_iterator l;
+
+ for (l = content.begin(); l != content.end(); l++) {
std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this));
- agent->decode(inBuf);
- addObject(agent->mgmtObject, 0);
- remoteAgents[agent->connectionRef] = agent.release();
+ Variant::Map map_;
+ Variant::Map::const_iterator i;
+
+ map_ = l->asMap();
+
+ if ((i = map_.find("_remote_agent")) != map_.end()) {
+
+ agent->mapDecode(i->second.asMap());
+
+ addObject (agent->mgmtObject, 0, false);
+ remoteAgents[agent->connectionRef] = agent.release();
+ }
}
}
@@ -1519,3 +2337,198 @@ std::string ManagementAgent::debugSnapshot() {
msg << " new objects: " << newManagementObjects.size();
return msg.str();
}
+
+Variant::Map ManagementAgent::toMap(const FieldTable& from)
+{
+ Variant::Map map;
+
+ for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+ const string& key(iter->first);
+ const FieldTable::ValuePtr& val(iter->second);
+
+ map[key] = toVariant(val);
+ }
+
+ return map;
+}
+
+Variant::List ManagementAgent::toList(const List& from)
+{
+ Variant::List _list;
+
+ for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+ const List::ValuePtr& val(*iter);
+
+ _list.push_back(toVariant(val));
+ }
+
+ return _list;
+}
+
+qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from)
+{
+ qpid::framing::FieldTable ft;
+
+ for (Variant::Map::const_iterator iter = from.begin();
+ iter != from.end();
+ iter++) {
+ const string& key(iter->first);
+ const Variant& val(iter->second);
+
+ ft.set(key, toFieldValue(val));
+ }
+
+ return ft;
+}
+
+
+List ManagementAgent::fromList(const Variant::List& from)
+{
+ List fa;
+
+ for (Variant::List::const_iterator iter = from.begin();
+ iter != from.end();
+ iter++) {
+ const Variant& val(*iter);
+
+ fa.push_back(toFieldValue(val));
+ }
+
+ return fa;
+}
+
+
+boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
+{
+
+ switch(in.getType()) {
+
+ case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue());
+ case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool()));
+ case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8()));
+ case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16()));
+ case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32()));
+ case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64()));
+ case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8()));
+ case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16()));
+ case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32()));
+ case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64()));
+ case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat()));
+ case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble()));
+ case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString()));
+ case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data()));
+ case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
+ case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList())));
+ }
+
+ QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
+ return boost::shared_ptr<FieldValue>(new VoidValue());
+}
+
+// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
+Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
+{
+ const std::string iso885915("iso-8859-15");
+ const std::string utf8("utf8");
+ const std::string utf16("utf16");
+ //const std::string binary("binary");
+ const std::string amqp0_10_binary("amqp0-10:binary");
+ //const std::string amqp0_10_bit("amqp0-10:bit");
+ const std::string amqp0_10_datetime("amqp0-10:datetime");
+ const std::string amqp0_10_struct("amqp0-10:struct");
+ Variant out;
+
+ //based on AMQP 0-10 typecode, pick most appropriate variant type
+ switch (in->getType()) {
+ //Fixed Width types:
+ case 0x00: //bin8
+ case 0x01: out.setEncoding(amqp0_10_binary); // int8
+ case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8
+ case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; //
+ // case 0x04: break; //TODO: iso-8859-15 char // char
+ case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8
+
+ case 0x10: out.setEncoding(amqp0_10_binary); // bin16
+ case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16
+ case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16
+
+ case 0x20: out.setEncoding(amqp0_10_binary); // bin32
+ case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32
+ case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
+
+ case 0x23: out = in->get<float>(); break; // float(32)
+
+ // case 0x27: break; //TODO: utf-32 char
+
+ case 0x30: out.setEncoding(amqp0_10_binary); // bin64
+ case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64
+
+ case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
+ case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64
+ case 0x33: out = in->get<double>(); break; // double
+
+ case 0x48: // uuid
+ {
+ unsigned char data[16];
+ in->getFixedWidthValue<16>(data);
+ out = qpid::types::Uuid(data);
+ } break;
+
+ //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+ case 0xf0: break;//void, which is the default value for Variant
+ // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+
+ //Variable Width types:
+ //strings:
+ case 0x80: // str8
+ case 0x90: // str16
+ case 0xa0: // str32
+ out = in->get<std::string>();
+ out.setEncoding(amqp0_10_binary);
+ break;
+
+ case 0x84: // str8
+ case 0x94: // str16
+ out = in->get<std::string>();
+ out.setEncoding(iso885915);
+ break;
+
+ case 0x85: // str8
+ case 0x95: // str16
+ out = in->get<std::string>();
+ out.setEncoding(utf8);
+ break;
+
+ case 0x86: // str8
+ case 0x96: // str16
+ out = in->get<std::string>();
+ out.setEncoding(utf16);
+ break;
+
+ case 0xab: // str32
+ out = in->get<std::string>();
+ out.setEncoding(amqp0_10_struct);
+ break;
+
+ case 0xa8: // map
+ out = ManagementAgent::toMap(in->get<FieldTable>());
+ break;
+
+ case 0xa9: // list of variant types
+ out = ManagementAgent::toList(in->get<List>());
+ break;
+ //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
+ // out = Variant::List();
+ // translate<Array>(in, out.asList(), &toVariant);
+ // break;
+
+ default:
+ //error?
+ QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
+ break;
+ }
+
+ return out;
+}
+
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index 5b2c54f1b8..0250f39dd6 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -32,7 +32,9 @@
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
+#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/FieldValue.h>
#include <memory>
#include <string>
#include <map>
@@ -62,7 +64,7 @@ public:
} severity_t;
- ManagementAgent ();
+ ManagementAgent (const bool qmfV1, const bool qmfV2);
virtual ~ManagementAgent ();
/** Called before plugins are initialized */
@@ -74,6 +76,9 @@ public:
/** Called by cluster to suppress management output during update. */
void suppress(bool s) { suppressed = s; }
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
void setInterval(uint16_t _interval) { interval = _interval; }
void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
@@ -91,6 +96,9 @@ public:
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
uint64_t persistId = 0);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
+ const std::string& key,
+ bool persistent = true);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
@@ -99,7 +107,8 @@ public:
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
- const framing::FieldTable* args);
+ const framing::FieldTable* args,
+ const bool topic);
const framing::Uuid& getUuid() const { return uuid; }
@@ -128,6 +137,15 @@ public:
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; }
+ // TODO: remove these when Variant API moved into common library.
+ static types::Variant::Map toMap(const framing::FieldTable& from);
+ static framing::FieldTable fromMap(const types::Variant::Map& from);
+ static types::Variant::List toList(const framing::List& from);
+ static framing::List fromList(const types::Variant::List& from);
+ static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
+ static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
+
+
private:
struct Periodic : public qpid::sys::TimerTask
{
@@ -153,9 +171,8 @@ private:
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
- void encode(framing::Buffer& buffer) const;
- void decode(framing::Buffer& buffer);
- uint32_t encodedSize() const;
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
};
// TODO: Eventually replace string with entire reply-to structure. reply-to
@@ -175,9 +192,11 @@ private:
std::string name;
uint8_t hash[16];
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
void encode(framing::Buffer& buffer) const;
void decode(framing::Buffer& buffer);
- uint32_t encodedSize() const;
+ uint32_t encodedBufSize() const;
};
struct SchemaClassKeyComp
@@ -209,9 +228,8 @@ private:
bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
void appendSchema (framing::Buffer& buf);
- void encode(framing::Buffer& buffer) const;
- void decode(framing::Buffer& buffer);
- uint32_t encodedSize() const;
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
};
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
@@ -264,6 +282,14 @@ private:
typedef std::map<MethodName, std::string> DisallowedMethods;
DisallowedMethods disallowed;
+ // Agent name and address
+ qpid::types::Variant::Map attrMap;
+ std::string name_address;
+
+ // supported management protocol
+ bool qmf1Support;
+ bool qmf2Support;
+
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
@@ -279,6 +305,11 @@ private:
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
std::string routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
void moveNewObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);
@@ -311,6 +342,10 @@ private:
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQueryLH (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType);
+ void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken);
+ void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid);
+
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp
index 0813e30891..6dc41ef073 100644
--- a/cpp/src/qpid/management/ManagementDirectExchange.cpp
+++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp
@@ -29,13 +29,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
- Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {}
+ Exchange (_name, _parent, b),
+ DirectExchange(_name, _parent, b),
+ managementAgent(0) {}
ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
bool _durable,
const FieldTable& _args,
Manageable* _parent, Broker* b) :
Exchange (_name, _durable, _args, _parent, b),
- DirectExchange(_name, _durable, _args, _parent, b) {}
+ DirectExchange(_name, _durable, _args, _parent, b),
+ managementAgent(0) {}
void ManagementDirectExchange::route(Deliverable& msg,
const string& routingKey,
@@ -43,7 +46,8 @@ void ManagementDirectExchange::route(Deliverable& msg,
{
bool routeIt = true;
- // TODO: Intercept messages directed to the embedded agent and send them to the management agent.
+ if (managementAgent)
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false /*direct*/);
if (routeIt)
DirectExchange::route(msg, routingKey, args);
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index 4b87800174..46fc67d07f 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -22,7 +22,10 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/Buffer.h"
#include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
#include <stdlib.h>
@@ -36,26 +39,37 @@ void AgentAttachment::setBanks(uint32_t broker, uint32_t bank)
((uint64_t) (bank & 0x0fffffff));
}
-ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object)
- : agent(0)
+// Deprecated
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object)
+ : agent(0), agentEpoch(seq)
{
first =
((uint64_t) (flags & 0x0f)) << 60 |
((uint64_t) (seq & 0x0fff)) << 48 |
- ((uint64_t) (broker & 0x000fffff)) << 28 |
- ((uint64_t) (bank & 0x0fffffff));
+ ((uint64_t) (broker & 0x000fffff)) << 28;
second = object;
}
-ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object)
- : agent(_agent)
+
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker)
+ : agent(0), second(0), agentEpoch(seq)
{
first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48 |
+ ((uint64_t) (broker & 0x000fffff)) << 28;
+}
+
+ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq)
+ : agent(_agent), second(0), agentEpoch(seq)
+{
+
+ first =
((uint64_t) (flags & 0x0f)) << 60 |
((uint64_t) (seq & 0x0fff)) << 48;
- second = object;
}
+
ObjectId::ObjectId(std::istream& in) : agent(0)
{
std::string text;
@@ -75,6 +89,10 @@ void ObjectId::fromString(const std::string& text)
# define atoll(X) _atoi64(X)
#endif
+ // format:
+ // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id>
+ // V2: Not used
+
std::string copy(text.c_str());
char* cText;
char* field[FIELDS];
@@ -99,10 +117,13 @@ void ObjectId::fromString(const std::string& text)
if (idx != FIELDS)
throw Exception("Invalid ObjectId format");
+ agentEpoch = atoll(field[1]);
+
first = (atoll(field[0]) << 60) +
(atoll(field[1]) << 48) +
- (atoll(field[2]) << 28) +
- atoll(field[3]);
+ (atoll(field[2]) << 28);
+
+ agentName = std::string(field[3]);
second = atoll(field[4]);
}
@@ -123,21 +144,40 @@ bool ObjectId::equalV1(const ObjectId &other) const
return first == otherFirst && second == other.second;
}
-void ObjectId::encode(framing::Buffer& buffer) const
+// encode as V1-format binary
+void ObjectId::encode(std::string& buffer) const
{
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
if (agent == 0)
- buffer.putLongLong(first);
+ body.putLongLong(first);
else
- buffer.putLongLong(first | agent->first);
- buffer.putLongLong(second);
+ body.putLongLong(first | agent->first);
+ body.putLongLong(second);
+
+ body.reset();
+ body.getRawData(buffer, len);
}
-void ObjectId::decode(framing::Buffer& buffer)
+// decode as V1-format binary
+void ObjectId::decode(const std::string& buffer)
{
- first = buffer.getLongLong();
- second = buffer.getLongLong();
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
+ body.checkAvailable(buffer.length());
+ body.putRawData(buffer);
+ body.reset();
+ first = body.getLongLong();
+ second = body.getLongLong();
+ v2Key = boost::lexical_cast<std::string>(second);
}
+// generate the V2 key from the index fields defined
+// in the schema.
void ObjectId::setV2Key(const ManagementObject& object)
{
std::stringstream oname;
@@ -145,6 +185,42 @@ void ObjectId::setV2Key(const ManagementObject& object)
v2Key = oname.str();
}
+// encode as V2-format map
+void ObjectId::mapEncode(types::Variant::Map& map) const
+{
+ map["_object_name"] = v2Key;
+ if (!agentName.empty())
+ map["_agent_name"] = agentName;
+ if (agentEpoch)
+ map["_agent_epoch"] = agentEpoch;
+}
+
+// decode as v2-format map
+void ObjectId::mapDecode(const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ if ((i = map.find("_object_name")) != map.end())
+ v2Key = i->second.asString();
+ else
+ throw Exception("Required _object_name field missing.");
+
+ if ((i = map.find("_agent_name")) != map.end())
+ agentName = i->second.asString();
+
+ if ((i = map.find("_agent_epoch")) != map.end())
+ agentEpoch = i->second.asInt64();
+}
+
+
+ObjectId::operator types::Variant::Map() const
+{
+ types::Variant::Map m;
+ mapEncode(m);
+ return m;
+}
+
+
namespace qpid {
namespace management {
@@ -158,7 +234,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i)
out << ((virtFirst & 0xF000000000000000LL) >> 60) <<
"-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) <<
"-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) <<
- "-" << (virtFirst & 0x000000000FFFFFFFLL) <<
+ "-" << i.agentName <<
"-" << i.second;
return out;
}
@@ -168,43 +244,88 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i)
int ManagementObject::maxThreads = 1;
int ManagementObject::nextThreadIndex = 0;
-void ManagementObject::writeTimestamps (framing::Buffer& buf) const
+void ManagementObject::writeTimestamps (std::string& buf) const
{
- buf.putShortString (getPackageName ());
- buf.putShortString (getClassName ());
- buf.putBin128 (getMd5Sum ());
- buf.putLongLong (updateTime);
- buf.putLongLong (createTime);
- buf.putLongLong (destroyTime);
- objectId.encode(buf);
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
+
+ body.putShortString (getPackageName ());
+ body.putShortString (getClassName ());
+ body.putBin128 (getMd5Sum ());
+ body.putLongLong (updateTime);
+ body.putLongLong (createTime);
+ body.putLongLong (destroyTime);
+
+ uint32_t len = body.getPosition();
+ body.reset();
+ body.getRawData(buf, len);
+
+ std::string oid;
+ objectId.encode(oid);
+ buf += oid;
}
-void ManagementObject::readTimestamps (framing::Buffer& buf)
+void ManagementObject::readTimestamps (const std::string& buf)
{
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
std::string unused;
uint8_t unusedUuid[16];
- ObjectId unusedObjectId;
- buf.getShortString(unused);
- buf.getShortString(unused);
- buf.getBin128(unusedUuid);
- updateTime = buf.getLongLong();
- createTime = buf.getLongLong();
- destroyTime = buf.getLongLong();
- unusedObjectId.decode(buf);
+ body.checkAvailable(buf.length());
+ body.putRawData(buf);
+ body.reset();
+
+ body.getShortString(unused);
+ body.getShortString(unused);
+ body.getBin128(unusedUuid);
+ updateTime = body.getLongLong();
+ createTime = body.getLongLong();
+ destroyTime = body.getLongLong();
}
uint32_t ManagementObject::writeTimestampsSize() const
{
return 1 + getPackageName().length() + // str8
- 1 + getClassName().length() + // str8
- 16 + // bin128
- 8 + // uint64
- 8 + // uint64
- 8 + // uint64
- objectId.encodedSize(); // objectId
+ 1 + getClassName().length() + // str8
+ 16 + // bin128
+ 8 + // uint64
+ 8 + // uint64
+ 8 + // uint64
+ objectId.encodedSize(); // objectId
+}
+
+
+void ManagementObject::writeTimestamps (types::Variant::Map& map) const
+{
+ types::Variant::Map oid, sid;
+
+ sid["_package_name"] = getPackageName();
+ sid["_class_name"] = getClassName();
+ sid["_hash"] = qpid::types::Uuid(getMd5Sum());
+ map["_schema_id"] = sid;
+
+ objectId.mapEncode(oid);
+ map["_object_id"] = oid;
+
+ map["_update_ts"] = updateTime;
+ map["_create_ts"] = createTime;
+ map["_delete_ts"] = destroyTime;
+}
+
+void ManagementObject::readTimestamps (const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ if ((i = map.find("_update_ts")) != map.end())
+ updateTime = i->second.asUint64();
+ if ((i = map.find("_create_ts")) != map.end())
+ createTime = i->second.asUint64();
+ if ((i = map.find("_delete_ts")) != map.end())
+ destroyTime = i->second.asUint64();
}
+
void ManagementObject::setReference(ObjectId) {}
int ManagementObject::getThreadIndex() {
@@ -217,3 +338,26 @@ int ManagementObject::getThreadIndex() {
}
return thisIndex;
}
+
+
+void ManagementObject::mapEncode(types::Variant::Map& map,
+ bool includeProperties,
+ bool includeStatistics)
+{
+ types::Variant::Map values;
+
+ writeTimestamps(map);
+
+ mapEncodeValues(values, includeProperties, includeStatistics);
+ map["_values"] = values;
+}
+
+void ManagementObject::mapDecode(const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ readTimestamps(map);
+
+ if ((i = map.find("_values")) != map.end())
+ mapDecodeValues(i->second.asMap());
+}
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp
index 98650b3adf..7fdce133e5 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -28,13 +28,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
- Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
+ Exchange (_name, _parent, b),
+ TopicExchange(_name, _parent, b),
+ managementAgent(0) {}
ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
bool _durable,
const FieldTable& _args,
Manageable* _parent, Broker* b) :
Exchange (_name, _durable, _args, _parent, b),
- TopicExchange(_name, _durable, _args, _parent, b) {}
+ TopicExchange(_name, _durable, _args, _parent, b),
+ managementAgent(0) {}
void ManagementTopicExchange::route(Deliverable& msg,
const string& routingKey,
@@ -43,12 +46,8 @@ void ManagementTopicExchange::route(Deliverable& msg,
bool routeIt = true;
// Intercept management agent commands
- if (qmfVersion == 1) {
- if ((routingKey.length() > 6 &&
- routingKey.substr(0, 6).compare("agent.") == 0) ||
- (routingKey == "broker"))
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
- }
+ if (managementAgent)
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true /* topic */);
if (routeIt)
TopicExchange::route(msg, routingKey, args);
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 7377edc3bb..9c1a761062 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -267,15 +267,15 @@ txjob_LDADD=$(lib_client)
check_PROGRAMS+=PollerTest
PollerTest_SOURCES=PollerTest.cpp
-PollerTest_LDADD=$(lib_common) $(SOCKLIBS)
+PollerTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS)
check_PROGRAMS+=DispatcherTest
DispatcherTest_SOURCES=DispatcherTest.cpp
-DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS)
+DispatcherTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS)
check_PROGRAMS+=datagen
datagen_SOURCES=datagen.cpp
-datagen_LDADD=$(lib_common)
+datagen_LDADD=$(lib_common) $(lib_client)
check_PROGRAMS+=qrsh_server
qrsh_server_SOURCES=qrsh_server.cpp
diff --git a/cpp/src/tests/ManagementTest.cpp b/cpp/src/tests/ManagementTest.cpp
index d05b4676ba..99b9c1f03e 100644
--- a/cpp/src/tests/ManagementTest.cpp
+++ b/cpp/src/tests/ManagementTest.cpp
@@ -56,32 +56,34 @@ QPID_AUTO_TEST_CASE(testObjectIdSerializeString) {
}
QPID_AUTO_TEST_CASE(testObjectIdEncode) {
- char buffer[100];
- Buffer msgBuf(buffer, 100);
- msgBuf.putLongLong(0x1002000030000004LL);
- msgBuf.putLongLong(0x0000000000000005LL);
- msgBuf.reset();
+ qpid::types::Variant::Map oidMap;
- ObjectId oid(msgBuf);
+ ObjectId oid(1, 2, 3, 9999);
+ oid.setV2Key("testkey");
+ oid.setAgentName("myAgent");
std::stringstream out1;
out1 << oid;
- BOOST_CHECK_EQUAL(out1.str(), "1-2-3-4-5");
+ BOOST_CHECK_EQUAL(out1.str(), "1-2-3-myAgent-9999");
}
QPID_AUTO_TEST_CASE(testObjectIdAttach) {
AgentAttachment agent;
- ObjectId oid(&agent, 10, 20, 50);
+ ObjectId oid(&agent, 10, 20);
+ oid.setV2Key("GabbaGabbaHey");
+ oid.setAgentName("MrSmith");
std::stringstream out1;
out1 << oid;
- BOOST_CHECK_EQUAL(out1.str(), "10-20-0-0-50");
+
+ BOOST_CHECK_EQUAL(out1.str(), "10-20-0-MrSmith-0");
agent.setBanks(30, 40);
std::stringstream out2;
out2 << oid;
- BOOST_CHECK_EQUAL(out2.str(), "10-20-30-40-50");
+
+ BOOST_CHECK_EQUAL(out2.str(), "10-20-30-MrSmith-0");
}
QPID_AUTO_TEST_CASE(testConsoleObjectId) {
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py
index 7bda233b9a..3d32ae36f7 100644
--- a/extras/qmf/src/py/qmf/console.py
+++ b/extras/qmf/src/py/qmf/console.py
@@ -41,6 +41,9 @@ from cStringIO import StringIO
#import qpid.log
#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG)
+#===================================================================================================
+# CONSOLE
+#===================================================================================================
class Console:
""" To access the asynchronous operations, a class must be derived from
Console with overrides of any combination of the available methods. """
@@ -94,6 +97,10 @@ class Console:
""" Invoked when a method response from an asynchronous method call is received. """
pass
+
+#===================================================================================================
+# BrokerURL
+#===================================================================================================
class BrokerURL(URL):
def __init__(self, text):
URL.__init__(self, text)
@@ -115,16 +122,30 @@ class BrokerURL(URL):
def match(self, host, port):
return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4]
+
+#===================================================================================================
+# Object
+#===================================================================================================
class Object(object):
- """ This class defines a 'proxy' object representing a real managed object on an agent.
- Actions taken on this proxy are remotely affected on the real managed object.
"""
- def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}):
- self._session = session
- self._broker = broker
+ This class defines a 'proxy' object representing a real managed object on an agent.
+ Actions taken on this proxy are remotely affected on the real managed object.
+ """
+ def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}):
+ self._agent = agent
+ self._session = None
+ self._broker = None
+ if agent:
+ self._session = agent.session
+ self._broker = agent.broker
self._schema = schema
- self._managed = managed
- if self._managed:
+ self._properties = []
+ self._statistics = []
+ if v2Map:
+ self.v2Init(v2Map, agentName)
+ return
+
+ if self._agent:
self._currentTime = codec.read_uint64()
self._createTime = codec.read_uint64()
self._deleteTime = codec.read_uint64()
@@ -134,8 +155,6 @@ class Object(object):
self._createTime = None
self._deleteTime = None
self._objectId = None
- self._properties = []
- self._statistics = []
if codec:
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
@@ -143,18 +162,38 @@ class Object(object):
if property.name in notPresent:
self._properties.append((property, None))
else:
- self._properties.append((property, self._session._decodeValue(codec, property.type, broker)))
+ self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker)))
if stat:
for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker)))
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker)))
else:
for property in schema.getProperties():
if property.optional:
self._properties.append((property, None))
else:
- self._properties.append((property, self._session._defaultValue(property, broker, kwargs)))
+ self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs)))
for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
+ self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs)))
+
+ def v2Init(self, omap, agentName):
+ if omap.__class__ != dict:
+ raise Exception("QMFv2 object data must be a map/dict")
+ if '_values' not in omap:
+ raise Exception("QMFv2 object must have '_values' element")
+
+ values = omap['_values']
+ for prop in self._schema.getProperties():
+ if prop.name in values:
+ self._properties.append((prop, values[prop.name]))
+ for stat in self._schema.getStatistics():
+ if stat.name in values:
+ self._statistics.append((stat, values[stat.name]))
+ if '_subtypes' in omap:
+ self._subtypes = omap['_subtypes']
+ if '_object_id' in omap:
+ self._objectId = ObjectId(omap['_object_id'], agentName=agentName)
+ else:
+ self._objectId = None
def getBroker(self):
""" Return the broker from which this object was sent """
@@ -186,7 +225,7 @@ class Object(object):
def isManaged(self):
""" Return True iff this object is a proxy for a managed object on an agent. """
- return self._managed
+ return self._objectId and self._agent
def getIndex(self):
""" Return a string describing this object's primary key. """
@@ -225,7 +264,7 @@ class Object(object):
""" Contact the agent and retrieve the lastest property and statistic values for this object. """
if not self.isManaged():
raise Exception("Object is not managed")
- obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
+ obj = self._agent.getObjects(_objectId=self._objectId)
if obj:
self.mergeUpdate(obj[0])
else:
@@ -244,17 +283,17 @@ class Object(object):
for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self._properties:
- if name == property.name:
+ for prop, value in self._properties:
+ if name == prop.name:
return value
- if name == "_" + property.name + "_" and property.type == 10: # Dereference references
- deref = self._session.getObjects(_objectId=value, _broker=self._broker)
+ if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references
+ deref = self._agent.getObjects(_objectId=value)
if len(deref) != 1:
return None
else:
return deref[0]
- for statistic, value in self._statistics:
- if name == statistic.name:
+ for stat, value in self._statistics:
+ if name == stat.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
@@ -282,10 +321,6 @@ class Object(object):
aIdx = 0
sendCodec = Codec()
seq = self._session.seqMgr._reserve((method, synchronous))
- self._broker._setHeader(sendCodec, 'M', seq)
- self._objectId.encode(sendCodec)
- self._schema.getKey().encode(sendCodec)
- sendCodec.write_str8(name)
count = 0
for arg in method.arguments:
@@ -294,24 +329,64 @@ class Object(object):
if count != len(args):
raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._session._encodeValue(sendCodec, args[aIdx], arg.type)
- aIdx += 1
- if timeWait:
- ttl = timeWait * 1000
+ if self._agent.isV2:
+ #
+ # Compose and send a QMFv2 method request
+ #
+ call = {}
+ call['_object_id'] = self._objectId.asMap()
+ call['_method_name'] = name
+ argMap = {}
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ argMap[arg.name] = args[aIdx]
+ aIdx += 1
+ call['_arguments'] = argMap
+
+ dp = self._broker.amqpSession.delivery_properties()
+ dp.routing_key = self._objectId.getAgentBank()
+ mp = self._broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self._broker.authUser
+ mp.correlation_id = str(seq)
+ mp.app_id = "qmf2"
+ mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_method_request'}
+ sendCodec.write_map(call)
+ smsg = Message(dp, mp, sendCodec.encoded)
+ exchange = "qmf.default.direct"
+
else:
- ttl = None
- smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
- ttl=ttl)
+ #
+ # Associate this sequence with the agent hosting the object so we can correctly
+ # route the method-response
+ #
+ agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank())
+ self._broker._setSequence(seq, agent)
+
+ #
+ # Compose and send a QMFv1 method request
+ #
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ self._schema.getKey().encode(sendCodec)
+ sendCodec.write_str8(name)
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
+ aIdx += 1
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+ exchange = "qpid.management"
+
if synchronous:
try:
self._broker.cv.acquire()
self._broker.syncInFlight = True
finally:
self._broker.cv.release()
- self._broker._send(smsg)
+ self._broker._send(smsg, exchange)
return seq
return None
@@ -352,7 +427,6 @@ class Object(object):
raise Exception("Invalid Method (software defect) [%s]" % name)
def _encodeUnmanaged(self, codec):
-
codec.write_uint8(20)
codec.write_str8(self._schema.getKey().getPackageName())
codec.write_str8(self._schema.getKey().getClassName())
@@ -399,6 +473,10 @@ class Object(object):
bit = 0
return excludeList
+
+#===================================================================================================
+# Session
+#===================================================================================================
class Session:
"""
An instance of the Session class represents a console session running
@@ -423,6 +501,7 @@ class Session:
list: 21
}
+
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
manageConnections=False, userBindings=False):
"""
@@ -450,7 +529,7 @@ class Session:
"""
self.console = console
self.brokers = []
- self.packages = {}
+ self.schemaCache = SchemaCache()
self.seqMgr = SequenceManager()
self.cv = Condition()
self.syncSequenceList = []
@@ -471,9 +550,35 @@ class Session:
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+
+ def _getBrokerForAgentAddr(self, agent_addr):
+ try:
+ self.cv.acquire()
+ key = (1, agent_addr)
+ for b in self.brokers:
+ if key in b.agents:
+ return b
+ finally:
+ self.cv.release()
+ return None
+
+
+ def _getAgentForAgentAddr(self, agent_addr):
+ try:
+ self.cv.acquire()
+ key = agent_addr
+ for b in self.brokers:
+ if key in b.agents:
+ return b.agents[key]
+ finally:
+ self.cv.release()
+ return None
+
+
def __repr__(self):
return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
+
def addBroker(self, target="localhost", timeout=None, mechanisms=None):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
@@ -482,9 +587,12 @@ class Session:
self.brokers.append(broker)
if not self.manageConnections:
- self.getObjects(broker=broker, _class="agent")
+ agent = broker.getAgent(1,0)
+ if agent:
+ agent.getObjects(_class="agent")
return broker
+
def delBroker(self, broker):
""" Disconnect from a broker. The 'broker' argument is the object
returned from the addBroker call """
@@ -495,34 +603,27 @@ class Session:
self.brokers.remove(broker)
del broker
+
def getPackages(self):
""" Get the list of known QMF packages """
for broker in self.brokers:
broker._waitForStable()
- list = []
- for package in self.packages:
- list.append(package)
- return list
+ return self.schemaCache.getPackages()
+
def getClasses(self, packageName):
""" Get the list of known classes within a QMF package """
for broker in self.brokers:
broker._waitForStable()
- list = []
- if packageName in self.packages:
- for pkey in self.packages[packageName]:
- list.append(self.packages[packageName][pkey].getKey())
- return list
+ return self.schemaCache.getClasses(packageName)
+
def getSchema(self, classKey):
""" Get the schema for a QMF class """
for broker in self.brokers:
broker._waitForStable()
- pname = classKey.getPackageName()
- pkey = classKey.getPackageKey()
- if pname in self.packages:
- if pkey in self.packages[pname]:
- return self.packages[pname][pkey]
+ return self.schemaCache.getSchema(classKey)
+
def bindPackage(self, packageName):
""" Request object updates for all table classes within a package. """
@@ -535,6 +636,7 @@ class Session:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
binding_key=key)
+
def bindClass(self, pname, cname):
""" Request object updates for a particular table class by package and class name. """
if not self.userBindings or not self.rcvObjects:
@@ -545,6 +647,7 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
binding_key=key)
+
def bindClassKey(self, classKey):
""" Request object updates for a particular table class by class key. """
@@ -552,6 +655,7 @@ class Session:
cname = classKey.getClassName()
self.bindClass(pname, cname)
+
def getAgents(self, broker=None):
""" Get a list of currently known agents """
brokerList = []
@@ -569,12 +673,14 @@ class Session:
agentList.append(a)
return agentList
- def makeObject(self, classKey, broker=None, **kwargs):
+
+ def makeObject(self, classKey, **kwargs):
""" Create a new, unmanaged object of the schema indicated by classKey """
schema = self.getSchema(classKey)
if schema == None:
raise Exception("Schema not found for classKey")
- return Object(self, broker, schema, None, True, True, False, kwargs)
+ return Object(None, schema, None, True, True, kwargs)
+
def getObjects(self, **kwargs):
""" Get a list of objects from QMF agents.
@@ -644,81 +750,24 @@ class Session:
if len(agentList) == 0:
return []
- pname = None
- cname = None
- hash = None
- classKey = None
- if "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
- elif "_key" in kwargs: classKey = kwargs["_key"]
- elif "_class" in kwargs:
- cname = kwargs["_class"]
- if "_package" in kwargs:
- pname = kwargs["_package"]
- if cname == None and classKey == None and "_objectId" not in kwargs:
- raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
-
- map = {}
- self.getSelect = []
- if "_objectId" in kwargs:
- map["_objectid"] = kwargs["_objectId"].__repr__()
- else:
- if cname == None:
- cname = classKey.getClassName()
- pname = classKey.getPackageName()
- hash = classKey.getHash()
- map["_class"] = cname
- if pname != None: map["_package"] = pname
- if hash != None: map["_hash"] = hash
- for item in kwargs:
- if item[0] != '_':
- self.getSelect.append((item, kwargs[item]))
-
- self.getResult = []
+ #
+ # We now have a list of agents to query, start the queries and gather the results.
+ #
+ request = SessionGetRequest(len(agentList))
for agent in agentList:
- broker = agent.broker
- sendCodec = Codec()
- try:
- self.cv.acquire()
- seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
- self.syncSequenceList.append(seq)
- finally:
- self.cv.release()
- broker._setHeader(sendCodec, 'G', seq)
- sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
- broker._send(smsg)
+ agent.getObjects(request, **kwargs)
+ timeout = 60
+ if '_timeout' in kwargs:
+ timeout = kwargs['_timeout']
+ request.wait(timeout)
+ return request.result
- starttime = time()
- timeout = False
- if "_timeout" in kwargs:
- waitTime = kwargs["_timeout"]
- else:
- waitTime = self.DEFAULT_GET_WAIT_TIME
- try:
- self.cv.acquire()
- while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(waitTime)
- if time() - starttime > waitTime:
- for pendingSeq in self.syncSequenceList:
- self.seqMgr._release(pendingSeq)
- self.syncSequenceList = []
- timeout = True
- finally:
- self.cv.release()
-
- if self.error:
- errorText = self.error
- self.error = None
- raise Exception(errorText)
-
- if len(self.getResult) == 0 and timeout:
- raise RuntimeError("No agent responded within timeout period")
- return self.getResult
def setEventFilter(self, **kwargs):
""" """
pass
+
def _bindingKeys(self):
keyList = []
keyList.append("schema.#")
@@ -735,18 +784,21 @@ class Session:
keyList.append("console.heartbeat.#")
return keyList
+
def _handleBrokerConnect(self, broker):
if self.console:
for agent in broker.getAgents():
self.console.newAgent(agent)
self.console.brokerConnected(broker)
+
def _handleBrokerDisconnect(self, broker):
if self.console:
for agent in broker.getAgents():
self.console.delAgent(agent)
self.console.brokerDisconnected(broker)
+
def _handleBrokerResp(self, broker, codec, seq):
broker.brokerId = codec.read_uuid()
if self.console != None:
@@ -760,16 +812,10 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
+
def _handlePackageInd(self, broker, codec, seq):
pname = str(codec.read_str8())
- notify = False
- try:
- self.cv.acquire()
- if pname not in self.packages:
- self.packages[pname] = {}
- notify = True
- finally:
- self.cv.release()
+ notify = self.schemaCache.declarePackage(pname)
if notify and self.console != None:
self.console.newPackage(pname)
@@ -782,7 +828,8 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
- def _handleCommandComplete(self, broker, codec, seq):
+
+ def _handleCommandComplete(self, broker, codec, seq, agent):
code = codec.read_uint32()
text = codec.read_str8()
context = self.seqMgr._release(seq)
@@ -804,20 +851,16 @@ class Session:
finally:
self.cv.release()
+ if agent:
+ agent._handleV1Completion(seq, code, text)
+
+
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
classKey = ClassKey(codec)
- unknown = False
+ schema = self.schemaCache.getSchema(classKey)
- try:
- self.cv.acquire()
- if classKey.getPackageName() in self.packages:
- if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
- unknown = True
- finally:
- self.cv.release()
-
- if unknown:
+ if not schema:
# Send a schema request for the unknown class
broker._incOutstanding()
sendCodec = Codec()
@@ -827,30 +870,6 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
- def _handleMethodResp(self, broker, codec, seq):
- code = codec.read_uint32()
- text = codec.read_str16()
- outArgs = {}
- pair = self.seqMgr._release(seq)
- if pair == None:
- return
- method, synchronous = pair
- if code == 0:
- for arg in method.arguments:
- if arg.dir.find("O") != -1:
- outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
- result = MethodResult(code, text, outArgs)
- if synchronous:
- try:
- broker.cv.acquire()
- broker.syncResult = result
- broker.syncInFlight = False
- broker.cv.notify()
- finally:
- broker.cv.release()
- else:
- if self.console:
- self.console.methodResponse(broker, seq, result)
def _handleHeartbeatInd(self, broker, codec, seq, msg):
brokerBank = 1
@@ -873,59 +892,49 @@ class Session:
timestamp = codec.read_uint64()
if self.console != None and agent != None:
self.console.heartbeat(agent, timestamp)
+ broker._ageAgents()
- def _handleEventInd(self, broker, codec, seq):
- if self.console != None:
- event = Event(self, broker, codec)
- self.console.event(broker, event)
- def _handleSchemaResp(self, broker, codec, seq):
+ def _handleSchemaResp(self, broker, codec, seq, agent_addr):
kind = codec.read_uint8()
classKey = ClassKey(codec)
_class = SchemaClass(kind, classKey, codec, self)
- try:
- self.cv.acquire()
- self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
- finally:
- self.cv.release()
-
- self.seqMgr._release(seq)
- broker._decOutstanding()
+ self.schemaCache.declareClass(classKey, _class)
+ ctx = self.seqMgr._release(seq)
+ if ctx:
+ broker._decOutstanding()
if self.console != None:
self.console.newClass(kind, classKey)
- def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
- classKey = ClassKey(codec)
- try:
- self.cv.acquire()
- pname = classKey.getPackageName()
- if pname not in self.packages:
- return
- pkey = classKey.getPackageKey()
- if pkey not in self.packages[pname]:
- return
- schema = self.packages[pname][pkey]
- finally:
- self.cv.release()
+ if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
+ agent = self._getAgentForAgentAddr(agent_addr)
+ if agent:
+ agent._schemaInfoFromV2Agent()
- object = Object(self, broker, schema, codec, prop, stat)
- if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
- broker._updateAgent(object)
+ def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
try:
- self.cv.acquire()
- if seq in self.syncSequenceList:
- if object.getTimestamps()[2] == 0 and self._selectMatch(object):
- self.getResult.append(object)
- return
- finally:
- self.cv.release()
+ agentName = ah["qmf.agent"]
+ values = content["_values"]
+ timestamp = values["timestamp"]
+ interval = values["heartbeat_interval"]
+ except:
+ return
+
+ agent = broker.getAgent(1, agentName)
+ if agent == None:
+ agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
+ broker._addAgent(agentName, agent)
+ else:
+ agent.touch()
+ if self.console and agent:
+ self.console.heartbeat(agent, timestamp)
+ broker._ageAgents()
+
+
+ def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
+ self._v2HandleHeartbeatInd(broker, mp, ah, content)
- if self.console and self.rcvObjects:
- if prop:
- self.console.objectProps(broker, object)
- if stat:
- self.console.objectStats(broker, object)
def _handleError(self, error):
try:
@@ -937,6 +946,7 @@ class Session:
finally:
self.cv.release()
+
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
for key, value in self.getSelect:
@@ -945,6 +955,7 @@ class Session:
return False
return True
+
def _decodeValue(self, codec, typecode, broker=None):
""" Decode, from the codec, a value based on its typecode. """
if typecode == 1: data = codec.read_uint8() # U8
@@ -971,17 +982,9 @@ class Session:
inner_type_code = codec.read_uint8()
if inner_type_code == 20:
classKey = ClassKey(codec)
- try:
- self.cv.acquire()
- pname = classKey.getPackageName()
- if pname not in self.packages:
- return None
- pkey = classKey.getPackageKey()
- if pkey not in self.packages[pname]:
- return None
- schema = self.packages[pname][pkey]
- finally:
- self.cv.release()
+ schema = self.schemaCache.getSchema(classKey)
+ if not schema:
+ return None
data = Object(self, broker, schema, codec, True, True, False)
else:
data = self._decodeValue(codec, inner_type_code, broker)
@@ -999,6 +1002,7 @@ class Session:
raise ValueError("Invalid type code: %d" % typecode)
return data
+
def _encodeValue(self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
if typecode == 1: codec.write_uint8 (int(value)) # U8
@@ -1033,9 +1037,11 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def encoding(self, value):
return self._encoding(value.__class__)
+
def _encoding(self, klass):
if Session.ENCODINGS.has_key(klass):
return self.ENCODINGS[klass]
@@ -1044,6 +1050,7 @@ class Session:
if result != None:
return result
+
def _displayValue(self, value, typecode):
""" """
if typecode == 1: return unicode(value)
@@ -1072,6 +1079,7 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def _defaultValue(self, stype, broker=None, kwargs={}):
""" """
typecode = stype.type
@@ -1110,6 +1118,7 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def _bestClassKey(self, pname, cname, preferredList):
""" """
if pname == None or cname == None:
@@ -1125,6 +1134,7 @@ class Session:
return c
return None
+
def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
""" This function can be used to send a method request to an object given only the
broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are
@@ -1133,14 +1143,10 @@ class Session:
schema = self.getSchema(schemaKey)
for method in schema.getMethods():
if name == method.name:
- aIdx = 0
- sendCodec = Codec()
- seq = self.seqMgr._reserve((method, False))
- broker._setHeader(sendCodec, 'M', seq)
- objectId.encode(sendCodec)
- schemaKey.encode(sendCodec)
- sendCodec.write_str8(name)
-
+ #
+ # Count the arguments supplied and validate that the number is what is expected
+ # based on the schema.
+ #
count = 0
for arg in method.arguments:
if arg.dir.find("I") != -1:
@@ -1148,25 +1154,192 @@ class Session:
if count != len(argList):
raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._encodeValue(sendCodec, argList[aIdx], arg.type)
- aIdx += 1
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
- (objectId.getBrokerBank(), objectId.getAgentBank()))
- broker._send(smsg)
+ aIdx = 0
+ sendCodec = Codec()
+ seq = self.seqMgr._reserve((method, False))
+
+ if objectId.isV2():
+ #
+ # Compose and send a QMFv2 method request
+ #
+ call = {}
+ call['_object_id'] = objectId.asMap()
+ call['_method_name'] = name
+ args = {}
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ args[arg.name] = argList[aIdx]
+ aIdx += 1
+ call['_arguments'] = args
+
+ dp = broker.amqpSession.delivery_properties()
+ dp.routing_key = objectId.getAgentBank()
+ mp = broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = broker.authUser
+ mp.correlation_id = str(seq)
+ mp.app_id = "qmf2"
+ mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_method_request'}
+ sendCodec.write_map(call)
+ msg = Message(dp, mp, sendCodec.encoded)
+ broker._send(msg, "qmf.default.direct")
+
+ else:
+ #
+ # Associate this sequence with the agent hosting the object so we can correctly
+ # route the method-response
+ #
+ agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank())
+ broker._setSequence(seq, agent)
+
+ #
+ # Compose and send a QMFv1 method request
+ #
+ broker._setHeader(sendCodec, 'M', seq)
+ objectId.encode(sendCodec)
+ schemaKey.encode(sendCodec)
+ sendCodec.write_str8(name)
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._encodeValue(sendCodec, argList[aIdx], arg.type)
+ aIdx += 1
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (objectId.getBrokerBank(), objectId.getAgentBank()))
+ broker._send(smsg)
return seq
return None
-class Package:
- """ """
- def __init__(self, name):
- self.name = name
+#===================================================================================================
+# SessionGetRequest
+#===================================================================================================
+class SessionGetRequest(object):
+ """
+ This class is used to track get-object queries at the Session level.
+ """
+ def __init__(self, agentCount):
+ self.agentCount = agentCount
+ self.result = []
+ self.cv = Condition()
+ self.waiting = True
+
+ def __call__(self, **kwargs):
+ """
+ Callable entry point for gathering collected objects.
+ """
+ try:
+ self.cv.acquire()
+ if 'qmf_object' in kwargs:
+ self.result.append(kwargs['qmf_object'])
+ elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs:
+ self.agentCount -= 1
+ if self.agentCount == 0:
+ self.waiting = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+
+ def wait(self, timeout):
+ starttime = time()
+ try:
+ self.cv.acquire()
+ while self.waiting:
+ if (time() - starttime) > timeout:
+ raise Exception("Timed out after %d seconds" % timeout)
+ self.cv.wait(1)
+ finally:
+ self.cv.release()
+
+
+#===================================================================================================
+# SchemaCache
+#===================================================================================================
+class SchemaCache(object):
+ """
+ The SchemaCache is a data structure that stores learned schema information.
+ """
+ def __init__(self):
+ """
+ Create a map of schema packages and a lock to protect this data structure.
+ Note that this lock is at the bottom of any lock hierarchy. If it is held, no other
+ lock in the system should attempt to be acquired.
+ """
+ self.packages = {}
+ self.lock = Lock()
+
+ def getPackages(self):
+ """ Get the list of known QMF packages """
+ list = []
+ try:
+ self.lock.acquire()
+ for package in self.packages:
+ list.append(package)
+ finally:
+ self.lock.release()
+ return list
+
+ def getClasses(self, packageName):
+ """ Get the list of known classes within a QMF package """
+ list = []
+ try:
+ self.lock.acquire()
+ if packageName in self.packages:
+ for pkey in self.packages[packageName]:
+ list.append(self.packages[packageName][pkey].getKey())
+ finally:
+ self.lock.release()
+ return list
+
+ def getSchema(self, classKey):
+ """ Get the schema for a QMF class """
+ pname = classKey.getPackageName()
+ pkey = classKey.getPackageKey()
+ try:
+ self.lock.acquire()
+ if pname in self.packages:
+ if pkey in self.packages[pname]:
+ return self.packages[pname][pkey]
+ finally:
+ self.lock.release()
+ return None
+
+ def declarePackage(self, pname):
+ """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """
+ try:
+ self.lock.acquire()
+ if pname in self.packages:
+ return None
+ self.packages[pname] = {}
+ finally:
+ self.lock.release()
+ return True
+
+ def declareClass(self, classKey, classDef):
+ """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """
+ pname = classKey.getPackageName()
+ pkey = classKey.getPackageKey()
+ try:
+ self.lock.acquire()
+ if pname not in self.packages:
+ self.packages[pname] = {}
+ packageMap = self.packages[pname]
+ if pkey in packageMap:
+ return None
+ packageMap[pkey] = classDef
+ finally:
+ self.lock.release()
+ return True
+
+
+#===================================================================================================
+# ClassKey
+#===================================================================================================
class ClassKey:
""" A ClassKey uniquely identifies a class from the schema. """
def __init__(self, constructor):
- if type(constructor) == str:
+ if constructor.__class__ == str:
# construct from __repr__ string
try:
self.pname, cls = constructor.split(":")
@@ -1177,20 +1350,33 @@ class ClassKey:
h1 = int(hexValues[1], 16)
h2 = int(hexValues[2], 16)
h3 = int(hexValues[3], 16)
- self.hash = struct.pack("!LLLL", h0, h1, h2, h3)
+ h4 = int(hexValues[4][0:4], 16)
+ h5 = int(hexValues[4][4:12], 16)
+ self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5))
except:
raise Exception("Invalid ClassKey format")
+ elif constructor.__class__ == dict:
+ # construct from QMFv2 map
+ try:
+ self.pname = constructor['_package_name']
+ self.cname = constructor['_class_name']
+ self.hash = constructor['_hash']
+ except:
+ raise Exception("Invalid ClassKey map format")
else:
# construct from codec
codec = constructor
self.pname = str(codec.read_str8())
self.cname = str(codec.read_str8())
- self.hash = codec.read_bin128()
+ self.hash = UUID(codec.read_bin128())
def encode(self, codec):
codec.write_str8(self.pname)
codec.write_str8(self.cname)
- codec.write_bin128(self.hash)
+ codec.write_bin128(self.hash.bytes)
+
+ def asMap(self):
+ return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash}
def getPackageName(self):
return self.pname
@@ -1202,7 +1388,7 @@ class ClassKey:
return self.hash
def getHashString(self):
- return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)
+ return str(self.hash)
def getPackageKey(self):
return (self.cname, self.hash)
@@ -1210,6 +1396,10 @@ class ClassKey:
def __repr__(self):
return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+
+#===================================================================================================
+# SchemaClass
+#===================================================================================================
class SchemaClass:
""" """
CLASS_KIND_TABLE = 1
@@ -1292,6 +1482,10 @@ class SchemaClass:
else:
return self.arguments + self.session.getSchema(self.superTypeKey).getArguments()
+
+#===================================================================================================
+# SchemaProperty
+#===================================================================================================
class SchemaProperty:
""" """
def __init__(self, codec):
@@ -1321,6 +1515,10 @@ class SchemaProperty:
def __repr__(self):
return self.name
+
+#===================================================================================================
+# SchemaStatistic
+#===================================================================================================
class SchemaStatistic:
""" """
def __init__(self, codec):
@@ -1337,6 +1535,10 @@ class SchemaStatistic:
def __repr__(self):
return self.name
+
+#===================================================================================================
+# SchemaMethod
+#===================================================================================================
class SchemaMethod:
""" """
def __init__(self, codec):
@@ -1365,6 +1567,10 @@ class SchemaMethod:
result += ")"
return result
+
+#===================================================================================================
+# SchemaArgument
+#===================================================================================================
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -1392,64 +1598,113 @@ class SchemaArgument:
elif key == "refPackage" : self.refPackage = value
elif key == "refClass" : self.refClass = value
+
+#===================================================================================================
+# ObjectId
+#===================================================================================================
class ObjectId:
""" Object that represents QMF object identifiers """
- def __init__(self, codec, first=0, second=0):
- if codec:
- self.first = codec.read_uint64()
- self.second = codec.read_uint64()
+ def __init__(self, constructor, first=0, second=0, agentName=None):
+ if constructor.__class__ == dict:
+ self.agentName = agentName
+ self.agentEpoch = 0
+ if '_agent_name' in constructor: self.agentName = constructor['_agent_name']
+ if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch']
+ if '_object_name' not in constructor:
+ raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.")
+ self.objectName = constructor['_object_name']
else:
- self.first = first
- self.second = second
+ if not constructor:
+ first = first
+ second = second
+ else:
+ first = constructor.read_uint64()
+ second = constructor.read_uint64()
+ self.agentName = str(first & 0x000000000FFFFFFF)
+ self.agentEpoch = (first & 0x0FFF000000000000) >> 48
+ self.objectName = str(second)
def __cmp__(self, other):
if other == None or not isinstance(other, ObjectId) :
return 1
- if self.first < other.first:
+
+ if self.objectName < other.objectName:
+ return -1
+ if self.objectName > other.objectName:
+ return 1
+
+ if self.agentName < other.agentName:
return -1
- if self.first > other.first:
+ if self.agentName > other.agentName:
return 1
- if self.second < other.second:
+
+ if self.agentEpoch < other.agentEpoch:
return -1
- if self.second > other.second:
+ if self.agentEpoch > other.agentEpoch:
return 1
return 0
def __repr__(self):
- return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
+ return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(),
self.getBrokerBank(), self.getAgentBank(), self.getObject())
+ def isV2(self):
+ return not self.agentName.isdigit()
+
def index(self):
- return (self.first, self.second)
+ return self.__repr__()
def getFlags(self):
- return (self.first & 0xF000000000000000) >> 60
+ return 0
def getSequence(self):
- return (self.first & 0x0FFF000000000000) >> 48
+ return self.agentEpoch
def getBrokerBank(self):
- return (self.first & 0x0000FFFFF0000000) >> 28
+ return 1
def getAgentBank(self):
- return self.first & 0x000000000FFFFFFF
+ return self.agentName
def getObject(self):
- return self.second
+ return self.objectName
def isDurable(self):
return self.getSequence() == 0
def encode(self, codec):
- codec.write_uint64(self.first)
- codec.write_uint64(self.second)
+ first = (self.agentEpoch << 48) + (1 << 28)
+ second = 0
+
+ try:
+ first += int(self.agentName)
+ except:
+ pass
+
+ try:
+ second = int(self.objectName)
+ except:
+ pass
+
+ codec.write_uint64(first)
+ codec.write_uint64(second)
+
+ def asMap(self):
+ omap = {'_agent_name': self.agentName, '_object_name': self.objectName}
+ if self.agentEpoch != 0:
+ omap['_agent_epoch'] = self.agentEpoch
+ return omap
def __hash__(self):
- return (self.first, self.second).__hash__()
+ return self.__repr__().__hash__()
def __eq__(self, other):
- return (self.first, self.second).__eq__(other)
+ return self.__repr__().__eq__(other)
+
+#===================================================================================================
+# MethodResult
+#===================================================================================================
class MethodResult(object):
""" """
def __init__(self, status, text, outArgs):
@@ -1465,6 +1720,10 @@ class MethodResult(object):
def __repr__(self):
return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
+
+#===================================================================================================
+# ManagedConnection
+#===================================================================================================
class ManagedConnection(Thread):
""" Thread class for managing a connection. """
DELAY_MIN = 1
@@ -1527,6 +1786,10 @@ class ManagedConnection(Thread):
finally:
self.cv.release()
+
+#===================================================================================================
+# Broker
+#===================================================================================================
class Broker:
""" This object represents a connection (or potential connection) to a QMF broker. """
SYNC_TIME = 60
@@ -1542,6 +1805,7 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.cv = Condition()
+ self.seqToAgentMap = {}
self.error = None
self.brokerId = None
self.connected = False
@@ -1574,9 +1838,13 @@ class Broker:
def getAgent(self, brokerBank, agentBank):
""" Return the agent object associated with a particular broker and agent bank value."""
- bankKey = (brokerBank, agentBank)
- if bankKey in self.agents:
- return self.agents[bankKey]
+ bankKey = str(agentBank)
+ try:
+ self.cv.acquire()
+ if bankKey in self.agents:
+ return self.agents[bankKey]
+ finally:
+ self.cv.release()
return None
def getSessionId(self):
@@ -1585,7 +1853,11 @@ class Broker:
def getAgents(self):
""" Get the list of agents reachable via this broker """
- return self.agents.values()
+ try:
+ self.cv.acquire()
+ return self.agents.values()
+ finally:
+ self.cv.release()
def getAmqpSession(self):
""" Get the AMQP session object for this connected broker. """
@@ -1612,10 +1884,29 @@ class Broker:
else:
return "Disconnected Broker"
+ def _setSequence(self, sequence, agent):
+ try:
+ self.cv.acquire()
+ self.seqToAgentMap[sequence] = agent
+ finally:
+ self.cv.release()
+
+ def _clearSequence(self, sequence):
+ try:
+ self.cv.acquire()
+ self.seqToAgentMap.pop(sequence)
+ finally:
+ self.cv.release()
+
def _tryToConnect(self):
try:
- self.agents = {}
- self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+ try:
+ self.cv.acquire()
+ self.agents = {}
+ self.agents['0'] = Agent(self, 0, "BrokerAgent")
+ finally:
+ self.cv.release()
+
self.topicBound = False
self.syncInFlight = False
self.syncRequest = 0
@@ -1649,7 +1940,7 @@ class Broker:
self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
+ self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL)
@@ -1659,11 +1950,29 @@ class Broker:
self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("tdest").listen(self._replyCb)
+ self.amqpSession.incoming("tdest").listen(self._v1Cb)
self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
+ ##
+ ## Set up connectivity for QMFv2
+ ##
+ self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
+ self.amqpSession.exchange_bind(exchange="qmf.default.direct",
+ queue=self.v2_queue_name, binding_key=self.v2_queue_name)
+ self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+ queue=self.v2_queue_name, binding_key="agent.#")
+ ## Other bindings here...
+ self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+ self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1)
+ self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL)
+
self.connected = True
self.session._handleBrokerConnect(self)
@@ -1671,6 +1980,7 @@ class Broker:
self._setHeader(codec, 'B')
msg = self._message(codec.encoded)
self._send(msg)
+ self._v2SendAgentLocate()
except socket.error, e:
self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
@@ -1683,18 +1993,73 @@ class Broker:
raise
def _updateAgent(self, obj):
- bankKey = (obj.brokerBank, obj.agentBank)
+ bankKey = str(obj.agentBank)
+ agent = None
if obj._deleteTime == 0:
- if bankKey not in self.agents:
- agent = Agent(self, obj.agentBank, obj.label)
- self.agents[bankKey] = agent
- if self.session.console != None:
- self.session.console.newAgent(agent)
+ try:
+ self.cv.acquire()
+ if bankKey not in self.agents:
+ agent = Agent(self, obj.agentBank, obj.label)
+ self.agents[bankKey] = agent
+ finally:
+ self.cv.release()
+ if agent and self.session.console:
+ self.session.console.newAgent(agent)
else:
- agent = self.agents.pop(bankKey, None)
- if agent != None and self.session.console != None:
+ try:
+ self.cv.acquire()
+ agent = self.agents.pop(bankKey, None)
+ if agent:
+ agent.close()
+ finally:
+ self.cv.release()
+ if agent and self.session.console:
self.session.console.delAgent(agent)
+ def _addAgent(self, name, agent):
+ try:
+ self.cv.acquire()
+ self.agents[name] = agent
+ finally:
+ self.cv.release()
+ if self.session.console:
+ self.session.console.newAgent(agent)
+
+ def _ageAgents(self):
+ try:
+ self.cv.acquire()
+ to_delete = []
+ to_notify = []
+ for key in self.agents:
+ if self.agents[key].isOld():
+ to_delete.append(key)
+ for key in to_delete:
+ agent = self.agents.pop(key)
+ agent.close()
+ to_notify.append(agent)
+ finally:
+ self.cv.release()
+ if self.session.console:
+ for agent in to_notify:
+ self.session.console.delAgent(agent)
+
+ def _v2SendAgentLocate(self, predicate={}):
+ """
+ Broadcast an agent-locate request to cause all agents in the domain to tell us who they are.
+ """
+ dp = self.amqpSession.delivery_properties()
+ dp.routing_key = "console.request.agent_locate"
+ mp = self.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.authUser
+ mp.app_id = "qmf2"
+ mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(predicate)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self._send(msg, "qmf.default.topic")
+
def _setHeader(self, codec, opcode, seq=0):
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
@@ -1785,24 +2150,105 @@ class Broker:
finally:
self.cv.release()
- def _replyCb(self, msg):
+ def _v1Cb(self, msg):
+ try:
+ self._v1CbProtected(msg)
+ except Exception, e:
+ print "EXCEPTION in Broker._v1Cb:", e
+
+ def _v1CbProtected(self, msg):
+ """
+ This is the general message handler for messages received via the QMFv1 exchanges.
+ """
+ agent = None
+ agent_addr = None
+ mp = msg.get("message_properties")
+ ah = mp.application_headers
+ if ah and 'qmf.agent' in ah:
+ agent_addr = ah['qmf.agent']
+
+ if not agent_addr:
+ #
+ # See if we can determine the agent identity from the routing key
+ #
+ dp = msg.get("delivery_properties")
+ rkey = None
+ if dp.routing_key:
+ rkey = dp.routing_key
+ items = rkey.split('.')
+ if len(items) >= 4:
+ if items[0] == 'console' and items[3].isdigit():
+ agent_addr = str(items[3]) # The QMFv1 Agent Bank
+ if agent_addr != None and agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+
codec = Codec(msg.body)
+ alreadyTried = None
while True:
opcode, seq = self._checkHeader(codec)
+
+ if not agent and not alreadyTried:
+ alreadyTried = True
+ try:
+ self.cv.acquire()
+ if seq in self.seqToAgentMap:
+ agent = self.seqToAgentMap[seq]
+ finally:
+ self.cv.release()
+
if opcode == None: return
if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
- elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
+ elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
- elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
- elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
- elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
- elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
- elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
- self.session.receiver._completed.add(msg.id)
- self.session.channel.session_completed(self.session.receiver._completed)
+ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
+ elif agent:
+ agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
+
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
+
+ def _v2Cb(self, msg):
+ """
+ This is the general message handler for messages received via QMFv2 exchanges.
+ """
+ mp = msg.get("message_properties")
+ ah = mp["application_headers"]
+ codec = Codec(msg.body)
+
+ if 'qmf.opcode' in ah:
+ opcode = ah['qmf.opcode']
+ if mp.content_type == "amqp/list":
+ content = codec.read_list()
+ if not content:
+ content = []
+ elif mp.content_type == "amqp/map":
+ content = codec.read_map()
+ if not content:
+ content = {}
+ else:
+ content = None
+
+ if content != None:
+ ##
+ ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
+ ## used to maintain the broker's list of agent proxies.
+ ##
+ if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+ elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+ else:
+ ##
+ ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
+ ## of the message.
+ ##
+ agent_addr = ah['qmf.agent']
+ if agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+ agent._handleQmfV2Message(opcode, mp, ah, content)
+
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
def _exceptionCb(self, data):
self.connected = False
@@ -1818,43 +2264,697 @@ class Broker:
if self.thread:
self.thread.disconnected()
+
+#===================================================================================================
+# Agent
+#===================================================================================================
class Agent:
- """ """
- def __init__(self, broker, agentBank, label):
+ """
+ This class represents a proxy for a remote agent being managed
+ """
+ def __init__(self, broker, agentBank, label, isV2=False, interval=0):
self.broker = broker
+ self.session = broker.session
+ self.schemaCache = self.session.schemaCache
self.brokerBank = broker.getBrokerBank()
- self.agentBank = agentBank
+ self.agentBank = str(agentBank)
self.label = label
+ self.isV2 = isV2
+ self.heartbeatInterval = interval
+ self.lock = Lock()
+ self.seqMgr = self.session.seqMgr
+ self.contextMap = {}
+ self.unsolicitedContext = RequestContext(self, self)
+ self.lastSeenTime = time()
+ self.closed = None
+
+
+ def _checkClosed(self):
+ if self.closed:
+ raise Exception("Agent is disconnected")
+
+
+ def __call__(self, **kwargs):
+ """
+ This is the handler for unsolicited stuff received from the agent
+ """
+ if 'qmf_object' in kwargs:
+ if self.session.console:
+ self.session.console.objectProps(self.broker, kwargs['qmf_object'])
+ elif 'qmf_object_stats' in kwargs:
+ if self.session.console:
+ self.session.console.objectStats(self.broker, kwargs['qmf_object_stats'])
+ elif 'qmf_event' in kwargs:
+ if self.session.console:
+ self.session.console.event(self.broker, kwargs['qmf_event'])
+
+
+ def touch(self):
+ self.lastSeenTime = time()
+
+
+ def isOld(self):
+ if self.heartbeatInterval == 0:
+ return None
+ if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval):
+ return True
+ return None
+
+
+ def close(self):
+ self.closed = True
+ copy = {}
+ try:
+ self.lock.acquire()
+ for seq in self.contextMap:
+ copy[seq] = self.contextMap[seq]
+ finally:
+ self.lock.release()
+
+ for seq in copy:
+ context = copy[seq]
+ context.cancel("Agent disconnected")
+
def __repr__(self):
- return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label)
+ if self.isV2:
+ ver = "v2"
+ else:
+ ver = "v1"
+ return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label)
+
def getBroker(self):
+ self._checkClosed()
return self.broker
+
def getBrokerBank(self):
+ self._checkClosed()
return self.brokerBank
+
def getAgentBank(self):
+ self._checkClosed()
return self.agentBank
+
+ def getObjects(self, notifiable=None, **kwargs):
+ """ Get a list of objects from QMF agents.
+ All arguments are passed by name(keyword).
+
+ If 'notifiable' is None (default), this call will block until completion or timeout.
+ If supplied, notifiable is assumed to be a callable object that will be called when the
+ list of queried objects arrives. The single argument to the call shall be a list of
+ the returned objects.
+
+ The class for queried objects may be specified in one of the following ways:
+
+ _schema = <schema> - supply a schema object returned from getSchema.
+ _key = <key> - supply a classKey from the list returned by getClasses.
+ _class = <name> - supply a class name as a string. If the class name exists
+ in multiple packages, a _package argument may also be supplied.
+ _objectId = <id> - get the object referenced by the object-id
+
+ The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ use the following argument:
+
+ _timeout = <time in seconds>
+
+ If additional arguments are supplied, they are used as property selectors. For example,
+ if the argument name="test" is supplied, only objects whose "name" property is "test"
+ will be returned in the result.
+ """
+ self._checkClosed()
+ if notifiable:
+ if not callable(notifiable):
+ raise Exception("notifiable object must be callable")
+
+ #
+ # Isolate the selectors from the kwargs
+ #
+ selectors = {}
+ for key in kwargs:
+ value = kwargs[key]
+ if key[0] != '_':
+ selectors[key] = value
+
+ #
+ # Allocate a context to track this asynchronous request.
+ #
+ context = RequestContext(self, notifiable, selectors)
+ sequence = self.seqMgr._reserve(context)
+ try:
+ self.lock.acquire()
+ self.contextMap[sequence] = context
+ context.setSequence(sequence)
+ finally:
+ self.lock.release()
+
+ #
+ # Compose and send the query message to the agent using the appropriate protocol for the
+ # agent's QMF version.
+ #
+ if self.isV2:
+ self._v2SendGetQuery(sequence, kwargs)
+ else:
+ self.broker._setSequence(sequence, self)
+ self._v1SendGetQuery(sequence, kwargs)
+
+ #
+ # If this is a synchronous call, block and wait for completion.
+ #
+ if not notifiable:
+ timeout = 60
+ if '_timeout' in kwargs:
+ timeout = kwargs['_timeout']
+ context.waitForSignal(timeout)
+ if context.exception:
+ raise Exception(context.exception)
+ result = context.queryResults
+ return result
+
+
+ def _clearContext(self, sequence):
+ try:
+ self.lock.acquire()
+ self.contextMap.pop(sequence)
+ finally:
+ self.lock.release()
+
+
+ def _schemaInfoFromV2Agent(self):
+ """
+ We have just received new schema information from this agent. Check to see if there's
+ more work that can now be done.
+ """
+ try:
+ self.lock.acquire()
+ copy_of_map = {}
+ for item in self.contextMap:
+ copy_of_map[item] = self.contextMap[item]
+ finally:
+ self.lock.release()
+
+ self.unsolicitedContext.reprocess()
+ for context in copy_of_map:
+ copy_of_map[context].reprocess()
+
+
+ def _handleV1Completion(self, sequence, code, text):
+ """
+ Called if one of this agent's V1 commands completed
+ """
+ context = None
+ try:
+ self.lock.acquire()
+ if sequence in self.contextMap:
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
+
+ if context:
+ if code != 0:
+ ex = "Error %d: %s" % (code, text)
+ context.setException(ex)
+ context.signal()
+ self.broker._clearSequence(sequence)
+
+
+ def _v1HandleMethodResp(self, codec, seq):
+ """
+ Handle a QMFv1 method response
+ """
+ code = codec.read_uint32()
+ text = codec.read_str16()
+ outArgs = {}
+ self.broker._clearSequence(seq)
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+ if code == 0:
+ for arg in method.arguments:
+ if arg.dir.find("O") != -1:
+ outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
+ result = MethodResult(code, text, outArgs)
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
+
+
+ def _v1HandleEventInd(self, codec, seq):
+ """
+ Handle a QMFv1 event indication
+ """
+ event = Event(self, codec)
+ self.unsolicitedContext.doEvent(event)
+
+
+ def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False):
+ """
+ Handle a QMFv1 content indication
+ """
+ classKey = ClassKey(codec)
+ schema = self.schemaCache.getSchema(classKey)
+ if not schema:
+ return
+
+ obj = Object(self, schema, codec, prop, stat)
+ if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
+ self.broker._updateAgent(obj)
+
+ context = self.unsolicitedContext
+ try:
+ self.lock.acquire()
+ if sequence in self.contextMap:
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
+
+ context.addV1QueryResult(obj)
+
+
+ def _v2HandleDataInd(self, mp, ah, content):
+ """
+ Handle a QMFv2 data indication from the agent
+ """
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ sequence = int(mp.correlation_id)
+ if sequence not in self.contextMap:
+ return
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
+ else:
+ context = self.unsolicitedContext
+
+ kind = "_data"
+ if "qmf.content" in ah:
+ kind = ah["qmf.content"]
+ if kind == "_data":
+ if content.__class__ != list:
+ return
+ for omap in content:
+ context.addV2QueryResult(omap)
+ context.processV2Data()
+
+ if 'partial' not in ah:
+ context.signal()
+
+
+ def _v2HandleMethodResp(self, mp, ah, content):
+ """
+ Handle a QMFv2 method response from the agent
+ """
+ context = None
+ sequence = None
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ seq = int(mp.correlation_id)
+ finally:
+ self.lock.release()
+ else:
+ return
+
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+
+ result = MethodResult(0, 'OK', content['_arguments'])
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
+
+ def _v2HandleException(self, mp, ah, content):
+ """
+ Handle a QMFv2 exception
+ """
+ context = None
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ seq = int(mp.correlation_id)
+ finally:
+ self.lock.release()
+ else:
+ return
+
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+
+ code = 7
+ text = ""
+ if '_status_code' in content:
+ code = content['_status_code']
+ if '_status_text' in content:
+ text = content['_status_text']
+ else:
+ text = content
+
+ result = MethodResult(code, text, {})
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
+
+
+ def _v1SendGetQuery(self, sequence, kwargs):
+ """
+ Send a get query to a QMFv1 agent.
+ """
+ #
+ # Build the query map
+ #
+ query = {}
+ if '_class' in kwargs:
+ query['_class'] = kwargs['_class']
+ if '_package' in kwargs:
+ query['_package'] = kwargs['_package']
+ elif '_key' in kwargs:
+ key = kwargs['_key']
+ query['_class'] = key.getClassName()
+ query['_package'] = key.getPackageName()
+ elif '_objectId' in kwargs:
+ query['_objectid'] = kwargs['_objectId'].__repr__()
+
+ #
+ # Construct and transmit the message
+ #
+ sendCodec = Codec()
+ self.broker._setHeader(sendCodec, 'G', sequence)
+ sendCodec.write_map(query)
+ smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank))
+ self.broker._send(smsg)
+
+
+ def _v2SendGetQuery(self, sequence, kwargs):
+ """
+ Send a get query to a QMFv2 agent.
+ """
+ #
+ # Build the query map
+ #
+ query = {'_what': 'OBJECT'}
+ if '_class' in kwargs:
+ schemaMap = {'_class_name': kwargs['_class']}
+ if '_package' in kwargs:
+ schemaMap['_package_name'] = kwargs['_package']
+ query['_schema_id'] = schemaMap
+ elif '_key' in kwargs:
+ query['_schema_id'] = kwargs['_key'].asMap()
+ elif '_objectId' in kwargs:
+ query['_object_id'] = kwargs['_objectId'].asMap()
+
+ #
+ # Construct and transmit the message
+ #
+ dp = self.broker.amqpSession.delivery_properties()
+ dp.routing_key = self.agentBank
+ mp = self.broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.broker.authUser
+ mp.correlation_id = str(sequence)
+ mp.app_id = "qmf2"
+ mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_query_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(query)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self.broker._send(msg, "qmf.default.direct")
+
+
+ def _v2SendSchemaRequest(self, schemaId):
+ """
+ Send a query to an agent to request details on a particular schema class.
+ IMPORTANT: This function currently sends a QMFv1 schema-request to the address of
+ the agent. The agent will send its response to amq.direct/<our-key>.
+ Eventually, this will be converted to a proper QMFv2 schema query.
+ """
+ sendCodec = Codec()
+ seq = self.seqMgr._reserve(None)
+ self.broker._setHeader(sendCodec, 'S', seq)
+ schemaId.encode(sendCodec)
+ smsg = self.broker._message(sendCodec.encoded, self.agentBank)
+ self.broker._send(smsg, "qmf.default.direct")
+
+
+ def _handleQmfV1Message(self, opcode, seq, mp, ah, codec):
+ """
+ Process QMFv1 messages arriving from an agent.
+ """
+ if opcode == 'm': self._v1HandleMethodResp(codec, seq)
+ elif opcode == 'e': self._v1HandleEventInd(codec, seq)
+ elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True)
+ elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True)
+ elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True)
+
+
+ def _handleQmfV2Message(self, opcode, mp, ah, content):
+ """
+ Process QMFv2 messages arriving from an agent.
+ """
+ if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
+ elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content)
+ elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content)
+ elif opcode == '_exception': self._v2HandleException(mp, ah, content)
+
+
+#===================================================================================================
+# RequestContext
+#===================================================================================================
+class RequestContext(object):
+ """
+ This class tracks an asynchronous request sent to an agent.
+ TODO: Add logic for client-side selection and filtering deleted objects from get-queries
+ """
+ def __init__(self, agent, notifiable, selectors={}):
+ self.sequence = None
+ self.agent = agent
+ self.schemaCache = self.agent.schemaCache
+ self.notifiable = notifiable
+ self.selectors = selectors
+ self.startTime = time()
+ self.rawQueryResults = []
+ self.queryResults = []
+ self.exception = None
+ self.waitingForSchema = None
+ self.pendingSignal = None
+ self.cv = Condition()
+ self.blocked = notifiable == None
+
+
+ def setSequence(self, sequence):
+ self.sequence = sequence
+
+
+ def addV1QueryResult(self, data):
+ values = {}
+ for prop, val in data.getProperties():
+ values[prop.name] = val
+ for stat, val in data.getStatistics():
+ values[stat.name] = val
+ for key in values:
+ val = values[key]
+ if key in self.selectors and val != self.selectors[key]:
+ return
+
+ if self.notifiable:
+ self.notifiable(qmf_object=data)
+ else:
+ self.queryResults.append(data)
+
+
+ def addV2QueryResult(self, data):
+ values = data['_values']
+ for key in values:
+ val = values[key]
+ if key in self.selectors and val != self.selectors[key]:
+ return
+ self.rawQueryResults.append(data)
+
+
+ def doEvent(self, data):
+ if self.notifiable:
+ self.notifiable(qmf_event=data)
+
+
+ def setException(self, ex):
+ self.exception = ex
+
+
+ def getAge(self):
+ return time() - self.startTime
+
+
+ def cancel(self, exception):
+ self.setException(exception)
+ try:
+ self.cv.acquire()
+ self.blocked = None
+ self.waitingForSchema = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ self._complete()
+
+
+ def waitForSignal(self, timeout):
+ try:
+ self.cv.acquire()
+ while self.blocked:
+ if (time() - self.startTime) > timeout:
+ self.exception = "Request timed out after %d seconds" % timeout
+ return
+ self.cv.wait(1)
+ finally:
+ self.cv.release()
+
+
+ def signal(self):
+ try:
+ self.cv.acquire()
+ if self.waitingForSchema:
+ self.pendingSignal = True
+ return
+ else:
+ self.blocked = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ self._complete()
+
+
+ def _complete(self):
+ if self.notifiable:
+ if self.exception:
+ self.notifiable(qmf_exception=self.exception)
+ else:
+ self.notifiable(qmf_complete=True)
+
+ if self.sequence:
+ self.agent._clearContext(self.sequence)
+
+
+ def processV2Data(self):
+ """
+ Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema
+ that is in our schema cache, process it. Otherwise, send a request for the schema information
+ to the agent that manages the object.
+ """
+ schemaId = None
+ queryResults = []
+ try:
+ self.cv.acquire()
+ if self.waitingForSchema:
+ return
+ while (not self.waitingForSchema) and len(self.rawQueryResults) > 0:
+ head = self.rawQueryResults[0]
+ schemaId = self._getSchemaIdforV2ObjectLH(head)
+ schema = self.schemaCache.getSchema(schemaId)
+ if schema:
+ obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank)
+ queryResults.append(obj)
+ self.rawQueryResults.pop(0)
+ else:
+ self.waitingForSchema = True
+ finally:
+ self.cv.release()
+
+ if self.waitingForSchema:
+ self.agent._v2SendSchemaRequest(schemaId)
+
+ for result in queryResults:
+ if self.notifiable:
+ self.notifiable(qmf_object=result)
+ else:
+ self.queryResults.append(result)
+
+ complete = None
+ try:
+ self.cv.acquire()
+ if not self.waitingForSchema and self.pendingSignal:
+ self.blocked = None
+ self.cv.notify()
+ complete = True
+ finally:
+ self.cv.release()
+
+ if complete:
+ self._complete()
+
+
+ def reprocess(self):
+ """
+ New schema information has been added to the schema-cache. Clear our 'waiting' status
+ and see if we can make more progress on the raw query list.
+ """
+ try:
+ self.cv.acquire()
+ self.waitingForSchema = None
+ finally:
+ self.cv.release()
+ self.processV2Data()
+
+
+ def _getSchemaIdforV2ObjectLH(self, data):
+ """
+ Given a data map, extract the schema-identifier.
+ """
+ if data.__class__ != dict:
+ return None
+ if '_schema_id' in data:
+ return ClassKey(data['_schema_id'])
+ return None
+
+
+#===================================================================================================
+# Event
+#===================================================================================================
class Event:
""" """
- def __init__(self, session, broker, codec):
- self.session = session
- self.broker = broker
+ def __init__(self, agent, codec):
+ self.agent = agent
+ self.session = agent.session
+ self.broker = agent.broker
self.classKey = ClassKey(codec)
self.timestamp = codec.read_int64()
self.severity = codec.read_uint8()
- self.schema = None
- pname = self.classKey.getPackageName()
- pkey = self.classKey.getPackageKey()
- if pname in session.packages:
- if pkey in session.packages[pname]:
- self.schema = session.packages[pname][pkey]
- self.arguments = {}
- for arg in self.schema.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
+ self.arguments = {}
+ self.schema = self.session.schemaCache.getSchema(self.classKey)
+ if not self.schema:
+ return
+ for arg in self.schema.arguments:
+ self.arguments[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
def __repr__(self):
if self.schema == None:
@@ -1895,6 +2995,10 @@ class Event:
def getSchema(self):
return self.schema
+
+#===================================================================================================
+# SequenceManager
+#===================================================================================================
class SequenceManager:
""" Manage sequence numbers for asynchronous method calls """
def __init__(self):
@@ -1926,6 +3030,9 @@ class SequenceManager:
return data
+#===================================================================================================
+# DebugConsole
+#===================================================================================================
class DebugConsole(Console):
""" """
def brokerConnected(self, broker):
diff --git a/tools/src/py/qpid-cluster b/tools/src/py/qpid-cluster
index 6d64765184..7e608e4f2b 100755
--- a/tools/src/py/qpid-cluster
+++ b/tools/src/py/qpid-cluster
@@ -94,7 +94,7 @@ class BrokerManager:
self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
agents = self.qmf.getAgents()
for a in agents:
- if a.getAgentBank() == 0:
+ if a.getAgentBank() == '0':
self.brokerAgent = a
def Disconnect(self):
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
index 0db42bc6c7..24a20b0431 100755
--- a/tools/src/py/qpid-config
+++ b/tools/src/py/qpid-config
@@ -189,7 +189,7 @@ class BrokerManager:
self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
- if a.getAgentBank() == 0:
+ if a.getAgentBank() == '0':
self.brokerAgent = a
def Disconnect(self):
diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat
index 1541d6ec9f..e3bfe288dd 100755
--- a/tools/src/py/qpid-stat
+++ b/tools/src/py/qpid-stat
@@ -91,7 +91,7 @@ class Broker(object):
agents = qmf.getAgents()
for a in agents:
- if a.getAgentBank() == 0:
+ if a.getAgentBank() == '0':
self.brokerAgent = a
bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]