diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 |
5 files changed, 22 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 0fb521d626..820cc2f397 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -180,7 +180,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& std::pair<Queue::shared_ptr, bool> queue_created = getBroker().getQueues().declare( name, durable, - autoDelete && !exclusive, + autoDelete, exclusive ? &getConnection() : 0); queue = queue_created.first; assert(queue); @@ -202,7 +202,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& if (exclusive) { getConnection().exclusiveQueues.push_back(queue); } - } + } else { + if (exclusive && !queue->hasExclusiveOwner()) { + queue->setExclusiveOwner(&getConnection()); + } + } } if (exclusive && !queue->isExclusiveOwner(&getConnection())) throw ResourceLockedException( diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index a21db0f603..21d759c901 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -29,6 +29,7 @@ #include "SemanticHandler.h" #include <boost/utility/in_place_factory.hpp> +#include <boost/bind.hpp> using namespace boost; using namespace qpid::sys; @@ -76,10 +77,14 @@ void Connection::closed(){ try { while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); - broker.getQueues().destroy(q->getName()); + q->releaseExclusiveOwnership(); + if (q->canAutoDelete() && + broker.getQueues().destroyIf(q->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), q))) { + + q->unbind(broker.getExchanges(), q); + q->destroy(); + } exclusiveQueues.erase(exclusiveQueues.begin()); - q->unbind(broker.getExchanges(), q); - q->destroy(); } } catch(std::exception& e) { QPID_LOG(error, " Unhandled exception while closing session: " << diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 17ace522c3..7ee9106ef0 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -67,7 +67,7 @@ namespace qpid { const string name; const bool autodelete; MessageStore* const store; - const ConnectionToken* const owner; + const ConnectionToken* owner; Consumers acquirers; Consumers browsers; Messages messages; @@ -155,6 +155,8 @@ namespace qpid { uint32_t getConsumerCount() const; inline const string& getName() const { return name; } inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } + inline void releaseExclusiveOwnership() { owner = 0; } + inline void setExclusiveOwner(const ConnectionToken* const o) { owner = o; } inline bool hasExclusiveConsumer() const { return exclusive; } inline bool hasExclusiveOwner() const { return owner != 0; } inline bool isDurable() const { return store != 0; } diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 1a766b810a..f73f467945 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -62,11 +62,14 @@ class QueueRegistry{ * */ void destroy(const string& name); - template <class Test> void destroyIf(const string& name, Test test) + template <class Test> bool destroyIf(const string& name, Test test) { qpid::sys::RWlock::ScopedWlock locker(lock); if (test()) { queues.erase(name); + return true; + } else { + return false; } } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 4214f67bfe..1d49e08eb0 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -290,7 +290,7 @@ void SemanticState::ConsumerImpl::cancel() { if(queue) { queue->cancel(this); - if (queue->canAutoDelete()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { parent->getSession().getBroker().getQueues().destroyIf( queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); |