diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 53 |
1 files changed, 39 insertions, 14 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f3c45ba7a3..6d5dc19aa6 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil from qpid.messaging import Message, NotFound, ConnectionError, Connection from brokertest import * from threading import Thread, Lock, Condition -from logging import getLogger +from logging import getLogger, WARN, ERROR, DEBUG log = getLogger("qpid.ha-tests") @@ -39,7 +39,8 @@ class ShortTests(BrokerTest): ] + args, **kwargs) - # FIXME aconway 2011-11-15: work around async replication. + # FIXME aconway 2011-11-15: work around async wiring replication. + # Wait for an address to become valid. def wait(self, session, address): def check(): try: @@ -48,6 +49,13 @@ class ShortTests(BrokerTest): except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) + # FIXME aconway 2012-01-23: workaround: we need to give the + # backup a chance to attach to the queue. + def wait_backup(self, backup, address): + bs = self.connect_admin(backup).session() + self.wait(bs, address) + bs.connection.close() + def set_ha_status(self, address, status): os.system("qpid-ha-status %s %s"%(address, status)) @@ -209,7 +217,8 @@ class ShortTests(BrokerTest): raise def test_failover(self): - """Verify that backups rejects connections and that fail-over works""" + """Verify that backups rejects connections and that fail-over works in python client""" + getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary backup = self.ha_broker(name="backup", broker_url=primary.host_port()) # Check that backup rejects normal connections @@ -223,20 +232,36 @@ class ShortTests(BrokerTest): # Test discovery: should connect to primary after reject by backup c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() - s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True) - # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here, - # send(sync=True) shouldn't return till the backup acknowledges. - bs = self.connect_admin(backup).session() - self.wait(bs, "q") - self.assert_browse_retry(bs, "q", ["foo"]) - bs.connection.close() + sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate())) + self.wait_backup(backup, "q") + sender.send("foo") + primary.kill() + assert retry(lambda: not is_running(primary.pid)) + self.set_ha_status(backup.host_port(), "primary") # Promote the backup + self.assert_browse_retry(s, "q", ["foo"]) + c.close() + + def test_failover_cpp(self): + primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary + backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + url="%s,%s"%(primary.host_port(), backup.host_port()) + primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) + self.wait_backup(backup, "q") + + sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) + receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False) + receiver.start() + sender.start() + self.wait_backup(backup, "q") + assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru primary.kill() - # Promote the backup + assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die self.set_ha_status(backup.host_port(), "primary") - # FIXME aconway 2012-01-23: should re-use session s below - self.assert_browse_retry(c.session(), "q", ["foo"]) - c.close() + n = receiver.received # Make sure we are still running + assert retry(lambda: receiver.received > n + 10) + sender.stop() + receiver.stop() if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |