diff options
| author | Gordon Sim <gsim@apache.org> | 2013-07-10 13:47:05 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-07-10 13:47:05 +0000 |
| commit | 04ad679eb369aac2a49dbccfeaeaa5964e688dd8 (patch) | |
| tree | d7d482d9ca24a6922698034b1b66f56b7645e4d0 /cpp/src/qpid/broker/Queue.cpp | |
| parent | ed030b2309b484610bc3aca3c9b978cea1f6d00b (diff) | |
| download | qpid-python-04ad679eb369aac2a49dbccfeaeaa5964e688dd8.tar.gz | |
QPID-4976: support standard lifetime policies
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1501768 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 242 |
1 files changed, 165 insertions, 77 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c402e3e016..1d0a8017ef 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -175,8 +175,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, name(_name), store(_store), owner(0), - consumerCount(0), - browserCount(0), exclusive(0), messages(new MessageDeque()), persistenceId(0), @@ -188,8 +186,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, allocator(new FifoDistributor( *messages )), redirectSource(false) { - if (settings.maxDepth.hasCount()) current.setCount(0); - if (settings.maxDepth.hasSize()) current.setSize(0); + current.setCount(0);//always track depth in messages + if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it if (settings.traceExcludes.size()) { split(traceExclude, settings.traceExcludes, ", "); } @@ -340,6 +338,7 @@ void Queue::release(const QueueCursor& position, bool markRedelivered) bool Queue::dequeueMessageAt(const SequenceNumber& position) { + ScopedAutoDelete autodelete(*this); boost::intrusive_ptr<PersistableMessage> pmsg; { Mutex::ScopedLock locker(messageLock); @@ -348,7 +347,7 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position) Message* msg = messages->find(position, &cursor); if (msg) { if (msg->isPersistent()) pmsg = msg->getPersistentContext(); - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor); } else { QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message"); @@ -385,6 +384,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) { if (!checkNotDeleted(c)) return false; QueueListeners::NotificationSet set; + ScopedAutoDelete autodelete(*this); while (true) { //TODO: reduce lock scope Mutex::ScopedLock locker(messageLock); @@ -393,7 +393,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) if (msg) { if (msg->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); //ERROR: don't hold lock across call to store!! if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext()); if (mgmtObject) { @@ -490,14 +490,31 @@ bool Queue::find(SequenceNumber pos, Message& msg) const return false; } +void Queue::markInUse(bool controlling) +{ + Mutex::ScopedLock locker(messageLock); + if (controlling) users.addLifecycleController(); + else users.addOther(); +} + +void Queue::releaseFromUse(bool controlling) +{ + if (controlling) { + { + Mutex::ScopedLock locker(messageLock); + users.removeLifecycleController(); + } + scheduleAutoDelete(); + } else { + Mutex::ScopedLock locker(messageLock); + users.removeOther(); + } +} + void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { { Mutex::ScopedLock locker(messageLock); - // NOTE: consumerCount is actually a count of all - // subscriptions, both acquiring and non-acquiring (browsers). - // Check for exclusivity of acquiring consumers. - size_t acquiringConsumers = consumerCount - browserCount; if (c->preAcquires()) { if(settings.isBrowseOnly) { throw NotAllowedException( @@ -509,7 +526,7 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); } else if(requestExclusive) { - if(acquiringConsumers) { + if(users.hasConsumers()) { throw ResourceLockedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); @@ -517,13 +534,11 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) exclusive = c->getSession(); } } - } - else if(c->isCounted()) { - browserCount++; + users.addConsumer(); + } else if(c->isCounted()) { + users.addBrowser(); } if(c->isCounted()) { - consumerCount++; - //reset auto deletion timer if necessary if (settings.autoDeleteDelay && autoDeleteTask) { autoDeleteTask->cancel(); @@ -542,14 +557,24 @@ void Queue::cancel(Consumer::shared_ptr c) removeListener(c); if(c->isCounted()) { - Mutex::ScopedLock locker(messageLock); - consumerCount--; - if (!c->preAcquires()) browserCount--; - if(exclusive) exclusive = 0; - observeConsumerRemove(*c, locker); - } - if (mgmtObject != 0 && c->isCounted()) { - mgmtObject->dec_consumerCount(); + bool unused; + { + Mutex::ScopedLock locker(messageLock); + if (c->preAcquires()) { + users.removeConsumer(); + if (exclusive) exclusive = 0; + } else { + users.removeBrowser(); + } + observeConsumerRemove(*c, locker); + unused = !users.isUsed(); + } + if (mgmtObject != 0) { + mgmtObject->dec_consumerCount(); + } + if (unused && settings.autodelete) { + scheduleAutoDelete(); + } } } @@ -564,7 +589,7 @@ void Queue::purgeExpired(sys::Duration lapse) { dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER); + uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER, settings.autodelete); QPID_LOG(debug, "Purged " << count << " expired messages from " << getName()); // // Report the count of discarded-by-ttl messages @@ -671,8 +696,9 @@ namespace { } } // end namespace -uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type) +uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete) { + ScopedAutoDelete autodelete(*this); std::deque<Message> removed; { QueueCursor c(type); @@ -686,7 +712,7 @@ uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunct //don't actually acquire, just act as if we did observeAcquire(*m, locker); } - observeDequeue(*m, locker); + observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0); removed.push_back(*m);//takes a copy of the message if (!messages->deleted(c)) { QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!"); @@ -726,7 +752,7 @@ uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest, const qpid::types::Variant::Map *filter) { std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); - uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/); + uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/, settings.autodelete); if (mgmtObject && count) { mgmtObject->inc_acquires(count); @@ -752,7 +778,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, const qpid::types::Variant::Map *filter) { std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); - return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/); + return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/, settings.autodelete); } void Queue::push(Message& message, bool /*isRecovery*/) @@ -779,15 +805,41 @@ uint32_t Queue::getMessageCount() const uint32_t Queue::getConsumerCount() const { Mutex::ScopedLock locker(messageLock); - return consumerCount; + return users.getSubscriberCount(); } bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(messageLock); - return settings.autodelete && !consumerCount && !owner; + return !deleted && checkAutoDelete(locker); +} + +bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const +{ + if (settings.autodelete) { + switch (settings.lifetime) { + case QueueSettings::DELETE_IF_UNUSED: + return isUnused(lock); + case QueueSettings::DELETE_IF_EMPTY: + return !users.isInUseByController() && isEmpty(lock); + case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY: + return isUnused(lock) && isEmpty(lock); + case QueueSettings::DELETE_ON_CLOSE: + return !users.isInUseByController(); + } + } + return false; +} + +bool Queue::isUnused(const Mutex::ScopedLock&) const +{ + return !owner && !users.isUsed();; } +bool Queue::isEmpty(const Mutex::ScopedLock&) const +{ + return current.getCount() == 0; +} /* * return true if enqueue succeeded and message should be made * available; returning false will result in the message being dropped @@ -853,8 +905,9 @@ void Queue::dequeueCommited(const Message& msg) //store and will not be available for delivery. The only action //required is to ensure the observers are notified and the //management stats are correctly decremented + ScopedAutoDelete autodelete(*this); Mutex::ScopedLock locker(messageLock); - observeDequeue(msg, locker); + observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.getContentSize()); @@ -874,6 +927,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) { ScopedUse u(barrier); if (!u.acquired) return; + ScopedAutoDelete autodelete(*this); boost::intrusive_ptr<PersistableMessage> pmsg; { Mutex::ScopedLock locker(messageLock); @@ -881,7 +935,7 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) if (msg) { if (msg->isPersistent()) pmsg = msg->getPersistentContext(); if (!ctxt) { - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor);//message pointer not valid after this } } else { @@ -895,11 +949,12 @@ void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) void Queue::dequeueCommitted(const QueueCursor& cursor) { + ScopedAutoDelete autodelete(*this); Mutex::ScopedLock locker(messageLock); Message* msg = messages->find(cursor); if (msg) { const uint64_t contentSize = msg->getContentSize(); - observeDequeue(*msg, locker); + observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(contentSize); @@ -920,7 +975,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) * Updates policy and management when a message has been dequeued, * Requires messageLock be held by caller. */ -void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&) +void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete) { current -= QueueDepth(1, msg.getContentSize()); mgntDeqStats(msg, mgmtObject, brokerMgmtObject); @@ -931,6 +986,7 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&) QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); } } + if (autodelete && isEmpty(lock)) autodelete->check(lock); } /** updates queue observers when a message has become unavailable for transfer. @@ -1053,9 +1109,10 @@ void Queue::abandoned(const Message& message) void Queue::destroyed() { unbind(broker->getExchanges()); - remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/); + remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/, false); if (alternateExchange.get()) { alternateExchange->decAlternateUsers(); + alternateExchange.reset(); } if (store) { @@ -1170,33 +1227,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) -{ - if (broker.getQueues().destroyIf(queue->getName(), - boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { - if (broker.getAcl()) - broker.getAcl()->recordDestroyQueue(queue->getName()); - - QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName() - << " user:" << userId - << " rhost:" << connectionId ); - queue->destroyed(); - } else { - QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName() - << " user:" << userId - << " rhost:" << connectionId ); - } -} - struct AutoDeleteTask : qpid::sys::TimerTask { - Broker& broker; Queue::shared_ptr queue; - std::string connectionId; - std::string userId; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {} + AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q) {} void fire() { @@ -1204,34 +1240,56 @@ struct AutoDeleteTask : qpid::sys::TimerTask //created, but then became unused again before the task fired; //in this case ignore this request as there will have already //been a later task added - tryAutoDeleteImpl(broker, queue, connectionId, userId); + queue->tryAutoDelete(); } }; -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId) +void Queue::scheduleAutoDelete() { - if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) { - AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC)); - queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time)); - broker.getTimer().add(queue->autoDeleteTask); - QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); + if (canAutoDelete()) { + if (settings.autoDeleteDelay) { + AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC)); + autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time)); + broker->getTimer().add(autoDeleteTask); + QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated"); + } else { + tryAutoDelete(); + } + } +} + +void Queue::tryAutoDelete() +{ + if (broker->getQueues().destroyIf(name, boost::bind(boost::mem_fn(&Queue::canAutoDelete), shared_from_this()))) { + if (broker->getAcl()) + broker->getAcl()->recordDestroyQueue(name); + + QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")"); + destroyed(); } else { - tryAutoDeleteImpl(broker, queue, connectionId, userId); + QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name); } } bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { - Mutex::ScopedLock locker(ownershipLock); + Mutex::ScopedLock locker(messageLock); return o == owner; } void Queue::releaseExclusiveOwnership() { - Mutex::ScopedLock locker(ownershipLock); - owner = 0; - if (mgmtObject) { - mgmtObject->set_exclusive(false); + bool unused; + { + Mutex::ScopedLock locker(messageLock); + owner = 0; + if (mgmtObject) { + mgmtObject->set_exclusive(false); + } + unused = !users.isUsed(); + } + if (unused && settings.autodelete) { + scheduleAutoDelete(); } } @@ -1241,7 +1299,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) if (settings.autoDeleteDelay && autoDeleteTask) { autoDeleteTask->cancel(); } - Mutex::ScopedLock locker(ownershipLock); + Mutex::ScopedLock locker(messageLock); if (owner) { return false; } else { @@ -1255,7 +1313,7 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) bool Queue::hasExclusiveOwner() const { - Mutex::ScopedLock locker(ownershipLock); + Mutex::ScopedLock locker(messageLock); return owner != 0; } @@ -1388,7 +1446,7 @@ struct After { void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); if (n < sequence) { - remove(0, After(n), MessagePredicate(), BROWSER); + remove(0, After(n), MessagePredicate(), BROWSER, false); } sequence = n; QPID_LOG(debug, "Set position to " << sequence << " on " << getName()); @@ -1450,6 +1508,12 @@ bool Queue::checkNotDeleted(const Consumer::shared_ptr& c) return !deleted; } +bool Queue::isDeleted() const +{ + Mutex::ScopedLock lock(messageLock); + return deleted; +} + void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) { Mutex::ScopedLock lock(messageLock); @@ -1495,6 +1559,7 @@ void Queue::setDequeueSincePurge(uint32_t value) { void Queue::reject(const QueueCursor& cursor) { + ScopedAutoDelete autodelete(*this); Exchange::shared_ptr alternate = getAlternateExchange(); Message copy; boost::intrusive_ptr<PersistableMessage> pmsg; @@ -1505,7 +1570,7 @@ void Queue::reject(const QueueCursor& cursor) if (alternate) copy = *message; if (message->isPersistent()) pmsg = message->getPersistentContext(); countRejected(); - observeDequeue(*message, locker); + observeDequeue(*message, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor); } else { return; @@ -1526,7 +1591,7 @@ void Queue::reject(const QueueCursor& cursor) bool Queue::checkDepth(const QueueDepth& increment, const Message&) { - if (current && (settings.maxDepth - current < increment)) { + if (settings.maxDepth && (settings.maxDepth - current < increment)) { if (mgmtObject) { mgmtObject->inc_discardsOverflow(); if (brokerMgmtObject) @@ -1619,6 +1684,29 @@ void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) { mgmtObject->set_redirectSource(isSrc); } } +Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {} +void Queue::QueueUsers::addConsumer() { ++consumers; } +void Queue::QueueUsers::addBrowser() { ++browsers; } +void Queue::QueueUsers::addLifecycleController() { assert(!controller); controller = true; } +void Queue::QueueUsers::addOther(){ ++others; } +void Queue::QueueUsers::removeConsumer() { assert(consumers > 0); --consumers; } +void Queue::QueueUsers::removeBrowser() { assert(browsers > 0); --browsers; } +void Queue::QueueUsers::removeLifecycleController() { assert(controller); controller = false; } +void Queue::QueueUsers::removeOther() { assert(others > 0); --others; } +bool Queue::QueueUsers::isInUseByController() const { return controller; } +bool Queue::QueueUsers::isUsed() const { return controller || consumers || browsers || others; } +uint32_t Queue::QueueUsers::getSubscriberCount() const { return consumers + browsers; } +bool Queue::QueueUsers::hasConsumers() const { return consumers; } + +Queue::ScopedAutoDelete::ScopedAutoDelete(Queue& q) : queue(q), eligible(false) {} +void Queue::ScopedAutoDelete::check(const sys::Mutex::ScopedLock& lock) +{ + eligible = queue.checkAutoDelete(lock); +} +Queue::ScopedAutoDelete::~ScopedAutoDelete() +{ + if (eligible) queue.scheduleAutoDelete(); +} }} |
