summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-01-10 22:22:54 +0000
committerGordon Sim <gsim@apache.org>2014-01-10 22:22:54 +0000
commit86b2f3ac9b93951fb0017ddbe8980ea0212b9ece (patch)
tree317d0434f9ff3f2d132630b12c5a05ba0eb79ec5
parent1ff021ef3b7db9a1b552bffa12603a61154fa3c5 (diff)
downloadqpid-python-86b2f3ac9b93951fb0017ddbe8980ea0212b9ece.tar.gz
QPID-5467: fix handling of delete-on-close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1557272 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h3
4 files changed, 25 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 7a0bc6c0d8..2ae676a66f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -66,7 +66,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
current(0), outstanding(0),
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
- unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link))
+ unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
+ cancelled(false)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -181,6 +182,13 @@ void OutgoingFromQueue::detached()
}
if (exclusive) queue->releaseExclusiveOwnership();
else if (isControllingUser) queue->releaseFromUse(true);
+ cancelled = true;
+}
+
+
+OutgoingFromQueue::~OutgoingFromQueue()
+{
+ if (!cancelled && isControllingUser) queue->releaseFromUse(true);
}
//Consumer interface:
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 48f041171c..3808b68d32 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -90,6 +90,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
public:
OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser);
+ ~OutgoingFromQueue();
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
@@ -144,6 +145,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
std::string subjectFilter;
boost::scoped_ptr<Selector> selector;
bool unreliable;
+ bool cancelled;
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 537eb09c6b..537912ea83 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -251,14 +251,20 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
}
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(node.properties.getProperties(), args);
- node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
- node.properties.getAlternateExchange(),
- args, connection.getUserId(), connection.getId()).first;
+ std::pair<boost::shared_ptr<Exchange>, bool> result
+ = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
+ node.properties.getAlternateExchange(),
+ args, connection.getUserId(), connection.getId());
+ node.exchange = result.first;
+ node.created = result.second;
} else {
if (node.exchange) {
QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
}
- node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
+ std::pair<boost::shared_ptr<Queue>, bool> result
+ = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
+ node.queue = result.first;
+ node.created = result.second;
}
} else {
boost::shared_ptr<NodePolicy> nodePolicy = connection.getNodePolicies().match(name);
@@ -415,7 +421,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
source = sourceAddress;
}
if (node.queue) {
- boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink()));
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.created && node.properties.trackControllingLink()));
incoming[link] = q;
} else if (node.exchange) {
boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
@@ -460,7 +466,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (type == CONSUMER && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot consume from exclusive queue ") + node.queue->getName());
}
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.created && node.properties.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 1d9c2d161d..523e17948b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -100,7 +100,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
boost::shared_ptr<qpid::broker::amqp::Topic> topic;
boost::shared_ptr<Relay> relay;
NodeProperties properties;
- ResolvedNode(bool isDynamic) : properties(isDynamic) {}
+ bool created;
+ ResolvedNode(bool isDynamic) : properties(isDynamic), created(false) {}
};
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);