diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-19 18:57:30 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-10-19 18:57:30 +0000 |
commit | b97be677001ec35469d080a98ba88276f2300651 (patch) | |
tree | 2d947cee25396e96ad1a49f43d9de82e04f0f96a /cpp/src | |
parent | f25df3a53ca1ef5eec396512fd584823e7f6636d (diff) | |
download | qpid-python-b97be677001ec35469d080a98ba88276f2300651.tar.gz |
QPID-651 applied patch from Ted
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@586578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementAgent.cpp | 187 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementAgent.h | 70 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObject.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObject.h | 92 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObjectQueue.cpp | 168 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ManagementObjectQueue.h | 179 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Buffer.h | 3 |
15 files changed, 857 insertions, 10 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 7f644a3c8f..784f2db227 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -161,6 +161,9 @@ libqpidbroker_la_SOURCES = \ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/IncomingExecutionContext.cpp \ + qpid/broker/ManagementAgent.cpp \ + qpid/broker/ManagementObject.cpp \ + qpid/broker/ManagementObjectQueue.cpp \ qpid/broker/Message.cpp \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ @@ -254,6 +257,9 @@ nobase_include_HEADERS = \ qpid/broker/HandlerImpl.h \ qpid/broker/HeadersExchange.h \ qpid/broker/IncomingExecutionContext.h \ + qpid/broker/ManagementAgent.h \ + qpid/broker/ManagementObject.h \ + qpid/broker/ManagementObjectQueue.h \ qpid/broker/Message.h \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 84c5703a16..44aeb482de 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -60,8 +60,10 @@ Broker::Options::Options(const std::string& name) : connectionBacklog(10), store(), stagingThreshold(5000000), - storeDir("/var"), - storeAsync(false) + storeDir("/var"), + storeAsync(false), + enableMgmt(0), + mgmtPubInterval(10) { addOptions() ("port,p", optValue(port,"PORT"), @@ -79,7 +81,11 @@ Broker::Options::Options(const std::string& name) : ("store-directory", optValue(storeDir,"DIR"), "Store directory location for persistence.") ("store-async", optValue(storeAsync,"yes|no"), - "Use async persistence storage - if store supports it, enable AIO 0-DIRECT."); + "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.") + ("mgmt,m", optValue(enableMgmt,"yes|no"), + "Enable Management") + ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), + "Management Publish Interval"); } const std::string empty; @@ -87,6 +93,7 @@ const std::string amq_direct("amq.direct"); const std::string amq_topic("amq.topic"); const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); +const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : config(conf), @@ -96,11 +103,24 @@ Broker::Broker(const Broker::Options& conf) : factory(*this), dtxManager(store.get()) { + if(conf.enableMgmt){ + managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); + queues.setManagementAgent(managementAgent); + } + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. exchanges.declare(amq_direct, DirectExchange::typeName); exchanges.declare(amq_topic, TopicExchange::typeName); exchanges.declare(amq_fanout, FanOutExchange::typeName); exchanges.declare(amq_match, HeadersExchange::typeName); + + if(conf.enableMgmt) { + QPID_LOG(info, "Management enabled"); + exchanges.declare(qpid_management, TopicExchange::typeName); + managementAgent->setExchange (exchanges.get (qpid_management)); + } + else + QPID_LOG(info, "Management not enabled"); if(store.get()) { store->init(conf.storeDir, conf.storeAsync); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index b4b82e8433..2018371624 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -30,6 +30,7 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" +#include "ManagementAgent.h" #include "qpid/Options.h" #include "qpid/Plugin.h" #include "qpid/Url.h" @@ -64,8 +65,10 @@ class Broker : public sys::Runnable, public Plugin::Target int connectionBacklog; std::string store; long stagingThreshold; - string storeDir; - bool storeAsync; + string storeDir; + bool storeAsync; + bool enableMgmt; + uint16_t mgmtPubInterval; }; virtual ~Broker(); @@ -107,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target DtxManager& getDtxManager() { return dtxManager; } SessionManager& getSessionManager() { return sessionManager; } + ManagementAgent::shared_ptr getManagementAgent() { return managementAgent; } private: sys::Acceptor& getAcceptor() const; @@ -123,6 +127,7 @@ class Broker : public sys::Runnable, public Plugin::Target DtxManager dtxManager; HandlerUpdaters handlerUpdaters; SessionManager sessionManager; + ManagementAgent::shared_ptr managementAgent; static MessageStore* createStore(const Options& config); }; diff --git a/cpp/src/qpid/broker/ManagementAgent.cpp b/cpp/src/qpid/broker/ManagementAgent.cpp new file mode 100644 index 0000000000..f2ebe991b5 --- /dev/null +++ b/cpp/src/qpid/broker/ManagementAgent.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementAgent.h" +#include "DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageDelivery.h> +#include <qpid/framing/AMQFrame.h> + +using namespace qpid::framing; +using namespace qpid::broker; +using namespace qpid::sys; + +ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) +{ + timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); +} + +void ManagementAgent::setExchange (Exchange::shared_ptr exchangePtr) +{ + exchange = exchangePtr; +} + +void ManagementAgent::addObject (ManagementObject::shared_ptr object) +{ + managementObjects.push_back (object); + QPID_LOG(info, "Management Object Added"); +} + +void ManagementAgent::deleteObject (ManagementObject::shared_ptr object) +{ + managementObjects.remove (object); + QPID_LOG (debug, "Management Object Removed"); +} + +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); + agent.PeriodicProcessing (); +} + +void ManagementAgent::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 +#define THRESHOLD 16384 + char msgChars[BUFSIZE]; + Buffer msgBuffer (msgChars, BUFSIZE); + uint32_t contentSize; + + //QPID_LOG (debug, "Timer Fired"); + if (managementObjects.empty ()) + return; + + Message::shared_ptr msg (new Message ()); + + // Build the magic number for the management message. + msgBuffer.putOctet ('A'); + msgBuffer.putOctet ('M'); + msgBuffer.putOctet ('0'); + msgBuffer.putOctet ('1'); + + for (ManagementObjectList::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr objectPtr = *iter; + + //QPID_LOG (debug, " Object Found..."); + + if (objectPtr->getSchemaNeeded ()) + { + //QPID_LOG (debug, " Generating Schema"); + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('S'); // opcode = Schema Record + msgBuffer.putOctet (0); // content-class = N/A + msgBuffer.putShort (objectPtr->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + objectPtr->writeSchema (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (objectPtr->getConfigChanged ()) + { + //QPID_LOG (debug, " Generating Config"); + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('C'); // content-class = Configuration + msgBuffer.putShort (objectPtr->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + objectPtr->writeConfig (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (objectPtr->getInstChanged ()) + { + //QPID_LOG (debug, " Generating Instrumentation"); + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('I'); // content-class = Instrumentation + msgBuffer.putShort (objectPtr->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + objectPtr->writeInstrumentation (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + // Temporary protection against buffer overrun. + // This needs to be replaced with frame fragmentation. + if (msgBuffer.available () < THRESHOLD) + break; + } + + msgBuffer.putOctet ('X'); // End-of-message + msgBuffer.putOctet (0); + msgBuffer.putShort (0); + msgBuffer.putLong (8); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, "qpid.management", 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + content.setBody(AMQContentBody()); + content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + //msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey("mgmt"); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + exchange->route (deliverable, "mgmt", 0); +} + diff --git a/cpp/src/qpid/broker/ManagementAgent.h b/cpp/src/qpid/broker/ManagementAgent.h new file mode 100644 index 0000000000..4f3b0a0f5f --- /dev/null +++ b/cpp/src/qpid/broker/ManagementAgent.h @@ -0,0 +1,70 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "Exchange.h" +#include "ManagementObject.h" +#include "Timer.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + + +class ManagementAgent +{ + public: + + typedef boost::shared_ptr<ManagementAgent> shared_ptr; + + ManagementAgent(uint16_t interval); + + void setExchange (Exchange::shared_ptr exchangePtr); + void addObject (ManagementObject::shared_ptr object); + void deleteObject (ManagementObject::shared_ptr object); + + private: + + struct Periodic : public TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + ~Periodic () {} + void fire (); + }; + + ManagementObjectList managementObjects; + Timer timer; + Exchange::shared_ptr exchange; + uint16_t interval; + + void PeriodicProcessing (void); +}; + +}} + + + +#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/broker/ManagementObject.cpp b/cpp/src/qpid/broker/ManagementObject.cpp new file mode 100644 index 0000000000..75913ba3ab --- /dev/null +++ b/cpp/src/qpid/broker/ManagementObject.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +using namespace qpid::framing; +using namespace qpid::broker; + +void ManagementObject::schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig) +{ + buf.putOctet (isConfig ? 1 : 0); + buf.putOctet (typeCode); + buf.putShortString (name); + buf.putShortString (description); +} + +void ManagementObject::schemaListEnd (Buffer& buf) +{ + buf.putOctet (0xFF); +} diff --git a/cpp/src/qpid/broker/ManagementObject.h b/cpp/src/qpid/broker/ManagementObject.h new file mode 100644 index 0000000000..1588aed641 --- /dev/null +++ b/cpp/src/qpid/broker/ManagementObject.h @@ -0,0 +1,92 @@ +#ifndef _ManagementObject_ +#define _ManagementObject_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" +#include <qpid/framing/Buffer.h> +#include <boost/shared_ptr.hpp> +#include <list> + +namespace qpid { +namespace broker { + +using namespace qpid::framing; + +const uint16_t OBJECT_BROKER = 1; +const uint16_t OBJECT_SERVER = 2; +const uint16_t OBJECT_QUEUE = 3; +const uint16_t OBJECT_EXCHANGE = 4; +const uint16_t OBJECT_BINDING = 5; + +class ManagementObject +{ + private: + + qpid::sys::AbsTime createTime; + qpid::sys::AbsTime destroyTime; + + protected: + + bool configChanged; + bool instChanged; + + static const uint8_t TYPE_UINT8 = 1; + static const uint8_t TYPE_UINT16 = 2; + static const uint8_t TYPE_UINT32 = 3; + static const uint8_t TYPE_UINT64 = 4; + static const uint8_t TYPE_BOOL = 5; + static const uint8_t TYPE_STRING = 6; + + void schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig = false); + void schemaListEnd (Buffer & buf); + + public: + typedef boost::shared_ptr<ManagementObject> shared_ptr; + + ManagementObject () : configChanged(true), instChanged(true) { createTime = qpid::sys::now (); } + virtual ~ManagementObject () {} + + virtual uint16_t getObjectType (void) = 0; + virtual std::string getObjectName (void) = 0; + virtual void writeSchema (Buffer& buf) = 0; + virtual void writeConfig (Buffer& buf) = 0; + virtual void writeInstrumentation (Buffer& buf) = 0; + virtual bool getSchemaNeeded (void) = 0; + + inline bool getConfigChanged (void) { return configChanged; } + inline bool getInstChanged (void) { return instChanged; } + inline void resourceDestroy (void) { destroyTime = qpid::sys::now (); } + +}; + + typedef std::list<ManagementObject::shared_ptr> ManagementObjectList; + +}} + + + +#endif /*!_ManagementObject_*/ diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/ManagementObjectQueue.cpp new file mode 100644 index 0000000000..b81cd7b60d --- /dev/null +++ b/cpp/src/qpid/broker/ManagementObjectQueue.cpp @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObjectQueue.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectQueue::schemaNeeded = true; + +ManagementObjectQueue::ManagementObjectQueue (std::string& _name, bool _durable, bool _autoDelete) : + name(_name), durable(_durable), autoDelete(_autoDelete) +{ + msgTotalEnqueues = 0; + msgTotalDequeues = 0; + msgTxEnqueues = 0; + msgTxDequeues = 0; + msgPersistEnqueues = 0; + msgPersistDequeues = 0; + + msgDepth = 0; + msgDepthLow = 0; + msgDepthHigh = 0; + + byteTotalEnqueues = 0; + byteTotalDequeues = 0; + byteTxEnqueues = 0; + byteTxDequeues = 0; + bytePersistEnqueues = 0; + bytePersistDequeues = 0; + + byteDepth = 0; + byteDepthLow = 0; + byteDepthHigh = 0; + + enqueueTxStarts = 0; + enqueueTxCommits = 0; + enqueueTxRejects = 0; + dequeueTxStarts = 0; + dequeueTxCommits = 0; + dequeueTxRejects = 0; + + enqueueTxCount = 0; + enqueueTxCountLow = 0; + enqueueTxCountHigh = 0; + + dequeueTxCount = 0; + dequeueTxCountLow = 0; + dequeueTxCountHigh = 0; + + consumers = 0; + consumersLow = 0; + consumersHigh = 0; +} + +ManagementObjectQueue::~ManagementObjectQueue () {} + +void ManagementObjectQueue::writeSchema (Buffer& buf) +{ + schemaItem (buf, TYPE_STRING, "name", "Queue Name", true); + schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); + schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); + + schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); + schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); + schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); + schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "byteTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); + schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); + schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); + schemaItem (buf, TYPE_UINT64, "enqueueTxStarts", "Total enqueue transactions started "); + schemaItem (buf, TYPE_UINT64, "enqueueTxCommits", "Total enqueue transactions committed"); + schemaItem (buf, TYPE_UINT64, "enqueueTxRejects", "Total enqueue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCount", "Current pending enqueue transactions"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow", "Low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh", "High water mark this interval"); + schemaItem (buf, TYPE_UINT64, "dequeueTxStarts", "Total dequeue transactions started "); + schemaItem (buf, TYPE_UINT64, "dequeueTxCommits", "Total dequeue transactions committed"); + schemaItem (buf, TYPE_UINT64, "dequeueTxRejects", "Total dequeue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCount", "Current pending dequeue transactions"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow", "Transaction low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh", "Transaction high water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); + schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); + + schemaListEnd (buf); + + schemaNeeded = false; +} + +void ManagementObjectQueue::writeConfig (Buffer& buf) +{ + buf.putShortString (name); + buf.putOctet (durable ? 1 : 0); + buf.putOctet (autoDelete ? 1 : 0); + + configChanged = false; +} + +void ManagementObjectQueue::writeInstrumentation (Buffer& buf) +{ + buf.putLongLong (msgTotalEnqueues); + buf.putLongLong (msgTotalDequeues); + buf.putLongLong (msgTxEnqueues); + buf.putLongLong (msgTxDequeues); + buf.putLongLong (msgPersistEnqueues); + buf.putLongLong (msgPersistDequeues); + buf.putLong (msgDepth); + buf.putLong (msgDepthLow); + buf.putLong (msgDepthHigh); + buf.putLongLong (byteTotalEnqueues); + buf.putLongLong (byteTotalDequeues); + buf.putLongLong (byteTxEnqueues); + buf.putLongLong (byteTxDequeues); + buf.putLongLong (bytePersistEnqueues); + buf.putLongLong (bytePersistDequeues); + buf.putLong (byteDepth); + buf.putLong (byteDepthLow); + buf.putLong (byteDepthHigh); + buf.putLongLong (enqueueTxStarts); + buf.putLongLong (enqueueTxCommits); + buf.putLongLong (enqueueTxRejects); + buf.putLong (enqueueTxCount); + buf.putLong (enqueueTxCountLow); + buf.putLong (enqueueTxCountHigh); + buf.putLongLong (dequeueTxStarts); + buf.putLongLong (dequeueTxCommits); + buf.putLongLong (dequeueTxRejects); + buf.putLong (dequeueTxCount); + buf.putLong (dequeueTxCountLow); + buf.putLong (dequeueTxCountHigh); + buf.putLong (consumers); + buf.putLong (consumersLow); + buf.putLong (consumersHigh); + + instChanged = false; +} diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.h b/cpp/src/qpid/broker/ManagementObjectQueue.h new file mode 100644 index 0000000000..989d10f8c0 --- /dev/null +++ b/cpp/src/qpid/broker/ManagementObjectQueue.h @@ -0,0 +1,179 @@ +#ifndef _ManagementObjectQueue_ +#define _ManagementObjectQueue_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +namespace qpid { +namespace broker { + +const uint32_t MSG_MASK_TX = 1; // Transactional message +const uint32_t MSG_MASK_PERSIST = 2; // Persistent message + +class ManagementObjectQueue : public ManagementObject +{ + private: + + static bool schemaNeeded; + + std::string objectName; + std::string name; + bool durable; + bool autoDelete; + + uint64_t msgTotalEnqueues; // Total messages enqueued + uint64_t msgTotalDequeues; // Total messages dequeued + uint64_t msgTxEnqueues; // Transactional messages enqueued + uint64_t msgTxDequeues; // Transactional messages dequeued + uint64_t msgPersistEnqueues; // Persistent messages enqueued + uint64_t msgPersistDequeues; // Persistent messages dequeued + + uint32_t msgDepth; // Current size of queue in messages + uint32_t msgDepthLow; // Low-water queue size, this interval + uint32_t msgDepthHigh; // High-water queue size, this interval + + uint64_t byteTotalEnqueues; // Total messages enqueued + uint64_t byteTotalDequeues; // Total messages dequeued + uint64_t byteTxEnqueues; // Transactional messages enqueued + uint64_t byteTxDequeues; // Transactional messages dequeued + uint64_t bytePersistEnqueues; // Persistent messages enqueued + uint64_t bytePersistDequeues; // Persistent messages dequeued + + uint32_t byteDepth; // Current size of queue in bytes + uint32_t byteDepthLow; // Low-water mark this interval + uint32_t byteDepthHigh; // High-water mark this interval + + uint64_t enqueueTxStarts; // Total enqueue transactions started + uint64_t enqueueTxCommits; // Total enqueue transactions committed + uint64_t enqueueTxRejects; // Total enqueue transactions rejected + + uint32_t enqueueTxCount; // Current pending enqueue transactions + uint32_t enqueueTxCountLow; // Low water mark this interval + uint32_t enqueueTxCountHigh; // High water mark this interval + + uint64_t dequeueTxStarts; // Total dequeue transactions started + uint64_t dequeueTxCommits; // Total dequeue transactions committed + uint64_t dequeueTxRejects; // Total dequeue transactions rejected + + uint32_t dequeueTxCount; // Current pending dequeue transactions + uint32_t dequeueTxCountLow; // Low water mark this interval + uint32_t dequeueTxCountHigh; // High water mark this interval + + uint32_t consumers; // Current consumers on queue + uint32_t consumersLow; // Low water mark this interval + uint32_t consumersHigh; // High water mark this interval + + uint16_t getObjectType (void) { return OBJECT_QUEUE; } + std::string getObjectName (void) { return objectName; } + void writeSchema (Buffer& buf); + void writeConfig (Buffer& buf); + void writeInstrumentation (Buffer& buf); + bool getSchemaNeeded (void) { return schemaNeeded; } + + inline void adjustQueueHiLo (void){ + if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; + if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; + + if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; + if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; + instChanged = true; + } + + inline void adjustTxHiLo (void){ + if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; + if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; + if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; + if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; + instChanged = true; + } + + inline void adjustConsumerHiLo (void){ + if (consumers > consumersHigh) consumersHigh = consumers; + if (consumers < consumersLow) consumersLow = consumers; + instChanged = true; + } + + public: + + typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr; + + ManagementObjectQueue (std::string& name, bool durable, bool autoDelete); + ~ManagementObjectQueue (void); + + // The following mask contents are used to describe enqueued or dequeued + // messages when counting statistics. + + inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalEnqueues++; + byteTotalEnqueues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxEnqueues++; + byteTxEnqueues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistEnqueues++; + bytePersistEnqueues += bytes; + } + + msgDepth++; + byteDepth += bytes; + adjustQueueHiLo (); + } + + inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalDequeues++; + byteTotalDequeues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxDequeues++; + byteTxDequeues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistDequeues++; + bytePersistDequeues += bytes; + } + + msgDepth--; + byteDepth -= bytes; + adjustQueueHiLo (); + } + + inline void incConsumers (void){ + consumers++; + adjustConsumerHiLo (); + } + + inline void decConsumers (void){ + consumers--; + adjustConsumerHiLo (); + } +}; + +}} + + + +#endif /*!_ManagementObjectQueue_*/ diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index af248b8fae..456e055c74 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -77,7 +77,11 @@ void Queue::deliver(Message::shared_ptr& msg){ if (!enqueue(0, msg)){ push(msg); msg->enqueueComplete(); + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize ()); }else { + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -89,6 +93,8 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -97,8 +103,15 @@ void Queue::recover(Message::shared_ptr& msg){ } void Queue::process(Message::shared_ptr& msg){ - + + uint32_t mask = MSG_MASK_TX; + + if (msg->isPersistent ()) + mask |= MSG_MASK_PERSIST; + push(msg); + if (mgmtObjectPtr != 0) + mgmtObjectPtr->enqueue (msg->contentSize (), mask); serializer.execute(dispatchCallback); } @@ -267,6 +280,14 @@ QueuedMessage Queue::dequeue(){ if(!messages.empty()){ msg = messages.front(); pop(); + if (mgmtObjectPtr != 0){ + uint32_t mask = 0; + + if (msg.payload->isPersistent ()) + mask |= MSG_MASK_PERSIST; + + mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask); + } } return msg; } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 082ccce246..24a9959d14 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -35,6 +35,7 @@ #include "PersistableQueue.h" #include "QueuePolicy.h" #include "QueueBindings.h" +#include "ManagementObjectQueue.h" namespace qpid { namespace broker { @@ -93,6 +94,7 @@ namespace qpid { qpid::sys::Serializer<DispatchFunctor> serializer; DispatchFunctor dispatchCallback; framing::SequenceNumber sequence; + ManagementObjectQueue::shared_ptr mgmtObjectPtr; void pop(); void push(Message::shared_ptr& msg); @@ -130,6 +132,8 @@ namespace qpid { void destroy(); void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); + void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObjectPtr = mgmt; } + ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObjectPtr; } bool acquire(const QueuedMessage& msg); @@ -158,7 +162,7 @@ namespace qpid { * Request dispatch any queued messages providing there are * consumers for them. Only one thread can be dispatching * at any time, so this call schedules the despatch based on - * the serilizer policy. + * the serilizer policy. */ void requestDispatch(Consumer::ptr c = Consumer::ptr()); void flush(DispatchCompletion& callback); diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index ef1fb982e1..927de4c079 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -19,13 +19,16 @@ * */ #include "QueueRegistry.h" +#include "ManagementAgent.h" +#include "ManagementObjectQueue.h" #include <sstream> #include <assert.h> using namespace qpid::broker; using namespace qpid::sys; -QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} +QueueRegistry::QueueRegistry(MessageStore* const _store) : + counter(1), store(_store) {} QueueRegistry::~QueueRegistry(){} @@ -37,9 +40,18 @@ QueueRegistry::declare(const string& declareName, bool durable, string name = declareName.empty() ? generateName() : declareName; assert(!name.empty()); QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); queues[name] = queue; + + if (managementAgent){ + ManagementObjectQueue::shared_ptr mgmtObject(new ManagementObjectQueue (name, durable, autoDelete)); + + queue->setMgmt (mgmtObject); + managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject)); + } + return std::pair<Queue::shared_ptr, bool>(queue, true); } else { return std::pair<Queue::shared_ptr, bool>(i->second, false); @@ -48,12 +60,24 @@ QueueRegistry::declare(const string& declareName, bool durable, void QueueRegistry::destroy(const string& name){ RWlock::ScopedWlock locker(lock); + + if (managementAgent){ + ManagementObjectQueue::shared_ptr mgmtObject; + QueueMap::iterator i = queues.find(name); + + if (i != queues.end()){ + mgmtObject = i->second->getMgmt (); + managementAgent->deleteObject (dynamic_pointer_cast<ManagementObject>(mgmtObject)); + } + } + queues.erase(name); } Queue::shared_ptr QueueRegistry::find(const string& name){ RWlock::ScopedRlock locker(lock); QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { return Queue::shared_ptr(); } else { @@ -76,3 +100,14 @@ string QueueRegistry::generateName(){ MessageStore* const QueueRegistry::getStore() const { return store; } + +void QueueRegistry::setManagementAgent (ManagementAgent::shared_ptr agent) +{ + managementAgent = agent; +} + +ManagementAgent::shared_ptr QueueRegistry::getManagementAgent (void) +{ + return managementAgent; +} + diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index f73f467945..07669bb3a1 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -24,6 +24,7 @@ #include <map> #include "qpid/sys/Mutex.h" #include "Queue.h" +#include "ManagementAgent.h" namespace qpid { namespace broker { @@ -87,6 +88,12 @@ class QueueRegistry{ * Return the message store used. */ MessageStore* const getStore() const; + + /** + * Set/Get the ManagementAgent in use. + */ + void setManagementAgent (ManagementAgent::shared_ptr agent); + ManagementAgent::shared_ptr getManagementAgent (void); private: typedef std::map<string, Queue::shared_ptr> QueueMap; @@ -94,6 +101,7 @@ private: qpid::sys::RWlock lock; int counter; MessageStore* const store; + ManagementAgent::shared_ptr managementAgent; }; diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp index 758d0b5f3c..eaa4433b5f 100644 --- a/cpp/src/qpid/framing/Buffer.cpp +++ b/cpp/src/qpid/framing/Buffer.cpp @@ -34,8 +34,17 @@ void Buffer::record(){ r_position = position; } -void Buffer::restore(){ +void Buffer::restore(bool reRecord){ + uint32_t savedPosition = position; + position = r_position; + + if (reRecord) + r_position = savedPosition; +} + +void Buffer::reset(){ + position = 0; } uint32_t Buffer::available(){ diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h index 3b2e611881..190f736f46 100644 --- a/cpp/src/qpid/framing/Buffer.h +++ b/cpp/src/qpid/framing/Buffer.h @@ -41,7 +41,8 @@ public: Buffer(char* data, uint32_t size); void record(); - void restore(); + void restore(bool reRecord = false); + void reset(); uint32_t available(); void putOctet(uint8_t i); |