diff options
author | Alan Conway <aconway@apache.org> | 2012-02-17 14:16:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:16:43 +0000 |
commit | 1e5f4ac2ab4a8d9e5cecd22ba40d06b4619d2708 (patch) | |
tree | 6415053b0c0248f214c80dd2609884f07e03ea95 | |
parent | 09d1a9ef73770af529e9775da2c95c38c2d87e2f (diff) | |
download | qpid-python-1e5f4ac2ab4a8d9e5cecd22ba40d06b4619d2708.tar.gz |
QPID-3603: Added failover test for HA brokers.
Disabled: failing due to known issue in current code, enable when fixed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245551 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 60 |
2 files changed, 59 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 5b18e58dee..5f235e4451 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -658,10 +658,14 @@ class NumberedReceiver(Thread): except Exception: self.error = RethrownException(self.receiver.pname) + def check(self): + """Raise an exception if there has been an error""" + if self.error: raise self.error + def stop(self): """Returns when termination message is received""" join(self) - if self.error: raise self.error + self.check() class ErrorGenerator(StoppableThread): """ diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e38538a36f..950a092018 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -52,7 +52,6 @@ class HaBroker(Broker): class ShortTests(BrokerTest): """Short HA functionality tests.""" - # FIXME aconway 2011-11-15: work around async configuration replication. # Wait for an address to become valid. def wait(self, session, address): def check(): @@ -62,7 +61,6 @@ class ShortTests(BrokerTest): except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) - # FIXME aconway 2012-01-23: work around async configuration replication. # Wait for address to become valid on a backup broker. def wait_backup(self, backup, address): bs = self.connect_admin(backup).session() @@ -114,15 +112,14 @@ class ShortTests(BrokerTest): us = primary.connect_old().session(str(qpid.datatypes.uuid4())) us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4") p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped - # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done. + # Need a marker so we can wait till sync is done. p.sender(queue(prefix+"x", "configuration")) def verify(b, prefix, p): """Verify setup was replicated to backup b""" - # FIXME aconway 2011-11-21: wait for configuration to replicate. + # Wait for configuration to replicate. self.wait(b, prefix+"x"); - # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication. self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") @@ -295,13 +292,64 @@ class ShortTests(BrokerTest): brokers[0].connect().session().sender( "q;{create:always,%s}"%(self.qpid_replicate())).send("a") for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) - # FIXME aconway 2012-01-30: failing - not using set URL? brokers[0].kill() brokers[2].promote() # c must fail over to b. brokers[2].connect().session().sender("q").send("b") self.assert_browse_backup(brokers[1], "q", ["a","b"]) for b in brokers[1:]: b.kill() +class LongTests(BrokerTest): + """Tests that can run for a long time if -DDURATION=<minutes> is set""" + + def duration(self): + d = self.config.defines.get("DURATION") + if d: return float(d)*60 + else: return 3 # Default is to be quick + + + def disable_test_failover(self): + """Test failover with continuous send-receive""" + # FIXME aconway 2012-02-03: fails due to dropped messages, + # known issue: sending messages to new primary before + # backups are ready. + + # Start a cluster, all members will be killed during the test. + brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) + for name in ["ha0","ha1","ha2"] ] + url = ",".join([b.host_port() for b in brokers]) + for b in brokers: b.set_broker_url(url) + brokers[0].promote() + + # Start sender and receiver threads + sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False) + receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) + receiver.start() + sender.start() + # Wait for sender & receiver to get up and running + assert retry(lambda: receiver.received > 100) + # Kill and restart brokers in a cycle: + endtime = time.time() + self.duration() + i = 0 + while time.time() < endtime or i < 3: # At least 3 iterations + sender.sender.assert_running() + receiver.receiver.assert_running() + port = brokers[i].port() + brokers[i].kill() + brokers.append( + HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, + expect=EXPECT_EXIT_FAIL)) + i += 1 + brokers[i].promote() + n = receiver.received # Verify we're still running + def enough(): + receiver.check() # Verify no exceptions + return receiver.received > n + 100 + assert retry(enough, timeout=5) + + sender.stop() + receiver.stop() + for b in brokers[i:]: b.kill() + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) |