diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 667 |
1 files changed, 438 insertions, 229 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 015957927f..e7305c021d 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -19,8 +19,9 @@ * */ -#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" + +#include "qpid/broker/Broker.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Fairshare.h" @@ -41,6 +42,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" @@ -56,7 +58,9 @@ #include <boost/intrusive_ptr.hpp> -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; @@ -88,8 +92,57 @@ const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; + +inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg, + _qmf::Queue* mgmtObject, + _qmf::Broker* brokerMgmtObject) +{ + if (mgmtObject != 0) { + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + + uint64_t contentSize = msg->contentSize(); + qStats->msgTotalEnqueues +=1; + bStats->msgTotalEnqueues += 1; + qStats->byteTotalEnqueues += contentSize; + bStats->byteTotalEnqueues += contentSize; + if (msg->isPersistent ()) { + qStats->msgPersistEnqueues += 1; + bStats->msgPersistEnqueues += 1; + qStats->bytePersistEnqueues += contentSize; + bStats->bytePersistEnqueues += contentSize; + } + mgmtObject->statisticsUpdated(); + brokerMgmtObject->statisticsUpdated(); + } +} + +inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg, + _qmf::Queue* mgmtObject, + _qmf::Broker* brokerMgmtObject) +{ + if (mgmtObject != 0){ + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + uint64_t contentSize = msg->contentSize(); + + qStats->msgTotalDequeues += 1; + bStats->msgTotalDequeues += 1; + qStats->byteTotalDequeues += contentSize; + bStats->byteTotalDequeues += contentSize; + if (msg->isPersistent ()){ + qStats->msgPersistDequeues += 1; + bStats->msgPersistDequeues += 1; + qStats->bytePersistDequeues += contentSize; + bStats->bytePersistDequeues += contentSize; + } + mgmtObject->statisticsUpdated(); + brokerMgmtObject->statisticsUpdated(); + } } +} // namespace + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, @@ -101,6 +154,7 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), consumerCount(0), + browserCount(0), exclusive(0), noLocal(false), persistLastNode(false), @@ -166,7 +220,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); - alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); + alternateExchange->route(deliverable); } } else if (isLocal(msg)) { //drop message @@ -183,11 +237,16 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg) { + Mutex::ScopedLock locker(messageLock); if (policy.get()) policy->recoverEnqueued(msg); } -void Queue::recover(boost::intrusive_ptr<Message>& msg){ - if (policy.get()) policy->recoverEnqueued(msg); +void Queue::recover(boost::intrusive_ptr<Message>& msg) +{ + { + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->recoverEnqueued(msg); + } push(msg, true); if (store){ @@ -209,11 +268,16 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject != 0){ - mgmtObject->inc_msgTxnEnqueues (); - mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + const uint64_t contentSize = msg->contentSize(); + qStats->msgTxnEnqueues += 1; + qStats->byteTxnEnqueues += contentSize; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgTxnEnqueues (); - brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgTxnEnqueues += 1; + bStats->byteTxnEnqueues += contentSize; + brokerMgmtObject->statisticsUpdated(); } } } @@ -222,7 +286,6 @@ void Queue::requeue(const QueuedMessage& msg){ assertClusterSafe(); QueueListeners::NotificationSet copy; { - Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; if (deleted) { // @@ -238,10 +301,20 @@ void Queue::requeue(const QueuedMessage& msg){ if (brokerMgmtObject) brokerMgmtObject->inc_abandoned(); } - mgntDeqStats(msg.payload); + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); } else { - messages->release(msg); - listeners.populate(copy); + { + Mutex::ScopedLock locker(messageLock); + messages->release(msg); + observeRequeue(msg, locker); + listeners.populate(copy); + } + + if (mgmtObject) { + mgmtObject->inc_releases(); + if (brokerMgmtObject) + brokerMgmtObject->inc_releases(); + } // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { @@ -251,7 +324,6 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } - observeRequeue(msg, locker); } } copy.notify(); @@ -259,10 +331,9 @@ void Queue::requeue(const QueuedMessage& msg){ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (acquire(position, message, locker)) { + if (acquire(position, message)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -273,17 +344,20 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) { - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); - - if (!allocator->allocate( consumer, msg )) { + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = allocator->allocate( consumer, msg ); + } + if (!ok) { QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); return false; } QueuedMessage copy(msg); - if (acquire( msg.position, copy, locker)) { + if (acquire( msg.position, copy)) { QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); return true; } @@ -325,59 +399,73 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { - Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - if (allocator->nextConsumableMessage(c, msg)) { - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - c->setPosition(msg.position); - dequeue(0, msg); - if (mgmtObject) { - mgmtObject->inc_discardsTtl(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(); - } + bool found; + { + Mutex::ScopedLock locker(messageLock); + found = allocator->nextConsumableMessage(c, msg); + if (!found) listeners.addListener(c); + } + if (!found) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + return NO_MESSAGES; + } - continue; + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->setPosition(msg.position); + dequeue(0, msg); + if (mgmtObject) { + mgmtObject->inc_discardsTtl(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(); } + continue; + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + { + Mutex::ScopedLock locker(messageLock); bool ok = allocator->allocate( c->getName(), msg ); // inform allocator (void) ok; assert(ok); observeAcquire(msg, locker); - m = msg; - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; } + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + m = msg; + return CONSUMED; } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - messages->release(msg); - return CANT_CONSUME; + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); } } else { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - listeners.addListener(c); - return NO_MESSAGES; + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); } + + Mutex::ScopedLock locker(messageLock); + messages->release(msg); + return CANT_CONSUME; } } bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { - Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - - if (!allocator->nextBrowsableMessage(c, msg)) { // no next available + bool found; + { + Mutex::ScopedLock locker(messageLock); + found = allocator->nextBrowsableMessage(c, msg); + if (!found) listeners.addListener(c); + } + if (!found) { // no next available QPID_LOG(debug, "No browsable messages available for consumer " << c->getName() << " on queue '" << name << "'"); - listeners.addListener(c); return false; } @@ -435,60 +523,67 @@ bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); { - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + Mutex::ScopedLock locker(messageLock); + // NOTE: consumerCount is actually a count of all + // subscriptions, both acquiring and non-acquiring (browsers). + // Check for exclusivity of acquiring consumers. + size_t acquiringConsumers = consumerCount - browserCount; + if (c->preAcquires()) { + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() + << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(acquiringConsumers) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() + << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } } } + else + browserCount++; consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); } + observeConsumerAdd(*c, locker); } - Mutex::ScopedLock locker(messageLock); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerAdded(*c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); - } - } + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); consumerCount--; + if (!c->preAcquires()) browserCount--; if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); - } - Mutex::ScopedLock locker(messageLock); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerRemoved(*c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); - } + observeConsumerRemove(*c, locker); } + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); } QueuedMessage Queue::get(){ - Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - if (messages->consume(msg)) - observeAcquire(msg, locker); + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = messages->consume(msg); + if (ok) observeAcquire(msg, locker); + } + + if (ok && mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + return msg; } @@ -520,22 +615,26 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - // - // Report the count of discarded-by-ttl messages - // - if (mgmtObject && !expired.empty()) { - mgmtObject->inc_discardsTtl(expired.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsTtl(expired.size()); - } + if (!expired.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(expired.size()); + mgmtObject->inc_discardsTtl(expired.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(expired.size()); + brokerMgmtObject->inc_discardsTtl(expired.size()); + } + } - for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); - i != expired.end(); ++i) { - { - Mutex::ScopedLock locker(messageLock); - observeAcquire(*i, locker); + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); + i != expired.end(); ++i) { + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf() call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); } - dequeue( 0, *i ); } } } @@ -661,32 +760,46 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); Collector c(*mf.get(), purge_request); - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + } - if (mgmtObject && !c.matches.empty()) { - if (dest.get()) { - mgmtObject->inc_reroutes(c.matches.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_reroutes(c.matches.size()); - } else { - mgmtObject->inc_discardsPurge(c.matches.size()); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsPurge(c.matches.size()); + if (!c.matches.empty()) { + if (mgmtObject) { + mgmtObject->inc_acquires(c.matches.size()); + if (dest.get()) { + mgmtObject->inc_reroutes(c.matches.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(c.matches.size()); + brokerMgmtObject->inc_reroutes(c.matches.size()); + } + } else { + mgmtObject->inc_discardsPurge(c.matches.size()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_acquires(c.matches.size()); + brokerMgmtObject->inc_discardsPurge(c.matches.size()); + } + } } - } - for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { - // Update observers and message state: - observeAcquire(*qmsg, locker); - dequeue(0, *qmsg); - QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); - // now reroute if necessary - if (dest.get()) { - assert(qmsg->payload); - DeliverableMessage dmsg(qmsg->payload); - dest->routeWithAlternate(dmsg); + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + + { + // KAG: should be safe to retake lock after the removeIf, since + // no other thread can touch these messages after the removeIf call + Mutex::ScopedLock locker(messageLock); + observeAcquire(*qmsg, locker); + } + dequeue(0, *qmsg); + QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName()); + // now reroute if necessary + if (dest.get()) { + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); + dest->routeWithAlternate(dmsg); + } } } return c.matches.size(); @@ -698,27 +811,51 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); Collector c(*mf.get(), qty); - Mutex::ScopedLock locker(messageLock); - messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + } + - for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); - qmsg != c.matches.end(); ++qmsg) { + if (!c.matches.empty()) { // Update observers and message state: - observeAcquire(*qmsg, locker); - dequeue(0, *qmsg); - // and move to destination Queue. - assert(qmsg->payload); - destq->deliver(qmsg->payload); + + if (mgmtObject) { + mgmtObject->inc_acquires(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(c.matches.size()); + } + + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + { + Mutex::ScopedLock locker(messageLock); + observeAcquire(*qmsg, locker); + } + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); + } } return c.matches.size(); } /** Acquire the message at the given position, return true and msg if acquire succeeds */ -bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, - const Mutex::ScopedLock& locker) +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg) { - if (messages->acquire(position, msg)) { - observeAcquire(msg, locker); + bool ok; + { + Mutex::ScopedLock locker(messageLock); + ok = messages->acquire(position, msg); + if (ok) observeAcquire(msg, locker); + } + if (ok) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } ++dequeueSincePurge; return true; } @@ -728,35 +865,43 @@ bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; - QueuedMessage removed; + QueuedMessage removed, qm(this, msg); bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); - QueuedMessage qm(this, msg, ++sequence); - if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); - - dequeueRequired = messages->push(qm, removed); - if (dequeueRequired) { + qm.position = ++sequence; + if (messages->push(qm, removed)) { + dequeueRequired = true; observeAcquire(removed, locker); - if (mgmtObject) { - mgmtObject->inc_discardsLvq(); - if (brokerMgmtObject) - brokerMgmtObject->inc_discardsLvq(); - } } - listeners.populate(copy); observeEnqueue(qm, locker); + if (policy.get()) { + policy->enqueued(qm); + } + listeners.populate(copy); } - copy.notify(); + if (insertSeqNo) msg->insertCustomProperty(seqNoKey, qm.position); + + mgntEnqStats(msg, mgmtObject, brokerMgmtObject); + if (dequeueRequired) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + mgmtObject->inc_discardsLvq(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + brokerMgmtObject->inc_discardsLvq(); + } if (isRecovery) { //can't issue new requests for the store until //recovery is complete + Mutex::ScopedLock locker(messageLock); pendingDequeues.push_back(removed); } else { dequeue(0, removed); } } + copy.notify(); } void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) @@ -767,8 +912,8 @@ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) /** function only provided for unit tests, or code not in critical message path */ uint32_t Queue::getEnqueueCompleteMessageCount() const { - Mutex::ScopedLock locker(messageLock); uint32_t count = 0; + Mutex::ScopedLock locker(messageLock); messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); return count; } @@ -781,13 +926,13 @@ uint32_t Queue::getMessageCount() const uint32_t Queue::getConsumerCount() const { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); return consumerCount; } bool Queue::canAutoDelete() const { - Mutex::ScopedLock locker(consumerLock); + Mutex::ScopedLock locker(messageLock); return autodelete && !consumerCount && !owner; } @@ -894,14 +1039,20 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; - { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; if (!ctxt) { + if (policy.get()) policy->dequeued(msg); + messages->deleted(msg); observeDequeue(msg, locker); } } + + if (!ctxt) { + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); + } + // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -918,14 +1069,24 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - observeDequeue(msg, locker); + { + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->dequeued(msg); + messages->deleted(msg); + observeDequeue(msg, locker); + } + mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject); if (mgmtObject != 0) { - mgmtObject->inc_msgTxnDequeues(); - mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + const uint64_t contentSize = msg.payload->contentSize(); + qStats->msgTxnDequeues += 1; + qStats->byteTxnDequeues += contentSize; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgTxnDequeues(); - brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgTxnDequeues += 1; + bStats->byteTxnDequeues += contentSize; + brokerMgmtObject->statisticsUpdated(); } } } @@ -934,10 +1095,20 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) * Removes the first (oldest) message from the in-memory delivery queue as well dequeing * it from the logical (and persistent if applicable) queue */ -bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) +bool Queue::popAndDequeue(QueuedMessage& msg) { - if (messages->consume(msg)) { - observeAcquire(msg, locker); + bool popped; + { + Mutex::ScopedLock locker(messageLock); + popped = messages->consume(msg); + if (popped) observeAcquire(msg, locker); + } + if (popped) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } dequeue(0, msg); return true; } else { @@ -947,13 +1118,10 @@ bool Queue::popAndDequeue(QueuedMessage& msg, const Mutex::ScopedLock& locker) /** * Updates policy and management when a message has been dequeued, - * expects messageLock to be held + * Requires messageLock be held by caller. */ -void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - mgntDeqStats(msg.payload); - if (policy.get()) policy->dequeued(msg); - messages->deleted(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -963,17 +1131,11 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) } } -/** updates queue observers when a message has become unavailable for transfer, - * expects messageLock to be held +/** updates queue observers when a message has become unavailable for transfer. + * Requires messageLock be held by caller. */ -void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - if (mgmtObject) { - mgmtObject->inc_acquires(); - if (brokerMgmtObject) - brokerMgmtObject->inc_acquires(); - } - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->acquired(msg); @@ -983,17 +1145,11 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) } } -/** updates queue observers when a message has become re-available for transfer, - * expects messageLock to be held +/** updates queue observers when a message has become re-available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&) { - if (mgmtObject) { - mgmtObject->inc_releases(); - if (brokerMgmtObject) - brokerMgmtObject->inc_releases(); - } - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->requeued(msg); @@ -1003,6 +1159,33 @@ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) } } +/** updates queue observers when a new consumer has subscribed to this queue. + */ +void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerAdded(c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); + } + } +} + +/** updates queue observers when a consumer has unsubscribed from this queue. + */ +void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerRemoved(c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); + } + } +} + + void Queue::create(const FieldTable& _settings) { settings = _settings; @@ -1150,23 +1333,21 @@ void Queue::configureImpl(const FieldTable& _settings) void Queue::destroyed() { unbind(broker->getExchanges()); - { - Mutex::ScopedLock locker(messageLock); - QueuedMessage m; - while(popAndDequeue(m, locker)) { - DeliverableMessage msg(m.payload); - if (alternateExchange.get()) { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandonedViaAlt(); - alternateExchange->routeWithAlternate(msg); - } else { - if (brokerMgmtObject) - brokerMgmtObject->inc_abandoned(); - } + + QueuedMessage m; + while(popAndDequeue(m)) { + DeliverableMessage msg(m.payload); + if (alternateExchange.get()) { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + alternateExchange->routeWithAlternate(msg); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); } - if (alternateExchange.get()) - alternateExchange->decAlternateUsers(); } + if (alternateExchange.get()) + alternateExchange->decAlternateUsers(); if (store) { barrier.destroy(); @@ -1177,7 +1358,7 @@ void Queue::destroyed() if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.clear(); } } @@ -1187,8 +1368,8 @@ void Queue::notifyDeleted() QueueListeners::ListenerSet set; { Mutex::ScopedLock locker(messageLock); - listeners.snapshot(set); deleted = true; + listeners.snapshot(set); } set.notifyAll(); } @@ -1206,6 +1387,7 @@ void Queue::unbind(ExchangeRegistry& exchanges) void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) { + Mutex::ScopedLock locker(messageLock); policy = _policy; if (policy.get()) policy->setQueue(this); @@ -1213,6 +1395,7 @@ void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) const QueuePolicy* Queue::getPolicy() { + Mutex::ScopedLock locker(messageLock); return policy.get(); } @@ -1302,7 +1485,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Queue::shared_ptr queue; AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {} void fire() { @@ -1388,11 +1571,15 @@ void Queue::countRejected() const void Queue::countFlowedToDisk(uint64_t size) const { if (mgmtObject) { - mgmtObject->inc_msgFtdEnqueues(); - mgmtObject->inc_byteFtdEnqueues(size); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + qStats->msgFtdEnqueues += 1; + qStats->byteFtdEnqueues += size; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgFtdEnqueues(); - brokerMgmtObject->inc_byteFtdEnqueues(size); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgFtdEnqueues += 1; + bStats->byteFtdEnqueues += size; + brokerMgmtObject->statisticsUpdated(); } } } @@ -1400,11 +1587,15 @@ void Queue::countFlowedToDisk(uint64_t size) const void Queue::countLoadedFromDisk(uint64_t size) const { if (mgmtObject) { - mgmtObject->inc_msgFtdDequeues(); - mgmtObject->inc_byteFtdDequeues(size); + _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); + qStats->msgFtdDequeues += 1; + qStats->byteFtdDequeues += size; + mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { - brokerMgmtObject->inc_msgFtdDequeues(); - brokerMgmtObject->inc_byteFtdDequeues(size); + _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); + bStats->msgFtdDequeues += 1; + bStats->byteFtdDequeues += size; + brokerMgmtObject->statisticsUpdated(); } } } @@ -1434,9 +1625,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str { _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args; boost::shared_ptr<Exchange> dest; - if (rerouteArgs.i_useAltExchange) + if (rerouteArgs.i_useAltExchange) { + if (!alternateExchange) { + status = Manageable::STATUS_PARAMETER_INVALID; + etext = "No alternate-exchange defined"; + break; + } dest = alternateExchange; - else { + } else { try { dest = broker->getExchanges().get(rerouteArgs.i_exchange); } catch(const std::exception&) { @@ -1486,8 +1682,12 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) << "\": exchange does not exist."); } //process any pending dequeues - for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); - pendingDequeues.clear(); + std::deque<QueuedMessage> pd; + { + Mutex::ScopedLock locker(messageLock); + pendingDequeues.swap(pd); + } + for_each(pd.begin(), pd.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } void Queue::insertSequenceNumbers(const std::string& key) @@ -1497,10 +1697,10 @@ void Queue::insertSequenceNumbers(const std::string& key) QPID_LOG(debug, "Inserting sequence numbers as " << key); } -/** updates queue observers and state when a message has become available for transfer, - * expects messageLock to be held +/** updates queue observers and state when a message has become available for transfer + * Requires messageLock be held by caller. */ -void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) +void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1509,10 +1709,6 @@ void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } } - if (policy.get()) { - policy->enqueued(m); - } - mgntEnqStats(m.payload); } void Queue::updateEnqueued(const QueuedMessage& m) @@ -1520,12 +1716,16 @@ void Queue::updateEnqueued(const QueuedMessage& m) if (m.payload) { boost::intrusive_ptr<Message> payload = m.payload; enqueue(0, payload, true); - messages->updateAcquired(m); - if (policy.get()) { - policy->recoverEnqueued(payload); + { + Mutex::ScopedLock locker(messageLock); + messages->updateAcquired(m); + observeEnqueue(m, locker); + if (policy.get()) { + policy->recoverEnqueued(payload); + policy->enqueued(m); + } } - Mutex::ScopedLock locker(messageLock); - observeEnqueue(m, locker); + mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1533,10 +1733,16 @@ void Queue::updateEnqueued(const QueuedMessage& m) bool Queue::isEnqueued(const QueuedMessage& msg) { + Mutex::ScopedLock locker(messageLock); return !policy.get() || policy->isEnqueued(msg); } +// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's +// state is not changed while listeners is referenced. QueueListeners& Queue::getListeners() { return listeners; } + +// Note: accessing messages outside of lock is dangerous. Caller must ensure the queue's +// state is not changed while messages is referenced. Messages& Queue::getMessages() { return *messages; } const Messages& Queue::getMessages() const { return *messages; } @@ -1549,13 +1755,13 @@ void Queue::checkNotDeleted(const Consumer::shared_ptr& c) void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.insert(observer); } void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock lock(messageLock); observers.erase(observer); } @@ -1618,7 +1824,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() { - Monitor::ScopedLock l(parent.messageLock); + Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */ if (parent.deleted) { return false; } else { @@ -1639,3 +1845,6 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } + +}} + |