diff options
author | Alan Conway <aconway@apache.org> | 2007-11-15 20:49:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-15 20:49:25 +0000 |
commit | c21e3f400f7c264dc0328cea9083d625c80e8845 (patch) | |
tree | 26a36552949c41f2086f1a9cf8cee7cd3c67b2a7 /qpid/cpp/src | |
parent | eb1561a655940a5d32da698187ff05e845e6f093 (diff) | |
download | qpid-python-c21e3f400f7c264dc0328cea9083d625c80e8845.tar.gz |
QPID-687: comitted qpid-patch7-cpp.diff qpid-patch7-python.diff
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@595453 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/management/Broker.cpp | 176 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/Broker.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 181 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 66 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/Queue.cpp | 282 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/Queue.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/Vhost.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/Vhost.h | 18 |
10 files changed, 555 insertions, 277 deletions
diff --git a/qpid/cpp/src/qpid/management/Broker.cpp b/qpid/cpp/src/qpid/management/Broker.cpp index 8626654c43..2c27512669 100644 --- a/qpid/cpp/src/qpid/management/Broker.cpp +++ b/qpid/cpp/src/qpid/management/Broker.cpp @@ -21,6 +21,7 @@ #include "config.h" #include "qpid/broker/Broker.h" +#include "qpid/framing/FieldTable.h" #include "Broker.h" #include "ArgsBrokerEcho.h" @@ -31,7 +32,7 @@ using namespace qpid::framing; bool Broker::schemaNeeded = true; Broker::Broker (Manageable* _core, const Options& _conf) : - ManagementObject (_core) + ManagementObject (_core, "broker") { broker::Broker::Options& conf = (broker::Broker::Options&) _conf; @@ -54,28 +55,149 @@ Broker::~Broker () {} void Broker::writeSchema (Buffer& buf) { + FieldTable ft; + FieldTable arg; + schemaNeeded = false; - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true); - schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true); - schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true); - schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true); - schemaItem (buf, TYPE_UINT16, "connBacklog", - "Connection backlog limit for listening socket", true); - schemaItem (buf, TYPE_UINT32, "stagingThreshold", - "Broker stages messages over this size to disk", true); - schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true); - schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true); - schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true); - schemaItem (buf, TYPE_UINT32, "initialDiskPageSize", - "Number of disk pages allocated for storage", true); - schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue", - "Number of disk pages allocated per queue", true); - schemaItem (buf, TYPE_STRING, "clusterName", - "Name of cluster this server is a member of, zero-length for standalone server", true); - schemaItem (buf, TYPE_STRING, "version", "Running software version", true); - schemaListEnd (buf); + // Schema class header: + buf.putShortString (className); // Class Name + buf.putShort (13); // Config Element Count + buf.putShort (0); // Inst Element Count + buf.putShort (1); // Method Count + buf.putShort (0); // Event Count + + // Config Elements + ft = FieldTable (); + ft.setString ("name", "systemRef"); + ft.setInt ("type", TYPE_U64); + ft.setInt ("access", ACCESS_RC); + ft.setInt ("index", 1); + ft.setString ("desc", "System ID"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "port"); + ft.setInt ("type", TYPE_U16); + ft.setInt ("access", ACCESS_RC); + ft.setInt ("index", 1); + ft.setString ("desc", "TCP Port for AMQP Service"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "workerThreads"); + ft.setInt ("type", TYPE_U16); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Thread pool size"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "maxConns"); + ft.setInt ("type", TYPE_U16); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Maximum allowed connections"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "connBacklog"); + ft.setInt ("type", TYPE_U16); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Connection backlog limit for listening socket"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "stagingThreshold"); + ft.setInt ("type", TYPE_U32); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Broker stages messages over this size to disk"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "storeLib"); + ft.setInt ("type", TYPE_SSTR); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Name of persistent storage library"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "asyncStore"); + ft.setInt ("type", TYPE_U8); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Use async persistent store"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "mgmtPubInterval"); + ft.setInt ("type", TYPE_U16); + ft.setInt ("access", ACCESS_RW); + ft.setInt ("index", 0); + ft.setInt ("min", 1); + ft.setString ("unit", "second"); + ft.setString ("desc", "Interval for management broadcasts"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "initialDiskPageSize"); + ft.setInt ("type", TYPE_U32); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Number of disk pages allocated for storage"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "initialPagesPerQueue"); + ft.setInt ("type", TYPE_U32); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Number of disk pages allocated per queue"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "clusterName"); + ft.setInt ("type", TYPE_SSTR); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Name of cluster this server is a member of, zero-length for standalone server"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "version"); + ft.setInt ("type", TYPE_SSTR); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Running software version"); + buf.put (ft); + + // Inst Elements + + return; // TODO - Remove + + // Methods + ft = FieldTable (); + ft.setString ("name", "echo"); + ft.setInt ("args", 2); + + arg = FieldTable (); + arg.setString ("name", "sequence"); + arg.setInt ("type", TYPE_U32); + arg.setInt ("dir", DIR_IO); + ft.setTable ("arg", arg); + + arg = FieldTable (); + arg.setString ("name", "body"); + arg.setInt ("type", TYPE_LSTR); + arg.setInt ("dir", DIR_IO); + ft.setTable ("arg", arg); + + buf.put (ft); + + // Events } void Broker::writeConfig (Buffer& buf) @@ -83,7 +205,7 @@ void Broker::writeConfig (Buffer& buf) configChanged = false; writeTimestamps (buf); - buf.putLong (0); + buf.putLongLong (0); buf.putShort (port); buf.putShort (workerThreads); buf.putShort (maxConns); @@ -99,8 +221,8 @@ void Broker::writeConfig (Buffer& buf) } void Broker::doMethod (string methodName, - Buffer& inBuf, - Buffer& outBuf) + Buffer& inBuf, + Buffer& outBuf) { if (methodName.compare ("echo") == 0) { @@ -117,6 +239,12 @@ void Broker::doMethod (string methodName, outBuf.putLong (args.io_sequence); outBuf.putLongString (args.io_body); } + + // TODO - Remove this method prior to beta + else if (methodName.compare ("crash") == 0) + { + assert (0); + } else { outBuf.putLong (1); diff --git a/qpid/cpp/src/qpid/management/Broker.h b/qpid/cpp/src/qpid/management/Broker.h index 91fddd3724..2a8ef153d1 100644 --- a/qpid/cpp/src/qpid/management/Broker.h +++ b/qpid/cpp/src/qpid/management/Broker.h @@ -56,16 +56,14 @@ class Broker : public ManagementObject std::string clusterName; std::string version; - uint16_t getObjectType (void) { return OBJECT_BROKER; } - std::string getObjectName (void) { return "broker"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - void doMethod (std::string methodName, - qpid::framing::Buffer& inBuf, - qpid::framing::Buffer& outBuf); + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string methodName, + qpid::framing::Buffer& inBuf, + qpid::framing::Buffer& outBuf); inline bool getInstChanged (void) { return false; } }; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 2f7f25b9d6..6636c59bf5 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -84,26 +84,60 @@ void ManagementAgent::clientAdded (void) } } +void ManagementAgent::EncodeHeader (Buffer& buf) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('0'); + buf.putOctet ('1'); +} + +void ManagementAgent::SendBuffer (Buffer& buf, + uint32_t length, + Exchange::shared_ptr exchange, + string routingKey) +{ + intrusive_ptr<Message> msg (new Message ()); + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, exchange->getName (), 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + QPID_LOG (debug, "ManagementAgent::SendBuffer - key=" + << routingKey << " len=" << length); + + content.setBody(AMQContentBody()); + content.castBody<AMQContentBody>()->decode(buf, length); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(length); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + exchange->route (deliverable, routingKey, 0); +} + void ManagementAgent::PeriodicProcessing (void) { #define BUFSIZE 65536 #define THRESHOLD 16384 char msgChars[BUFSIZE]; - Buffer msgBuffer (msgChars, BUFSIZE); uint32_t contentSize; + string routingKey; std::list<uint64_t> deleteList; if (managementObjects.empty ()) return; - intrusive_ptr<Message> msg (new Message ()); - - // Build the magic number for the management message. - msgBuffer.putOctet ('A'); - msgBuffer.putOctet ('M'); - msgBuffer.putOctet ('0'); - msgBuffer.putOctet ('1'); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) @@ -112,105 +146,56 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getSchemaNeeded ()) { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer); msgBuffer.putOctet ('S'); // opcode = Schema Record msgBuffer.putOctet (0); // content-class = N/A - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - object->writeSchema (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt.schema." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getConfigChanged ()) { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer); msgBuffer.putOctet ('C'); // opcode = Content Record msgBuffer.putOctet ('C'); // content-class = Configuration - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - object->writeConfig (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt.config." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getInstChanged ()) { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer); msgBuffer.putOctet ('C'); // opcode = Content Record msgBuffer.putOctet ('I'); // content-class = Instrumentation - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - object->writeInstrumentation (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt.inst." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->isDeleted ()) deleteList.push_back (iter->first); - - // Temporary protection against buffer overrun. - // This needs to be replaced with frame fragmentation. - if (msgBuffer.available () < THRESHOLD) - break; } - msgBuffer.putOctet ('X'); // End-of-message - msgBuffer.putOctet (0); - msgBuffer.putShort (0); - msgBuffer.putLong (8); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - - AMQFrame method (0, MessageTransferBody(ProtocolVersion(), - 0, "qpid.management", 0, 0)); - AMQFrame header (0, AMQHeaderBody()); - AMQFrame content; - - content.setBody(AMQContentBody()); - content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize); - - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(contentSize); - msg->getFrames().append(content); - - DeliverableMessage deliverable (msg); - mExchange->route (deliverable, "mgmt", 0); - // Delete flagged objects for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); iter != deleteList.rend (); iter++) - { managementObjects.erase (*iter); - } + deleteList.clear (); } @@ -264,10 +249,11 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, if (contentSize < 8 || contentSize > 65536) return; - char *inMem = new char[contentSize]; - char outMem[4096]; // TODO Fix This - Buffer inBuffer (inMem, contentSize); - Buffer outBuffer (outMem, 4096); + char *inMem = new char[contentSize]; + char outMem[4096]; // TODO Fix This + Buffer inBuffer (inMem, contentSize); + Buffer outBuffer (outMem, 4096); + uint32_t outLen; msg.encodeContent (inBuffer); inBuffer.reset (); @@ -294,32 +280,9 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, iter->second->doMethod (methodName, inBuffer, outBuffer); } - intrusive_ptr<Message> outMsg (new Message ()); - uint32_t msgSize = 4096 - outBuffer.available (); + outLen = 4096 - outBuffer.available (); outBuffer.reset (); - AMQFrame method (0, MessageTransferBody(ProtocolVersion(), - 0, "amq.direct", 0, 0)); - AMQFrame header (0, AMQHeaderBody()); - AMQFrame content; - - content.setBody(AMQContentBody()); - content.castBody<AMQContentBody>()->decode(outBuffer, msgSize); - - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); - - outMsg->getFrames().append(method); - outMsg->getFrames().append(header); - - MessageProperties* props = outMsg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(msgSize); - outMsg->getFrames().append(content); - - DeliverableMessage outDeliverable (outMsg); - dExchange->route (outDeliverable, replyTo, 0); - + SendBuffer (outBuffer, outLen, dExchange, replyTo); free (inMem); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index e84c0478f3..c33a59adff 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -72,6 +72,11 @@ class ManagementAgent uint64_t nextObjectId; void PeriodicProcessing (void); + void EncodeHeader (qpid::framing::Buffer& buf); + void SendBuffer (qpid::framing::Buffer& buf, + uint32_t length, + broker::Exchange::shared_ptr exchange, + std::string routingKey); }; }} diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 24588b4edd..c2d1f56be0 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -21,41 +21,17 @@ #include "Manageable.h" #include "ManagementObject.h" +#include "qpid/framing/FieldTable.h" using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; -void ManagementObject::schemaItem (Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig, - bool isIndex) -{ - uint8_t flags = - (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); - - buf.putOctet (flags); - buf.putOctet (typeCode); - buf.putShortString (name); - buf.putShortString (description); -} - -void ManagementObject::schemaListBegin (Buffer& buf) -{ - schemaItem (buf, TYPE_UINT64, "id", "Object ID", true, true); -} - -void ManagementObject::schemaListEnd (Buffer& buf) -{ - buf.putOctet (FLAG_END); -} - void ManagementObject::writeTimestamps (Buffer& buf) { - buf.putLongLong (uint64_t (Duration (now ()))); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - buf.putLongLong (objectId); + buf.putShortString (className); + buf.putLongLong (uint64_t (Duration (now ()))); + buf.putLongLong (createTime); + buf.putLongLong (destroyTime); + buf.putLongLong (objectId); } diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 416a477796..a8ba231419 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -31,18 +31,6 @@ namespace qpid { namespace management { -const uint16_t OBJECT_SYSTEM = 1; -const uint16_t OBJECT_BROKER = 2; -const uint16_t OBJECT_VHOST = 3; -const uint16_t OBJECT_QUEUE = 4; -const uint16_t OBJECT_EXCHANGE = 5; -const uint16_t OBJECT_BINDING = 6; -const uint16_t OBJECT_CLIENT = 7; -const uint16_t OBJECT_SESSION = 8; -const uint16_t OBJECT_DESTINATION = 9; -const uint16_t OBJECT_PRODUCER = 10; -const uint16_t OBJECT_CONSUMER = 11; - class Manageable; class ManagementObject @@ -56,48 +44,48 @@ class ManagementObject bool instChanged; bool deleted; Manageable* coreObject; + std::string className; - static const uint8_t TYPE_UINT8 = 1; - static const uint8_t TYPE_UINT16 = 2; - static const uint8_t TYPE_UINT32 = 3; - static const uint8_t TYPE_UINT64 = 4; - static const uint8_t TYPE_BOOL = 5; - static const uint8_t TYPE_STRING = 6; + static const uint8_t TYPE_U8 = 1; + static const uint8_t TYPE_U16 = 2; + static const uint8_t TYPE_U32 = 3; + static const uint8_t TYPE_U64 = 4; + static const uint8_t TYPE_SSTR = 6; + static const uint8_t TYPE_LSTR = 7; + + static const uint8_t ACCESS_RC = 1; + static const uint8_t ACCESS_RW = 1; + static const uint8_t ACCESS_RO = 1; + + static const uint8_t DIR_I = 1; + static const uint8_t DIR_O = 2; + static const uint8_t DIR_IO = 3; static const uint8_t FLAG_CONFIG = 0x01; static const uint8_t FLAG_INDEX = 0x02; static const uint8_t FLAG_END = 0x80; - - void schemaItem (qpid::framing::Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig = false, - bool isIndex = false); - void schemaListBegin (qpid::framing::Buffer& buf); - void schemaListEnd (qpid::framing::Buffer& buf); + void writeTimestamps (qpid::framing::Buffer& buf); public: typedef boost::shared_ptr<ManagementObject> shared_ptr; - ManagementObject (Manageable* _core) : + ManagementObject (Manageable* _core, std::string _name) : destroyTime(0), objectId (0), configChanged(true), - instChanged(true), deleted(false), coreObject(_core) + instChanged(true), deleted(false), coreObject(_core), className(_name) { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } virtual ~ManagementObject () {} - virtual uint16_t getObjectType (void) = 0; - virtual std::string getObjectName (void) = 0; - virtual void writeSchema (qpid::framing::Buffer& buf) = 0; - virtual void writeConfig (qpid::framing::Buffer& buf) = 0; - virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; - virtual bool getSchemaNeeded (void) = 0; - virtual void setSchemaNeeded (void) = 0; - virtual void doMethod (std::string methodName, - qpid::framing::Buffer& inBuf, - qpid::framing::Buffer& outBuf) = 0; + virtual void writeSchema (qpid::framing::Buffer& buf) = 0; + virtual void writeConfig (qpid::framing::Buffer& buf) = 0; + virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; + virtual bool getSchemaNeeded (void) = 0; + virtual void setSchemaNeeded (void) = 0; + virtual void doMethod (std::string methodName, + qpid::framing::Buffer& inBuf, + qpid::framing::Buffer& outBuf) = 0; + std::string getClassName (void) { return className; } void setObjectId (uint64_t oid) { objectId = oid; } uint64_t getObjectId (void) { return objectId; } inline bool getConfigChanged (void) { return configChanged; } diff --git a/qpid/cpp/src/qpid/management/Queue.cpp b/qpid/cpp/src/qpid/management/Queue.cpp index 3c82877ebd..b30ff2a6a7 100644 --- a/qpid/cpp/src/qpid/management/Queue.cpp +++ b/qpid/cpp/src/qpid/management/Queue.cpp @@ -20,6 +20,7 @@ */ #include "qpid/log/Statement.h" +#include "qpid/framing/FieldTable.h" #include "Manageable.h" #include "Queue.h" @@ -32,7 +33,7 @@ bool Queue::schemaNeeded = true; Queue::Queue (Manageable* _core, Manageable* _parent, const std::string& _name, bool _durable, bool _autoDelete) : - ManagementObject(_core), name(_name), + ManagementObject(_core, "queue"), name(_name), durable(_durable), autoDelete(_autoDelete) { vhostRef = _parent->GetManagementObject ()->getObjectId (); @@ -83,47 +84,248 @@ Queue::~Queue () {} void Queue::writeSchema (Buffer& buf) { + FieldTable ft; + schemaNeeded = false; - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT64, "vhostRef", "Virtual Host Ref", true); - schemaItem (buf, TYPE_STRING, "name", "Queue Name", true); - schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); - schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); - schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); - schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); - schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); - schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); - schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); - schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); - schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started "); - schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed"); - schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval"); - schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started "); - schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed"); - schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); - schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); - schemaListEnd (buf); + // Schema class header: + buf.putShortString (className); // Class Name + buf.putShort (4); // Config Element Count + buf.putShort (33); // Inst Element Count + buf.putShort (0); // Method Count + buf.putShort (0); // Event Count + + // Config Elements + ft = FieldTable (); + ft.setString ("name", "vhostRef"); + ft.setInt ("type", TYPE_U64); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 1); + ft.setString ("desc", "Virtual Host Ref"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "name"); + ft.setInt ("type", TYPE_SSTR); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 1); + ft.setString ("desc", "Queue Name"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "durable"); + ft.setInt ("type", TYPE_U8); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "Durable"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "autoDelete"); + ft.setInt ("type", TYPE_U8); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 0); + ft.setString ("desc", "AutoDelete"); + buf.put (ft); + + // Inst Elements + ft = FieldTable (); + ft.setString ("name", "msgTotalEnqueues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total messages enqueued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgTotalDequeues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total messages dequeued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgTxnEnqueues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Transactional messages enqueued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgTxnDequeues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Transactional messages dequeued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgPersistEnqueues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Persistent messages enqueued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgPersistDequeues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Persistent messages dequeued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgDepth"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Current size of queue in messages"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgDepthLow"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Low-water queue size, this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "msgDepthHigh"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "High-water queue size, this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "byteTotalEnqueues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total messages enqueued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "byteTotalDequeues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total messages dequeued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "byteTxnEnqueues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Transactional messages enqueued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "byteTxnDequeues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Transactional messages dequeued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "bytePersistEnqueues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Persistent messages enqueued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "bytePersistDequeues"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Persistent messages dequeued"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "byteDepth"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Current size of queue in bytes"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "byteDepthLow"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Low-water mark this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "byteDepthHigh"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "High-water mark this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "enqueueTxnStarts"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total enqueue transactions started "); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "enqueueTxnCommits"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total enqueue transactions committed"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "enqueueTxnRejects"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total enqueue transactions rejected"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "enqueueTxnCount"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Current pending enqueue transactions"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "enqueueTxnCountLow"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Low water mark this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "enqueueTxnCountHigh"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "High water mark this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "dequeueTxnStarts"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total dequeue transactions started "); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "dequeueTxnCommits"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total dequeue transactions committed"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "dequeueTxnRejects"); + ft.setInt ("type", TYPE_U64); + ft.setString ("desc", "Total dequeue transactions rejected"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "dequeueTxnCount"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Current pending dequeue transactions"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "dequeueTxnCountLow"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Transaction low water mark this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "dequeueTxnCountHigh"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Transaction high water mark this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "consumers"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Current consumers on queue"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "consumersLow"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Consumer low water mark this interval"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "consumersHigh"); + ft.setInt ("type", TYPE_U32); + ft.setString ("desc", "Consumer high water mark this interval"); + buf.put (ft); } void Queue::writeConfig (Buffer& buf) diff --git a/qpid/cpp/src/qpid/management/Queue.h b/qpid/cpp/src/qpid/management/Queue.h index 3a7fdf5263..2ed43e5576 100644 --- a/qpid/cpp/src/qpid/management/Queue.h +++ b/qpid/cpp/src/qpid/management/Queue.h @@ -83,16 +83,14 @@ class Queue : public ManagementObject uint32_t consumersLow; // Low water mark this interval uint32_t consumersHigh; // High water mark this interval - uint16_t getObjectType (void) { return OBJECT_QUEUE; } - std::string getObjectName (void) { return "queue"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& buf); - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - void doMethod (std::string /*methodName*/, - qpid::framing::Buffer& /*inBuf*/, - qpid::framing::Buffer& /*outBuf*/) {} + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& buf); + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string /*methodName*/, + qpid::framing::Buffer& /*inBuf*/, + qpid::framing::Buffer& /*outBuf*/) {} inline void adjustQueueHiLo (void){ if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; diff --git a/qpid/cpp/src/qpid/management/Vhost.cpp b/qpid/cpp/src/qpid/management/Vhost.cpp index effcb1599c..c4fb84e8f2 100644 --- a/qpid/cpp/src/qpid/management/Vhost.cpp +++ b/qpid/cpp/src/qpid/management/Vhost.cpp @@ -21,6 +21,7 @@ #include "Manageable.h" #include "Vhost.h" +#include "qpid/framing/FieldTable.h" using namespace qpid::management; using namespace qpid::sys; @@ -29,7 +30,7 @@ using namespace qpid::framing; bool Vhost::schemaNeeded = true; Vhost::Vhost (Manageable* _core, Manageable* _parent) : - ManagementObject (_core), name("/") + ManagementObject (_core, "vhost"), name("/") { brokerRef = _parent->GetManagementObject ()->getObjectId (); } @@ -38,12 +39,33 @@ Vhost::~Vhost () {} void Vhost::writeSchema (Buffer& buf) { + FieldTable ft; + schemaNeeded = false; - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT64, "brokerRef", "Broker Reference" , true); - schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true); - schemaListEnd (buf); + // Schema class header: + buf.putShortString (className); // Class Name + buf.putShort (2); // Config Element Count + buf.putShort (0); // Inst Element Count + buf.putShort (0); // Method Count + buf.putShort (0); // Event Count + + // Config Elements + ft = FieldTable (); + ft.setString ("name", "brokerRef"); + ft.setInt ("type", TYPE_U64); + ft.setInt ("access", ACCESS_RC); + ft.setInt ("index", 1); + ft.setString ("desc", "Broker Reference"); + buf.put (ft); + + ft = FieldTable (); + ft.setString ("name", "name"); + ft.setInt ("type", TYPE_SSTR); + ft.setInt ("access", ACCESS_RO); + ft.setInt ("index", 1); + ft.setString ("desc", "Name of virtual host"); + buf.put (ft); } void Vhost::writeConfig (Buffer& buf) diff --git a/qpid/cpp/src/qpid/management/Vhost.h b/qpid/cpp/src/qpid/management/Vhost.h index 5fc5a2870b..286514d7d7 100644 --- a/qpid/cpp/src/qpid/management/Vhost.h +++ b/qpid/cpp/src/qpid/management/Vhost.h @@ -45,16 +45,14 @@ class Vhost : public ManagementObject uint64_t brokerRef; std::string name; - uint16_t getObjectType (void) { return OBJECT_VHOST; } - std::string getObjectName (void) { return "vhost"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - void doMethod (std::string /*methodName*/, - qpid::framing::Buffer& /*inBuf*/, - qpid::framing::Buffer& /*outBuf*/) {} + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string /*methodName*/, + qpid::framing::Buffer& /*inBuf*/, + qpid::framing::Buffer& /*outBuf*/) {} inline bool getInstChanged (void) { return false; } }; |