diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-03-22 14:47:15 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-03-22 14:47:15 +0000 |
commit | 930f964023ad5e6fc6d7a0d2c1ab0556263c119a (patch) | |
tree | 2f319b97e0c6d9e0052a8606f0e29d2b643834c2 | |
parent | e290b3c879d74326d0152bf93b7e9eeb02615e3d (diff) | |
download | qpid-python-930f964023ad5e6fc6d7a0d2c1ab0556263c119a.tar.gz |
QPID-3890: merge Queue lock scope reduction performance tweaks into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1303815 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 514 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueListeners.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueListeners.h | 7 |
4 files changed, 342 insertions, 213 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index fb1284168f..fdd95ae3bd 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -234,11 +234,16 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg) { + Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->recoverEnqueued(msg); } -void Queue::recover(boost::intrusive_ptr<Message>& msg){ - if (policy.get()) policy->recoverEnqueued(msg); +void Queue::recover(boost::intrusive_ptr<Message>& msg) +{ + { + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->recoverEnqueued(msg); + } push(msg, true); if (store){ @@ -278,7 +283,6 @@ void Queue::requeue(const QueuedMessage& msg){ assertClusterSafe(); QueueListeners::NotificationSet copy; { - Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; if (deleted) { // @@ -296,8 +300,18 @@ void Queue::requeue(const QueuedMessage& msg){ } mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); } else { - messages->release(msg); - listeners.populate(copy); + { + Mutex::ScopedLock locker(messageLock); + messages->release(msg); + observeRequeue(msg, locker); + listeners.populate(copy); + } + + if (mgmtObject) { + mgmtObject->inc_releases(); + if (brokerMgmtObject) + brokerMgmtObject->inc_releases(); + } // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { @@ -307,7 +321,6 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } - observeRequeue(msg, locker); } } copy.notify(); @@ -315,10 +328,9 @@ void Queue::requeue(const QueuedMessage& msg){ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (acquire(position, message, locker)) { + if (acquire(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -329,17 +341,20 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) { - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); - - if (!allocator->allocate( consumer, msg )) { + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = allocator->allocate( consumer, msg ); + } + if (!ok) { QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); return false; } QueuedMessage copy(msg); - if (acquire( msg.position, copy, locker)) { + if (acquire( msg.position, copy)) { QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); return true; } @@ -381,59 +396,73 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { - Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - if (allocator->nextConsumableMessage(c, msg)) { - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->setPosition(msg.position); - dequeue(0, msg); - if (mgmtObject) { - mgmtObject->inc_discardsTtl(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(); - } + bool found; + { + Mutex::ScopedLock locker(messageLock); + found = allocator->nextConsumableMessage(c, msg); + if (!found) listeners.addListener(c); + } + if (!found) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + return NO_MESSAGES; + } - continue; + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->setPosition(msg.position); + dequeue(0, msg); + if (mgmtObject) { + mgmtObject->inc_discardsTtl(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(); } + continue; + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + { + Mutex::ScopedLock locker(messageLock); bool ok = allocator->allocate( c->getName(), msg ); // inform allocator (void) ok; assert(ok); observeAcquire(msg, locker); - m = msg; - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; } + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + m = msg; + return CONSUMED; } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); } } else { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - listeners.addListener(c); - return NO_MESSAGES; + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); } + + Mutex::ScopedLock locker(messageLock); + messages->release(msg); + return CANT_CONSUME; } } bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { - Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - - if (!allocator->nextBrowsableMessage(c, msg)) { // no next available + bool found; + { + Mutex::ScopedLock locker(messageLock); + found = allocator->nextBrowsableMessage(c, msg); + if (!found) listeners.addListener(c); + } + if (!found) { // no next available QPID_LOG(debug, "No browsable messages available for consumer " << c->getName() << " on queue '" << name << "'"); - listeners.addListener(c); return false; } @@ -491,7 +520,7 @@ bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); if(exclusive) { throw ResourceLockedException( QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); @@ -504,47 +533,43 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ } } consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); } + observeConsumerAdd(*c, locker); } - Mutex::ScopedLock locker(messageLock); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerAdded(*c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); - } - } + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); consumerCount--; if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); - } - Mutex::ScopedLock locker(messageLock); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerRemoved(*c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); - } + observeConsumerRemove(*c, locker); } + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); } QueuedMessage Queue::get(){ - Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->consume(msg)) - observeAcquire(msg, locker); + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = messages->consume(msg); + if (ok) observeAcquire(msg, locker); + } + + if (ok && mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + return msg; } @@ -576,22 +601,26 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - // - // Report the count of discarded-by-ttl messages - // - if (mgmtObject && !expired.empty()) { - mgmtObject->inc_discardsTtl(expired.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(expired.size()); - } + if (!expired.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(expired.size()); + mgmtObject->inc_discardsTtl(expired.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(expired.size()); + brokerMgmtObject->inc_discardsTtl(expired.size()); + } + } - for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); - i != expired.end(); ++i) { - { - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); + i != expired.end(); ++i) { + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf() call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); } - dequeue( 0, *i ); } } } @@ -717,32 +746,46 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); Collector c(*mf.get(), purge_request); - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + } - if (mgmtObject && !c.matches.empty()) { - if (dest.get()) { - mgmtObject->inc_reroutes(c.matches.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_reroutes(c.matches.size()); - } else { - mgmtObject->inc_discardsPurge(c.matches.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsPurge(c.matches.size()); + if (!c.matches.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(c.matches.size()); + if (dest.get()) { + mgmtObject->inc_reroutes(c.matches.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(c.matches.size()); + brokerMgmtObject->inc_reroutes(c.matches.size()); + } + } else { + mgmtObject->inc_discardsPurge(c.matches.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(c.matches.size()); + brokerMgmtObject->inc_discardsPurge(c.matches.size()); + } + } } - } - for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { - // Update observers and message state: - observeAcquire(*qmsg, locker); - dequeue(0, *qmsg); - QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); - // now reroute if necessary - if (dest.get()) { - assert(qmsg->payload); - DeliverableMessage dmsg(qmsg->payload); - dest->routeWithAlternate(dmsg); + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*qmsg, locker); + } + dequeue(0, *qmsg); + QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); + // now reroute if necessary + if (dest.get()) { + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); + dest->routeWithAlternate(dmsg); + } } } return c.matches.size(); @@ -754,27 +797,51 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); Collector c(*mf.get(), qty); - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + } + - for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { + if (!c.matches.empty()) { // Update observers and message state: - observeAcquire(*qmsg, locker); - dequeue(0, *qmsg); - // and move to destination Queue. - assert(qmsg->payload); - destq->deliver(qmsg->payload); + + if (mgmtObject) { + mgmtObject->inc_acquires(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(c.matches.size()); + } + + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + { + Mutex::ScopedLock locker(messageLock); + observeAcquire(*qmsg, locker); + } + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); + } } return c.matches.size(); } /** Acquire the message at the given position, return true and msg if acquire succeeds */ -bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, - const Mutex::ScopedLock& locker) +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg) { - if (messages->acquire(position, msg)) { - observeAcquire(msg, locker); + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = messages->acquire(position, msg); + if (ok) observeAcquire(msg, locker); + } + if (ok) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } ++dequeueSincePurge; return true; } @@ -784,35 +851,43 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; - QueuedMessage removed; + QueuedMessage removed, qm(this, msg); bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); - QueuedMessage qm(this, msg, ++sequence); - if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); - - dequeueRequired = messages->push(qm, removed); - if (dequeueRequired) { + qm.position = ++sequence; + if (messages->push(qm, removed)) { + dequeueRequired = true; observeAcquire(removed, locker); - if (mgmtObject) { - mgmtObject->inc_discardsLvq(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsLvq(); - } } - listeners.populate(copy); observeEnqueue(qm, locker); + if (policy.get()) { + policy->enqueued(qm); + } + listeners.populate(copy); } - copy.notify(); + if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position); + + mgntEnqStats(msg, mgmtObject, brokerMgmtObject); + if (dequeueRequired) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + mgmtObject->inc_discardsLvq(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + brokerMgmtObject->inc_discardsLvq(); + } if (isRecovery) { //can't issue new requests for the store until //recovery is complete + Mutex::ScopedLock locker(messageLock); pendingDequeues.push_back(removed); } else { dequeue(0, removed); } } + copy.notify(); } void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) @@ -823,8 +898,8 @@ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) /** function only provided for unit tests, or code not in critical message path */ uint32_t Queue::getEnqueueCompleteMessageCount() const { - Mutex::ScopedLock locker(messageLock); uint32_t count = 0; + Mutex::ScopedLock locker(messageLock); messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); return count; } @@ -837,13 +912,13 @@ uint32_t Queue::getMessageCount() const uint32_t Queue::getConsumerCount() const { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); return consumerCount; } bool Queue::canAutoDelete() const { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); return autodelete && !consumerCount && !owner; } @@ -950,14 +1025,20 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; - { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; if (!ctxt) { + if (policy.get()) policy->dequeued(msg); + messages->deleted(msg); observeDequeue(msg, locker); } } + + if (!ctxt) { + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); + } + // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -974,8 +1055,13 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - observeDequeue(msg, locker); + { + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->dequeued(msg); + messages->deleted(msg); + observeDequeue(msg, locker); + } + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); if (mgmtObject != 0) { _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); const uint64_t contentSize = msg.payload->contentSize(); @@ -995,10 +1081,20 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) * Removes the first (oldest) message from the in-memory delivery queue as well dequeing * it from the logical (and persistent if applicable) queue */ -bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) +bool Queue::popAndDequeue(QueuedMessage& msg) { - if (messages->consume(msg)) { - observeAcquire(msg, locker); + bool popped; + { + Mutex::ScopedLock locker(messageLock); + popped = messages->consume(msg); + if (popped) observeAcquire(msg, locker); + } + if (popped) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } dequeue(0, msg); return true; } else { @@ -1008,13 +1104,10 @@ bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) /** * Updates policy and management when a message has been dequeued, - * expects messageLock to be held + * Requires messageLock be held by caller. */ -void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); - if (policy.get()) policy->dequeued(msg); - messages->deleted(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -1024,17 +1117,11 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) } } -/** updates queue observers when a message has become unavailable for transfer, - * expects messageLock to be held +/** updates queue observers when a message has become unavailable for transfer. + * Requires messageLock be held by caller. */ -void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - if (mgmtObject) { - mgmtObject->inc_acquires(); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(); - } - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->acquired(msg); @@ -1044,17 +1131,11 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) } } -/** updates queue observers when a message has become re-available for transfer, - * expects messageLock to be held +/** updates queue observers when a message has become re-available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - if (mgmtObject) { - mgmtObject->inc_releases(); - if (brokerMgmtObject) - brokerMgmtObject->inc_releases(); - } - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->requeued(msg); @@ -1064,6 +1145,33 @@ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) } } +/** updates queue observers when a new consumer has subscribed to this queue. + */ +void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerAdded(c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); + } + } +} + +/** updates queue observers when a consumer has unsubscribed from this queue. + */ +void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerRemoved(c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); + } + } +} + + void Queue::create(const FieldTable& _settings) { settings = _settings; @@ -1211,23 +1319,21 @@ void Queue::configureImpl(const FieldTable& _settings) void Queue::destroyed() { unbind(broker->getExchanges()); - { - Mutex::ScopedLock locker(messageLock); - QueuedMessage m; - while(popAndDequeue(m, locker)) { - DeliverableMessage msg(m.payload); - if (alternateExchange.get()) { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandonedViaAlt(); - alternateExchange->routeWithAlternate(msg); - } else { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandoned(); - } + + QueuedMessage m; + while(popAndDequeue(m)) { + DeliverableMessage msg(m.payload); + if (alternateExchange.get()) { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + alternateExchange->routeWithAlternate(msg); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); } - if (alternateExchange.get()) - alternateExchange->decAlternateUsers(); } + if (alternateExchange.get()) + alternateExchange->decAlternateUsers(); if (store) { barrier.destroy(); @@ -1238,7 +1344,7 @@ void Queue::destroyed() if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.clear(); } } @@ -1248,8 +1354,8 @@ void Queue::notifyDeleted() QueueListeners::ListenerSet set; { Mutex::ScopedLock locker(messageLock); - listeners.snapshot(set); deleted = true; + listeners.snapshot(set); } set.notifyAll(); } @@ -1267,6 +1373,7 @@ void Queue::unbind(ExchangeRegistry& exchanges) void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) { + Mutex::ScopedLock locker(messageLock); policy = _policy; if (policy.get()) policy->setQueue(this); @@ -1274,6 +1381,7 @@ void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) const QueuePolicy* Queue::getPolicy() { + Mutex::ScopedLock locker(messageLock); return policy.get(); } @@ -1555,8 +1663,12 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) << "\": exchange does not exist."); } //process any pending dequeues - for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); - pendingDequeues.clear(); + std::deque<QueuedMessage> pd; + { + Mutex::ScopedLock locker(messageLock); + pendingDequeues.swap(pd); + } + for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } void Queue::insertSequenceNumbers(const std::string& key) @@ -1566,10 +1678,10 @@ void Queue::insertSequenceNumbers(const std::string& key) QPID_LOG(debug, "Inserting sequence numbers as " << key); } -/** updates queue observers and state when a message has become available for transfer, - * expects messageLock to be held +/** updates queue observers and state when a message has become available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) +void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1578,10 +1690,6 @@ void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } } - if (policy.get()) { - policy->enqueued(m); - } - mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject); } void Queue::updateEnqueued(const QueuedMessage& m) @@ -1589,12 +1697,16 @@ void Queue::updateEnqueued(const QueuedMessage& m) if (m.payload) { boost::intrusive_ptr<Message> payload = m.payload; enqueue(0, payload, true); - messages->updateAcquired(m); - if (policy.get()) { - policy->recoverEnqueued(payload); + { + Mutex::ScopedLock locker(messageLock); + messages->updateAcquired(m); + observeEnqueue(m, locker); + if (policy.get()) { + policy->recoverEnqueued(payload); + policy->enqueued(m); + } } - Mutex::ScopedLock locker(messageLock); - observeEnqueue(m, locker); + mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1602,10 +1714,16 @@ void Queue::updateEnqueued(const QueuedMessage& m) bool Queue::isEnqueued(const QueuedMessage& msg) { + Mutex::ScopedLock locker(messageLock); return !policy.get() || policy->isEnqueued(msg); } +// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's +// state is not changed while listeners is referenced. QueueListeners& Queue::getListeners() { return listeners; } + +// Note: accessing messages outside of lock is dangerous. Caller must ensure the queue's +// state is not changed while messages is referenced. Messages& Queue::getMessages() { return *messages; } const Messages& Queue::getMessages() const { return *messages; } @@ -1618,13 +1736,13 @@ void Queue::checkNotDeleted(const Consumer::shared_ptr& c) void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.insert(observer); } void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.erase(observer); } @@ -1687,7 +1805,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() { - Monitor::ScopedLock l(parent.messageLock); + Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */ if (parent.deleted) { return false; } else { diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 282eb691b9..9b9acc677c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -107,7 +107,22 @@ class Queue : public boost::enable_shared_from_this<Queue>, QueueListeners listeners; std::auto_ptr<Messages> messages; std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery - mutable qpid::sys::Mutex consumerLock; + /** messageLock is used to keep the Queue's state consistent while processing message + * events, such as message dispatch, enqueue, acquire, and dequeue. It must be held + * while updating certain members in order to keep these members consistent with + * each other: + * o messages + * o sequence + * o policy + * o listeners + * o allocator + * o observeXXX() methods + * o observers + * o pendingDequeues (TBD: move under separate lock) + * o exclusive OwnershipToken (TBD: move under separate lock) + * o consumerCount (TBD: move under separate lock) + * o Queue::UsageBarrier (TBD: move under separate lock) + */ mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; mutable uint64_t persistenceId; @@ -143,17 +158,17 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); - /** update queue observers, stats, policy, etc when the messages' state changes. Lock - * must be held by caller */ + /** update queue observers, stats, policy, etc when the messages' state changes. + * messageLock is held by caller */ void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); - bool popAndDequeue(QueuedMessage&, const sys::Mutex::ScopedLock& lock); - // acquire message @ position, return true and set msg if acquire succeeds - bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, - const sys::Mutex::ScopedLock& held); + void observeConsumerAdd( const Consumer&, const sys::Mutex::ScopedLock& lock); + void observeConsumerRemove( const Consumer&, const sys::Mutex::ScopedLock& lock); + bool popAndDequeue(QueuedMessage&); + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg); void forcePersistent(QueuedMessage& msg); int getEventMode(); void configureImpl(const qpid::framing::FieldTable& settings); @@ -355,6 +370,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Apply f to each Observer on the queue */ template <class F> void eachObserver(F f) { + sys::Mutex::ScopedLock l(messageLock); std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); } diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp index 32c208b073..0338a674cf 100644 --- a/qpid/cpp/src/qpid/broker/QueueListeners.cpp +++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp @@ -79,10 +79,6 @@ void QueueListeners::NotificationSet::notify() std::for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify)); } -bool QueueListeners::contains(Consumer::shared_ptr c) const { - return c->inListeners; -} - void QueueListeners::ListenerSet::notifyAll() { std::for_each(listeners.begin(), listeners.end(), boost::mem_fn(&Consumer::notify)); diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h index 0659499253..ca844fd47e 100644 --- a/qpid/cpp/src/qpid/broker/QueueListeners.h +++ b/qpid/cpp/src/qpid/broker/QueueListeners.h @@ -30,7 +30,7 @@ namespace broker { /** * Track and notify components that wish to be notified of messages * that become available on a queue. - * + * * None of the methods defined here are protected by locking. However * the populate method allows a 'snapshot' to be taken of the * listeners to be notified. NotificationSet::notify() may then be @@ -61,11 +61,10 @@ class QueueListeners friend class QueueListeners; }; - void addListener(Consumer::shared_ptr); - void removeListener(Consumer::shared_ptr); + void addListener(Consumer::shared_ptr); + void removeListener(Consumer::shared_ptr); void populate(NotificationSet&); void snapshot(ListenerSet&); - bool contains(Consumer::shared_ptr c) const; void notifyAll(); template <class F> void eachListener(F f) { |