diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c4094a117b..92e87cc9d8 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -19,21 +19,25 @@ * */ -#include "qpid/log/Statement.h" -#include "qpid/framing/reply_exceptions.h" #include "Broker.h" #include "Queue.h" #include "Exchange.h" #include "DeliverableMessage.h" #include "MessageStore.h" +#include "QueueRegistry.h" + +#include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" + #include <iostream> -#include <boost/bind.hpp> -#include "QueueRegistry.h" #include <algorithm> #include <functional> +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> + using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; @@ -87,7 +91,7 @@ void Queue::notifyDurableIOComplete() } -void Queue::deliver(intrusive_ptr<Message>& msg){ +void Queue::deliver(boost::intrusive_ptr<Message>& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -124,7 +128,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ } -void Queue::recover(intrusive_ptr<Message>& msg){ +void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { @@ -144,7 +148,7 @@ void Queue::recover(intrusive_ptr<Message>& msg){ } } -void Queue::process(intrusive_ptr<Message>& msg){ +void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { Mutex::ScopedLock alock(mgmtObject->accessorLock); @@ -393,7 +397,7 @@ void Queue::pop(){ messages.pop_front(); } -void Queue::push(intrusive_ptr<Message>& msg){ +void Queue::push(boost::intrusive_ptr<Message>& msg){ Mutex::ScopedLock locker(messageLock); messages.push_back(QueuedMessage(this, msg, ++sequence)); if (policy.get()) { @@ -434,11 +438,11 @@ bool Queue::canAutoDelete() const{ } // return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) +bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } @@ -447,11 +451,11 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) } // return true if store exists, -bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg) +bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->dequeue(ctxt, pmsg, *this); return true; } |