summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2012-02-14 16:11:57 +0000
committer: Alan Conway <aconway@apache.org> 2012-02-14 16:11:57 +0000
commit: 78a6fbc1a4c95cbcc210320da0853baaf4eb2f36 (patch)
tree: 303d4ca494ebaffae0dbe62cc36093b663a13796
parent: 7b5295b9f2b21a259dfb084ee6e4d04103ac6cc0 (diff)
download: qpid-python-78a6fbc1a4c95cbcc210320da0853baaf4eb2f36.tar.gz
QPID-3603: Added failover test for HA brokers.

The test is disabled because it fails due to a known issue in the current code; enable it when that issue is fixed.

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-6@1244104 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- qpid/cpp/src/tests/brokertest.py 6
-rwxr-xr-x qpid/cpp/src/tests/ha_tests.py 60
2 files changed, 59 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 5b18e58dee..5f235e4451 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -658,10 +658,14 @@ class NumberedReceiver(Thread):
except Exception:
self.error = RethrownException(self.receiver.pname)
+ def check(self):
+ """Raise an exception if there has been an error"""
+ if self.error: raise self.error
+
def stop(self):
"""Returns when termination message is received"""
join(self)
- if self.error: raise self.error
+ self.check()
class ErrorGenerator(StoppableThread):
"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e38538a36f..950a092018 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -52,7 +52,6 @@ class HaBroker(Broker):
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
- # FIXME aconway 2011-11-15: work around async configuration replication.
# Wait for an address to become valid.
def wait(self, session, address):
def check():
@@ -62,7 +61,6 @@ class ShortTests(BrokerTest):
except NotFound: return False
assert retry(check), "Timed out waiting for %s"%(address)
- # FIXME aconway 2012-01-23: work around async configuration replication.
# Wait for address to become valid on a backup broker.
def wait_backup(self, backup, address):
bs = self.connect_admin(backup).session()
@@ -114,15 +112,14 @@ class ShortTests(BrokerTest):
us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
- # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
+ # Need a marker so we can wait till sync is done.
p.sender(queue(prefix+"x", "configuration"))
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
- # FIXME aconway 2011-11-21: wait for configuration to replicate.
+ # Wait for configuration to replicate.
self.wait(b, prefix+"x");
- # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -295,13 +292,64 @@ class ShortTests(BrokerTest):
brokers[0].connect().session().sender(
"q;{create:always,%s}"%(self.qpid_replicate())).send("a")
for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
- # FIXME aconway 2012-01-30: failing - not using set URL?
brokers[0].kill()
brokers[2].promote() # c must fail over to b.
brokers[2].connect().session().sender("q").send("b")
self.assert_browse_backup(brokers[1], "q", ["a","b"])
for b in brokers[1:]: b.kill()
+class LongTests(BrokerTest):
+ """Tests that can run for a long time if -DDURATION=<minutes> is set"""
+
+ def duration(self):
+ d = self.config.defines.get("DURATION")
+ if d: return float(d)*60
+ else: return 3 # Default is to be quick
+
+
+ def disable_test_failover(self):
+ """Test failover with continuous send-receive"""
+ # FIXME aconway 2012-02-03: fails due to dropped messages,
+ # known issue: sending messages to new primary before
+ # backups are ready.
+
+ # Start a cluster, all members will be killed during the test.
+ brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+ for name in ["ha0","ha1","ha2"] ]
+ url = ",".join([b.host_port() for b in brokers])
+ for b in brokers: b.set_broker_url(url)
+ brokers[0].promote()
+
+ # Start sender and receiver threads
+ sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
+ receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
+ receiver.start()
+ sender.start()
+ # Wait for sender & receiver to get up and running
+ assert retry(lambda: receiver.received > 100)
+ # Kill and restart brokers in a cycle:
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime or i < 3: # At least 3 iterations
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
+ port = brokers[i].port()
+ brokers[i].kill()
+ brokers.append(
+ HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
+ expect=EXPECT_EXIT_FAIL))
+ i += 1
+ brokers[i].promote()
+ n = receiver.received # Verify we're still running
+ def enough():
+ receiver.check() # Verify no exceptions
+ return receiver.received > n + 100
+ assert retry(enough, timeout=5)
+
+ sender.stop()
+ receiver.stop()
+ for b in brokers[i:]: b.kill()
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])