diff options
Diffstat (limited to 'cpp/src/qpid/broker/QueueRegistry.cpp')
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 42 |
1 files changed, 35 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index eb525b6727..2fdcf3b8e6 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -23,10 +23,14 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Exchange.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include <sstream> #include <assert.h> +namespace _qmf = qmf::org::apache::qpid::broker; using namespace qpid::broker; using namespace qpid::sys; using std::string; @@ -44,7 +48,10 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, bool recovering/*true if this declare is a result of recovering queue definition from persistent - record*/) + record*/, + const OwnershipToken* owner, + std::string connectionId, + std::string userId) { std::pair<Queue::shared_ptr, bool> result; { @@ -53,25 +60,38 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, if (i == queues.end()) { Queue::shared_ptr queue = create(name, settings); //Move this to factory also? - if (alternate) { + if (alternate) queue->setAlternateExchange(alternate);//need to do this *before* create - alternate->incAlternateUsers(); - } if (!recovering) { //create persistent record if required queue->create(); } queues[name] = queue; + // NOTE: raiseEvent and queueCreate must be called with the lock held in + // order to ensure events are generated in the correct order. + // Call queueCreate before raiseEvents so it can add arguments that + // will be included in the management event. + if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue); result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { result = std::pair<Queue::shared_ptr, bool>(i->second, false); } + if (getBroker() && getBroker()->getManagementAgent()) { + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDeclare( + connectionId, userId, name, + settings.durable, owner, settings.autodelete, + alternate ? alternate->getName() : string(), + result.first->getSettings().asMap(), + result.second ? "created" : "existing")); + } } - if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first); return result; } -void QueueRegistry::destroy(const string& name) { +void QueueRegistry::destroy( + const string& name, const string& connectionId, const string& userId) +{ Queue::shared_ptr q; { qpid::sys::RWlock::ScopedWlock locker(lock); @@ -79,9 +99,17 @@ void QueueRegistry::destroy(const string& name) { if (i != queues.end()) { q = i->second; queues.erase(i); + if (getBroker()) { + // NOTE: queueDestroy and raiseEvent must be called with the + // lock held in order to ensure events are generated + // in the correct order. + getBroker()->getConfigurationObservers().queueDestroy(q); + if (getBroker()->getManagementAgent()) + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDelete(connectionId, userId, name)); + } } } - if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q); } Queue::shared_ptr QueueRegistry::find(const string& name){ |