summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp47
1 files changed, 22 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index bde539f9ec..830589e687 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -438,8 +438,7 @@ void Queue::purgeExpired()
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
- for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
- (DequeueDoneCallbackFactory*)0));
+ for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
@@ -614,8 +613,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
- (DequeueDoneCallbackFactory*)0));
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
@@ -655,19 +653,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
}
/**
- * returns false if the dequeue completed, otherwise the dequeue will complete
- * asynchronously. If the caller needs to know when an asynchronous dequeue
- * completes, it must provide a factory that will provide the callback.
+ * Returns a null pointer if the dequeue completed, otherwise the dequeue will complete
+ * asynchronously, and a pointer to a DequeueCompletion object is returned.
*/
-bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
- DequeueDoneCallbackFactory *factory)
+boost::intrusive_ptr<Queue::DequeueCompletion>
+Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ static const boost::intrusive_ptr<DequeueCompletion> empty;
ScopedUse u(barrier);
- if (!u.acquired) return false;
+ if (!u.acquired) return empty;
{
Mutex::ScopedLock locker(messageLock);
- if (!isEnqueued(msg)) return false;
+ if (!isEnqueued(msg)) return empty;
if (!ctxt) {
dequeued(msg);
}
@@ -679,17 +677,17 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg,
if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) {
msg.payload->dequeueAsync(shared_from_this(), store);
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
- if (factory) {
- boost::shared_ptr<DequeueDoneCallback> callback((*factory)());
+ boost::intrusive_ptr<DequeueCompletion> dc(new DequeueCompletion());
+ {
Mutex::ScopedLock locker(messageLock);
- pendingDequeueCallbacks[pmsg.get()] = callback;
+ pendingDequeueCompletions[pmsg.get()] = dc;
}
QPID_LOG(debug, "Message " << pmsg << " async dequeue started on queue " << name);
store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done
- return true;
+ return dc;
}
}
- return false;
+ return empty;
}
void Queue::dequeueCommitted(const QueuedMessage& msg)
@@ -1121,8 +1119,7 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges)
}
}
//process any pending dequeues
- for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1,
- (DequeueDoneCallbackFactory*)0));
+ for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
pendingDequeues.clear();
}
@@ -1221,17 +1218,17 @@ void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg
QPID_LOG(debug, "Message " << msg << " dequeue complete on queue " << name);
msg->dequeueComplete(shared_from_this(), store);
- boost::shared_ptr<DequeueDoneCallback> cb;
+ boost::intrusive_ptr<DequeueCompletion> dc;
{
Mutex::ScopedLock locker(messageLock);
- std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> >::iterator i;
- i = pendingDequeueCallbacks.find(msg.get());
- if (i != pendingDequeueCallbacks.end()) {
- cb = i->second;
- pendingDequeueCallbacks.erase(i);
+ std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> >::iterator i;
+ i = pendingDequeueCompletions.find(msg.get());
+ if (i != pendingDequeueCompletions.end()) {
+ dc = i->second;
+ pendingDequeueCompletions.erase(i);
}
}
- if (cb) (*cb)();
+ if (dc) dc->dequeueDone();
}