diff options
author | Alan Conway <aconway@apache.org> | 2012-05-16 17:19:48 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-05-16 17:19:48 +0000 |
commit | 107e3d0dbe930dbe1cac9c7a193534eac75dc731 (patch) | |
tree | 4b49336e9c28cd0f3bdcfcc7d86d2e8cc11cd347 | |
parent | 0af8a9d232df62b0104d0e08d9fd8fd855486444 (diff) | |
download | qpid-python-107e3d0dbe930dbe1cac9c7a193534eac75dc731.tar.gz |
QPID-3603: HA don't replicate exclusive, auto-delete, non-timeout queues.
Such queues don't need to be replicated because they are destroyed
when the owning session disconnects, so won't survive a failover.
This eliminates management subscription queues from replication.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339268 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 36 |
3 files changed, 53 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index ee6b6e42fb..60ec4ea59f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -73,6 +73,8 @@ const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); const string AUTODELETE("autoDelete"); +const string EXCL("excl"); +const string EXCLUSIVE("exclusive"); const string BIND("bind"); const string UNBIND("unbind"); const string BINDING("binding"); @@ -278,7 +280,9 @@ void BrokerReplicator::route(Deliverable& msg) { void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); - if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated queue. + if (!isReplicated( + values[ARGS].asMap(), values[AUTODEL].asBool(), values[EXCL].asBool())) + return; if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) { framing::FieldTable args; amqp_0_10::translate(argsMap, args); @@ -286,19 +290,20 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { broker.getQueues().destroy(name); - QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: " << name); + QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: " + << name); } std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, values[DURABLE].asBool(), values[AUTODEL].asBool(), - 0 /*i.e. 
no owner regardless of exclusivity on master*/, + 0, // no owner regardless of exclusivity on primary values[ALTEX].asString(), args, values[USER].asString(), values[RHOST].asString()); - assert(result.second); + assert(result.second); // Should be true since we destroyed existing queue above QPID_LOG(debug, logPrefix << "queue declare event: " << name); startQueueReplicator(result.first); } @@ -420,7 +425,10 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!haBroker.replicateLevel(argsMap)) return; + if (!isReplicated(values[ARGUMENTS].asMap(), + values[AUTODELETE].asBool(), + values[EXCLUSIVE].asBool())) + return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); string name(values[NAME].asString()); @@ -522,6 +530,17 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { } } +namespace { +const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); +} + +bool BrokerReplicator::isReplicated( + const Variant::Map& args, bool autodelete, bool exclusive) +{ + bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end(); + return haBroker.replicateLevel(args) && !ignore; +} + void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { if (haBroker.replicateLevel(queue->getSettings()) == ALL) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index da9802268a..d2fd23e63d 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -88,7 +88,8 @@ class BrokerReplicator : public broker::Exchange void doResponseHaBroker(types::Variant::Map& values); QueueReplicatorPtr findQueueReplicator(const std::string& qname); - void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + bool 
isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive); void ready(); LogPrefix logPrefix; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 3126b757f9..a5c6271ba0 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -171,12 +171,12 @@ def wait_address(session, address): except NotFound: return False assert retry(check), "Timed out waiting for address %s"%(address) -def assert_missing(session, address): - """Assert that the address is _not_ valid""" +def valid_address(session, address): + """Test if an address is valid""" try: session.receiver(address) - self.fail("Expected NotFound: %s"%(address)) - except NotFound: pass + return True + except NotFound: return False class ReplicationTests(BrokerTest): """Correctness tests for HA replication.""" @@ -223,7 +223,7 @@ class ReplicationTests(BrokerTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) self.assert_browse_retry(b, prefix+"q2", []) # configuration only - assert_missing(b, prefix+"q3") + assert not valid_address(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration @@ -603,9 +603,26 @@ class ReplicationTests(BrokerTest): test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") + def test_auto_delete_exclusive(self): + """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues""" + cluster = HaCluster(self,2) + s = cluster[0].connect().session() + s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}") + s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}") + s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}") + 
s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}") + s.receiver("q;{create:always}") + + s = cluster[1].connect_admin().session() + cluster[1].wait_backup("q") + assert not valid_address(s, "exad") + assert valid_address(s, "ex") + assert valid_address(s, "ad") + assert valid_address(s, "time") + def test_recovering(self): """Verify that the primary broker does not go active until expected - backups have connected or timeout expires.""" + backups have connected""" cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"]) c = cluster[0].connect() for i in xrange(10): @@ -691,13 +708,14 @@ class LongTests(BrokerTest): receiver.receiver.assert_running() n = receiver.received # FIXME aconway 2012-05-01: don't kill primary till it's active - # otherwise we can lose messages. This is in lieu of not - # promoting catchup brokers. + # otherwise we can lose messages. When we implement non-promotion + # of catchup brokers we can make this stronger: wait only for + # there to be at least one ready backup. assert retry(brokers[i%3].try_connect, 1) brokers.bounce(i%3) i += 1 def enough(): # Verify we're still running - receiver.check() # Verify no exceptions + receiver.check() # Verify no exceptions return receiver.received > n + 100 assert retry(enough, 1) except: |