diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 27 |
1 files changed, 21 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 27c1cc4ad7..40cb80010c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -444,7 +444,7 @@ void Queue::purgeExpired() Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } - for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } @@ -826,8 +826,9 @@ void Queue::configure(const FieldTable& _settings, bool recovering) store->create(*this, _settings); } -void Queue::destroy() +void Queue::destroyed() { + unbind(broker->getExchanges()); if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); while(!messages->empty()){ @@ -846,6 +847,7 @@ void Queue::destroy() store = 0;//ensure we make no more calls to the store for this queue } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); + notifyDeleted(); } void Queue::notifyDeleted() @@ -865,9 +867,9 @@ void Queue::bound(const string& exchange, const string& key, bindings.add(exchange, key, args); } -void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref) +void Queue::unbind(ExchangeRegistry& exchanges) { - bindings.unbind(exchanges, shared_ref); + bindings.unbind(exchanges, shared_from_this()); } void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) @@ -955,8 +957,7 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); - queue->unbind(broker.getExchanges(), queue); - queue->destroy(); + queue->destroyed(); } } @@ -1175,6 +1176,20 @@ void Queue::flush() if (u.acquired && store) store->flush(*this); } +bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, + const qpid::framing::FieldTable& arguments) +{ + if (exchange->bind(shared_from_this(), key, &arguments)) { + bound(exchange->getName(), key, arguments); + if (exchange->isDurable() && isDurable()) { + store->bind(*exchange, *this, key, arguments); + } + return true; + } else { + return false; + } +} + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() |