diff options
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 74 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 3 |
2 files changed, 74 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d5dc3e85f1..27c1cc4ad7 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -77,6 +77,7 @@ const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout"); //following feature is not ready for general use as it doesn't handle //the case where a message is enqueued on more than one queue well enough: const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); @@ -108,7 +109,8 @@ Queue::Queue(const string& _name, bool _autodelete, insertSeqNo(0), broker(b), deleted(false), - barrier(*this) + barrier(*this), + autoDeleteTimeout(0) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -398,6 +400,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ consumerCount++; if (mgmtObject != 0) mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } } void Queue::cancel(Consumer::shared_ptr c){ @@ -567,7 +573,7 @@ uint32_t Queue::getConsumerCount() const bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(consumerLock); - return autodelete && !consumerCount; + return autodelete && !consumerCount && !owner; } void Queue::clearLastNodeFailure() @@ -726,6 +732,28 @@ void Queue::create(const FieldTable& _settings) configure(_settings); } + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + void Queue::configure(const FieldTable& _settings, bool recovering) { @@ -787,6 +815,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering) FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); + autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (mgmtObject != 0) mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); @@ -813,6 +845,7 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } + if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); } void Queue::notifyDeleted() @@ -917,15 +950,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } +struct AutoDeleteTask : qpid::sys::TimerTask +{ + Broker& broker; + Queue::shared_ptr queue; + + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + + void fire() + { + //need to detect case where queue was used after the task was + //created, but then became unused again before the task fired; + //in this case ignore this request as there will have already + //been a later task added + tryAutoDeleteImpl(broker, queue); + } +}; + +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (queue->autoDeleteTimeout && queue->canAutoDelete()) { + AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); + queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); + broker.getClusterTimer().add(queue->autoDeleteTask); + QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); + } else { + tryAutoDeleteImpl(broker, queue); + } +} + bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { Mutex::ScopedLock locker(ownershipLock); @@ -940,6 +1004,10 @@ void Queue::releaseExclusiveOwnership() bool Queue::setExclusiveOwner(const OwnershipToken* const o) { + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 664b1e0f01..12a3d273be 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -36,6 +36,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" #include "qpid/framing/amqp_types.h" @@ -126,6 +127,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, Broker* broker; bool deleted; UsageBarrier barrier; + int autoDeleteTimeout; + boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); |
