diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.cpp | 63 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.h | 59 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.cpp | 76 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.h (renamed from cpp/src/qpid/management/ManagementExchange.h) | 29 |
10 files changed, 260 insertions, 102 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4fe5d7b85b..281814a828 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -636,8 +636,10 @@ libqpidbroker_la_SOURCES = \ qpid/management/IdAllocator.h \ qpid/management/ManagementAgent.cpp \ qpid/management/ManagementAgent.h \ - qpid/management/ManagementExchange.cpp \ - qpid/management/ManagementExchange.h \ + qpid/management/ManagementDirectExchange.cpp \ + qpid/management/ManagementDirectExchange.h \ + qpid/management/ManagementTopicExchange.cpp \ + qpid/management/ManagementTopicExchange.h \ qpid/sys/TCPIOPlugin.cpp diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index cbccca6eea..d94f228734 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -35,7 +35,8 @@ #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" -#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/management/ManagementTopicExchange.h" #include "qpid/log/Statement.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" @@ -234,11 +235,22 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - exchanges.declare(qpid_management, ManagementExchange::typeName); - Exchange::shared_ptr mExchange = exchanges.get (qpid_management); - Exchange::shared_ptr dExchange = exchanges.get (amq_direct); + exchanges.declare(qpid_management, ManagementTopicExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get(qpid_management); + Exchange::shared_ptr dExchange = exchanges.get(amq_direct); managementAgent->setExchange(mExchange, dExchange); - boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent.get()); + boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1); + + std::string qmfTopic("qmf.default.topic"); + std::string qmfDirect("qmf.default.direct"); + + std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName)); + std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName)); + + boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2); + boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2); + + managementAgent->setExchangeV2(topicPair.first, directPair.first); } else QPID_LOG(info, "Management not enabled"); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index f4a860fa1e..20fdc4164a 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,7 +24,8 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" -#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/management/ManagementTopicExchange.h" #include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; @@ -52,8 +53,10 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); }else if (type == HeadersExchange::typeName) { exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); - }else if (type == ManagementExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker)); + }else if (type == ManagementDirectExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); + }else if (type == ManagementTopicExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); }else{ FunctionMap::iterator i = factory.find(type); if (i == factory.end()) { diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index dbe7062a25..918acfe2c4 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -71,11 +71,13 @@ ManagementAgent::~ManagementAgent () Mutex::ScopedLock lock (userLock); // Reset the shared pointers to exchanges. If this is not done now, the exchanges - // will stick around until dExchange and mExchange are implicitely destroyed (long + // will stick around until dExchange and mExchange are implicitly destroyed (long // after this destructor completes). Those exchanges hold references to management // objects that will be invalid. dExchange.reset(); mExchange.reset(); + v2Topic.reset(); + v2Direct.reset(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -158,13 +160,20 @@ void ManagementAgent::writeData () } } -void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, - qpid::broker::Exchange::shared_ptr _dexchange) +void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange, + qpid::broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; } +void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange, + qpid::broker::Exchange::shared_ptr _dexchange) +{ + v2Topic = _texchange; + v2Direct = _dexchange; +} + void ManagementAgent::registerClass (const string& packageName, const string& className, uint8_t* md5Sum, diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 5ea951d8d0..5b2c54f1b8 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -74,9 +74,12 @@ public: /** Called by cluster to suppress management output during update. */ void suppress(bool s) { suppressed = s; } - void setInterval (uint16_t _interval) { interval = _interval; } - void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, - qpid::broker::Exchange::shared_ptr directExchange); + void setInterval(uint16_t _interval) { interval = _interval; } + void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, + qpid::broker::Exchange::shared_ptr directExchange); + void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange, + qpid::broker::Exchange::shared_ptr directExchange); + int getMaxThreads () { return threadPoolSize; } QPID_BROKER_EXTERN void registerClass (const std::string& packageName, const std::string& className, @@ -240,6 +243,8 @@ private: qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; + qpid::broker::Exchange::shared_ptr v2Topic; + qpid::broker::Exchange::shared_ptr v2Direct; std::string dataDir; uint16_t interval; qpid::broker::Broker* broker; diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp new file mode 100644 index 0000000000..0813e30891 --- /dev/null +++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/log/Statement.h" +#include <assert.h> + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {} +ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, + bool _durable, + const FieldTable& _args, + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, _args, _parent, b), + DirectExchange(_name, _durable, _args, _parent, b) {} + +void ManagementDirectExchange::route(Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + bool routeIt = true; + + // TODO: Intercept messages directed to the embedded agent and send them to the management agent. + + if (routeIt) + DirectExchange::route(msg, routingKey, args); +} + +void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv) +{ + managementAgent = agent; + qmfVersion = qv; + assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange +} + + +ManagementDirectExchange::~ManagementDirectExchange() {} + +const std::string ManagementDirectExchange::typeName("management-direct"); + diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h new file mode 100644 index 0000000000..ab691afa70 --- /dev/null +++ b/cpp/src/qpid/management/ManagementDirectExchange.h @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementDirectExchange_ +#define _ManagementDirectExchange_ + +#include "qpid/broker/DirectExchange.h" +#include "qpid/management/ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementDirectExchange : public virtual DirectExchange +{ + private: + management::ManagementAgent* managementAgent; + int qmfVersion; + + public: + static const std::string typeName; + + ManagementDirectExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementDirectExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args, + Manageable* _parent = 0, Broker* broker = 0); + + virtual std::string getType() const { return typeName; } + + virtual void route(Deliverable& msg, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); + + virtual ~ManagementDirectExchange(); +}; + + +} +} + +#endif diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp deleted file mode 100644 index b90bcd87d8..0000000000 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/management/ManagementExchange.h" -#include "qpid/log/Statement.h" - -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} -ManagementExchange::ManagementExchange (const std::string& _name, - bool _durable, - const FieldTable& _args, - Manageable* _parent, Broker* b) : - Exchange (_name, _durable, _args, _parent, b), - TopicExchange(_name, _durable, _args, _parent, b) {} - -void ManagementExchange::route (Deliverable& msg, - const string& routingKey, - const FieldTable* args) -{ - bool routeIt = true; - - // Intercept management agent commands - if ((routingKey.length() > 6 && - routingKey.substr(0, 6).compare("agent.") == 0) || - (routingKey == "broker")) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - - if (routeIt) - TopicExchange::route(msg, routingKey, args); -} - -bool ManagementExchange::bind (Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args) -{ - managementAgent->clientAdded(routingKey); - return TopicExchange::bind(queue, routingKey, args); -} - -void ManagementExchange::setManagmentAgent (ManagementAgent* agent) -{ - managementAgent = agent; -} - - -ManagementExchange::~ManagementExchange() {} - -const std::string ManagementExchange::typeName("management"); - diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp new file mode 100644 index 0000000000..98650b3adf --- /dev/null +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementTopicExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} +ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, + bool _durable, + const FieldTable& _args, + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, _args, _parent, b), + TopicExchange(_name, _durable, _args, _parent, b) {} + +void ManagementTopicExchange::route(Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + bool routeIt = true; + + // Intercept management agent commands + if (qmfVersion == 1) { + if ((routingKey.length() > 6 && + routingKey.substr(0, 6).compare("agent.") == 0) || + (routingKey == "broker")) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args); + } + + if (routeIt) + TopicExchange::route(msg, routingKey, args); +} + +bool ManagementTopicExchange::bind(Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args) +{ + if (qmfVersion == 1) + managementAgent->clientAdded(routingKey); + return TopicExchange::bind(queue, routingKey, args); +} + +void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv) +{ + managementAgent = agent; + qmfVersion = qv; +} + + +ManagementTopicExchange::~ManagementTopicExchange() {} + +const std::string ManagementTopicExchange::typeName("management-topic"); + diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h index 3fa4039af7..ece1c88ecf 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementTopicExchange.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _ManagementExchange_ -#define _ManagementExchange_ +#ifndef _ManagementTopicExchange_ +#define _ManagementTopicExchange_ #include "qpid/broker/TopicExchange.h" #include "qpid/management/ManagementAgent.h" @@ -27,32 +27,33 @@ namespace qpid { namespace broker { -class ManagementExchange : public virtual TopicExchange +class ManagementTopicExchange : public virtual TopicExchange { private: management::ManagementAgent* managementAgent; + int qmfVersion; public: static const std::string typeName; - ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0); - ManagementExchange (const string& _name, bool _durable, - const qpid::framing::FieldTable& _args, - Manageable* _parent = 0, Broker* broker = 0); + ManagementTopicExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementTopicExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args, + Manageable* _parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } - virtual void route (Deliverable& msg, - const string& routingKey, - const qpid::framing::FieldTable* args); - - virtual bool bind (Queue::shared_ptr queue, + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); - void setManagmentAgent (management::ManagementAgent* agent); + virtual bool bind(Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); - virtual ~ManagementExchange(); + virtual ~ManagementTopicExchange(); }; |