diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-03-22 14:47:15 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-03-22 14:47:15 +0000 |
commit | 930f964023ad5e6fc6d7a0d2c1ab0556263c119a (patch) | |
tree | 2f319b97e0c6d9e0052a8606f0e29d2b643834c2 /qpid/cpp/src/qpid/broker/Queue.h | |
parent | e290b3c879d74326d0152bf93b7e9eeb02615e3d (diff) | |
download | qpid-python-930f964023ad5e6fc6d7a0d2c1ab0556263c119a.tar.gz |
QPID-3890: merge Queue lock scope reduction performance tweaks into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1303815 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 282eb691b9..9b9acc677c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -107,7 +107,22 @@ class Queue : public boost::enable_shared_from_this<Queue>, QueueListeners listeners; std::auto_ptr<Messages> messages; std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery - mutable qpid::sys::Mutex consumerLock; + /** messageLock is used to keep the Queue's state consistent while processing message + * events, such as message dispatch, enqueue, acquire, and dequeue. It must be held + * while updating certain members in order to keep these members consistent with + * each other: + * o messages + * o sequence + * o policy + * o listeners + * o allocator + * o observeXXX() methods + * o observers + * o pendingDequeues (TBD: move under separate lock) + * o exclusive OwnershipToken (TBD: move under separate lock) + * o consumerCount (TBD: move under separate lock) + * o Queue::UsageBarrier (TBD: move under separate lock) + */ mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; mutable uint64_t persistenceId; @@ -143,17 +158,17 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); - /** update queue observers, stats, policy, etc when the messages' state changes. Lock - * must be held by caller */ + /** update queue observers, stats, policy, etc when the messages' state changes. + * messageLock is held by caller */ void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock); - // acquire message @ position, return true and set msg if acquire succeeds - bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, - const sys::Mutex::ScopedLock& held); + void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock); + void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock); + bool popAndDequeue(QueuedMessage&); + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg); void forcePersistent(QueuedMessage& msg); int getEventMode(); void configureImpl(const qpid::framing::FieldTable& settings); @@ -355,6 +370,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Apply f to each Observer on the queue */ template <class F> void eachObserver(F f) { + sys::Mutex::ScopedLock l(messageLock); std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); } |