diff options
author | Alan Conway <aconway@apache.org> | 2012-09-27 21:51:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-09-27 21:51:30 +0000 |
commit | 203d6d4c76fbb7f52c507318ef0e92b8d4dba0bf (patch) | |
tree | e210086d914489db14b8e06bb5b12d32731adf85 /cpp/src/tests | |
parent | b59d102288a4d59d42eddd1361a3106cad264dd6 (diff) | |
download | qpid-python-203d6d4c76fbb7f52c507318ef0e92b8d4dba0bf.tar.gz |
NO-JIRA: Fix logging in ha_tests.py
In order to suppress unwanted warnings from certain test, the ha_test framework
was actually turning off all python logging.
This patch selectively turns off wanrnings in specific code regions and then
restores the configured logging level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1391232 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rwxr-xr-x | cpp/src/tests/ha_test.py | 13 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 358 |
2 files changed, 200 insertions, 171 deletions
diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py index 18a969a07b..2f9d9a1211 100755 --- a/cpp/src/tests/ha_test.py +++ b/cpp/src/tests/ha_test.py @@ -30,6 +30,18 @@ from uuid import UUID log = getLogger(__name__) +class LogLevel: + """ + Temporarily change the log settings on the root logger. + Used to suppress expected WARN messages from the python client. + """ + def __init__(self, level): + self.save_level = getLogger().getEffectiveLevel() + getLogger().setLevel(level) + + def restore(self): + getLogger().setLevel(self.save_level) + class QmfAgent(object): """Access to a QMF broker agent.""" def __init__(self, address, **kwargs): @@ -73,7 +85,6 @@ class HaBroker(Broker): assert os.path.exists(self.qpid_ha_path) self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") assert os.path.exists(self.qpid_config_path) - getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. self.qpid_ha_script=import_script(self.qpid_ha_path) self._agent = None self.client_credentials = client_credentials diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index de87c49d21..3c43c6a914 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -88,42 +88,45 @@ class ReplicationTests(BrokerTest): b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind. self.assert_browse_retry(b, prefix+"q4", ["6","7"]) - primary = HaBroker(self, name="primary") - primary.promote() - p = primary.connect().session() - - # Create config, send messages before starting the backup, to test catch-up replication. - setup(p, "1", primary) - backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) - # Create config, send messages after starting the backup, to test steady-state replication. - setup(p, "2", primary) - - # Verify the data on the backup - b = backup.connect_admin().session() - verify(b, "1", p) - verify(b, "2", p) - # Test a series of messages, enqueue all then dequeue all. - s = p.sender(queue("foo","all")) - wait_address(b, "foo") - msgs = [str(i) for i in range(10)] - for m in msgs: s.send(Message(m)) - self.assert_browse_retry(p, "foo", msgs) - self.assert_browse_retry(b, "foo", msgs) - r = p.receiver("foo") - for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content) - p.acknowledge() - self.assert_browse_retry(p, "foo", []) - self.assert_browse_retry(b, "foo", []) - - # Another series, this time verify each dequeue individually. - for m in msgs: s.send(Message(m)) - self.assert_browse_retry(p, "foo", msgs) - self.assert_browse_retry(b, "foo", msgs) - for i in range(len(msgs)): - self.assertEqual(msgs[i], r.fetch(timeout=0).content) + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + primary = HaBroker(self, name="primary") + primary.promote() + p = primary.connect().session() + + # Create config, send messages before starting the backup, to test catch-up replication. + setup(p, "1", primary) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) + # Create config, send messages after starting the backup, to test steady-state replication. + setup(p, "2", primary) + + # Verify the data on the backup + b = backup.connect_admin().session() + verify(b, "1", p) + verify(b, "2", p) + # Test a series of messages, enqueue all then dequeue all. + s = p.sender(queue("foo","all")) + wait_address(b, "foo") + msgs = [str(i) for i in range(10)] + for m in msgs: s.send(Message(m)) + self.assert_browse_retry(p, "foo", msgs) + self.assert_browse_retry(b, "foo", msgs) + r = p.receiver("foo") + for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content) p.acknowledge() - self.assert_browse_retry(p, "foo", msgs[i+1:]) - self.assert_browse_retry(b, "foo", msgs[i+1:]) + self.assert_browse_retry(p, "foo", []) + self.assert_browse_retry(b, "foo", []) + + # Another series, this time verify each dequeue individually. + for m in msgs: s.send(Message(m)) + self.assert_browse_retry(p, "foo", msgs) + self.assert_browse_retry(b, "foo", msgs) + for i in range(len(msgs)): + self.assertEqual(msgs[i], r.fetch(timeout=0).content) + p.acknowledge() + self.assert_browse_retry(p, "foo", msgs[i+1:]) + self.assert_browse_retry(b, "foo", msgs[i+1:]) + finally: l.restore() def test_sync(self): primary = HaBroker(self, name="primary") @@ -149,53 +152,59 @@ class ReplicationTests(BrokerTest): def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" - brokers = HaCluster(self, 3) - sender = self.popen( - ["qpid-send", - "--broker", brokers[0].host_port(), - "--address", "q;{create:always}", - "--messages=1000", - "--content-string=x" - ]) - receiver = self.popen( - ["qpid-receive", - "--broker", brokers[0].host_port(), - "--address", "q;{create:always}", - "--messages=990", - "--timeout=10" - ]) - self.assertEqual(sender.wait(), 0) - self.assertEqual(receiver.wait(), 0) - expect = [long(i) for i in range(991, 1001)] - sn = lambda m: m.properties["sn"] - brokers[1].assert_browse_backup("q", expect, transform=sn) - brokers[2].assert_browse_backup("q", expect, transform=sn) + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + brokers = HaCluster(self, 3) + sender = self.popen( + ["qpid-send", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=1000", + "--content-string=x" + ]) + receiver = self.popen( + ["qpid-receive", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=990", + "--timeout=10" + ]) + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + brokers[1].assert_browse_backup("q", expect, transform=sn) + brokers[2].assert_browse_backup("q", expect, transform=sn) + finally: l.restore() def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" - primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) - primary.promote() - backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) - # Check that backup rejects normal connections + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. try: - backup.connect().session() - self.fail("Expected connection to backup to fail") - except ConnectionError: pass - # Check that admin connections are allowed to backup. - backup.connect_admin().close() - - # Test discovery: should connect to primary after reject by backup - c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) - s = c.session() - sender = s.sender("q;{create:always}") - backup.wait_backup("q") - sender.send("foo") - primary.kill() - assert retry(lambda: not is_running(primary.pid)) - backup.promote() - sender.send("bar") - self.assert_browse_retry(s, "q", ["foo", "bar"]) - c.close() + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) + # Check that backup rejects normal connections + try: + backup.connect().session() + self.fail("Expected connection to backup to fail") + except ConnectionError: pass + # Check that admin connections are allowed to backup. + backup.connect_admin().close() + + # Test discovery: should connect to primary after reject by backup + c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) + s = c.session() + sender = s.sender("q;{create:always}") + backup.wait_backup("q") + sender.send("foo") + primary.kill() + assert retry(lambda: not is_running(primary.pid)) + backup.promote() + sender.send("bar") + self.assert_browse_retry(s, "q", ["foo", "bar"]) + c.close() + finally: l.restore() def test_failover_cpp(self): """Verify that failover works in the C++ client.""" @@ -244,51 +253,57 @@ class ReplicationTests(BrokerTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" - getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - primary = HaBroker(self, name="primary", ha_cluster=False) - pc = primary.connect() - ps = pc.session().sender("q;{create:always}") - pr = pc.session().receiver("q;{create:always}") - backup = HaBroker(self, name="backup", ha_cluster=False) - br = backup.connect().session().receiver("q;{create:always}") - - # Set up replication with qpid-ha - backup.replicate(primary.host_port(), "q") - ps.send("a") - backup.assert_browse_backup("q", ["a"]) - ps.send("b") - backup.assert_browse_backup("q", ["a", "b"]) - self.assertEqual("a", pr.fetch().content) - pr.session.acknowledge() - backup.assert_browse_backup("q", ["b"]) - - # Set up replication with qpid-config - ps2 = pc.session().sender("q2;{create:always}") - backup.config_replicate(primary.host_port(), "q2"); - ps2.send("x") - backup.assert_browse_backup("q2", ["x"]) - + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + primary = HaBroker(self, name="primary", ha_cluster=False) + pc = primary.connect() + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False) + br = backup.connect().session().receiver("q;{create:always}") + + # Set up replication with qpid-ha + backup.replicate(primary.host_port(), "q") + ps.send("a") + backup.assert_browse_backup("q", ["a"]) + ps.send("b") + backup.assert_browse_backup("q", ["a", "b"]) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + backup.assert_browse_backup("q", ["b"]) + + # Set up replication with qpid-config + ps2 = pc.session().sender("q2;{create:always}") + backup.config_replicate(primary.host_port(), "q2"); + ps2.send("x") + backup.assert_browse_backup("q2", ["x"]) + finally: l.restore() def test_queue_replica_failover(self): """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over.""" - cluster = HaCluster(self, 2) - primary = cluster[0] - pc = cluster.connect(0) - ps = pc.session().sender("q;{create:always}") - pr = pc.session().receiver("q;{create:always}") - backup = HaBroker(self, name="backup", ha_cluster=False) - br = backup.connect().session().receiver("q;{create:always}") - backup.replicate(cluster.url, "q") - ps.send("a") - backup.assert_browse_backup("q", ["a"]) - cluster.bounce(0) - backup.assert_browse_backup("q", ["a"]) - ps.send("b") - backup.assert_browse_backup("q", ["a", "b"]) - cluster.bounce(1) - self.assertEqual("a", pr.fetch().content) - pr.session.acknowledge() - backup.assert_browse_backup("q", ["b"]) + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + cluster = HaCluster(self, 2) + primary = cluster[0] + pc = cluster.connect(0) + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False) + br = backup.connect().session().receiver("q;{create:always}") + backup.replicate(cluster.url, "q") + ps.send("a") + backup.assert_browse_backup("q", ["a"]) + cluster.bounce(0) + backup.assert_browse_backup("q", ["a"]) + ps.send("b") + backup.assert_browse_backup("q", ["a", "b"]) + cluster.bounce(1) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + backup.assert_browse_backup("q", ["b"]) + pc.close() + br.close() + finally: l.restore() def test_lvq(self): """Verify that we replicate to an LVQ correctly""" @@ -733,56 +748,59 @@ class RecoveryTests(BrokerTest): """Verify that the broker holds queues without sufficient backup, i.e. does not complete messages sent to those queues.""" - # We don't want backups to time out for this test, set long timeout. - cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]); - # Wait for the primary to be ready - cluster[0].wait_status("active") - # Create a queue before the failure. - s1 = cluster.connect(0).session().sender("q1;{create:always}") - for b in cluster: b.wait_backup("q1") - for i in xrange(100): s1.send(str(i)) - # Kill primary and 2 backups - for i in [0,1,2]: cluster.kill(i, False) - cluster[3].promote() # New primary, backups will be 1 and 2 - cluster[3].wait_status("recovering") - - def assertSyncTimeout(s): - try: - s.sync(timeout=.01) - self.fail("Expected Timeout exception") - except Timeout: pass - - # Create a queue after the failure - s2 = cluster.connect(3).session().sender("q2;{create:always}") - - # Verify that messages sent are not completed - for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) - assertSyncTimeout(s1) - self.assertEqual(s1.unsettled(), 100) - assertSyncTimeout(s2) - self.assertEqual(s2.unsettled(), 100) - - # Verify we can receive even if sending is on hold: - cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) - - # Restart backups, verify queues are released only when both backups are up - cluster.restart(1) - assertSyncTimeout(s1) - self.assertEqual(s1.unsettled(), 100) - assertSyncTimeout(s2) - self.assertEqual(s2.unsettled(), 100) - self.assertEqual(cluster[3].ha_status(), "recovering") - cluster.restart(2) - - # Verify everything is up to date and active - def settled(sender): sender.sync(); return sender.unsettled() == 0; - assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled()) - assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled()) - cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) - cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)]) - cluster[3].wait_status("active"), - s1.session.connection.close() - s2.session.connection.close() + l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. + try: + # We don't want backups to time out for this test, set long timeout. + cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]); + # Wait for the primary to be ready + cluster[0].wait_status("active") + # Create a queue before the failure. + s1 = cluster.connect(0).session().sender("q1;{create:always}") + for b in cluster: b.wait_backup("q1") + for i in xrange(100): s1.send(str(i)) + # Kill primary and 2 backups + for i in [0,1,2]: cluster.kill(i, False) + cluster[3].promote() # New primary, backups will be 1 and 2 + cluster[3].wait_status("recovering") + + def assertSyncTimeout(s): + try: + s.sync(timeout=.01) + self.fail("Expected Timeout exception") + except Timeout: pass + + # Create a queue after the failure + s2 = cluster.connect(3).session().sender("q2;{create:always}") + + # Verify that messages sent are not completed + for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) + assertSyncTimeout(s1) + self.assertEqual(s1.unsettled(), 100) + assertSyncTimeout(s2) + self.assertEqual(s2.unsettled(), 100) + + # Verify we can receive even if sending is on hold: + cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) + + # Restart backups, verify queues are released only when both backups are up + cluster.restart(1) + assertSyncTimeout(s1) + self.assertEqual(s1.unsettled(), 100) + assertSyncTimeout(s2) + self.assertEqual(s2.unsettled(), 100) + self.assertEqual(cluster[3].ha_status(), "recovering") + cluster.restart(2) + + # Verify everything is up to date and active + def settled(sender): sender.sync(); return sender.unsettled() == 0; + assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled()) + assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled()) + cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) + cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)]) + cluster[3].wait_status("active"), + s1.session.connection.close() + s2.session.connection.close() + finally: l.restore() def test_expected_backup_timeout(self): """Verify that we time-out expected backups and release held queues |