diff options
author | Ted Ross <tross@apache.org> | 2010-03-08 03:46:44 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-03-08 03:46:44 +0000 |
commit | 82e5138dbf07c58944f18cf30824a0996b4d1824 (patch) | |
tree | e8894758c5bf20eb3334fecf099bd0334d661d63 /cpp | |
parent | 5198e3474da92d82799446a4ccd183aa83d9a839 (diff) | |
download | qpid-python-82e5138dbf07c58944f18cf30824a0996b4d1824.tar.gz |
Added hooks in the broker for QMFv2 management of the broker.
Now both DirectExchange and TopicExchange have been subclassed so messages can be
redirected to the embedded management agent (in QMFv1, only the topic exchange was
subclassed this way).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@920189 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Makefile.am | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.cpp | 63 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.h | 59 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.cpp | 76 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.h (renamed from cpp/src/qpid/management/ManagementExchange.h) | 29 |
10 files changed, 260 insertions, 102 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4fe5d7b85b..281814a828 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -636,8 +636,10 @@ libqpidbroker_la_SOURCES = \ qpid/management/IdAllocator.h \ qpid/management/ManagementAgent.cpp \ qpid/management/ManagementAgent.h \ - qpid/management/ManagementExchange.cpp \ - qpid/management/ManagementExchange.h \ + qpid/management/ManagementDirectExchange.cpp \ + qpid/management/ManagementDirectExchange.h \ + qpid/management/ManagementTopicExchange.cpp \ + qpid/management/ManagementTopicExchange.h \ qpid/sys/TCPIOPlugin.cpp diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index cbccca6eea..d94f228734 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -35,7 +35,8 @@ #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" -#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/management/ManagementTopicExchange.h" #include "qpid/log/Statement.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" @@ -234,11 +235,22 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - exchanges.declare(qpid_management, ManagementExchange::typeName); - Exchange::shared_ptr mExchange = exchanges.get (qpid_management); - Exchange::shared_ptr dExchange = exchanges.get (amq_direct); + exchanges.declare(qpid_management, ManagementTopicExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get(qpid_management); + Exchange::shared_ptr dExchange = exchanges.get(amq_direct); managementAgent->setExchange(mExchange, dExchange); - boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent.get()); + boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1); + + std::string qmfTopic("qmf.default.topic"); + std::string qmfDirect("qmf.default.direct"); + + std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName)); + std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName)); + + boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2); + boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2); + + managementAgent->setExchangeV2(topicPair.first, directPair.first); } else QPID_LOG(info, "Management not enabled"); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index f4a860fa1e..20fdc4164a 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,7 +24,8 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" -#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/management/ManagementTopicExchange.h" #include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; @@ -52,8 +53,10 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); }else if (type == HeadersExchange::typeName) { exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); - }else if (type == ManagementExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker)); + }else if (type == ManagementDirectExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); + }else if (type == ManagementTopicExchange::typeName) { + exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker)); }else{ FunctionMap::iterator i = factory.find(type); if (i == factory.end()) { diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index dbe7062a25..918acfe2c4 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -71,11 +71,13 @@ ManagementAgent::~ManagementAgent () Mutex::ScopedLock lock (userLock); // Reset the shared pointers to exchanges. If this is not done now, the exchanges - // will stick around until dExchange and mExchange are implicitely destroyed (long + // will stick around until dExchange and mExchange are implicitly destroyed (long // after this destructor completes). Those exchanges hold references to management // objects that will be invalid. dExchange.reset(); mExchange.reset(); + v2Topic.reset(); + v2Direct.reset(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -158,13 +160,20 @@ void ManagementAgent::writeData () } } -void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, - qpid::broker::Exchange::shared_ptr _dexchange) +void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange, + qpid::broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; } +void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange, + qpid::broker::Exchange::shared_ptr _dexchange) +{ + v2Topic = _texchange; + v2Direct = _dexchange; +} + void ManagementAgent::registerClass (const string& packageName, const string& className, uint8_t* md5Sum, diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 5ea951d8d0..5b2c54f1b8 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -74,9 +74,12 @@ public: /** Called by cluster to suppress management output during update. */ void suppress(bool s) { suppressed = s; } - void setInterval (uint16_t _interval) { interval = _interval; } - void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, - qpid::broker::Exchange::shared_ptr directExchange); + void setInterval(uint16_t _interval) { interval = _interval; } + void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, + qpid::broker::Exchange::shared_ptr directExchange); + void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange, + qpid::broker::Exchange::shared_ptr directExchange); + int getMaxThreads () { return threadPoolSize; } QPID_BROKER_EXTERN void registerClass (const std::string& packageName, const std::string& className, @@ -240,6 +243,8 @@ private: qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; + qpid::broker::Exchange::shared_ptr v2Topic; + qpid::broker::Exchange::shared_ptr v2Direct; std::string dataDir; uint16_t interval; qpid::broker::Broker* broker; diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp new file mode 100644 index 0000000000..0813e30891 --- /dev/null +++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/log/Statement.h" +#include <assert.h> + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {} +ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, + bool _durable, + const FieldTable& _args, + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, _args, _parent, b), + DirectExchange(_name, _durable, _args, _parent, b) {} + +void ManagementDirectExchange::route(Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + bool routeIt = true; + + // TODO: Intercept messages directed to the embedded agent and send them to the management agent. + + if (routeIt) + DirectExchange::route(msg, routingKey, args); +} + +void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv) +{ + managementAgent = agent; + qmfVersion = qv; + assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange +} + + +ManagementDirectExchange::~ManagementDirectExchange() {} + +const std::string ManagementDirectExchange::typeName("management-direct"); + diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h new file mode 100644 index 0000000000..ab691afa70 --- /dev/null +++ b/cpp/src/qpid/management/ManagementDirectExchange.h @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementDirectExchange_ +#define _ManagementDirectExchange_ + +#include "qpid/broker/DirectExchange.h" +#include "qpid/management/ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementDirectExchange : public virtual DirectExchange +{ + private: + management::ManagementAgent* managementAgent; + int qmfVersion; + + public: + static const std::string typeName; + + ManagementDirectExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementDirectExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args, + Manageable* _parent = 0, Broker* broker = 0); + + virtual std::string getType() const { return typeName; } + + virtual void route(Deliverable& msg, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); + + virtual ~ManagementDirectExchange(); +}; + + +} +} + +#endif diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp deleted file mode 100644 index b90bcd87d8..0000000000 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/management/ManagementExchange.h" -#include "qpid/log/Statement.h" - -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} -ManagementExchange::ManagementExchange (const std::string& _name, - bool _durable, - const FieldTable& _args, - Manageable* _parent, Broker* b) : - Exchange (_name, _durable, _args, _parent, b), - TopicExchange(_name, _durable, _args, _parent, b) {} - -void ManagementExchange::route (Deliverable& msg, - const string& routingKey, - const FieldTable* args) -{ - bool routeIt = true; - - // Intercept management agent commands - if ((routingKey.length() > 6 && - routingKey.substr(0, 6).compare("agent.") == 0) || - (routingKey == "broker")) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - - if (routeIt) - TopicExchange::route(msg, routingKey, args); -} - -bool ManagementExchange::bind (Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args) -{ - managementAgent->clientAdded(routingKey); - return TopicExchange::bind(queue, routingKey, args); -} - -void ManagementExchange::setManagmentAgent (ManagementAgent* agent) -{ - managementAgent = agent; -} - - -ManagementExchange::~ManagementExchange() {} - -const std::string ManagementExchange::typeName("management"); - diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp new file mode 100644 index 0000000000..98650b3adf --- /dev/null +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementTopicExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} +ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, + bool _durable, + const FieldTable& _args, + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, _args, _parent, b), + TopicExchange(_name, _durable, _args, _parent, b) {} + +void ManagementTopicExchange::route(Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + bool routeIt = true; + + // Intercept management agent commands + if (qmfVersion == 1) { + if ((routingKey.length() > 6 && + routingKey.substr(0, 6).compare("agent.") == 0) || + (routingKey == "broker")) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args); + } + + if (routeIt) + TopicExchange::route(msg, routingKey, args); +} + +bool ManagementTopicExchange::bind(Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args) +{ + if (qmfVersion == 1) + managementAgent->clientAdded(routingKey); + return TopicExchange::bind(queue, routingKey, args); +} + +void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv) +{ + managementAgent = agent; + qmfVersion = qv; +} + + +ManagementTopicExchange::~ManagementTopicExchange() {} + +const std::string ManagementTopicExchange::typeName("management-topic"); + diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h index 3fa4039af7..ece1c88ecf 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementTopicExchange.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _ManagementExchange_ -#define _ManagementExchange_ +#ifndef _ManagementTopicExchange_ +#define _ManagementTopicExchange_ #include "qpid/broker/TopicExchange.h" #include "qpid/management/ManagementAgent.h" @@ -27,32 +27,33 @@ namespace qpid { namespace broker { -class ManagementExchange : public virtual TopicExchange +class ManagementTopicExchange : public virtual TopicExchange { private: management::ManagementAgent* managementAgent; + int qmfVersion; public: static const std::string typeName; - ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0); - ManagementExchange (const string& _name, bool _durable, - const qpid::framing::FieldTable& _args, - Manageable* _parent = 0, Broker* broker = 0); + ManagementTopicExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementTopicExchange(const string& _name, bool _durable, + const qpid::framing::FieldTable& _args, + Manageable* _parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } - virtual void route (Deliverable& msg, - const string& routingKey, - const qpid::framing::FieldTable* args); - - virtual bool bind (Queue::shared_ptr queue, + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); - void setManagmentAgent (management::ManagementAgent* agent); + virtual bool bind(Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); - virtual ~ManagementExchange(); + virtual ~ManagementTopicExchange(); }; |