summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp27
1 files changed, 21 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 27c1cc4ad7..40cb80010c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -444,7 +444,7 @@ void Queue::purgeExpired()
Mutex::ScopedLock locker(messageLock);
messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
- for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
@@ -826,8 +826,9 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
store->create(*this, _settings);
}
-void Queue::destroy()
+void Queue::destroyed()
{
+ unbind(broker->getExchanges());
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
while(!messages->empty()){
@@ -846,6 +847,7 @@ void Queue::destroy()
store = 0;//ensure we make no more calls to the store for this queue
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
+ notifyDeleted();
}
void Queue::notifyDeleted()
@@ -865,9 +867,9 @@ void Queue::bound(const string& exchange, const string& key,
bindings.add(exchange, key, args);
}
-void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
+void Queue::unbind(ExchangeRegistry& exchanges)
{
- bindings.unbind(exchanges, shared_ref);
+ bindings.unbind(exchanges, shared_from_this());
}
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
@@ -955,8 +957,7 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
- queue->unbind(broker.getExchanges(), queue);
- queue->destroy();
+ queue->destroyed();
}
}
@@ -1175,6 +1176,20 @@ void Queue::flush()
if (u.acquired && store) store->flush(*this);
}
+bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+ const qpid::framing::FieldTable& arguments)
+{
+ if (exchange->bind(shared_from_this(), key, &arguments)) {
+ bound(exchange->getName(), key, arguments);
+ if (exchange->isDurable() && isDurable()) {
+ store->bind(*exchange, *this, key, arguments);
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()