diff options
-rw-r--r-- | cpp/managementgen/templates/Class.cpp | 5 | ||||
-rw-r--r-- | cpp/managementgen/templates/Class.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 182 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 30 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 4 | ||||
-rw-r--r-- | python/mgmt-cli/managementdata.py | 8 | ||||
-rw-r--r-- | python/qpid/management.py | 137 | ||||
-rw-r--r-- | specs/management-schema.xml | 9 | ||||
-rw-r--r-- | specs/management-types.xml | 25 |
10 files changed, 294 insertions, 115 deletions
diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp index 2a3f71e262..3c3dfff5a2 100644 --- a/cpp/managementgen/templates/Class.cpp +++ b/cpp/managementgen/templates/Class.cpp @@ -106,11 +106,12 @@ void /*MGEN:Class.NameCap*/::writeConfig (Buffer& buf) /*MGEN:Class.WriteConfig*/ } -void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf) +void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf, bool skipHeaders) { instChanged = false; - writeTimestamps (buf); + if (!skipHeaders) + writeTimestamps (buf); /*MGEN:Class.WriteInst*/ // Maintenance of hi-lo statistics diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h index 82fac00d47..047d7cc950 100644 --- a/cpp/managementgen/templates/Class.h +++ b/cpp/managementgen/templates/Class.h @@ -25,8 +25,9 @@ #include "qpid/sys/Mutex.h" #include "qpid/management/ManagementObject.h" +#include "qpid/framing/Uuid.h" -namespace qpid { +namespace qpid { namespace management { class /*MGEN:Class.NameCap*/ : public ManagementObject @@ -45,7 +46,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject // Private Methods static void writeSchema (qpid::framing::Buffer& buf); void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& buf, + bool skipHeaders = false); void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf); diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index bbcdb9cbce..a183ce9d02 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -127,7 +127,8 @@ Broker::Broker(const Broker::Options& conf) : dtxManager.setStore (store); if(conf.enableMgmt){ - ManagementAgent::enableManagement (); + ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 46c780fc9f..a5ed84fb32 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -25,6 +25,8 @@ #include <qpid/broker/Message.h> #include <qpid/broker/MessageDelivery.h> #include <list> +#include <iostream> +#include <fstream> using boost::intrusive_ptr; using namespace qpid::framing; @@ -36,25 +38,62 @@ using namespace std; ManagementAgent::shared_ptr ManagementAgent::agent; bool ManagementAgent::enabled = 0; -ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) +ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) : + dataDir (_dataDir), interval (_interval) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); nextRemotePrefix = 101; + + // Get from file or generate and save to file. + if (dataDir.empty ()) + { + uuid.generate (); + QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename (dataDir + "/brokerId"); + ifstream inFile (filename.c_str ()); + + if (inFile.good ()) + { + inFile >> uuid; + inFile.close (); + QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); + } + else + { + uuid.generate (); + QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); + + ofstream outFile (filename.c_str ()); + if (outFile.good ()) + { + outFile << uuid << endl; + outFile.close (); + QPID_LOG (debug, "ManagementAgent saved broker ID"); + } + else + { + QPID_LOG (warning, "ManagementAgent unable to save broker ID"); + } + } + } } ManagementAgent::~ManagementAgent () {} -void ManagementAgent::enableManagement (void) +void ManagementAgent::enableManagement (string dataDir, uint16_t interval) { enabled = 1; + if (agent.get () == 0) + agent = shared_ptr (new ManagementAgent (dataDir, interval)); } ManagementAgent::shared_ptr ManagementAgent::getAgent (void) { - if (enabled && agent.get () == 0) - agent = shared_ptr (new ManagementAgent (10)); - return agent; } @@ -122,27 +161,25 @@ void ManagementAgent::clientAdded (void) } } -void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls) +void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); - buf.putOctet ('0'); buf.putOctet ('1'); buf.putOctet (opcode); - buf.putOctet (cls); + buf.putLong (seq); } -bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls) +bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { uint8_t h1 = buf.getOctet (); uint8_t h2 = buf.getOctet (); uint8_t h3 = buf.getOctet (); - uint8_t h4 = buf.getOctet (); *opcode = buf.getOctet (); - *cls = buf.getOctet (); + *seq = buf.getLong (); - return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '1'; } void ManagementAgent::SendBuffer (Buffer& buf, @@ -199,24 +236,24 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'C', 'C'); + EncodeHeader (msgBuffer, 'c'); object->writeConfig (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "mgmt.config." + object->getClassName (); + routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName (); SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'C', 'I'); + EncodeHeader (msgBuffer, 'i'); object->writeInstrumentation (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "mgmt.inst." + object->getClassName (); + routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName (); SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -233,6 +270,20 @@ void ManagementAgent::PeriodicProcessing (void) deleteList.clear (); } +void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, + uint32_t code, string text) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + void ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) @@ -295,13 +346,13 @@ void ManagementAgent::dispatchMethod (Message& msg, Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - uint8_t opcode, unused; + uint32_t outLen, sequence; + uint8_t opcode; msg.encodeContent (inBuffer); inBuffer.reset (); - if (!CheckHeader (inBuffer, &opcode, &unused)) + if (!CheckHeader (inBuffer, &opcode, &sequence)) { QPID_LOG (debug, " Invalid content header"); return; @@ -313,8 +364,7 @@ void ManagementAgent::dispatchMethod (Message& msg, return; } - uint32_t methodId = inBuffer.getLong (); - uint64_t objId = inBuffer.getLongLong (); + uint64_t objId = inBuffer.getLongLong (); string replyToKey; const framing::MessageProperties* p = @@ -330,8 +380,7 @@ void ManagementAgent::dispatchMethod (Message& msg, return; } - EncodeHeader (outBuffer, 'm'); - outBuffer.putLong (methodId); + EncodeHeader (outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find (objId); if (iter == managementObjects.end ()) @@ -349,22 +398,20 @@ void ManagementAgent::dispatchMethod (Message& msg, SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementAgent::handleHello (Buffer&, string replyToKey) +void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - uint8_t* dat = (uint8_t*) "Broker ID"; - EncodeHeader (outBuffer, 'I'); - outBuffer.putShort (9); - outBuffer.putRawData (dat, 9); + EncodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey) +void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence) { for (PackageMap::iterator pIter = packages.begin (); pIter != packages.end (); @@ -373,15 +420,17 @@ void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); + EncodeHeader (outBuffer, 'p', sequence); EncodePackageIndication (outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } + + sendCommandComplete (replyToKey, sequence); } -void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/) +void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { std::string packageName; @@ -389,7 +438,7 @@ void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/) FindOrAddPackage (packageName); } -void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey) +void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) { std::string packageName; @@ -405,16 +454,18 @@ void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q'); + EncodeHeader (outBuffer, 'q', sequence); EncodeClassIndication (outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } } + + sendCommandComplete (replyToKey, sequence); } -void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey) +void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -436,7 +487,7 @@ void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey) if (classInfo.writeSchemaCall != 0) { - EncodeHeader (outBuffer, 's'); + EncodeHeader (outBuffer, 's', sequence); classInfo.writeSchemaCall (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); @@ -459,7 +510,7 @@ uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/) return nextRemotePrefix++; } -void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey) +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) { string label; uint32_t requestedPrefix; @@ -472,17 +523,55 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'a'); + EncodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (assignedPrefix); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } +void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + ft.decode (inBuffer); + value = ft.get ("_class"); + if (value->empty () || !value->convertsTo<string> ()) + { + // TODO: Send completion with an error code + return; + } + + string className (value->get<string> ()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'g', sequence); + object->writeConfig (outBuffer); + object->writeInstrumentation (outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + + sendCommandComplete (replyToKey, sequence); +} + void ManagementAgent::dispatchAgentCommand (Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode, unused; + uint8_t opcode; + uint32_t sequence; string replyToKey; const framing::MessageProperties* p = @@ -498,15 +587,16 @@ void ManagementAgent::dispatchAgentCommand (Message& msg) msg.encodeContent (inBuffer); inBuffer.reset (); - if (!CheckHeader (inBuffer, &opcode, &unused)) + if (!CheckHeader (inBuffer, &opcode, &sequence)) return; - if (opcode == 'H') handleHello (inBuffer, replyToKey); - else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey); - else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey); - else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey); - else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey); - else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey); + if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence); + else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence); } ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name) @@ -528,7 +618,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::st EncodePackageIndication (outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package"); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); return result.first; } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 2acbe124bd..f2cd0373c0 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -25,19 +25,20 @@ #include "qpid/Options.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Timer.h" +#include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" #include "ManagementObject.h" #include <qpid/framing/AMQFrame.h> #include <boost/shared_ptr.hpp> -namespace qpid { +namespace qpid { namespace management { class ManagementAgent { private: - ManagementAgent (uint16_t interval); + ManagementAgent (std::string dataDir, uint16_t interval); public: @@ -45,7 +46,7 @@ class ManagementAgent typedef boost::shared_ptr<ManagementAgent> shared_ptr; - static void enableManagement (void); + static void enableManagement (std::string dataDir, uint16_t interval); static shared_ptr getAgent (void); static void shutdown (void); @@ -130,10 +131,12 @@ class ManagementAgent static shared_ptr agent; static bool enabled; + qpid::framing::Uuid uuid; qpid::sys::RWlock userLock; broker::Timer timer; broker::Exchange::shared_ptr mExchange; broker::Exchange::shared_ptr dExchange; + std::string dataDir; uint16_t interval; uint64_t nextObjectId; uint32_t nextRemotePrefix; @@ -143,8 +146,8 @@ class ManagementAgent char outputBuffer[MA_BUFFER_SIZE]; void PeriodicProcessing (void); - void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0); - bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void SendBuffer (qpid::framing::Buffer& buf, uint32_t length, broker::Exchange::shared_ptr exchange, @@ -164,16 +167,17 @@ class ManagementAgent PackageMap::iterator pIter, ClassMap::iterator cIter); uint32_t assignPrefix (uint32_t requestedPrefix); - void handleHello (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); + void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); }; }} - - #endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 87c3ccf22a..23042ad988 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -57,6 +57,7 @@ class ManagementObject static const uint8_t TYPE_BOOL = 11; static const uint8_t TYPE_FLOAT = 12; static const uint8_t TYPE_DOUBLE = 13; + static const uint8_t TYPE_UUID = 14; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; @@ -85,7 +86,8 @@ class ManagementObject virtual writeSchemaCall_t getWriteSchemaCall (void) = 0; virtual bool firstInstance (void) = 0; virtual void writeConfig (qpid::framing::Buffer& buf) = 0; - virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; + virtual void writeInstrumentation (qpid::framing::Buffer& buf, + bool skipHeaders = false) = 0; virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; diff --git a/python/mgmt-cli/managementdata.py b/python/mgmt-cli/managementdata.py index 5b13594994..adff05a710 100644 --- a/python/mgmt-cli/managementdata.py +++ b/python/mgmt-cli/managementdata.py @@ -111,6 +111,10 @@ class ManagementData: finally: self.lock.release () + def ctrlHandler (self, context, op, data): + if op == self.mclient.CTRL_BROKER_INFO: + pass + def configHandler (self, context, className, list, timestamps): self.dataHandler (0, className, list, timestamps); @@ -149,7 +153,7 @@ class ManagementData: self.client.start ({"LOGIN": username, "PASSWORD": password}) self.channel = self.client.channel (1) - self.mclient = managementClient (self.spec, None, self.configHandler, + self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler, self.instHandler, self.methodReply) self.mclient.schemaListener (self.schemaHandler) self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb) @@ -194,6 +198,8 @@ class ManagementData: return "False" else: return "True" + elif typecode == 14: + return str (UUID (bytes=value)) return "*type-error*" def getObjIndex (self, className, config): diff --git a/python/qpid/management.py b/python/qpid/management.py index b5d992cf5d..33679cf0da 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -69,12 +69,14 @@ class managementChannel: opens a session and performs all of the declarations and bindings needed to participate in the management protocol. """ response = ch.session_open (detached_lifetime=300) + self.sessionId = response.session_id self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id) self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id) self.qpidChannel = ch self.tcb = topicCb self.rcb = replyCb self.context = cbContext + self.reqsOutstanding = 0 ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1) ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1) @@ -114,6 +116,10 @@ class managementClient: network. It implements the management protocol and manages the management schemas as advertised by the various management agents in the network. """ + CTRL_BROKER_INFO = 1 + CTRL_SCHEMA_LOADED = 2 + CTRL_USER = 3 + #======================================================== # User API - interacts with the class's user #======================================================== @@ -144,7 +150,7 @@ class managementClient: """ Register a new channel. """ self.channels.append (channel) codec = Codec (StringIO (), self.spec) - self.setHeader (codec, ord ('H')) + self.setHeader (codec, ord ('B')) msg = Content (codec.stream.getvalue ()) msg["content_type"] = "application/octet-stream" msg["routing_key"] = "agent" @@ -161,6 +167,22 @@ class managementClient: """ Invoke a method on a managed object. """ self.method (channel, userSequence, objId, className, methodName, args) + def getObjects (self, channel, userSequence, className): + """ Request immediate content from broker """ + codec = Codec (StringIO (), self.spec) + self.setHeader (codec, ord ('G'), userSequence) + ft = {} + ft["_class"] = className + codec.encode_table (ft) + msg = Content (codec.stream.getvalue ()) + msg["content_type"] = "application/octet-stream" + msg["routing_key"] = "agent" + msg["reply_to"] = self.spec.struct ("reply_to") + msg["reply_to"]["exchange_name"] = "amq.direct" + msg["reply_to"]["routing_key"] = channel.replyName + channel.send ("qpid.management", msg) + + #======================================================== # Channel API - interacts with registered channel objects #======================================================== @@ -182,9 +204,11 @@ class managementClient: return if hdr[0] == 'm': - self.handleMethodReply (ch, codec) - elif hdr[0] == 'I': - self.handleInit (ch, codec) + self.handleMethodReply (ch, codec, hdr[1]) + elif hdr[0] == 'z': + self.handleCommandComplete (ch, codec, hdr[1]) + elif hdr[0] == 'b': + self.handleBrokerResponse (ch, codec) elif hdr[0] == 'p': self.handlePackageInd (ch, codec) elif hdr[0] == 'q': @@ -196,14 +220,13 @@ class managementClient: #======================================================== # Internal Functions #======================================================== - def setHeader (self, codec, opcode, cls = 0): + def setHeader (self, codec, opcode, seq = 0): """ Compose the header of a management message. """ codec.encode_octet (ord ('A')) codec.encode_octet (ord ('M')) - codec.encode_octet (ord ('0')) codec.encode_octet (ord ('1')) codec.encode_octet (opcode) - codec.encode_octet (cls) + codec.encode_long (seq) def checkHeader (self, codec): """ Check the header of a management message and extract the opcode and @@ -215,14 +238,11 @@ class managementClient: if octet != 'M': return None octet = chr (codec.decode_octet ()) - if octet != '0': - return None - octet = chr (codec.decode_octet ()) if octet != '1': return None opcode = chr (codec.decode_octet ()) - cls = chr (codec.decode_octet ()) - return (opcode, cls) + seq = codec.decode_long () + return (opcode, seq) def encodeValue (self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ @@ -252,6 +272,8 @@ class managementClient: codec.encode_float (float (value)) elif typecode == 13: # DOUBLE codec.encode_double (double (value)) + elif typecode == 14: # UUID + codec.encode_uuid (value) else: raise ValueError ("Invalid type code: %d" % typecode) @@ -283,14 +305,24 @@ class managementClient: data = codec.decode_float () elif typecode == 13: # DOUBLE data = codec.decode_double () + elif typecode == 14: # UUID + data = codec.decode_uuid () else: raise ValueError ("Invalid type code: %d" % typecode) return data - def handleMethodReply (self, ch, codec): - sequence = codec.decode_long () - status = codec.decode_long () - sText = codec.decode_shortstr () + def incOutstanding (self, ch): + ch.reqsOutstanding = ch.reqsOutstanding + 1 + + def decOutstanding (self, ch): + ch.reqsOutstanding = ch.reqsOutstanding - 1 + if ch.reqsOutstanding == 0: + if self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) + + def handleMethodReply (self, ch, codec, sequence): + status = codec.decode_long () + sText = codec.decode_shortstr () data = self.seqMgr.release (sequence) if data == None: @@ -317,15 +349,27 @@ class managementClient: if self.methodCb != None: self.methodCb (ch.context, userSequence, status, sText, args) - def handleInit (self, ch, codec): - len = codec.decode_short () - data = codec.decode_raw (len) + def handleCommandComplete (self, ch, codec, seq): + code = codec.decode_long () + text = codec.decode_shortstr () + data = (seq, code, text) + context = self.seqMgr.release (seq) + if context == "outstanding": + self.decOutstanding (ch) + elif self.ctrlCb != None: + self.ctrlCb (ch.context, self.CTRL_USER, data) + + def handleBrokerResponse (self, ch, codec): if self.ctrlCb != None: - self.ctrlCb (ch.context, len, data) + uuid = codec.decode_uuid () + data = (uuid, ch.sessionId) + self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data) # Send a package request sendCodec = Codec (StringIO (), self.spec) - self.setHeader (sendCodec, ord ('P')) + seq = self.seqMgr.reserve ("outstanding") + self.setHeader (sendCodec, ord ('P'), seq) + self.incOutstanding (ch) smsg = Content (sendCodec.stream.getvalue ()) smsg["content_type"] = "application/octet-stream" smsg["routing_key"] = "agent" @@ -341,7 +385,9 @@ class managementClient: # Send a class request sendCodec = Codec (StringIO (), self.spec) - self.setHeader (sendCodec, ord ('Q')) + seq = self.seqMgr.reserve ("outstanding") + self.setHeader (sendCodec, ord ('Q'), seq) + self.incOutstanding (ch) sendCodec.encode_shortstr (pname) smsg = Content (sendCodec.stream.getvalue ()) smsg["content_type"] = "application/octet-stream" @@ -362,6 +408,7 @@ class managementClient: # Send a schema request sendCodec = Codec (StringIO (), self.spec) self.setHeader (sendCodec, ord ('S')) + self.incOutstanding (ch) sendCodec.encode_shortstr (pname) sendCodec.encode_shortstr (cname) sendCodec.encode_bin128 (hash) @@ -373,8 +420,9 @@ class managementClient: smsg["reply_to"]["routing_key"] = ch.replyName ch.send ("qpid.management", smsg) - def parseSchema (self, ch, cls, codec): + def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ + self.decOutstanding (ch) packageName = codec.decode_shortstr () className = codec.decode_shortstr () hash = codec.decode_bin128 () @@ -495,7 +543,7 @@ class managementClient: def parseContent (self, ch, cls, codec): """ Parse a received content message. """ - if cls == 'C' and self.configCb == None: + if (cls == 'C' or cls == 'B') and self.configCb == None: return if cls == 'I' and self.instCb == None: return @@ -516,23 +564,39 @@ class managementClient: timestamps.append (codec.decode_longlong ()) # Delete Time schemaClass = self.schema[classKey] - for element in schemaClass[cls][:]: - tc = element[1] - name = element[0] - data = self.decodeValue (codec, tc) - row.append ((name, data)) - - if cls == 'C': + if cls == 'C' or cls == 'B': + for element in schemaClass['C'][:]: + tc = element[1] + name = element[0] + data = self.decodeValue (codec, tc) + row.append ((name, data)) + + if cls == 'I' or cls == 'B': + if cls == 'B': + start = 1 + else: + start = 0 + for element in schemaClass['I'][start:]: + tc = element[1] + name = element[0] + data = self.decodeValue (codec, tc) + row.append ((name, data)) + + if cls == 'C' or cls == 'B': self.configCb (ch.context, classKey, row, timestamps) elif cls == 'I': self.instCb (ch.context, classKey, row, timestamps) - def parse (self, ch, codec, opcode, cls): + def parse (self, ch, codec, opcode, seq): """ Parse a message received from the topic queue. """ if opcode == 's': - self.parseSchema (ch, cls, codec) - elif opcode == 'C': - self.parseContent (ch, cls, codec) + self.parseSchema (ch, codec) + elif opcode == 'c': + self.parseContent (ch, 'C', codec) + elif opcode == 'i': + self.parseContent (ch, 'I', codec) + elif opcode == 'g': + self.parseContent (ch, 'B', codec) else: raise ValueError ("Unknown opcode: %c" % opcode); @@ -540,8 +604,7 @@ class managementClient: """ Invoke a method on an object """ codec = Codec (StringIO (), self.spec) sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) - self.setHeader (codec, ord ('M')) - codec.encode_long (sequence) # Method sequence id + self.setHeader (codec, ord ('M'), sequence) codec.encode_longlong (objId) # ID of object # Encode args according to schema diff --git a/specs/management-schema.xml b/specs/management-schema.xml index 33c41fb884..a704a95a2c 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -93,6 +93,15 @@ <!-- =============================================================== + Management Agent + =============================================================== + --> + <class name="agent"> + <configElement name="id" type="uuid" access="RO" index="y" desc="Agent ID"/> + </class> + + <!-- + =============================================================== Virtual Host =============================================================== --> diff --git a/specs/management-types.xml b/specs/management-types.xml index 6c86be3db1..7d77ea98a7 100644 --- a/specs/management-types.xml +++ b/specs/management-types.xml @@ -19,18 +19,19 @@ under the License. --> -<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> -<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/> -<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/> -<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/> -<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> -<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/> -<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/> -<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/> -<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> -<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> -<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/> -<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/> +<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" accessor="direct" init="0"/> +<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" accessor="direct" init="0"/> +<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="direct" init="0"/> +<type name="uint64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet (#?1:0)" decode="# = @.getOctet ()==1" accessor="direct" init="0"/> +<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)" accessor="direct" init='""'/> +<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/> +<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/> +<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/> +<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/> +<type name="uuid" base="UUID" cpp="framing::Uuid" encode="#.encode (@)" decode="#.decode (@)" accessor="direct"/> <type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/> <type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/> |