diff options
Diffstat (limited to 'cpp/src/tests/ha_tests.py')
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 52 |
1 files changed, 50 insertions, 2 deletions
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 822e07c702..e9d44c21e0 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -100,8 +100,8 @@ class HaCluster(object): def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value -class ShortTests(BrokerTest): - """Short HA functionality tests.""" +class HaTest(BrokerTest): + """Base class for HA test cases, defines convenience functions""" # Wait for an address to become valid. def wait(self, session, address): @@ -135,6 +135,9 @@ class ShortTests(BrokerTest): """Connect to a backup broker as an admin connection""" return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) +class ReplicationTests(HaTest): + """Correctness tests for HA replication.""" + def test_replication(self): """Test basic replication of configuration and messages before and after backup has connected""" @@ -491,6 +494,51 @@ class ShortTests(BrokerTest): # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) + def test_backup_acquired(self): + """Verify that acquired messages are backed up, for all queue types.""" + class Test: + def __init__(self, queue, arguments, expect): + self.queue = queue + self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%( + self.queue, ",".join(arguments + ["'qpid.replicate':all"])) + self.expect = [str(i) for i in expect] + + def send(self, connection): + """Send messages, then acquire one but don't acknowledge""" + s = connection.session() + for m in range(10): s.sender(self.address).send(str(m)) + s.receiver(self.address).fetch() + + def wait(self, brokertest, backup): + brokertest.wait_backup(backup, self.queue) + + def verify(self, brokertest, backup): + brokertest.assert_browse_backup( + backup, self.queue, self.expect, msg=self.queue) + + tests = [ + Test("plain",[],range(10)), + Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)), + Test("priority",["'qpid.priorities':10"], range(10)), + Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)), + Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9]) + ] + + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + c = primary.connect() + for t in tests: t.send(c) # Send messages, leave one unacknowledged. + + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + # Wait for backups to catch up. + for t in tests: + t.wait(self, backup1) + t.wait(self, backup2) + # Verify acquired message was replicated + for t in tests: t.verify(self, backup1) + for t in tests: t.verify(self, backup2) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |