diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueueRegistry.cpp')
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 65 |
1 files changed, 19 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 1401356444..3521e08325 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -21,7 +21,6 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" #include "qpid/log/Statement.h" #include "qpid/framing/reply_exceptions.h" @@ -32,50 +31,43 @@ using namespace qpid::broker; using namespace qpid::sys; using std::string; -QueueRegistry::QueueRegistry(Broker* b) : - counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {} +QueueRegistry::QueueRegistry(Broker* b) +{ + setBroker(b); +} QueueRegistry::~QueueRegistry(){} std::pair<Queue::shared_ptr, bool> -QueueRegistry::declare(const string& declareName, bool durable, - bool autoDelete, const OwnershipToken* owner, +QueueRegistry::declare(const string& name, const QueueSettings& settings, boost::shared_ptr<Exchange> alternate, - const qpid::framing::FieldTable& arguments, bool recovering/*true if this declare is a result of recovering queue - definition from persistente + definition from persistent record*/) { - Queue::shared_ptr queue; std::pair<Queue::shared_ptr, bool> result; { RWlock::ScopedWlock locker(lock); - string name = declareName.empty() ? generateName() : declareName; - assert(!name.empty()); QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); + Queue::shared_ptr queue = create(name, settings); + //Move this to factory also? if (alternate) { queue->setAlternateExchange(alternate);//need to do this *before* create alternate->incAlternateUsers(); } if (!recovering) { - //apply settings & create persistent record if required - queue->create(arguments); - } else { - //i.e. recovering a queue for which we already have a persistent record - queue->configure(arguments); + //create persistent record if required + queue->create(); } queues[name] = queue; - if (lastNode) queue->setLastNodeFailure(); result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { result = std::pair<Queue::shared_ptr, bool>(i->second, false); } } - if (broker && queue) broker->getConfigurationObservers().queueCreate(queue); + if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first); return result; } @@ -85,11 +77,11 @@ void QueueRegistry::destroy(const string& name) { qpid::sys::RWlock::ScopedWlock locker(lock); QueueMap::iterator i = queues.find(name); if (i != queues.end()) { - Queue::shared_ptr q = i->second; + q = i->second; queues.erase(i); } } - if (broker && q) broker->getConfigurationObservers().queueDestroy(q); + if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q); } Queue::shared_ptr QueueRegistry::find(const string& name){ @@ -108,36 +100,17 @@ Queue::shared_ptr QueueRegistry::get(const string& name) { return q; } -string QueueRegistry::generateName(){ - string name; - do { - std::stringstream ss; - ss << "tmp_" << counter++; - name = ss.str(); - // Thread safety: Private function, only called with lock held - // so this is OK. - } while(queues.find(name) != queues.end()); - return name; -} - void QueueRegistry::setStore (MessageStore* _store) { - store = _store; + QueueFactory::setStore(_store); } -MessageStore* QueueRegistry::getStore() const { - return store; +MessageStore* QueueRegistry::getStore() const +{ + return QueueFactory::getStore(); } -void QueueRegistry::updateQueueClusterState(bool _lastNode) +void QueueRegistry::setParent(qpid::management::Manageable* _parent) { - RWlock::ScopedRlock locker(lock); - for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) { - if (_lastNode){ - i->second->setLastNodeFailure(); - } else { - i->second->clearLastNodeFailure(); - } - } - lastNode = _lastNode; + QueueFactory::setParent(_parent); } |