diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueRegistry.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp new file mode 100644 index 0000000000..1283a42e6d --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Exchange.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/framing/reply_exceptions.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include <sstream> +#include <assert.h> + +namespace _qmf = qmf::org::apache::qpid::broker; +using namespace qpid::broker; +using namespace qpid::sys; +using std::string; + +QueueRegistry::QueueRegistry(Broker* b) +{ + setBroker(b); +} + +QueueRegistry::~QueueRegistry(){} + +std::pair<Queue::shared_ptr, bool> +QueueRegistry::declare(const string& name, const QueueSettings& settings, + boost::shared_ptr<Exchange> alternate, + bool recovering/*true if this declare is a + result of recovering queue + definition from persistent + record*/, + const OwnershipToken* owner, + std::string connectionId, + std::string userId) +{ + std::pair<Queue::shared_ptr, bool> result; + { + RWlock::ScopedWlock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + Queue::shared_ptr queue = create(name, settings); + // Allow BrokerObserver to modify settings before storing the message. + if (getBroker()) getBroker()->getBrokerObservers().queueCreate(queue); + //Move this to factory also? + if (alternate) + queue->setAlternateExchange(alternate);//need to do this *before* create + queue->setOwningUser(userId); + + if (!recovering) { + //create persistent record if required + queue->create(); + } + queues[name] = queue; + result = std::pair<Queue::shared_ptr, bool>(queue, true); + } else { + result = std::pair<Queue::shared_ptr, bool>(i->second, false); + } + if (getBroker() && getBroker()->getManagementAgent()) { + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDeclare( + connectionId, userId, name, + settings.durable, owner, settings.autodelete, + alternate ? alternate->getName() : string(), + result.first->getSettings().asMap(), + result.second ? "created" : "existing")); + } + } + return result; +} + +void QueueRegistry::destroy( + const string& name, const string& connectionId, const string& userId) +{ + Queue::shared_ptr q; + { + qpid::sys::RWlock::ScopedWlock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i != queues.end()) { + q = i->second; + queues.erase(i); + if (getBroker()) { + // NOTE: queueDestroy and raiseEvent must be called with the + // lock held in order to ensure events are generated + // in the correct order. + getBroker()->getBrokerObservers().queueDestroy(q); + if (getBroker()->getManagementAgent()) + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDelete(connectionId, userId, name)); + } + } + } +} + +Queue::shared_ptr QueueRegistry::find(const string& name){ + RWlock::ScopedRlock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i == queues.end()) { + return Queue::shared_ptr(); + } else { + return i->second; + } +} + +Queue::shared_ptr QueueRegistry::get(const string& name) { + Queue::shared_ptr q = find(name); + if (!q) { + throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name)); + } + return q; +} + +void QueueRegistry::setStore (MessageStore* _store) +{ + QueueFactory::setStore(_store); +} + +MessageStore* QueueRegistry::getStore() const +{ + return QueueFactory::getStore(); +} + +void QueueRegistry::setParent(qpid::management::Manageable* _parent) +{ + QueueFactory::setParent(_parent); +} |