summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueueRegistry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueueRegistry.cpp')
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp42
1 files changed, 35 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index eb525b6727..2fdcf3b8e6 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -23,10 +23,14 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <sstream>
#include <assert.h>
+namespace _qmf = qmf::org::apache::qpid::broker;
using namespace qpid::broker;
using namespace qpid::sys;
using std::string;
@@ -44,7 +48,10 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
bool recovering/*true if this declare is a
result of recovering queue
definition from persistent
- record*/)
+ record*/,
+ const OwnershipToken* owner,
+ std::string connectionId,
+ std::string userId)
{
std::pair<Queue::shared_ptr, bool> result;
{
@@ -53,25 +60,38 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
if (i == queues.end()) {
Queue::shared_ptr queue = create(name, settings);
//Move this to factory also?
- if (alternate) {
+ if (alternate)
queue->setAlternateExchange(alternate);//need to do this *before* create
- alternate->incAlternateUsers();
- }
if (!recovering) {
//create persistent record if required
queue->create();
}
queues[name] = queue;
+ // NOTE: raiseEvent and queueCreate must be called with the lock held in
+ // order to ensure events are generated in the correct order.
+ // Call queueCreate before raiseEvents so it can add arguments that
+ // will be included in the management event.
+ if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
+ if (getBroker() && getBroker()->getManagementAgent()) {
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDeclare(
+ connectionId, userId, name,
+ settings.durable, owner, settings.autodelete,
+ alternate ? alternate->getName() : string(),
+ result.first->getSettings().asMap(),
+ result.second ? "created" : "existing"));
+ }
}
- if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
return result;
}
-void QueueRegistry::destroy(const string& name) {
+void QueueRegistry::destroy(
+ const string& name, const string& connectionId, const string& userId)
+{
Queue::shared_ptr q;
{
qpid::sys::RWlock::ScopedWlock locker(lock);
@@ -79,9 +99,17 @@ void QueueRegistry::destroy(const string& name) {
if (i != queues.end()) {
q = i->second;
queues.erase(i);
+ if (getBroker()) {
+ // NOTE: queueDestroy and raiseEvent must be called with the
+ // lock held in order to ensure events are generated
+ // in the correct order.
+ getBroker()->getConfigurationObservers().queueDestroy(q);
+ if (getBroker()->getManagementAgent())
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDelete(connectionId, userId, name));
+ }
}
}
- if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){