summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am6
-rw-r--r--cpp/src/qpid/broker/Broker.cpp22
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp9
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp15
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h11
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.cpp63
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.h59
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp72
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.cpp76
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.h (renamed from cpp/src/qpid/management/ManagementExchange.h)29
10 files changed, 260 insertions, 102 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 4fe5d7b85b..281814a828 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -636,8 +636,10 @@ libqpidbroker_la_SOURCES = \
qpid/management/IdAllocator.h \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementAgent.h \
- qpid/management/ManagementExchange.cpp \
- qpid/management/ManagementExchange.h \
+ qpid/management/ManagementDirectExchange.cpp \
+ qpid/management/ManagementDirectExchange.h \
+ qpid/management/ManagementTopicExchange.cpp \
+ qpid/management/ManagementTopicExchange.h \
qpid/sys/TCPIOPlugin.cpp
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index cbccca6eea..d94f228734 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -35,7 +35,8 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
@@ -234,11 +235,22 @@ Broker::Broker(const Broker::Options& conf) :
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- exchanges.declare(qpid_management, ManagementExchange::typeName);
- Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
- Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+ exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+ Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
+ Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
managementAgent->setExchange(mExchange, dExchange);
- boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent.get());
+ boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1);
+
+ std::string qmfTopic("qmf.default.topic");
+ std::string qmfDirect("qmf.default.direct");
+
+ std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
+ std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+
+ boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
+ boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
+
+ managementAgent->setExchangeV2(topicPair.first, directPair.first);
}
else
QPID_LOG(info, "Management not enabled");
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index f4a860fa1e..20fdc4164a 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -24,7 +24,8 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
@@ -52,8 +53,10 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
}else if (type == HeadersExchange::typeName) {
exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
- }else if (type == ManagementExchange::typeName) {
- exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementDirectExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+ }else if (type == ManagementTopicExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
}else{
FunctionMap::iterator i = factory.find(type);
if (i == factory.end()) {
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index dbe7062a25..918acfe2c4 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -71,11 +71,13 @@ ManagementAgent::~ManagementAgent ()
Mutex::ScopedLock lock (userLock);
// Reset the shared pointers to exchanges. If this is not done now, the exchanges
- // will stick around until dExchange and mExchange are implicitely destroyed (long
+ // will stick around until dExchange and mExchange are implicitly destroyed (long
// after this destructor completes). Those exchanges hold references to management
// objects that will be invalid.
dExchange.reset();
mExchange.reset();
+ v2Topic.reset();
+ v2Direct.reset();
moveNewObjectsLH();
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -158,13 +160,20 @@ void ManagementAgent::writeData ()
}
}
-void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
- qpid::broker::Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
+void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
+{
+ v2Topic = _texchange;
+ v2Direct = _dexchange;
+}
+
void ManagementAgent::registerClass (const string& packageName,
const string& className,
uint8_t* md5Sum,
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index 5ea951d8d0..5b2c54f1b8 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -74,9 +74,12 @@ public:
/** Called by cluster to suppress management output during update. */
void suppress(bool s) { suppressed = s; }
- void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
- qpid::broker::Exchange::shared_ptr directExchange);
+ void setInterval(uint16_t _interval) { interval = _interval; }
+ void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+ void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+
int getMaxThreads () { return threadPoolSize; }
QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
const std::string& className,
@@ -240,6 +243,8 @@ private:
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
+ qpid::broker::Exchange::shared_ptr v2Topic;
+ qpid::broker::Exchange::shared_ptr v2Direct;
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp
new file mode 100644
index 0000000000..0813e30891
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/log/Statement.h"
+#include <assert.h>
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
+ Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {}
+ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
+ bool _durable,
+ const FieldTable& _args,
+ Manageable* _parent, Broker* b) :
+ Exchange (_name, _durable, _args, _parent, b),
+ DirectExchange(_name, _durable, _args, _parent, b) {}
+
+void ManagementDirectExchange::route(Deliverable& msg,
+ const string& routingKey,
+ const FieldTable* args)
+{
+ bool routeIt = true;
+
+ // TODO: Intercept messages directed to the embedded agent and send them to the management agent.
+
+ if (routeIt)
+ DirectExchange::route(msg, routingKey, args);
+}
+
+void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
+{
+ managementAgent = agent;
+ qmfVersion = qv;
+ assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange
+}
+
+
+ManagementDirectExchange::~ManagementDirectExchange() {}
+
+const std::string ManagementDirectExchange::typeName("management-direct");
+
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h
new file mode 100644
index 0000000000..ab691afa70
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementDirectExchange.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementDirectExchange_
+#define _ManagementDirectExchange_
+
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementDirectExchange : public virtual DirectExchange
+{
+ private:
+ management::ManagementAgent* managementAgent;
+ int qmfVersion;
+
+ public:
+ static const std::string typeName;
+
+ ManagementDirectExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementDirectExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0, Broker* broker = 0);
+
+ virtual std::string getType() const { return typeName; }
+
+ virtual void route(Deliverable& msg,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
+
+ virtual ~ManagementDirectExchange();
+};
+
+
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
deleted file mode 100644
index b90bcd87d8..0000000000
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/management/ManagementExchange.h"
-#include "qpid/log/Statement.h"
-
-using namespace qpid::management;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) :
- Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
-ManagementExchange::ManagementExchange (const std::string& _name,
- bool _durable,
- const FieldTable& _args,
- Manageable* _parent, Broker* b) :
- Exchange (_name, _durable, _args, _parent, b),
- TopicExchange(_name, _durable, _args, _parent, b) {}
-
-void ManagementExchange::route (Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
-{
- bool routeIt = true;
-
- // Intercept management agent commands
- if ((routingKey.length() > 6 &&
- routingKey.substr(0, 6).compare("agent.") == 0) ||
- (routingKey == "broker"))
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
-
- if (routeIt)
- TopicExchange::route(msg, routingKey, args);
-}
-
-bool ManagementExchange::bind (Queue::shared_ptr queue,
- const string& routingKey,
- const qpid::framing::FieldTable* args)
-{
- managementAgent->clientAdded(routingKey);
- return TopicExchange::bind(queue, routingKey, args);
-}
-
-void ManagementExchange::setManagmentAgent (ManagementAgent* agent)
-{
- managementAgent = agent;
-}
-
-
-ManagementExchange::~ManagementExchange() {}
-
-const std::string ManagementExchange::typeName("management");
-
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp
new file mode 100644
index 0000000000..98650b3adf
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/ManagementTopicExchange.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
+ Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
+ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
+ bool _durable,
+ const FieldTable& _args,
+ Manageable* _parent, Broker* b) :
+ Exchange (_name, _durable, _args, _parent, b),
+ TopicExchange(_name, _durable, _args, _parent, b) {}
+
+void ManagementTopicExchange::route(Deliverable& msg,
+ const string& routingKey,
+ const FieldTable* args)
+{
+ bool routeIt = true;
+
+ // Intercept management agent commands
+ if (qmfVersion == 1) {
+ if ((routingKey.length() > 6 &&
+ routingKey.substr(0, 6).compare("agent.") == 0) ||
+ (routingKey == "broker"))
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
+ }
+
+ if (routeIt)
+ TopicExchange::route(msg, routingKey, args);
+}
+
+bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args)
+{
+ if (qmfVersion == 1)
+ managementAgent->clientAdded(routingKey);
+ return TopicExchange::bind(queue, routingKey, args);
+}
+
+void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv)
+{
+ managementAgent = agent;
+ qmfVersion = qv;
+}
+
+
+ManagementTopicExchange::~ManagementTopicExchange() {}
+
+const std::string ManagementTopicExchange::typeName("management-topic");
+
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h
index 3fa4039af7..ece1c88ecf 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _ManagementExchange_
-#define _ManagementExchange_
+#ifndef _ManagementTopicExchange_
+#define _ManagementTopicExchange_
#include "qpid/broker/TopicExchange.h"
#include "qpid/management/ManagementAgent.h"
@@ -27,32 +27,33 @@
namespace qpid {
namespace broker {
-class ManagementExchange : public virtual TopicExchange
+class ManagementTopicExchange : public virtual TopicExchange
{
private:
management::ManagementAgent* managementAgent;
+ int qmfVersion;
public:
static const std::string typeName;
- ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0);
- ManagementExchange (const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args,
- Manageable* _parent = 0, Broker* broker = 0);
+ ManagementTopicExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementTopicExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
- virtual void route (Deliverable& msg,
- const string& routingKey,
- const qpid::framing::FieldTable* args);
-
- virtual bool bind (Queue::shared_ptr queue,
+ virtual void route(Deliverable& msg,
const string& routingKey,
const qpid::framing::FieldTable* args);
- void setManagmentAgent (management::ManagementAgent* agent);
+ virtual bool bind(Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
- virtual ~ManagementExchange();
+ virtual ~ManagementTopicExchange();
};