diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-10-05 01:24:46 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-10-05 01:24:46 +0000 |
commit | 3fb69c9de4abc5e43a3830d210b8168ad5838de5 (patch) | |
tree | 4f5a25e89b729933a6ea06fe306d76b93b27d75f | |
parent | 435bd1198541e022440d422ec5f68974118c2f44 (diff) | |
download | qpid-python-3fb69c9de4abc5e43a3830d210b8168ad5838de5.tar.gz |
Added more test cases to cover failover.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1004496 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/java/testkit/testkit.py | 108 |
1 files changed, 96 insertions, 12 deletions
diff --git a/qpid/java/testkit/testkit.py b/qpid/java/testkit/testkit.py index c308b90d77..a551371aba 100755 --- a/qpid/java/testkit/testkit.py +++ b/qpid/java/testkit/testkit.py @@ -100,6 +100,7 @@ class JavaClientTest(BrokerTest): ssn = broker.connect().session() err_watcher = ssn.receiver("control; {create:always}", capacity=1) i = run_time/error_ck_freq + is_error = False for j in range(i): not_empty = True while not_empty: @@ -107,13 +108,15 @@ class JavaClientTest(BrokerTest): m = err_watcher.fetch(timeout=error_ck_freq) ssn.acknowledge() print "Java process notified of an error" - self.check_for_error(m) + self.print_error(m) + is_error = True except messaging.Empty, e: not_empty = False ssn.close() + return is_error - def check_for_error(self,msg): + def print_error(self,msg): print msg.properties.get("exception-trace") def verify(self, receiver,sender): @@ -125,8 +128,6 @@ class JavaClientTest(BrokerTest): def start_sender_and_receiver(self,**options): - options["use_unique_dests"]=True - options["address"]="amq.topic" receiver_opts = options receiver_opts["receiver"]=True receiver = self.popen(self.client(**receiver_opts), @@ -139,21 +140,28 @@ class JavaClientTest(BrokerTest): return receiver, sender + def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): + if options.get("durable",False)==True: + cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) + else: + cluster = Cluster(self, count=count) + return cluster + class ConcurrencyTest(JavaClientTest): """A concurrency test suite for the JMS client""" - skip = False + skip = True def base_case(self,**options): if self.skip : print "Skipping test" return - if options["durable"]==True: - cluster = Cluster(self, count=2,args=self.store_module_args()) - else: - cluster = Cluster(self, count=2) + cluster = self.start_cluster(count=2,**options) self.start_error_watcher(broker=cluster[0]) - options["port"] = port=cluster[0].port() + options["port"] = port=cluster[0].port() + + options["use_unique_dests"]=True + options["address"]="amq.topic" receiver, sender = self.start_sender_and_receiver(**options) self.monitor_clients(broker=cluster[0],run_time=180) self.verify(receiver,sender) @@ -176,7 +184,7 @@ class ConcurrencyTest(JavaClientTest): def test_multiplexing_con_with_durable_sub(self): """Tests multiple sessions with durable subs""" - self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id()) + self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) def test_multiplexing_con_with_sync_ack(self): """Tests multiple sessions with sync ack""" @@ -191,4 +199,80 @@ class ConcurrencyTest(JavaClientTest): def test_multiple_cons_and_ssns(self): """Tests multiple connections and sessions""" - self.base_case(con_count=25,ssn_per_con=25,test_name=self.id()) + self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) + + +class SoakTest(JavaClientTest): + """A soak test suite for the JMS client""" + + def base_case(self,**options): + cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) + options["port"] = port=cluster[0].port() + self.start_error_watcher(broker=cluster[0]) + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver,sender = self.start_sender_and_receiver(**options) + is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) + + if (is_error): + print "The sender or receiver didn't start properly. Exiting test." + return + else: + "Print no error !" + + # grace period for java clients to get the failover properly setup. + time.sleep(30) + error_msg= None + # Kill original brokers, start new ones. + try: + for i in range(8): + cluster[i].kill() + b=cluster.start() + self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) + print "iteration : " + str(i) + except ConnectError, e1: + error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) + + except SessionError, e2: + error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) + + self.verify(receiver,sender) + if error_msg: + raise Exception(error_msg) + + + def test_failover(self) : + """Test basic failover""" + + self.base_case(test_name=self.id()) + + + def test_failover_with_durablesub(self): + """Test failover with durable subscriber""" + + self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) + + + def test_failover_with_sync_rcv(self): + """Test failover with sync receive""" + + self.base_case(sync_rcv=True,test_name=self.id()) + + + def test_failover_with_sync_ack(self): + """Test failover with sync ack""" + + self.base_case(sync_ack=True,test_name=self.id()) + + + def test_failover_with_noprefetch(self): + """Test failover with no prefetch""" + + self.base_case(max_prefetch=1,test_name=self.id()) + + + def test_failover_with_multiple_cons_and_ssns(self): + """Test failover with multiple connections and sessions""" + + self.base_case(use_unique_dests=True,address="amq.topic", + con_count=10,ssn_per_con=25,test_name=self.id()) |