diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 465 |
1 files changed, 214 insertions, 251 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3de93ed74e..cfb32749a0 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -23,11 +23,16 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/LegacyLVQ.h" +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/MessageMap.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -67,11 +72,13 @@ const std::string qpidMaxCount("qpid.max_count"); const std::string qpidNoLocal("no-local"); const std::string qpidTraceIdentity("qpid.trace.id"); const std::string qpidTraceExclude("qpid.trace.exclude"); +const std::string qpidLastValueQueueKey("qpid.last_value_queue_key"); const std::string qpidLastValueQueue("qpid.last_value_queue"); const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout"); //following feature is not ready for general use as it doesn't handle //the case where a message is enqueued on more than one queue well enough: const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); @@ -93,19 +100,18 @@ Queue::Queue(const string& _name, bool _autodelete, consumerCount(0), exclusive(0), noLocal(false), - lastValueQueue(false), - lastValueQueueNoBrowse(false), persistLastNode(false), inLastNodeFailure(false), + messages(new MessageDeque()), persistenceId(0), policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0), insertSeqNo(0), broker(b), deleted(false), - barrier(*this) + barrier(*this), + autoDeleteTimeout(0) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -160,7 +166,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ } else { enqueue(0, msg); push(msg); - mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -179,7 +184,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued - mgntEnqStats(msg); if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -194,7 +198,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -208,7 +211,7 @@ void Queue::requeue(const QueuedMessage& msg){ Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued - messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg); + messages->reinsert(msg); listeners.populate(copy); // for persistLastNode - don't force a message twice to disk, but force it if no force before @@ -223,57 +226,23 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -void Queue::clearLVQIndex(const QueuedMessage& msg){ - assertClusterSafe(); - const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0; - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } -} - bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - - Messages::iterator i = findAt(position); - if (i != messages.end() ) { - message = *i; - if (lastValueQueue) { - clearLVQIndex(*i); - } - QPID_LOG(debug, - "Acquired message at " << i->position << " from " << name); - messages.erase(i); + if (messages->remove(position, message)) { + QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; - } - QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); - return false; + } else { + QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); + return false; + } } bool Queue::acquire(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - assertClusterSafe(); - - QPID_LOG(debug, "attempting to acquire " << msg.position); - Messages::iterator i = findAt(msg.position); - if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set - (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { - - clearLVQIndex(msg); - QPID_LOG(debug, - "Match found, acquire succeeded: " << - i->position << " == " << msg.position); - messages.erase(i); - return true; - } - - QPID_LOG(debug, "Acquire failed for " << msg.position); - return false; + QueuedMessage copy = msg; + return acquireMessageAt(msg.position, copy); } void Queue::notifyListener() @@ -282,7 +251,7 @@ void Queue::notifyListener() QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -311,12 +280,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; } else { - QueuedMessage msg = getFront(); + QueuedMessage msg = messages->front(); if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); popAndDequeue(); @@ -326,7 +295,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - popMsg(msg); + pop(); return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit @@ -352,11 +321,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) //consumer wants the message c->position = msg.position; m = msg; - if (!lastValueQueueNoBrowse) clearLVQIndex(msg); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) m.payload = replacement; - } return true; } else { //browser hasn't got enough credit for the message @@ -378,7 +342,7 @@ void Queue::removeListener(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); listeners.removeListener(c); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -399,52 +363,20 @@ bool Queue::dispatch(Consumer::shared_ptr c) // Find the next message bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > c->position) { - if (c->position < getFront().position) { - msg = getFront(); - return true; - } else { - Messages::iterator pos = findAt(c->position); - if (pos != messages.end() && pos+1 != messages.end()) { - msg = *(pos+1); - return true; - } - } + if (messages->next(c->position, msg)) { + return true; + } else { + listeners.addListener(c); + return false; } - listeners.addListener(c); - return false; } -Queue::Messages::iterator Queue::findAt(SequenceNumber pos) { - - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i!= messages.end() && i->position == pos) - return i; - } - return messages.end(); // no match found. -} - - QueuedMessage Queue::find(SequenceNumber pos) const { Mutex::ScopedLock locker(messageLock); - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i != messages.end()) - return *i; - } - return QueuedMessage(); + QueuedMessage msg; + messages->find(pos, msg); + return msg; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ @@ -464,6 +396,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ consumerCount++; if (mgmtObject != 0) mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } } void Queue::cancel(Consumer::shared_ptr c){ @@ -478,12 +414,18 @@ void Queue::cancel(Consumer::shared_ptr c){ QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); + messages->pop(msg); + return msg; +} - if(!messages.empty()){ - msg = getFront(); - popMsg(msg); +bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +{ + if (message.payload->hasExpired()) { + expired.push_back(message); + return true; + } else { + return false; } - return msg; } void Queue::purgeExpired() @@ -492,37 +434,11 @@ void Queue::purgeExpired() //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. - //Note: This method is currently called periodically on the timer - //thread. In a clustered broker this means that the purging does - //not occur on the cluster event dispatch thread and consequently - //that is not totally ordered w.r.t other events (including - //publication of messages). However the cluster does ensure that - //the actual expiration of messages (as distinct from the removing - //of those expired messages from the queue) *is* consistently - //ordered w.r.t. cluster events. This means that delivery of - //messages is in general consistent across the cluster inspite of - //any non-determinism in the triggering of a purge. However at - //present purging a last value queue could potentially cause - //inconsistencies in the cluster (as the order w.r.t publications - //can affect the order in which messages appear in the - //queue). Consequently periodic purging of an LVQ is not enabled - //(expired messages will be removed on delivery and consolidated - //by key as part of normal LVQ operation). - - if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) { - Messages expired; + if (dequeueTracker.sampleRatePerSecond() < 1) { + std::deque<QueuedMessage> expired; { Mutex::ScopedLock locker(messageLock); - for (Messages::iterator i = messages.begin(); i != messages.end();) { - //Re-introduce management of LVQ-specific state here - //if purging is renabled for that case (see note above) - if (i->payload->hasExpired()) { - expired.push_back(*i); - i = messages.erase(i); - } else { - ++i; - } - } + messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } @@ -548,13 +464,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages.empty()) { + while((!purge_request || purge_count--) && !messages->empty()) { if (dest.get()) { // // If there is a destination exchange, stage the messages onto a reroute queue // so they don't wind up getting purged more than once. // - DeliverableMessage msg(getFront().payload); + DeliverableMessage msg(messages->front().payload); rerouteQueue.push_back(msg); } popAndDequeue(); @@ -580,101 +496,53 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning - while((!qty || move_count--) && !messages.empty()) { - QueuedMessage qmsg = getFront(); + while((!qty || move_count--) && !messages->empty()) { + QueuedMessage qmsg = messages->front(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue - popMsg(qmsg); + pop(); dequeue(0, qmsg); count++; } return count; } -void Queue::popMsg(QueuedMessage& qmsg) +void Queue::pop() { assertClusterSafe(); - const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } - messages.pop_front(); + messages->pop(); ++dequeueTracker; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; + QueuedMessage removed; + bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - LVQ::iterator i; - const framing::FieldTable* ft = msg->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - - i = lvq.find(key); - if (i == lvq.end() || (broker && broker->isClusterUpdatee())) { - messages.push_back(qm); - listeners.populate(copy); - lvq[key] = msg; - }else { - boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); - if (!old) old = i->second; - i->second->setReplacementMessage(msg,this); - if (isRecovery) { - //can't issue new requests for the store until - //recovery is complete - pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); - } else { - Mutex::ScopedUnlock u(messageLock); - dequeue(0, QueuedMessage(qm.queue, old, qm.position)); - } - } - }else { - messages.push_back(qm); - listeners.populate(copy); - } - if (eventMode) { - if (eventMgr) eventMgr->enqueued(qm); - else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); - } - if (policy.get()) { - policy->enqueued(qm); - } - if (flowLimit.get()) - flowLimit->enqueued(qm); + dequeueRequired = messages->push(qm, removed); + listeners.populate(copy); + enqueued(qm); } copy.notify(); -} - -QueuedMessage Queue::getFront() -{ - QueuedMessage msg = messages.front(); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) msg.payload = replacement; + if (dequeueRequired) { + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(removed); + } else { + dequeue(0, removed); + } } - return msg; } -QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) +void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) { - const framing::FieldTable* ft = replacement->getApplicationHeaders(); - if (ft) { - string key = ft->getAsString(qpidVQMatchProperty); - if (lvq.find(key) != lvq.end()){ - lvq[key] = replacement; - } - } - msg.payload = replacement; - } - return msg; + if (message.payload->isIngressComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -682,20 +550,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const { Mutex::ScopedLock locker(messageLock); uint32_t count = 0; - for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - //NOTE: don't need to use checkLvqReplace() here as it - //is only relevant for LVQ which does not support persistence - //so the enqueueComplete check has no effect - if ( i->payload->isIngressComplete() ) count ++; - } - + messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); return count; } uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - return messages.size(); + return messages->size(); } uint32_t Queue::getConsumerCount() const @@ -707,7 +569,7 @@ uint32_t Queue::getConsumerCount() const bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(consumerLock); - return autodelete && !consumerCount; + return autodelete && !consumerCount && !owner; } void Queue::clearLastNodeFailure() @@ -715,21 +577,22 @@ void Queue::clearLastNodeFailure() inLastNodeFailure = false; } +void Queue::forcePersistent(QueuedMessage& message) +{ + if(!message.payload->isStoredOnQueue(shared_from_this())) { + message.payload->forcePersistent(); + if (message.payload->isForcedPersistent() ){ + enqueue(0, message.payload); + } + } +} + void Queue::setLastNodeFailure() { if (persistLastNode){ Mutex::ScopedLock locker(messageLock); try { - for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { - if (lastValueQueue) checkLvqReplace(*i); - // don't force a message twice to disk. - if(!i->payload->isStoredOnQueue(shared_from_this())) { - i->payload->forcePersistent(); - if (i->payload->isForcedPersistent() ){ - enqueue(0, i->payload); - } - } - } + messages->foreach(boost::bind(&Queue::forcePersistent, this, _1)); } catch (const std::exception& e) { // Could not go into last node standing (for example journal not large enough) QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what()); @@ -746,7 +609,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg if (!u.acquired) return false; if (policy.get() && !suppressPolicyCheck) { - Messages dequeues; + std::deque<QueuedMessage> dequeues; { Mutex::ScopedLock locker(messageLock); policy->tryEnqueue(msg); @@ -833,8 +696,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) */ void Queue::popAndDequeue() { - QueuedMessage msg = getFront(); - popMsg(msg); + QueuedMessage msg = messages->front(); + pop(); dequeue(0, msg); } @@ -845,11 +708,16 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); + /** todo KAG make flowLimit an observer */ if (flowLimit.get()) flowLimit->dequeued(msg); mgntDeqStats(msg.payload); - if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { - eventMgr->dequeued(msg); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->dequeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); + } } } @@ -863,16 +731,41 @@ void Queue::create(const FieldTable& _settings) configure(_settings); } + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + void Queue::configure(const FieldTable& _settings, bool recovering) { eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (eventMode && broker) { + broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); + } if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); - } else if (eventMgr && !eventMgr->isSync() ) { + } else if (broker && !(broker->getQueueEvents().isSync()) ) { QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); } FieldTable copy(_settings); @@ -881,17 +774,30 @@ void Queue::configure(const FieldTable& _settings, bool recovering) } else { setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); } + if (broker && broker->getManagementAgent()) { + ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings); + } + //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); - lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); - - lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); - if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); - lastValueQueue = lastValueQueueNoBrowse; + std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey); + if (lvqKey.size()) { + QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); + messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + } else if (_settings.get(qpidLastValueQueueNoBrowse)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + } else if (_settings.get(qpidLastValueQueue)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + } else { + std::auto_ptr<Messages> m = Fairshare::create(_settings); + if (m.get()) { + messages = m; + QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + } } persistLastNode= _settings.get(qpidPersistLastNode); @@ -910,6 +816,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering) flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings); + autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); if (flowLimit.get()) @@ -924,8 +834,8 @@ void Queue::destroy() { if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); - while(!messages.empty()){ - DeliverableMessage msg(getFront().payload); + while(!messages->empty()){ + DeliverableMessage msg(messages->front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); popAndDequeue(); @@ -939,6 +849,7 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } + if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); } void Queue::notifyDeleted() @@ -1043,15 +954,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } +struct AutoDeleteTask : qpid::sys::TimerTask +{ + Broker& broker; + Queue::shared_ptr queue; + + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + + void fire() + { + //need to detect case where queue was used after the task was + //created, but then became unused again before the task fired; + //in this case ignore this request as there will have already + //been a later task added + tryAutoDeleteImpl(broker, queue); + } +}; + +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (queue->autoDeleteTimeout && queue->canAutoDelete()) { + AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); + queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); + broker.getClusterTimer().add(queue->autoDeleteTask); + QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); + } else { + tryAutoDeleteImpl(broker, queue); + } +} + bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { Mutex::ScopedLock locker(ownershipLock); @@ -1066,6 +1008,10 @@ void Queue::releaseExclusiveOwnership() bool Queue::setExclusiveOwner(const OwnershipToken* const o) { + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; @@ -1154,11 +1100,6 @@ SequenceNumber Queue::getPosition() { int Queue::getEventMode() { return eventMode; } -void Queue::setQueueEventManager(QueueEvents& mgr) -{ - eventMgr = &mgr; -} - void Queue::recoveryComplete(ExchangeRegistry& exchanges) { // set the alternate exchange @@ -1184,16 +1125,31 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { - if (m.payload) { - if (policy.get()) { - policy->recoverEnqueued(m.payload); - policy->enqueued(m); + for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { + try { + (*i)->enqueued(m); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } - if (flowLimit.get()) - flowLimit->enqueued(m); - mgntEnqStats(m.payload); + } + if (policy.get()) { + policy->enqueued(m); + } + /** todo make flowlimit an observer */ + if (flowLimit.get()) + flowLimit->enqueued(m); + mgntEnqStats(m.payload); +} + +void Queue::updateEnqueued(const QueuedMessage& m) +{ + if (m.payload) { boost::intrusive_ptr<Message> payload = m.payload; enqueue ( 0, payload, true ); + if (policy.get()) { + policy->recoverEnqueued(payload); + } + enqueued(m); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1205,6 +1161,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg) } QueueListeners& Queue::getListeners() { return listeners; } +Messages& Queue::getMessages() { return *messages; } +const Messages& Queue::getMessages() const { return *messages; } void Queue::checkNotDeleted() { @@ -1213,6 +1171,11 @@ void Queue::checkNotDeleted() } } +void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) +{ + observers.insert(observer); +} + void Queue::flush() { ScopedUse u(barrier); |