diff options
author | Alan Conway <aconway@apache.org> | 2012-12-11 21:50:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-12-11 21:50:03 +0000 |
commit | bdc30f4246918c5f86eb50171d0efc95cd674895 (patch) | |
tree | cb7d7f4b2f3de83f9feab64a1ae07fbd4abb5163 /qpid/cpp/src | |
parent | fffac20e0f519d3b065785fc67b604630ed8edc3 (diff) | |
download | qpid-python-bdc30f4246918c5f86eb50171d0efc95cd674895.tar.gz |
QPID-4481: HA replication of propagated bindings can lead to incorrect configuration
When using dynamic federation between two independent HA broker clusters, it is
possible under certain failover scenarios for the propagated bindings on the
source broker to become out-of-sync with the true state of bindings on the
destination broker.
How reproducible:
Often -- race condition between re-establishment of federated link and the deletion of a binding on the destination broker
Steps to Reproduce:
1. Start a stand-alone broker (route destination) and an HA broker (route source w/ primary and backup)
2. Configure a dynamic federated route between a destination broker and a source broker. The dynamic federation needs to utilize an existing, non-auto-delete queue on the source broker.
3. Subscribe to an auto-delete queue on the destination broker and bind the auto-delete queue to the exchange configured for the dynamic federation
4. Kill the primary source broker
5. Kill the subscription to the auto-delete queue on the destination broker
6. Promote the backup source broker to primary
Actual results:
With the loss of the client subscription to the auto-delete queue, the binding will be removed. If the binding is removed prior to the re-establishment of the federated link to the source broker, the unbind command will not propagate. Since the backup source broker had previously replicated the propagated binding, the binding will incorrectly remain on the source broker.
Expected results:
Propagated bindings should not be replicated from the primary to backups since they are transient and will be recreated when the route is re-established.)))
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1420438 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DirectExchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/FanOutExchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 67 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TopicExchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 21 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 106 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 17 |
14 files changed, 257 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 6d3894d65f..d7844b50ce 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -49,6 +49,11 @@ using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; +namespace { +const std::string QPID_REPLICATE("qpid.replicate"); +const std::string NONE("none"); +} + namespace qpid { namespace broker { @@ -333,6 +338,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList, } string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); + bindArgs.setString(QPID_REPLICATE, NONE); bindArgs.setString(qpidFedOp, op); bindArgs.setString(qpidFedTags, newTagList); if (origin.empty()) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index cb93abfac7..094dd63527 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1250,6 +1250,7 @@ void Broker::bind(const std::string& queueName, QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName << " queue:" << queueName << " key:" << key + << " arguments:" << arguments << " user:" << userId << " rhost:" << connectionId); } diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 2fa7ce0fc5..773a99d2c9 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { Mutex::ScopedLock l(lock); - Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin)); BoundKey& bk = bindings[routingKey]; if (exclusiveBinding) bk.queues.clear(); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 56c894c129..43c67af810 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const bool propagate = false; if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { - Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin)); if (bindings.add_unless(binding, MatchQueue(queue))) { binding->startManagement(); propagate = fedBinding.addOrigin(queue->getName(), fedOrigin); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 02c05852ff..ea7fce4ff6 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -48,6 +48,7 @@ namespace { const std::string empty; // federation related args and values + const std::string QPID_RESERVED("qpid."); const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); const std::string qpidFedOrigin("qpid.fed.origin"); @@ -200,8 +201,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co //matching (they are internally added properties //controlling binding propagation but not relevant to //actual routing) - Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args)); - BoundKey bk(binding); + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, args ? *args : FieldTable())); + BoundKey bk(binding, extra_args); if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) { binding->startManagement(); propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); @@ -282,7 +283,7 @@ void HeadersExchange::route(Deliverable& msg) Bindings::ConstPtr p = bindings.snapshot(); if (p.get()) { for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { - Matcher matcher(i->binding->args); + Matcher matcher(i->args); msg.getMessage().processProperties(matcher); if (matcher.matches()) { b->push_back(i->binding); @@ -298,7 +299,7 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons Bindings::ConstPtr p = bindings.snapshot(); if (p.get()){ for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { - if ( (!args || equal((*i).binding->args, *args)) && (!queue || (*i).binding->queue == queue)) { + if ( (!args || equal((*i).args, *args)) && (!queue || (*i).binding->queue == queue)) { return true; } } @@ -315,10 +316,7 @@ void HeadersExchange::getNonFedArgs(const FieldTable* args, FieldTable& nonFedAr for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i) { - const string & name(i->first); - if (name == qpidFedOp || - name == qpidFedTags || - name == qpidFedOrigin) + if (i->first.find(QPID_RESERVED) == 0) { continue; } diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index 2e4669a018..67ba793ba8 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -38,8 +38,9 @@ class HeadersExchange : public virtual Exchange { struct BoundKey { Binding::shared_ptr binding; + qpid::framing::FieldTable args; FedBinding fedBinding; - BoundKey(Binding::shared_ptr binding_) : binding(binding_) {} + BoundKey(Binding::shared_ptr binding_, const qpid::framing::FieldTable& args_) : binding(binding_), args(args_) {} }; struct MatchArgs diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index f6411c98dd..0965381fcd 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -37,9 +37,11 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/FedOps.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/tuple/tuple_comparison.hpp> #include <iostream> #include <sstream> @@ -48,6 +50,11 @@ #include <assert.h> +namespace { +const std::string X_SCOPE("x-scope"); +const std::string SESSION("session"); +} + namespace qpid { namespace broker { @@ -87,6 +94,7 @@ void SemanticState::closed() { if (dtxBuffer.get()) { dtxBuffer->fail(); } + unbindSessionBindings(); requeue(); //now unsubscribe, which may trigger queue deletion and thus @@ -803,4 +811,63 @@ void SemanticState::detached() } } +void SemanticState::addBinding(const string& queueName, const string& exchangeName, + const string& routingKey, const framing::FieldTable& arguments) +{ + QPID_LOG (debug, "SemanticState::addBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey << ", " + << "args=" << arguments << "]"); + std::string fedOp = arguments.getAsString(qpidFedOp); + if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) { + fedOp = fedOpBind; + } + std::string fedOrigin = arguments.getAsString(qpidFedOrigin); + if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) { + bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } + else if (fedOp == fedOpUnbind) { + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } +} + +void SemanticState::removeBinding(const string& queueName, const string& exchangeName, + const string& routingKey) +{ + QPID_LOG (debug, "SemanticState::removeBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey) + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, "")); +} + +void SemanticState::unbindSessionBindings() +{ + //unbind session-scoped bindings + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + QPID_LOG (debug, "SemanticState::unbindSessionBindings [" + << "queue=" << i->get<0>() << ", " + << "exchange=" << i->get<1>()<< ", " + << "key=" << i->get<2>() << ", " + << "fedOrigin=" << i->get<3>() << "]"); + try { + std::string fedOrigin = i->get<3>(); + if (!fedOrigin.empty()) { + framing::FieldTable fedArguments; + fedArguments.setString(qpidFedOp, fedOpUnbind); + fedArguments.setString(qpidFedOrigin, fedOrigin); + session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments, + userID, connectionId); + } else { + session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(), + userID, connectionId); + } + } + catch (...) { + } + } + bindings.clear(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index be7c8d490a..f873c5c656 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -46,10 +46,12 @@ #include <list> #include <map> +#include <set> #include <vector> #include <boost/enable_shared_from_this.hpp> #include <boost/cast.hpp> +#include <boost/tuple/tuple.hpp> namespace qpid { namespace broker { @@ -173,6 +175,8 @@ class SemanticState : private boost::noncopyable { private: typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; + typedef boost::tuple<std::string, std::string, std::string, std::string> Binding; + typedef std::set<Binding> Bindings; SessionState& session; ConsumerImplMap consumers; @@ -190,6 +194,8 @@ class SemanticState : private boost::noncopyable { //needed for queue delete events in auto-delete: const std::string connectionId; + Bindings bindings; + void checkDtxTimeout(); bool complete(DeliveryRecord&); @@ -197,6 +203,7 @@ class SemanticState : private boost::noncopyable { void requestDispatch(); void cancel(ConsumerImpl::shared_ptr); void disable(ConsumerImpl::shared_ptr); + void unbindSessionBindings(); public: @@ -271,6 +278,11 @@ class SemanticState : private boost::noncopyable { void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); DtxBufferMap& getSuspendedXids() { return suspendedXids; } + + void addBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const framing::FieldTable& arguments); + void removeBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 0263ff2a58..b679aebbfa 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -154,12 +154,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, { getBroker().bind(queueName, exchangeName, routingKey, arguments, getConnection().getUserId(), getConnection().getUrl()); + state.addBinding(queueName, exchangeName, routingKey, arguments); } void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { + state.removeBinding(queueName, exchangeName, routingKey); getBroker().unbind(queueName, exchangeName, routingKey, getConnection().getUserId(), getConnection().getUrl()); } diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index c11389bb17..d49464b4e1 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } } - Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin)); binding->startManagement(); bk->bindingVector.push_back(binding); nBindings++; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 76f7341b4b..8f3eb3bf90 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -534,17 +534,19 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = queues.find(values[QNAME].asString()); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); // We only replicate binds for a replicated queue to replicated // exchange that both exist locally. if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && + replicationTest.replicateLevel(args)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() - << " key=" << key); + << " key=" << key + << " args=" << args); queue->bind(exchange, key, args); } } @@ -559,13 +561,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->unbind(queue, key, &args); + exchange->unbind(queue, key, 0); } } @@ -692,16 +692,19 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = exchanges.find(exName); boost::shared_ptr<Queue> queue = queues.find(qName); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && + replicationTest.replicateLevel(args)) { string key = values[BINDING_KEY].asString(); QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName << " queue:" << qName - << " key:" << key); - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + << " key:" << key + << " args:" << args); queue->bind(exchange, key, args); } } diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index edb50fce9c..55cff046e2 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testLinkBindingCleanup) +{ + MessagingFixture fix; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + + Connection connection = fix.newConnection(); + connection.open(); + + Session session(connection.createSession()); + Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}"); + Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}"); + connection.close(); + + sender.send(Message("test-message"), true); + + // The session-scoped binding should be removed when receiver1's network connection is lost + Message in; + BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index dcd074eda9..6477c6effd 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -2604,3 +2604,109 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_bounce_unbinds_named_queue(self): + """ Verify that a propagated binding is removed when the connection is + bounced + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # on the destination broker, create a binding for propagation + self._brokers[0].client_session.queue_declare(queue="fedDstQ") + self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud") + + # on the source broker, create a bridge queue + self._brokers[1].client_session.queue_declare(queue="fedSrcQ") + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.create( "link", + "Link-dynamic", + {"host":self._brokers[1].host, + "port":self._brokers[1].port}, False) + self.assertEqual(result.status, 0) + + # bridge the "fedX" exchange: + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-dynamic", + {"link":"Link-dynamic", + "src":"fedX", + "dest":"fedX", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # wait until the binding key has propagated to the src broker + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount < 1 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 1) + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount != 0 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 0) + + self._brokers[1].client_session.queue_delete(queue="fedSrcQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 968ffa8b4a..ccb75d9cfd 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -478,6 +478,23 @@ class ReplicationTests(HaBrokerTest): self.fail("Excpected no-such-queue exception") except NotFound: pass + def test_replicate_binding(self): + """Verify that binding replication can be disabled""" + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) + ps = primary.connect().session() + ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}") + ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}") + backup.wait_backup("q") + + primary.kill() + assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die + backup.promote() + bs = backup.connect_admin().session() + bs.sender("ex").send(Message("msg")) + self.assert_browse_retry(bs, "q", []) + def test_invalid_replication(self): """Verify that we reject an attempt to declare a queue with invalid replication value.""" cluster = HaCluster(self, 1, ha_replicate="all") |