diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 69 |
1 files changed, 46 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 41a5767457..376b9367d0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (parent != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete)); + (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject); @@ -92,11 +92,21 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ if (!enqueue(0, msg)){ push(msg); msg->enqueueComplete(); - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize ()); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + } }else { - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -108,8 +118,15 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ void Queue::recover(intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + } + if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -118,15 +135,19 @@ void Queue::recover(intrusive_ptr<Message>& msg){ } void Queue::process(intrusive_ptr<Message>& msg){ - - uint32_t mask = management::MSG_MASK_TX; - - if (msg->isPersistent ()) - mask |= management::MSG_MASK_PERSIST; - push(msg); - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), mask); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgTxnEnqueues (); + mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + if (msg->isPersistent ()) { + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } + } serializer.execute(dispatchCallback); } @@ -309,7 +330,7 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){ } if (mgmtObject != 0){ - mgmtObject->incConsumers (); + mgmtObject->inc_consumers (); } } @@ -321,7 +342,7 @@ void Queue::cancel(Consumer::ptr c){ cancel(c, browsers); } if (mgmtObject != 0){ - mgmtObject->decConsumers (); + mgmtObject->dec_consumers (); } if(exclusive == c) exclusive.reset(); } @@ -341,12 +362,14 @@ QueuedMessage Queue::dequeue(){ msg = messages.front(); pop(); if (mgmtObject != 0){ - uint32_t mask = 0; - - if (msg.payload->isPersistent ()) - mask |= management::MSG_MASK_PERSIST; - - mgmtObject->dequeue (msg.payload->contentSize (), mask); + mgmtObject->inc_msgTotalDequeues (); + //mgmtObject->inc_byteTotalDequeues (msg->contentSize ()); + mgmtObject->dec_msgDepth (); + //mgmtObject->dec_byteDepth (msg->contentSize ()); + if (0){//msg->isPersistent ()) { + mgmtObject->inc_msgPersistDequeues (); + //mgmtObject->inc_bytePersistDequeues (msg->contentSize ()); + } } } return msg; |