From 86b2f3ac9b93951fb0017ddbe8980ea0212b9ece Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 10 Jan 2014 22:22:54 +0000 Subject: QPID-5467: fix handling of delete-on-close git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1557272 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 10 +++++++++- qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 2 ++ qpid/cpp/src/qpid/broker/amqp/Session.cpp | 18 ++++++++++++------ qpid/cpp/src/qpid/broker/amqp/Session.h | 3 ++- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 7a0bc6c0d8..2ae676a66f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -66,7 +66,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, current(0), outstanding(0), buffer(1024)/*used only for header at present*/, //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested - unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)) + unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)), + cancelled(false) { for (size_t i = 0 ; i < deliveries.capacity(); ++i) { deliveries[i].init(i); @@ -181,6 +182,13 @@ void OutgoingFromQueue::detached() } if (exclusive) queue->releaseExclusiveOwnership(); else if (isControllingUser) queue->releaseFromUse(true); + cancelled = true; +} + + +OutgoingFromQueue::~OutgoingFromQueue() +{ + if (!cancelled && isControllingUser) queue->releaseFromUse(true); } //Consumer interface: diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 48f041171c..3808b68d32 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -90,6 +90,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public public: OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser); + ~OutgoingFromQueue(); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); void init(); @@ -144,6 +145,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public std::string subjectFilter; boost::scoped_ptr selector; bool unreliable; + bool cancelled; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 537eb09c6b..537912ea83 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -251,14 +251,20 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te } qpid::framing::FieldTable args; qpid::amqp_0_10::translate(node.properties.getProperties(), args); - node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(), - node.properties.getAlternateExchange(), - args, connection.getUserId(), connection.getId()).first; + std::pair, bool> result + = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(), + node.properties.getAlternateExchange(), + args, connection.getUserId(), connection.getId()); + node.exchange = result.first; + node.created = result.second; } else { if (node.exchange) { QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists"); } - node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; + std::pair, bool> result + = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()); + node.queue = result.first; + node.created = result.second; } } else { boost::shared_ptr nodePolicy = connection.getNodePolicies().match(name); @@ -415,7 +421,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s source = sourceAddress; } if (node.queue) { - boost::shared_ptr q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink())); + boost::shared_ptr q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.created && node.properties.trackControllingLink())); incoming[link] = q; } else if (node.exchange) { boost::shared_ptr e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source)); @@ -460,7 +466,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (type == CONSUMER && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) { throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot consume from exclusive queue ") + node.queue->getName()); } - boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink())); + boost::shared_ptr q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.created && node.properties.trackControllingLink())); q->init(); filter.apply(q); outgoing[link] = q; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 1d9c2d161d..523e17948b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -100,7 +100,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this topic; boost::shared_ptr relay; NodeProperties properties; - ResolvedNode(bool isDynamic) : properties(isDynamic) {} + bool created; + ResolvedNode(bool isDynamic) : properties(isDynamic), created(false) {} }; ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming); -- cgit v1.2.1