summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp62
1 files changed, 31 insertions, 31 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 18c1ab1056..d668ee0505 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -71,18 +71,18 @@ void Queue::deliver(Message::shared_ptr& msg){
} else {
- // if no store then mark as enqueued
+ // if no store then mark as enqueued
if (!enqueue(0, msg)){
push(msg);
- msg->enqueueComplete();
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize ());
- }else {
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ msg->enqueueComplete();
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize ());
+ }else {
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
push(msg);
- }
- QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
+ }
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
serializer.execute(dispatchCallback);
}
}
@@ -91,8 +91,8 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -108,20 +108,19 @@ void Queue::process(Message::shared_ptr& msg){
mask |= MSG_MASK_PERSIST;
push(msg);
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize (), mask);
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize (), mask);
serializer.execute(dispatchCallback);
}
void Queue::requeue(const QueuedMessage& msg){
{
- Mutex::ScopedLock locker(messageLock);
- msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
+ Mutex::ScopedLock locker(messageLock);
+ msg.payload->enqueueComplete(); // mark the message as enqueued
+ messages.push_front(msg);
}
serializer.execute(dispatchCallback);
-
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -221,7 +220,7 @@ void Queue::dispatch()
QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");
} else {
break;
- }
+ }
}
serviceAllBrowsers();
}
@@ -290,8 +289,8 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){
browsers.push_back(c);
}
- if (mgmtObjectPtr != 0){
- mgmtObjectPtr->incConsumers ();
+ if (mgmtObject != 0){
+ mgmtObject->incConsumers ();
}
}
@@ -302,8 +301,8 @@ void Queue::cancel(Consumer::ptr c){
} else {
cancel(c, browsers);
}
- if (mgmtObjectPtr != 0){
- mgmtObjectPtr->decConsumers ();
+ if (mgmtObject != 0){
+ mgmtObject->decConsumers ();
}
if(exclusive == c) exclusive.reset();
}
@@ -318,17 +317,18 @@ void Queue::cancel(Consumer::ptr c, Consumers& consumers)
QueuedMessage Queue::dequeue(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
+
if(!messages.empty()){
msg = messages.front();
pop();
- if (mgmtObjectPtr != 0){
- uint32_t mask = 0;
+ if (mgmtObject != 0){
+ uint32_t mask = 0;
- if (msg.payload->isPersistent ())
- mask |= MSG_MASK_PERSIST;
+ if (msg.payload->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
- mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask);
- }
+ mgmtObject->dequeue (msg.payload->contentSize (), mask);
+ }
}
return msg;
}
@@ -343,7 +343,7 @@ uint32_t Queue::purge(){
void Queue::pop(){
Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
- messages.pop_front();
+ messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
@@ -390,7 +390,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
if (msg->isPersistent() && store) {
msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
store->enqueue(ctxt, *msg.get(), *this);
- return true;
+ return true;
}
return false;
}
@@ -401,7 +401,7 @@ bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg)
if (msg->isPersistent() && store) {
msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
store->dequeue(ctxt, *msg.get(), *this);
- return true;
+ return true;
}
return false;
}