summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp74
1 files changed, 71 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index d5dc3e85f1..27c1cc4ad7 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -77,6 +77,7 @@ const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
//following feature is not ready for general use as it doesn't handle
//the case where a message is enqueued on more than one queue well enough:
const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -108,7 +109,8 @@ Queue::Queue(const string& _name, bool _autodelete,
insertSeqNo(0),
broker(b),
deleted(false),
- barrier(*this)
+ barrier(*this),
+ autoDeleteTimeout(0)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -398,6 +400,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
consumerCount++;
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
}
void Queue::cancel(Consumer::shared_ptr c){
@@ -567,7 +573,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(consumerLock);
- return autodelete && !consumerCount;
+ return autodelete && !consumerCount && !owner;
}
void Queue::clearLastNodeFailure()
@@ -726,6 +732,28 @@ void Queue::create(const FieldTable& _settings)
configure(_settings);
}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
void Queue::configure(const FieldTable& _settings, bool recovering)
{
@@ -787,6 +815,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
+ autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+
if (mgmtObject != 0)
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -813,6 +845,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
}
void Queue::notifyDeleted()
@@ -917,15 +950,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
}
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+ Broker& broker;
+ Queue::shared_ptr queue;
+
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+ void fire()
+ {
+ //need to detect case where queue was used after the task was
+ //created, but then became unused again before the task fired;
+ //in this case ignore this request as there will have already
+ //been a later task added
+ tryAutoDeleteImpl(broker, queue);
+ }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+ AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ broker.getClusterTimer().add(queue->autoDeleteTask);
+ QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+ } else {
+ tryAutoDeleteImpl(broker, queue);
+ }
+}
+
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(ownershipLock);
@@ -940,6 +1004,10 @@ void Queue::releaseExclusiveOwnership()
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
return false;