summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNuno Santos <nsantos@apache.org>2008-03-25 20:30:01 +0000
committerNuno Santos <nsantos@apache.org>2008-03-25 20:30:01 +0000
commitaefbf926e26e61460a5a11533361a8da9c11bb9c (patch)
tree3971bd2ad4ae9108b0b20239bf08020e9753b4ce
parent5444def1a124b7ef609ad2a585d333a4654c736a (diff)
downloadqpid-python-aefbf926e26e61460a5a11533361a8da9c11bb9c.tar.gz
QPID-877: applied patch from Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@640970 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/managementgen/templates/Class.cpp5
-rw-r--r--cpp/managementgen/templates/Class.h6
-rw-r--r--cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp182
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h30
-rw-r--r--cpp/src/qpid/management/ManagementObject.h4
-rw-r--r--python/mgmt-cli/managementdata.py8
-rw-r--r--python/qpid/management.py137
-rw-r--r--specs/management-schema.xml9
-rw-r--r--specs/management-types.xml25
10 files changed, 294 insertions, 115 deletions
diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp
index 2a3f71e262..3c3dfff5a2 100644
--- a/cpp/managementgen/templates/Class.cpp
+++ b/cpp/managementgen/templates/Class.cpp
@@ -106,11 +106,12 @@ void /*MGEN:Class.NameCap*/::writeConfig (Buffer& buf)
/*MGEN:Class.WriteConfig*/
}
-void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf)
+void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf, bool skipHeaders)
{
instChanged = false;
- writeTimestamps (buf);
+ if (!skipHeaders)
+ writeTimestamps (buf);
/*MGEN:Class.WriteInst*/
// Maintenance of hi-lo statistics
diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h
index 82fac00d47..047d7cc950 100644
--- a/cpp/managementgen/templates/Class.h
+++ b/cpp/managementgen/templates/Class.h
@@ -25,8 +25,9 @@
#include "qpid/sys/Mutex.h"
#include "qpid/management/ManagementObject.h"
+#include "qpid/framing/Uuid.h"
-namespace qpid {
+namespace qpid {
namespace management {
class /*MGEN:Class.NameCap*/ : public ManagementObject
@@ -45,7 +46,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
void writeConfig (qpid::framing::Buffer& buf);
- void writeInstrumentation (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& buf,
+ bool skipHeaders = false);
void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf);
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index bbcdb9cbce..a183ce9d02 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -127,7 +127,8 @@ Broker::Broker(const Broker::Options& conf) :
dtxManager.setStore (store);
if(conf.enableMgmt){
- ManagementAgent::enableManagement ();
+ ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ conf.mgmtPubInterval);
managementAgent = ManagementAgent::getAgent ();
managementAgent->setInterval (conf.mgmtPubInterval);
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 46c780fc9f..a5ed84fb32 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -25,6 +25,8 @@
#include <qpid/broker/Message.h>
#include <qpid/broker/MessageDelivery.h>
#include <list>
+#include <iostream>
+#include <fstream>
using boost::intrusive_ptr;
using namespace qpid::framing;
@@ -36,25 +38,62 @@ using namespace std;
ManagementAgent::shared_ptr ManagementAgent::agent;
bool ManagementAgent::enabled = 0;
-ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) :
+ dataDir (_dataDir), interval (_interval)
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
nextRemotePrefix = 101;
+
+ // Get from file or generate and save to file.
+ if (dataDir.empty ())
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
+ << uuid);
+ }
+ else
+ {
+ string filename (dataDir + "/brokerId");
+ ifstream inFile (filename.c_str ());
+
+ if (inFile.good ())
+ {
+ inFile >> uuid;
+ inFile.close ();
+ QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
+ }
+ else
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
+
+ ofstream outFile (filename.c_str ());
+ if (outFile.good ())
+ {
+ outFile << uuid << endl;
+ outFile.close ();
+ QPID_LOG (debug, "ManagementAgent saved broker ID");
+ }
+ else
+ {
+ QPID_LOG (warning, "ManagementAgent unable to save broker ID");
+ }
+ }
+ }
}
ManagementAgent::~ManagementAgent () {}
-void ManagementAgent::enableManagement (void)
+void ManagementAgent::enableManagement (string dataDir, uint16_t interval)
{
enabled = 1;
+ if (agent.get () == 0)
+ agent = shared_ptr (new ManagementAgent (dataDir, interval));
}
ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
{
- if (enabled && agent.get () == 0)
- agent = shared_ptr (new ManagementAgent (10));
-
return agent;
}
@@ -122,27 +161,25 @@ void ManagementAgent::clientAdded (void)
}
}
-void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls)
+void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
buf.putOctet ('M');
- buf.putOctet ('0');
buf.putOctet ('1');
buf.putOctet (opcode);
- buf.putOctet (cls);
+ buf.putLong (seq);
}
-bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls)
+bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
uint8_t h1 = buf.getOctet ();
uint8_t h2 = buf.getOctet ();
uint8_t h3 = buf.getOctet ();
- uint8_t h4 = buf.getOctet ();
*opcode = buf.getOctet ();
- *cls = buf.getOctet ();
+ *seq = buf.getLong ();
- return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '1';
}
void ManagementAgent::SendBuffer (Buffer& buf,
@@ -199,24 +236,24 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'C', 'C');
+ EncodeHeader (msgBuffer, 'c');
object->writeConfig (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "mgmt.config." + object->getClassName ();
+ routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName ();
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'C', 'I');
+ EncodeHeader (msgBuffer, 'i');
object->writeInstrumentation (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "mgmt.inst." + object->getClassName ();
+ routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName ();
SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -233,6 +270,20 @@ void ManagementAgent::PeriodicProcessing (void)
deleteList.clear ();
}
+void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
+ uint32_t code, string text)
+{
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'z', sequence);
+ outBuffer.putLong (code);
+ outBuffer.putShortString (text);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
void ManagementAgent::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
@@ -295,13 +346,13 @@ void ManagementAgent::dispatchMethod (Message& msg,
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- uint8_t opcode, unused;
+ uint32_t outLen, sequence;
+ uint8_t opcode;
msg.encodeContent (inBuffer);
inBuffer.reset ();
- if (!CheckHeader (inBuffer, &opcode, &unused))
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
{
QPID_LOG (debug, " Invalid content header");
return;
@@ -313,8 +364,7 @@ void ManagementAgent::dispatchMethod (Message& msg,
return;
}
- uint32_t methodId = inBuffer.getLong ();
- uint64_t objId = inBuffer.getLongLong ();
+ uint64_t objId = inBuffer.getLongLong ();
string replyToKey;
const framing::MessageProperties* p =
@@ -330,8 +380,7 @@ void ManagementAgent::dispatchMethod (Message& msg,
return;
}
- EncodeHeader (outBuffer, 'm');
- outBuffer.putLong (methodId);
+ EncodeHeader (outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find (objId);
if (iter == managementObjects.end ())
@@ -349,22 +398,20 @@ void ManagementAgent::dispatchMethod (Message& msg,
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementAgent::handleHello (Buffer&, string replyToKey)
+void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- uint8_t* dat = (uint8_t*) "Broker ID";
- EncodeHeader (outBuffer, 'I');
- outBuffer.putShort (9);
- outBuffer.putRawData (dat, 9);
+ EncodeHeader (outBuffer, 'b', sequence);
+ uuid.encode (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey)
+void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence)
{
for (PackageMap::iterator pIter = packages.begin ();
pIter != packages.end ();
@@ -373,15 +420,17 @@ void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey)
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
+ EncodeHeader (outBuffer, 'p', sequence);
EncodePackageIndication (outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
+
+ sendCommandComplete (replyToKey, sequence);
}
-void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/)
+void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
std::string packageName;
@@ -389,7 +438,7 @@ void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/)
FindOrAddPackage (packageName);
}
-void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
std::string packageName;
@@ -405,16 +454,18 @@ void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey)
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q');
+ EncodeHeader (outBuffer, 'q', sequence);
EncodeClassIndication (outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
}
+
+ sendCommandComplete (replyToKey, sequence);
}
-void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -436,7 +487,7 @@ void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey)
if (classInfo.writeSchemaCall != 0)
{
- EncodeHeader (outBuffer, 's');
+ EncodeHeader (outBuffer, 's', sequence);
classInfo.writeSchemaCall (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
@@ -459,7 +510,7 @@ uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/)
return nextRemotePrefix++;
}
-void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string label;
uint32_t requestedPrefix;
@@ -472,17 +523,55 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey)
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'a');
+ EncodeHeader (outBuffer, 'a', sequence);
outBuffer.putLong (assignedPrefix);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
+void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ ft.decode (inBuffer);
+ value = ft.get ("_class");
+ if (value->empty () || !value->convertsTo<string> ())
+ {
+ // TODO: Send completion with an error code
+ return;
+ }
+
+ string className (value->get<string> ());
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ if (object->getClassName () == className)
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'g', sequence);
+ object->writeConfig (outBuffer);
+ object->writeInstrumentation (outBuffer, true);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+ }
+
+ sendCommandComplete (replyToKey, sequence);
+}
+
void ManagementAgent::dispatchAgentCommand (Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode, unused;
+ uint8_t opcode;
+ uint32_t sequence;
string replyToKey;
const framing::MessageProperties* p =
@@ -498,15 +587,16 @@ void ManagementAgent::dispatchAgentCommand (Message& msg)
msg.encodeContent (inBuffer);
inBuffer.reset ();
- if (!CheckHeader (inBuffer, &opcode, &unused))
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
return;
- if (opcode == 'H') handleHello (inBuffer, replyToKey);
- else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey);
- else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey);
- else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey);
- else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey);
- else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey);
+ if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence);
}
ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name)
@@ -528,7 +618,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::st
EncodePackageIndication (outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package");
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
return result.first;
}
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index 2acbe124bd..f2cd0373c0 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -25,19 +25,20 @@
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
#include <qpid/framing/AMQFrame.h>
#include <boost/shared_ptr.hpp>
-namespace qpid {
+namespace qpid {
namespace management {
class ManagementAgent
{
private:
- ManagementAgent (uint16_t interval);
+ ManagementAgent (std::string dataDir, uint16_t interval);
public:
@@ -45,7 +46,7 @@ class ManagementAgent
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
- static void enableManagement (void);
+ static void enableManagement (std::string dataDir, uint16_t interval);
static shared_ptr getAgent (void);
static void shutdown (void);
@@ -130,10 +131,12 @@ class ManagementAgent
static shared_ptr agent;
static bool enabled;
+ qpid::framing::Uuid uuid;
qpid::sys::RWlock userLock;
broker::Timer timer;
broker::Exchange::shared_ptr mExchange;
broker::Exchange::shared_ptr dExchange;
+ std::string dataDir;
uint16_t interval;
uint64_t nextObjectId;
uint32_t nextRemotePrefix;
@@ -143,8 +146,8 @@ class ManagementAgent
char outputBuffer[MA_BUFFER_SIZE];
void PeriodicProcessing (void);
- void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0);
- bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls);
+ void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void SendBuffer (qpid::framing::Buffer& buf,
uint32_t length,
broker::Exchange::shared_ptr exchange,
@@ -164,16 +167,17 @@ class ManagementAgent
PackageMap::iterator pIter,
ClassMap::iterator cIter);
uint32_t assignPrefix (uint32_t requestedPrefix);
- void handleHello (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey);
- void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+ void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ uint32_t code = 0, std::string text = std::string("OK"));
+ void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
};
}}
-
-
#endif /*!_ManagementAgent_*/
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 87c3ccf22a..23042ad988 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -57,6 +57,7 @@ class ManagementObject
static const uint8_t TYPE_BOOL = 11;
static const uint8_t TYPE_FLOAT = 12;
static const uint8_t TYPE_DOUBLE = 13;
+ static const uint8_t TYPE_UUID = 14;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;
@@ -85,7 +86,8 @@ class ManagementObject
virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
virtual bool firstInstance (void) = 0;
virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
- virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
+ virtual void writeInstrumentation (qpid::framing::Buffer& buf,
+ bool skipHeaders = false) = 0;
virtual void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
diff --git a/python/mgmt-cli/managementdata.py b/python/mgmt-cli/managementdata.py
index 5b13594994..adff05a710 100644
--- a/python/mgmt-cli/managementdata.py
+++ b/python/mgmt-cli/managementdata.py
@@ -111,6 +111,10 @@ class ManagementData:
finally:
self.lock.release ()
+ def ctrlHandler (self, context, op, data):
+ if op == self.mclient.CTRL_BROKER_INFO:
+ pass
+
def configHandler (self, context, className, list, timestamps):
self.dataHandler (0, className, list, timestamps);
@@ -149,7 +153,7 @@ class ManagementData:
self.client.start ({"LOGIN": username, "PASSWORD": password})
self.channel = self.client.channel (1)
- self.mclient = managementClient (self.spec, None, self.configHandler,
+ self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
self.instHandler, self.methodReply)
self.mclient.schemaListener (self.schemaHandler)
self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb)
@@ -194,6 +198,8 @@ class ManagementData:
return "False"
else:
return "True"
+ elif typecode == 14:
+ return str (UUID (bytes=value))
return "*type-error*"
def getObjIndex (self, className, config):
diff --git a/python/qpid/management.py b/python/qpid/management.py
index b5d992cf5d..33679cf0da 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -69,12 +69,14 @@ class managementChannel:
opens a session and performs all of the declarations and bindings needed
to participate in the management protocol. """
response = ch.session_open (detached_lifetime=300)
+ self.sessionId = response.session_id
self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id)
self.qpidChannel = ch
self.tcb = topicCb
self.rcb = replyCb
self.context = cbContext
+ self.reqsOutstanding = 0
ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
@@ -114,6 +116,10 @@ class managementClient:
network. It implements the management protocol and manages the management
schemas as advertised by the various management agents in the network. """
+ CTRL_BROKER_INFO = 1
+ CTRL_SCHEMA_LOADED = 2
+ CTRL_USER = 3
+
#========================================================
# User API - interacts with the class's user
#========================================================
@@ -144,7 +150,7 @@ class managementClient:
""" Register a new channel. """
self.channels.append (channel)
codec = Codec (StringIO (), self.spec)
- self.setHeader (codec, ord ('H'))
+ self.setHeader (codec, ord ('B'))
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
msg["routing_key"] = "agent"
@@ -161,6 +167,22 @@ class managementClient:
""" Invoke a method on a managed object. """
self.method (channel, userSequence, objId, className, methodName, args)
+ def getObjects (self, channel, userSequence, className):
+ """ Request immediate content from broker """
+ codec = Codec (StringIO (), self.spec)
+ self.setHeader (codec, ord ('G'), userSequence)
+ ft = {}
+ ft["_class"] = className
+ codec.encode_table (ft)
+ msg = Content (codec.stream.getvalue ())
+ msg["content_type"] = "application/octet-stream"
+ msg["routing_key"] = "agent"
+ msg["reply_to"] = self.spec.struct ("reply_to")
+ msg["reply_to"]["exchange_name"] = "amq.direct"
+ msg["reply_to"]["routing_key"] = channel.replyName
+ channel.send ("qpid.management", msg)
+
+
#========================================================
# Channel API - interacts with registered channel objects
#========================================================
@@ -182,9 +204,11 @@ class managementClient:
return
if hdr[0] == 'm':
- self.handleMethodReply (ch, codec)
- elif hdr[0] == 'I':
- self.handleInit (ch, codec)
+ self.handleMethodReply (ch, codec, hdr[1])
+ elif hdr[0] == 'z':
+ self.handleCommandComplete (ch, codec, hdr[1])
+ elif hdr[0] == 'b':
+ self.handleBrokerResponse (ch, codec)
elif hdr[0] == 'p':
self.handlePackageInd (ch, codec)
elif hdr[0] == 'q':
@@ -196,14 +220,13 @@ class managementClient:
#========================================================
# Internal Functions
#========================================================
- def setHeader (self, codec, opcode, cls = 0):
+ def setHeader (self, codec, opcode, seq = 0):
""" Compose the header of a management message. """
codec.encode_octet (ord ('A'))
codec.encode_octet (ord ('M'))
- codec.encode_octet (ord ('0'))
codec.encode_octet (ord ('1'))
codec.encode_octet (opcode)
- codec.encode_octet (cls)
+ codec.encode_long (seq)
def checkHeader (self, codec):
""" Check the header of a management message and extract the opcode and
@@ -215,14 +238,11 @@ class managementClient:
if octet != 'M':
return None
octet = chr (codec.decode_octet ())
- if octet != '0':
- return None
- octet = chr (codec.decode_octet ())
if octet != '1':
return None
opcode = chr (codec.decode_octet ())
- cls = chr (codec.decode_octet ())
- return (opcode, cls)
+ seq = codec.decode_long ()
+ return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
@@ -252,6 +272,8 @@ class managementClient:
codec.encode_float (float (value))
elif typecode == 13: # DOUBLE
codec.encode_double (double (value))
+ elif typecode == 14: # UUID
+ codec.encode_uuid (value)
else:
raise ValueError ("Invalid type code: %d" % typecode)
@@ -283,14 +305,24 @@ class managementClient:
data = codec.decode_float ()
elif typecode == 13: # DOUBLE
data = codec.decode_double ()
+ elif typecode == 14: # UUID
+ data = codec.decode_uuid ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
- def handleMethodReply (self, ch, codec):
- sequence = codec.decode_long ()
- status = codec.decode_long ()
- sText = codec.decode_shortstr ()
+ def incOutstanding (self, ch):
+ ch.reqsOutstanding = ch.reqsOutstanding + 1
+
+ def decOutstanding (self, ch):
+ ch.reqsOutstanding = ch.reqsOutstanding - 1
+ if ch.reqsOutstanding == 0:
+ if self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
+
+ def handleMethodReply (self, ch, codec, sequence):
+ status = codec.decode_long ()
+ sText = codec.decode_shortstr ()
data = self.seqMgr.release (sequence)
if data == None:
@@ -317,15 +349,27 @@ class managementClient:
if self.methodCb != None:
self.methodCb (ch.context, userSequence, status, sText, args)
- def handleInit (self, ch, codec):
- len = codec.decode_short ()
- data = codec.decode_raw (len)
+ def handleCommandComplete (self, ch, codec, seq):
+ code = codec.decode_long ()
+ text = codec.decode_shortstr ()
+ data = (seq, code, text)
+ context = self.seqMgr.release (seq)
+ if context == "outstanding":
+ self.decOutstanding (ch)
+ elif self.ctrlCb != None:
+ self.ctrlCb (ch.context, self.CTRL_USER, data)
+
+ def handleBrokerResponse (self, ch, codec):
if self.ctrlCb != None:
- self.ctrlCb (ch.context, len, data)
+ uuid = codec.decode_uuid ()
+ data = (uuid, ch.sessionId)
+ self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data)
# Send a package request
sendCodec = Codec (StringIO (), self.spec)
- self.setHeader (sendCodec, ord ('P'))
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('P'), seq)
+ self.incOutstanding (ch)
smsg = Content (sendCodec.stream.getvalue ())
smsg["content_type"] = "application/octet-stream"
smsg["routing_key"] = "agent"
@@ -341,7 +385,9 @@ class managementClient:
# Send a class request
sendCodec = Codec (StringIO (), self.spec)
- self.setHeader (sendCodec, ord ('Q'))
+ seq = self.seqMgr.reserve ("outstanding")
+ self.setHeader (sendCodec, ord ('Q'), seq)
+ self.incOutstanding (ch)
sendCodec.encode_shortstr (pname)
smsg = Content (sendCodec.stream.getvalue ())
smsg["content_type"] = "application/octet-stream"
@@ -362,6 +408,7 @@ class managementClient:
# Send a schema request
sendCodec = Codec (StringIO (), self.spec)
self.setHeader (sendCodec, ord ('S'))
+ self.incOutstanding (ch)
sendCodec.encode_shortstr (pname)
sendCodec.encode_shortstr (cname)
sendCodec.encode_bin128 (hash)
@@ -373,8 +420,9 @@ class managementClient:
smsg["reply_to"]["routing_key"] = ch.replyName
ch.send ("qpid.management", smsg)
- def parseSchema (self, ch, cls, codec):
+ def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
+ self.decOutstanding (ch)
packageName = codec.decode_shortstr ()
className = codec.decode_shortstr ()
hash = codec.decode_bin128 ()
@@ -495,7 +543,7 @@ class managementClient:
def parseContent (self, ch, cls, codec):
""" Parse a received content message. """
- if cls == 'C' and self.configCb == None:
+ if (cls == 'C' or cls == 'B') and self.configCb == None:
return
if cls == 'I' and self.instCb == None:
return
@@ -516,23 +564,39 @@ class managementClient:
timestamps.append (codec.decode_longlong ()) # Delete Time
schemaClass = self.schema[classKey]
- for element in schemaClass[cls][:]:
- tc = element[1]
- name = element[0]
- data = self.decodeValue (codec, tc)
- row.append ((name, data))
-
- if cls == 'C':
+ if cls == 'C' or cls == 'B':
+ for element in schemaClass['C'][:]:
+ tc = element[1]
+ name = element[0]
+ data = self.decodeValue (codec, tc)
+ row.append ((name, data))
+
+ if cls == 'I' or cls == 'B':
+ if cls == 'B':
+ start = 1
+ else:
+ start = 0
+ for element in schemaClass['I'][start:]:
+ tc = element[1]
+ name = element[0]
+ data = self.decodeValue (codec, tc)
+ row.append ((name, data))
+
+ if cls == 'C' or cls == 'B':
self.configCb (ch.context, classKey, row, timestamps)
elif cls == 'I':
self.instCb (ch.context, classKey, row, timestamps)
- def parse (self, ch, codec, opcode, cls):
+ def parse (self, ch, codec, opcode, seq):
""" Parse a message received from the topic queue. """
if opcode == 's':
- self.parseSchema (ch, cls, codec)
- elif opcode == 'C':
- self.parseContent (ch, cls, codec)
+ self.parseSchema (ch, codec)
+ elif opcode == 'c':
+ self.parseContent (ch, 'C', codec)
+ elif opcode == 'i':
+ self.parseContent (ch, 'I', codec)
+ elif opcode == 'g':
+ self.parseContent (ch, 'B', codec)
else:
raise ValueError ("Unknown opcode: %c" % opcode);
@@ -540,8 +604,7 @@ class managementClient:
""" Invoke a method on an object """
codec = Codec (StringIO (), self.spec)
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
- self.setHeader (codec, ord ('M'))
- codec.encode_long (sequence) # Method sequence id
+ self.setHeader (codec, ord ('M'), sequence)
codec.encode_longlong (objId) # ID of object
# Encode args according to schema
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index 33c41fb884..a704a95a2c 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -93,6 +93,15 @@
<!--
===============================================================
+ Management Agent
+ ===============================================================
+ -->
+ <class name="agent">
+ <configElement name="id" type="uuid" access="RO" index="y" desc="Agent ID"/>
+ </class>
+
+ <!--
+ ===============================================================
Virtual Host
===============================================================
-->
diff --git a/specs/management-types.xml b/specs/management-types.xml
index 6c86be3db1..7d77ea98a7 100644
--- a/specs/management-types.xml
+++ b/specs/management-types.xml
@@ -19,18 +19,19 @@
under the License.
-->
-<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/>
-<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/>
-<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/>
-<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/>
-<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/>
-<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
-<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
-<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/>
-<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/>
+<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/>
+<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/>
+<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/>
+<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/>
+<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/>
+<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
+<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/>
+<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/>
+<type name="uuid" base="UUID" cpp="framing::Uuid" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/>