diff options
author | Jonathan Robie <jonathan@apache.org> | 2010-10-08 17:11:06 +0000 |
---|---|---|
committer | Jonathan Robie <jonathan@apache.org> | 2010-10-08 17:11:06 +0000 |
commit | 2f02708cbfc29669ffa64ce44f8bb9ebff01906b (patch) | |
tree | dd3d019f4142d8e2c54b01d4015f2f8f5d9cdb9c | |
parent | 53eff695b3bdac45260295b519a4274c9ad582a3 (diff) | |
download | qpid-python-2f02708cbfc29669ffa64ce44f8bb9ebff01906b.tar.gz |
Registers the amq.failover exchange in the management exchange.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1005908 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/FailoverExchange.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/FailoverExchange.h | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 12 |
5 files changed, 23 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index 20fdc4164a..8122e5c2d9 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -45,7 +45,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c if (i == exchanges.end()) { Exchange::shared_ptr exchange; - if(type == TopicExchange::typeName){ + if (type == TopicExchange::typeName){ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); }else if(type == DirectExchange::typeName){ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 5da1579cb7..acf4206629 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -282,7 +282,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); // Failover exchange provides membership updates to clients. - failoverExchange.reset(new FailoverExchange(this)); + failoverExchange.reset(new FailoverExchange(broker.GetVhostObject(), &broker)); broker.getExchanges().registerExchange(failoverExchange); // Update exchange is used during updates to replicate messages diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp index 24518dbe9f..84232dac1b 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -38,11 +38,11 @@ using namespace std; using namespace broker; using namespace framing; -const string FailoverExchange::TYPE_NAME("amq.failover"); - -FailoverExchange::FailoverExchange(management::Manageable* parent) : Exchange(TYPE_NAME, parent) { +const string FailoverExchange::typeName("amq.failover"); + +FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) { if (mgmtExchange != 0) - mgmtExchange->set_type(TYPE_NAME); + mgmtExchange->set_type(typeName); } void FailoverExchange::setUrls(const vector<Url>& u) { @@ -58,7 +58,7 @@ void FailoverExchange::updateUrls(const vector<Url>& u) { boost::bind(&FailoverExchange::sendUpdate, this, _1)); } -string FailoverExchange::getType() const { return TYPE_NAME; } +string FailoverExchange::getType() const { return typeName; } bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { Lock l(lock); @@ -77,7 +77,7 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, con } void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) { - QPID_LOG(warning, "Message received by exchange " << TYPE_NAME << " ignoring"); + QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring"); } void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { @@ -88,16 +88,17 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); const ProtocolVersion v; boost::intrusive_ptr<Message> msg(new Message); - AMQFrame command(MessageTransferBody(v, TYPE_NAME, 1, 0)); + AMQFrame command(MessageTransferBody(v, typeName, 1, 0)); command.setLastSegment(false); msg->getFrames().append(command); AMQHeaderBody header; header.get<MessageProperties>(true)->setContentLength(0); - header.get<MessageProperties>(true)->getApplicationHeaders().setArray(TYPE_NAME, array); + header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array); AMQFrame headerFrame(header); headerFrame.setFirstSegment(false); msg->getFrames().append(headerFrame); DeliverableMessage(msg).deliverTo(queue); } + }} // namespace cluster diff --git a/qpid/cpp/src/qpid/cluster/FailoverExchange.h b/qpid/cpp/src/qpid/cluster/FailoverExchange.h index 7eb9ea8c3a..b4caa70db4 100644 --- a/qpid/cpp/src/qpid/cluster/FailoverExchange.h +++ b/qpid/cpp/src/qpid/cluster/FailoverExchange.h @@ -38,9 +38,9 @@ namespace cluster { class FailoverExchange : public broker::Exchange { public: - static const std::string TYPE_NAME; + static const std::string typeName; - FailoverExchange(management::Manageable* parent); + FailoverExchange(management::Manageable* parent, broker::Broker* b); /** Set the URLs but don't send an update.*/ void setUrls(const std::vector<Url>&); diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index b4fa510528..7272675971 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re +import os, signal, sys, time, imp, re, subprocess from qpid import datatypes, messaging from qpid.brokertest import * from qpid.harness import Skipped @@ -214,6 +214,15 @@ acl allow all all for b in cluster: b.ready() # Make sure all brokers still running. + def test_amqfailover_visible(self): + """Verify that the amq.failover exchange can be seen by + QMF-based tools - regression test for BZ615300.""" + broker1 = self.cluster(1)[0] + broker2 = self.cluster(1)[0] + qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE) + out = qs.communicate()[0] + assert out.find("amq.failover") > 0 + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -525,4 +534,3 @@ class StoreTests(BrokerTest): self.assertEqual(c.get_message("q").content, "x") b = cluster.start("b") self.assertEqual(c.get_message("q").content, "y") - |