diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-26 02:37:54 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-26 02:37:54 +0000 |
commit | b19d20c0277ff20609f9f0774224accd5ff3e452 (patch) | |
tree | c2842f576f8b8f7a1bfc30ad3165fb28116d7f95 /cpp/src | |
parent | 8863c11dabfec9fee5d398361250aa6b4a46e538 (diff) | |
download | qpid-python-b19d20c0277ff20609f9f0774224accd5ff3e452.tar.gz |
- added patch from Tedd
- QPID-660
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588478 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementAgent.cpp | 164 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementAgent.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementExchange.cpp | 77 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementExchange.h | 61 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObject.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObject.h | 65 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObjectQueue.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObjectQueue.h | 99 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 48 |
13 files changed, 413 insertions, 202 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 02921fbd08..57b3d27b71 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -163,6 +163,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HeadersExchange.cpp \ qpid/broker/IncomingExecutionContext.cpp \ qpid/broker/ManagementAgent.cpp \ + qpid/broker/ManagementExchange.cpp \ qpid/broker/ManagementObject.cpp \ qpid/broker/ManagementObjectQueue.cpp \ qpid/broker/Message.cpp \ @@ -260,6 +261,7 @@ nobase_include_HEADERS = \ qpid/broker/HeadersExchange.h \ qpid/broker/IncomingExecutionContext.h \ qpid/broker/ManagementAgent.h \ + qpid/broker/ManagementExchange.h \ qpid/broker/ManagementObject.h \ qpid/broker/ManagementObjectQueue.h \ qpid/broker/Message.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 44aeb482de..e53774740a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,6 +28,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" +#include "ManagementExchange.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" @@ -104,8 +105,8 @@ Broker::Broker(const Broker::Options& conf) : dtxManager(store.get()) { if(conf.enableMgmt){ - managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); - queues.setManagementAgent(managementAgent); + managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); + queues.setManagementAgent(managementAgent); } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -115,16 +116,18 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); - exchanges.declare(qpid_management, TopicExchange::typeName); - managementAgent->setExchange (exchanges.get (qpid_management)); + QPID_LOG(info, "Management enabled"); + exchanges.declare(qpid_management, ManagementExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get (qpid_management); + managementAgent->setExchange (mExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); } else - QPID_LOG(info, "Management not enabled"); + QPID_LOG(info, "Management not enabled"); if(store.get()) { - store->init(conf.storeDir, conf.storeAsync); - RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, + store->init(conf.storeDir, conf.storeAsync); + RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, conf.stagingThreshold); store->recover(recoverer); } diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index edc9a5b63b..ae1afe5abb 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -23,6 +23,7 @@ #include "FanOutExchange.h" #include "HeadersExchange.h" #include "TopicExchange.h" +#include "ManagementExchange.h" using namespace qpid::broker; using namespace qpid::sys; @@ -41,7 +42,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) { - Exchange::shared_ptr exchange; + Exchange::shared_ptr exchange; if(type == TopicExchange::typeName){ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args)); @@ -51,13 +52,15 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args)); }else if (type == HeadersExchange::typeName) { exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args)); + }else if (type == ManagementExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args)); }else{ throw UnknownExchangeTypeException(); } - exchanges[name] = exchange; - return std::pair<Exchange::shared_ptr, bool>(exchange, true); + exchanges[name] = exchange; + return std::pair<Exchange::shared_ptr, bool>(exchange, true); } else { - return std::pair<Exchange::shared_ptr, bool>(i->second, false); + return std::pair<Exchange::shared_ptr, bool>(i->second, false); } } diff --git a/cpp/src/qpid/broker/ManagementAgent.cpp b/cpp/src/qpid/broker/ManagementAgent.cpp index f2ebe991b5..71b027c7df 100644 --- a/cpp/src/qpid/broker/ManagementAgent.cpp +++ b/cpp/src/qpid/broker/ManagementAgent.cpp @@ -35,9 +35,9 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); } -void ManagementAgent::setExchange (Exchange::shared_ptr exchangePtr) +void ManagementAgent::setExchange (Exchange::shared_ptr _exchange) { - exchange = exchangePtr; + exchange = _exchange; } void ManagementAgent::addObject (ManagementObject::shared_ptr object) @@ -46,12 +46,6 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object) QPID_LOG(info, "Management Object Added"); } -void ManagementAgent::deleteObject (ManagementObject::shared_ptr object) -{ - managementObjects.remove (object); - QPID_LOG (debug, "Management Object Removed"); -} - ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} @@ -61,6 +55,18 @@ void ManagementAgent::Periodic::fire () agent.PeriodicProcessing (); } +void ManagementAgent::clientAdded (void) +{ + for (ManagementObjectList::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = *iter; + object->setAllChanged (); + object->setSchemaNeeded (); + } +} + void ManagementAgent::PeriodicProcessing (void) { #define BUFSIZE 65536 @@ -69,10 +75,9 @@ void ManagementAgent::PeriodicProcessing (void) Buffer msgBuffer (msgChars, BUFSIZE); uint32_t contentSize; - //QPID_LOG (debug, "Timer Fired"); if (managementObjects.empty ()) - return; - + return; + Message::shared_ptr msg (new Message ()); // Build the magic number for the management message. @@ -82,74 +87,75 @@ void ManagementAgent::PeriodicProcessing (void) msgBuffer.putOctet ('1'); for (ManagementObjectList::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) + iter != managementObjects.end (); + iter++) { - ManagementObject::shared_ptr objectPtr = *iter; - - //QPID_LOG (debug, " Object Found..."); - - if (objectPtr->getSchemaNeeded ()) - { - //QPID_LOG (debug, " Generating Schema"); - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('S'); // opcode = Schema Record - msgBuffer.putOctet (0); // content-class = N/A - msgBuffer.putShort (objectPtr->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - objectPtr->writeSchema (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (objectPtr->getConfigChanged ()) - { - //QPID_LOG (debug, " Generating Config"); - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('C'); // content-class = Configuration - msgBuffer.putShort (objectPtr->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - objectPtr->writeConfig (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (objectPtr->getInstChanged ()) - { - //QPID_LOG (debug, " Generating Instrumentation"); - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('I'); // content-class = Instrumentation - msgBuffer.putShort (objectPtr->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - objectPtr->writeInstrumentation (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - // Temporary protection against buffer overrun. - // This needs to be replaced with frame fragmentation. - if (msgBuffer.available () < THRESHOLD) - break; + ManagementObject::shared_ptr object = *iter; + + if (object->getSchemaNeeded ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('S'); // opcode = Schema Record + msgBuffer.putOctet (0); // content-class = N/A + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeSchema (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getConfigChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('C'); // content-class = Configuration + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeConfig (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getInstChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('I'); // content-class = Instrumentation + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeInstrumentation (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->isDeleted ()) + { + managementObjects.remove (object); + QPID_LOG (debug, "Management Object Removed"); + } + + // Temporary protection against buffer overrun. + // This needs to be replaced with frame fragmentation. + if (msgBuffer.available () < THRESHOLD) + break; } msgBuffer.putOctet ('X'); // End-of-message @@ -161,7 +167,7 @@ void ManagementAgent::PeriodicProcessing (void) msgBuffer.reset (); AMQFrame method (0, MessageTransferBody(ProtocolVersion(), - 0, "qpid.management", 0, 0)); + 0, "qpid.management", 0, 0)); AMQFrame header (0, AMQHeaderBody()); AMQFrame content; diff --git a/cpp/src/qpid/broker/ManagementAgent.h b/cpp/src/qpid/broker/ManagementAgent.h index 4f3b0a0f5f..1e332023a6 100644 --- a/cpp/src/qpid/broker/ManagementAgent.h +++ b/cpp/src/qpid/broker/ManagementAgent.h @@ -38,21 +38,21 @@ class ManagementAgent typedef boost::shared_ptr<ManagementAgent> shared_ptr; - ManagementAgent(uint16_t interval); + ManagementAgent (uint16_t interval); - void setExchange (Exchange::shared_ptr exchangePtr); + void setExchange (Exchange::shared_ptr exchange); void addObject (ManagementObject::shared_ptr object); - void deleteObject (ManagementObject::shared_ptr object); + void clientAdded (void); private: struct Periodic : public TimerTask { ManagementAgent& agent; - + Periodic (ManagementAgent& agent, uint32_t seconds); - ~Periodic () {} - void fire (); + ~Periodic () {} + void fire (); }; ManagementObjectList managementObjects; diff --git a/cpp/src/qpid/broker/ManagementExchange.cpp b/cpp/src/qpid/broker/ManagementExchange.cpp new file mode 100644 index 0000000000..d3de8bc8e1 --- /dev/null +++ b/cpp/src/qpid/broker/ManagementExchange.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementExchange::ManagementExchange (const string& _name) : + Exchange (_name), TopicExchange(_name) {} +ManagementExchange::ManagementExchange (const std::string& _name, + bool _durable, + const FieldTable& _args) : + Exchange (_name, _durable, _args), + TopicExchange(_name, _durable, _args) {} + + +bool ManagementExchange::bind (Queue::shared_ptr queue, + const string& routingKey, + const FieldTable* args) +{ + bool result = TopicExchange::bind (queue, routingKey, args); + + // Notify the management agent that a new management client has bound to the + // exchange. + if (result) + managementAgent->clientAdded (); + + return result; +} + +void ManagementExchange::route (Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + // Intercept management commands + if (routingKey.length () > 7 && + routingKey.substr (0, 7).compare ("method.") == 0) + { + QPID_LOG (debug, "ManagementExchange: Intercept command " << routingKey); + // TODO: Send intercepted commands to ManagementAgent for dispatch + return; + } + + TopicExchange::route (msg, routingKey, args); +} + +void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) +{ + managementAgent = agent; +} + + +ManagementExchange::~ManagementExchange() {} + +const std::string ManagementExchange::typeName("management"); + diff --git a/cpp/src/qpid/broker/ManagementExchange.h b/cpp/src/qpid/broker/ManagementExchange.h new file mode 100644 index 0000000000..56c051a7f8 --- /dev/null +++ b/cpp/src/qpid/broker/ManagementExchange.h @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementExchange_ +#define _ManagementExchange_ + +#include "TopicExchange.h" +#include "ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementExchange : public virtual TopicExchange +{ + private: + ManagementAgent::shared_ptr managementAgent; + + public: + static const std::string typeName; + + ManagementExchange (const string& name); + ManagementExchange (const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); + + virtual std::string getType() const { return typeName; } + + virtual bool bind (Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + + virtual void route (Deliverable& msg, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent (ManagementAgent::shared_ptr agent); + + virtual ~ManagementExchange(); +}; + + +} +} + +#endif diff --git a/cpp/src/qpid/broker/ManagementObject.cpp b/cpp/src/qpid/broker/ManagementObject.cpp index 75913ba3ab..c536d96b1b 100644 --- a/cpp/src/qpid/broker/ManagementObject.cpp +++ b/cpp/src/qpid/broker/ManagementObject.cpp @@ -23,14 +23,19 @@ using namespace qpid::framing; using namespace qpid::broker; +using namespace qpid::sys; void ManagementObject::schemaItem (Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig) + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig, + bool isIndex) { - buf.putOctet (isConfig ? 1 : 0); + uint8_t flags = + (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); + + buf.putOctet (flags); buf.putOctet (typeCode); buf.putShortString (name); buf.putShortString (description); @@ -38,5 +43,12 @@ void ManagementObject::schemaItem (Buffer& buf, void ManagementObject::schemaListEnd (Buffer& buf) { - buf.putOctet (0xFF); + buf.putOctet (FLAG_END); +} + +void ManagementObject::writeTimestamps (Buffer& buf) +{ + buf.putLongLong (uint64_t (Duration (now ()))); + buf.putLongLong (createTime); + buf.putLongLong (destroyTime); } diff --git a/cpp/src/qpid/broker/ManagementObject.h b/cpp/src/qpid/broker/ManagementObject.h index 1588aed641..243d853727 100644 --- a/cpp/src/qpid/broker/ManagementObject.h +++ b/cpp/src/qpid/broker/ManagementObject.h @@ -31,24 +31,30 @@ namespace qpid { namespace broker { using namespace qpid::framing; +using namespace qpid::sys; + +const uint16_t OBJECT_SYSTEM = 1; +const uint16_t OBJECT_BROKER = 2; +const uint16_t OBJECT_VHOST = 3; +const uint16_t OBJECT_QUEUE = 4; +const uint16_t OBJECT_EXCHANGE = 5; +const uint16_t OBJECT_BINDING = 6; +const uint16_t OBJECT_CLIENT = 7; +const uint16_t OBJECT_SESSION = 8; +const uint16_t OBJECT_DESTINATION = 9; +const uint16_t OBJECT_PRODUCER = 10; +const uint16_t OBJECT_CONSUMER = 11; -const uint16_t OBJECT_BROKER = 1; -const uint16_t OBJECT_SERVER = 2; -const uint16_t OBJECT_QUEUE = 3; -const uint16_t OBJECT_EXCHANGE = 4; -const uint16_t OBJECT_BINDING = 5; class ManagementObject { - private: - - qpid::sys::AbsTime createTime; - qpid::sys::AbsTime destroyTime; - protected: - bool configChanged; - bool instChanged; + uint64_t createTime; + uint64_t destroyTime; + bool configChanged; + bool instChanged; + bool deleted; static const uint8_t TYPE_UINT8 = 1; static const uint8_t TYPE_UINT16 = 2; @@ -56,18 +62,26 @@ class ManagementObject static const uint8_t TYPE_UINT64 = 4; static const uint8_t TYPE_BOOL = 5; static const uint8_t TYPE_STRING = 6; + + static const uint8_t FLAG_CONFIG = 0x01; + static const uint8_t FLAG_INDEX = 0x02; + static const uint8_t FLAG_END = 0x80; void schemaItem (Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig = false); - void schemaListEnd (Buffer & buf); + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig = false, + bool isIndex = false); + void schemaListEnd (Buffer& buf); + void writeTimestamps (Buffer& buf); public: typedef boost::shared_ptr<ManagementObject> shared_ptr; - ManagementObject () : configChanged(true), instChanged(true) { createTime = qpid::sys::now (); } + ManagementObject () : destroyTime(0), configChanged(true), + instChanged(true), deleted(false) + { createTime = uint64_t (Duration (now ())); } virtual ~ManagementObject () {} virtual uint16_t getObjectType (void) = 0; @@ -76,10 +90,21 @@ class ManagementObject virtual void writeConfig (Buffer& buf) = 0; virtual void writeInstrumentation (Buffer& buf) = 0; virtual bool getSchemaNeeded (void) = 0; - + virtual void setSchemaNeeded (void) = 0; + inline bool getConfigChanged (void) { return configChanged; } inline bool getInstChanged (void) { return instChanged; } - inline void resourceDestroy (void) { destroyTime = qpid::sys::now (); } + inline void setAllChanged (void) + { + configChanged = true; + instChanged = true; + } + + inline void resourceDestroy (void) { + destroyTime = uint64_t (Duration (now ())); + deleted = true; + } + bool isDeleted (void) { return deleted; } }; diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/ManagementObjectQueue.cpp index b81cd7b60d..d30cda03a4 100644 --- a/cpp/src/qpid/broker/ManagementObjectQueue.cpp +++ b/cpp/src/qpid/broker/ManagementObjectQueue.cpp @@ -76,7 +76,9 @@ ManagementObjectQueue::~ManagementObjectQueue () {} void ManagementObjectQueue::writeSchema (Buffer& buf) { - schemaItem (buf, TYPE_STRING, "name", "Queue Name", true); + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "name", "Queue Name", true, true); schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); @@ -115,21 +117,24 @@ void ManagementObjectQueue::writeSchema (Buffer& buf) schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); schemaListEnd (buf); - - schemaNeeded = false; } void ManagementObjectQueue::writeConfig (Buffer& buf) { + configChanged = false; + + writeTimestamps (buf); buf.putShortString (name); buf.putOctet (durable ? 1 : 0); buf.putOctet (autoDelete ? 1 : 0); - - configChanged = false; } void ManagementObjectQueue::writeInstrumentation (Buffer& buf) { + instChanged = false; + + writeTimestamps (buf); + buf.putShortString (name); buf.putLongLong (msgTotalEnqueues); buf.putLongLong (msgTotalDequeues); buf.putLongLong (msgTxEnqueues); @@ -164,5 +169,14 @@ void ManagementObjectQueue::writeInstrumentation (Buffer& buf) buf.putLong (consumersLow); buf.putLong (consumersHigh); - instChanged = false; + msgDepthLow = msgDepth; + msgDepthHigh = msgDepth; + byteDepthLow = byteDepth; + byteDepthHigh = byteDepth; + enqueueTxCountLow = enqueueTxCount; + enqueueTxCountHigh = enqueueTxCount; + dequeueTxCountLow = dequeueTxCount; + dequeueTxCountHigh = dequeueTxCount; + consumersLow = consumers; + consumersHigh = consumers; } diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.h b/cpp/src/qpid/broker/ManagementObjectQueue.h index 989d10f8c0..cb2d399b76 100644 --- a/cpp/src/qpid/broker/ManagementObjectQueue.h +++ b/cpp/src/qpid/broker/ManagementObjectQueue.h @@ -89,28 +89,29 @@ class ManagementObjectQueue : public ManagementObject void writeConfig (Buffer& buf); void writeInstrumentation (Buffer& buf); bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } inline void adjustQueueHiLo (void){ - if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; - if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; + if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; + if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; - if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; - if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; - instChanged = true; + if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; + if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; + instChanged = true; } inline void adjustTxHiLo (void){ - if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; - if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; - if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; - if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; - instChanged = true; + if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; + if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; + if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; + if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; + instChanged = true; } inline void adjustConsumerHiLo (void){ - if (consumers > consumersHigh) consumersHigh = consumers; - if (consumers < consumersLow) consumersLow = consumers; - instChanged = true; + if (consumers > consumersHigh) consumersHigh = consumers; + if (consumers < consumersLow) consumersLow = consumers; + instChanged = true; } public: @@ -124,51 +125,51 @@ class ManagementObjectQueue : public ManagementObject // messages when counting statistics. inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalEnqueues++; - byteTotalEnqueues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxEnqueues++; - byteTxEnqueues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistEnqueues++; - bytePersistEnqueues += bytes; - } - - msgDepth++; - byteDepth += bytes; - adjustQueueHiLo (); + msgTotalEnqueues++; + byteTotalEnqueues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxEnqueues++; + byteTxEnqueues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistEnqueues++; + bytePersistEnqueues += bytes; + } + + msgDepth++; + byteDepth += bytes; + adjustQueueHiLo (); } inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalDequeues++; - byteTotalDequeues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxDequeues++; - byteTxDequeues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistDequeues++; - bytePersistDequeues += bytes; - } - - msgDepth--; - byteDepth -= bytes; - adjustQueueHiLo (); + msgTotalDequeues++; + byteTotalDequeues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxDequeues++; + byteTxDequeues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistDequeues++; + bytePersistDequeues += bytes; + } + + msgDepth--; + byteDepth -= bytes; + adjustQueueHiLo (); } inline void incConsumers (void){ - consumers++; - adjustConsumerHiLo (); + consumers++; + adjustConsumerHiLo (); } inline void decConsumers (void){ - consumers--; - adjustConsumerHiLo (); + consumers--; + adjustConsumerHiLo (); } }; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index ef1358feb9..116e8d9431 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -293,6 +293,10 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){ } browsers.push_back(c); } + + if (mgmtObjectPtr != 0){ + mgmtObjectPtr->incConsumers (); + } } void Queue::cancel(Consumer::ptr c){ @@ -302,6 +306,9 @@ void Queue::cancel(Consumer::ptr c){ } else { cancel(c, browsers); } + if (mgmtObjectPtr != 0){ + mgmtObjectPtr->decConsumers (); + } if(exclusive == c) exclusive.reset(); } diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 927de4c079..6c87e5ff98 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -21,6 +21,7 @@ #include "QueueRegistry.h" #include "ManagementAgent.h" #include "ManagementObjectQueue.h" +#include "qpid/log/Statement.h" #include <sstream> #include <assert.h> @@ -42,33 +43,32 @@ QueueRegistry::declare(const string& declareName, bool durable, QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); - queues[name] = queue; + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); + queues[name] = queue; - if (managementAgent){ - ManagementObjectQueue::shared_ptr mgmtObject(new ManagementObjectQueue (name, durable, autoDelete)); + if (managementAgent){ + ManagementObjectQueue::shared_ptr mgmtObject(new ManagementObjectQueue (name, durable, autoDelete)); - queue->setMgmt (mgmtObject); - managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject)); - } - - return std::pair<Queue::shared_ptr, bool>(queue, true); + queue->setMgmt (mgmtObject); + managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject)); + } + + return std::pair<Queue::shared_ptr, bool>(queue, true); } else { - return std::pair<Queue::shared_ptr, bool>(i->second, false); + return std::pair<Queue::shared_ptr, bool>(i->second, false); } } void QueueRegistry::destroy(const string& name){ RWlock::ScopedWlock locker(lock); - if (managementAgent){ - ManagementObjectQueue::shared_ptr mgmtObject; - QueueMap::iterator i = queues.find(name); + ManagementObjectQueue::shared_ptr mgmtObject; + QueueMap::iterator i = queues.find(name); - if (i != queues.end()){ - mgmtObject = i->second->getMgmt (); - managementAgent->deleteObject (dynamic_pointer_cast<ManagementObject>(mgmtObject)); - } + if (i != queues.end()){ + mgmtObject = i->second->getMgmt (); + mgmtObject->resourceDestroy (); + } } queues.erase(name); @@ -79,20 +79,20 @@ Queue::shared_ptr QueueRegistry::find(const string& name){ QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - return Queue::shared_ptr(); + return Queue::shared_ptr(); } else { - return i->second; + return i->second; } } string QueueRegistry::generateName(){ string name; do { - std::stringstream ss; - ss << "tmp_" << counter++; - name = ss.str(); - // Thread safety: Private function, only called with lock held - // so this is OK. + std::stringstream ss; + ss << "tmp_" << counter++; + name = ss.str(); + // Thread safety: Private function, only called with lock held + // so this is OK. } while(queues.find(name) != queues.end()); return name; } |