diff options
author | Ted Ross <tross@apache.org> | 2012-02-09 21:11:41 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2012-02-09 21:11:41 +0000 |
commit | 192126471686e72d7b59ef9923458fcefe6847a2 (patch) | |
tree | e0a652a61c5bae9dd2b7f26d847a4049f0ed7693 /qpid/cpp | |
parent | 19a3076040f4d144e604f825b59e48ab27524440 (diff) | |
download | qpid-python-192126471686e72d7b59ef9923458fcefe6847a2.tar.gz |
QPID-3824 - Additional queue statistics, posix memory statistics, and broker-scope statistics
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242526 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/include/qpid/sys/MemStat.h | 38 | ||||
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 146 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/MemStat.cpp | 38 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/MemStat.cpp | 29 |
17 files changed, 306 insertions, 15 deletions
diff --git a/qpid/cpp/include/qpid/sys/MemStat.h b/qpid/cpp/include/qpid/sys/MemStat.h new file mode 100644 index 0000000000..d855786cd5 --- /dev/null +++ b/qpid/cpp/include/qpid/sys/MemStat.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef sys_MemStat +#define sys_MemStat + +#include "qpid/CommonImportExport.h" +#include "qmf/org/apache/qpid/broker/Memory.h" + +namespace qpid { +namespace sys { + + class QPID_COMMON_CLASS_EXTERN MemStat { + public: + QPID_COMMON_EXTERN static void loadMemInfo(qmf::org::apache::qpid::broker::Memory* object); + }; + +}} + +#endif + diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 41d3cec1f6..1a84f5e79a 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -669,6 +669,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/SystemInfo.cpp qpid/sys/windows/Thread.cpp qpid/sys/windows/Time.cpp + qpid/sys/windows/MemStat.cpp qpid/client/windows/SaslFactory.cpp ${sslcommon_windows_SOURCES} ) @@ -740,6 +741,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/posix/FileSysDir.cpp qpid/sys/posix/IOHandle.cpp qpid/sys/posix/LockFile.cpp + qpid/sys/posix/MemStat.cpp qpid/sys/posix/Mutex.cpp qpid/sys/posix/PipeHandle.cpp qpid/sys/posix/PollableCondition.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index fb26251da0..9533e37565 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -53,6 +53,7 @@ windows_dist = \ ../include/qpid/sys/windows/Time.h \ qpid/sys/windows/uuid.cpp \ qpid/sys/windows/uuid.h \ + qpid/sys/windows/MemStat.cpp \ windows/QpiddBroker.cpp \ windows/SCM.h \ windows/SCM.cpp \ @@ -163,6 +164,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/posix/Time.cpp \ qpid/sys/posix/Thread.cpp \ qpid/sys/posix/Shlib.cpp \ + qpid/sys/posix/MemStat.cpp \ qpid/sys/posix/Mutex.cpp \ qpid/sys/posix/Fork.cpp \ qpid/sys/posix/StrError.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 89532ae256..ff6da087c3 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -194,6 +194,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), + mgmtObject(0), queueCleaner(queues, &timer), queueEvents(poller,!conf.asyncQueueEvents), recovery(true), diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 0b8fe95d5e..adc145dc84 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -142,6 +142,7 @@ void DeliveryRecord::reject() //just drop it QPID_LOG(info, "Dropping rejected message from " << queue->getName()); } + queue->countRejected(); dequeue(); setEnded(); } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 5d763bf0da..ecaa492903 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -142,6 +142,8 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsNoRoute(); } else { @@ -161,7 +163,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) + sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { @@ -172,6 +174,8 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); agent->addObject(mgmtExchange, 0, durable); + if (broker) + brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); } } } @@ -179,7 +183,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { @@ -191,6 +195,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange->set_autoDelete(false); mgmtExchange->set_arguments(ManagementAgent::toMap(args)); agent->addObject(mgmtExchange, 0, durable); + if (broker) + brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index b12af9a1dd..9179dd5c7c 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -32,6 +32,7 @@ #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Exchange.h" #include "qmf/org/apache/qpid/broker/Binding.h" +#include "qmf/org/apache/qpid/broker/Broker.h" namespace qpid { namespace broker { @@ -158,6 +159,7 @@ protected: }; qmf::org::apache::qpid::broker::Exchange* mgmtExchange; + qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; public: typedef boost::shared_ptr<Exchange> shared_ptr; diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 4bda70d313..142c23f276 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -200,6 +200,8 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons mgmtExchange->inc_byteReceives(msg.contentSize()); mgmtExchange->inc_msgDrops(); mgmtExchange->inc_byteDrops(msg.contentSize()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsNoRoute(); } return; } diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index d13109dad1..ae4503328a 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -270,6 +270,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); } + queue.countLoadedFromDisk(contentSize()); } else { Count c; frames.map_if(c, TypeFilter<CONTENT_BODY>()); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index dd23760922..969d510e26 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -109,6 +109,7 @@ Queue::Queue(const string& _name, bool _autodelete, persistenceId(0), policyExceeded(false), mgmtObject(0), + brokerMgmtObject(0), eventMode(0), insertSeqNo(0), broker(b), @@ -123,14 +124,20 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent != 0) { mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); agent->addObject(mgmtObject, 0, store != 0); + brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + if (brokerMgmtObject) + brokerMgmtObject->inc_queueCount(); } } } Queue::~Queue() { - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->resourceDestroy(); + if (brokerMgmtObject) + brokerMgmtObject->dec_queueCount(); + } } bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) @@ -204,6 +211,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgTxnEnqueues (); + brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + } } } @@ -221,7 +232,13 @@ void Queue::requeue(const QueuedMessage& msg){ if (alternateExchange.get()) { DeliverableMessage dmsg(msg.payload); alternateExchange->routeWithAlternate(dmsg); + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); } + mgntDeqStats(msg.payload); } else { messages->reinsert(msg); listeners.populate(copy); @@ -234,8 +251,8 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } + observeRequeue(msg, locker); } - observeRequeue(msg, locker); } copy.notify(); } @@ -323,6 +340,11 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ c->setPosition(msg.position); acquire( msg.position, msg, locker); dequeue( 0, msg ); + if (mgmtObject) { + mgmtObject->inc_discardsTtl(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(); + } continue; } @@ -504,6 +526,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } + // + // Report the count of discarded-by-ttl messages + // + if (mgmtObject && !expired.empty()) { + mgmtObject->inc_discardsTtl(expired.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(expired.size()); + } + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); i != expired.end(); ++i) { { @@ -638,6 +669,19 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> Mutex::ScopedLock locker(messageLock); messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + + if (mgmtObject && !c.matches.empty()) { + if (dest.get()) { + mgmtObject->inc_reroutes(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_reroutes(c.matches.size()); + } else { + mgmtObject->inc_discardsPurge(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsPurge(c.matches.size()); + } + } + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); qmsg != c.matches.end(); ++qmsg) { // Update observers and message state: @@ -710,8 +754,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); - if (dequeueRequired) + if (dequeueRequired) { observeAcquire(removed, locker); + if (mgmtObject) { + mgmtObject->inc_discardsLvq(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsLvq(); + } + } listeners.populate(copy); observeEnqueue(qm, locker); } @@ -799,10 +849,30 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg std::deque<QueuedMessage> dequeues; { Mutex::ScopedLock locker(messageLock); - policy->tryEnqueue(msg); + try { + policy->tryEnqueue(msg); + } catch(ResourceLimitExceededException&) { + if (mgmtObject) { + mgmtObject->inc_discardsOverflow(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsOverflow(); + } + throw; + } policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock + + // + // Count the dequeues as ring-discards. We know that these aren't rejects because + // policy->tryEnqueue would have thrown an exception. + // + if (mgmtObject && !dequeues.empty()) { + mgmtObject->inc_discardsRing(dequeues.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsRing(dequeues.size()); + } + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } @@ -871,6 +941,10 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgTxnDequeues(); + brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + } } } @@ -893,8 +967,8 @@ void Queue::popAndDequeue(const Mutex::ScopedLock& held) */ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { - if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); + if (policy.get()) policy->dequeued(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -909,6 +983,12 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) */ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->acquired(msg); @@ -923,6 +1003,12 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) */ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { + if (mgmtObject) { + mgmtObject->inc_releases(); + if (brokerMgmtObject) + brokerMgmtObject->inc_releases(); + } + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->requeued(msg); @@ -1079,14 +1165,22 @@ void Queue::configureImpl(const FieldTable& _settings) void Queue::destroyed() { unbind(broker->getExchanges()); - if (alternateExchange.get()) { + { Mutex::ScopedLock locker(messageLock); while(!messages->empty()){ DeliverableMessage msg(messages->front().payload); - alternateExchange->routeWithAlternate(msg); + if (alternateExchange.get()) { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + alternateExchange->routeWithAlternate(msg); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); + } popAndDequeue(locker); } - alternateExchange->decAlternateUsers(); + if (alternateExchange.get()) + alternateExchange->decAlternateUsers(); } if (store) { @@ -1124,6 +1218,8 @@ void Queue::unbind(ExchangeRegistry& exchanges) void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) { policy = _policy; + if (policy.get()) + policy->setQueue(this); } const QueuePolicy* Queue::getPolicy() @@ -1291,6 +1387,40 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +void Queue::countRejected() const +{ + if (mgmtObject) { + mgmtObject->inc_discardsSubscriber(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsSubscriber(); + } +} + +void Queue::countFlowedToDisk(uint64_t size) const +{ + if (mgmtObject) { + mgmtObject->inc_msgFtdEnqueues(); + mgmtObject->inc_byteFtdEnqueues(size); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgFtdEnqueues(); + brokerMgmtObject->inc_byteFtdEnqueues(size); + } + } +} + +void Queue::countLoadedFromDisk(uint64_t size) const +{ + if (mgmtObject) { + mgmtObject->inc_msgFtdDequeues(); + mgmtObject->inc_byteFtdDequeues(size); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgFtdDequeues(); + brokerMgmtObject->inc_byteFtdDequeues(size); + } + } +} + + ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 59ae41e768..5eca1e9b0c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -39,6 +39,7 @@ #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" +#include "qmf/org/apache/qpid/broker/Broker.h" #include "qpid/framing/amqp_types.h" #include <boost/shared_ptr.hpp> @@ -92,7 +93,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; - const std::string name; const bool autodelete; MessageStore* store; @@ -119,6 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; + qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; @@ -165,9 +166,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, if (mgmtObject != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + brokerMgmtObject->inc_msgTotalEnqueues (); + brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + brokerMgmtObject->inc_msgPersistEnqueues (); + brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } } } @@ -176,9 +181,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, if (mgmtObject != 0){ mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + brokerMgmtObject->inc_msgTotalDequeues (); + brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize()); if (msg->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + brokerMgmtObject->inc_msgPersistDequeues (); + brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize()); } } } @@ -355,6 +364,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, virtual void setExternalQueueStore(ExternalQueueStore* inst); + // Increment the rejected-by-consumer counter. + void countRejected() const; + void countFlowedToDisk(uint64_t size) const; + void countLoadedFromDisk(uint64_t size) const; + // Manageable entry points management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index dafcf92a63..d5b4c1ae86 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -31,7 +31,7 @@ using namespace qpid::broker; using namespace qpid::framing; QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) { + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), queue(0), name(_name) { QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize); } @@ -204,7 +204,11 @@ FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { - if (!QueuePolicy::checkLimit(m)) m->requestContentRelease(); + if (!QueuePolicy::checkLimit(m)) { + m->requestContentRelease(); + if (queue) + queue->countFlowedToDisk(m->contentSize()); + } return true; } diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index ec7f846704..f23b709f18 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -33,6 +33,8 @@ namespace qpid { namespace broker { +class Queue; + class QueuePolicy { static uint64_t defaultMaxSize; @@ -44,8 +46,8 @@ class QueuePolicy uint64_t size; bool policyExceeded; - protected: + Queue* queue; uint64_t getCurrentQueueSize() const { return size; } public: @@ -72,6 +74,8 @@ class QueuePolicy void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; virtual void getPendingDequeues(Messages& result); + std::string getType() const { return type; } + void setQueue(Queue* q) { queue = q; } static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index ff5271f83b..741ef442b0 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -122,7 +122,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), - msgBuffer(MA_BUFFER_SIZE) + msgBuffer(MA_BUFFER_SIZE), memstat(0) { nextObjectId = 1; brokerBank = 1; @@ -132,6 +132,9 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : clientWasAdded = false; attrMap["_vendor"] = defaultVendorName; attrMap["_product"] = defaultProductName; + + memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"); + addObject(memstat, "amqp-broker"); } ManagementAgent::~ManagementAgent () @@ -720,6 +723,7 @@ void ManagementAgent::periodicProcessing (void) static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); moveNewObjectsLH(); + qpid::sys::MemStat::loadMemInfo(memstat); // // Clear the been-here flag on all objects in the map. @@ -1834,6 +1838,9 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe string className (value->get<string>()); std::list<ObjectId>matches; + if (className == "memory") + qpid::sys::MemStat::loadMemInfo(memstat); + // build up a set of all objects to be dumped for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); @@ -1946,6 +1953,8 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co packageName = s_iter->second.asString(); } + if (className == "memory") + qpid::sys::MemStat::loadMemInfo(memstat); /* * Unpack the _object_id element of the query if it is present. If it is present, find that one @@ -1968,6 +1977,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::Map values; Variant::Map oidMap; + object->writeTimestamps(map_); object->mapEncodeValues(values, true, true); // write both stats and properties objId.mapEncode(oidMap); map_["_values"] = values; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index c21f384433..f68bfe0577 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -32,6 +32,8 @@ #include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" +#include "qmf/org/apache/qpid/broker/Memory.h" +#include "qpid/sys/MemStat.h" #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/FieldValue.h> @@ -343,6 +345,11 @@ private: char eventBuffer[MA_BUFFER_SIZE]; framing::ResizableBuffer msgBuffer; + // + // Memory statistics object + // + qmf::org::apache::qpid::broker::Memory *memstat; + void writeData (); void periodicProcessing (void); void deleteObjectNowLH(const ObjectId& oid); diff --git a/qpid/cpp/src/qpid/sys/posix/MemStat.cpp b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp new file mode 100644 index 0000000000..72c53e5886 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/MemStat.h" +#include <malloc.h> + +void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object) +{ + struct mallinfo info(mallinfo()); + + object->set_malloc_arena(info.arena); + object->set_malloc_ordblks(info.ordblks); + object->set_malloc_hblks(info.hblks); + object->set_malloc_hblkhd(info.hblkhd); + object->set_malloc_uordblks(info.uordblks); + object->set_malloc_fordblks(info.fordblks); + object->set_malloc_keepcost(info.keepcost); +} + + diff --git a/qpid/cpp/src/qpid/sys/windows/MemStat.cpp b/qpid/cpp/src/qpid/sys/windows/MemStat.cpp new file mode 100644 index 0000000000..4ad73933ad --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/MemStat.cpp @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/MemStat.h" + +qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory*) +{ + // TODO: Add Windows-specific memory stats to the object and load them here. +} + + |