summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.h
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-03-22 14:47:15 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-03-22 14:47:15 +0000
commit930f964023ad5e6fc6d7a0d2c1ab0556263c119a (patch)
tree2f319b97e0c6d9e0052a8606f0e29d2b643834c2 /qpid/cpp/src/qpid/broker/Queue.h
parente290b3c879d74326d0152bf93b7e9eeb02615e3d (diff)
downloadqpid-python-930f964023ad5e6fc6d7a0d2c1ab0556263c119a.tar.gz
QPID-3890: merge Queue lock scope reduction performance tweaks into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1303815 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.h')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h30
1 files changed, 23 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 282eb691b9..9b9acc677c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -107,7 +107,22 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QueueListeners listeners;
std::auto_ptr<Messages> messages;
std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
- mutable qpid::sys::Mutex consumerLock;
+ /** messageLock is used to keep the Queue's state consistent while processing message
+ * events, such as message dispatch, enqueue, acquire, and dequeue. It must be held
+ * while updating certain members in order to keep these members consistent with
+ * each other:
+ * o messages
+ * o sequence
+ * o policy
+ * o listeners
+ * o allocator
+ * o observeXXX() methods
+ * o observers
+ * o pendingDequeues (TBD: move under separate lock)
+ * o exclusive OwnershipToken (TBD: move under separate lock)
+ * o consumerCount (TBD: move under separate lock)
+ * o Queue::UsageBarrier (TBD: move under separate lock)
+ */
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
@@ -143,17 +158,17 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- /** update queue observers, stats, policy, etc when the messages' state changes. Lock
- * must be held by caller */
+ /** update queue observers, stats, policy, etc when the messages' state changes.
+ * messageLock is held by caller */
void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
- bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock);
- // acquire message @ position, return true and set msg if acquire succeeds
- bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
- const sys::Mutex::ScopedLock& held);
+ void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock);
+ void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock);
+ bool popAndDequeue(QueuedMessage&);
+ bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
void forcePersistent(QueuedMessage& msg);
int getEventMode();
void configureImpl(const qpid::framing::FieldTable& settings);
@@ -355,6 +370,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Apply f to each Observer on the queue */
template <class F> void eachObserver(F f) {
+ sys::Mutex::ScopedLock l(messageLock);
std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
}