summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp67
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h12
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp25
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp21
-rwxr-xr-xqpid/cpp/src/tests/federation.py106
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py17
14 files changed, 257 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 6d3894d65f..d7844b50ce 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -49,6 +49,11 @@ using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace {
+const std::string QPID_REPLICATE("qpid.replicate");
+const std::string NONE("none");
+}
+
namespace qpid {
namespace broker {
@@ -333,6 +338,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList,
}
string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag);
+ bindArgs.setString(QPID_REPLICATE, NONE);
bindArgs.setString(qpidFedOp, op);
bindArgs.setString(qpidFedTags, newTagList);
if (origin.empty())
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index cb93abfac7..094dd63527 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1250,6 +1250,7 @@ void Broker::bind(const std::string& queueName,
QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
<< " queue:" << queueName
<< " key:" << key
+ << " arguments:" << arguments
<< " user:" << userId
<< " rhost:" << connectionId);
}
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index 2fa7ce0fc5..773a99d2c9 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin));
BoundKey& bk = bindings[routingKey];
if (exclusiveBinding) bk.queues.clear();
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index 56c894c129..43c67af810 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
bool propagate = false;
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
- Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
binding->startManagement();
propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 02c05852ff..ea7fce4ff6 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -48,6 +48,7 @@ namespace {
const std::string empty;
// federation related args and values
+ const std::string QPID_RESERVED("qpid.");
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
const std::string qpidFedOrigin("qpid.fed.origin");
@@ -200,8 +201,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
//matching (they are internally added properties
//controlling binding propagation but not relevant to
//actual routing)
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args));
- BoundKey bk(binding);
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, args ? *args : FieldTable()));
+ BoundKey bk(binding, extra_args);
if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) {
binding->startManagement();
propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
@@ -282,7 +283,7 @@ void HeadersExchange::route(Deliverable& msg)
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()) {
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
- Matcher matcher(i->binding->args);
+ Matcher matcher(i->args);
msg.getMessage().processProperties(matcher);
if (matcher.matches()) {
b->push_back(i->binding);
@@ -298,7 +299,7 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()){
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if ( (!args || equal((*i).binding->args, *args)) && (!queue || (*i).binding->queue == queue)) {
+ if ( (!args || equal((*i).args, *args)) && (!queue || (*i).binding->queue == queue)) {
return true;
}
}
@@ -315,10 +316,7 @@ void HeadersExchange::getNonFedArgs(const FieldTable* args, FieldTable& nonFedAr
for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i)
{
- const string & name(i->first);
- if (name == qpidFedOp ||
- name == qpidFedTags ||
- name == qpidFedOrigin)
+ if (i->first.find(QPID_RESERVED) == 0)
{
continue;
}
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h
index 2e4669a018..67ba793ba8 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.h
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h
@@ -38,8 +38,9 @@ class HeadersExchange : public virtual Exchange {
struct BoundKey
{
Binding::shared_ptr binding;
+ qpid::framing::FieldTable args;
FedBinding fedBinding;
- BoundKey(Binding::shared_ptr binding_) : binding(binding_) {}
+ BoundKey(Binding::shared_ptr binding_, const qpid::framing::FieldTable& args_) : binding(binding_), args(args_) {}
};
struct MatchArgs
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index f6411c98dd..0965381fcd 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -37,9 +37,11 @@
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/FedOps.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <iostream>
#include <sstream>
@@ -48,6 +50,11 @@
#include <assert.h>
+namespace {
+const std::string X_SCOPE("x-scope");
+const std::string SESSION("session");
+}
+
namespace qpid {
namespace broker {
@@ -87,6 +94,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
+ unbindSessionBindings();
requeue();
//now unsubscribe, which may trigger queue deletion and thus
@@ -803,4 +811,63 @@ void SemanticState::detached()
}
}
+void SemanticState::addBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey, const framing::FieldTable& arguments)
+{
+ QPID_LOG (debug, "SemanticState::addBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey << ", "
+ << "args=" << arguments << "]");
+ std::string fedOp = arguments.getAsString(qpidFedOp);
+ if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) {
+ fedOp = fedOpBind;
+ }
+ std::string fedOrigin = arguments.getAsString(qpidFedOrigin);
+ if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) {
+ bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin));
+ }
+ else if (fedOp == fedOpUnbind) {
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin));
+ }
+}
+
+void SemanticState::removeBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey)
+{
+ QPID_LOG (debug, "SemanticState::removeBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey)
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, ""));
+}
+
+void SemanticState::unbindSessionBindings()
+{
+ //unbind session-scoped bindings
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ QPID_LOG (debug, "SemanticState::unbindSessionBindings ["
+ << "queue=" << i->get<0>() << ", "
+ << "exchange=" << i->get<1>()<< ", "
+ << "key=" << i->get<2>() << ", "
+ << "fedOrigin=" << i->get<3>() << "]");
+ try {
+ std::string fedOrigin = i->get<3>();
+ if (!fedOrigin.empty()) {
+ framing::FieldTable fedArguments;
+ fedArguments.setString(qpidFedOp, fedOpUnbind);
+ fedArguments.setString(qpidFedOrigin, fedOrigin);
+ session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments,
+ userID, connectionId);
+ } else {
+ session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(),
+ userID, connectionId);
+ }
+ }
+ catch (...) {
+ }
+ }
+ bindings.clear();
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index be7c8d490a..f873c5c656 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -46,10 +46,12 @@
#include <list>
#include <map>
+#include <set>
#include <vector>
#include <boost/enable_shared_from_this.hpp>
#include <boost/cast.hpp>
+#include <boost/tuple/tuple.hpp>
namespace qpid {
namespace broker {
@@ -173,6 +175,8 @@ class SemanticState : private boost::noncopyable {
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+ typedef boost::tuple<std::string, std::string, std::string, std::string> Binding;
+ typedef std::set<Binding> Bindings;
SessionState& session;
ConsumerImplMap consumers;
@@ -190,6 +194,8 @@ class SemanticState : private boost::noncopyable {
//needed for queue delete events in auto-delete:
const std::string connectionId;
+ Bindings bindings;
+
void checkDtxTimeout();
bool complete(DeliveryRecord&);
@@ -197,6 +203,7 @@ class SemanticState : private boost::noncopyable {
void requestDispatch();
void cancel(ConsumerImpl::shared_ptr);
void disable(ConsumerImpl::shared_ptr);
+ void unbindSessionBindings();
public:
@@ -271,6 +278,11 @@ class SemanticState : private boost::noncopyable {
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
DtxBufferMap& getSuspendedXids() { return suspendedXids; }
+
+ void addBinding(const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey, const framing::FieldTable& arguments);
+ void removeBinding(const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 0263ff2a58..b679aebbfa 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -154,12 +154,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
{
getBroker().bind(queueName, exchangeName, routingKey, arguments,
getConnection().getUserId(), getConnection().getUrl());
+ state.addBinding(queueName, exchangeName, routingKey, arguments);
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
+ state.removeBinding(queueName, exchangeName, routingKey);
getBroker().unbind(queueName, exchangeName, routingKey,
getConnection().getUserId(), getConnection().getUrl());
}
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index c11389bb17..d49464b4e1 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
}
}
- Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin));
binding->startManagement();
bk->bindingVector.push_back(binding);
nBindings++;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 76f7341b4b..8f3eb3bf90 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -534,17 +534,19 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
queues.find(values[QNAME].asString());
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
- << " key=" << key);
+ << " key=" << key
+ << " args=" << args);
queue->bind(exchange, key, args);
}
}
@@ -559,13 +561,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
+ exchange->unbind(queue, key, 0);
}
}
@@ -692,16 +692,19 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
boost::shared_ptr<Queue> queue = queues.find(qName);
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
- << " key:" << key);
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ << " key:" << key
+ << " args:" << args);
queue->bind(exchange, key, args);
}
}
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index edb50fce9c..55cff046e2 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testLinkBindingCleanup)
+{
+ MessagingFixture fix;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+
+ Connection connection = fix.newConnection();
+ connection.open();
+
+ Session session(connection.createSession());
+ Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}");
+ Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}");
+ connection.close();
+
+ sender.send(Message("test-message"), true);
+
+ // The session-scoped binding should be removed when receiver1's network connection is lost
+ Message in;
+ BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index dcd074eda9..6477c6effd 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -2604,3 +2604,109 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_dynamic_bounce_unbinds_named_queue(self):
+ """ Verify that a propagated binding is removed when the connection is
+ bounced
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_declare(exchange="fedX", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX").type,
+ "direct", "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ timeout = time() + 10
+ while my_exchange is None and time() <= timeout:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ self.fail("QMF failed to find new exchange!")
+ exchanges.append(my_exchange)
+
+ # on the destination broker, create a binding for propagation
+ self._brokers[0].client_session.queue_declare(queue="fedDstQ")
+ self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud")
+
+ # on the source broker, create a bridge queue
+ self._brokers[1].client_session.queue_declare(queue="fedSrcQ")
+
+ # connect B1 --> B0
+ result = self._brokers[0].qmf_object.create( "link",
+ "Link-dynamic",
+ {"host":self._brokers[1].host,
+ "port":self._brokers[1].port}, False)
+ self.assertEqual(result.status, 0)
+
+ # bridge the "fedX" exchange:
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-dynamic",
+ {"link":"Link-dynamic",
+ "src":"fedX",
+ "dest":"fedX",
+ "dynamic":True,
+ "queue":"fedSrcQ"}, False)
+ self.assertEqual(result.status, 0)
+
+ # wait for the inter-broker links to become operational
+ operational = False
+ timeout = time() + 10
+ while not operational and time() <= timeout:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ self.failUnless(operational, "inter-broker links failed to become operational.")
+
+ # wait until the binding key has propagated to the src broker
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount < 1 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 1)
+
+ #
+ # Tear down the bridges between the two exchanges, then wait
+ # for the bindings to be cleaned up
+ #
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount != 0 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 0)
+
+ self._brokers[1].client_session.queue_delete(queue="fedSrcQ")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_delete(exchange="fedX")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 968ffa8b4a..ccb75d9cfd 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -478,6 +478,23 @@ class ReplicationTests(HaBrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
+ def test_replicate_binding(self):
+ """Verify that binding replication can be disabled"""
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ ps = primary.connect().session()
+ ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+ ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+ backup.wait_backup("q")
+
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
+ backup.promote()
+ bs = backup.connect_admin().session()
+ bs.sender("ex").send(Message("msg"))
+ self.assert_browse_retry(bs, "q", [])
+
def test_invalid_replication(self):
"""Verify that we reject an attempt to declare a queue with invalid replication value."""
cluster = HaCluster(self, 1, ha_replicate="all")