diff options
author | Alan Conway <aconway@apache.org> | 2012-04-17 15:16:41 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-04-17 15:16:41 +0000 |
commit | 07aad7e3cac4568f2226b6e2bec00e9668de9cb4 (patch) | |
tree | c5e993c21a61c77bb0c58e2f6eee669613a2ded2 | |
parent | 95e371124817b222fc53df491469df0e8503d24c (diff) | |
download | qpid-python-07aad7e3cac4568f2226b6e2bec00e9668de9cb4.tar.gz |
NO-JIRA: Minor code clean-up in brokertest.py and ha_tests.py.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327137 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 59 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 135 |
2 files changed, 97 insertions, 97 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index ccf25f35b5..1c73e20cbb 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -436,6 +436,35 @@ class Cluster: def __getitem__(self,index): return self._brokers[index] def __iter__(self): return self._brokers.__iter__() + +def browse(session, queue, timeout=0, transform=lambda m: m.content): + """Return a list with the contents of each message on queue.""" + r = session.receiver("%s;{mode:browse}"%(queue)) + r.capacity = 100 + try: + contents = [] + try: + while True: contents.append(transform(r.fetch(timeout=timeout))) + except messaging.Empty: pass + finally: r.close() + return contents + +def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): + """Assert that the contents of messages on queue (as retrieved + using session and timeout) exactly match the strings in + expect_contents""" + actual_contents = browse(session, queue, timeout, transform=transform) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + assert expect_contents == actual_contents, "%s: %s != %s"%(msg, expect, actual) + +def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"): + """Wait up to timeout for contents of queue to match expect_contents""" + test = lambda: browse(session, queue, 0, transform=transform) == expect_contents + retry(test, timeout, delay) + actual_contents = browse(session, queue, 0, transform=transform) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + assert expect_contents == actual_contents, "%s: %s != %s"%(msg, expect, actual) + class BrokerTest(TestCase): """ Tracks processes started by test and kills at end of test. @@ -501,33 +530,9 @@ class BrokerTest(TestCase): cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) return cluster - def browse(self, session, queue, timeout=0, transform=lambda m: m.content): - """Return a list with the contents of each message on queue.""" - r = session.receiver("%s;{mode:browse}"%(queue)) - r.capacity = 100 - try: - contents = [] - try: - while True: contents.append(transform(r.fetch(timeout=timeout))) - except messaging.Empty: pass - finally: r.close() - return contents - - def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None): - """Assert that the contents of messages on queue (as retrieved - using session and timeout) exactly match the strings in - expect_contents""" - actual_contents = self.browse(session, queue, timeout, transform=transform) - if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) - self.assertEqual(expect_contents, actual_contents, msg) - - def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None): - """Wait up to timeout for contents of queue to match expect_contents""" - test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents - retry(test, timeout, delay) - actual_contents = self.browse(session, queue, 0, transform=transform) - if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) - self.assertEqual(expect_contents, actual_contents, msg) + def browse(self, *args, **kwargs): browse(*args, **kwargs) + def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) + def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) def join(thread, timeout=10): thread.join(timeout) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e251e4d8c8..0c8ac569b8 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -72,6 +72,19 @@ class HaBroker(Broker): def connect_admin(self, **kwargs): return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + def wait_backup(self, address): + """Wait for address to become valid on a backup broker.""" + bs = self.connect_admin().session() + try: wait_address(bs, address) + finally: bs.connection.close() + + def assert_browse_backup(self, queue, expected, **kwargs): + """Combines wait_backup and assert_browse_retry.""" + bs = self.connect_admin().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() class HaCluster(object): _cluster_count = 0 @@ -106,40 +119,23 @@ class HaCluster(object): def __getitem__(self,index): return self._brokers[index] def __iter__(self): return self._brokers.__iter__() - -class HaTest(BrokerTest): - """Base class for HA test cases, defines convenience functions""" - - # Wait for an address to become valid. - def wait(self, session, address): - def check(): - try: - session.sender(address) - return True - except NotFound: return False - assert retry(check), "Timed out waiting for address %s"%(address) - - # Wait for address to become valid on a backup broker. - def wait_backup(self, backup, address): - bs = backup.connect_admin().session() - self.wait(bs, address) - bs.connection.close() - - # Combines wait_backup and assert_browse_retry - def assert_browse_backup(self, backup, queue, expected, **kwargs): - bs = backup.connect_admin().session() - self.wait(bs, queue) - self.assert_browse_retry(bs, queue, expected, **kwargs) - bs.connection.close() - - def assert_missing(self, session, address): +def wait_address(session, address): + """Wait for an address to become valid.""" + def check(): try: - session.receiver(address) - self.fail("Expected NotFound: %s"%(address)) - except NotFound: pass - - -class ReplicationTests(HaTest): + session.sender(address) + return True + except NotFound: return False + assert retry(check), "Timed out waiting for address %s"%(address) + +def assert_missing(session, address): + """Assert that the address is _not_ valid""" + try: + session.receiver(address) + self.fail("Expected NotFound: %s"%(address)) + except NotFound: pass + +class ReplicationTests(BrokerTest): """Correctness tests for HA replication.""" def test_replication(self): @@ -176,7 +172,7 @@ class ReplicationTests(HaTest): def verify(b, prefix, p): """Verify setup was replicated to backup b""" # Wait for configuration to replicate. - self.wait(b, prefix+"x"); + wait_address(b, prefix+"x"); self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") @@ -184,7 +180,7 @@ class ReplicationTests(HaTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) self.assert_browse_retry(b, prefix+"q2", []) # configuration only - self.assert_missing(b, prefix+"q3") + assert_missing(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration @@ -209,7 +205,7 @@ class ReplicationTests(HaTest): verify(b, "2", p) # Test a series of messages, enqueue all then dequeue all. s = p.sender(queue("foo","all")) - self.wait(b, "foo") + wait_address(b, "foo") msgs = [str(i) for i in range(10)] for m in msgs: s.send(Message(m)) self.assert_browse_retry(p, "foo", msgs) @@ -246,10 +242,10 @@ class ReplicationTests(HaTest): msgs = [str(i) for i in range(30)] b1 = backup1.connect_admin().session() - self.wait(b1, "q"); + wait_address(b1, "q"); self.assert_browse_retry(b1, "q", msgs) b2 = backup2.connect_admin().session() - self.wait(b2, "q"); + wait_address(b2, "q"); self.assert_browse_retry(b2, "q", msgs) def test_send_receive(self): @@ -273,8 +269,8 @@ class ReplicationTests(HaTest): self.assertEqual(receiver.wait(), 0) expect = [long(i) for i in range(991, 1001)] sn = lambda m: m.properties["sn"] - self.assert_browse_backup(brokers[1], "q", expect, transform=sn) - self.assert_browse_backup(brokers[2], "q", expect, transform=sn) + brokers[1].assert_browse_backup("q", expect, transform=sn) + brokers[2].assert_browse_backup("q", expect, transform=sn) def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" @@ -293,7 +289,7 @@ class ReplicationTests(HaTest): c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() sender = s.sender("q;{create:always}") - self.wait_backup(backup, "q") + backup.wait_backup("q") sender.send("foo") primary.kill() assert retry(lambda: not is_running(primary.pid)) @@ -308,13 +304,13 @@ class ReplicationTests(HaTest): backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) primary.connect().session().sender("q;{create:always}") - self.wait_backup(backup, "q") + backup.wait_backup("q") sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False) receiver.start() sender.start() - self.wait_backup(backup, "q") + backup.wait_backup("q") assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru primary.kill() @@ -329,22 +325,22 @@ class ReplicationTests(HaTest): """Verify that a backup broker fails over and recovers queue state""" brokers = HaCluster(self, 3) brokers[0].connect().session().sender("q;{create:always}").send("a") - for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) + for b in brokers[1:]: b.assert_browse_backup("q", ["a"]) brokers[0].expect = EXPECT_EXIT_FAIL brokers.kill(0) brokers[1].connect().session().sender("q").send("b") - self.assert_browse_backup(brokers[2], "q", ["a","b"]) + brokers[2].assert_browse_backup("q", ["a","b"]) s = brokers[1].connect().session() self.assertEqual("a", s.receiver("q").fetch().content) s.acknowledge() - self.assert_browse_backup(brokers[2], "q", ["b"]) + brokers[2].assert_browse_backup("q", ["b"]) def test_qpid_config_replication(self): """Set up replication via qpid-config""" brokers = HaCluster(self,2) brokers[0].config_declare("q","all") brokers[0].connect().session().sender("q").send("foo") - self.assert_browse_backup(brokers[1], "q", ["foo"]) + brokers[1].assert_browse_backup("q", ["foo"]) def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" @@ -358,18 +354,18 @@ class ReplicationTests(HaTest): # Set up replication with qpid-ha backup.replicate(primary.host_port(), "q") ps.send("a") - self.assert_browse_backup(backup, "q", ["a"]) + backup.assert_browse_backup("q", ["a"]) ps.send("b") - self.assert_browse_backup(backup, "q", ["a", "b"]) + backup.assert_browse_backup("q", ["a", "b"]) self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() - self.assert_browse_backup(backup, "q", ["b"]) + backup.assert_browse_backup("q", ["b"]) # Set up replication with qpid-config ps2 = pc.session().sender("q2;{create:always}") backup.config_replicate(primary.host_port(), "q2"); ps2.send("x") - self.assert_browse_backup(backup, "q2", ["x"]) + backup.assert_browse_backup("q2", ["x"]) def test_queue_replica_failover(self): @@ -383,15 +379,15 @@ class ReplicationTests(HaTest): br = backup.connect().session().receiver("q;{create:always}") backup.replicate(cluster.url, "q") ps.send("a") - self.assert_browse_backup(backup, "q", ["a"]) + backup.assert_browse_backup("q", ["a"]) cluster.bounce(0) - self.assert_browse_backup(backup, "q", ["a"]) + backup.assert_browse_backup("q", ["a"]) ps.send("b") - self.assert_browse_backup(backup, "q", ["a", "b"]) + backup.assert_browse_backup("q", ["a", "b"]) cluster.bounce(1) self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() - self.assert_browse_backup(backup, "q", ["b"]) + backup.assert_browse_backup("q", ["b"]) def test_lvq(self): """Verify that we replicate to an LVQ correctly""" @@ -402,13 +398,13 @@ class ReplicationTests(HaTest): def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: send(*kv) - self.assert_browse_backup(backup, "lvq", ["b-1", "a-3", "c-2"]) + backup.assert_browse_backup("lvq", ["b-1", "a-3", "c-2"]) send("b","b-2") - self.assert_browse_backup(backup, "lvq", ["a-3", "c-2", "b-2"]) + backup.assert_browse_backup("lvq", ["a-3", "c-2", "b-2"]) send("c","c-3") - self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3"]) + backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3"]) send("d","d-1") - self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"]) + backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"]) def test_ring(self): """Test replication with the ring queue policy""" @@ -417,7 +413,7 @@ class ReplicationTests(HaTest): backup = HaBroker(self, name="backup", broker_url=primary.host_port()) s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}") for i in range(10): s.send(Message(str(i))) - self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)]) + backup.assert_browse_backup("q", [str(i) for i in range(5,10)]) def test_reject(self): """Test replication with the reject queue policy""" @@ -428,7 +424,7 @@ class ReplicationTests(HaTest): try: for i in range(10): s.send(Message(str(i)), sync=False) except qpid.messaging.exceptions.TargetCapacityExceeded: pass - self.assert_browse_backup(backup, "q", [str(i) for i in range(0,5)]) + backup.assert_browse_backup("q", [str(i) for i in range(0,5)]) def test_priority(self): """Verify priority queues replicate correctly""" @@ -440,7 +436,7 @@ class ReplicationTests(HaTest): priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) # Can't use browse_backup as browser sees messages in delivery order not priority. - self.wait_backup(backup, "priority-queue") + backup.wait_backup("priority-queue") r = backup.connect_admin().session().receiver("priority-queue") received = [r.fetch().priority for i in priorities] self.assertEqual(sorted(priorities, reverse=True), received) @@ -458,7 +454,7 @@ class ReplicationTests(HaTest): s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy)) messages = [Message(content=str(uuid4()), priority = p) for p in priorities] for m in messages: s.send(m) - self.wait_backup(backup, s.target) + backup.wait_backup(s.target) r = backup.connect_admin().session().receiver("priority-queue") received = [r.fetch().content for i in priorities] sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True) @@ -478,8 +474,8 @@ class ReplicationTests(HaTest): # correct result, the uncommented one is for the actualy buggy # result. See https://issues.apache.org/jira/browse/QPID-3866 # - # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) - self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) + # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) + backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority) def test_backup_acquired(self): """Verify that acquired messages are backed up, for all queue types.""" @@ -497,11 +493,10 @@ class ReplicationTests(HaTest): s.receiver(self.address).fetch() def wait(self, brokertest, backup): - brokertest.wait_backup(backup, self.queue) + backup.wait_backup(self.queue) def verify(self, brokertest, backup): - brokertest.assert_browse_backup( - backup, self.queue, self.expect, msg=self.queue) + backup.assert_browse_backup(self.queue, self.expect, msg=self.queue) tests = [ Test("plain",[],range(10)), @@ -546,7 +541,7 @@ class ReplicationTests(HaTest): """Verify that a queue with an invalid qpid.replicate gets default treatment""" cluster = HaCluster(self, 2, ha_replicate="all") c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") - self.wait_backup(cluster[1], "q") + cluster[1].wait_backup("q") def test_exclusive_queue(self): """Ensure that we can back-up exclusive queues, i.e. the replicating @@ -559,7 +554,7 @@ class ReplicationTests(HaTest): try: c.session().receiver(addr); self.fail("Expected exclusive exception") except ReceiverError: pass s = c.session().sender(q).send(q) - self.assert_browse_backup(cluster[1], q, [q]) + cluster[1].assert_browse_backup(q, [q]) test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") |