diff options
author | Alan Conway <aconway@apache.org> | 2011-06-20 15:20:21 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-06-20 15:20:21 +0000 |
commit | ed552685e7d21811fc6815116026eeb6a20b59c3 (patch) | |
tree | 5628778100fe2bad54979a41cdab381ba5266ecf | |
parent | f1616d61d152fe80e87e99d73851ade5e58b2e23 (diff) | |
download | qpid-python-ed552685e7d21811fc6815116026eeb6a20b59c3.tar.gz |
QPID-3129: cluster_tests.LongTests.test_failover hangs
Problem: the first broker in the cluster could be killed before the
receiver was connected, so the receiver could not fail-over, it didn't
have a failover update. Fix: wait for the first message to be received
by the receiver before starting the broker-kill loop.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1137657 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 6 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 14 |
2 files changed, 15 insertions, 5 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 36c91489e2..0415a667a2 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -76,7 +76,7 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=30, delay=.01): +def retry(function, timeout=10, delay=.01): """Call function until it returns True or timeout expires. Double the delay for each retry. Return True if function returns true, False if timeout expires.""" @@ -509,7 +509,7 @@ class BrokerTest(TestCase): actual_contents = self.browse(session, queue, timeout) self.assertEqual(expect_contents, actual_contents) -def join(thread, timeout=60): +def join(thread, timeout=10): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) @@ -619,13 +619,13 @@ class NumberedReceiver(Thread): self.lock = Lock() self.error = None self.sender = sender + self.received = 0 def read_message(self): return int(self.receiver.stdout.readline()) def run(self): try: - self.received = 0 m = self.read_message() while m != -1: self.receiver.assert_running() diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index f5ce2a234f..18cb4d5c22 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -725,6 +725,7 @@ class LongTests(BrokerTest): # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + for b in cluster: b.ready() # Wait for brokers to be ready for b in cluster: ErrorGenerator(b) # Start sender and receiver threads @@ -733,7 +734,9 @@ class LongTests(BrokerTest): receiver = NumberedReceiver(cluster[0], sender) receiver.start() sender.start() - for b in cluster: b.ready() # Make sure brokers are ready + # Wait for sender & receiver to get up and running + retry(lambda: receiver.received > 0) + # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration() i = 0 @@ -888,6 +891,7 @@ class LongTests(BrokerTest): # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + for b in cluster: b.ready() # Wait for brokers to be ready # create a queue with rather draconian flow control settings ssn0 = cluster[0].connect().session() @@ -898,7 +902,8 @@ class LongTests(BrokerTest): senders = [NumberedSender(cluster[0]) for i in range(1,3)] for s in senders: s.start() - for b in cluster: b.ready() # Make sure brokers are ready + # Wait for senders & receiver to get up and running + retry(lambda: receiver.received > 2*senders) # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration(); @@ -990,6 +995,8 @@ class LongTests(BrokerTest): # Original cluster will all be killed so expect exit with failure # Set small purge interval. cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"]) + for b in cluster: b.ready() # Wait for brokers to be ready + # Python client failover produces noisy WARN logs, disable temporarily logger = logging.getLogger() log_level = logger.getEffectiveLevel() @@ -1000,6 +1007,8 @@ class LongTests(BrokerTest): receiver.start() sender = Sender(cluster[0], "q;{create:always}") sender.start() + # Wait for sender & receiver to get up and running + retry(lambda: receiver.received > 0) # Kill brokers in a cycle. endtime = time.time() + self.duration() @@ -1019,6 +1028,7 @@ class LongTests(BrokerTest): b.kill() self.assertEqual(sender.sent, receiver.received) cluster_test_logs.verify_logs() + finally: # Detach to avoid slow reconnect attempts during shut-down if test fails. sender.connection.detach() |