summaryrefslogtreecommitdiff
path: root/qpid/java/testkit/testkit.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/testkit/testkit.py')
-rwxr-xr-xqpid/java/testkit/testkit.py108
1 files changed, 12 insertions, 96 deletions
diff --git a/qpid/java/testkit/testkit.py b/qpid/java/testkit/testkit.py
index a551371aba..c308b90d77 100755
--- a/qpid/java/testkit/testkit.py
+++ b/qpid/java/testkit/testkit.py
@@ -100,7 +100,6 @@ class JavaClientTest(BrokerTest):
ssn = broker.connect().session()
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
i = run_time/error_ck_freq
- is_error = False
for j in range(i):
not_empty = True
while not_empty:
@@ -108,15 +107,13 @@ class JavaClientTest(BrokerTest):
m = err_watcher.fetch(timeout=error_ck_freq)
ssn.acknowledge()
print "Java process notified of an error"
- self.print_error(m)
- is_error = True
+ self.check_for_error(m)
except messaging.Empty, e:
not_empty = False
ssn.close()
- return is_error
- def print_error(self,msg):
+ def check_for_error(self,msg):
print msg.properties.get("exception-trace")
def verify(self, receiver,sender):
@@ -128,6 +125,8 @@ class JavaClientTest(BrokerTest):
def start_sender_and_receiver(self,**options):
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
receiver_opts = options
receiver_opts["receiver"]=True
receiver = self.popen(self.client(**receiver_opts),
@@ -140,28 +139,21 @@ class JavaClientTest(BrokerTest):
return receiver, sender
- def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
- if options.get("durable",False)==True:
- cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
- else:
- cluster = Cluster(self, count=count)
- return cluster
-
class ConcurrencyTest(JavaClientTest):
"""A concurrency test suite for the JMS client"""
- skip = True
+ skip = False
def base_case(self,**options):
if self.skip :
print "Skipping test"
return
- cluster = self.start_cluster(count=2,**options)
+ if options["durable"]==True:
+ cluster = Cluster(self, count=2,args=self.store_module_args())
+ else:
+ cluster = Cluster(self, count=2)
self.start_error_watcher(broker=cluster[0])
- options["port"] = port=cluster[0].port()
-
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
+ options["port"] = port=cluster[0].port()
receiver, sender = self.start_sender_and_receiver(**options)
self.monitor_clients(broker=cluster[0],run_time=180)
self.verify(receiver,sender)
@@ -184,7 +176,7 @@ class ConcurrencyTest(JavaClientTest):
def test_multiplexing_con_with_durable_sub(self):
"""Tests multiple sessions with durable subs"""
- self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
+ self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id())
def test_multiplexing_con_with_sync_ack(self):
"""Tests multiple sessions with sync ack"""
@@ -199,80 +191,4 @@ class ConcurrencyTest(JavaClientTest):
def test_multiple_cons_and_ssns(self):
"""Tests multiple connections and sessions"""
- self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
-
-
-class SoakTest(JavaClientTest):
- """A soak test suite for the JMS client"""
-
- def base_case(self,**options):
- cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
- options["port"] = port=cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver,sender = self.start_sender_and_receiver(**options)
- is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
-
- if (is_error):
- print "The sender or receiver didn't start properly. Exiting test."
- return
- else:
- "Print no error !"
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg= None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- print "iteration : " + str(i)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
-
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
-
-
- def test_failover(self) :
- """Test basic failover"""
-
- self.base_case(test_name=self.id())
-
-
- def test_failover_with_durablesub(self):
- """Test failover with durable subscriber"""
-
- self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
-
-
- def test_failover_with_sync_rcv(self):
- """Test failover with sync receive"""
-
- self.base_case(sync_rcv=True,test_name=self.id())
-
-
- def test_failover_with_sync_ack(self):
- """Test failover with sync ack"""
-
- self.base_case(sync_ack=True,test_name=self.id())
-
-
- def test_failover_with_noprefetch(self):
- """Test failover with no prefetch"""
-
- self.base_case(max_prefetch=1,test_name=self.id())
-
-
- def test_failover_with_multiple_cons_and_ssns(self):
- """Test failover with multiple connections and sessions"""
-
- self.base_case(use_unique_dests=True,address="amq.topic",
- con_count=10,ssn_per_con=25,test_name=self.id())
+ self.base_case(con_count=25,ssn_per_con=25,test_name=self.id())