summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/testkit/bin/setenv.sh10
-rwxr-xr-xqpid/java/testkit/testkit.py186
2 files changed, 74 insertions, 122 deletions
diff --git a/qpid/java/testkit/bin/setenv.sh b/qpid/java/testkit/bin/setenv.sh
index 71bb219dcf..e6a726eef1 100644
--- a/qpid/java/testkit/bin/setenv.sh
+++ b/qpid/java/testkit/bin/setenv.sh
@@ -60,6 +60,16 @@ if [ "$CLUSTER_LIB" = "" ] ; then
fi
fi
+if [ "$STORE_LIB" = "" ] ; then
+ if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then
+ CLUSTER_LIB="/usr/lib64/qpid/daemon/msgstore.so"
+ elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then
+ CLUSTER_LIB="/usr/lib/qpid/daemon/msgstore.so"
+ else
+ echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0;
+ fi
+fi
+
if [ "$QP_CP" = "" ] ; then
if [ -d ../../build/lib/ ]; then
QP_JAR_PATH=`abs_path "../../build/lib/"`
diff --git a/qpid/java/testkit/testkit.py b/qpid/java/testkit/testkit.py
index 5f83156c38..c308b90d77 100755
--- a/qpid/java/testkit/testkit.py
+++ b/qpid/java/testkit/testkit.py
@@ -56,6 +56,13 @@ class JavaClientTest(BrokerTest):
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
ssn.close()
+ def store_module_args(self):
+ if BrokerTest.store_lib:
+ return ["--load-module", BrokerTest.store_lib]
+ else:
+ print "Store module not present."
+ return [""]
+
def client(self,**options):
cmd = ["java","-cp",_cp]
@@ -65,7 +72,7 @@ class JavaClientTest(BrokerTest):
cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
- cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",True))]
+ cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
cmd += ["-Ddurable=" + str(options.get("durable",False))]
cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
@@ -73,7 +80,10 @@ class JavaClientTest(BrokerTest):
cmd += ["-Dsender=" + str(options.get("sender",False))]
cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
- cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))]
+ cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
+ cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
+ cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
+ cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
@@ -113,140 +123,72 @@ class JavaClientTest(BrokerTest):
self.assertTrue(receiver_running,"Receiver has exited prematually")
self.assertTrue(sender_running,"Sender has exited prematually")
+ def start_sender_and_receiver(self,**options):
+
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver_opts = options
+ receiver_opts["receiver"]=True
+ receiver = self.popen(self.client(**receiver_opts),
+ expect=EXPECT_RUNNING)
+
+ sender_opts = options
+ sender_opts["sender"]=True
+ sender = self.popen(self.client(**sender_opts),
+ expect=EXPECT_RUNNING)
+
+ return receiver, sender
class ConcurrencyTest(JavaClientTest):
"""A concurrency test suite for the JMS client"""
+ skip = False
- def test_multiplexing_con(self):
- """Tests multiple sessions on a single connection"""
+ def base_case(self,**options):
+ if self.skip :
+ print "Skipping test"
+ return
- cluster = Cluster(self, 2)
- p = cluster[0].port()
-
+ if options["durable"]==True:
+ cluster = Cluster(self, count=2,args=self.store_module_args())
+ else:
+ cluster = Cluster(self, count=2)
self.start_error_watcher(broker=cluster[0])
-
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=25,
- port=p,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=25,
- port=p,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
+ options["port"] = port=cluster[0].port()
+ receiver, sender = self.start_sender_and_receiver(**options)
self.monitor_clients(broker=cluster[0],run_time=180)
self.verify(receiver,sender)
+ def test_multiplexing_con(self):
+ """Tests multiple sessions on a single connection"""
+
+ self.base_case(ssn_per_con=25,test_name=self.id())
- def test_multiplexing_con_tx(self):
+ def test_multiplexing_con_with_tx(self):
"""Tests multiple transacted sessions on a single connection"""
- cluster = Cluster(self,2)
- ssn = cluster[0].connect().session()
- p = cluster[0].port()
-
- self.start_error_watcher(broker=cluster[0])
+ self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=25,
- port=p,
- transacted=True,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=25,
- port=p,
- transacted=True,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- self.monitor_clients(broker=cluster[0],run_time=60)
- ssn.close();
- self.verify(receiver,sender)
+ def test_multiplexing_con_with_sync_rcv(self):
+ """Tests multiple sessions with sync receive"""
-class SoakTest(JavaClientTest):
- """A soak test suite for the JMS client"""
+ self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
- def test_failover(self):
- cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL)
- p = cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=1,
- port=p,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=1,
- port=p,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg= None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- print "iteration : " + str(i)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+ def test_multiplexing_con_with_durable_sub(self):
+ """Tests multiple sessions with durable subs"""
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
-
- def test_failover_with_durablesub(self):
- cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL)
- p = cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=1,
- port=p,
- jms_durable_sub=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=1,
- port=p,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg=None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+ self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id())
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
+ def test_multiplexing_con_with_sync_ack(self):
+ """Tests multiple sessions with sync ack"""
+
+ self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_pub(self):
+ """Tests multiple sessions with sync pub"""
+
+ self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
+
+ def test_multiple_cons_and_ssns(self):
+ """Tests multiple connections and sessions"""
+
+ self.base_case(con_count=25,ssn_per_con=25,test_name=self.id())