summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2015-11-06 20:32:27 +0000
committerGordon Sim <gsim@apache.org>2015-11-06 20:32:27 +0000
commit317ef8dba524d3aee6c2006c53637e6d38c97d2b (patch)
tree3feed5a26f3c674d171c684302fa642aac1f8cdb
parentb1dcd3eb8ac737c9be61c1c67bcb57ff04344c63 (diff)
downloadqpid-python-317ef8dba524d3aee6c2006c53637e6d38c97d2b.tar.gz
QPID-6834: allow delete-on-close in queue policies
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1713025 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h1
3 files changed, 14 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
index 8de8539579..29d482d7f2 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
@@ -60,6 +60,7 @@ const std::string MAX_PAGES("qpid.max_pages_loaded");
const std::string PAGE_FACTOR("qpid.page_factor");
const std::string FILTER("qpid.filter");
const std::string LIFETIME_POLICY("qpid.lifetime-policy");
+const std::string DELETE_ON_CLOSE_KEY("delete-on-close");
const std::string DELETE_IF_UNUSED_KEY("delete-if-unused");
const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty");
const std::string MANUAL("manual");
@@ -234,6 +235,9 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v
autodelete = true;
} else if (value.asString() == MANUAL) {
autodelete = false;
+ } else if (value.asString() == DELETE_ON_CLOSE_KEY) {
+ lifetime = DELETE_ON_CLOSE;
+ autodelete = true;
} else {
QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 2f793aea14..e0ca8f58e0 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -231,6 +231,11 @@ class IncomingToCoordinator : public DecodingIncoming
private:
};
+bool Session::ResolvedNode::trackControllingLink() const
+{
+ return created && (properties.trackControllingLink() || (queue && queue->getSettings().lifetime == QueueSettings::DELETE_ON_CLOSE));
+}
+
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
authorise(connection.getUserId(), connection.getBroker().getAcl()),
@@ -306,6 +311,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result = nodePolicy->create(name, connection);
node.queue = result.first;
node.topic = result.second;
+ node.created = node.queue || node.topic;
if (node.topic) node.exchange = node.topic->getExchange();
if (node.queue) {
@@ -478,10 +484,10 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
source = sourceAddress;
}
if (node.queue) {
- boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.created && node.properties.trackControllingLink()));
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.trackControllingLink()));
incoming[link] = q;
} else if (node.exchange) {
- boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source, node.created && node.properties.trackControllingLink()));
+ boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source, node.trackControllingLink()));
incoming[link] = e;
} else if (node.relay) {
boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay));
@@ -523,7 +529,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (type == CONSUMER && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot consume from exclusive queue ") + node.queue->getName());
}
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.created && node.properties.trackControllingLink()));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 83376064c7..e48d563f27 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -134,6 +134,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
NodeProperties properties;
bool created;
ResolvedNode(bool isDynamic) : properties(isDynamic), created(false) {}
+ bool trackControllingLink() const;
};
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);