diff options
Diffstat (limited to 'qpid/java/testkit/testkit.py')
-rwxr-xr-x | qpid/java/testkit/testkit.py | 108 |
1 files changed, 12 insertions, 96 deletions
diff --git a/qpid/java/testkit/testkit.py b/qpid/java/testkit/testkit.py index a551371aba..c308b90d77 100755 --- a/qpid/java/testkit/testkit.py +++ b/qpid/java/testkit/testkit.py @@ -100,7 +100,6 @@ class JavaClientTest(BrokerTest): ssn = broker.connect().session() err_watcher = ssn.receiver("control; {create:always}", capacity=1) i = run_time/error_ck_freq - is_error = False for j in range(i): not_empty = True while not_empty: @@ -108,15 +107,13 @@ class JavaClientTest(BrokerTest): m = err_watcher.fetch(timeout=error_ck_freq) ssn.acknowledge() print "Java process notified of an error" - self.print_error(m) - is_error = True + self.check_for_error(m) except messaging.Empty, e: not_empty = False ssn.close() - return is_error - def print_error(self,msg): + def check_for_error(self,msg): print msg.properties.get("exception-trace") def verify(self, receiver,sender): @@ -128,6 +125,8 @@ class JavaClientTest(BrokerTest): def start_sender_and_receiver(self,**options): + options["use_unique_dests"]=True + options["address"]="amq.topic" receiver_opts = options receiver_opts["receiver"]=True receiver = self.popen(self.client(**receiver_opts), @@ -140,28 +139,21 @@ class JavaClientTest(BrokerTest): return receiver, sender - def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): - if options.get("durable",False)==True: - cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) - else: - cluster = Cluster(self, count=count) - return cluster - class ConcurrencyTest(JavaClientTest): """A concurrency test suite for the JMS client""" - skip = True + skip = False def base_case(self,**options): if self.skip : print "Skipping test" return - cluster = self.start_cluster(count=2,**options) + if options["durable"]==True: + cluster = Cluster(self, count=2,args=self.store_module_args()) + else: + cluster = Cluster(self, count=2) self.start_error_watcher(broker=cluster[0]) - options["port"] = port=cluster[0].port() - - options["use_unique_dests"]=True - options["address"]="amq.topic" + options["port"] = port=cluster[0].port() receiver, sender = self.start_sender_and_receiver(**options) self.monitor_clients(broker=cluster[0],run_time=180) self.verify(receiver,sender) @@ -184,7 +176,7 @@ class ConcurrencyTest(JavaClientTest): def test_multiplexing_con_with_durable_sub(self): """Tests multiple sessions with durable subs""" - self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) + self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id()) def test_multiplexing_con_with_sync_ack(self): """Tests multiple sessions with sync ack""" @@ -199,80 +191,4 @@ class ConcurrencyTest(JavaClientTest): def test_multiple_cons_and_ssns(self): """Tests multiple connections and sessions""" - self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) - - -class SoakTest(JavaClientTest): - """A soak test suite for the JMS client""" - - def base_case(self,**options): - cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) - options["port"] = port=cluster[0].port() - self.start_error_watcher(broker=cluster[0]) - options["use_unique_dests"]=True - options["address"]="amq.topic" - receiver,sender = self.start_sender_and_receiver(**options) - is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) - - if (is_error): - print "The sender or receiver didn't start properly. Exiting test." - return - else: - "Print no error !" - - # grace period for java clients to get the failover properly setup. - time.sleep(30) - error_msg= None - # Kill original brokers, start new ones. - try: - for i in range(8): - cluster[i].kill() - b=cluster.start() - self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) - print "iteration : " + str(i) - except ConnectError, e1: - error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) - - except SessionError, e2: - error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) - - self.verify(receiver,sender) - if error_msg: - raise Exception(error_msg) - - - def test_failover(self) : - """Test basic failover""" - - self.base_case(test_name=self.id()) - - - def test_failover_with_durablesub(self): - """Test failover with durable subscriber""" - - self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) - - - def test_failover_with_sync_rcv(self): - """Test failover with sync receive""" - - self.base_case(sync_rcv=True,test_name=self.id()) - - - def test_failover_with_sync_ack(self): - """Test failover with sync ack""" - - self.base_case(sync_ack=True,test_name=self.id()) - - - def test_failover_with_noprefetch(self): - """Test failover with no prefetch""" - - self.base_case(max_prefetch=1,test_name=self.id()) - - - def test_failover_with_multiple_cons_and_ssns(self): - """Test failover with multiple connections and sessions""" - - self.base_case(use_unique_dests=True,address="amq.topic", - con_count=10,ssn_per_con=25,test_name=self.id()) + self.base_case(con_count=25,ssn_per_con=25,test_name=self.id()) |