diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 80 |
1 files changed, 48 insertions, 32 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index c213e6a4ff..e782b57f7f 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -27,18 +27,30 @@ from logging import getLogger, WARN, ERROR, DEBUG log = getLogger("qpid.ha-tests") +class HaBroker(Broker): + def __init__(self, test, args=[], broker_url=None, **kwargs): + assert BrokerTest.ha_lib, "Cannot locate HA plug-in" + Broker.__init__(self, test, + args=["--load-module", BrokerTest.ha_lib, + "--ha-enable=yes", + "--ha-broker-url", broker_url ], + **kwargs) + + def promote(self): + assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0 + + def set_client_url(self, url): + assert os.system( + "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0 + + def set_broker_url(self, url): + assert os.system( + "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0 + + class ShortTests(BrokerTest): """Short HA functionality tests.""" - def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs): - assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - return Broker(self, args=["--load-module", BrokerTest.ha_lib, - "--ha-enable=yes", - "--ha-client-url", client_url, - "--ha-broker-url", broker_url, - ] + args, - **kwargs) - # FIXME aconway 2011-11-15: work around async configuration replication. # Wait for an address to become valid. def wait(self, session, address): @@ -49,15 +61,19 @@ class ShortTests(BrokerTest): except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) - # FIXME aconway 2012-01-23: workaround: we need to give the - # backup a chance to attach to the queue. + # FIXME aconway 2012-01-23: work around async configuration replication. + # Wait for address to become valid on a backup broker. def wait_backup(self, backup, address): bs = self.connect_admin(backup).session() self.wait(bs, address) bs.connection.close() - def promote(self, broker): - os.system("qpid-ha-tool --promote %s"%(broker.host_port())) + # Combines wait_backup and assert_browse_retry + def assert_browse_backup(self, backup, queue, expected, **kwargs): + bs = self.connect_admin(backup).session() + self.wait(bs, queue) + self.assert_browse_retry(bs, queue, expected, **kwargs) + bs.connection.close() def assert_missing(self, session, address): try: @@ -122,13 +138,13 @@ class ShortTests(BrokerTest): b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind. self.assert_browse_retry(b, prefix+"q4", ["6","7"]) - primary = self.ha_broker(name="primary") - self.promote(primary) + primary = HaBroker(self, name="primary") + primary.promote() p = primary.connect().session() # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1", primary) - backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) # Create config, send messages after starting the backup, to test steady-state replication. setup(p, "2", primary) @@ -165,16 +181,16 @@ class ShortTests(BrokerTest): def test_sync(self): def queue(name, replicate): return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) - primary = self.ha_broker(name="primary") - self.promote(primary) + primary = HaBroker(self, name="primary") + primary.promote() p = primary.connect().session() s = p.sender(queue("q","messages")) for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() - backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) for m in [str(i) for i in range(10,20)]: s.send(m) s.sync() - backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) for m in [str(i) for i in range(20,30)]: s.send(m) s.sync() @@ -188,10 +204,10 @@ class ShortTests(BrokerTest): def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" - primary = self.ha_broker(name="primary") - self.promote(primary) - backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) - backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) sender = self.popen( ["qpid-send", "--broker", primary.host_port(), @@ -222,9 +238,9 @@ class ShortTests(BrokerTest): def test_failover(self): """Verify that backups rejects connections and that fail-over works in python client""" getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover - primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL) - self.promote(primary) - backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) # Check that backup rejects normal connections try: backup.connect() @@ -241,14 +257,14 @@ class ShortTests(BrokerTest): sender.send("foo") primary.kill() assert retry(lambda: not is_running(primary.pid)) - self.promote(backup) + backup.promote() self.assert_browse_retry(s, "q", ["foo"]) c.close() def test_failover_cpp(self): - primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL) - self.promote(primary) - backup = self.ha_broker(name="backup", broker_url=primary.host_port()) + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) self.wait_backup(backup, "q") @@ -262,7 +278,7 @@ class ShortTests(BrokerTest): primary.kill() assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die - self.promote(backup) + backup.promote() n = receiver.received # Make sure we are still running assert retry(lambda: receiver.received > n + 10) sender.stop() |