From 841377c0309773dad4db14af13002fff5cc6d236 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 19 Mar 2012 20:07:37 +0000 Subject: QPID-3890: revert changes to observer methods interfaces git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3890@1302629 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 48 +++++++++++++++++++++----------------- qpid/cpp/src/qpid/broker/Queue.h | 12 +++++----- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 111c47a861..3bd9233791 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -301,7 +301,7 @@ void Queue::requeue(const QueuedMessage& msg){ { Mutex::ScopedLock locker(messageLock); messages->release(msg); - observeRequeueLH(msg); + observeRequeue(msg, locker); listeners.populate(copy); } @@ -424,7 +424,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ Mutex::ScopedLock locker(messageLock); bool ok = allocator->allocate( c->getName(), msg ); // inform allocator (void) ok; assert(ok); - observeAcquireLH(msg); + observeAcquire(msg, locker); } if (mgmtObject) { mgmtObject->inc_acquires(); @@ -535,7 +535,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); } - observeConsumerAddLH(*c); + observeConsumerAdd(*c, locker); } if (mgmtObject != 0) mgmtObject->inc_consumerCount (); @@ -547,7 +547,7 @@ void Queue::cancel(Consumer::shared_ptr c){ Mutex::ScopedLock locker(messageLock); consumerCount--; if(exclusive) exclusive = 0; - observeConsumerRemoveLH(*c); + observeConsumerRemove(*c, locker); } if (mgmtObject != 0) mgmtObject->dec_consumerCount (); @@ -559,7 +559,7 @@ QueuedMessage Queue::get(){ { Mutex::ScopedLock locker(messageLock); ok = messages->consume(msg); - if (ok) observeAcquireLH(msg); + if (ok) observeAcquire(msg, locker); } if (ok && mgmtObject) { @@ -615,7 +615,7 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) // KAG: should be safe to retake lock after the removeIf, since // no other thread can touch these messages after the removeIf() call Mutex::ScopedLock locker(messageLock); - observeAcquireLH(*i); + observeAcquire(*i, locker); } dequeue( 0, *i ); } @@ -774,7 +774,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr // KAG: should be safe to retake lock after the removeIf, since // no other thread can touch these messages after the removeIf call Mutex::ScopedLock locker(messageLock); - observeAcquireLH(*qmsg); + observeAcquire(*qmsg, locker); } dequeue(0, *qmsg); QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); @@ -814,7 +814,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, qmsg != c.matches.end(); ++qmsg) { { Mutex::ScopedLock locker(messageLock); - observeAcquireLH(*qmsg); + observeAcquire(*qmsg, locker); } dequeue(0, *qmsg); // and move to destination Queue. @@ -832,7 +832,7 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage { Mutex::ScopedLock locker(messageLock); ok = messages->acquire(position, msg); - if (ok) observeAcquireLH(msg); + if (ok) observeAcquire(msg, locker); } if (ok) { if (mgmtObject) { @@ -856,9 +856,9 @@ void Queue::push(boost::intrusive_ptr& msg, bool isRecovery){ qm.position = ++sequence; if (messages->push(qm, removed)) { dequeueRequired = true; - observeAcquireLH(removed); + observeAcquire(removed, locker); } - observeEnqueueLH(qm); + observeEnqueue(qm, locker); if (policy.get()) { policy->enqueued(qm); } @@ -1029,7 +1029,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) if (!ctxt) { if (policy.get()) policy->dequeued(msg); messages->deleted(msg); - observeDequeueLH(msg); + observeDequeue(msg, locker); } } @@ -1057,7 +1057,7 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->dequeued(msg); messages->deleted(msg); - observeDequeueLH(msg); + observeDequeue(msg, locker); } mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); if (mgmtObject != 0) { @@ -1085,7 +1085,7 @@ bool Queue::popAndDequeue(QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); popped = messages->consume(msg); - if (popped) observeAcquireLH(msg); + if (popped) observeAcquire(msg, locker); } if (popped) { if (mgmtObject) { @@ -1102,8 +1102,9 @@ bool Queue::popAndDequeue(QueuedMessage& msg) /** * Updates policy and management when a message has been dequeued, + * Requires messageLock be held by caller. */ -void Queue::observeDequeueLH(const QueuedMessage& msg) +void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1114,9 +1115,10 @@ void Queue::observeDequeueLH(const QueuedMessage& msg) } } -/** updates queue observers when a message has become unavailable for transfer +/** updates queue observers when a message has become unavailable for transfer. + * Requires messageLock be held by caller. */ -void Queue::observeAcquireLH(const QueuedMessage& msg) +void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1128,8 +1130,9 @@ void Queue::observeAcquireLH(const QueuedMessage& msg) } /** updates queue observers when a message has become re-available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeRequeueLH(const QueuedMessage& msg) +void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1142,7 +1145,7 @@ void Queue::observeRequeueLH(const QueuedMessage& msg) /** updates queue observers when a new consumer has subscribed to this queue. */ -void Queue::observeConsumerAddLH( const Consumer& c) +void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1155,7 +1158,7 @@ void Queue::observeConsumerAddLH( const Consumer& c) /** updates queue observers when a consumer has unsubscribed from this queue. */ -void Queue::observeConsumerRemoveLH( const Consumer& c) +void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1674,8 +1677,9 @@ void Queue::insertSequenceNumbers(const std::string& key) } /** updates queue observers and state when a message has become available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeEnqueueLH(const QueuedMessage& m) +void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1694,7 +1698,7 @@ void Queue::updateEnqueued(const QueuedMessage& m) { Mutex::ScopedLock locker(messageLock); messages->updateAcquired(m); - observeEnqueueLH(m); + observeEnqueue(m, locker); if (policy.get()) { policy->recoverEnqueued(payload); policy->enqueued(m); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 9cdfb9846e..4e24740730 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -144,12 +144,12 @@ class Queue : public boost::enable_shared_from_this, /** update queue observers, stats, policy, etc when the messages' state changes. * messageLock is held by caller */ - void observeEnqueueLH(const QueuedMessage& msg); - void observeAcquireLH(const QueuedMessage& msg); - void observeRequeueLH(const QueuedMessage& msg); - void observeDequeueLH(const QueuedMessage& msg); - void observeConsumerAddLH( const Consumer& ); - void observeConsumerRemoveLH( const Consumer& ); + void observeEnqueue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&); + void observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&); + void observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&); + void observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&); + void observeConsumerAdd( const Consumer&, const qpid::sys::Mutex::ScopedLock&); + void observeConsumerRemove( const Consumer&, const qpid::sys::Mutex::ScopedLock&); bool popAndDequeue(QueuedMessage&); bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg); -- cgit v1.2.1