diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 147 |
1 files changed, 57 insertions, 90 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 2c792c2d43..96e8fe8b2d 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -548,7 +548,7 @@ void SemanticState::recover(bool requeue) //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost - sort(unacked.begin(), unacked.end()); + unacked.sort(); } } @@ -779,63 +779,60 @@ namespace { class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext { mutable qpid::sys::Mutex lock; - std::map<DeliveryId, boost::shared_ptr<Queue> > pending; // for dequeue to complete - bool ready; + unsigned int pending; + std::vector<boost::shared_ptr<Queue> > queues; // for flush() SemanticState& state; + /** completes this command. Note: may run in *any* thread */ + void complete() + { + Mutex::ScopedLock l(lock); + assert(pending); + if (--pending == 0) { + framing::Invoker::Result r; // message.accept does not return result data + Mutex::ScopedUnlock ul(lock); + QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); + completed( r ); + } + } + public: AsyncMessageAcceptCmd(SemanticState& _state) - : ready(false), state(_state) {} + : pending(1), state(_state) {} - /** called from session to urge pending dequeues to complete ASAP */ + /** signal this dequeue done. Note: may be run in *any* thread */ + static void dequeueDone( boost::intrusive_ptr<RefCounted>& ctxt ) + { + boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd(boost::static_pointer_cast<AsyncMessageAcceptCmd>(ctxt)); + cmd->complete(); + } + + /** called from session to urge pending dequeues to complete ASAP, done + as a result of an execution.sync */ void flush() { QPID_LOG(trace, "Flushing pending message.accept cmd=" << getId()); - std::map<DeliveryId, boost::shared_ptr<Queue> > copy; + std::vector<boost::shared_ptr<Queue> > copy; { Mutex::ScopedLock l(lock); - copy = pending; + copy.swap(queues); // no longer needed after flushing... } - std::set<Queue *> flushedQs; // flush each queue only once! - for (std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = copy.begin(); + for (std::vector<boost::shared_ptr<Queue> >::iterator i = copy.begin(); i != copy.end(); ++i) { - Queue *queue(i->second.get()); - if (flushedQs.find(queue) == flushedQs.end()) { - flushedQs.insert(queue); - i->second->flush(); - } + (*i)->flush(); } } /** add a pending dequeue to track */ - void add( const DeliveryId& id, const boost::shared_ptr<Queue>& queue ) + void add( const boost::shared_ptr<Queue>& queue ) { - QPID_LOG(trace, "Scheduling dequeue of delivery " << id + QPID_LOG(trace, "Scheduling dequeue of delivery " << getId() << " on session " << state.getSession().getSessionId()); Mutex::ScopedLock l(lock); - bool unique = pending.insert(std::pair<DeliveryId, boost::shared_ptr<Queue> >(id, queue)).second; - if (!unique) { - assert(false); - } + ++pending; + queues.push_back(queue); } - /** signal this dequeue done. Note: may be run in *any* thread */ - void complete( const DeliveryId& id ) - { - QPID_LOG(trace, "Dequeue of delivery " << id - << " completed on session " << state.getSession().getSessionId()); - Mutex::ScopedLock l(lock); - std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = pending.find(id); - assert(i != pending.end()); - pending.erase(i); - - if (ready && pending.empty()) { - framing::Invoker::Result r; // message.accept does not return result data - Mutex::ScopedUnlock ul(lock); - QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); - completed( r ); - } - } /** allow the Message.Accept to complete - do this only after all * deliveryIds have been added() and this has been registered with the @@ -844,57 +841,15 @@ namespace { { QPID_LOG(trace, "Dispatching async message.accept cmd=" << getId()); Mutex::ScopedLock l(lock); - if (pending.empty()) { + assert(pending); + if (--pending == 0) { framing::Invoker::Result r; Mutex::ScopedUnlock ul(lock); + QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); completed( r ); - return; } - ready = true; } }; - - - /** callback to indicate a single message has completed its asynchronous - dequeue. This object is made available to the queue when a dequeue is - started. The queue will invoke the callback when the dequeue - completes. */ - class DequeueDone : public Queue::DequeueDoneCallback - { - DeliveryId id; - boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; - public: - DequeueDone( const DeliveryId & _id, - const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd ) - : id(_id), cmd(_cmd) {} - void operator()() { cmd->complete( id ); } - }; - - - /** factory to create the above callback - passed to queue's dequeue - method, only invoked if the dequeue operation is asynchronous! */ - boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state, - const DeliveryId& id, - const boost::shared_ptr<Queue>& queue, - boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd ) - { - if (!cmd->get()) { // first async dequeue creates the context - cmd->reset(new AsyncMessageAcceptCmd(*state)); - } - (*cmd)->add( id, queue ); - boost::shared_ptr<DequeueDone> x( new DequeueDone(id, *cmd ) ); - return x; - } - - /** predicate to process unacked delivery records during Message.accept - processing */ - bool acceptDelivery( SemanticState *state, - boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd, - DeliveryRecord& dr ) - { - Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), dr.getQueue(), cmd); - return dr.accept((TransactionContext*) 0, &f); - } } // namespace @@ -929,14 +884,26 @@ void SemanticState::accepted(const SequenceSet& commands) { which would be necessary if we remove when the dequeue completes. Is this ok? */ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; - DeliveryRecords::iterator removed = - remove_if(unacked.begin(), unacked.end(), - isInSequenceSetAnd(commands, - bind(acceptDelivery, - this, - &cmd, - _1))); - unacked.erase(removed, unacked.end()); + IsInSequenceSet isInSeq(commands); + DeliveryRecords::const_iterator end(unacked.end()); + DeliveryRecords::iterator i = unacked.begin(); + while (i != end) { + const SequenceNumber seq(i->getId()); + if (isInSeq(seq)) { + boost::intrusive_ptr<Queue::DequeueCompletion> async(i->accept((TransactionContext *)0)); + if (async) { + if (!cmd) cmd = boost::intrusive_ptr<AsyncMessageAcceptCmd>(new AsyncMessageAcceptCmd(*this)); + cmd->add(i->getQueue()); + boost::intrusive_ptr<qpid::RefCounted> rc(boost::static_pointer_cast<RefCounted>(cmd)); + async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc); + } + if (i->isRedundant()) + i = unacked.erase(i); + else + ++i; + } else + ++i; + } if (cmd) { boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd)); |