diff options
author | Alan Conway <aconway@apache.org> | 2011-06-14 17:57:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-06-14 17:57:25 +0000 |
commit | aa8367c48e6144a7aae832c1e4a151d013916443 (patch) | |
tree | f6c090198b00d0654f6a0714282c8a4e19fb6461 | |
parent | 1c934ab62af1ef9d0da342f5d45e7b614ac59789 (diff) | |
download | qpid-python-aa8367c48e6144a7aae832c1e4a151d013916443.tar.gz |
NO-JIRA: Fix bugs in test_failover test and the brokertest.py framework.
- brokertest.py was not reliably detecting failed processes.
- test_failover was not setting the reconnect option on its connections.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1135722 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 13 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 34 |
2 files changed, 30 insertions, 17 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index a19dd305e5..2795af3c5a 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -157,8 +157,13 @@ class Popen(subprocess.Popen): try: self.kill() # Just make sure its dead except: pass elif self.expect == EXPECT_RUNNING: - try: self.kill() - except: self.unexpected("expected running, exit code %d" % self.wait()) + if self.poll() != None: + self.unexpected("expected running, exit code %d" % self.returncode) + else: + try: + self.kill() + except Exception,e: + self.unexpected("exception from kill: %s" % str(e)) else: retry(lambda: self.poll() is not None) if self.returncode is None: # Still haven't stopped @@ -544,6 +549,7 @@ class NumberedSender(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", + "--connection-options", "{reconnect:true,reconnect-timeout:10}", "--content-stdin" ], expect=EXPECT_RUNNING, @@ -562,6 +568,7 @@ class NumberedSender(Thread): try: self.sent = 0 while not self.stopped: + self.sender.assert_running() if self.max: self.condition.acquire() while not self.stopped and self.sent - self.received > self.max: @@ -604,6 +611,7 @@ class NumberedReceiver(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", + "--connection-options", "{reconnect:true,reconnect_timeout:10}", "--forever" ], expect=EXPECT_RUNNING, @@ -620,6 +628,7 @@ class NumberedReceiver(Thread): self.received = 0 m = self.read_message() while m != -1: + self.receiver.assert_running() assert(m <= self.received) # Check for missing messages if (m == self.received): # Ignore duplicates self.received += 1 diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 593791297a..8f20f3cac6 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -253,6 +253,7 @@ acl allow all all client was attached. """ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + # First broker will be killed. cluster0 = self.cluster(1, args=args) cluster1 = self.cluster(1, args=args) assert 0 == subprocess.call( @@ -287,6 +288,7 @@ acl allow all all # Force a change of elder cluster0.start() + cluster0[0].expect=EXPECT_EXIT_FAIL # About to die. cluster0[0].kill() time.sleep(2) # Allow a management interval to pass. # Verify logs are consistent @@ -305,7 +307,7 @@ acl allow all all "--sequence=true", "--send-eos=1", "--messages=100000", - "--connection-options={reconnect:true}" + "--connection-options={reconnect:true,reconnect_timeout:10}" ]) self.receiver = self.popen( ["qpid-receive", @@ -313,7 +315,7 @@ acl allow all all "--address", queue, "--ignore-duplicates", "--check-redelivered", - "--connection-options={reconnect:true}", + "--connection-options={reconnect:true,reconnect_timeout:10}", "--forever" ]) time.sleep(1)#give sender enough time to have some messages to replay @@ -461,7 +463,7 @@ acl allow all all "--content-size=%s" % self.size, "--messages=%s" % self.count, "--failover-updates", - "--connection-options={reconnect:true}", + "--connection-options={reconnect:true,reconnect_timeout:10}", "--address=%s" % self.queue, "--broker=%s" % self.broker.host_port()]) self.sender.wait() @@ -493,7 +495,7 @@ acl allow all all "--timeout=1", "--print-content=no", "--failover-updates", - "--connection-options={reconnect:true}", + "--connection-options={reconnect:true,reconnect_timeout:10}", "--ack-frequency=1", "--address=flq", "--broker=%s" % cluster[1].host_port()]) @@ -518,14 +520,14 @@ acl allow all all "--timeout=1", "--print-content=no", "--failover-updates", - "--connection-options={reconnect:true}", + "--connection-options={reconnect:true,reconnect_timeout:10}", "--ack-frequency=1", "--address=flq", "--broker=%s" % cluster[2].host_port()]) receiver.wait() q_obj.update() assert not q_obj.flowStopped - assert q_obj.msgDepth == 0 + self.assertEqual(q_obj.msgDepth, 0) # verify that the sender has become unblocked sender.join(timeout=5) @@ -701,18 +703,21 @@ class LongTests(BrokerTest): # Start sender and receiver threads cluster[0].declare_queue("test-queue") - sender = NumberedSender(cluster[1], 1000) # Max queue depth - receiver = NumberedReceiver(cluster[2], sender) + sender = NumberedSender(cluster[0], 1000) # Max queue depth + receiver = NumberedReceiver(cluster[0], sender) receiver.start() sender.start() - + for b in cluster: b.ready() # Make sure brokers are ready # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration() i = 0 while time.time() < endtime: + sender.sender.assert_running() + receiver.receiver.assert_running() cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) + for b in cluster[i:]: b.ready() ErrorGenerator(b) time.sleep(5) sender.stop() @@ -853,29 +858,28 @@ class LongTests(BrokerTest): # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - #for b in cluster: ErrorGenerator(b) # create a queue with rather draconian flow control settings ssn0 = cluster[0].connect().session() s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") - receiver = NumberedReceiver(cluster[2]) + receiver = NumberedReceiver(cluster[0]) receiver.start() - senders = [NumberedSender(cluster[i]) for i in range(1,3)] + senders = [NumberedSender(cluster[0]) for i in range(1,3)] for s in senders: s.start() + for b in cluster: b.ready() # Make sure brokers are ready # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration(); i = 0 while time.time() < endtime: + for s in senders: s.sender.assert_running() + receiver.receiver.assert_running() cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) - #ErrorGenerator(b) time.sleep(5) - #b = cluster[0] - #b.startQmf() for s in senders: s.stop() receiver.stop() |