diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-16 15:09:41 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-16 15:09:41 +0000 |
commit | 2519eecbf08f512535aaef845c498774700948ab (patch) | |
tree | 9475b21dfc0b8a205e599ba49bc0727f6e2540f9 | |
parent | 16b17128fe65f7b29b3e4d1b8e2d0f4af25f368d (diff) | |
download | qpid-python-2519eecbf08f512535aaef845c498774700948ab.tar.gz |
QPID-3079: restructured Queue::dequeue() interface. Performance tuning changes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1136478 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableMessage.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 47 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 59 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 147 |
6 files changed, 134 insertions, 141 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 635123d242..d0a450ce1e 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -111,13 +111,16 @@ void DeliveryRecord::complete() { } /** Accept msg, and optionally notify caller when dequeue completes */ -bool DeliveryRecord::accept(TransactionContext* ctxt, Queue::DequeueDoneCallbackFactory *f) { +boost::intrusive_ptr<Queue::DequeueCompletion> +DeliveryRecord::accept(TransactionContext* ctxt) { + static const boost::intrusive_ptr<Queue::DequeueCompletion> empty; if (acquired && !ended) { - queue->dequeue(ctxt, msg, f); - setEnded(); QPID_LOG(debug, "Accepted " << id); + boost::intrusive_ptr<Queue::DequeueCompletion> dq(queue->dequeue(ctxt, msg)); + setEnded(); + return dq; } - return isRedundant(); + return empty; } void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 10491df94c..43d5ef7074 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -86,7 +86,7 @@ class DeliveryRecord void redeliver(SemanticState* const); void acquire(DeliveryIds& results); void complete(); - bool accept(TransactionContext*, Queue::DequeueDoneCallbackFactory *); // Returns isRedundant() + boost::intrusive_ptr<Queue::DequeueCompletion> accept(TransactionContext*); bool setEnded(); // Returns isRedundant() void committed() const; @@ -104,7 +104,7 @@ class DeliveryRecord void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize); void setId(DeliveryId _id) { id = _id; } - typedef std::deque<DeliveryRecord> DeliveryRecords; + typedef std::list<DeliveryRecord> DeliveryRecords; static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last); const QueuedMessage& getMessage() const { return msg; } framing::SequenceNumber getId() const { return id; } diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index 0d5e32b40d..74f76da656 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -70,11 +70,12 @@ bool PersistableMessage::isContentReleased() const bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ if (store && (queue->getPersistenceId()!=0)) { sys::ScopedLock<sys::Mutex> l(storeLock); - for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + syncList::iterator i = synclist.find(queue->getName()); + if (i != synclist.end()) { PersistableQueue::shared_ptr q(i->second.lock()); if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; - } - } + } + } return false; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index bde539f9ec..830589e687 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -438,8 +438,7 @@ void Queue::purgeExpired() Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } - for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, - (DequeueDoneCallbackFactory*)0)); + for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } @@ -614,8 +613,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, - (DequeueDoneCallbackFactory*)0)); + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ @@ -655,19 +653,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) } /** - * returns false if the dequeue completed, otherwise the dequeue will complete - * asynchronously. If the caller needs to know when an asynchronous dequeue - * completes, it must provide a factory that will provide the callback. + * Returns a null pointer if the dequeue completed, otherwise the dequeue will complete + * asynchronously, and a pointer to a DequeueCompletion object is returned. */ -bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg, - DequeueDoneCallbackFactory *factory) +boost::intrusive_ptr<Queue::DequeueCompletion> +Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { + static const boost::intrusive_ptr<DequeueCompletion> empty; ScopedUse u(barrier); - if (!u.acquired) return false; + if (!u.acquired) return empty; { Mutex::ScopedLock locker(messageLock); - if (!isEnqueued(msg)) return false; + if (!isEnqueued(msg)) return empty; if (!ctxt) { dequeued(msg); } @@ -679,17 +677,17 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg, if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) { msg.payload->dequeueAsync(shared_from_this(), store); boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); - if (factory) { - boost::shared_ptr<DequeueDoneCallback> callback((*factory)()); + boost::intrusive_ptr<DequeueCompletion> dc(new DequeueCompletion()); + { Mutex::ScopedLock locker(messageLock); - pendingDequeueCallbacks[pmsg.get()] = callback; + pendingDequeueCompletions[pmsg.get()] = dc; } QPID_LOG(debug, "Message " << pmsg << " async dequeue started on queue " << name); store->dequeue(ctxt, pmsg, shared_from_this()); // invokes Queue::dequeueComplete() when done - return true; + return dc; } } - return false; + return empty; } void Queue::dequeueCommitted(const QueuedMessage& msg) @@ -1121,8 +1119,7 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) } } //process any pending dequeues - for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1, - (DequeueDoneCallbackFactory*)0)); + for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); pendingDequeues.clear(); } @@ -1221,17 +1218,17 @@ void Queue::dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg QPID_LOG(debug, "Message " << msg << " dequeue complete on queue " << name); msg->dequeueComplete(shared_from_this(), store); - boost::shared_ptr<DequeueDoneCallback> cb; + boost::intrusive_ptr<DequeueCompletion> dc; { Mutex::ScopedLock locker(messageLock); - std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> >::iterator i; - i = pendingDequeueCallbacks.find(msg.get()); - if (i != pendingDequeueCallbacks.end()) { - cb = i->second; - pendingDequeueCallbacks.erase(i); + std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> >::iterator i; + i = pendingDequeueCompletions.find(msg.get()); + if (i != pendingDequeueCompletions.end()) { + dc = i->second; + pendingDequeueCompletions.erase(i); } } - if (cb) (*cb)(); + if (dc) dc->dequeueDone(); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 0bb0903a71..69c07b1b78 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -40,6 +40,7 @@ #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/AtomicValue.h" #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> @@ -275,26 +276,50 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false); void enqueueAborted(boost::intrusive_ptr<Message> msg); /** - * dequeue from store (only done once messages is acknowledged). Dequeue - * -may- complete asynchronously. This method returns 'false' if the - * dequeue has completed, else 'true' if the dequeue is asynchronous. If - * the caller is interested in receiving notification when the asynchronous - * dequeue completes, it may pass a pointer to a factory functor that - * returns a shareable DequeueDoneCallback object. If the dequeue is - * completed synchronously, this pointer is ignored. If the dequeue will - * complete asynchronously, the factory is called to obtain a - * DequeueDoneCallback. When the dequeue completes, the - * DequeueDoneCallback is invoked. The callback should be prepared to - * execute on any thread. + * dequeue from Queue (only done once messages is acknowledged). Dequeue + * -may- complete asynchronously. This method returns a DequeueCompletion + * object for use by the caller to determine when the dequeue has + * completed. If the dequeue is able to complete during the dequeue call, + * a null pointer is returned. If the caller is interested in receiving + * notification when an asynchronous dequeue completes, it may register a + * callback function and a context. The callback should be prepared to + * execute on any thread, and the callback object's lifecycle must ensure + * that it survives until the dequeue callback is made. */ - class DequeueDoneCallback + class DequeueCompletion : public RefCounted { public: - virtual void operator()() = 0; + typedef void callback( boost::intrusive_ptr<RefCounted>& ctxt ); + + DequeueCompletion() + : completionsNeeded(2), // one for register call, another for done call + cb(0) {} + + void dequeueDone() + { + assert(completionsNeeded.get() > 0); + if (--completionsNeeded == 0) { + assert(cb); + (*cb)(ctxt); + ctxt.reset(); + } + } + + void registerCallback( callback *f, boost::intrusive_ptr<RefCounted>& _ctxt ) + { + cb = f; + ctxt = _ctxt; + dequeueDone(); // invoke callback if dequeue already done. + } + + private: + mutable qpid::sys::AtomicValue<int> completionsNeeded; + callback *cb; + boost::intrusive_ptr<RefCounted> ctxt; + friend class Queue; + }; - typedef boost::function<boost::shared_ptr<DequeueDoneCallback>()> DequeueDoneCallbackFactory; - QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage& msg, - DequeueDoneCallbackFactory *factory = 0); + QPID_BROKER_EXTERN boost::intrusive_ptr<DequeueCompletion> dequeue(TransactionContext* ctxt, const QueuedMessage& msg); /** invoked by store to signal dequeue() has completed */ QPID_BROKER_EXTERN void dequeueComplete(const boost::intrusive_ptr<PersistableMessage>& msg); @@ -405,7 +430,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, const Broker* getBroker(); private: - std::map< PersistableMessage *, boost::shared_ptr<DequeueDoneCallback> > pendingDequeueCallbacks; + std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> > pendingDequeueCompletions; }; } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 2c792c2d43..96e8fe8b2d 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -548,7 +548,7 @@ void SemanticState::recover(bool requeue) //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost - sort(unacked.begin(), unacked.end()); + unacked.sort(); } } @@ -779,63 +779,60 @@ namespace { class AsyncMessageAcceptCmd : public SessionContext::AsyncCommandContext { mutable qpid::sys::Mutex lock; - std::map<DeliveryId, boost::shared_ptr<Queue> > pending; // for dequeue to complete - bool ready; + unsigned int pending; + std::vector<boost::shared_ptr<Queue> > queues; // for flush() SemanticState& state; + /** completes this command. Note: may run in *any* thread */ + void complete() + { + Mutex::ScopedLock l(lock); + assert(pending); + if (--pending == 0) { + framing::Invoker::Result r; // message.accept does not return result data + Mutex::ScopedUnlock ul(lock); + QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); + completed( r ); + } + } + public: AsyncMessageAcceptCmd(SemanticState& _state) - : ready(false), state(_state) {} + : pending(1), state(_state) {} - /** called from session to urge pending dequeues to complete ASAP */ + /** signal this dequeue done. Note: may be run in *any* thread */ + static void dequeueDone( boost::intrusive_ptr<RefCounted>& ctxt ) + { + boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd(boost::static_pointer_cast<AsyncMessageAcceptCmd>(ctxt)); + cmd->complete(); + } + + /** called from session to urge pending dequeues to complete ASAP, done + as a result of an execution.sync */ void flush() { QPID_LOG(trace, "Flushing pending message.accept cmd=" << getId()); - std::map<DeliveryId, boost::shared_ptr<Queue> > copy; + std::vector<boost::shared_ptr<Queue> > copy; { Mutex::ScopedLock l(lock); - copy = pending; + copy.swap(queues); // no longer needed after flushing... } - std::set<Queue *> flushedQs; // flush each queue only once! - for (std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = copy.begin(); + for (std::vector<boost::shared_ptr<Queue> >::iterator i = copy.begin(); i != copy.end(); ++i) { - Queue *queue(i->second.get()); - if (flushedQs.find(queue) == flushedQs.end()) { - flushedQs.insert(queue); - i->second->flush(); - } + (*i)->flush(); } } /** add a pending dequeue to track */ - void add( const DeliveryId& id, const boost::shared_ptr<Queue>& queue ) + void add( const boost::shared_ptr<Queue>& queue ) { - QPID_LOG(trace, "Scheduling dequeue of delivery " << id + QPID_LOG(trace, "Scheduling dequeue of delivery " << getId() << " on session " << state.getSession().getSessionId()); Mutex::ScopedLock l(lock); - bool unique = pending.insert(std::pair<DeliveryId, boost::shared_ptr<Queue> >(id, queue)).second; - if (!unique) { - assert(false); - } + ++pending; + queues.push_back(queue); } - /** signal this dequeue done. Note: may be run in *any* thread */ - void complete( const DeliveryId& id ) - { - QPID_LOG(trace, "Dequeue of delivery " << id - << " completed on session " << state.getSession().getSessionId()); - Mutex::ScopedLock l(lock); - std::map<DeliveryId, boost::shared_ptr<Queue> >::iterator i = pending.find(id); - assert(i != pending.end()); - pending.erase(i); - - if (ready && pending.empty()) { - framing::Invoker::Result r; // message.accept does not return result data - Mutex::ScopedUnlock ul(lock); - QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); - completed( r ); - } - } /** allow the Message.Accept to complete - do this only after all * deliveryIds have been added() and this has been registered with the @@ -844,57 +841,15 @@ namespace { { QPID_LOG(trace, "Dispatching async message.accept cmd=" << getId()); Mutex::ScopedLock l(lock); - if (pending.empty()) { + assert(pending); + if (--pending == 0) { framing::Invoker::Result r; Mutex::ScopedUnlock ul(lock); + QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); completed( r ); - return; } - ready = true; } }; - - - /** callback to indicate a single message has completed its asynchronous - dequeue. This object is made available to the queue when a dequeue is - started. The queue will invoke the callback when the dequeue - completes. */ - class DequeueDone : public Queue::DequeueDoneCallback - { - DeliveryId id; - boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; - public: - DequeueDone( const DeliveryId & _id, - const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd ) - : id(_id), cmd(_cmd) {} - void operator()() { cmd->complete( id ); } - }; - - - /** factory to create the above callback - passed to queue's dequeue - method, only invoked if the dequeue operation is asynchronous! */ - boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state, - const DeliveryId& id, - const boost::shared_ptr<Queue>& queue, - boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd ) - { - if (!cmd->get()) { // first async dequeue creates the context - cmd->reset(new AsyncMessageAcceptCmd(*state)); - } - (*cmd)->add( id, queue ); - boost::shared_ptr<DequeueDone> x( new DequeueDone(id, *cmd ) ); - return x; - } - - /** predicate to process unacked delivery records during Message.accept - processing */ - bool acceptDelivery( SemanticState *state, - boost::intrusive_ptr<AsyncMessageAcceptCmd>* cmd, - DeliveryRecord& dr ) - { - Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), dr.getQueue(), cmd); - return dr.accept((TransactionContext*) 0, &f); - } } // namespace @@ -929,14 +884,26 @@ void SemanticState::accepted(const SequenceSet& commands) { which would be necessary if we remove when the dequeue completes. Is this ok? */ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; - DeliveryRecords::iterator removed = - remove_if(unacked.begin(), unacked.end(), - isInSequenceSetAnd(commands, - bind(acceptDelivery, - this, - &cmd, - _1))); - unacked.erase(removed, unacked.end()); + IsInSequenceSet isInSeq(commands); + DeliveryRecords::const_iterator end(unacked.end()); + DeliveryRecords::iterator i = unacked.begin(); + while (i != end) { + const SequenceNumber seq(i->getId()); + if (isInSeq(seq)) { + boost::intrusive_ptr<Queue::DequeueCompletion> async(i->accept((TransactionContext *)0)); + if (async) { + if (!cmd) cmd = boost::intrusive_ptr<AsyncMessageAcceptCmd>(new AsyncMessageAcceptCmd(*this)); + cmd->add(i->getQueue()); + boost::intrusive_ptr<qpid::RefCounted> rc(boost::static_pointer_cast<RefCounted>(cmd)); + async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc); + } + if (i->isRedundant()) + i = unacked.erase(i); + else + ++i; + } else + ++i; + } if (cmd) { boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd)); |