summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp58
1 files changed, 38 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 4404bedc24..226ba77fb2 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -94,8 +94,8 @@ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
_qmf::Broker* brokerMgmtObject)
{
if (mgmtObject != 0) {
- qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
uint64_t contentSize = msg->contentSize();
qStats->msgTotalEnqueues +=1;
@@ -118,8 +118,8 @@ inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
_qmf::Broker* brokerMgmtObject)
{
if (mgmtObject != 0){
- qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
uint64_t contentSize = msg->contentSize();
qStats->msgTotalDequeues += 1;
@@ -258,11 +258,16 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject != 0){
- mgmtObject->inc_msgTxnEnqueues ();
- mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ const uint64_t contentSize = msg->contentSize();
+ qStats->msgTxnEnqueues += 1;
+ qStats->byteTxnEnqueues += contentSize;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgTxnEnqueues ();
- brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgTxnEnqueues += 1;
+ bStats->byteTxnEnqueues += contentSize;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -970,11 +975,16 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
Mutex::ScopedLock locker(messageLock);
observeDequeue(msg, locker);
if (mgmtObject != 0) {
- mgmtObject->inc_msgTxnDequeues();
- mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ const uint64_t contentSize = msg.payload->contentSize();
+ qStats->msgTxnDequeues += 1;
+ qStats->byteTxnDequeues += contentSize;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgTxnDequeues();
- brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgTxnDequeues += 1;
+ bStats->byteTxnDequeues += contentSize;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -1437,11 +1447,15 @@ void Queue::countRejected() const
void Queue::countFlowedToDisk(uint64_t size) const
{
if (mgmtObject) {
- mgmtObject->inc_msgFtdEnqueues();
- mgmtObject->inc_byteFtdEnqueues(size);
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qStats->msgFtdEnqueues += 1;
+ qStats->byteFtdEnqueues += size;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgFtdEnqueues();
- brokerMgmtObject->inc_byteFtdEnqueues(size);
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgFtdEnqueues += 1;
+ bStats->byteFtdEnqueues += size;
+ brokerMgmtObject->statisticsUpdated();
}
}
}
@@ -1449,11 +1463,15 @@ void Queue::countFlowedToDisk(uint64_t size) const
void Queue::countLoadedFromDisk(uint64_t size) const
{
if (mgmtObject) {
- mgmtObject->inc_msgFtdDequeues();
- mgmtObject->inc_byteFtdDequeues(size);
+ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+ qStats->msgFtdDequeues += 1;
+ qStats->byteFtdDequeues += size;
+ mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
- brokerMgmtObject->inc_msgFtdDequeues();
- brokerMgmtObject->inc_byteFtdDequeues(size);
+ _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+ bStats->msgFtdDequeues += 1;
+ bStats->byteFtdDequeues += size;
+ brokerMgmtObject->statisticsUpdated();
}
}
}