summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp8
-rw-r--r--cpp/src/qpid/broker/Connection.cpp11
-rw-r--r--cpp/src/qpid/broker/Queue.h4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
5 files changed, 22 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 0fb521d626..820cc2f397 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -180,7 +180,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
std::pair<Queue::shared_ptr, bool> queue_created =
getBroker().getQueues().declare(
name, durable,
- autoDelete && !exclusive,
+ autoDelete,
exclusive ? &getConnection() : 0);
queue = queue_created.first;
assert(queue);
@@ -202,7 +202,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
if (exclusive) {
getConnection().exclusiveQueues.push_back(queue);
}
- }
+ } else {
+ if (exclusive && !queue->hasExclusiveOwner()) {
+ queue->setExclusiveOwner(&getConnection());
+ }
+ }
}
if (exclusive && !queue->isExclusiveOwner(&getConnection()))
throw ResourceLockedException(
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index a21db0f603..21d759c901 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -29,6 +29,7 @@
#include "SemanticHandler.h"
#include <boost/utility/in_place_factory.hpp>
+#include <boost/bind.hpp>
using namespace boost;
using namespace qpid::sys;
@@ -76,10 +77,14 @@ void Connection::closed(){
try {
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
- broker.getQueues().destroy(q->getName());
+ q->releaseExclusiveOwnership();
+ if (q->canAutoDelete() &&
+ broker.getQueues().destroyIf(q->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), q))) {
+
+ q->unbind(broker.getExchanges(), q);
+ q->destroy();
+ }
exclusiveQueues.erase(exclusiveQueues.begin());
- q->unbind(broker.getExchanges(), q);
- q->destroy();
}
} catch(std::exception& e) {
QPID_LOG(error, " Unhandled exception while closing session: " <<
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 17ace522c3..7ee9106ef0 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -67,7 +67,7 @@ namespace qpid {
const string name;
const bool autodelete;
MessageStore* const store;
- const ConnectionToken* const owner;
+ const ConnectionToken* owner;
Consumers acquirers;
Consumers browsers;
Messages messages;
@@ -155,6 +155,8 @@ namespace qpid {
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+ inline void releaseExclusiveOwnership() { owner = 0; }
+ inline void setExclusiveOwner(const ConnectionToken* const o) { owner = o; }
inline bool hasExclusiveConsumer() const { return exclusive; }
inline bool hasExclusiveOwner() const { return owner != 0; }
inline bool isDurable() const { return store != 0; }
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 1a766b810a..f73f467945 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -62,11 +62,14 @@ class QueueRegistry{
*
*/
void destroy(const string& name);
- template <class Test> void destroyIf(const string& name, Test test)
+ template <class Test> bool destroyIf(const string& name, Test test)
{
qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
queues.erase(name);
+ return true;
+ } else {
+ return false;
}
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 4214f67bfe..1d49e08eb0 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -290,7 +290,7 @@ void SemanticState::ConsumerImpl::cancel()
{
if(queue) {
queue->cancel(this);
- if (queue->canAutoDelete()) {
+ if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
parent->getSession().getBroker().getQueues().destroyIf(
queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));