diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueueRegistry.cpp')
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 66 |
1 files changed, 38 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 236d5ae34c..1401356444 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueEvents.h" @@ -46,40 +47,49 @@ QueueRegistry::declare(const string& declareName, bool durable, definition from persistente record*/) { - RWlock::ScopedWlock locker(lock); - string name = declareName.empty() ? generateName() : declareName; - assert(!name.empty()); - QueueMap::iterator i = queues.find(name); + Queue::shared_ptr queue; + std::pair<Queue::shared_ptr, bool> result; + { + RWlock::ScopedWlock locker(lock); + string name = declareName.empty() ? generateName() : declareName; + assert(!name.empty()); + QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); - if (alternate) { - queue->setAlternateExchange(alternate);//need to do this *before* create - alternate->incAlternateUsers(); - } - if (!recovering) { - //apply settings & create persistent record if required - queue->create(arguments); + if (i == queues.end()) { + queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); + if (alternate) { + queue->setAlternateExchange(alternate);//need to do this *before* create + alternate->incAlternateUsers(); + } + if (!recovering) { + //apply settings & create persistent record if required + queue->create(arguments); + } else { + //i.e. recovering a queue for which we already have a persistent record + queue->configure(arguments); + } + queues[name] = queue; + if (lastNode) queue->setLastNodeFailure(); + result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { - //i.e. recovering a queue for which we already have a persistent record - queue->configure(arguments); + result = std::pair<Queue::shared_ptr, bool>(i->second, false); } - queues[name] = queue; - if (lastNode) queue->setLastNodeFailure(); - - return std::pair<Queue::shared_ptr, bool>(queue, true); - } else { - return std::pair<Queue::shared_ptr, bool>(i->second, false); } + if (broker && queue) broker->getConfigurationObservers().queueCreate(queue); + return result; } -void QueueRegistry::destroyLH (const string& name){ - queues.erase(name); -} - -void QueueRegistry::destroy (const string& name){ - RWlock::ScopedWlock locker(lock); - destroyLH (name); +void QueueRegistry::destroy(const string& name) { + Queue::shared_ptr q; + { + qpid::sys::RWlock::ScopedWlock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i != queues.end()) { + Queue::shared_ptr q = i->second; + queues.erase(i); + } + } + if (broker && q) broker->getConfigurationObservers().queueDestroy(q); } Queue::shared_ptr QueueRegistry::find(const string& name){ |