/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/Queue.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" #include "qpid/broker/AclModule.h" #include "qpid/broker/QueueCursor.h" #include "qpid/broker/QueueDepth.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/MessageDeque.h" #include "qpid/broker/MessageDistributor.h" #include "qpid/broker/FifoDistributor.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Selector.h" #include "qpid/broker/TransactionObserver.h" #include "qpid/broker/TxDequeue.h" //TODO: get rid of this #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include "qpid/sys/Timer.h" #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include "qmf/org/apache/qpid/broker/EventUnsubscribe.h" #include #include #include #include #include namespace qpid { namespace broker { using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; using qpid::management::getCurrentPublisher; using std::string; using std::for_each; using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; namespace { inline void mgntEnqStats(const Message& msg, _qmf::Queue::shared_ptr mgmtObject, _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0) { _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); uint64_t contentSize = msg.getMessageSize(); qStats->msgTotalEnqueues +=1; bStats->msgTotalEnqueues += 1; qStats->byteTotalEnqueues += contentSize; bStats->byteTotalEnqueues += contentSize; if (msg.isPersistent ()) { qStats->msgPersistEnqueues += 1; bStats->msgPersistEnqueues += 1; qStats->bytePersistEnqueues += contentSize; bStats->bytePersistEnqueues += contentSize; } mgmtObject->statisticsUpdated(); brokerMgmtObject->statisticsUpdated(); } } inline void mgntDeqStats(const Message& msg, _qmf::Queue::shared_ptr mgmtObject, _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); uint64_t contentSize = msg.getMessageSize(); qStats->msgTotalDequeues += 1; bStats->msgTotalDequeues += 1; qStats->byteTotalDequeues += contentSize; bStats->byteTotalDequeues += contentSize; if (msg.isPersistent ()){ qStats->msgPersistDequeues += 1; bStats->msgPersistDequeues += 1; qStats->bytePersistDequeues += contentSize; bStats->bytePersistDequeues += contentSize; } mgmtObject->statisticsUpdated(); brokerMgmtObject->statisticsUpdated(); } } QueueSettings merge(const QueueSettings& inputs, const Broker& broker) { QueueSettings settings(inputs); settings.maxDepth = QueueDepth(); if (inputs.maxDepth.hasCount() && inputs.maxDepth.getCount()) { settings.maxDepth.setCount(inputs.maxDepth.getCount()); } if (inputs.maxDepth.hasSize()) { if (inputs.maxDepth.getSize()) { settings.maxDepth.setSize(inputs.maxDepth.getSize()); } } else if (broker.getQueueLimit()) { settings.maxDepth.setSize(broker.getQueueLimit()); } return settings; } } Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr q) : message(m), queue(q), prepared(false) {} bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw() { try { prepared = queue->enqueue(ctxt, message); return true; } catch (const std::exception& e) { QPID_LOG(error, "Failed to prepare: " << e.what()); return false; } } void Queue::TxPublish::commit() throw() { try { if (prepared) queue->process(message); } catch (const std::exception& e) { QPID_LOG(error, "Failed to commit: " << e.what()); } } void Queue::TxPublish::rollback() throw() { try { if (prepared) queue->enqueueAborted(message); } catch (const std::exception& e) { QPID_LOG(error, "Failed to rollback: " << e.what()); } } void Queue::TxPublish::callObserver( const boost::shared_ptr& observer) { observer->enqueue(queue, message); } Queue::Queue(const string& _name, const QueueSettings& _settings, MessageStore* const _store, Manageable* parent, Broker* b) : name(_name), store(_store), owner(0), exclusive(0), messages(new MessageDeque()), persistenceId(0), settings(b ? merge(_settings, *b) : _settings), eventMode(0), observers(name, messageLock), broker(b), deleted(false), barrier(*this), allocator(new FifoDistributor( *messages )), redirectSource(false) { current.setCount(0);//always track depth in messages if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it if (settings.traceExcludes.size()) { split(traceExclude, settings.traceExcludes, ", "); } qpid::amqp_0_10::translate(settings.asMap(), encodableSettings); if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { mgmtObject = _qmf::Queue::shared_ptr( new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete)); mgmtObject->set_arguments(settings.asMap()); agent->addObject(mgmtObject, 0, store != 0); brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject()); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); } } if ( settings.isBrowseOnly ) { QPID_LOG ( info, "Queue " << name << " is browse-only." ); } if (settings.filter.size()) { selector.reset(new Selector(settings.filter)); QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter); } } Queue::~Queue() { if (mgmtObject != 0) { mgmtObject->debugStats("destroying"); mgmtObject->resourceDestroy(); } } bool Queue::isLocal(const Message& msg) { //message is considered local if it was published on the same //connection as that of the session which declared this queue //exclusive (owner) or which has an exclusive subscription //(exclusive) return settings.noLocal && (msg.isLocalTo(owner) || msg.isLocalTo(exclusive)); } bool Queue::isExcluded(const Message& msg) { return traceExclude.size() && msg.isExcluded(traceExclude); } bool Queue::accept(const Message& msg) { //TODO: move some of this out of the queue and into the publishing //'link' for whatever protocol is used; that would let protocol //specific stuff be kept out the queue if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg, 0); alternateExchange->route(deliverable); } return false; } else if (isLocal(msg)) { //drop message QPID_LOG(info, "Dropping 'local' message from " << getName()); return false; } else if (isExcluded(msg)) { //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); return false; } else { messages->check(msg); if (selector) { return selector->filter(msg); } else { return true; } } } void Queue::deliver(Message msg, TxBuffer* txn) { if (redirectPeer) { redirectPeer->deliverTo(msg, txn); } else { deliverTo(msg, txn); } } void Queue::deliverTo(Message msg, TxBuffer* txn) { if (accept(msg)) { interceptors.record(msg); if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); txn->enlist(op); QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " << name << " enlisted in " << txn); } else { if (enqueue(0, msg)) { push(msg); QPID_LOG(debug, "Message " << msg.getSequence() << " enqueued on " << name); } else { QPID_LOG(debug, "Message " << msg.getSequence() << " dropped from " << name); } } } } void Queue::recoverPrepared(const Message& msg) { Mutex::ScopedLock locker(messageLock); current += QueueDepth(1, msg.getMessageSize()); } void Queue::recover(Message& msg) { recoverPrepared(msg); push(msg, true); } void Queue::process(Message& msg) { push(msg); if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); const uint64_t contentSize = msg.getMessageSize(); qStats->msgTxnEnqueues += 1; qStats->byteTxnEnqueues += contentSize; mgmtObject->statisticsUpdated(); if (brokerMgmtObject) { _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); bStats->msgTxnEnqueues += 1; bStats->byteTxnEnqueues += contentSize; brokerMgmtObject->statisticsUpdated(); } } } void Queue::release(const QueueCursor& position, bool markRedelivered) { QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); if (!deleted) { Message* message = messages->release(position); if (message) { if (!markRedelivered) message->undeliver(); listeners.populate(copy); observeRequeue(*message, locker); if (mgmtObject) { mgmtObject->inc_releases(); if (brokerMgmtObject) brokerMgmtObject->inc_releases(); } } } } copy.notify(); } bool Queue::dequeueMessageAt(const SequenceNumber& position) { ScopedAutoDelete autodelete(*this); boost::intrusive_ptr pmsg; { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "Attempting to dequeue message at " << position); QueueCursor cursor; Message* msg = messages->find(position, &cursor); if (msg) { if (msg->isPersistent()) pmsg = msg->getPersistentContext(); observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor); } else { QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message"); return false; } } dequeueFromStore(pmsg); return true; } bool Queue::acquire(const QueueCursor& position, const std::string& consumer) { Mutex::ScopedLock locker(messageLock); Message* msg; msg = messages->find(position); if (msg) { QPID_LOG(debug, consumer << " attempting to acquire message at " << msg->getSequence()); if (!allocator->acquire(consumer, *msg)) { QPID_LOG(debug, "Not permitted to acquire msg at " << msg->getSequence() << " from '" << name); return false; } else { observeAcquire(*msg, locker); QPID_LOG(debug, "Acquired message at " << msg->getSequence() << " from " << name); return true; } } else { QPID_LOG(debug, "Failed to acquire message which no longer exists on " << name); return false; } } bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) { if (!checkNotDeleted(c)) return false; QueueListeners::NotificationSet set; ScopedAutoDelete autodelete(*this); bool messageFound(false); while (true) { //TODO: reduce lock scope Mutex::ScopedLock locker(messageLock); QueueCursor cursor = c->getCursor(); // Save current position. Message* msg = messages->next(*c); // Advances c. if (msg) { if (isExpired(name, *msg, sys::AbsTime::now())) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); //ERROR: don't hold lock across call to store!! if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext()); if (mgmtObject) { mgmtObject->inc_discardsTtl(); if (brokerMgmtObject) brokerMgmtObject->inc_discardsTtl(); } messages->deleted(*c); continue; } if (c->filter(*msg)) { if (c->accept(*msg)) { if (c->preAcquires()) { QPID_LOG(debug, "Attempting to acquire message " << msg->getSequence() << " from '" << name << "' with state " << msg->getState()); if (allocator->acquire(c->getName(), *msg)) { if (mgmtObject) { mgmtObject->inc_acquires(); if (brokerMgmtObject) brokerMgmtObject->inc_acquires(); } observeAcquire(*msg, locker); msg->deliver(); } else { QPID_LOG(debug, "Could not acquire message from '" << name << "'"); continue; //try another message } } QPID_LOG(debug, "Message " << msg->getSequence() << " retrieved from '" << name << "'"); m = *msg; messageFound = true; break; } else { //message(s) are available but consumer hasn't got enough credit QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); c->setCursor(cursor); // Restore cursor, will try again with credit if (c->preAcquires()) { //let someone else try listeners.populate(set); } break; } } else { //consumer will never want this message, try another one QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); if (c->preAcquires()) { //let someone else try to take this one listeners.populate(set); } } } else { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); c->stopped(); listeners.addListener(c); break; } } set.notify(); return messageFound; } void Queue::removeListener(Consumer::shared_ptr c) { QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); listeners.removeListener(c); if (messages->size()) { listeners.populate(set); } } set.notify(); } bool Queue::dispatch(Consumer::shared_ptr c) { Message msg; if (getNextMessage(msg, c)) { c->deliver(*c, msg); return true; } else { return false; } } bool Queue::find(SequenceNumber pos, Message& msg) const { Mutex::ScopedLock locker(messageLock); Message* ptr = messages->find(pos, 0); if (ptr) { msg = *ptr; return true; } return false; } void Queue::markInUse(bool controlling) { Mutex::ScopedLock locker(messageLock); if (controlling) users.addLifecycleController(); else users.addOther(); } void Queue::releaseFromUse(bool controlling, bool doDelete) { bool trydelete; if (controlling) { Mutex::ScopedLock locker(messageLock); users.removeLifecycleController(); trydelete = true; } else { Mutex::ScopedLock locker(messageLock); users.removeOther(); trydelete = isUnused(locker); } if (trydelete && doDelete) scheduleAutoDelete(); } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive, const framing::FieldTable& arguments, const std::string& connectionId, const std::string& userId) { boost::intrusive_ptr t; { Mutex::ScopedLock locker(messageLock); if (c->preAcquires()) { if(settings.isBrowseOnly) { throw NotAllowedException( QPID_MSG("Queue " << name << " is browse only. Refusing acquiring consumer.")); } if(exclusive) { throw ResourceLockedException( QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); } else if(requestExclusive) { if(users.hasConsumers()) { throw ResourceLockedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { exclusive = c->getSession(); } } users.addConsumer(); } else if(c->isCounted()) { users.addBrowser(); } if(c->isCounted()) { //reset auto deletion timer if necessary if (settings.autoDeleteDelay && autoDeleteTask) { t = autoDeleteTask; } observeConsumerAdd(*c, locker); } } if (t) t->cancel(); if (mgmtObject != 0 && c->isCounted()) { mgmtObject->inc_consumerCount(); } if (broker) { ManagementAgent* agent = broker->getManagementAgent(); if (agent) { agent->raiseEvent( _qmf::EventSubscribe(connectionId, userId, name, c->getTag(), requestExclusive, ManagementAgent::toMap(arguments))); } } } void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, const std::string& userId) { removeListener(c); if(c->isCounted()) { bool unused; { Mutex::ScopedLock locker(messageLock); if (c->preAcquires()) { users.removeConsumer(); if (exclusive) exclusive = 0; } else { users.removeBrowser(); } observeConsumerRemove(*c, locker); unused = !users.isUsed(); } if (mgmtObject != 0) { mgmtObject->dec_consumerCount(); } if (unused && settings.autodelete) scheduleAutoDelete(); } if (broker) { ManagementAgent* agent = broker->getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag())); } } bool Queue::isExpired(const std::string& name, const Message& m, AbsTime now) { if (m.getExpiration() < now) { QPID_LOG(debug, "Message expired from queue '" << name << "': " << m.printProperties()); return true; } else { return false; } } /** *@param lapse: time since the last purgeExpired */ void Queue::purgeExpired(sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. int count = dequeueSincePurge.get(); dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { sys::AbsTime time = sys::AbsTime::now(); uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete); QPID_LOG(debug, "Purged " << count << " expired messages from " << getName()); // // Report the count of discarded-by-ttl messages // if (mgmtObject && count) { mgmtObject->inc_discardsTtl(count); if (brokerMgmtObject) { brokerMgmtObject->inc_discardsTtl(count); } } } } namespace { // for use with purge/move below - collect messages that match a given filter // class MessageFilter { public: static const std::string typeKey; static const std::string paramsKey; static MessageFilter *create( const ::qpid::types::Variant::Map *filter ); virtual bool match( const Message& ) const { return true; } virtual ~MessageFilter() {} protected: MessageFilter() {}; }; const std::string MessageFilter::typeKey("filter_type"); const std::string MessageFilter::paramsKey("filter_params"); // filter by message header string value exact match class HeaderMatchFilter : public MessageFilter { public: /* Config: { 'filter_type' : 'header_match_str', 'filter_params' : { 'header_key' : "
", 'header_value' : "" } } */ static const std::string typeKey; static const std::string headerKey; static const std::string valueKey; HeaderMatchFilter( const std::string& _header, const std::string& _value ) : MessageFilter (), header(_header), value(_value) {} bool match( const Message& msg ) const { return msg.getPropertyAsString(header) == value; } private: const std::string header; const std::string value; }; const std::string HeaderMatchFilter::typeKey("header_match_str"); const std::string HeaderMatchFilter::headerKey("header_key"); const std::string HeaderMatchFilter::valueKey("header_value"); // factory to create correct filter based on map MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter ) { using namespace qpid::types; if (filter && !filter->empty()) { Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey); if (i != filter->end()) { if (i->second.asString() == HeaderMatchFilter::typeKey) { Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey); if (p != filter->end() && p->second.getType() == VAR_MAP) { Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey); Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey); if (k != p->second.asMap().end() && v != p->second.asMap().end()) { std::string headerKey(k->second.asString()); std::string value(v->second.asString()); QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value ); return new HeaderMatchFilter( headerKey, value ); } } } } QPID_LOG(error, "Unrecognized message filter: '" << *filter << "'"); throw qpid::Exception(QPID_MSG("Unrecognized message filter: '" << *filter << "'")); } return new MessageFilter(); } void moveTo(boost::shared_ptr q, Message& m) { if (q) { q->deliver(m); } } } // end namespace uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete, uint32_t maxTests) { ScopedAutoDelete autodelete(*this); std::deque removed; { QueueCursor c(type); uint32_t count(0), tests(0); Mutex::ScopedLock locker(messageLock); Message* m = messages->next(c); while (m){ if (maxTests && tests++ >= maxTests) break; if (!p || p(*m)) { if (maxCount && count++ >= maxCount) break; if (m->getState() == AVAILABLE) { //don't actually acquire, just act as if we did observeAcquire(*m, locker); } observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0); removed.push_back(*m);//takes a copy of the message if (!messages->deleted(c)) { QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!"); assert(false); } } m = messages->next(c); } } for (std::deque::iterator i = removed.begin(); i != removed.end(); ++i) { if (f) f(*i);//ERROR? need to clear old persistent context? if (i->isPersistent()) dequeueFromStore(i->getPersistentContext());//do this outside of lock and after any re-routing } return removed.size(); } /** * purge - for purging all or some messages on a queue * depending on the purge_request * * qty == 0 then purge all messages * == N then purge N messages from queue * Sometimes qty == 1 to unblock the top of queue * * The dest exchange may be supplied to re-route messages through the exchange. * It is safe to re-route messages such that they arrive back on the same queue, * even if the queue is ordered by priority. * * An optional filter can be supplied that will be applied against each message. The * message is purged only if the filter matches. See MessageDistributor for more detail. */ uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr dest, const qpid::types::Variant::Map *filter) { std::auto_ptr mf(MessageFilter::create(filter)); uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/, settings.autodelete); if (mgmtObject && count) { mgmtObject->inc_acquires(count); if (dest.get()) { mgmtObject->inc_reroutes(count); if (brokerMgmtObject) { brokerMgmtObject->inc_acquires(count); brokerMgmtObject->inc_reroutes(count); } } else { mgmtObject->inc_discardsPurge(count); if (brokerMgmtObject) { brokerMgmtObject->inc_acquires(count); brokerMgmtObject->inc_discardsPurge(count); } } } return count; } uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, const qpid::types::Variant::Map *filter) { std::auto_ptr mf(MessageFilter::create(filter)); return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/, settings.autodelete); } void Queue::push(Message& message, bool /*isRecovery*/) { QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); message.setSequence(++sequence); if (settings.sequencing) message.addAnnotation(settings.sequenceKey, (uint32_t)sequence); interceptors.publish(message); messages->publish(message); listeners.populate(copy); observeEnqueue(message, locker); } copy.notify(); } uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); return messages->size(); } uint32_t Queue::getConsumerCount() const { Mutex::ScopedLock locker(messageLock); return users.getSubscriberCount(); } bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(messageLock); return !deleted && checkAutoDelete(locker); } bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const { if (settings.autodelete) { switch (settings.lifetime) { case QueueSettings::DELETE_IF_UNUSED: return isUnused(lock); case QueueSettings::DELETE_IF_EMPTY: return !users.isInUseByController() && isEmpty(lock); case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY: return isUnused(lock) && isEmpty(lock); case QueueSettings::DELETE_ON_CLOSE: return !users.isInUseByController(); } } return false; } bool Queue::isUnused(const Mutex::ScopedLock&) const { return !owner && !users.isUsed();; } bool Queue::isEmpty(const Mutex::ScopedLock&) const { return current.getCount() == 0; } /* * return true if enqueue succeeded and message should be made * available; returning false will result in the message being dropped */ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) { ScopedUse u(barrier); if (!u.acquired) return false; { Mutex::ScopedLock locker(messageLock); if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) { return false; } } if (settings.traceId.size()) { msg.addTraceId(settings.traceId); } if (msg.isPersistent() && store) { // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete() // when it considers the message stored. boost::intrusive_ptr pmsg = msg.getPersistentContext(); assert(pmsg); pmsg->enqueueAsync(shared_from_this()); try { store->enqueue(ctxt, pmsg, *this); } catch (...) { enqueueAborted(msg); throw; } } return true; } void Queue::enqueueAborted(const Message& msg) { //Called when any transactional enqueue is aborted (including but //not limited to a recovered dtx transaction) Mutex::ScopedLock locker(messageLock); current -= QueueDepth(1, msg.getMessageSize()); } void Queue::enqueueCommited(Message& msg) { //called when a recovered dtx enqueue operation is committed; the //message is already on disk and space has been reserved in policy //but it should now be made available process(msg); } void Queue::dequeueAborted(Message& msg) { //called when a recovered dtx dequeue operation is aborted; the //message should be added back to the queue push(msg); } void Queue::dequeueCommited(const Message& msg) { //called when a recovered dtx dequeue operation is committed; the //message will at this point have already been removed from the //store and will not be available for delivery. The only action //required is to ensure the observers are notified and the //management stats are correctly decremented ScopedAutoDelete autodelete(*this); Mutex::ScopedLock locker(messageLock); observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.getMessageSize()); } } void Queue::dequeueFromStore(boost::intrusive_ptr msg) { ScopedUse u(barrier); if (u.acquired && msg && store) { store->dequeue(0, msg, *this); } } void Queue::dequeue(const QueueCursor& cursor, TxBuffer* txn) { if (txn) { TxOp::shared_ptr op; { Mutex::ScopedLock locker(messageLock); Message* msg = messages->find(cursor); if (msg) { op = TxOp::shared_ptr(new TxDequeue(cursor, shared_from_this(), msg->getSequence(), msg->getReplicationId())); } } if (op) txn->enlist(op); } else { dequeue(0, cursor); } } void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor) { ScopedUse u(barrier); if (!u.acquired) return; ScopedAutoDelete autodelete(*this); boost::intrusive_ptr pmsg; { Mutex::ScopedLock locker(messageLock); Message* msg = messages->find(cursor); if (msg) { if (msg->isPersistent()) pmsg = msg->getPersistentContext(); if (!ctxt) { observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor);//message pointer not valid after this } } else { return; } } if (store && pmsg) { store->dequeue(ctxt, pmsg, *this); } } void Queue::dequeueCommitted(const QueueCursor& cursor) { ScopedAutoDelete autodelete(*this); Mutex::ScopedLock locker(messageLock); Message* msg = messages->find(cursor); if (msg) { const uint64_t contentSize = msg->getMessageSize(); observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(contentSize); } if (brokerMgmtObject) { _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); bStats->msgTxnDequeues += 1; bStats->byteTxnDequeues += contentSize; brokerMgmtObject->statisticsUpdated(); } messages->deleted(cursor); } else { QPID_LOG(error, "Could not find dequeued message on commit"); } } /** * Updates policy and management when a message has been dequeued, * Requires messageLock be held by caller. */ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete) { current -= QueueDepth(1, msg.getMessageSize()); mgntDeqStats(msg, mgmtObject, brokerMgmtObject); observers.dequeued(msg, lock); if (autodelete && isEmpty(lock)) autodelete->check(lock); } /** updates queue observers when a message has become unavailable for transfer. * Requires messageLock be held by caller. */ void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock& l) { observers.acquired(msg, l); } /** updates queue observers when a message has become re-available for transfer * Requires messageLock be held by caller. */ void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock& l) { observers.requeued(msg, l); } /** updates queue observers when a new consumer has subscribed to this queue. */ void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l) { observers.consumerAdded(c, l); } /** updates queue observers when a consumer has unsubscribed from this queue. */ void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l) { observers.consumerRemoved(c, l); } void Queue::create() { if (store) { store->create(*this, settings.storeSettings); } } int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) { qpid::framing::FieldTable::ValuePtr v = settings.get(key); if (!v) { return 0; } else if (v->convertsTo()) { return v->get(); } else if (v->convertsTo()){ std::string s = v->get(); try { return boost::lexical_cast(s); } catch(const boost::bad_lexical_cast&) { QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); return 0; } } else { QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); return 0; } } bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key) { qpid::framing::FieldTable::ValuePtr v = settings.get(key); if (!v) { return false; } else if (v->convertsTo()) { return v->get() != 0; } else if (v->convertsTo()){ std::string s = v->get(); if (s == "True") return true; if (s == "true") return true; if (s == "False") return false; if (s == "false") return false; try { return boost::lexical_cast(s); } catch(const boost::bad_lexical_cast&) { QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << s); return false; } } else { QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *v); return false; } } void Queue::abandoned(const Message& message) { if (reroute(alternateExchange, message) && brokerMgmtObject) brokerMgmtObject->inc_abandonedViaAlt(); else if (brokerMgmtObject) brokerMgmtObject->inc_abandoned(); } void Queue::destroyed() { if (mgmtObject != 0) mgmtObject->debugStats("destroying"); unbind(broker->getExchanges()); remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/, false); if (alternateExchange.get()) { alternateExchange->decAlternateUsers(); alternateExchange.reset(); } if (store) { barrier.destroy(); store->flush(*this); store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } notifyDeleted(); { Mutex::ScopedLock l(messageLock); if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr(); observers.destroy(l); } if (mgmtObject != 0) { mgmtObject->resourceDestroy(); if (brokerMgmtObject) brokerMgmtObject->dec_queueCount(); mgmtObject = _qmf::Queue::shared_ptr(); // dont print debugStats in Queue::~Queue } } void Queue::notifyDeleted() { QueueListeners::ListenerSet set; { Mutex::ScopedLock locker(messageLock); deleted = true; listeners.snapshot(set); } set.notifyAll(); } void Queue::bound(const string& exchange, const string& key, const FieldTable& args) { bindings.add(exchange, key, args); } void Queue::unbind(ExchangeRegistry& exchanges) { bindings.unbind(exchanges, shared_from_this()); } uint64_t Queue::getPersistenceId() const { return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore) { ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); if (childObj != 0) childObj->setReference(mgmtObject->getObjectId()); } persistenceId = _persistenceId; } void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(encodableSettings); buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string("")); buffer.putShortString(userId); buffer.putInt8(isAutoDelete()); } uint32_t Queue::encodedSize() const { return name.size() + 1/*short string size octet*/ + (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */ + userId.size() + 1 /* short string */ + 1 /* autodelete flag */ + encodableSettings.encodedSize(); } void Queue::updateAclUserQueueCount() { if (broker->getAcl()) broker->getAcl()->approveCreateQueue(userId, name); } Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) { string name; string _userId; FieldTable ft; boost::shared_ptr alternate; QueueSettings settings(true, false); // settings.autodelete might be overwritten string altExch; bool has_userId = false; bool has_altExch = false; buffer.getShortString(name); buffer.get(ft); settings.populate(ft, settings.storeSettings); //get alternate exchange if (buffer.available()) { buffer.getShortString(altExch); has_altExch = true; } //get userId of queue's creator; ACL counters for userId are done after ACL plugin is initialized if (buffer.available()) { buffer.getShortString(_userId); has_userId = true; } //get autodelete flag if (buffer.available()) { settings.autodelete = buffer.getInt8(); } std::pair result = queues.declare(name, settings, alternate, true); if (has_altExch) result.first->alternateExchangeName.assign(altExch); if (has_userId) result.first->setOwningUser(_userId); if (result.first->getSettings().autoDeleteDelay) { result.first->scheduleAutoDelete(); } return result.first; } void Queue::setAlternateExchange(boost::shared_ptr exchange) { alternateExchange = exchange; alternateExchange->incAlternateUsers(); if (mgmtObject) { if (exchange.get() != 0) mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId()); else mgmtObject->clr_altExchange(); } } boost::shared_ptr Queue::getAlternateExchange() { return alternateExchange; } struct AutoDeleteTask : qpid::sys::TimerTask { Queue::shared_ptr queue; long expectedVersion; AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime) : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q), expectedVersion(q->version) {} void fire() { //need to detect case where queue was used after the task was //created, but then became unused again before the task fired; //in this case ignore this request as there will have already //been a later task added queue->tryAutoDelete(expectedVersion); } }; void Queue::scheduleAutoDelete(bool immediate) { if (canAutoDelete()) { if (!immediate && settings.autoDeleteDelay) { AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC)); autoDeleteTask = boost::intrusive_ptr(new AutoDeleteTask(shared_from_this(), time)); broker->getTimer().add(autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated"); } else { tryAutoDelete(version); } } } void Queue::tryAutoDelete(long expectedVersion) { bool proceed(false); { Mutex::ScopedLock locker(messageLock); if (!deleted && checkAutoDelete(locker)) { proceed = true; } } if (proceed) { if (broker->getQueues().destroyIfUntouched(name, expectedVersion)) { { Mutex::ScopedLock locker(messageLock); deleted = true; } if (broker->getAcl()) broker->getAcl()->recordDestroyQueue(name); QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")"); } else { //queue was accessed since the delayed auto-delete was scheduled, so try again QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name); scheduleAutoDelete(); } } else { QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name); } } bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { Mutex::ScopedLock locker(messageLock); return o == owner; } void Queue::releaseExclusiveOwnership(bool immediateExpiry) { bool unused; { Mutex::ScopedLock locker(messageLock); owner = 0; if (mgmtObject) { mgmtObject->set_exclusive(false); } unused = !users.isUsed(); } if (unused && settings.autodelete) { scheduleAutoDelete(immediateExpiry); } } bool Queue::setExclusiveOwner(const OwnershipToken* const o) { //reset auto deletion timer if necessary if (settings.autoDeleteDelay && autoDeleteTask) { autoDeleteTask->cancel(); } Mutex::ScopedLock locker(messageLock); if (owner || users.hasConsumers()) { return false; } else { owner = o; if (mgmtObject) { mgmtObject->set_exclusive(true); } return true; } } bool Queue::hasExclusiveOwner() const { Mutex::ScopedLock locker(messageLock); return owner != 0; } bool Queue::hasExclusiveConsumer() const { return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { if (externalQueueStore!=inst && externalQueueStore) delete externalQueueStore; externalQueueStore = inst; if (inst) { ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj != 0 && mgmtObject != 0) childObj->setReference(mgmtObject->getObjectId()); } } void Queue::countRejected() const { if (mgmtObject) { mgmtObject->inc_discardsSubscriber(); if (brokerMgmtObject) brokerMgmtObject->inc_discardsSubscriber(); } } ManagementObject::shared_ptr Queue::GetManagementObject(void) const { return mgmtObject; } Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; AclModule* acl = broker->getAcl(); std::string _userId = (getCurrentPublisher()?getCurrentPublisher()->getUserId():""); QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); switch (methodId) { case _qmf::Queue::METHOD_PURGE : { if ((acl)&&(!(acl->authorise(_userId, acl::ACT_PURGE, acl::OBJ_QUEUE, name, NULL)))) { throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied purge request from " << _userId)); } _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; purge(purgeArgs.i_request, boost::shared_ptr(), &purgeArgs.i_filter); status = Manageable::STATUS_OK; } break; case _qmf::Queue::METHOD_REROUTE : { _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args; boost::shared_ptr dest; if (rerouteArgs.i_useAltExchange) { if (!alternateExchange) { status = Manageable::STATUS_PARAMETER_INVALID; etext = "No alternate-exchange defined"; break; } dest = alternateExchange; } else { try { dest = broker->getExchanges().get(rerouteArgs.i_exchange); } catch(const std::exception&) { status = Manageable::STATUS_PARAMETER_INVALID; etext = "Exchange not found"; break; } } if (acl) { std::map params; params.insert(make_pair(acl::PROP_EXCHANGENAME, dest->getName())); if (!acl->authorise(_userId, acl::ACT_REROUTE, acl::OBJ_QUEUE, name, ¶ms)) { throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied reroute request from " << _userId)); } } purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter); status = Manageable::STATUS_OK; } break; } return status; } void Queue::query(qpid::types::Variant::Map& results) const { Mutex::ScopedLock locker(messageLock); /** @todo add any interesting queue state into results */ if (allocator) allocator->query(results); } namespace { struct After { framing::SequenceNumber seq; After(framing::SequenceNumber s) : seq(s) {} bool operator()(const Message& m) { return m.getSequence() > seq; } }; } // namespace void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); if (n < sequence) { remove(0, After(n), MessagePredicate(), BROWSER, false); } sequence = n; QPID_LOG(debug, "Set position to " << sequence << " on " << getName()); } SequenceNumber Queue::getPosition() { Mutex::ScopedLock locker(messageLock); return sequence; } void Queue::getRange(framing::SequenceNumber& front, framing::SequenceNumber& back, SubscriptionType type) { Mutex::ScopedLock locker(messageLock); QueueCursor cursor(type); back = sequence; Message* message = messages->next(cursor); front = message ? message->getSequence() : back+1; } int Queue::getEventMode() { return eventMode; } void Queue::recoveryComplete(ExchangeRegistry& exchanges) { // set the alternate exchange if (!alternateExchangeName.empty()) { Exchange::shared_ptr ae = exchanges.find(alternateExchangeName); if (ae) setAlternateExchange(ae); else QPID_LOG(warning, "Could not set alternate exchange \"" << alternateExchangeName << "\" on queue \"" << name << "\": exchange does not exist."); } //process any pending dequeues for (std::vector::iterator i = pendingDequeues.begin(); i != pendingDequeues.end(); ++i) { dequeueFromStore(i->getPersistentContext()); } pendingDequeues.clear(); } /** updates queue observers and state when a message has become available for transfer * Requires messageLock be held by caller. */ void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l) { observers.enqueued(m, l); mgntEnqStats(m, mgmtObject, brokerMgmtObject); } bool Queue::checkNotDeleted(const Consumer::shared_ptr& c) { if (deleted && !c->hideDeletedError()) throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted.")); return !deleted; } bool Queue::isDeleted() const { Mutex::ScopedLock lock(messageLock); return deleted; } void Queue::flush() { ScopedUse u(barrier); if (u.acquired && store) store->flush(*this); } bool Queue::bind(boost::shared_ptr exchange, const std::string& key, const qpid::framing::FieldTable& arguments) { if (!isDeleted() && exchange->bind(shared_from_this(), key, &arguments)) { bound(exchange->getName(), key, arguments); if (exchange->isDurable() && isDurable()) { store->bind(*exchange, *this, key, arguments); } return true; } else { return false; } } Broker* Queue::getBroker() { return broker; } void Queue::setDequeueSincePurge(uint32_t value) { dequeueSincePurge = value; } void Queue::reject(const QueueCursor& cursor) { ScopedAutoDelete autodelete(*this); Exchange::shared_ptr alternate = getAlternateExchange(); Message copy; boost::intrusive_ptr pmsg; { Mutex::ScopedLock locker(messageLock); Message* message = messages->find(cursor); if (message) { if (alternate) copy = *message; if (message->isPersistent()) pmsg = message->getPersistentContext(); countRejected(); observeDequeue(*message, locker, settings.autodelete ? &autodelete : 0); messages->deleted(cursor); } else { return; } } if (alternate) { copy.resetDeliveryCount(); DeliverableMessage delivery(copy, 0); alternate->routeWithAlternate(delivery); QPID_LOG(info, "Routed rejected message from " << getName() << " to " << alternate->getName()); } else { //just drop it QPID_LOG(info, "Dropping rejected message from " << getName()); } dequeueFromStore(pmsg); } bool Queue::checkDepth(const QueueDepth& increment, const Message&) { if (settings.maxDepth && (settings.maxDepth - current < increment)) { if (mgmtObject) { mgmtObject->inc_discardsOverflow(); if (brokerMgmtObject) brokerMgmtObject->inc_discardsOverflow(); } throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]")); } else { current += increment; return true; } } bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate) { Mutex::ScopedLock locker(messageLock); //hold lock across calls to predicate, or take copy of message? //currently hold lock, may want to revise depending on any new use //cases Message* message = messages->next(cursor); while (message && (predicate && !predicate(*message))) { message = messages->next(cursor); } return message != 0; } bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate, qpid::framing::SequenceNumber start) { Mutex::ScopedLock locker(messageLock); //hold lock across calls to predicate, or take copy of message? //currently hold lock, may want to revise depending on any new use //cases Message* message; message = messages->find(start, &cursor); if (message && (!predicate || predicate(*message))) return true; return seek(cursor, predicate); } bool Queue::seek(QueueCursor& cursor, qpid::framing::SequenceNumber start) { Mutex::ScopedLock locker(messageLock); return messages->find(start, &cursor); } Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() { Monitor::ScopedLock l(usageLock); if (parent.deleted) { return false; } else { ++count; return true; } } void Queue::UsageBarrier::release() { Monitor::ScopedLock l(usageLock); if (--count == 0) usageLock.notifyAll(); } void Queue::UsageBarrier::destroy() { Monitor::ScopedLock l(usageLock); parent.deleted = true; while (count) usageLock.wait(); } void Queue::addArgument(const string& key, const types::Variant& value) { settings.original[key] = value; qpid::amqp_0_10::translate(settings.asMap(), encodableSettings); boost::shared_ptr v; qpid::amqp_0_10::translate(value, v); settings.storeSettings.set(key, v); if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap()); } void Queue::setRedirectPeer ( Queue::shared_ptr peer, bool isSrc) { Mutex::ScopedLock locker(messageLock); redirectPeer = peer; redirectSource = isSrc; } void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) { if (mgmtObject != 0) { mgmtObject->set_redirectPeer(enabled ? peer : ""); mgmtObject->set_redirectSource(isSrc); } } void Queue::setOwningUser(std::string& _userId) { userId = _userId; if (mgmtObject != 0) mgmtObject->set_creator(userId); } bool Queue::reroute(boost::shared_ptr e, const Message& m) { if (e) { DeliverableMessage d(m, 0); d.getMessage().clearTrace(); e->routeWithAlternate(d); return true; } else { return false; } } Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {} void Queue::QueueUsers::addConsumer() { ++consumers; } void Queue::QueueUsers::addBrowser() { ++browsers; } void Queue::QueueUsers::addLifecycleController() { assert(!controller); controller = true; } void Queue::QueueUsers::addOther(){ ++others; } void Queue::QueueUsers::removeConsumer() { assert(consumers > 0); --consumers; } void Queue::QueueUsers::removeBrowser() { assert(browsers > 0); --browsers; } void Queue::QueueUsers::removeLifecycleController() { assert(controller); controller = false; } void Queue::QueueUsers::removeOther() { assert(others > 0); --others; } bool Queue::QueueUsers::isInUseByController() const { return controller; } bool Queue::QueueUsers::isUsed() const { return controller || consumers || browsers || others; } uint32_t Queue::QueueUsers::getSubscriberCount() const { return consumers + browsers; } bool Queue::QueueUsers::hasConsumers() const { return consumers; } Queue::ScopedAutoDelete::ScopedAutoDelete(Queue& q) : queue(q), eligible(false) {} void Queue::ScopedAutoDelete::check(const sys::Mutex::ScopedLock& lock) { eligible = queue.checkAutoDelete(lock); } Queue::ScopedAutoDelete::~ScopedAutoDelete() { if (eligible) queue.scheduleAutoDelete(); } }}