diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 47 |
1 files changed, 22 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index bde539f9ec..830589e687 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -438,8 +438,7 @@ void Queue::purgeExpired() Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } - for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, - (DequeueDoneCallbackFactory*)0)); + for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } @@ -614,8 +613,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, - (DequeueDoneCallbackFactory*)0)); + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ @@ -655,19 +653,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) } /** - * returns false if the dequeue completed, otherwise the dequeue will complete - * asynchronously. If the caller needs to know when an asynchronous dequeue - * completes, it must provide a factory that will provide the callback. + * Returns a null pointer if the dequeue completed, otherwise the dequeue will complete + * asynchronously, and a pointer to a DequeueCompletion object is returned. */ -bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg, - DequeueDoneCallbackFactory *factory) +boost::intrusive_ptr<Queue::DequeueCompletion> +Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + static const boost::intrusive_ptr<DequeueCompletion> empty; ScopedUse u(barrier); - if (!u.acquired) return false; + if (!u.acquired) return empty; { Mutex::ScopedLock locker(messageLock); - if (!isEnqueued(msg)) return false; + if (!isEnqueued(msg)) return empty; if (!ctxt) { dequeued(msg); } @@ -679,17 +677,17 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg, if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) { msg.payload->dequeueAsync(shared_from_this(), store); boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); - if (factory) { - boost::shared_ptr<DequeueDoneCallback> callback((*factory)()); + boost::intrusive_ptr<DequeueCompletion> dc(new DequeueCompletion()); + { Mutex::ScopedLock locker(messageLock); - pendingDequeueCallbacks[pmsg.get()] = callback; + pendingDequeueCompletions[pmsg.get()] = dc; } QPID_LOG(debug, "Message " << pmsg << " async dequeue started on queue " << name); store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done - return true; + return dc; } } - return false; + return empty; } void Queue::dequeueCommitted(const QueuedMessage& msg) @@ -1121,8 +1119,7 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) } } //process any pending dequeues - for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, - (DequeueDoneCallbackFactory*)0)); + for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); pendingDequeues.clear(); } @@ -1221,17 +1218,17 @@ void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg QPID_LOG(debug, "Message " << msg << " dequeue complete on queue " << name); msg->dequeueComplete(shared_from_this(), store); - boost::shared_ptr<DequeueDoneCallback> cb; + boost::intrusive_ptr<DequeueCompletion> dc; { Mutex::ScopedLock locker(messageLock); - std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> >::iterator i; - i = pendingDequeueCallbacks.find(msg.get()); - if (i != pendingDequeueCallbacks.end()) { - cb = i->second; - pendingDequeueCallbacks.erase(i); + std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> >::iterator i; + i = pendingDequeueCompletions.find(msg.get()); + if (i != pendingDequeueCompletions.end()) { + dc = i->second; + pendingDequeueCompletions.erase(i); } } - if (cb) (*cb)(); + if (dc) dc->dequeueDone(); } |