summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py53
1 files changed, 39 insertions, 14 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f3c45ba7a3..6d5dc19aa6 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
from qpid.messaging import Message, NotFound, ConnectionError, Connection
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger
+from logging import getLogger, WARN, ERROR, DEBUG
log = getLogger("qpid.ha-tests")
@@ -39,7 +39,8 @@ class ShortTests(BrokerTest):
] + args,
**kwargs)
- # FIXME aconway 2011-11-15: work around async replication.
+ # FIXME aconway 2011-11-15: work around async wiring replication.
+ # Wait for an address to become valid.
def wait(self, session, address):
def check():
try:
@@ -48,6 +49,13 @@ class ShortTests(BrokerTest):
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
+ # FIXME aconway 2012-01-23: workaround: we need to give the
+ # backup a chance to attach to the queue.
+ def wait_backup(self, backup, address):
+ bs = self.connect_admin(backup).session()
+ self.wait(bs, address)
+ bs.connection.close()
+
def set_ha_status(self, address, status):
os.system("qpid-ha-status %s %s"%(address, status))
@@ -209,7 +217,8 @@ class ShortTests(BrokerTest):
raise
def test_failover(self):
- """Verify that backups rejects connections and that fail-over works"""
+ """Verify that backups rejects connections and that fail-over works in python client"""
+ getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
backup = self.ha_broker(name="backup", broker_url=primary.host_port())
# Check that backup rejects normal connections
@@ -223,20 +232,36 @@ class ShortTests(BrokerTest):
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True)
- # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here,
- # send(sync=True) shouldn't return till the backup acknowledges.
- bs = self.connect_admin(backup).session()
- self.wait(bs, "q")
- self.assert_browse_retry(bs, "q", ["foo"])
- bs.connection.close()
+ sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ self.wait_backup(backup, "q")
+ sender.send("foo")
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid))
+ self.set_ha_status(backup.host_port(), "primary") # Promote the backup
+ self.assert_browse_retry(s, "q", ["foo"])
+ c.close()
+
+ def test_failover_cpp(self):
+ primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
+ backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+ url="%s,%s"%(primary.host_port(), backup.host_port())
+ primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ self.wait_backup(backup, "q")
+
+ sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
+ receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
+ receiver.start()
+ sender.start()
+ self.wait_backup(backup, "q")
+ assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
primary.kill()
- # Promote the backup
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
self.set_ha_status(backup.host_port(), "primary")
- # FIXME aconway 2012-01-23: should re-use session s below
- self.assert_browse_retry(c.session(), "q", ["foo"])
- c.close()
+ n = receiver.received # Make sure we are still running
+ assert retry(lambda: receiver.received > n + 10)
+ sender.stop()
+ receiver.stop()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)