summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py80
1 files changed, 48 insertions, 32 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index c213e6a4ff..e782b57f7f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -27,18 +27,30 @@ from logging import getLogger, WARN, ERROR, DEBUG
log = getLogger("qpid.ha-tests")
+class HaBroker(Broker):
+ def __init__(self, test, args=[], broker_url=None, **kwargs):
+ assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
+ Broker.__init__(self, test,
+ args=["--load-module", BrokerTest.ha_lib,
+ "--ha-enable=yes",
+ "--ha-broker-url", broker_url ],
+ **kwargs)
+
+ def promote(self):
+ assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0
+
+ def set_client_url(self, url):
+ assert os.system(
+ "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0
+
+ def set_broker_url(self, url):
+ assert os.system(
+ "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0
+
+
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
- def ha_broker(self, args=[], client_url="dummy", broker_url="dummy", **kwargs):
- assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- return Broker(self, args=["--load-module", BrokerTest.ha_lib,
- "--ha-enable=yes",
- "--ha-client-url", client_url,
- "--ha-broker-url", broker_url,
- ] + args,
- **kwargs)
-
# FIXME aconway 2011-11-15: work around async configuration replication.
# Wait for an address to become valid.
def wait(self, session, address):
@@ -49,15 +61,19 @@ class ShortTests(BrokerTest):
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
- # FIXME aconway 2012-01-23: workaround: we need to give the
- # backup a chance to attach to the queue.
+ # FIXME aconway 2012-01-23: work around async configuration replication.
+ # Wait for address to become valid on a backup broker.
def wait_backup(self, backup, address):
bs = self.connect_admin(backup).session()
self.wait(bs, address)
bs.connection.close()
- def promote(self, broker):
- os.system("qpid-ha-tool --promote %s"%(broker.host_port()))
+ # Combines wait_backup and assert_browse_retry
+ def assert_browse_backup(self, backup, queue, expected, **kwargs):
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, queue)
+ self.assert_browse_retry(bs, queue, expected, **kwargs)
+ bs.connection.close()
def assert_missing(self, session, address):
try:
@@ -122,13 +138,13 @@ class ShortTests(BrokerTest):
b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
- primary = self.ha_broker(name="primary")
- self.promote(primary)
+ primary = HaBroker(self, name="primary")
+ primary.promote()
p = primary.connect().session()
# Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1", primary)
- backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
# Create config, send messages after starting the backup, to test steady-state replication.
setup(p, "2", primary)
@@ -165,16 +181,16 @@ class ShortTests(BrokerTest):
def test_sync(self):
def queue(name, replicate):
return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
- primary = self.ha_broker(name="primary")
- self.promote(primary)
+ primary = HaBroker(self, name="primary")
+ primary.promote()
p = primary.connect().session()
s = p.sender(queue("q","messages"))
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
- backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
for m in [str(i) for i in range(10,20)]: s.send(m)
s.sync()
- backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
for m in [str(i) for i in range(20,30)]: s.send(m)
s.sync()
@@ -188,10 +204,10 @@ class ShortTests(BrokerTest):
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
- primary = self.ha_broker(name="primary")
- self.promote(primary)
- backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
- backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
sender = self.popen(
["qpid-send",
"--broker", primary.host_port(),
@@ -222,9 +238,9 @@ class ShortTests(BrokerTest):
def test_failover(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
- primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
- self.promote(primary)
- backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
# Check that backup rejects normal connections
try:
backup.connect()
@@ -241,14 +257,14 @@ class ShortTests(BrokerTest):
sender.send("foo")
primary.kill()
assert retry(lambda: not is_running(primary.pid))
- self.promote(backup)
+ backup.promote()
self.assert_browse_retry(s, "q", ["foo"])
c.close()
def test_failover_cpp(self):
- primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL)
- self.promote(primary)
- backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
self.wait_backup(backup, "q")
@@ -262,7 +278,7 @@ class ShortTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
- self.promote(backup)
+ backup.promote()
n = receiver.received # Make sure we are still running
assert retry(lambda: receiver.received > n + 10)
sender.stop()