diff options
Diffstat (limited to 'cpp/src/qpid/broker/ExchangeRegistry.cpp')
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 44 |
1 files changed, 37 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index b31c7bd7b8..645918d526 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -29,20 +29,26 @@ #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/framing/reply_exceptions.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" using namespace qpid::broker; using namespace qpid::sys; using std::pair; using std::string; using qpid::framing::FieldTable; +using qpid::management::ManagementAgent; +namespace _qmf = qmf::org::apache::qpid::broker; pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){ return declare(name, type, false, FieldTable()); } -pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type, - bool durable, const FieldTable& args){ +pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare( + const string& name, const string& type, bool durable, const FieldTable& args, + Exchange::shared_ptr alternate, const string& connectionId, const string& userId) +{ Exchange::shared_ptr exchange; std::pair<Exchange::shared_ptr, bool> result; { @@ -73,31 +79,55 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c } exchanges[name] = exchange; result = std::pair<Exchange::shared_ptr, bool>(exchange, true); + if (alternate) exchange->setAlternate(alternate); + // Call exchangeCreate inside the lock to ensure correct ordering. + if (broker) broker->getConfigurationObservers().exchangeCreate(exchange); } else { result = std::pair<Exchange::shared_ptr, bool>(i->second, false); } + if (broker && broker->getManagementAgent()) { + // Call raiseEvent inside the lock to ensure correct ordering. + broker->getManagementAgent()->raiseEvent( + _qmf::EventExchangeDeclare( + connectionId, + userId, + name, + type, + alternate ? alternate->getName() : string(), + durable, + false, + ManagementAgent::toMap(result.first->getArgs()), + result.second ? "created" : "existing")); + } } - if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange); return result; } -void ExchangeRegistry::destroy(const string& name){ +void ExchangeRegistry::destroy( + const string& name, const string& connectionId, const string& userId) +{ if (name.empty() || (name.find("amq.") == 0 && (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) || name == "qpid.management") throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'")); - Exchange::shared_ptr exchange; { RWlock::ScopedWlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i != exchanges.end()) { - exchange = i->second; + if (broker) { + // Call exchangeDestroy and raiseEvent inside the lock to ensure + // correct ordering. + broker->getConfigurationObservers().exchangeDestroy(i->second); + if (broker->getManagementAgent()) + broker->getManagementAgent()->raiseEvent( + _qmf::EventExchangeDelete(connectionId, userId, name)); + } i->second->destroy(); exchanges.erase(i); + } } - if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange); } Exchange::shared_ptr ExchangeRegistry::find(const string& name){ |