diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DirectExchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/FanOutExchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/HeadersExchange.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 67 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TopicExchange.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 21 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 106 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 17 |
14 files changed, 257 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 6d3894d65f..d7844b50ce 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -49,6 +49,11 @@ using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; +namespace { +const std::string QPID_REPLICATE("qpid.replicate"); +const std::string NONE("none"); +} + namespace qpid { namespace broker { @@ -333,6 +338,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList, } string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); + bindArgs.setString(QPID_REPLICATE, NONE); bindArgs.setString(qpidFedOp, op); bindArgs.setString(qpidFedTags, newTagList); if (origin.empty()) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index cb93abfac7..094dd63527 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1250,6 +1250,7 @@ void Broker::bind(const std::string& queueName, QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName << " queue:" << queueName << " key:" << key + << " arguments:" << arguments << " user:" << userId << " rhost:" << connectionId); } diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 2fa7ce0fc5..773a99d2c9 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { Mutex::ScopedLock l(lock); - Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin)); BoundKey& bk = bindings[routingKey]; if (exclusiveBinding) bk.queues.clear(); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 56c894c129..43c67af810 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const bool propagate = false; if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { - Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin)); if (bindings.add_unless(binding, MatchQueue(queue))) { binding->startManagement(); propagate = fedBinding.addOrigin(queue->getName(), fedOrigin); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 02c05852ff..ea7fce4ff6 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -48,6 +48,7 @@ namespace { const std::string empty; // federation related args and values + const std::string QPID_RESERVED("qpid."); const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); const std::string qpidFedOrigin("qpid.fed.origin"); @@ -200,8 +201,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co //matching (they are internally added properties //controlling binding propagation but not relevant to //actual routing) - Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args)); - BoundKey bk(binding); + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, args ? *args : FieldTable())); + BoundKey bk(binding, extra_args); if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) { binding->startManagement(); propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); @@ -282,7 +283,7 @@ void HeadersExchange::route(Deliverable& msg) Bindings::ConstPtr p = bindings.snapshot(); if (p.get()) { for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { - Matcher matcher(i->binding->args); + Matcher matcher(i->args); msg.getMessage().processProperties(matcher); if (matcher.matches()) { b->push_back(i->binding); @@ -298,7 +299,7 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons Bindings::ConstPtr p = bindings.snapshot(); if (p.get()){ for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { - if ( (!args || equal((*i).binding->args, *args)) && (!queue || (*i).binding->queue == queue)) { + if ( (!args || equal((*i).args, *args)) && (!queue || (*i).binding->queue == queue)) { return true; } } @@ -315,10 +316,7 @@ void HeadersExchange::getNonFedArgs(const FieldTable* args, FieldTable& nonFedAr for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i) { - const string & name(i->first); - if (name == qpidFedOp || - name == qpidFedTags || - name == qpidFedOrigin) + if (i->first.find(QPID_RESERVED) == 0) { continue; } diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index 2e4669a018..67ba793ba8 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -38,8 +38,9 @@ class HeadersExchange : public virtual Exchange { struct BoundKey { Binding::shared_ptr binding; + qpid::framing::FieldTable args; FedBinding fedBinding; - BoundKey(Binding::shared_ptr binding_) : binding(binding_) {} + BoundKey(Binding::shared_ptr binding_, const qpid::framing::FieldTable& args_) : binding(binding_), args(args_) {} }; struct MatchArgs diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index f6411c98dd..0965381fcd 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -37,9 +37,11 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/FedOps.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/tuple/tuple_comparison.hpp> #include <iostream> #include <sstream> @@ -48,6 +50,11 @@ #include <assert.h> +namespace { +const std::string X_SCOPE("x-scope"); +const std::string SESSION("session"); +} + namespace qpid { namespace broker { @@ -87,6 +94,7 @@ void SemanticState::closed() { if (dtxBuffer.get()) { dtxBuffer->fail(); } + unbindSessionBindings(); requeue(); //now unsubscribe, which may trigger queue deletion and thus @@ -803,4 +811,63 @@ void SemanticState::detached() } } +void SemanticState::addBinding(const string& queueName, const string& exchangeName, + const string& routingKey, const framing::FieldTable& arguments) +{ + QPID_LOG (debug, "SemanticState::addBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey << ", " + << "args=" << arguments << "]"); + std::string fedOp = arguments.getAsString(qpidFedOp); + if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) { + fedOp = fedOpBind; + } + std::string fedOrigin = arguments.getAsString(qpidFedOrigin); + if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) { + bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } + else if (fedOp == fedOpUnbind) { + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } +} + +void SemanticState::removeBinding(const string& queueName, const string& exchangeName, + const string& routingKey) +{ + QPID_LOG (debug, "SemanticState::removeBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey) + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, "")); +} + +void SemanticState::unbindSessionBindings() +{ + //unbind session-scoped bindings + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + QPID_LOG (debug, "SemanticState::unbindSessionBindings [" + << "queue=" << i->get<0>() << ", " + << "exchange=" << i->get<1>()<< ", " + << "key=" << i->get<2>() << ", " + << "fedOrigin=" << i->get<3>() << "]"); + try { + std::string fedOrigin = i->get<3>(); + if (!fedOrigin.empty()) { + framing::FieldTable fedArguments; + fedArguments.setString(qpidFedOp, fedOpUnbind); + fedArguments.setString(qpidFedOrigin, fedOrigin); + session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments, + userID, connectionId); + } else { + session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(), + userID, connectionId); + } + } + catch (...) { + } + } + bindings.clear(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index be7c8d490a..f873c5c656 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -46,10 +46,12 @@ #include <list> #include <map> +#include <set> #include <vector> #include <boost/enable_shared_from_this.hpp> #include <boost/cast.hpp> +#include <boost/tuple/tuple.hpp> namespace qpid { namespace broker { @@ -173,6 +175,8 @@ class SemanticState : private boost::noncopyable { private: typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; + typedef boost::tuple<std::string, std::string, std::string, std::string> Binding; + typedef std::set<Binding> Bindings; SessionState& session; ConsumerImplMap consumers; @@ -190,6 +194,8 @@ class SemanticState : private boost::noncopyable { //needed for queue delete events in auto-delete: const std::string connectionId; + Bindings bindings; + void checkDtxTimeout(); bool complete(DeliveryRecord&); @@ -197,6 +203,7 @@ class SemanticState : private boost::noncopyable { void requestDispatch(); void cancel(ConsumerImpl::shared_ptr); void disable(ConsumerImpl::shared_ptr); + void unbindSessionBindings(); public: @@ -271,6 +278,11 @@ class SemanticState : private boost::noncopyable { void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); DtxBufferMap& getSuspendedXids() { return suspendedXids; } + + void addBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const framing::FieldTable& arguments); + void removeBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 0263ff2a58..b679aebbfa 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -154,12 +154,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, { getBroker().bind(queueName, exchangeName, routingKey, arguments, getConnection().getUserId(), getConnection().getUrl()); + state.addBinding(queueName, exchangeName, routingKey, arguments); } void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { + state.removeBinding(queueName, exchangeName, routingKey); getBroker().unbind(queueName, exchangeName, routingKey, getConnection().getUserId(), getConnection().getUrl()); } diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index c11389bb17..d49464b4e1 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } } - Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin)); binding->startManagement(); bk->bindingVector.push_back(binding); nBindings++; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 76f7341b4b..8f3eb3bf90 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -534,17 +534,19 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = queues.find(values[QNAME].asString()); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); // We only replicate binds for a replicated queue to replicated // exchange that both exist locally. if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && + replicationTest.replicateLevel(args)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() - << " key=" << key); + << " key=" << key + << " args=" << args); queue->bind(exchange, key, args); } } @@ -559,13 +561,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->unbind(queue, key, &args); + exchange->unbind(queue, key, 0); } } @@ -692,16 +692,19 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = exchanges.find(exName); boost::shared_ptr<Queue> queue = queues.find(qName); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && + replicationTest.replicateLevel(args)) { string key = values[BINDING_KEY].asString(); QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName << " queue:" << qName - << " key:" << key); - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + << " key:" << key + << " args:" << args); queue->bind(exchange, key, args); } } diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index edb50fce9c..55cff046e2 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testLinkBindingCleanup) +{ + MessagingFixture fix; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + + Connection connection = fix.newConnection(); + connection.open(); + + Session session(connection.createSession()); + Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}"); + Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}"); + connection.close(); + + sender.send(Message("test-message"), true); + + // The session-scoped binding should be removed when receiver1's network connection is lost + Message in; + BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index dcd074eda9..6477c6effd 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -2604,3 +2604,109 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_bounce_unbinds_named_queue(self): + """ Verify that a propagated binding is removed when the connection is + bounced + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # on the destination broker, create a binding for propagation + self._brokers[0].client_session.queue_declare(queue="fedDstQ") + self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud") + + # on the source broker, create a bridge queue + self._brokers[1].client_session.queue_declare(queue="fedSrcQ") + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.create( "link", + "Link-dynamic", + {"host":self._brokers[1].host, + "port":self._brokers[1].port}, False) + self.assertEqual(result.status, 0) + + # bridge the "fedX" exchange: + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-dynamic", + {"link":"Link-dynamic", + "src":"fedX", + "dest":"fedX", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # wait until the binding key has propagated to the src broker + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount < 1 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 1) + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount != 0 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 0) + + self._brokers[1].client_session.queue_delete(queue="fedSrcQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 968ffa8b4a..ccb75d9cfd 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -478,6 +478,23 @@ class ReplicationTests(HaBrokerTest): self.fail("Excpected no-such-queue exception") except NotFound: pass + def test_replicate_binding(self): + """Verify that binding replication can be disabled""" + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) + ps = primary.connect().session() + ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}") + ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}") + backup.wait_backup("q") + + primary.kill() + assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die + backup.promote() + bs = backup.connect_admin().session() + bs.sender("ex").send(Message("msg")) + self.assert_browse_retry(bs, "q", []) + def test_invalid_replication(self): """Verify that we reject an attempt to declare a queue with invalid replication value.""" cluster = HaCluster(self, 1, ha_replicate="all") |