summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py135
1 files changed, 65 insertions, 70 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e251e4d8c8..0c8ac569b8 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -72,6 +72,19 @@ class HaBroker(Broker):
def connect_admin(self, **kwargs):
return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+ def wait_backup(self, address):
+ """Wait for address to become valid on a backup broker."""
+ bs = self.connect_admin().session()
+ try: wait_address(bs, address)
+ finally: bs.connection.close()
+
+ def assert_browse_backup(self, queue, expected, **kwargs):
+ """Combines wait_backup and assert_browse_retry."""
+ bs = self.connect_admin().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
class HaCluster(object):
_cluster_count = 0
@@ -106,40 +119,23 @@ class HaCluster(object):
def __getitem__(self,index): return self._brokers[index]
def __iter__(self): return self._brokers.__iter__()
-
-class HaTest(BrokerTest):
- """Base class for HA test cases, defines convenience functions"""
-
- # Wait for an address to become valid.
- def wait(self, session, address):
- def check():
- try:
- session.sender(address)
- return True
- except NotFound: return False
- assert retry(check), "Timed out waiting for address %s"%(address)
-
- # Wait for address to become valid on a backup broker.
- def wait_backup(self, backup, address):
- bs = backup.connect_admin().session()
- self.wait(bs, address)
- bs.connection.close()
-
- # Combines wait_backup and assert_browse_retry
- def assert_browse_backup(self, backup, queue, expected, **kwargs):
- bs = backup.connect_admin().session()
- self.wait(bs, queue)
- self.assert_browse_retry(bs, queue, expected, **kwargs)
- bs.connection.close()
-
- def assert_missing(self, session, address):
+def wait_address(session, address):
+ """Wait for an address to become valid."""
+ def check():
try:
- session.receiver(address)
- self.fail("Expected NotFound: %s"%(address))
- except NotFound: pass
-
-
-class ReplicationTests(HaTest):
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for address %s"%(address)
+
+def assert_missing(session, address):
+ """Assert that the address is _not_ valid"""
+ try:
+ session.receiver(address)
+ self.fail("Expected NotFound: %s"%(address))
+ except NotFound: pass
+
+class ReplicationTests(BrokerTest):
"""Correctness tests for HA replication."""
def test_replication(self):
@@ -176,7 +172,7 @@ class ReplicationTests(HaTest):
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
# Wait for configuration to replicate.
- self.wait(b, prefix+"x");
+ wait_address(b, prefix+"x");
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -184,7 +180,7 @@ class ReplicationTests(HaTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- self.assert_missing(b, prefix+"q3")
+ assert_missing(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -209,7 +205,7 @@ class ReplicationTests(HaTest):
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
s = p.sender(queue("foo","all"))
- self.wait(b, "foo")
+ wait_address(b, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
self.assert_browse_retry(p, "foo", msgs)
@@ -246,10 +242,10 @@ class ReplicationTests(HaTest):
msgs = [str(i) for i in range(30)]
b1 = backup1.connect_admin().session()
- self.wait(b1, "q");
+ wait_address(b1, "q");
self.assert_browse_retry(b1, "q", msgs)
b2 = backup2.connect_admin().session()
- self.wait(b2, "q");
+ wait_address(b2, "q");
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
@@ -273,8 +269,8 @@ class ReplicationTests(HaTest):
self.assertEqual(receiver.wait(), 0)
expect = [long(i) for i in range(991, 1001)]
sn = lambda m: m.properties["sn"]
- self.assert_browse_backup(brokers[1], "q", expect, transform=sn)
- self.assert_browse_backup(brokers[2], "q", expect, transform=sn)
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
@@ -293,7 +289,7 @@ class ReplicationTests(HaTest):
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
sender = s.sender("q;{create:always}")
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
sender.send("foo")
primary.kill()
assert retry(lambda: not is_running(primary.pid))
@@ -308,13 +304,13 @@ class ReplicationTests(HaTest):
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
primary.connect().session().sender("q;{create:always}")
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
receiver.start()
sender.start()
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
primary.kill()
@@ -329,22 +325,22 @@ class ReplicationTests(HaTest):
"""Verify that a backup broker fails over and recovers queue state"""
brokers = HaCluster(self, 3)
brokers[0].connect().session().sender("q;{create:always}").send("a")
- for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
+ for b in brokers[1:]: b.assert_browse_backup("q", ["a"])
brokers[0].expect = EXPECT_EXIT_FAIL
brokers.kill(0)
brokers[1].connect().session().sender("q").send("b")
- self.assert_browse_backup(brokers[2], "q", ["a","b"])
+ brokers[2].assert_browse_backup("q", ["a","b"])
s = brokers[1].connect().session()
self.assertEqual("a", s.receiver("q").fetch().content)
s.acknowledge()
- self.assert_browse_backup(brokers[2], "q", ["b"])
+ brokers[2].assert_browse_backup("q", ["b"])
def test_qpid_config_replication(self):
"""Set up replication via qpid-config"""
brokers = HaCluster(self,2)
brokers[0].config_declare("q","all")
brokers[0].connect().session().sender("q").send("foo")
- self.assert_browse_backup(brokers[1], "q", ["foo"])
+ brokers[1].assert_browse_backup("q", ["foo"])
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
@@ -358,18 +354,18 @@ class ReplicationTests(HaTest):
# Set up replication with qpid-ha
backup.replicate(primary.host_port(), "q")
ps.send("a")
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
ps.send("b")
- self.assert_browse_backup(backup, "q", ["a", "b"])
+ backup.assert_browse_backup("q", ["a", "b"])
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
- self.assert_browse_backup(backup, "q", ["b"])
+ backup.assert_browse_backup("q", ["b"])
# Set up replication with qpid-config
ps2 = pc.session().sender("q2;{create:always}")
backup.config_replicate(primary.host_port(), "q2");
ps2.send("x")
- self.assert_browse_backup(backup, "q2", ["x"])
+ backup.assert_browse_backup("q2", ["x"])
def test_queue_replica_failover(self):
@@ -383,15 +379,15 @@ class ReplicationTests(HaTest):
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
cluster.bounce(0)
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
ps.send("b")
- self.assert_browse_backup(backup, "q", ["a", "b"])
+ backup.assert_browse_backup("q", ["a", "b"])
cluster.bounce(1)
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
- self.assert_browse_backup(backup, "q", ["b"])
+ backup.assert_browse_backup("q", ["b"])
def test_lvq(self):
"""Verify that we replicate to an LVQ correctly"""
@@ -402,13 +398,13 @@ class ReplicationTests(HaTest):
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
send(*kv)
- self.assert_browse_backup(backup, "lvq", ["b-1", "a-3", "c-2"])
+ backup.assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
send("b","b-2")
- self.assert_browse_backup(backup, "lvq", ["a-3", "c-2", "b-2"])
+ backup.assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
send("c","c-3")
- self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3"])
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
send("d","d-1")
- self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
def test_ring(self):
"""Test replication with the ring queue policy"""
@@ -417,7 +413,7 @@ class ReplicationTests(HaTest):
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
for i in range(10): s.send(Message(str(i)))
- self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
+ backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
def test_reject(self):
"""Test replication with the reject queue policy"""
@@ -428,7 +424,7 @@ class ReplicationTests(HaTest):
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
- self.assert_browse_backup(backup, "q", [str(i) for i in range(0,5)])
+ backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
def test_priority(self):
"""Verify priority queues replicate correctly"""
@@ -440,7 +436,7 @@ class ReplicationTests(HaTest):
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
- self.wait_backup(backup, "priority-queue")
+ backup.wait_backup("priority-queue")
r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
self.assertEqual(sorted(priorities, reverse=True), received)
@@ -458,7 +454,7 @@ class ReplicationTests(HaTest):
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
for m in messages: s.send(m)
- self.wait_backup(backup, s.target)
+ backup.wait_backup(s.target)
r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().content for i in priorities]
sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
@@ -478,8 +474,8 @@ class ReplicationTests(HaTest):
# correct result, the uncommented one is for the actualy buggy
# result. See https://issues.apache.org/jira/browse/QPID-3866
#
- # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
- self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
def test_backup_acquired(self):
"""Verify that acquired messages are backed up, for all queue types."""
@@ -497,11 +493,10 @@ class ReplicationTests(HaTest):
s.receiver(self.address).fetch()
def wait(self, brokertest, backup):
- brokertest.wait_backup(backup, self.queue)
+ backup.wait_backup(self.queue)
def verify(self, brokertest, backup):
- brokertest.assert_browse_backup(
- backup, self.queue, self.expect, msg=self.queue)
+ backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
tests = [
Test("plain",[],range(10)),
@@ -546,7 +541,7 @@ class ReplicationTests(HaTest):
"""Verify that a queue with an invalid qpid.replicate gets default treatment"""
cluster = HaCluster(self, 2, ha_replicate="all")
c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
- self.wait_backup(cluster[1], "q")
+ cluster[1].wait_backup("q")
def test_exclusive_queue(self):
"""Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -559,7 +554,7 @@ class ReplicationTests(HaTest):
try: c.session().receiver(addr); self.fail("Expected exclusive exception")
except ReceiverError: pass
s = c.session().sender(q).send(q)
- self.assert_browse_backup(cluster[1], q, [q])
+ cluster[1].assert_browse_backup(q, [q])
test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")