diff options
author | Gordon Sim <gsim@apache.org> | 2015-11-06 20:32:27 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2015-11-06 20:32:27 +0000 |
commit | 317ef8dba524d3aee6c2006c53637e6d38c97d2b (patch) | |
tree | 3feed5a26f3c674d171c684302fa642aac1f8cdb /qpid | |
parent | b1dcd3eb8ac737c9be61c1c67bcb57ff04344c63 (diff) | |
download | qpid-python-317ef8dba524d3aee6c2006c53637e6d38c97d2b.tar.gz |
QPID-6834: allow delete-on-close in queue policies
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1713025 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.h | 1 |
3 files changed, 14 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 8de8539579..29d482d7f2 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -60,6 +60,7 @@ const std::string MAX_PAGES("qpid.max_pages_loaded"); const std::string PAGE_FACTOR("qpid.page_factor"); const std::string FILTER("qpid.filter"); const std::string LIFETIME_POLICY("qpid.lifetime-policy"); +const std::string DELETE_ON_CLOSE_KEY("delete-on-close"); const std::string DELETE_IF_UNUSED_KEY("delete-if-unused"); const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty"); const std::string MANUAL("manual"); @@ -234,6 +235,9 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v autodelete = true; } else if (value.asString() == MANUAL) { autodelete = false; + } else if (value.asString() == DELETE_ON_CLOSE_KEY) { + lifetime = DELETE_ON_CLOSE; + autodelete = true; } else { QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 2f793aea14..e0ca8f58e0 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -231,6 +231,11 @@ class IncomingToCoordinator : public DecodingIncoming private: }; +bool Session::ResolvedNode::trackControllingLink() const +{ + return created && (properties.trackControllingLink() || (queue && queue->getSettings().lifetime == QueueSettings::DELETE_ON_CLOSE)); +} + Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), authorise(connection.getUserId(), connection.getBroker().getAcl()), @@ -306,6 +311,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result = nodePolicy->create(name, connection); node.queue = result.first; node.topic = result.second; + node.created = node.queue || node.topic; if (node.topic) node.exchange = node.topic->getExchange(); if (node.queue) { @@ -478,10 +484,10 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s source = sourceAddress; } if (node.queue) { - boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.created && node.properties.trackControllingLink())); + boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.trackControllingLink())); incoming[link] = q; } else if (node.exchange) { - boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source, node.created && node.properties.trackControllingLink())); + boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source, node.trackControllingLink())); incoming[link] = e; } else if (node.relay) { boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay)); @@ -523,7 +529,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (type == CONSUMER && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) { throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot consume from exclusive queue ") + node.queue->getName()); } - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.created && node.properties.trackControllingLink())); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.trackControllingLink())); q->init(); filter.apply(q); outgoing[link] = q; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 83376064c7..e48d563f27 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -134,6 +134,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses NodeProperties properties; bool created; ResolvedNode(bool isDynamic) : properties(isDynamic), created(false) {} + bool trackControllingLink() const; }; ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming); |