summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp70
1 files changed, 46 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 4dba60cd0d..e2fd998cc0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -59,11 +59,14 @@ Queue::Queue(const string& _name, bool _autodelete,
{
if (parent != 0)
{
- mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
-
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
- agent->addObject (mgmtObject);
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Queue::shared_ptr
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
+ agent->addObject (mgmtObject);
+ }
}
}
@@ -93,14 +96,14 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
mgmtObject->inc_byteDepth (msg->contentSize ());
}
}else {
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
@@ -118,7 +121,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
@@ -136,7 +139,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){
void Queue::process(intrusive_ptr<Message>& msg){
push(msg);
- if (mgmtObject != 0) {
+ if (mgmtObject.get() != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
@@ -319,7 +322,7 @@ void Queue::consume(Consumer&, bool requestExclusive){
}
consumerCount++;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->inc_consumers ();
}
}
@@ -329,7 +332,7 @@ void Queue::cancel(Consumer& c){
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
if(exclusive) exclusive = false;
- if (mgmtObject != 0){
+ if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
}
@@ -341,16 +344,6 @@ QueuedMessage Queue::dequeue(){
if(!messages.empty()){
msg = messages.front();
pop();
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- //mgmtObject->inc_byteTotalDequeues (msg->contentSize ());
- mgmtObject->dec_msgDepth ();
- //mgmtObject->dec_byteDepth (msg->contentSize ());
- if (0){//msg->isPersistent ()) {
- mgmtObject->inc_msgPersistDequeues ();
- //mgmtObject->inc_bytePersistDequeues (msg->contentSize ());
- }
- }
}
return msg;
}
@@ -366,7 +359,19 @@ uint32_t Queue::purge(){
* Assumes messageLock is held
*/
void Queue::pop(){
- if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
+ QueuedMessage& msg = messages.front();
+
+ if (policy.get()) policy->dequeued(msg.payload->contentSize());
+ if (mgmtObject.get() != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+ mgmtObject->dec_msgDepth ();
+ mgmtObject->dec_byteDepth (msg.payload->contentSize());
+ if (msg.payload->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
+ }
+ }
messages.pop_front();
}
@@ -473,7 +478,8 @@ void Queue::destroy()
}
}
-void Queue::bound(const string& exchange, const string& key, const FieldTable& args)
+void Queue::bound(const string& exchange, const string& key,
+ const FieldTable& args)
{
bindings.add(exchange, key, args);
}
@@ -584,8 +590,24 @@ ManagementObject::shared_ptr Queue::GetManagementObject (void) const
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
}
-Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/,
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId,
Args& /*args*/)
{
- return Manageable::STATUS_OK;
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case management::Queue::METHOD_PURGE :
+ purge ();
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Queue::METHOD_INCREASEJOURNALSIZE :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
}