summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-02-15 18:58:44 +0000
committerGordon Sim <gsim@apache.org>2011-02-15 18:58:44 +0000
commit8d1696bd02332cbe7afc929d88d8e7bce73c8f2d (patch)
treebd284a09a32498c16aad18c5d4f162c5332c9802
parente9cbb2646bdc51c5ae9e0e899231f017e9d9d20d (diff)
downloadqpid-python-8d1696bd02332cbe7afc929d88d8e7bce73c8f2d.tar.gz
QPID-3000: Added optional delay for auto-deletion
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1071013 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp74
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py1
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py37
4 files changed, 112 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index d5dc3e85f1..27c1cc4ad7 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -77,6 +77,7 @@ const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
//following feature is not ready for general use as it doesn't handle
//the case where a message is enqueued on more than one queue well enough:
const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -108,7 +109,8 @@ Queue::Queue(const string& _name, bool _autodelete,
insertSeqNo(0),
broker(b),
deleted(false),
- barrier(*this)
+ barrier(*this),
+ autoDeleteTimeout(0)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -398,6 +400,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
consumerCount++;
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
}
void Queue::cancel(Consumer::shared_ptr c){
@@ -567,7 +573,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(consumerLock);
- return autodelete && !consumerCount;
+ return autodelete && !consumerCount && !owner;
}
void Queue::clearLastNodeFailure()
@@ -726,6 +732,28 @@ void Queue::create(const FieldTable& _settings)
configure(_settings);
}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
void Queue::configure(const FieldTable& _settings, bool recovering)
{
@@ -787,6 +815,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
+ autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+
if (mgmtObject != 0)
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -813,6 +845,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
}
void Queue::notifyDeleted()
@@ -917,15 +950,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
}
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+ Broker& broker;
+ Queue::shared_ptr queue;
+
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+ void fire()
+ {
+ //need to detect case where queue was used after the task was
+ //created, but then became unused again before the task fired;
+ //in this case ignore this request as there will have already
+ //been a later task added
+ tryAutoDeleteImpl(broker, queue);
+ }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+ AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ broker.getClusterTimer().add(queue->autoDeleteTask);
+ QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+ } else {
+ tryAutoDeleteImpl(broker, queue);
+ }
+}
+
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(ownershipLock);
@@ -940,6 +1004,10 @@ void Queue::releaseExclusiveOwnership()
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
return false;
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 664b1e0f01..12a3d273be 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -36,6 +36,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
#include "qpid/framing/amqp_types.h"
@@ -126,6 +127,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
Broker* broker;
bool deleted;
UsageBarrier barrier;
+ int autoDeleteTimeout;
+ boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
index 46d56ee0c0..921786af22 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -32,3 +32,4 @@ from tx import *
from lvq import *
from priority import *
from threshold import *
+from extensions import *
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
new file mode 100644
index 0000000000..26ea3cb0e9
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase010
+from time import sleep
+
+class ExtensionTests(TestBase010):
+ """Tests for various extensions to AMQP 0-10"""
+
+ def test_timed_autodelete(self):
+ session = self.session
+ session2 = self.conn.session("another-session")
+ session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5})
+ session2.close()
+ result = session.queue_query(queue="my-queue")
+ self.assertEqual("my-queue", result.queue)
+ sleep(5)
+ result = session.queue_query(queue="my-queue")
+ self.assert_(not result.queue)