summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2012-02-09 21:11:41 +0000
committerTed Ross <tross@apache.org>2012-02-09 21:11:41 +0000
commit192126471686e72d7b59ef9923458fcefe6847a2 (patch)
treee0a652a61c5bae9dd2b7f26d847a4049f0ed7693 /qpid/cpp
parent19a3076040f4d144e604f825b59e48ab27524440 (diff)
downloadqpid-python-192126471686e72d7b59ef9923458fcefe6847a2.tar.gz
QPID-3824 - Additional queue statistics, posix memory statistics, and broker-scope statistics
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242526 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/include/qpid/sys/MemStat.h38
-rw-r--r--qpid/cpp/src/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h2
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp146
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h16
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h6
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp12
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h7
-rw-r--r--qpid/cpp/src/qpid/sys/posix/MemStat.cpp38
-rw-r--r--qpid/cpp/src/qpid/sys/windows/MemStat.cpp29
17 files changed, 306 insertions, 15 deletions
diff --git a/qpid/cpp/include/qpid/sys/MemStat.h b/qpid/cpp/include/qpid/sys/MemStat.h
new file mode 100644
index 0000000000..d855786cd5
--- /dev/null
+++ b/qpid/cpp/include/qpid/sys/MemStat.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef sys_MemStat
+#define sys_MemStat
+
+#include "qpid/CommonImportExport.h"
+#include "qmf/org/apache/qpid/broker/Memory.h"
+
+namespace qpid {
+namespace sys {
+
+ class QPID_COMMON_CLASS_EXTERN MemStat {
+ public:
+ QPID_COMMON_EXTERN static void loadMemInfo(qmf::org::apache::qpid::broker::Memory* object);
+ };
+
+}}
+
+#endif
+
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 41d3cec1f6..1a84f5e79a 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -669,6 +669,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/windows/SystemInfo.cpp
qpid/sys/windows/Thread.cpp
qpid/sys/windows/Time.cpp
+ qpid/sys/windows/MemStat.cpp
qpid/client/windows/SaslFactory.cpp
${sslcommon_windows_SOURCES}
)
@@ -740,6 +741,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/posix/FileSysDir.cpp
qpid/sys/posix/IOHandle.cpp
qpid/sys/posix/LockFile.cpp
+ qpid/sys/posix/MemStat.cpp
qpid/sys/posix/Mutex.cpp
qpid/sys/posix/PipeHandle.cpp
qpid/sys/posix/PollableCondition.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index fb26251da0..9533e37565 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -53,6 +53,7 @@ windows_dist = \
../include/qpid/sys/windows/Time.h \
qpid/sys/windows/uuid.cpp \
qpid/sys/windows/uuid.h \
+ qpid/sys/windows/MemStat.cpp \
windows/QpiddBroker.cpp \
windows/SCM.h \
windows/SCM.cpp \
@@ -163,6 +164,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/posix/Time.cpp \
qpid/sys/posix/Thread.cpp \
qpid/sys/posix/Shlib.cpp \
+ qpid/sys/posix/MemStat.cpp \
qpid/sys/posix/Mutex.cpp \
qpid/sys/posix/Fork.cpp \
qpid/sys/posix/StrError.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 89532ae256..ff6da087c3 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -194,6 +194,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
+ mgmtObject(0),
queueCleaner(queues, &timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 0b8fe95d5e..adc145dc84 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -142,6 +142,7 @@ void DeliveryRecord::reject()
//just drop it
QPID_LOG(info, "Dropping rejected message from " << queue->getName());
}
+ queue->countRejected();
dequeue();
setEnded();
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 5d763bf0da..ecaa492903 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -142,6 +142,8 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
//QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
mgmtExchange->inc_msgDrops ();
mgmtExchange->inc_byteDrops (msg.contentSize ());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsNoRoute();
}
else
{
@@ -161,7 +163,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+ sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
@@ -172,6 +174,8 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
agent->addObject(mgmtExchange, 0, durable);
+ if (broker)
+ brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
}
}
}
@@ -179,7 +183,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
: name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
- args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+ args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
@@ -191,6 +195,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange->set_autoDelete(false);
mgmtExchange->set_arguments(ManagementAgent::toMap(args));
agent->addObject(mgmtExchange, 0, durable);
+ if (broker)
+ brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
}
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index b12af9a1dd..9179dd5c7c 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -32,6 +32,7 @@
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Exchange.h"
#include "qmf/org/apache/qpid/broker/Binding.h"
+#include "qmf/org/apache/qpid/broker/Broker.h"
namespace qpid {
namespace broker {
@@ -158,6 +159,7 @@ protected:
};
qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
+ qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 4bda70d313..142c23f276 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -200,6 +200,8 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
mgmtExchange->inc_byteReceives(msg.contentSize());
mgmtExchange->inc_msgDrops();
mgmtExchange->inc_byteDrops(msg.contentSize());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsNoRoute();
}
return;
}
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index d13109dad1..ae4503328a 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -270,6 +270,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
}
+ queue.countLoadedFromDisk(contentSize());
} else {
Count c;
frames.map_if(c, TypeFilter<CONTENT_BODY>());
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index dd23760922..969d510e26 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -109,6 +109,7 @@ Queue::Queue(const string& _name, bool _autodelete,
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
+ brokerMgmtObject(0),
eventMode(0),
insertSeqNo(0),
broker(b),
@@ -123,14 +124,20 @@ Queue::Queue(const string& _name, bool _autodelete,
if (agent != 0) {
mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
agent->addObject(mgmtObject, 0, store != 0);
+ brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_queueCount();
}
}
}
Queue::~Queue()
{
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
+ if (brokerMgmtObject)
+ brokerMgmtObject->dec_queueCount();
+ }
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -204,6 +211,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgTxnEnqueues ();
+ brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ }
}
}
@@ -221,7 +232,13 @@ void Queue::requeue(const QueuedMessage& msg){
if (alternateExchange.get()) {
DeliverableMessage dmsg(msg.payload);
alternateExchange->routeWithAlternate(dmsg);
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
}
+ mgntDeqStats(msg.payload);
} else {
messages->reinsert(msg);
listeners.populate(copy);
@@ -234,8 +251,8 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
+ observeRequeue(msg, locker);
}
- observeRequeue(msg, locker);
}
copy.notify();
}
@@ -323,6 +340,11 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
c->setPosition(msg.position);
acquire( msg.position, msg, locker);
dequeue( 0, msg );
+ if (mgmtObject) {
+ mgmtObject->inc_discardsTtl();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl();
+ }
continue;
}
@@ -504,6 +526,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
+ //
+ // Report the count of discarded-by-ttl messages
+ //
+ if (mgmtObject && !expired.empty()) {
+ mgmtObject->inc_discardsTtl(expired.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(expired.size());
+ }
+
for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
i != expired.end(); ++i) {
{
@@ -638,6 +669,19 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
Mutex::ScopedLock locker(messageLock);
messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+
+ if (mgmtObject && !c.matches.empty()) {
+ if (dest.get()) {
+ mgmtObject->inc_reroutes(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_reroutes(c.matches.size());
+ } else {
+ mgmtObject->inc_discardsPurge(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsPurge(c.matches.size());
+ }
+ }
+
for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
qmsg != c.matches.end(); ++qmsg) {
// Update observers and message state:
@@ -710,8 +754,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
- if (dequeueRequired)
+ if (dequeueRequired) {
observeAcquire(removed, locker);
+ if (mgmtObject) {
+ mgmtObject->inc_discardsLvq();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsLvq();
+ }
+ }
listeners.populate(copy);
observeEnqueue(qm, locker);
}
@@ -799,10 +849,30 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
- policy->tryEnqueue(msg);
+ try {
+ policy->tryEnqueue(msg);
+ } catch(ResourceLimitExceededException&) {
+ if (mgmtObject) {
+ mgmtObject->inc_discardsOverflow();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsOverflow();
+ }
+ throw;
+ }
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
+
+ //
+ // Count the dequeues as ring-discards. We know that these aren't rejects because
+ // policy->tryEnqueue would have thrown an exception.
+ //
+ if (mgmtObject && !dequeues.empty()) {
+ mgmtObject->inc_discardsRing(dequeues.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsRing(dequeues.size());
+ }
+
for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -871,6 +941,10 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgTxnDequeues();
+ brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ }
}
}
@@ -893,8 +967,8 @@ void Queue::popAndDequeue(const Mutex::ScopedLock& held)
*/
void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
- if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
+ if (policy.get()) policy->dequeued(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -909,6 +983,12 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
*/
void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->acquired(msg);
@@ -923,6 +1003,12 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
*/
void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
+ if (mgmtObject) {
+ mgmtObject->inc_releases();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_releases();
+ }
+
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->requeued(msg);
@@ -1079,14 +1165,22 @@ void Queue::configureImpl(const FieldTable& _settings)
void Queue::destroyed()
{
unbind(broker->getExchanges());
- if (alternateExchange.get()) {
+ {
Mutex::ScopedLock locker(messageLock);
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
- alternateExchange->routeWithAlternate(msg);
+ if (alternateExchange.get()) {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ alternateExchange->routeWithAlternate(msg);
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
+ }
popAndDequeue(locker);
}
- alternateExchange->decAlternateUsers();
+ if (alternateExchange.get())
+ alternateExchange->decAlternateUsers();
}
if (store) {
@@ -1124,6 +1218,8 @@ void Queue::unbind(ExchangeRegistry& exchanges)
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
{
policy = _policy;
+ if (policy.get())
+ policy->setQueue(this);
}
const QueuePolicy* Queue::getPolicy()
@@ -1291,6 +1387,40 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+void Queue::countRejected() const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_discardsSubscriber();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsSubscriber();
+ }
+}
+
+void Queue::countFlowedToDisk(uint64_t size) const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_msgFtdEnqueues();
+ mgmtObject->inc_byteFtdEnqueues(size);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgFtdEnqueues();
+ brokerMgmtObject->inc_byteFtdEnqueues(size);
+ }
+ }
+}
+
+void Queue::countLoadedFromDisk(uint64_t size) const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_msgFtdDequeues();
+ mgmtObject->inc_byteFtdDequeues(size);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgFtdDequeues();
+ brokerMgmtObject->inc_byteFtdDequeues(size);
+ }
+ }
+}
+
+
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 59ae41e768..5eca1e9b0c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -39,6 +39,7 @@
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
+#include "qmf/org/apache/qpid/broker/Broker.h"
#include "qpid/framing/amqp_types.h"
#include <boost/shared_ptr.hpp>
@@ -92,7 +93,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
-
const std::string name;
const bool autodelete;
MessageStore* store;
@@ -119,6 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
+ qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
@@ -165,9 +166,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
if (mgmtObject != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ brokerMgmtObject->inc_msgTotalEnqueues ();
+ brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
if (msg->isPersistent ()) {
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ brokerMgmtObject->inc_msgPersistEnqueues ();
+ brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
}
}
@@ -176,9 +181,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
if (mgmtObject != 0){
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ brokerMgmtObject->inc_msgTotalDequeues ();
+ brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize());
if (msg->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ brokerMgmtObject->inc_msgPersistDequeues ();
+ brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize());
}
}
}
@@ -355,6 +364,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
virtual void setExternalQueueStore(ExternalQueueStore* inst);
+ // Increment the rejected-by-consumer counter.
+ void countRejected() const;
+ void countFlowedToDisk(uint64_t size) const;
+ void countLoadedFromDisk(uint64_t size) const;
+
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index dafcf92a63..d5b4c1ae86 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -31,7 +31,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), queue(0), name(_name) {
QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize);
}
@@ -204,7 +204,11 @@ FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount,
bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- if (!QueuePolicy::checkLimit(m)) m->requestContentRelease();
+ if (!QueuePolicy::checkLimit(m)) {
+ m->requestContentRelease();
+ if (queue)
+ queue->countFlowedToDisk(m->contentSize());
+ }
return true;
}
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index ec7f846704..f23b709f18 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -33,6 +33,8 @@
namespace qpid {
namespace broker {
+class Queue;
+
class QueuePolicy
{
static uint64_t defaultMaxSize;
@@ -44,8 +46,8 @@ class QueuePolicy
uint64_t size;
bool policyExceeded;
-
protected:
+ Queue* queue;
uint64_t getCurrentQueueSize() const { return size; }
public:
@@ -72,6 +74,8 @@ class QueuePolicy
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
virtual void getPendingDequeues(Messages& result);
+ std::string getType() const { return type; }
+ void setQueue(Queue* q) { queue = q; }
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index ff5271f83b..741ef442b0 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -122,7 +122,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
- msgBuffer(MA_BUFFER_SIZE)
+ msgBuffer(MA_BUFFER_SIZE), memstat(0)
{
nextObjectId = 1;
brokerBank = 1;
@@ -132,6 +132,9 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
clientWasAdded = false;
attrMap["_vendor"] = defaultVendorName;
attrMap["_product"] = defaultProductName;
+
+ memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker");
+ addObject(memstat, "amqp-broker");
}
ManagementAgent::~ManagementAgent ()
@@ -720,6 +723,7 @@ void ManagementAgent::periodicProcessing (void)
static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
moveNewObjectsLH();
+ qpid::sys::MemStat::loadMemInfo(memstat);
//
// Clear the been-here flag on all objects in the map.
@@ -1834,6 +1838,9 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
string className (value->get<string>());
std::list<ObjectId>matches;
+ if (className == "memory")
+ qpid::sys::MemStat::loadMemInfo(memstat);
+
// build up a set of all objects to be dumped
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
@@ -1946,6 +1953,8 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
packageName = s_iter->second.asString();
}
+ if (className == "memory")
+ qpid::sys::MemStat::loadMemInfo(memstat);
/*
* Unpack the _object_id element of the query if it is present. If it is present, find that one
@@ -1968,6 +1977,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
Variant::Map values;
Variant::Map oidMap;
+ object->writeTimestamps(map_);
object->mapEncodeValues(values, true, true); // write both stats and properties
objId.mapEncode(oidMap);
map_["_values"] = values;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index c21f384433..f68bfe0577 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -32,6 +32,8 @@
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
+#include "qmf/org/apache/qpid/broker/Memory.h"
+#include "qpid/sys/MemStat.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FieldValue.h>
@@ -343,6 +345,11 @@ private:
char eventBuffer[MA_BUFFER_SIZE];
framing::ResizableBuffer msgBuffer;
+ //
+ // Memory statistics object
+ //
+ qmf::org::apache::qpid::broker::Memory *memstat;
+
void writeData ();
void periodicProcessing (void);
void deleteObjectNowLH(const ObjectId& oid);
diff --git a/qpid/cpp/src/qpid/sys/posix/MemStat.cpp b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp
new file mode 100644
index 0000000000..72c53e5886
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/MemStat.h"
+#include <malloc.h>
+
+void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object)
+{
+ struct mallinfo info(mallinfo());
+
+ object->set_malloc_arena(info.arena);
+ object->set_malloc_ordblks(info.ordblks);
+ object->set_malloc_hblks(info.hblks);
+ object->set_malloc_hblkhd(info.hblkhd);
+ object->set_malloc_uordblks(info.uordblks);
+ object->set_malloc_fordblks(info.fordblks);
+ object->set_malloc_keepcost(info.keepcost);
+}
+
+
diff --git a/qpid/cpp/src/qpid/sys/windows/MemStat.cpp b/qpid/cpp/src/qpid/sys/windows/MemStat.cpp
new file mode 100644
index 0000000000..4ad73933ad
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/MemStat.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/MemStat.h"
+
+qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory*)
+{
+ // TODO: Add Windows-specific memory stats to the object and load them here.
+}
+
+