summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp28
1 files changed, 16 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c4094a117b..92e87cc9d8 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -19,21 +19,25 @@
*
*/
-#include "qpid/log/Statement.h"
-#include "qpid/framing/reply_exceptions.h"
#include "Broker.h"
#include "Queue.h"
#include "Exchange.h"
#include "DeliverableMessage.h"
#include "MessageStore.h"
+#include "QueueRegistry.h"
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
+
#include <iostream>
-#include <boost/bind.hpp>
-#include "QueueRegistry.h"
#include <algorithm>
#include <functional>
+#include <boost/bind.hpp>
+#include <boost/intrusive_ptr.hpp>
+
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -87,7 +91,7 @@ void Queue::notifyDurableIOComplete()
}
-void Queue::deliver(intrusive_ptr<Message>& msg){
+void Queue::deliver(boost::intrusive_ptr<Message>& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -124,7 +128,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
}
-void Queue::recover(intrusive_ptr<Message>& msg){
+void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
@@ -144,7 +148,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){
}
}
-void Queue::process(intrusive_ptr<Message>& msg){
+void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
Mutex::ScopedLock alock(mgmtObject->accessorLock);
@@ -393,7 +397,7 @@ void Queue::pop(){
messages.pop_front();
}
-void Queue::push(intrusive_ptr<Message>& msg){
+void Queue::push(boost::intrusive_ptr<Message>& msg){
Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
if (policy.get()) {
@@ -434,11 +438,11 @@ bool Queue::canAutoDelete() const{
}
// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
}
@@ -447,11 +451,11 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
}
// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
+bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg);
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->dequeue(ctxt, pmsg, *this);
return true;
}