diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 0f670ded83..3530f3ef6f 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -824,13 +824,14 @@ namespace { pending.erase(i); if (ready && pending.empty()) { - framing::Invoker::Result r; + framing::Invoker::Result r; // message.accept does not return result data Mutex::ScopedUnlock ul(lock); completed( r ); } } - /** allow the Message.Accept to complete */ + /** allow the Message.Accept to complete - do this only after all + * deliveryIds have been added() */ void enable() { Mutex::ScopedLock l(lock); @@ -854,14 +855,14 @@ namespace { boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; public: DequeueDone( const DeliveryId & _id, - boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd ) + const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd ) : id(_id), cmd(_cmd) {} void operator()() { cmd->complete( id ); } }; /** factory to create the above callback - passed to queue's dequeue - method, only called if dequeue is async! */ + method, only used if dequeue is asynchronous! */ boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state, const DeliveryId& id, boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd ) @@ -873,10 +874,18 @@ namespace { boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) ); return x; } + + /** predicate to process unacked delivery records */ + bool acceptDelivery( SemanticState *state, + boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd, + DeliveryRecord dr ) + { + Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd); + return dr.accept((TransactionContext*) 0, &f); + } } void SemanticState::accepted(const SequenceSet& commands) { - QPID_LOG(error, "SemanticState::accepted (" << commands << ")"); assertClusterSafe(); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -900,26 +909,21 @@ void SemanticState::accepted(const SequenceSet& commands) { unacked.erase(removed, unacked.end()); } } else { - /** @todo KAG - the following code removes the command from unacked - even if the dequeue has not completed. note that the command will - still not complete until all dequeues complete. I'm doing this to - avoid having to lock the unacked list, which would be necessary if - we remove when the dequeue completes. Is this ok? */ + /** @todo KAG - the following code removes the message from unacked + list even if the dequeue has not yet completed. Note that the + entire command will still not complete until all dequeues + complete. I'm doing this to avoid having to lock the unacked list, + which would be necessary if we remove when the dequeue + completes. Is this ok? */ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; - DeliveryRecords::iterator i; - DeliveryRecords undone; - for (i = unacked.begin(); i < unacked.end(); ++i) { - if (i->coveredBy(&commands)) { - Queue::DequeueDoneCallbackFactory f = boost::bind(factory, this, i->getId(), cmd); - if (i->accept((TransactionContext*) 0, &f) == false) { - undone.push_back(*i); - } - } - } - if (undone.empty()) - unacked.clear(); - else - unacked.swap(undone); + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(acceptDelivery, + this, + cmd, + _1))); + unacked.erase(removed, unacked.end()); if (cmd) { boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd)); |