summaryrefslogtreecommitdiff
path: root/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-xcpp/src/tests/ha_tests.py142
1 files changed, 79 insertions, 63 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index e9d44c21e0..10d5fc0db2 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -29,13 +29,17 @@ from qpidtoollibs import BrokerAgent
log = getLogger("qpid.ha-tests")
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs):
+ def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
+ ha_replicate_default="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
- args.extend(["--load-module", BrokerTest.ha_lib,
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=%s"%ha_cluster])
+ args += ["--load-module", BrokerTest.ha_lib,
+ "--log-enable=info+", "--log-enable=debug+:ha::",
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster]
+ if ha_replicate_default is not None:
+ args += [ "--ha-replicate-default=%s"%ha_replicate_default ]
if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
self.commands=os.getenv("PYTHON_COMMANDS")
@@ -64,6 +68,10 @@ class HaBroker(Broker):
assert os.system(
"%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ def connect_admin(self, **kwargs):
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+
+
class HaCluster(object):
_cluster_count = 0
@@ -72,9 +80,9 @@ class HaCluster(object):
self.test = test
self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
HaCluster._cluster_count += 1
- self[0].promote()
self.url = ",".join([b.host_port() for b in self])
for b in self: b.set_broker_url(self.url)
+ self[0].promote()
def connect(self, i):
"""Connect with reconnect_urls"""
@@ -98,8 +106,6 @@ class HaCluster(object):
def __iter__(self): return self._brokers.__iter__()
-def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
class HaTest(BrokerTest):
"""Base class for HA test cases, defines convenience functions"""
@@ -114,13 +120,13 @@ class HaTest(BrokerTest):
# Wait for address to become valid on a backup broker.
def wait_backup(self, backup, address):
- bs = self.connect_admin(backup).session()
+ bs = backup.connect_admin().session()
self.wait(bs, address)
bs.connection.close()
# Combines wait_backup and assert_browse_retry
def assert_browse_backup(self, backup, queue, expected, **kwargs):
- bs = self.connect_admin(backup).session()
+ bs = backup.connect_admin().session()
self.wait(bs, queue)
self.assert_browse_retry(bs, queue, expected, **kwargs)
bs.connection.close()
@@ -128,12 +134,9 @@ class HaTest(BrokerTest):
def assert_missing(self, session, address):
try:
session.receiver(address)
- self.fail("Should not have been replicated: %s"%(address))
+ self.fail("Expected NotFound: %s"%(address))
except NotFound: pass
- def connect_admin(self, backup, **kwargs):
- """Connect to a backup broker as an admin connection"""
- return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
class ReplicationTests(HaTest):
"""Correctness tests for HA replication."""
@@ -173,7 +176,6 @@ class ReplicationTests(HaTest):
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
-
# Wait for configuration to replicate.
self.wait(b, prefix+"x");
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
@@ -203,7 +205,7 @@ class ReplicationTests(HaTest):
setup(p, "2", primary)
# Verify the data on the backup
- b = self.connect_admin(backup).session()
+ b = backup.connect_admin().session()
verify(b, "1", p)
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
@@ -230,12 +232,10 @@ class ReplicationTests(HaTest):
self.assert_browse_retry(b, "foo", msgs[i+1:])
def test_sync(self):
- def queue(name, replicate):
- return "%s;{create:always,%s}"%(name, qr_node(replicate))
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
- s = p.sender(queue("q","all"))
+ s = p.sender("q;{create:always}")
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -246,46 +246,37 @@ class ReplicationTests(HaTest):
s.sync()
msgs = [str(i) for i in range(30)]
- b1 = self.connect_admin(backup1).session()
+ b1 = backup1.connect_admin().session()
self.wait(b1, "q");
self.assert_browse_retry(b1, "q", msgs)
- b2 = self.connect_admin(backup2).session()
+ b2 = backup2.connect_admin().session()
self.wait(b2, "q");
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
+ brokers = HaCluster(self, 3)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- primary = HaBroker(self, name="primary")
- primary.promote()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
sender = self.popen(
["qpid-send",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("all")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("all")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=990",
"--timeout=10"
])
- try:
- self.assertEqual(sender.wait(), 0)
- self.assertEqual(receiver.wait(), 0)
- expect = [long(i) for i in range(991, 1001)]
- sn = lambda m: m.properties["sn"]
- self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
- self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
- except:
- print self.browse(primary.connect().session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup1).session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
- raise
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ self.assert_browse_backup(brokers[1], "q", expect, transform=sn)
+ self.assert_browse_backup(brokers[2], "q", expect, transform=sn)
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
@@ -299,12 +290,12 @@ class ReplicationTests(HaTest):
self.fail("Expected connection to backup to fail")
except ConnectionError: pass
# Check that admin connections are allowed to backup.
- self.connect_admin(backup).close()
+ backup.connect_admin().close()
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- sender = s.sender("q;{create:always,%s}"%(qr_node()))
+ sender = s.sender("q;{create:always}")
self.wait_backup(backup, "q")
sender.send("foo")
primary.kill()
@@ -319,7 +310,7 @@ class ReplicationTests(HaTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
+ primary.connect().session().sender("q;{create:always}")
self.wait_backup(backup, "q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
@@ -340,8 +331,7 @@ class ReplicationTests(HaTest):
def test_backup_failover(self):
"""Verify that a backup broker fails over and recovers queue state"""
brokers = HaCluster(self, 3)
- brokers[0].connect().session().sender(
- "q;{create:always,%s}"%(qr_node())).send("a")
+ brokers[0].connect().session().sender("q;{create:always}").send("a")
for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
brokers[0].expect = EXPECT_EXIT_FAIL
brokers.kill(0)
@@ -362,11 +352,11 @@ class ReplicationTests(HaTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"])
+ primary = HaBroker(self, name="primary", ha_cluster=False)
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ backup = HaBroker(self, name="backup", ha_cluster=False)
br = backup.connect().session().receiver("q;{create:always}")
# Set up replication with qpid-ha
@@ -392,9 +382,9 @@ class ReplicationTests(HaTest):
cluster = HaCluster(self, 2)
primary = cluster[0]
pc = cluster.connect(0)
- ps = pc.session().sender("q;{create:always,%s}"%qr_node("all"))
- pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all"))
- backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False)
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
@@ -413,7 +403,7 @@ class ReplicationTests(HaTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
send(*kv)
@@ -426,19 +416,21 @@ class ReplicationTests(HaTest):
self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
def test_ring(self):
+ """Test replication with the ring queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
for i in range(10): s.send(Message(str(i)))
self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
def test_reject(self):
+ """Test replication with the reject queue policy"""
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
@@ -450,12 +442,12 @@ class ReplicationTests(HaTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
session = primary.connect().session()
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}")
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
self.wait_backup(backup, "priority-queue")
- r = self.connect_admin(backup).session().receiver("priority-queue")
+ r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
self.assertEqual(sorted(priorities, reverse=True), received)
@@ -469,11 +461,11 @@ class ReplicationTests(HaTest):
priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':all}}}}"%(levels,limit_policy))
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
for m in messages: s.send(m)
self.wait_backup(backup, s.target)
- r = self.connect_admin(backup).session().receiver("priority-queue")
+ r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().content for i in priorities]
sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)]
@@ -483,13 +475,14 @@ class ReplicationTests(HaTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
- # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low
- # priority message to displace a high one. The following commented-out assert_browse
- # is for the correct result, the uncommented one is for the actualy buggy result.
- # See https://issues.apache.org/jira/browse/QPID-3866
+ # FIXME aconway 2012-02-22: there is a bug in priority ring
+ # queues that allows a low priority message to displace a high
+ # one. The following commented-out assert_browse is for the
+ # correct result, the uncommented one is for the actualy buggy
+ # result. See https://issues.apache.org/jira/browse/QPID-3866
#
# self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
@@ -539,6 +532,29 @@ class ReplicationTests(HaTest):
for t in tests: t.verify(self, backup1)
for t in tests: t.verify(self, backup2)
+ def test_replicate_default(self):
+ """Make sure we don't replicate if ha-replicate-default is unspecified or none"""
+ cluster1 = HaCluster(self, 2, ha_replicate_default=None)
+ c1 = cluster1[0].connect().session().sender("q;{create:always}")
+ cluster2 = HaCluster(self, 2, ha_replicate_default="none")
+ cluster2[0].connect().session().sender("q;{create:always}")
+ time.sleep(.1) # Give replication a chance.
+ try:
+ cluster1[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+ try:
+ cluster2[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+
+ def test_invalid_default(self):
+ """Verify that a queue with an invalid qpid.replicate gets default treatment"""
+ cluster = HaCluster(self, 2, ha_replicate_default="all")
+ c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+ self.wait_backup(cluster[1], "q")
+
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit