summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-03-26 19:01:46 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-03-26 19:01:46 +0000
commita43b0ae23cba1095ecad185980cf2ab8554f9117 (patch)
tree132a5c44622e7d26b9c5933067a9f6950b5d7620
parentb58c5c0d249f627dbf3eecc542b8cad4a489d7fa (diff)
downloadqpid-python-a43b0ae23cba1095ecad185980cf2ab8554f9117.tar.gz
dual qmf-mode broker agent
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@928017 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/management/ManagementEvent.h2
-rw-r--r--qpid/cpp/include/qpid/management/ManagementObject.h33
-rw-r--r--qpid/cpp/managementgen/qmfgen/management-types.xml18
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/schema.py27
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.cpp136
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.h8
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Event.cpp20
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Event.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp664
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h13
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp133
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py2
14 files changed, 767 insertions, 299 deletions
diff --git a/qpid/cpp/include/qpid/management/ManagementEvent.h b/qpid/cpp/include/qpid/management/ManagementEvent.h
index ce2f28b943..b809001b1b 100644
--- a/qpid/cpp/include/qpid/management/ManagementEvent.h
+++ b/qpid/cpp/include/qpid/management/ManagementEvent.h
@@ -45,7 +45,7 @@ class ManagementEvent : public ManagementItem {
virtual std::string& getPackageName() const = 0;
virtual uint8_t* getMd5Sum() const = 0;
virtual uint8_t getSeverity() const = 0;
- //virtual void encode(qpid::framing::Buffer&) const = 0;
+ virtual void encode(std::string&) const = 0;
virtual void mapEncode(qpid::messaging::VariantMap&) const = 0;
};
diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h
index 1bb3c57ed2..f580dda575 100644
--- a/qpid/cpp/include/qpid/management/ManagementObject.h
+++ b/qpid/cpp/include/qpid/management/ManagementObject.h
@@ -53,14 +53,15 @@ class ObjectId {
protected:
const AgentAttachment* agent;
uint64_t first;
+ uint64_t second;
uint64_t agentEpoch;
std::string v2Key;
std::string agentName;
void fromString(const std::string&);
public:
- QPID_COMMON_EXTERN ObjectId() : agent(0), first(0) {}
+ QPID_COMMON_EXTERN ObjectId() : agent(0), first(0), second(0) {}
QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) :
- agent(0), first(0), agentEpoch(0) { mapDecode(map.asMap()); }
+ agent(0), first(0), second(0), agentEpoch(0) { mapDecode(map.asMap()); }
QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker);
QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq);
QPID_COMMON_EXTERN ObjectId(std::istream&);
@@ -70,6 +71,10 @@ public:
QPID_COMMON_EXTERN void mapEncode(messaging::VariantMap& map) const;
QPID_COMMON_EXTERN void mapDecode(const messaging::VariantMap& map);
QPID_COMMON_EXTERN operator messaging::VariantMap() const;
+ QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; };
+ QPID_COMMON_EXTERN void encode(std::string& buffer) const;
+ QPID_COMMON_EXTERN void decode(const std::string& buffer);
+ QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const;
QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; }
QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object);
QPID_COMMON_EXTERN void setAgentName(const std::string& _name) { agentName = _name; }
@@ -140,11 +145,11 @@ protected:
bool forcePublish;
QPID_COMMON_EXTERN int getThreadIndex();
- //QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf) const;
+ QPID_COMMON_EXTERN void writeTimestamps(std::string& buf) const;
QPID_COMMON_EXTERN void writeTimestamps(messaging::VariantMap& map) const;
- //QPID_COMMON_EXTERN void readTimestamps(qpid::framing::Buffer& buf);
+ QPID_COMMON_EXTERN void readTimestamps(const std::string& buf);
QPID_COMMON_EXTERN void readTimestamps(const messaging::VariantMap& buf);
- //QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const;
+ QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const;
public:
QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16;
@@ -160,15 +165,6 @@ protected:
virtual ~ManagementObject() {}
virtual writeSchemaCall_t getWriteSchemaCall() = 0;
- //virtual mapEncodeSchemaCall_t getMapEncodeSchemaCall() = 0;
- //virtual void readProperties(qpid::framing::Buffer& buf) = 0;
- //virtual uint32_t writePropertiesBufSize() const = 0;
- //virtual void writeProperties(qpid::framing::Buffer& buf) const = 0;
- //virtual void writeStatistics(qpid::framing::Buffer& buf,
- // bool skipHeaders = false) = 0;
- //virtual void doMethod(std::string& methodName,
- // qpid::framing::Buffer& inBuf,
- // qpid::framing::Buffer& outBuf) = 0;
virtual std::string getKey() const = 0;
// Encode & Decode the property and statistics values
@@ -180,7 +176,14 @@ protected:
virtual void doMethod(std::string& methodName,
const messaging::VariantMap& inMap,
messaging::VariantMap& outMap) = 0;
-
+ virtual uint32_t writePropertiesSize() const = 0;
+ virtual void readProperties(const std::string& buf) = 0;
+ virtual void writeProperties(std::string& buf) const = 0;
+ virtual void writeStatistics(std::string& buf,
+ bool skipHeaders = false) = 0;
+ virtual void doMethod(std::string& methodName,
+ const std::string& inBuf,
+ std::string& outBuf) = 0;
QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId);
virtual std::string& getClassName() const = 0;
diff --git a/qpid/cpp/managementgen/qmfgen/management-types.xml b/qpid/cpp/managementgen/qmfgen/management-types.xml
index 95434a278b..f00958c22d 100644
--- a/qpid/cpp/managementgen/qmfgen/management-types.xml
+++ b/qpid/cpp/managementgen/qmfgen/management-types.xml
@@ -23,7 +23,10 @@
"map": cast to convert from native type to Variant constructor parameter
-->
-<type name="objId" base="REF" cpp="::qpid::management::ObjectId" encode="#.encode(@)" decode="#.decode(@)" stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/>
+<type name="objId" base="REF" cpp="::qpid::management::ObjectId"
+ encode="{std::string _s; #.encode(_s); @.putRawData(_s);}"
+ decode="{std::string _s; @.getRawData(_s, #.encodedSize()); #.decode(_s);}"
+stream="#.getV2Key()" size="16" accessor="direct" init="::qpid::management::ObjectId()" byRef="y"/>
<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" stream="#" size="1" accessor="direct" init="0"/>
<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" stream="#" size="2" accessor="direct" init="0"/>
<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" stream="#" size="4" accessor="direct" init="0"/>
@@ -39,9 +42,18 @@
<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" stream="#" size="8" accessor="direct" init="0"/>
<type name="float" base="FLOAT" cpp="float" encode="@.putFloat(#)" decode="# = @.getFloat()" stream="#" size="4" accessor="direct" init="0."/>
<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble(#)" decode="# = @.getDouble()" stream="#" size="8" accessor="direct" init="0."/>
-<type name="uuid" base="UUID" cpp="::qpid::messaging::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" />
-<type name="map" base="FTABLE" cpp="::qpid::messaging::VariantMap" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/"/>
+<type name="uuid" base="UUID" cpp="::qpid::messaging::Uuid"
+ encode="{::qpid::framing::Uuid _u(#.data()); _u.encode(@); }"
+ decode="{::qpid::framing::Uuid _u; _u.decode(@); # = ::qpid::messaging::Uuid(_u.data());}"
+ stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" />
+<type name="map" base="FTABLE" cpp="::qpid::messaging::VariantMap"
+ encode="{::qpid::framing::FieldTable _f = ManagementAgent::fromMap(#); _f.encode(@);}"
+ decode="{::qpid::framing::FieldTable _f; _f.decode(@); # = ManagementAgent::toMap(_f);}"
+ size="::qpid::framing::FieldTable(ManagementAgent::fromMap(#)).encodedSize()"
+stream="#" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/"/>
+<!-- not supported in V1
<type name="list" base="LIST" cpp="::qpid::messaging::Variant::List" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::Variant::List()" byRef="y" unmap="::qpid::messaging::Variant::List(); assert(false); /*TBD*/"/>
+-->
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" style="wm" stream="#" size="1" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" style="wm" stream="#" size="2" accessor="counter" init="0"/>
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index a827799b4b..30d7ba872d 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1228,12 +1228,12 @@ class SchemaClass:
inArgCount = inArgCount + 1
if methodCount == 0:
- stream.write ("string&, Buffer&, Buffer& outBuf")
+ stream.write ("string&, const string&, string& outStr")
else:
if inArgCount == 0:
- stream.write ("string& methodName, Buffer&, Buffer& outBuf")
+ stream.write ("string& methodName, const string&, string& outStr")
else:
- stream.write ("string& methodName, Buffer& inBuf, Buffer& outBuf")
+ stream.write ("string& methodName, const string& inStr, string& outStr")
def genDoMapMethodArgs (self, stream, variables):
@@ -1338,8 +1338,22 @@ class SchemaClass:
stream.write ("%d" % len (self.methods))
def genMethodHandlers (self, stream, variables):
+ inArgs = False
+ for method in self.methods:
+ for arg in method.args:
+ if arg.getDir () == "I" or arg.getDir () == "IO":
+ inArgs = True;
+ break
+
+ if inArgs:
+ stream.write("\n")
+ stream.write(" char *_tmpBuf = new char[inStr.length()];\n")
+ stream.write(" memcpy(_tmpBuf, inStr.data(), inStr.length());\n")
+ stream.write(" ::qpid::framing::Buffer inBuf(_tmpBuf, inStr.length());\n")
+
for method in self.methods:
stream.write ("\n if (methodName == \"" + method.getName () + "\") {\n")
+ stream.write (" _matched = true;\n")
if method.getArgCount () == 0:
stream.write (" ::qpid::management::ArgsNone ioArgs;\n")
else:
@@ -1360,8 +1374,11 @@ class SchemaClass:
stream.write (" " +\
arg.type.type.getWriteCode ("ioArgs." +\
arg.dir.lower () + "_" +\
- arg.name, "outBuf") + ";\n")
- stream.write (" return;\n }\n")
+ arg.name, "outBuf") + ";\n")
+ stream.write(" }\n")
+
+ if inArgs:
+ stream.write ("\n delete [] _tmpBuf;\n")
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
index 84a6e0b450..0302e5eba2 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
@@ -93,9 +93,9 @@ void /*MGEN:Class.NameCap*/::registerSelf(ManagementAgent* agent)
void /*MGEN:Class.NameCap*/::writeSchema (std::string& schema)
{
-#define BUFSIZE 65536
- char _msgChars[BUFSIZE];
- ::qpid::framing::Buffer buf(_msgChars, BUFSIZE);
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
::qpid::framing::FieldTable ft;
// Schema class header:
@@ -134,6 +134,136 @@ void /*MGEN:Class.NameCap*/::aggregatePerThreadStats(struct PerThreadStats* tota
/*MGEN:ENDIF*/
+uint32_t /*MGEN:Class.NameCap*/::writePropertiesSize() const
+{
+ uint32_t size = writeTimestampsSize();
+/*MGEN:IF(Class.ExistOptionals)*/
+ size += /*MGEN:Class.PresenceMaskBytes*/;
+/*MGEN:ENDIF*/
+/*MGEN:Class.SizeProperties*/
+ return size;
+}
+
+void /*MGEN:Class.NameCap*/::readProperties (const std::string& _sBuf)
+{
+ char *_tmpBuf = new char[_sBuf.length()];
+ memcpy(_tmpBuf, _sBuf.data(), _sBuf.length());
+ ::qpid::framing::Buffer buf(_tmpBuf, _sBuf.length());
+ ::qpid::sys::Mutex::ScopedLock mutex(accessLock);
+
+ {
+ std::string _tbuf;
+ buf.getRawData(_tbuf, writeTimestampsSize());
+ readTimestamps(_tbuf);
+ }
+
+/*MGEN:IF(Class.ExistOptionals)*/
+ for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
+ presenceMask[idx] = buf.getOctet();
+/*MGEN:ENDIF*/
+/*MGEN:Class.ReadProperties*/
+
+ delete [] _tmpBuf;
+}
+
+void /*MGEN:Class.NameCap*/::writeProperties (std::string& _sBuf) const
+{
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+
+ ::qpid::sys::Mutex::ScopedLock mutex(accessLock);
+ configChanged = false;
+
+ {
+ std::string _tbuf;
+ writeTimestamps(_tbuf);
+ buf.putRawData(_tbuf);
+ }
+
+
+/*MGEN:IF(Class.ExistOptionals)*/
+ for (uint8_t idx = 0; idx < /*MGEN:Class.PresenceMaskBytes*/; idx++)
+ buf.putOctet(presenceMask[idx]);
+/*MGEN:ENDIF*/
+/*MGEN:Class.WriteProperties*/
+
+ uint32_t _bufLen = buf.getPosition();
+ buf.reset();
+
+ buf.getRawData(_sBuf, _bufLen);
+}
+
+void /*MGEN:Class.NameCap*/::writeStatistics (std::string& _sBuf, bool skipHeaders)
+{
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+
+ ::qpid::sys::Mutex::ScopedLock mutex(accessLock);
+ instChanged = false;
+/*MGEN:IF(Class.ExistPerThreadAssign)*/
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+/*MGEN:Class.PerThreadAssign*/
+ }
+ }
+/*MGEN:ENDIF*/
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+ struct PerThreadStats totals;
+ aggregatePerThreadStats(&totals);
+/*MGEN:ENDIF*/
+/*MGEN:Class.Assign*/
+ if (!skipHeaders) {
+ std::string _tbuf;
+ writeTimestamps (_tbuf);
+ buf.putRawData(_tbuf);
+ }
+
+/*MGEN:Class.WriteStatistics*/
+
+ // Maintenance of hi-lo statistics
+/*MGEN:Class.HiLoStatResets*/
+/*MGEN:IF(Class.ExistPerThreadResets)*/
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+/*MGEN:Class.PerThreadHiLoStatResets*/
+ }
+ }
+/*MGEN:ENDIF*/
+
+ uint32_t _bufLen = buf.getPosition();
+ buf.reset();
+
+ buf.getRawData(_sBuf, _bufLen);
+}
+
+void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+ std::string text;
+
+ bool _matched = false;
+
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer outBuf(_msgChars, _bufSize);
+
+/*MGEN:Class.MethodHandlers*/
+
+ if (!_matched) {
+ outBuf.putLong(status);
+ outBuf.putShortString(Manageable::StatusText(status, text));
+ }
+
+ uint32_t _bufLen = outBuf.getPosition();
+ outBuf.reset();
+
+ outBuf.getRawData(outStr, _bufLen);
+}
+
std::string /*MGEN:Class.NameCap*/::getKey() const
{
std::stringstream key;
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h
index 2f057dc9d2..026b1d79eb 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h
@@ -83,6 +83,14 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
const ::qpid::messaging::VariantMap& inMap,
::qpid::messaging::VariantMap& outMap);
std::string getKey() const;
+ uint32_t writePropertiesSize() const;
+ void readProperties(const std::string& buf);
+ void writeProperties(std::string& buf) const;
+ void writeStatistics(std::string& buf, bool skipHeaders = false);
+ void doMethod(std::string& methodName,
+ const std::string& inBuf,
+ std::string& outBuf);
+
writeSchemaCall_t getWriteSchemaCall() { return writeSchema; }
/*MGEN:IF(Class.NoStatistics)*/
// Stub for getInstChanged. There are no statistics in this class.
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
index 0289d678ef..dea02fd545 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
+++ b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
@@ -58,9 +58,9 @@ void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent)
void Event/*MGEN:Event.NameCap*/::writeSchema (std::string& schema)
{
-#define BUFSIZE 65536
- char _msgChars[BUFSIZE];
- ::qpid::framing::Buffer buf(_msgChars, BUFSIZE);
+ const int _bufSize = 65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
::qpid::framing::FieldTable ft;
// Schema class header:
@@ -80,6 +80,20 @@ void Event/*MGEN:Event.NameCap*/::writeSchema (std::string& schema)
}
}
+void Event/*MGEN:Event.NameCap*/::encode(std::string& _sBuf) const
+{
+ const int _bufSize=65536;
+ char _msgChars[_bufSize];
+ ::qpid::framing::Buffer buf(_msgChars, _bufSize);
+
+/*MGEN:Event.ArgEncodes*/
+
+ uint32_t _bufLen = buf.getPosition();
+ buf.reset();
+
+ buf.getRawData(_sBuf, _bufLen);
+}
+
void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::messaging::VariantMap& map) const
{
using namespace ::qpid::messaging;
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.h b/qpid/cpp/managementgen/qmfgen/templates/Event.h
index 93def35ece..ab783c1698 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Event.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Event.h
@@ -51,6 +51,7 @@ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
std::string& getEventName() const { return eventName; }
uint8_t* getMd5Sum() const { return md5Sum; }
uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
+ void encode(std::string& buffer) const;
void mapEncode(::qpid::messaging::VariantMap& map) const;
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index d94f228734..c5a7bb1ea9 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -93,7 +93,8 @@ Broker::Options::Options(const std::string& name) :
tcpNoDelay(false),
requireEncrypted(false),
maxSessionRate(0),
- asyncQueueEvents(false) // Must be false in a cluster.
+ asyncQueueEvents(false), // Must be false in a cluster.
+ qmf2Support(false)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -114,6 +115,7 @@ Broker::Options::Options(const std::string& name) :
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -138,7 +140,9 @@ const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
- managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
+ managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+ conf.qmf2Support)
+ : 0),
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 465a17f4eb..f9be992f0c 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -113,6 +113,7 @@ public:
std::string knownHosts;
uint32_t maxSessionRate;
bool asyncQueueEvents;
+ bool qmf2Support;
private:
std::string getHome();
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 7e8dd7e764..127c43a701 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -47,6 +47,7 @@ using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace qpid;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -76,10 +77,11 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
}
}
-ManagementAgent::ManagementAgent () :
+ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
threadPoolSize(1), interval(10), broker(0), timer(0),
startTime(uint64_t(Duration(now()))),
- suppressed(false), agentName("")
+ suppressed(false), agentName(""),
+ qmf1Support(qmfV1), qmf2Support(qmfV2)
{
nextObjectId = 1;
brokerBank = 1;
@@ -268,32 +270,56 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
Mutex::ScopedLock lock (userLock);
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
- ::qpid::messaging::Message msg;
- ::qpid::messaging::MapContent content(msg);
- ::qpid::messaging::VariantMap &map_ = content.asMap();
- ::qpid::messaging::VariantMap schemaId;
- ::qpid::messaging::VariantMap values;
- ::qpid::messaging::VariantMap headers;
-
- map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
- event.getEventName(),
- "_event",
- event.getMd5Sum());
- event.mapEncode(values);
- map_["_values"] = values;
- map_["_timestamp"] = uint64_t(Duration(now()));
- map_["_severity"] = sev;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_event";
- headers["qmf.agent"] = std::string(agentName);
+ if (qmf1Support) {
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
- content.encode();
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ outBuffer.putOctet(sev);
+ std::string sBuf;
+ event.encode(sBuf);
+ outBuffer.putRawData(sBuf);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, mExchange,
+ "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+ QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+ }
+
+ if (qmf2Support) {
+ ::qpid::messaging::Message msg;
+ ::qpid::messaging::MapContent content(msg);
+ ::qpid::messaging::VariantMap &map_ = content.asMap();
+ ::qpid::messaging::VariantMap schemaId;
+ ::qpid::messaging::VariantMap values;
+ ::qpid::messaging::VariantMap headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ "_event",
+ event.getMd5Sum());
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = std::string(agentName);
+
+ stringstream key;
+ key << "agent.ind.event." << sev << "." << std::string(agentName) << "." << event.getEventName();
+
+ content.encode();
+ sendBuffer(msg.getContent(), "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+ }
- sendBuffer(msg.getContent(), "", headers, mExchange,
- "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
- QPID_LOG(trace, "SEND raiseEvent class=" << event.getPackageName() << "." << event.getEventName());
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -488,10 +514,15 @@ void ManagementAgent::moveNewObjectsLH()
void ManagementAgent::periodicProcessing (void)
{
+#define BUFSIZE 65536
+#define HEADROOM 4096
QPID_LOG(trace, "Management agent periodic processing");
Mutex::ScopedLock lock (userLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
+ std::string sBuf;
uint64_t uptime = uint64_t(Duration(now())) - startTime;
static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
@@ -533,6 +564,7 @@ void ManagementAgent::periodicProcessing (void)
!baseObject->isDeleted()))
continue;
+ Buffer msgBuffer(msgChars, BUFSIZE);
::qpid::messaging::Message m;
::qpid::messaging::ListContent content(m);
::qpid::messaging::Variant::List &list_ = content.asList();
@@ -550,7 +582,21 @@ void ManagementAgent::periodicProcessing (void)
send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
- if (send_stats || send_props) {
+ if (send_props && qmf1Support) {
+ encodeHeader(msgBuffer, 'c');
+ sBuf.clear();
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ }
+
+ if (send_stats && qmf1Support) {
+ encodeHeader(msgBuffer, 'i');
+ sBuf.clear();
+ object->writeStatistics(sBuf);
+ msgBuffer.putRawData(sBuf);
+ }
+
+ if ((send_stats || send_props) && qmf2Support) {
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
@@ -562,30 +608,49 @@ void ManagementAgent::periodicProcessing (void)
map_["_values"] = values;
list_.push_back(map_);
- if (send_props) pcount++;
- if (send_stats) scount++;
}
+ if (send_props) pcount++;
+ if (send_stats) scount++;
+
if (object->isDeleted())
deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
+
+ if (qmf1Support && (msgBuffer.available() < HEADROOM))
+ break;
}
}
- content.encode();
- const std::string &body = m.getContent();
- if (body.length()) {
- stringstream key;
- ::qpid::messaging::Variant::Map headers;
- key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
-
- sendBuffer(body, "", headers, mExchange, key.str());
- QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ if (pcount || scount) {
+ if (qmf1Support) {
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ }
+ }
+
+ if (qmf2Support) {
+ content.encode();
+ const std::string &body = m.getContent();
+ if (body.length()) {
+ stringstream key;
+ ::qpid::messaging::Variant::Map headers;
+ key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ sendBuffer(body, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ }
+ }
}
}
@@ -603,32 +668,48 @@ void ManagementAgent::periodicProcessing (void)
cdIter != deletedManagementObjects.end(); cdIter++) {
collisionDeletions = true;
{
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map headers;
-
- map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
- (*cdIter)->getClassName(),
- "_data",
- (*cdIter)->getMd5Sum());
- (*cdIter)->mapEncodeValues(values, true, false);
- map_["_values"] = values;
- list_.push_back(map_);
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
-
- content.encode();
-
- stringstream key;
- key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
- sendBuffer(m.getContent(), "", headers, mExchange, key.str());
- QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ if (qmf1Support) {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'c');
+ sBuf.clear();
+ (*cdIter)->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ stringstream key;
+ key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+ sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
+
+ if (qmf2Support) {
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
+ (*cdIter)->getClassName(),
+ "_data",
+ (*cdIter)->getMd5Sum());
+ (*cdIter)->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
+
+ stringstream key;
+ key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+
+ content.encode();
+ sendBuffer(m.getContent(), "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
}
}
@@ -663,33 +744,52 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
if (!object->isDeleted())
return;
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
-
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- "_data",
- object->getMd5Sum());
- object->mapEncodeValues(values, true, false);
- map_["_values"] = values;
- list_.push_back(map_);
-
- stringstream key;
- key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+ if (qmf1Support) {
+#define DNOW_BUFSIZE 2048
+ char msgChars[DNOW_BUFSIZE];
+ uint32_t contentSize;
+ Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
+ std::string sBuf;
+
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ contentSize = msgBuffer.getPosition();
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ }
+
+ if (qmf2Support) {
+ ::qpid::messaging::Message m;
+ ::qpid::messaging::ListContent content(m);
+ ::qpid::messaging::Variant::List &list_ = content.asList();
+ ::qpid::messaging::Variant::Map map_;
+ ::qpid::messaging::Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ object->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
- content.encode();
+ stringstream key;
+ key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName();
- ::qpid::messaging::Variant::Map headers;
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
+ ::qpid::messaging::Variant::Map headers;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = std::string(agentName);
- sendBuffer(m.getContent(), "", headers, mExchange, key.str());
- QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ content.encode();
+ sendBuffer(m.getContent(), "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ }
managementObjects.erase(oid);
}
@@ -749,14 +849,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
uint32_t sequence, const ConnectionToken* connToken)
{
-#if 1 // deprecated
- QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
- // prevent "unused param" warnings
- (void)inBuffer;
- (void)replyToKey;
- (void)sequence;
- (void)connToken;
-#else
string methodName;
string packageName;
string className;
@@ -764,14 +856,19 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
AclModule* acl = broker->getAcl();
+ std::string inArgs;
- ObjectId objId(inBuffer);
+ std::string sBuf;
+ inBuffer.getRawData(sBuf, 16);
+ ObjectId objId;
+ objId.decode(sBuf);
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
+ inBuffer.getRawData(inArgs, inBuffer.available());
- QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
methodName << " replyTo=" << replyToKey);
encodeHeader(outBuffer, 'm', sequence);
@@ -783,8 +880,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
- return;
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ return;
}
if (acl != 0) {
@@ -799,12 +896,12 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- return;
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ return;
}
}
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ ManagementObjectMap::iterator iter = numericFind(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
@@ -818,7 +915,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
try {
outBuffer.record();
Mutex::ScopedUnlock u(userLock);
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ std::string outBuf;
+ iter->second->doMethod(methodName, inArgs, outBuf);
+ outBuffer.putRawData(outBuf);
} catch(exception& e) {
outBuffer.restore();
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -829,8 +928,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
-#endif
+ QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
@@ -857,7 +955,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
outMap.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid);
return;
}
@@ -879,7 +977,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
(outMap["_values"].asMap())["_status_text"] = e.what();
outMap.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid);
return;
}
@@ -891,7 +989,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
outMap.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid);
return;
}
@@ -906,7 +1004,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
(outMap["_values"].asMap())["_status_text"] = i->second;
outMap.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid);
return;
}
@@ -922,7 +1020,7 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
outMap.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid);
return;
}
@@ -930,6 +1028,10 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
// invoke the method
+ QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
+ << ":" << iter->second->getClassName() << " method=" <<
+ methodName << " replyTo=" << replyTo);
+
try {
iter->second->doMethod(methodName, inArgs, outMap.asMap());
} catch(exception& e) {
@@ -938,14 +1040,14 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep
(outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
(outMap["_values"].asMap())["_status_text"] = e.what();
outMap.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid);
return;
}
outMap.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND MethodResponse to=" << replyTo << " seq=" << cid);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid);
}
@@ -1286,7 +1388,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
ft.decode(inBuffer);
- QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence);
+ QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
value = ft.get("_class");
if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -1295,34 +1397,27 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
return;
ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = managementObjects.find(selector);
+ ManagementObjectMap::iterator iter = numericFind(selector);
if (iter != managementObjects.end()) {
ManagementObject* object = iter->second;
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map headers;
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
if (!object->isDeleted()) {
- object->mapEncodeValues(values, true, true); // write both stats and properties
- map_["_values"] = values;
- list_.push_back(map_);
-
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
-
- content.encode();
-
- sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence),
- headers, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ std::string sBuf;
+ encodeHeader(outBuffer, 'g', sequence);
+ object->writeProperties(sBuf);
+ outBuffer.putRawData(sBuf);
+ sBuf.clear();
+ object->writeStatistics(sBuf, true);
+ outBuffer.putRawData(sBuf);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
sendCommandComplete(replyToKey, sequence);
@@ -1336,31 +1431,24 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
iter++) {
ManagementObject* object = iter->second;
if (object->getClassName () == className) {
- ::qpid::messaging::Message m;
- ::qpid::messaging::ListContent content(m);
- ::qpid::messaging::Variant::List &list_ = content.asList();
- ::qpid::messaging::Variant::Map map_;
- ::qpid::messaging::Variant::Map values;
- ::qpid::messaging::Variant::Map headers;
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
if (!object->isDeleted()) {
- object->mapEncodeValues(values, true, true); // write both stats and properties
- map_["_values"] = values;
- list_.push_back(map_);
-
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = std::string(agentName);
-
- content.encode();
-
- sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence),
- headers, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ std::string sBuf;
+ encodeHeader(outBuffer, 'g', sequence);
+ object->writeProperties(sBuf);
+ outBuffer.putRawData(sBuf);
+ sBuf.clear();
+ object->writeStatistics(sBuf, true);
+ outBuffer.putRawData(sBuf);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
}
@@ -1386,7 +1474,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep
qpid::messaging::MapView::const_iterator i;
::qpid::messaging::Variant::Map headers;
- QPID_LOG(trace, "RECV GetQuery: map=" << inMap << " seq=" << cid);
+ QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
@@ -1435,7 +1523,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep
list_.push_back(map_);
content.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
}
}
}
@@ -1458,7 +1546,7 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep
list_.push_back(map_);
content.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
}
}
}
@@ -1468,8 +1556,8 @@ void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& rep
list_.clear();
headers.erase("partial");
content.encode();
- sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo);
- QPID_LOG(trace, "SEND GetResponse to=" << replyTo << " seq=" << cid);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
}
bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
@@ -1481,6 +1569,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
string packageName;
string className;
string methodName;
+ std::string cid;
if (msg.encodedSize() > MA_BUFFER_SIZE)
return false;
@@ -1499,16 +1588,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
mapMsg = true;
if (p && p->hasCorrelationId()) {
- std::string cid = p->getCorrelationId();
- if (!cid.empty()) {
- try {
- sequence = boost::lexical_cast<uint32_t>(cid);
- } catch(const boost::bad_lexical_cast&) {
- QPID_LOG(warning,
- "Bad correlation Id for received QMF authorize req: [" << cid << "]");
- return false;
- }
- }
+ cid = p->getCorrelationId();
}
if (headers->getAsString("qmf.opcode") == "_method_request")
@@ -1613,8 +1693,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
outMap.encode();
- sendBuffer(outMsg.getContent(), boost::lexical_cast<std::string>(sequence),
- headers, dExchange, replyToKey);
+ sendBuffer(outMsg.getContent(), cid, headers, v2Direct, replyToKey);
} else {
@@ -1690,7 +1769,6 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg)
// old preV2 binary messages
-
while (inBuffer.getPosition() < bufferLen) {
uint32_t sequence;
if (!checkHeader(inBuffer, &opcode, &sequence))
@@ -1877,6 +1955,17 @@ size_t ManagementAgent::validateEventSchema(Buffer& inBuffer)
return end - start;
}
+ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid)
+{
+ ManagementObjectMap::iterator iter = managementObjects.begin();
+ for (; iter != managementObjects.end(); iter++) {
+ if (oid.equalV1(iter->first))
+ break;
+ }
+
+ return iter;
+}
+
void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
Mutex::ScopedLock lock (userLock);
@@ -2125,41 +2214,192 @@ qpid::messaging::Variant::Map ManagementAgent::toMap(const FieldTable& from)
const string& key(iter->first);
const FieldTable::ValuePtr& val(iter->second);
- if (typeid(val.get()) == typeid(Str8Value) || typeid(val.get()) == typeid(Str16Value)) {
- map[key] = Variant(val->get<string>());
- } else if (typeid(val.get()) == typeid(FloatValue)) {
- map[key] = Variant(val->get<float>());
- } else if (typeid(val.get()) == typeid(DoubleValue)) {
- map[key] = Variant(val->get<double>());
- } else if (typeid(val.get()) == typeid(IntegerValue)) {
- map[key] = Variant(val->get<int>());
- } else if (typeid(val.get()) == typeid(TimeValue)) {
- map[key] = Variant(val->get<int64_t>());
- } else if (typeid(val.get()) == typeid(Integer64Value)) {
- map[key] = Variant(val->get<int64_t>());
- } else if (typeid(val.get()) == typeid(Unsigned64Value)) {
- map[key] = Variant(val->get<uint64_t>());
- } else if (typeid(val.get()) == typeid(FieldTableValue)) {
- map[key] = Variant(toMap(val->get<FieldTable>()));
- } else if (typeid(val.get()) == typeid(VoidValue)) {
- map[key] = Variant();
- } else if (typeid(val.get()) == typeid(BoolValue)) {
- map[key] = Variant(val->get<bool>());
- } else if (typeid(val.get()) == typeid(Unsigned8Value)) {
- map[key] = Variant(val->get<uint8_t>());
- } else if (typeid(val.get()) == typeid(Unsigned16Value)) {
- map[key] = Variant(val->get<uint16_t>());
- } else if (typeid(val.get()) == typeid(Unsigned32Value)) {
- map[key] = Variant(val->get<uint32_t>());
- } else if (typeid(val.get()) == typeid(Integer8Value)) {
- map[key] = Variant(val->get<int8_t>());
- } else if (typeid(val.get()) == typeid(Integer16Value)) {
- map[key] = Variant(val->get<int16_t>());
- } else if (typeid(val.get()) == typeid(UuidValue)) {
- map[key] = Variant(messaging::Uuid(val->get<framing::Uuid>().c_array()));
- }
+ map[key] = toVariant(val);
}
return map;
}
+// qpid::messaging::Variant::List ManagementAgent::toList(const qpid::framing::Array& from)
+// {
+// qpid::messaging::Variant::List _list;
+
+// for (qpid::framing::Array::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+// const qpid::framing::Array::ValuePtr& val(*iter);
+
+// _list.push_back(toVariant(val));
+// }
+
+// return _list;
+// }
+
+qpid::framing::FieldTable ManagementAgent::fromMap(const qpid::messaging::Variant::Map& from)
+{
+ qpid::framing::FieldTable ft;
+
+ for (qpid::messaging::Variant::Map::const_iterator iter = from.begin();
+ iter != from.end();
+ iter++) {
+ const string& key(iter->first);
+ const qpid::messaging::Variant& val(iter->second);
+
+ ft.set(key, toFieldValue(val));
+ }
+
+ return ft;
+}
+
+
+// qpid::framing::Array ManagementAgent::fromList(const qpid::messaging::Variant::List& from)
+// {
+// qpid::framing::Array fa;
+
+// for (qpid::messaging::Variant::List::const_iterator iter = from.begin();
+// iter != from.end();
+// iter++) {
+// const qpid::messaging::Variant& val(*iter);
+
+// fa.push_back(toFieldValue(val));
+// }
+
+// return fa;
+// }
+
+
+boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
+{
+
+ switch(in.getType()) {
+
+ case messaging::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue());
+ case messaging::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool()));
+ case messaging::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8()));
+ case messaging::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16()));
+ case messaging::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32()));
+ case messaging::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64()));
+ case messaging::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8()));
+ case messaging::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16()));
+ case messaging::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32()));
+ case messaging::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64()));
+ case messaging::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat()));
+ case messaging::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble()));
+ case messaging::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString()));
+ case messaging::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data()));
+ case messaging::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
+ default:
+ break;
+ //case messaging::VAR_LIST: return boost::shared_ptr<FieldValue>(new ArrayValue(ManagementAgent::fromList(in.asList())));
+ }
+
+ QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
+ return boost::shared_ptr<FieldValue>(new VoidValue());
+}
+
+// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
+qpid::messaging::Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
+{
+ const std::string iso885915("iso-8859-15");
+ const std::string utf8("utf8");
+ const std::string utf16("utf16");
+ //const std::string binary("binary");
+ const std::string amqp0_10_binary("amqp0-10:binary");
+ //const std::string amqp0_10_bit("amqp0-10:bit");
+ const std::string amqp0_10_datetime("amqp0-10:datetime");
+ const std::string amqp0_10_struct("amqp0-10:struct");
+ Variant out;
+
+ //based on AMQP 0-10 typecode, pick most appropriate variant type
+ switch (in->getType()) {
+ //Fixed Width types:
+ case 0x00: //bin8
+ case 0x01: out.setEncoding(amqp0_10_binary); // int8
+ case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8
+ case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; //
+ // case 0x04: break; //TODO: iso-8859-15 char // char
+ case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8
+
+ case 0x10: out.setEncoding(amqp0_10_binary); // bin16
+ case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16
+ case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16
+
+ case 0x20: out.setEncoding(amqp0_10_binary); // bin32
+ case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32
+ case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
+
+ case 0x23: out = in->get<float>(); break; // float(32)
+
+ // case 0x27: break; //TODO: utf-32 char
+
+ case 0x30: out.setEncoding(amqp0_10_binary); // bin64
+ case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64
+
+ case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
+ case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64
+ case 0x33: out = in->get<double>(); break; // double
+
+ case 0x48: // uuid
+ {
+ unsigned char data[16];
+ in->getFixedWidthValue<16>(data);
+ out = qpid::messaging::Uuid(data);
+ } break;
+
+ //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+ case 0xf0: break;//void, which is the default value for Variant
+ // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+
+ //Variable Width types:
+ //strings:
+ case 0x80: // str8
+ case 0x90: // str16
+ case 0xa0: // str32
+ out = in->get<std::string>();
+ out.setEncoding(amqp0_10_binary);
+ break;
+
+ case 0x84: // str8
+ case 0x94: // str16
+ out = in->get<std::string>();
+ out.setEncoding(iso885915);
+ break;
+
+ case 0x85: // str8
+ case 0x95: // str16
+ out = in->get<std::string>();
+ out.setEncoding(utf8);
+ break;
+
+ case 0x86: // str8
+ case 0x96: // str16
+ out = in->get<std::string>();
+ out.setEncoding(utf16);
+ break;
+
+ case 0xab: // str32
+ out = in->get<std::string>();
+ out.setEncoding(amqp0_10_struct);
+ break;
+
+ case 0xa8: // map
+ out = ManagementAgent::toMap(in->get<FieldTable>());
+ break;
+
+ //case 0xa9: // list of variant types
+ // out = Variant::List();
+ // translate<List>(in, out.asList(), &toVariant);
+ // break;
+ //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
+ // out = Variant::List();
+ // translate<Array>(in, out.asList(), &toVariant);
+ // break;
+
+ default:
+ //error?
+ QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
+ break;
+ }
+
+ return out;
+}
+
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 675bcb7774..44d3961d52 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -34,6 +34,7 @@
#include "qmf/org/apache/qpid/broker/Agent.h"
#include "qpid/messaging/Variant.h"
#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/FieldValue.h>
#include <memory>
#include <string>
#include <map>
@@ -63,7 +64,7 @@ public:
} severity_t;
- ManagementAgent ();
+ ManagementAgent (const bool qmfV1, const bool qmfV2);
virtual ~ManagementAgent ();
/** Called before plugins are initialized */
@@ -132,7 +133,14 @@ public:
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; }
+ // TODO: remove these when Variant API moved into common library.
static messaging::Variant::Map toMap(const framing::FieldTable& from);
+ static framing::FieldTable fromMap(const messaging::Variant::Map& from);
+ //static messaging::Variant::List toList(const framing::Array& from);
+ //static framing::Array fromList(const messaging::Variant::List& from);
+ static boost::shared_ptr<framing::FieldValue> toFieldValue(const messaging::Variant& in);
+ static messaging::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
+
private:
struct Periodic : public qpid::sys::TimerTask
@@ -270,6 +278,8 @@ private:
typedef std::map<MethodName, std::string> DisallowedMethods;
DisallowedMethods disallowed;
std::string agentName; // KAG TODO FIX
+ bool qmf1Support;
+ bool qmf2Support;
# define MA_BUFFER_SIZE 65536
@@ -329,6 +339,7 @@ private:
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
size_t validateEventSchema(framing::Buffer&);
+ ManagementObjectMap::iterator numericFind(const ObjectId& oid);
std::string debugSnapshot();
};
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 6184ed0697..b37c8df731 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -21,7 +21,8 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementObject.h"
-//#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/Buffer.h"
#include "qpid/sys/Thread.h"
#include <stdlib.h>
@@ -131,28 +132,40 @@ bool ObjectId::operator<(const ObjectId &other) const
return v2Key < other.v2Key;
}
-#if 0
bool ObjectId::equalV1(const ObjectId &other) const
{
uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
return first == otherFirst && second == other.second;
}
-#endif
-// void ObjectId::encode(framing::Buffer& buffer) const
-// {
-// if (agent == 0)
-// buffer.putLongLong(first);
-// else
-// buffer.putLongLong(first | agent->first);
-// buffer.putLongLong(second);
-// }
-
-// void ObjectId::decode(framing::Buffer& buffer)
-// {
-// first = buffer.getLongLong();
-// second = buffer.getLongLong();
-// }
+void ObjectId::encode(std::string& buffer) const
+{
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
+ if (agent == 0)
+ body.putLongLong(first);
+ else
+ body.putLongLong(first | agent->first);
+ body.putLongLong(second);
+
+ body.reset();
+ body.getRawData(buffer, len);
+}
+
+void ObjectId::decode(const std::string& buffer)
+{
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
+ body.checkAvailable(buffer.length());
+ body.putRawData(buffer);
+ body.reset();
+ first = body.getLongLong();
+ second = body.getLongLong();
+}
void ObjectId::setV2Key(const ManagementObject& object)
{
@@ -223,42 +236,56 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i)
int ManagementObject::maxThreads = 1;
int ManagementObject::nextThreadIndex = 0;
-// void ManagementObject::writeTimestamps (framing::Buffer& buf) const
-// {
-// buf.putShortString (getPackageName ());
-// buf.putShortString (getClassName ());
-// buf.putBin128 (getMd5Sum ());
-// buf.putLongLong (updateTime);
-// buf.putLongLong (createTime);
-// buf.putLongLong (destroyTime);
-// objectId.encode(buf);
-// }
-
-// void ManagementObject::readTimestamps (framing::Buffer& buf)
-// {
-// std::string unused;
-// uint8_t unusedUuid[16];
-// ObjectId unusedObjectId;
-
-// buf.getShortString(unused);
-// buf.getShortString(unused);
-// buf.getBin128(unusedUuid);
-// updateTime = buf.getLongLong();
-// createTime = buf.getLongLong();
-// destroyTime = buf.getLongLong();
-// unusedObjectId.decode(buf);
-// }
-
-// uint32_t ManagementObject::writeTimestampsSize() const
-// {
-// return 1 + getPackageName().length() + // str8
-// 1 + getClassName().length() + // str8
-// 16 + // bin128
-// 8 + // uint64
-// 8 + // uint64
-// 8 + // uint64
-// objectId.encodedSize(); // objectId
-// }
+void ManagementObject::writeTimestamps (std::string& buf) const
+{
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
+
+ body.putShortString (getPackageName ());
+ body.putShortString (getClassName ());
+ body.putBin128 (getMd5Sum ());
+ body.putLongLong (updateTime);
+ body.putLongLong (createTime);
+ body.putLongLong (destroyTime);
+
+ uint32_t len = body.getPosition();
+ body.reset();
+ body.getRawData(buf, len);
+
+ std::string oid;
+ objectId.encode(oid);
+ buf += oid;
+}
+
+void ManagementObject::readTimestamps (const std::string& buf)
+{
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
+ std::string unused;
+ uint8_t unusedUuid[16];
+
+ body.checkAvailable(buf.length());
+ body.putRawData(buf);
+ body.reset();
+
+ body.getShortString(unused);
+ body.getShortString(unused);
+ body.getBin128(unusedUuid);
+ updateTime = body.getLongLong();
+ createTime = body.getLongLong();
+ destroyTime = body.getLongLong();
+}
+
+uint32_t ManagementObject::writeTimestampsSize() const
+{
+ return 1 + getPackageName().length() + // str8
+ 1 + getClassName().length() + // str8
+ 16 + // bin128
+ 8 + // uint64
+ 8 + // uint64
+ 8 + // uint64
+ objectId.encodedSize(); // objectId
+}
void ManagementObject::writeTimestamps (messaging::VariantMap& map) const
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index d3cbf76dc7..8d12577deb 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -341,7 +341,7 @@ class Object(object):
ttl = timeWait * 1000
else:
ttl = None
- smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
(self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
ttl=ttl)
if synchronous: