From 4cd29d042523b98666a6e671037d00271403a302 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Fri, 2 Mar 2012 20:51:19 +0000 Subject: QPID-3875: allow direct access to per-thread statistics git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1296448 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/managementgen/qmfgen/templates/Class.h | 8 ++++ qpid/cpp/src/qpid/broker/Connection.cpp | 24 ++++++----- qpid/cpp/src/qpid/broker/Exchange.cpp | 25 ++++++----- qpid/cpp/src/qpid/broker/Queue.cpp | 55 +++++++++++++++++++++++-- qpid/cpp/src/qpid/broker/Queue.h | 32 -------------- 5 files changed, 88 insertions(+), 56 deletions(-) diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h index 0527d53536..95939f3d03 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h @@ -53,9 +53,12 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::managemen /*MGEN:Class.InstDeclarations*/ /*MGEN:IF(Class.ExistPerThreadStats)*/ // Per-Thread Statistics + + public: struct PerThreadStats { /*MGEN:Class.PerThreadDeclarations*/ }; + private: struct PerThreadStats** perThreadStatsArray; @@ -120,6 +123,11 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::managemen /*MGEN:Class.MethodIdDeclarations*/ // Accessor Methods /*MGEN:Class.AccessorMethods*/ + +/*MGEN:IF(Class.ExistPerThreadStats)*/ + struct PerThreadStats* getStatistics() { return getThreadStats(); } + void statisticsUpdated() { instChanged = true; } +/*MGEN:ENDIF*/ }; }/*MGEN:Class.CloseNamespaces*/ diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 1e6aab217c..aca3c26a4f 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -185,11 +185,13 @@ void Connection::recordFromServer(const framing::AMQFrame& frame) // Don't record management stats in cluster-unsafe contexts if (mgmtObject != 0 && isClusterSafe()) { - mgmtObject->inc_framesToClient(); - mgmtObject->inc_bytesToClient(frame.encodedSize()); - if (isMessage(frame.getMethod())) { - mgmtObject->inc_msgsToClient(); - } + qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); + cStats->framesToClient += 1; + cStats->bytesToClient += frame.encodedSize(); + if (isMessage(frame.getMethod())) { + cStats->msgsToClient += 1; + } + mgmtObject->statisticsUpdated(); } } @@ -198,11 +200,13 @@ void Connection::recordFromClient(const framing::AMQFrame& frame) // Don't record management stats in cluster-unsafe contexts if (mgmtObject != 0 && isClusterSafe()) { - mgmtObject->inc_framesFromClient(); - mgmtObject->inc_bytesFromClient(frame.encodedSize()); - if (isMessage(frame.getMethod())) { - mgmtObject->inc_msgsFromClient(); - } + qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); + cStats->framesFromClient += 1; + cStats->bytesFromClient += frame.encodedSize(); + if (isMessage(frame.getMethod())) { + cStats->msgsFromClient += 1; + } + mgmtObject->statisticsUpdated(); } } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index ecaa492903..bf0dac111f 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -135,20 +135,23 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) + qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics(); + uint64_t contentSize = msg.contentSize(); + + eStats->msgReceives += 1; + eStats->byteReceives += contentSize; + if (count == 0) { - //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsNoRoute(); + //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); + eStats->msgDrops += 1; + eStats->byteDrops += contentSize; + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsNoRoute(); } - else + else { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + eStats->msgRoutes += count; + eStats->byteRoutes += count * contentSize; } } } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 015957927f..ab9560a346 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -88,8 +88,57 @@ const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; + +inline void mgntEnqStats(const boost::intrusive_ptr& msg, + _qmf::Queue* mgmtObject, + _qmf::Broker* brokerMgmtObject) +{ + if (mgmtObject != 0) { + qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + + uint64_t contentSize = msg->contentSize(); + qStats->msgTotalEnqueues +=1; + bStats->msgTotalEnqueues += 1; + qStats->byteTotalEnqueues += contentSize; + bStats->byteTotalEnqueues += contentSize; + if (msg->isPersistent ()) { + qStats->msgPersistEnqueues += 1; + bStats->msgPersistEnqueues += 1; + qStats->bytePersistEnqueues += contentSize; + bStats->bytePersistEnqueues += contentSize; + } + mgmtObject->statisticsUpdated(); + brokerMgmtObject->statisticsUpdated(); + } +} + +inline void mgntDeqStats(const boost::intrusive_ptr& msg, + _qmf::Queue* mgmtObject, + _qmf::Broker* brokerMgmtObject) +{ + if (mgmtObject != 0){ + qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + uint64_t contentSize = msg->contentSize(); + + qStats->msgTotalDequeues += 1; + bStats->msgTotalDequeues += 1; + qStats->byteTotalDequeues += contentSize; + bStats->byteTotalDequeues += contentSize; + if (msg->isPersistent ()){ + qStats->msgPersistDequeues += 1; + bStats->msgPersistDequeues += 1; + qStats->bytePersistDequeues += contentSize; + bStats->bytePersistDequeues += contentSize; + } + mgmtObject->statisticsUpdated(); + brokerMgmtObject->statisticsUpdated(); + } } +} // namespace + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, @@ -238,7 +287,7 @@ void Queue::requeue(const QueuedMessage& msg){ if (brokerMgmtObject) brokerMgmtObject->inc_abandoned(); } - mgntDeqStats(msg.payload); + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); } else { messages->release(msg); listeners.populate(copy); @@ -951,7 +1000,7 @@ bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) */ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { - mgntDeqStats(msg.payload); + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); if (policy.get()) policy->dequeued(msg); messages->deleted(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { @@ -1512,7 +1561,7 @@ void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) if (policy.get()) { policy->enqueued(m); } - mgntEnqStats(m.payload); + mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject); } void Queue::updateEnqueued(const QueuedMessage& m) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 66efe4eca3..282eb691b9 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -157,38 +157,6 @@ class Queue : public boost::enable_shared_from_this, void forcePersistent(QueuedMessage& msg); int getEventMode(); void configureImpl(const qpid::framing::FieldTable& settings); - - inline void mgntEnqStats(const boost::intrusive_ptr& msg) - { - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - brokerMgmtObject->inc_msgTotalEnqueues (); - brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - if (msg->isPersistent ()) { - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - brokerMgmtObject->inc_msgPersistEnqueues (); - brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } - } - } - inline void mgntDeqStats(const boost::intrusive_ptr& msg) - { - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg->contentSize()); - brokerMgmtObject->inc_msgTotalDequeues (); - brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize()); - if (msg->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg->contentSize()); - brokerMgmtObject->inc_msgPersistDequeues (); - brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize()); - } - } - } - void checkNotDeleted(const Consumer::shared_ptr& c); void notifyDeleted(); -- cgit v1.2.1