diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 46 |
1 files changed, 34 insertions, 12 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index bf044765b5..f3c45ba7a3 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -19,7 +19,7 @@ # import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil -from qpid.messaging import Message, NotFound, ConnectionError +from qpid.messaging import Message, NotFound, ConnectionError, Connection from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger @@ -48,15 +48,18 @@ class ShortTests(BrokerTest): except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) - def assert_missing(self,session, address): + def set_ha_status(self, address, status): + os.system("qpid-ha-status %s %s"%(address, status)) + + def assert_missing(self, session, address): try: session.receiver(address) self.fail("Should not have been replicated: %s"%(address)) except NotFound: pass def connect_admin(self, backup, **kwargs): - """Connect to a backup broker as the admin user""" - return backup.connect(username="qpid-ha-admin", password="dummy", mechanism="PLAIN", **kwargs) + """Connect to a backup broker as an admin connection""" + return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) def test_replication(self): """Test basic replication of wiring and messages before and @@ -113,6 +116,7 @@ class ShortTests(BrokerTest): primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() + # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1", primary) backup = self.ha_broker(name="backup", broker_url=primary.host_port()) @@ -120,10 +124,9 @@ class ShortTests(BrokerTest): setup(p, "2", primary) # Verify the data on the backup - b = self.connect_admin(backup, ).session() + b = self.connect_admin(backup).session() verify(b, "1", p) verify(b, "2", p) - # Test a series of messages, enqueue all then dequeue all. s = p.sender(queue("foo","all")) self.wait(b, "foo") @@ -174,6 +177,7 @@ class ShortTests(BrokerTest): self.assert_browse_retry(b2, "q", msgs) def test_send_receive(self): + """Verify sequence numbers of messages sent by qpid-send""" primary = self.ha_broker(name="primary", broker_url="primary") backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) @@ -204,17 +208,35 @@ class ShortTests(BrokerTest): print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) raise - def test_exclude(self): - """Verify that backup rejects connections""" - primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + def test_failover(self): + """Verify that backups rejects connections and that fail-over works""" + primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary backup = self.ha_broker(name="backup", broker_url=primary.host_port()) - # Admin is allowed - self.connect_admin(backup) - # Others are not + # Check that backup rejects normal connections try: backup.connect() self.fail("Expected connection to backup to fail") except ConnectionError: pass + # Check that admin connections are allowed to backup. + self.connect_admin(backup).close() + + # Test discovery: should connect to primary after reject by backup + c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) + s = c.session() + s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True) + # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here, + # send(sync=True) shouldn't return till the backup acknowledges. + bs = self.connect_admin(backup).session() + self.wait(bs, "q") + self.assert_browse_retry(bs, "q", ["foo"]) + bs.connection.close() + + primary.kill() + # Promote the backup + self.set_ha_status(backup.host_port(), "primary") + # FIXME aconway 2012-01-23: should re-use session s below + self.assert_browse_retry(c.session(), "q", ["foo"]) + c.close() if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |