summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-16 17:19:48 +0000
committerAlan Conway <aconway@apache.org>2012-05-16 17:19:48 +0000
commit107e3d0dbe930dbe1cac9c7a193534eac75dc731 (patch)
tree4b49336e9c28cd0f3bdcfcc7d86d2e8cc11cd347
parent0af8a9d232df62b0104d0e08d9fd8fd855486444 (diff)
downloadqpid-python-107e3d0dbe930dbe1cac9c7a193534eac75dc731.tar.gz
QPID-3603: HA don't replicate excluseive, auto-delete, non-timeout queues.
Such queues don't need to be replicated because they are destroyed when as the owning session disconnects, so won't survive a failover. This eliminsates managment subscriptio queues from replication. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1339268 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp29
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h3
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py36
3 files changed, 53 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index ee6b6e42fb..60ec4ea59f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -73,6 +73,8 @@ const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
const string BIND("bind");
const string UNBIND("unbind");
const string BINDING("binding");
@@ -278,7 +280,9 @@ void BrokerReplicator::route(Deliverable& msg) {
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
- if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated queue.
+ if (!isReplicated(
+ values[ARGS].asMap(), values[AUTODEL].asBool(), values[EXCL].asBool()))
+ return;
if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
@@ -286,19 +290,20 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
broker.getQueues().destroy(name);
- QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: " << name);
+ QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: "
+ << name);
}
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
values[DURABLE].asBool(),
values[AUTODEL].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
+ 0, // no owner regardless of exclusivity on primary
values[ALTEX].asString(),
args,
values[USER].asString(),
values[RHOST].asString());
- assert(result.second);
+ assert(result.second); // Should be true since we destroyed existing queue above
QPID_LOG(debug, logPrefix << "queue declare event: " << name);
startQueueReplicator(result.first);
}
@@ -420,7 +425,10 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!haBroker.replicateLevel(argsMap)) return;
+ if (!isReplicated(values[ARGUMENTS].asMap(),
+ values[AUTODELETE].asBool(),
+ values[EXCLUSIVE].asBool()))
+ return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
string name(values[NAME].asString());
@@ -522,6 +530,17 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
}
}
+namespace {
+const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+}
+
+bool BrokerReplicator::isReplicated(
+ const Variant::Map& args, bool autodelete, bool exclusive)
+{
+ bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
+ return haBroker.replicateLevel(args) && !ignore;
+}
+
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
{
if (haBroker.replicateLevel(queue->getSettings()) == ALL) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index da9802268a..d2fd23e63d 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -88,7 +88,8 @@ class BrokerReplicator : public broker::Exchange
void doResponseHaBroker(types::Variant::Map& values);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
- void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive);
void ready();
LogPrefix logPrefix;
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 3126b757f9..a5c6271ba0 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -171,12 +171,12 @@ def wait_address(session, address):
except NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
-def assert_missing(session, address):
- """Assert that the address is _not_ valid"""
+def valid_address(session, address):
+ """Test if an address is valid"""
try:
session.receiver(address)
- self.fail("Expected NotFound: %s"%(address))
- except NotFound: pass
+ return True
+ except NotFound: return False
class ReplicationTests(BrokerTest):
"""Correctness tests for HA replication."""
@@ -223,7 +223,7 @@ class ReplicationTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- assert_missing(b, prefix+"q3")
+ assert not valid_address(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -603,9 +603,26 @@ class ReplicationTests(BrokerTest):
test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+ def test_auto_delete_exclusive(self):
+ """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
+ cluster = HaCluster(self,2)
+ s = cluster[0].connect().session()
+ s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+ s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+ s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+ s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ s.receiver("q;{create:always}")
+
+ s = cluster[1].connect_admin().session()
+ cluster[1].wait_backup("q")
+ assert not valid_address(s, "exad")
+ assert valid_address(s, "ex")
+ assert valid_address(s, "ad")
+ assert valid_address(s, "time")
+
def test_recovering(self):
"""Verify that the primary broker does not go active until expected
- backups have connected or timeout expires."""
+ backups have connected"""
cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
c = cluster[0].connect()
for i in xrange(10):
@@ -691,13 +708,14 @@ class LongTests(BrokerTest):
receiver.receiver.assert_running()
n = receiver.received
# FIXME aconway 2012-05-01: don't kill primary till it's active
- # otherwise we can lose messages. This is in lieu of not
- # promoting catchup brokers.
+ # otherwise we can lose messages. When we implement non-promotion
+ # of catchup brokers we can make this stronger: wait only for
+ # there to be at least one ready backup.
assert retry(brokers[i%3].try_connect, 1)
brokers.bounce(i%3)
i += 1
def enough(): # Verify we're still running
- receiver.check() # Verify no exceptions
+ receiver.check() # Verify no exceptions
return receiver.received > n + 100
assert retry(enough, 1)
except: