diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 70 |
1 files changed, 46 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 4dba60cd0d..e2fd998cc0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -59,11 +59,14 @@ Queue::Queue(const string& _name, bool _autodelete, { if (parent != 0) { - mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - agent->addObject (mgmtObject); + + if (agent.get () != 0) + { + mgmtObject = management::Queue::shared_ptr + (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); + agent->addObject (mgmtObject); + } } } @@ -93,14 +96,14 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ if (!enqueue(0, msg)){ push(msg); msg->enqueueComplete(); - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); mgmtObject->inc_byteDepth (msg->contentSize ()); } }else { - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); @@ -118,7 +121,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ void Queue::recover(intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); @@ -136,7 +139,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){ void Queue::process(intrusive_ptr<Message>& msg){ push(msg); - if (mgmtObject != 0) { + if (mgmtObject.get() != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); @@ -319,7 +322,7 @@ void Queue::consume(Consumer&, bool requestExclusive){ } consumerCount++; - if (mgmtObject != 0){ + if (mgmtObject.get() != 0){ mgmtObject->inc_consumers (); } } @@ -329,7 +332,7 @@ void Queue::cancel(Consumer& c){ Mutex::ScopedLock locker(consumerLock); consumerCount--; if(exclusive) exclusive = false; - if (mgmtObject != 0){ + if (mgmtObject.get() != 0){ mgmtObject->dec_consumers (); } } @@ -341,16 +344,6 @@ QueuedMessage Queue::dequeue(){ if(!messages.empty()){ msg = messages.front(); pop(); - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - //mgmtObject->inc_byteTotalDequeues (msg->contentSize ()); - mgmtObject->dec_msgDepth (); - //mgmtObject->dec_byteDepth (msg->contentSize ()); - if (0){//msg->isPersistent ()) { - mgmtObject->inc_msgPersistDequeues (); - //mgmtObject->inc_bytePersistDequeues (msg->contentSize ()); - } - } } return msg; } @@ -366,7 +359,19 @@ uint32_t Queue::purge(){ * Assumes messageLock is held */ void Queue::pop(){ - if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); + QueuedMessage& msg = messages.front(); + + if (policy.get()) policy->dequeued(msg.payload->contentSize()); + if (mgmtObject.get() != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); + mgmtObject->dec_msgDepth (); + mgmtObject->dec_byteDepth (msg.payload->contentSize()); + if (msg.payload->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); + } + } messages.pop_front(); } @@ -473,7 +478,8 @@ void Queue::destroy() } } -void Queue::bound(const string& exchange, const string& key, const FieldTable& args) +void Queue::bound(const string& exchange, const string& key, + const FieldTable& args) { bindings.add(exchange, key, args); } @@ -584,8 +590,24 @@ ManagementObject::shared_ptr Queue::GetManagementObject (void) const return dynamic_pointer_cast<ManagementObject> (mgmtObject); } -Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/, +Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& /*args*/) { - return Manageable::STATUS_OK; + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Queue::METHOD_PURGE : + purge (); + status = Manageable::STATUS_OK; + break; + + case management::Queue::METHOD_INCREASEJOURNALSIZE : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; } |