summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Robie <jonathan@apache.org>2010-10-08 17:11:06 +0000
committerJonathan Robie <jonathan@apache.org>2010-10-08 17:11:06 +0000
commit2f02708cbfc29669ffa64ce44f8bb9ebff01906b (patch)
treedd3d019f4142d8e2c54b01d4015f2f8f5d9cdb9c
parent53eff695b3bdac45260295b519a4274c9ad582a3 (diff)
downloadqpid-python-2f02708cbfc29669ffa64ce44f8bb9ebff01906b.tar.gz
Registers the amq.failover exchange in the management exchange.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1005908 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/FailoverExchange.h4
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py12
5 files changed, 23 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 20fdc4164a..8122e5c2d9 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -45,7 +45,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
if (i == exchanges.end()) {
Exchange::shared_ptr exchange;
- if(type == TopicExchange::typeName){
+ if (type == TopicExchange::typeName){
exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
}else if(type == DirectExchange::typeName){
exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 5da1579cb7..acf4206629 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -282,7 +282,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
// Failover exchange provides membership updates to clients.
- failoverExchange.reset(new FailoverExchange(this));
+ failoverExchange.reset(new FailoverExchange(broker.GetVhostObject(), &broker));
broker.getExchanges().registerExchange(failoverExchange);
// Update exchange is used during updates to replicate messages
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
index 24518dbe9f..84232dac1b 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -38,11 +38,11 @@ using namespace std;
using namespace broker;
using namespace framing;
-const string FailoverExchange::TYPE_NAME("amq.failover");
-
-FailoverExchange::FailoverExchange(management::Manageable* parent) : Exchange(TYPE_NAME, parent) {
+const string FailoverExchange::typeName("amq.failover");
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) {
if (mgmtExchange != 0)
- mgmtExchange->set_type(TYPE_NAME);
+ mgmtExchange->set_type(typeName);
}
void FailoverExchange::setUrls(const vector<Url>& u) {
@@ -58,7 +58,7 @@ void FailoverExchange::updateUrls(const vector<Url>& u) {
boost::bind(&FailoverExchange::sendUpdate, this, _1));
}
-string FailoverExchange::getType() const { return TYPE_NAME; }
+string FailoverExchange::getType() const { return typeName; }
bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
Lock l(lock);
@@ -77,7 +77,7 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, con
}
void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) {
- QPID_LOG(warning, "Message received by exchange " << TYPE_NAME << " ignoring");
+ QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
}
void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
@@ -88,16 +88,17 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
const ProtocolVersion v;
boost::intrusive_ptr<Message> msg(new Message);
- AMQFrame command(MessageTransferBody(v, TYPE_NAME, 1, 0));
+ AMQFrame command(MessageTransferBody(v, typeName, 1, 0));
command.setLastSegment(false);
msg->getFrames().append(command);
AMQHeaderBody header;
header.get<MessageProperties>(true)->setContentLength(0);
- header.get<MessageProperties>(true)->getApplicationHeaders().setArray(TYPE_NAME, array);
+ header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
AMQFrame headerFrame(header);
headerFrame.setFirstSegment(false);
msg->getFrames().append(headerFrame);
DeliverableMessage(msg).deliverTo(queue);
}
+
}} // namespace cluster
diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
index 7eb9ea8c3a..b4caa70db4 100644
--- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h
+++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h
@@ -38,9 +38,9 @@ namespace cluster {
class FailoverExchange : public broker::Exchange
{
public:
- static const std::string TYPE_NAME;
+ static const std::string typeName;
- FailoverExchange(management::Manageable* parent);
+ FailoverExchange(management::Manageable* parent, broker::Broker* b);
/** Set the URLs but don't send an update.*/
void setUrls(const std::vector<Url>&);
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index b4fa510528..7272675971 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,7 +18,7 @@
# under the License.
#
-import os, signal, sys, time, imp, re
+import os, signal, sys, time, imp, re, subprocess
from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
@@ -214,6 +214,15 @@ acl allow all all
for b in cluster: b.ready() # Make sure all brokers still running.
+ def test_amqfailover_visible(self):
+ """Verify that the amq.failover exchange can be seen by
+ QMF-based tools - regression test for BZ615300."""
+ broker1 = self.cluster(1)[0]
+ broker2 = self.cluster(1)[0]
+ qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE)
+ out = qs.communicate()[0]
+ assert out.find("amq.failover") > 0
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -525,4 +534,3 @@ class StoreTests(BrokerTest):
self.assertEqual(c.get_message("q").content, "x")
b = cluster.start("b")
self.assertEqual(c.get_message("q").content, "y")
-