summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-03-02 20:51:19 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-03-02 20:51:19 +0000
commit4cd29d042523b98666a6e671037d00271403a302 (patch)
tree4b2409c940b4332a369acc5fcbc5575164c814e1
parent2b9c7dbb5bd60e06e3f5c4f06d3f2ffa0d868cbd (diff)
downloadqpid-python-4cd29d042523b98666a6e671037d00271403a302.tar.gz
QPID-3875: allow direct access to per-thread statistics
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1296448 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.h8
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp24
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp55
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h32
5 files changed, 88 insertions, 56 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h
index 0527d53536..95939f3d03 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h
@@ -53,9 +53,12 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::managemen
/*MGEN:Class.InstDeclarations*/
/*MGEN:IF(Class.ExistPerThreadStats)*/
// Per-Thread Statistics
+
+ public:
struct PerThreadStats {
/*MGEN:Class.PerThreadDeclarations*/
};
+ private:
struct PerThreadStats** perThreadStatsArray;
@@ -120,6 +123,11 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::managemen
/*MGEN:Class.MethodIdDeclarations*/
// Accessor Methods
/*MGEN:Class.AccessorMethods*/
+
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+ struct PerThreadStats* getStatistics() { return getThreadStats(); }
+ void statisticsUpdated() { instChanged = true; }
+/*MGEN:ENDIF*/
};
}/*MGEN:Class.CloseNamespaces*/
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 1e6aab217c..aca3c26a4f 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -185,11 +185,13 @@ void Connection::recordFromServer(const framing::AMQFrame& frame)
// Don't record management stats in cluster-unsafe contexts
if (mgmtObject != 0 && isClusterSafe())
{
- mgmtObject->inc_framesToClient();
- mgmtObject->inc_bytesToClient(frame.encodedSize());
- if (isMessage(frame.getMethod())) {
- mgmtObject->inc_msgsToClient();
- }
+ qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
+ cStats->framesToClient += 1;
+ cStats->bytesToClient += frame.encodedSize();
+ if (isMessage(frame.getMethod())) {
+ cStats->msgsToClient += 1;
+ }
+ mgmtObject->statisticsUpdated();
}
}
@@ -198,11 +200,13 @@ void Connection::recordFromClient(const framing::AMQFrame& frame)
// Don't record management stats in cluster-unsafe contexts
if (mgmtObject != 0 && isClusterSafe())
{
- mgmtObject->inc_framesFromClient();
- mgmtObject->inc_bytesFromClient(frame.encodedSize());
- if (isMessage(frame.getMethod())) {
- mgmtObject->inc_msgsFromClient();
- }
+ qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
+ cStats->framesFromClient += 1;
+ cStats->bytesFromClient += frame.encodedSize();
+ if (isMessage(frame.getMethod())) {
+ cStats->msgsFromClient += 1;
+ }
+ mgmtObject->statisticsUpdated();
}
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index ecaa492903..bf0dac111f 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -135,20 +135,23 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
if (mgmtExchange != 0)
{
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
+ qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics();
+ uint64_t contentSize = msg.contentSize();
+
+ eStats->msgReceives += 1;
+ eStats->byteReceives += contentSize;
+ if (count == 0)
{
- //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsNoRoute();
+ //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+ eStats->msgDrops += 1;
+ eStats->byteDrops += contentSize;
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsNoRoute();
}
- else
+ else
{
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ eStats->msgRoutes += count;
+ eStats->byteRoutes += count * contentSize;
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 015957927f..ab9560a346 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -88,8 +88,57 @@ const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
+
+inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
+ _qmf::Queue* mgmtObject,
+ _qmf::Broker* brokerMgmtObject)
+{
+ if (mgmtObject != 0) {
+ qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+
+ uint64_t contentSize = msg->contentSize();
+ qStats->msgTotalEnqueues +=1;
+ bStats->msgTotalEnqueues += 1;
+ qStats->byteTotalEnqueues += contentSize;
+ bStats->byteTotalEnqueues += contentSize;
+ if (msg->isPersistent ()) {
+ qStats->msgPersistEnqueues += 1;
+ bStats->msgPersistEnqueues += 1;
+ qStats->bytePersistEnqueues += contentSize;
+ bStats->bytePersistEnqueues += contentSize;
+ }
+ mgmtObject->statisticsUpdated();
+ brokerMgmtObject->statisticsUpdated();
+ }
+}
+
+inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
+ _qmf::Queue* mgmtObject,
+ _qmf::Broker* brokerMgmtObject)
+{
+ if (mgmtObject != 0){
+ qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ uint64_t contentSize = msg->contentSize();
+
+ qStats->msgTotalDequeues += 1;
+ bStats->msgTotalDequeues += 1;
+ qStats->byteTotalDequeues += contentSize;
+ bStats->byteTotalDequeues += contentSize;
+ if (msg->isPersistent ()){
+ qStats->msgPersistDequeues += 1;
+ bStats->msgPersistDequeues += 1;
+ qStats->bytePersistDequeues += contentSize;
+ bStats->bytePersistDequeues += contentSize;
+ }
+ mgmtObject->statisticsUpdated();
+ brokerMgmtObject->statisticsUpdated();
+ }
}
+} // namespace
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
@@ -238,7 +287,7 @@ void Queue::requeue(const QueuedMessage& msg){
if (brokerMgmtObject)
brokerMgmtObject->inc_abandoned();
}
- mgntDeqStats(msg.payload);
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
} else {
messages->release(msg);
listeners.populate(copy);
@@ -951,7 +1000,7 @@ bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker)
*/
void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
- mgntDeqStats(msg.payload);
+ mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
if (policy.get()) policy->dequeued(msg);
messages->deleted(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
@@ -1512,7 +1561,7 @@ void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
if (policy.get()) {
policy->enqueued(m);
}
- mgntEnqStats(m.payload);
+ mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
}
void Queue::updateEnqueued(const QueuedMessage& m)
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 66efe4eca3..282eb691b9 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -157,38 +157,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void forcePersistent(QueuedMessage& msg);
int getEventMode();
void configureImpl(const qpid::framing::FieldTable& settings);
-
- inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
- {
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- brokerMgmtObject->inc_msgTotalEnqueues ();
- brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- if (msg->isPersistent ()) {
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- brokerMgmtObject->inc_msgPersistEnqueues ();
- brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
- }
- }
- inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg)
- {
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg->contentSize());
- brokerMgmtObject->inc_msgTotalDequeues ();
- brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize());
- if (msg->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg->contentSize());
- brokerMgmtObject->inc_msgPersistDequeues ();
- brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize());
- }
- }
- }
-
void checkNotDeleted(const Consumer::shared_ptr& c);
void notifyDeleted();