summaryrefslogtreecommitdiff
path: root/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-xcpp/src/tests/ha_tests.py52
1 files changed, 50 insertions, 2 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 822e07c702..e9d44c21e0 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -100,8 +100,8 @@ class HaCluster(object):
def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-class ShortTests(BrokerTest):
- """Short HA functionality tests."""
+class HaTest(BrokerTest):
+ """Base class for HA test cases, defines convenience functions"""
# Wait for an address to become valid.
def wait(self, session, address):
@@ -135,6 +135,9 @@ class ShortTests(BrokerTest):
"""Connect to a backup broker as an admin connection"""
return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+class ReplicationTests(HaTest):
+ """Correctness tests for HA replication."""
+
def test_replication(self):
"""Test basic replication of configuration and messages before and
after backup has connected"""
@@ -491,6 +494,51 @@ class ShortTests(BrokerTest):
# self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ def test_backup_acquired(self):
+ """Verify that acquired messages are backed up, for all queue types."""
+ class Test:
+ def __init__(self, queue, arguments, expect):
+ self.queue = queue
+ self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+ self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+ self.expect = [str(i) for i in expect]
+
+ def send(self, connection):
+ """Send messages, then acquire one but don't acknowledge"""
+ s = connection.session()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
+
+ def wait(self, brokertest, backup):
+ brokertest.wait_backup(backup, self.queue)
+
+ def verify(self, brokertest, backup):
+ brokertest.assert_browse_backup(
+ backup, self.queue, self.expect, msg=self.queue)
+
+ tests = [
+ Test("plain",[],range(10)),
+ Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+ Test("priority",["'qpid.priorities':10"], range(10)),
+ Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+ Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+ ]
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ c = primary.connect()
+ for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ # Wait for backups to catch up.
+ for t in tests:
+ t.wait(self, backup1)
+ t.wait(self, backup2)
+ # Verify acquired message was replicated
+ for t in tests: t.verify(self, backup1)
+ for t in tests: t.verify(self, backup2)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit