summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp69
1 files changed, 46 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 41a5767457..376b9367d0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete,
if (parent != 0)
{
mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete));
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
agent->addObject (mgmtObject);
@@ -92,11 +92,21 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize ());
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ }
}else {
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ }
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -108,8 +118,15 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ }
+
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -118,15 +135,19 @@ void Queue::recover(intrusive_ptr<Message>& msg){
}
void Queue::process(intrusive_ptr<Message>& msg){
-
- uint32_t mask = management::MSG_MASK_TX;
-
- if (msg->isPersistent ())
- mask |= management::MSG_MASK_PERSIST;
-
push(msg);
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), mask);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgTxnEnqueues ();
+ mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ if (msg->isPersistent ()) {
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ }
+ }
serializer.execute(dispatchCallback);
}
@@ -309,7 +330,7 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){
}
if (mgmtObject != 0){
- mgmtObject->incConsumers ();
+ mgmtObject->inc_consumers ();
}
}
@@ -321,7 +342,7 @@ void Queue::cancel(Consumer::ptr c){
cancel(c, browsers);
}
if (mgmtObject != 0){
- mgmtObject->decConsumers ();
+ mgmtObject->dec_consumers ();
}
if(exclusive == c) exclusive.reset();
}
@@ -341,12 +362,14 @@ QueuedMessage Queue::dequeue(){
msg = messages.front();
pop();
if (mgmtObject != 0){
- uint32_t mask = 0;
-
- if (msg.payload->isPersistent ())
- mask |= management::MSG_MASK_PERSIST;
-
- mgmtObject->dequeue (msg.payload->contentSize (), mask);
+ mgmtObject->inc_msgTotalDequeues ();
+ //mgmtObject->inc_byteTotalDequeues (msg->contentSize ());
+ mgmtObject->dec_msgDepth ();
+ //mgmtObject->dec_byteDepth (msg->contentSize ());
+ if (0){//msg->isPersistent ()) {
+ mgmtObject->inc_msgPersistDequeues ();
+ //mgmtObject->inc_bytePersistDequeues (msg->contentSize ());
+ }
}
}
return msg;