diff options
author | Gordon Sim <gsim@apache.org> | 2011-02-15 18:58:44 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-02-15 18:58:44 +0000 |
commit | 8d1696bd02332cbe7afc929d88d8e7bce73c8f2d (patch) | |
tree | bd284a09a32498c16aad18c5d4f162c5332c9802 | |
parent | e9cbb2646bdc51c5ae9e0e899231f017e9d9d20d (diff) | |
download | qpid-python-8d1696bd02332cbe7afc929d88d8e7bce73c8f2d.tar.gz |
QPID-3000: Added optional delay for auto-deletion
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1071013 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 74 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 3 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py | 1 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py | 37 |
4 files changed, 112 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d5dc3e85f1..27c1cc4ad7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -77,6 +77,7 @@ const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout"); //following feature is not ready for general use as it doesn't handle //the case where a message is enqueued on more than one queue well enough: const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); @@ -108,7 +109,8 @@ Queue::Queue(const string& _name, bool _autodelete, insertSeqNo(0), broker(b), deleted(false), - barrier(*this) + barrier(*this), + autoDeleteTimeout(0) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -398,6 +400,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ consumerCount++; if (mgmtObject != 0) mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } } void Queue::cancel(Consumer::shared_ptr c){ @@ -567,7 +573,7 @@ uint32_t Queue::getConsumerCount() const bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(consumerLock); - return autodelete && !consumerCount; + return autodelete && !consumerCount && !owner; } void Queue::clearLastNodeFailure() @@ -726,6 +732,28 @@ void Queue::create(const FieldTable& _settings) configure(_settings); } + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + void Queue::configure(const FieldTable& _settings, bool recovering) { @@ -787,6 +815,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering) FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); + autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (mgmtObject != 0) mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); @@ -813,6 +845,7 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } + if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); } void Queue::notifyDeleted() @@ -917,15 +950,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } +struct AutoDeleteTask : qpid::sys::TimerTask +{ + Broker& broker; + Queue::shared_ptr queue; + + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + + void fire() + { + //need to detect case where queue was used after the task was + //created, but then became unused again before the task fired; + //in this case ignore this request as there will have already + //been a later task added + tryAutoDeleteImpl(broker, queue); + } +}; + +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (queue->autoDeleteTimeout && queue->canAutoDelete()) { + AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); + queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); + broker.getClusterTimer().add(queue->autoDeleteTask); + QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); + } else { + tryAutoDeleteImpl(broker, queue); + } +} + bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { Mutex::ScopedLock locker(ownershipLock); @@ -940,6 +1004,10 @@ void Queue::releaseExclusiveOwnership() bool Queue::setExclusiveOwner(const OwnershipToken* const o) { + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 664b1e0f01..12a3d273be 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -36,6 +36,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" #include "qpid/framing/amqp_types.h" @@ -126,6 +127,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, Broker* broker; bool deleted; UsageBarrier barrier; + int autoDeleteTimeout; + boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py index 46d56ee0c0..921786af22 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -32,3 +32,4 @@ from tx import * from lvq import * from priority import * from threshold import * +from extensions import * diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py new file mode 100644 index 0000000000..26ea3cb0e9 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase010 +from time import sleep + +class ExtensionTests(TestBase010): + """Tests for various extensions to AMQP 0-10""" + + def test_timed_autodelete(self): + session = self.session + session2 = self.conn.session("another-session") + session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5}) + session2.close() + result = session.queue_query(queue="my-queue") + self.assertEqual("my-queue", result.queue) + sleep(5) + result = session.queue_query(queue="my-queue") + self.assert_(not result.queue) |