summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-06-14 17:57:25 +0000
committerAlan Conway <aconway@apache.org>2011-06-14 17:57:25 +0000
commitaa8367c48e6144a7aae832c1e4a151d013916443 (patch)
treef6c090198b00d0654f6a0714282c8a4e19fb6461
parent1c934ab62af1ef9d0da342f5d45e7b614ac59789 (diff)
downloadqpid-python-aa8367c48e6144a7aae832c1e4a151d013916443.tar.gz
NO-JIRA: Fix bugs in test_failover test and the brokertest.py framework.
- brokertest.py was not reliably detecting failed processes. - test_failover was not setting the reconnect option on its connections. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1135722 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/brokertest.py13
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py34
2 files changed, 30 insertions, 17 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index a19dd305e5..2795af3c5a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -157,8 +157,13 @@ class Popen(subprocess.Popen):
try: self.kill() # Just make sure its dead
except: pass
elif self.expect == EXPECT_RUNNING:
- try: self.kill()
- except: self.unexpected("expected running, exit code %d" % self.wait())
+ if self.poll() != None:
+ self.unexpected("expected running, exit code %d" % self.returncode)
+ else:
+ try:
+ self.kill()
+ except Exception,e:
+ self.unexpected("exception from kill: %s" % str(e))
else:
retry(lambda: self.poll() is not None)
if self.returncode is None: # Still haven't stopped
@@ -544,6 +549,7 @@ class NumberedSender(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{reconnect:true,reconnect-timeout:10}",
"--content-stdin"
],
expect=EXPECT_RUNNING,
@@ -562,6 +568,7 @@ class NumberedSender(Thread):
try:
self.sent = 0
while not self.stopped:
+ self.sender.assert_running()
if self.max:
self.condition.acquire()
while not self.stopped and self.sent - self.received > self.max:
@@ -604,6 +611,7 @@ class NumberedReceiver(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
+ "--connection-options", "{reconnect:true,reconnect_timeout:10}",
"--forever"
],
expect=EXPECT_RUNNING,
@@ -620,6 +628,7 @@ class NumberedReceiver(Thread):
self.received = 0
m = self.read_message()
while m != -1:
+ self.receiver.assert_running()
assert(m <= self.received) # Check for missing messages
if (m == self.received): # Ignore duplicates
self.received += 1
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 593791297a..8f20f3cac6 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -253,6 +253,7 @@ acl allow all all
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ # First broker will be killed.
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
assert 0 == subprocess.call(
@@ -287,6 +288,7 @@ acl allow all all
# Force a change of elder
cluster0.start()
+ cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
cluster0[0].kill()
time.sleep(2) # Allow a management interval to pass.
# Verify logs are consistent
@@ -305,7 +307,7 @@ acl allow all all
"--sequence=true",
"--send-eos=1",
"--messages=100000",
- "--connection-options={reconnect:true}"
+ "--connection-options={reconnect:true,reconnect_timeout:10}"
])
self.receiver = self.popen(
["qpid-receive",
@@ -313,7 +315,7 @@ acl allow all all
"--address", queue,
"--ignore-duplicates",
"--check-redelivered",
- "--connection-options={reconnect:true}",
+ "--connection-options={reconnect:true,reconnect_timeout:10}",
"--forever"
])
time.sleep(1)#give sender enough time to have some messages to replay
@@ -461,7 +463,7 @@ acl allow all all
"--content-size=%s" % self.size,
"--messages=%s" % self.count,
"--failover-updates",
- "--connection-options={reconnect:true}",
+ "--connection-options={reconnect:true,reconnect_timeout:10}",
"--address=%s" % self.queue,
"--broker=%s" % self.broker.host_port()])
self.sender.wait()
@@ -493,7 +495,7 @@ acl allow all all
"--timeout=1",
"--print-content=no",
"--failover-updates",
- "--connection-options={reconnect:true}",
+ "--connection-options={reconnect:true,reconnect_timeout:10}",
"--ack-frequency=1",
"--address=flq",
"--broker=%s" % cluster[1].host_port()])
@@ -518,14 +520,14 @@ acl allow all all
"--timeout=1",
"--print-content=no",
"--failover-updates",
- "--connection-options={reconnect:true}",
+ "--connection-options={reconnect:true,reconnect_timeout:10}",
"--ack-frequency=1",
"--address=flq",
"--broker=%s" % cluster[2].host_port()])
receiver.wait()
q_obj.update()
assert not q_obj.flowStopped
- assert q_obj.msgDepth == 0
+ self.assertEqual(q_obj.msgDepth, 0)
# verify that the sender has become unblocked
sender.join(timeout=5)
@@ -701,18 +703,21 @@ class LongTests(BrokerTest):
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[1], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[2], sender)
+ sender = NumberedSender(cluster[0], 1000) # Max queue depth
+ receiver = NumberedReceiver(cluster[0], sender)
receiver.start()
sender.start()
-
+ for b in cluster: b.ready() # Make sure brokers are ready
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ for b in cluster[i:]: b.ready()
ErrorGenerator(b)
time.sleep(5)
sender.stop()
@@ -853,29 +858,28 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
- #for b in cluster: ErrorGenerator(b)
# create a queue with rather draconian flow control settings
ssn0 = cluster[0].connect().session()
s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
- receiver = NumberedReceiver(cluster[2])
+ receiver = NumberedReceiver(cluster[0])
receiver.start()
- senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+ senders = [NumberedSender(cluster[0]) for i in range(1,3)]
for s in senders:
s.start()
+ for b in cluster: b.ready() # Make sure brokers are ready
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration();
i = 0
while time.time() < endtime:
+ for s in senders: s.sender.assert_running()
+ receiver.receiver.assert_running()
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
- #ErrorGenerator(b)
time.sleep(5)
- #b = cluster[0]
- #b.startQmf()
for s in senders:
s.stop()
receiver.stop()