diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 62 |
1 files changed, 31 insertions, 31 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 18c1ab1056..d668ee0505 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -71,18 +71,18 @@ void Queue::deliver(Message::shared_ptr& msg){ } else { - // if no store then mark as enqueued + // if no store then mark as enqueued if (!enqueue(0, msg)){ push(msg); - msg->enqueueComplete(); - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize ()); - }else { - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + msg->enqueueComplete(); + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize ()); + }else { + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); push(msg); - } - QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); + } + QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); serializer.execute(dispatchCallback); } } @@ -91,8 +91,8 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -108,20 +108,19 @@ void Queue::process(Message::shared_ptr& msg){ mask |= MSG_MASK_PERSIST; push(msg); - if (mgmtObjectPtr != 0) - mgmtObjectPtr->enqueue (msg->contentSize (), mask); + if (mgmtObject != 0) + mgmtObject->enqueue (msg->contentSize (), mask); serializer.execute(dispatchCallback); } void Queue::requeue(const QueuedMessage& msg){ { - Mutex::ScopedLock locker(messageLock); - msg.payload->enqueueComplete(); // mark the message as enqueued - messages.push_front(msg); + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); } serializer.execute(dispatchCallback); - } bool Queue::acquire(const QueuedMessage& msg) { @@ -221,7 +220,7 @@ void Queue::dispatch() QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]"); } else { break; - } + } } serviceAllBrowsers(); } @@ -290,8 +289,8 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){ browsers.push_back(c); } - if (mgmtObjectPtr != 0){ - mgmtObjectPtr->incConsumers (); + if (mgmtObject != 0){ + mgmtObject->incConsumers (); } } @@ -302,8 +301,8 @@ void Queue::cancel(Consumer::ptr c){ } else { cancel(c, browsers); } - if (mgmtObjectPtr != 0){ - mgmtObjectPtr->decConsumers (); + if (mgmtObject != 0){ + mgmtObject->decConsumers (); } if(exclusive == c) exclusive.reset(); } @@ -318,17 +317,18 @@ void Queue::cancel(Consumer::ptr c, Consumers& consumers) QueuedMessage Queue::dequeue(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg; + if(!messages.empty()){ msg = messages.front(); pop(); - if (mgmtObjectPtr != 0){ - uint32_t mask = 0; + if (mgmtObject != 0){ + uint32_t mask = 0; - if (msg.payload->isPersistent ()) - mask |= MSG_MASK_PERSIST; + if (msg.payload->isPersistent ()) + mask |= MSG_MASK_PERSIST; - mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask); - } + mgmtObject->dequeue (msg.payload->contentSize (), mask); + } } return msg; } @@ -343,7 +343,7 @@ uint32_t Queue::purge(){ void Queue::pop(){ Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); - messages.pop_front(); + messages.pop_front(); } void Queue::push(Message::shared_ptr& msg){ @@ -390,7 +390,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg) if (msg->isPersistent() && store) { msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue store->enqueue(ctxt, *msg.get(), *this); - return true; + return true; } return false; } @@ -401,7 +401,7 @@ bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg) if (msg->isPersistent() && store) { msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue store->dequeue(ctxt, *msg.get(), *this); - return true; + return true; } return false; } |