summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueueRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueueRegistry.cpp')
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp65
1 files changed, 19 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 1401356444..3521e08325 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -21,7 +21,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
@@ -32,50 +31,43 @@ using namespace qpid::broker;
using namespace qpid::sys;
using std::string;
-QueueRegistry::QueueRegistry(Broker* b) :
- counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {}
+QueueRegistry::QueueRegistry(Broker* b)
+{
+ setBroker(b);
+}
QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable,
- bool autoDelete, const OwnershipToken* owner,
+QueueRegistry::declare(const string& name, const QueueSettings& settings,
boost::shared_ptr<Exchange> alternate,
- const qpid::framing::FieldTable& arguments,
bool recovering/*true if this declare is a
result of recovering queue
- definition from persistente
+ definition from persistent
record*/)
{
- Queue::shared_ptr queue;
std::pair<Queue::shared_ptr, bool> result;
{
RWlock::ScopedWlock locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
QueueMap::iterator i = queues.find(name);
-
if (i == queues.end()) {
- queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+ Queue::shared_ptr queue = create(name, settings);
+ //Move this to factory also?
if (alternate) {
queue->setAlternateExchange(alternate);//need to do this *before* create
alternate->incAlternateUsers();
}
if (!recovering) {
- //apply settings & create persistent record if required
- queue->create(arguments);
- } else {
- //i.e. recovering a queue for which we already have a persistent record
- queue->configure(arguments);
+ //create persistent record if required
+ queue->create();
}
queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
}
- if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+ if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
return result;
}
@@ -85,11 +77,11 @@ void QueueRegistry::destroy(const string& name) {
qpid::sys::RWlock::ScopedWlock locker(lock);
QueueMap::iterator i = queues.find(name);
if (i != queues.end()) {
- Queue::shared_ptr q = i->second;
+ q = i->second;
queues.erase(i);
}
}
- if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
+ if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
@@ -108,36 +100,17 @@ Queue::shared_ptr QueueRegistry::get(const string& name) {
return q;
}
-string QueueRegistry::generateName(){
- string name;
- do {
- std::stringstream ss;
- ss << "tmp_" << counter++;
- name = ss.str();
- // Thread safety: Private function, only called with lock held
- // so this is OK.
- } while(queues.find(name) != queues.end());
- return name;
-}
-
void QueueRegistry::setStore (MessageStore* _store)
{
- store = _store;
+ QueueFactory::setStore(_store);
}
-MessageStore* QueueRegistry::getStore() const {
- return store;
+MessageStore* QueueRegistry::getStore() const
+{
+ return QueueFactory::getStore();
}
-void QueueRegistry::updateQueueClusterState(bool _lastNode)
+void QueueRegistry::setParent(qpid::management::Manageable* _parent)
{
- RWlock::ScopedRlock locker(lock);
- for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) {
- if (_lastNode){
- i->second->setLastNodeFailure();
- } else {
- i->second->clearLastNodeFailure();
- }
- }
- lastNode = _lastNode;
+ QueueFactory::setParent(_parent);
}