summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-10-05 01:24:46 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-10-05 01:24:46 +0000
commit3fb69c9de4abc5e43a3830d210b8168ad5838de5 (patch)
tree4f5a25e89b729933a6ea06fe306d76b93b27d75f
parent435bd1198541e022440d422ec5f68974118c2f44 (diff)
downloadqpid-python-3fb69c9de4abc5e43a3830d210b8168ad5838de5.tar.gz
Added more test cases to cover failover.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1004496 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/testkit/testkit.py108
1 files changed, 96 insertions, 12 deletions
diff --git a/qpid/java/testkit/testkit.py b/qpid/java/testkit/testkit.py
index c308b90d77..a551371aba 100755
--- a/qpid/java/testkit/testkit.py
+++ b/qpid/java/testkit/testkit.py
@@ -100,6 +100,7 @@ class JavaClientTest(BrokerTest):
ssn = broker.connect().session()
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
i = run_time/error_ck_freq
+ is_error = False
for j in range(i):
not_empty = True
while not_empty:
@@ -107,13 +108,15 @@ class JavaClientTest(BrokerTest):
m = err_watcher.fetch(timeout=error_ck_freq)
ssn.acknowledge()
print "Java process notified of an error"
- self.check_for_error(m)
+ self.print_error(m)
+ is_error = True
except messaging.Empty, e:
not_empty = False
ssn.close()
+ return is_error
- def check_for_error(self,msg):
+ def print_error(self,msg):
print msg.properties.get("exception-trace")
def verify(self, receiver,sender):
@@ -125,8 +128,6 @@ class JavaClientTest(BrokerTest):
def start_sender_and_receiver(self,**options):
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
receiver_opts = options
receiver_opts["receiver"]=True
receiver = self.popen(self.client(**receiver_opts),
@@ -139,21 +140,28 @@ class JavaClientTest(BrokerTest):
return receiver, sender
+ def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
+ if options.get("durable",False)==True:
+ cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
+ else:
+ cluster = Cluster(self, count=count)
+ return cluster
+
class ConcurrencyTest(JavaClientTest):
"""A concurrency test suite for the JMS client"""
- skip = False
+ skip = True
def base_case(self,**options):
if self.skip :
print "Skipping test"
return
- if options["durable"]==True:
- cluster = Cluster(self, count=2,args=self.store_module_args())
- else:
- cluster = Cluster(self, count=2)
+ cluster = self.start_cluster(count=2,**options)
self.start_error_watcher(broker=cluster[0])
- options["port"] = port=cluster[0].port()
+ options["port"] = port=cluster[0].port()
+
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
receiver, sender = self.start_sender_and_receiver(**options)
self.monitor_clients(broker=cluster[0],run_time=180)
self.verify(receiver,sender)
@@ -176,7 +184,7 @@ class ConcurrencyTest(JavaClientTest):
def test_multiplexing_con_with_durable_sub(self):
"""Tests multiple sessions with durable subs"""
- self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id())
+ self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
def test_multiplexing_con_with_sync_ack(self):
"""Tests multiple sessions with sync ack"""
@@ -191,4 +199,80 @@ class ConcurrencyTest(JavaClientTest):
def test_multiple_cons_and_ssns(self):
"""Tests multiple connections and sessions"""
- self.base_case(con_count=25,ssn_per_con=25,test_name=self.id())
+ self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
+
+
+class SoakTest(JavaClientTest):
+ """A soak test suite for the JMS client"""
+
+ def base_case(self,**options):
+ cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
+ options["port"] = port=cluster[0].port()
+ self.start_error_watcher(broker=cluster[0])
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver,sender = self.start_sender_and_receiver(**options)
+ is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
+
+ if (is_error):
+ print "The sender or receiver didn't start properly. Exiting test."
+ return
+ else:
+ "Print no error !"
+
+ # grace period for java clients to get the failover properly setup.
+ time.sleep(30)
+ error_msg= None
+ # Kill original brokers, start new ones.
+ try:
+ for i in range(8):
+ cluster[i].kill()
+ b=cluster.start()
+ self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+ print "iteration : " + str(i)
+ except ConnectError, e1:
+ error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
+
+ except SessionError, e2:
+ error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+
+ self.verify(receiver,sender)
+ if error_msg:
+ raise Exception(error_msg)
+
+
+ def test_failover(self) :
+ """Test basic failover"""
+
+ self.base_case(test_name=self.id())
+
+
+ def test_failover_with_durablesub(self):
+ """Test failover with durable subscriber"""
+
+ self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
+
+
+ def test_failover_with_sync_rcv(self):
+ """Test failover with sync receive"""
+
+ self.base_case(sync_rcv=True,test_name=self.id())
+
+
+ def test_failover_with_sync_ack(self):
+ """Test failover with sync ack"""
+
+ self.base_case(sync_ack=True,test_name=self.id())
+
+
+ def test_failover_with_noprefetch(self):
+ """Test failover with no prefetch"""
+
+ self.base_case(max_prefetch=1,test_name=self.id())
+
+
+ def test_failover_with_multiple_cons_and_ssns(self):
+ """Test failover with multiple connections and sessions"""
+
+ self.base_case(use_unique_dests=True,address="amq.topic",
+ con_count=10,ssn_per_con=25,test_name=self.id())