diff options
author | Nuno Santos <nsantos@apache.org> | 2008-03-25 20:30:01 +0000 |
---|---|---|
committer | Nuno Santos <nsantos@apache.org> | 2008-03-25 20:30:01 +0000 |
commit | aefbf926e26e61460a5a11533361a8da9c11bb9c (patch) | |
tree | 3971bd2ad4ae9108b0b20239bf08020e9753b4ce /cpp | |
parent | 5444def1a124b7ef609ad2a585d333a4654c736a (diff) | |
download | qpid-python-aefbf926e26e61460a5a11533361a8da9c11bb9c.tar.gz |
QPID-877: applied patch from Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@640970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/managementgen/templates/Class.cpp | 5 | ||||
-rw-r--r-- | cpp/managementgen/templates/Class.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 182 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 30 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 4 |
6 files changed, 165 insertions, 65 deletions
diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp index 2a3f71e262..3c3dfff5a2 100644 --- a/cpp/managementgen/templates/Class.cpp +++ b/cpp/managementgen/templates/Class.cpp @@ -106,11 +106,12 @@ void /*MGEN:Class.NameCap*/::writeConfig (Buffer& buf) /*MGEN:Class.WriteConfig*/ } -void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf) +void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf, bool skipHeaders) { instChanged = false; - writeTimestamps (buf); + if (!skipHeaders) + writeTimestamps (buf); /*MGEN:Class.WriteInst*/ // Maintenance of hi-lo statistics diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h index 82fac00d47..047d7cc950 100644 --- a/cpp/managementgen/templates/Class.h +++ b/cpp/managementgen/templates/Class.h @@ -25,8 +25,9 @@ #include "qpid/sys/Mutex.h" #include "qpid/management/ManagementObject.h" +#include "qpid/framing/Uuid.h" -namespace qpid { +namespace qpid { namespace management { class /*MGEN:Class.NameCap*/ : public ManagementObject @@ -45,7 +46,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject // Private Methods static void writeSchema (qpid::framing::Buffer& buf); void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& buf, + bool skipHeaders = false); void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf); diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index bbcdb9cbce..a183ce9d02 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -127,7 +127,8 @@ Broker::Broker(const Broker::Options& conf) : dtxManager.setStore (store); if(conf.enableMgmt){ - ManagementAgent::enableManagement (); + ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (), + conf.mgmtPubInterval); managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 46c780fc9f..a5ed84fb32 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -25,6 +25,8 @@ #include <qpid/broker/Message.h> #include <qpid/broker/MessageDelivery.h> #include <list> +#include <iostream> +#include <fstream> using boost::intrusive_ptr; using namespace qpid::framing; @@ -36,25 +38,62 @@ using namespace std; ManagementAgent::shared_ptr ManagementAgent::agent; bool ManagementAgent::enabled = 0; -ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) +ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) : + dataDir (_dataDir), interval (_interval) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); nextRemotePrefix = 101; + + // Get from file or generate and save to file. + if (dataDir.empty ()) + { + uuid.generate (); + QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename (dataDir + "/brokerId"); + ifstream inFile (filename.c_str ()); + + if (inFile.good ()) + { + inFile >> uuid; + inFile.close (); + QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); + } + else + { + uuid.generate (); + QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); + + ofstream outFile (filename.c_str ()); + if (outFile.good ()) + { + outFile << uuid << endl; + outFile.close (); + QPID_LOG (debug, "ManagementAgent saved broker ID"); + } + else + { + QPID_LOG (warning, "ManagementAgent unable to save broker ID"); + } + } + } } ManagementAgent::~ManagementAgent () {} -void ManagementAgent::enableManagement (void) +void ManagementAgent::enableManagement (string dataDir, uint16_t interval) { enabled = 1; + if (agent.get () == 0) + agent = shared_ptr (new ManagementAgent (dataDir, interval)); } ManagementAgent::shared_ptr ManagementAgent::getAgent (void) { - if (enabled && agent.get () == 0) - agent = shared_ptr (new ManagementAgent (10)); - return agent; } @@ -122,27 +161,25 @@ void ManagementAgent::clientAdded (void) } } -void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls) +void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); - buf.putOctet ('0'); buf.putOctet ('1'); buf.putOctet (opcode); - buf.putOctet (cls); + buf.putLong (seq); } -bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls) +bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { uint8_t h1 = buf.getOctet (); uint8_t h2 = buf.getOctet (); uint8_t h3 = buf.getOctet (); - uint8_t h4 = buf.getOctet (); *opcode = buf.getOctet (); - *cls = buf.getOctet (); + *seq = buf.getLong (); - return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '1'; } void ManagementAgent::SendBuffer (Buffer& buf, @@ -199,24 +236,24 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'C', 'C'); + EncodeHeader (msgBuffer, 'c'); object->writeConfig (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "mgmt.config." + object->getClassName (); + routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName (); SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'C', 'I'); + EncodeHeader (msgBuffer, 'i'); object->writeInstrumentation (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "mgmt.inst." + object->getClassName (); + routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName (); SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -233,6 +270,20 @@ void ManagementAgent::PeriodicProcessing (void) deleteList.clear (); } +void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, + uint32_t code, string text) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + void ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) @@ -295,13 +346,13 @@ void ManagementAgent::dispatchMethod (Message& msg, Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - uint8_t opcode, unused; + uint32_t outLen, sequence; + uint8_t opcode; msg.encodeContent (inBuffer); inBuffer.reset (); - if (!CheckHeader (inBuffer, &opcode, &unused)) + if (!CheckHeader (inBuffer, &opcode, &sequence)) { QPID_LOG (debug, " Invalid content header"); return; @@ -313,8 +364,7 @@ void ManagementAgent::dispatchMethod (Message& msg, return; } - uint32_t methodId = inBuffer.getLong (); - uint64_t objId = inBuffer.getLongLong (); + uint64_t objId = inBuffer.getLongLong (); string replyToKey; const framing::MessageProperties* p = @@ -330,8 +380,7 @@ void ManagementAgent::dispatchMethod (Message& msg, return; } - EncodeHeader (outBuffer, 'm'); - outBuffer.putLong (methodId); + EncodeHeader (outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find (objId); if (iter == managementObjects.end ()) @@ -349,22 +398,20 @@ void ManagementAgent::dispatchMethod (Message& msg, SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementAgent::handleHello (Buffer&, string replyToKey) +void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - uint8_t* dat = (uint8_t*) "Broker ID"; - EncodeHeader (outBuffer, 'I'); - outBuffer.putShort (9); - outBuffer.putRawData (dat, 9); + EncodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey) +void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence) { for (PackageMap::iterator pIter = packages.begin (); pIter != packages.end (); @@ -373,15 +420,17 @@ void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); + EncodeHeader (outBuffer, 'p', sequence); EncodePackageIndication (outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } + + sendCommandComplete (replyToKey, sequence); } -void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/) +void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { std::string packageName; @@ -389,7 +438,7 @@ void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/) FindOrAddPackage (packageName); } -void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey) +void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) { std::string packageName; @@ -405,16 +454,18 @@ void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'q'); + EncodeHeader (outBuffer, 'q', sequence); EncodeClassIndication (outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } } + + sendCommandComplete (replyToKey, sequence); } -void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey) +void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -436,7 +487,7 @@ void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey) if (classInfo.writeSchemaCall != 0) { - EncodeHeader (outBuffer, 's'); + EncodeHeader (outBuffer, 's', sequence); classInfo.writeSchemaCall (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); @@ -459,7 +510,7 @@ uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/) return nextRemotePrefix++; } -void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey) +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) { string label; uint32_t requestedPrefix; @@ -472,17 +523,55 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'a'); + EncodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (assignedPrefix); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } +void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + ft.decode (inBuffer); + value = ft.get ("_class"); + if (value->empty () || !value->convertsTo<string> ()) + { + // TODO: Send completion with an error code + return; + } + + string className (value->get<string> ()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'g', sequence); + object->writeConfig (outBuffer); + object->writeInstrumentation (outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + + sendCommandComplete (replyToKey, sequence); +} + void ManagementAgent::dispatchAgentCommand (Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode, unused; + uint8_t opcode; + uint32_t sequence; string replyToKey; const framing::MessageProperties* p = @@ -498,15 +587,16 @@ void ManagementAgent::dispatchAgentCommand (Message& msg) msg.encodeContent (inBuffer); inBuffer.reset (); - if (!CheckHeader (inBuffer, &opcode, &unused)) + if (!CheckHeader (inBuffer, &opcode, &sequence)) return; - if (opcode == 'H') handleHello (inBuffer, replyToKey); - else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey); - else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey); - else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey); - else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey); - else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey); + if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence); + else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence); } ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name) @@ -528,7 +618,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::st EncodePackageIndication (outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package"); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); return result.first; } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 2acbe124bd..f2cd0373c0 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -25,19 +25,20 @@ #include "qpid/Options.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Timer.h" +#include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" #include "ManagementObject.h" #include <qpid/framing/AMQFrame.h> #include <boost/shared_ptr.hpp> -namespace qpid { +namespace qpid { namespace management { class ManagementAgent { private: - ManagementAgent (uint16_t interval); + ManagementAgent (std::string dataDir, uint16_t interval); public: @@ -45,7 +46,7 @@ class ManagementAgent typedef boost::shared_ptr<ManagementAgent> shared_ptr; - static void enableManagement (void); + static void enableManagement (std::string dataDir, uint16_t interval); static shared_ptr getAgent (void); static void shutdown (void); @@ -130,10 +131,12 @@ class ManagementAgent static shared_ptr agent; static bool enabled; + qpid::framing::Uuid uuid; qpid::sys::RWlock userLock; broker::Timer timer; broker::Exchange::shared_ptr mExchange; broker::Exchange::shared_ptr dExchange; + std::string dataDir; uint16_t interval; uint64_t nextObjectId; uint32_t nextRemotePrefix; @@ -143,8 +146,8 @@ class ManagementAgent char outputBuffer[MA_BUFFER_SIZE]; void PeriodicProcessing (void); - void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0); - bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void SendBuffer (qpid::framing::Buffer& buf, uint32_t length, broker::Exchange::shared_ptr exchange, @@ -164,16 +167,17 @@ class ManagementAgent PackageMap::iterator pIter, ClassMap::iterator cIter); uint32_t assignPrefix (uint32_t requestedPrefix); - void handleHello (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey); - void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); + void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); }; }} - - #endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 87c3ccf22a..23042ad988 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -57,6 +57,7 @@ class ManagementObject static const uint8_t TYPE_BOOL = 11; static const uint8_t TYPE_FLOAT = 12; static const uint8_t TYPE_DOUBLE = 13; + static const uint8_t TYPE_UUID = 14; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; @@ -85,7 +86,8 @@ class ManagementObject virtual writeSchemaCall_t getWriteSchemaCall (void) = 0; virtual bool firstInstance (void) = 0; virtual void writeConfig (qpid::framing::Buffer& buf) = 0; - virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; + virtual void writeInstrumentation (qpid::framing::Buffer& buf, + bool skipHeaders = false) = 0; virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; |