summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-17 15:16:41 +0000
committerAlan Conway <aconway@apache.org>2012-04-17 15:16:41 +0000
commit07aad7e3cac4568f2226b6e2bec00e9668de9cb4 (patch)
treec5e993c21a61c77bb0c58e2f6eee669613a2ded2
parent95e371124817b222fc53df491469df0e8503d24c (diff)
downloadqpid-python-07aad7e3cac4568f2226b6e2bec00e9668de9cb4.tar.gz
NO-JIRA: Minor code clean-up in brokertest.py and ha_tests.py.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327137 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/brokertest.py59
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py135
2 files changed, 97 insertions, 97 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index ccf25f35b5..1c73e20cbb 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -436,6 +436,35 @@ class Cluster:
def __getitem__(self,index): return self._brokers[index]
def __iter__(self): return self._brokers.__iter__()
+
+def browse(session, queue, timeout=0, transform=lambda m: m.content):
+ """Return a list with the contents of each message on queue."""
+ r = session.receiver("%s;{mode:browse}"%(queue))
+ r.capacity = 100
+ try:
+ contents = []
+ try:
+ while True: contents.append(transform(r.fetch(timeout=timeout)))
+ except messaging.Empty: pass
+ finally: r.close()
+ return contents
+
+def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+ actual_contents = browse(session, queue, timeout, transform=transform)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ assert expect_contents == actual_contents, "%s: %s != %s"%(msg, expect, actual)
+
+def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"):
+ """Wait up to timeout for contents of queue to match expect_contents"""
+ test = lambda: browse(session, queue, 0, transform=transform) == expect_contents
+ retry(test, timeout, delay)
+ actual_contents = browse(session, queue, 0, transform=transform)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ assert expect_contents == actual_contents, "%s: %s != %s"%(msg, expect, actual)
+
class BrokerTest(TestCase):
"""
Tracks processes started by test and kills at end of test.
@@ -501,33 +530,9 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
return cluster
- def browse(self, session, queue, timeout=0, transform=lambda m: m.content):
- """Return a list with the contents of each message on queue."""
- r = session.receiver("%s;{mode:browse}"%(queue))
- r.capacity = 100
- try:
- contents = []
- try:
- while True: contents.append(transform(r.fetch(timeout=timeout)))
- except messaging.Empty: pass
- finally: r.close()
- return contents
-
- def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
- """Assert that the contents of messages on queue (as retrieved
- using session and timeout) exactly match the strings in
- expect_contents"""
- actual_contents = self.browse(session, queue, timeout, transform=transform)
- if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
- self.assertEqual(expect_contents, actual_contents, msg)
-
- def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None):
- """Wait up to timeout for contents of queue to match expect_contents"""
- test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
- retry(test, timeout, delay)
- actual_contents = self.browse(session, queue, 0, transform=transform)
- if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
- self.assertEqual(expect_contents, actual_contents, msg)
+ def browse(self, *args, **kwargs): browse(*args, **kwargs)
+ def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
+ def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
def join(thread, timeout=10):
thread.join(timeout)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e251e4d8c8..0c8ac569b8 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -72,6 +72,19 @@ class HaBroker(Broker):
def connect_admin(self, **kwargs):
return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+ def wait_backup(self, address):
+ """Wait for address to become valid on a backup broker."""
+ bs = self.connect_admin().session()
+ try: wait_address(bs, address)
+ finally: bs.connection.close()
+
+ def assert_browse_backup(self, queue, expected, **kwargs):
+ """Combines wait_backup and assert_browse_retry."""
+ bs = self.connect_admin().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
class HaCluster(object):
_cluster_count = 0
@@ -106,40 +119,23 @@ class HaCluster(object):
def __getitem__(self,index): return self._brokers[index]
def __iter__(self): return self._brokers.__iter__()
-
-class HaTest(BrokerTest):
- """Base class for HA test cases, defines convenience functions"""
-
- # Wait for an address to become valid.
- def wait(self, session, address):
- def check():
- try:
- session.sender(address)
- return True
- except NotFound: return False
- assert retry(check), "Timed out waiting for address %s"%(address)
-
- # Wait for address to become valid on a backup broker.
- def wait_backup(self, backup, address):
- bs = backup.connect_admin().session()
- self.wait(bs, address)
- bs.connection.close()
-
- # Combines wait_backup and assert_browse_retry
- def assert_browse_backup(self, backup, queue, expected, **kwargs):
- bs = backup.connect_admin().session()
- self.wait(bs, queue)
- self.assert_browse_retry(bs, queue, expected, **kwargs)
- bs.connection.close()
-
- def assert_missing(self, session, address):
+def wait_address(session, address):
+ """Wait for an address to become valid."""
+ def check():
try:
- session.receiver(address)
- self.fail("Expected NotFound: %s"%(address))
- except NotFound: pass
-
-
-class ReplicationTests(HaTest):
+ session.sender(address)
+ return True
+ except NotFound: return False
+ assert retry(check), "Timed out waiting for address %s"%(address)
+
+def assert_missing(session, address):
+ """Assert that the address is _not_ valid"""
+ try:
+ session.receiver(address)
+ self.fail("Expected NotFound: %s"%(address))
+ except NotFound: pass
+
+class ReplicationTests(BrokerTest):
"""Correctness tests for HA replication."""
def test_replication(self):
@@ -176,7 +172,7 @@ class ReplicationTests(HaTest):
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
# Wait for configuration to replicate.
- self.wait(b, prefix+"x");
+ wait_address(b, prefix+"x");
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -184,7 +180,7 @@ class ReplicationTests(HaTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- self.assert_missing(b, prefix+"q3")
+ assert_missing(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -209,7 +205,7 @@ class ReplicationTests(HaTest):
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
s = p.sender(queue("foo","all"))
- self.wait(b, "foo")
+ wait_address(b, "foo")
msgs = [str(i) for i in range(10)]
for m in msgs: s.send(Message(m))
self.assert_browse_retry(p, "foo", msgs)
@@ -246,10 +242,10 @@ class ReplicationTests(HaTest):
msgs = [str(i) for i in range(30)]
b1 = backup1.connect_admin().session()
- self.wait(b1, "q");
+ wait_address(b1, "q");
self.assert_browse_retry(b1, "q", msgs)
b2 = backup2.connect_admin().session()
- self.wait(b2, "q");
+ wait_address(b2, "q");
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
@@ -273,8 +269,8 @@ class ReplicationTests(HaTest):
self.assertEqual(receiver.wait(), 0)
expect = [long(i) for i in range(991, 1001)]
sn = lambda m: m.properties["sn"]
- self.assert_browse_backup(brokers[1], "q", expect, transform=sn)
- self.assert_browse_backup(brokers[2], "q", expect, transform=sn)
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
@@ -293,7 +289,7 @@ class ReplicationTests(HaTest):
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
sender = s.sender("q;{create:always}")
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
sender.send("foo")
primary.kill()
assert retry(lambda: not is_running(primary.pid))
@@ -308,13 +304,13 @@ class ReplicationTests(HaTest):
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
primary.connect().session().sender("q;{create:always}")
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
receiver.start()
sender.start()
- self.wait_backup(backup, "q")
+ backup.wait_backup("q")
assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
primary.kill()
@@ -329,22 +325,22 @@ class ReplicationTests(HaTest):
"""Verify that a backup broker fails over and recovers queue state"""
brokers = HaCluster(self, 3)
brokers[0].connect().session().sender("q;{create:always}").send("a")
- for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
+ for b in brokers[1:]: b.assert_browse_backup("q", ["a"])
brokers[0].expect = EXPECT_EXIT_FAIL
brokers.kill(0)
brokers[1].connect().session().sender("q").send("b")
- self.assert_browse_backup(brokers[2], "q", ["a","b"])
+ brokers[2].assert_browse_backup("q", ["a","b"])
s = brokers[1].connect().session()
self.assertEqual("a", s.receiver("q").fetch().content)
s.acknowledge()
- self.assert_browse_backup(brokers[2], "q", ["b"])
+ brokers[2].assert_browse_backup("q", ["b"])
def test_qpid_config_replication(self):
"""Set up replication via qpid-config"""
brokers = HaCluster(self,2)
brokers[0].config_declare("q","all")
brokers[0].connect().session().sender("q").send("foo")
- self.assert_browse_backup(brokers[1], "q", ["foo"])
+ brokers[1].assert_browse_backup("q", ["foo"])
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
@@ -358,18 +354,18 @@ class ReplicationTests(HaTest):
# Set up replication with qpid-ha
backup.replicate(primary.host_port(), "q")
ps.send("a")
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
ps.send("b")
- self.assert_browse_backup(backup, "q", ["a", "b"])
+ backup.assert_browse_backup("q", ["a", "b"])
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
- self.assert_browse_backup(backup, "q", ["b"])
+ backup.assert_browse_backup("q", ["b"])
# Set up replication with qpid-config
ps2 = pc.session().sender("q2;{create:always}")
backup.config_replicate(primary.host_port(), "q2");
ps2.send("x")
- self.assert_browse_backup(backup, "q2", ["x"])
+ backup.assert_browse_backup("q2", ["x"])
def test_queue_replica_failover(self):
@@ -383,15 +379,15 @@ class ReplicationTests(HaTest):
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
cluster.bounce(0)
- self.assert_browse_backup(backup, "q", ["a"])
+ backup.assert_browse_backup("q", ["a"])
ps.send("b")
- self.assert_browse_backup(backup, "q", ["a", "b"])
+ backup.assert_browse_backup("q", ["a", "b"])
cluster.bounce(1)
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
- self.assert_browse_backup(backup, "q", ["b"])
+ backup.assert_browse_backup("q", ["b"])
def test_lvq(self):
"""Verify that we replicate to an LVQ correctly"""
@@ -402,13 +398,13 @@ class ReplicationTests(HaTest):
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
send(*kv)
- self.assert_browse_backup(backup, "lvq", ["b-1", "a-3", "c-2"])
+ backup.assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
send("b","b-2")
- self.assert_browse_backup(backup, "lvq", ["a-3", "c-2", "b-2"])
+ backup.assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
send("c","c-3")
- self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3"])
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
send("d","d-1")
- self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
+ backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
def test_ring(self):
"""Test replication with the ring queue policy"""
@@ -417,7 +413,7 @@ class ReplicationTests(HaTest):
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
for i in range(10): s.send(Message(str(i)))
- self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
+ backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
def test_reject(self):
"""Test replication with the reject queue policy"""
@@ -428,7 +424,7 @@ class ReplicationTests(HaTest):
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
- self.assert_browse_backup(backup, "q", [str(i) for i in range(0,5)])
+ backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
def test_priority(self):
"""Verify priority queues replicate correctly"""
@@ -440,7 +436,7 @@ class ReplicationTests(HaTest):
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
- self.wait_backup(backup, "priority-queue")
+ backup.wait_backup("priority-queue")
r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
self.assertEqual(sorted(priorities, reverse=True), received)
@@ -458,7 +454,7 @@ class ReplicationTests(HaTest):
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
for m in messages: s.send(m)
- self.wait_backup(backup, s.target)
+ backup.wait_backup(s.target)
r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().content for i in priorities]
sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
@@ -478,8 +474,8 @@ class ReplicationTests(HaTest):
# correct result, the uncommented one is for the actualy buggy
# result. See https://issues.apache.org/jira/browse/QPID-3866
#
- # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
- self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
def test_backup_acquired(self):
"""Verify that acquired messages are backed up, for all queue types."""
@@ -497,11 +493,10 @@ class ReplicationTests(HaTest):
s.receiver(self.address).fetch()
def wait(self, brokertest, backup):
- brokertest.wait_backup(backup, self.queue)
+ backup.wait_backup(self.queue)
def verify(self, brokertest, backup):
- brokertest.assert_browse_backup(
- backup, self.queue, self.expect, msg=self.queue)
+ backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
tests = [
Test("plain",[],range(10)),
@@ -546,7 +541,7 @@ class ReplicationTests(HaTest):
"""Verify that a queue with an invalid qpid.replicate gets default treatment"""
cluster = HaCluster(self, 2, ha_replicate="all")
c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
- self.wait_backup(cluster[1], "q")
+ cluster[1].wait_backup("q")
def test_exclusive_queue(self):
"""Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -559,7 +554,7 @@ class ReplicationTests(HaTest):
try: c.session().receiver(addr); self.fail("Expected exclusive exception")
except ReceiverError: pass
s = c.session().sender(q).send(q)
- self.assert_browse_backup(cluster[1], q, [q])
+ cluster[1].assert_browse_backup(q, [q])
test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")