summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-09-30 01:58:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-09-30 01:58:23 +0000
commitea85924637502b1a29961cc029c8019b51626ed9 (patch)
tree1a8f94330a4be30a392d0ab4bbabbb5618dc3385
parent8c32d038ac6790d437f075285ae811e21d5149d3 (diff)
downloadqpid-python-ea85924637502b1a29961cc029c8019b51626ed9.tar.gz
Added the ability to load the store module via installed rpms or using an explicitly specified location.
Moved common functionality into base class and helper methods. Added several test cases for the ConcurrencyTest class. (Temporarily removed the FailoverTests.) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1002928 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/testkit/bin/setenv.sh10
-rwxr-xr-xqpid/java/testkit/testkit.py186
2 files changed, 74 insertions, 122 deletions
diff --git a/qpid/java/testkit/bin/setenv.sh b/qpid/java/testkit/bin/setenv.sh
index 71bb219dcf..e6a726eef1 100644
--- a/qpid/java/testkit/bin/setenv.sh
+++ b/qpid/java/testkit/bin/setenv.sh
@@ -60,6 +60,16 @@ if [ "$CLUSTER_LIB" = "" ] ; then
fi
fi
+if [ "$STORE_LIB" = "" ] ; then
+ if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then
+ CLUSTER_LIB="/usr/lib64/qpid/daemon/msgstore.so"
+ elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then
+ CLUSTER_LIB="/usr/lib/qpid/daemon/msgstore.so"
+ else
+ echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0;
+ fi
+fi
+
if [ "$QP_CP" = "" ] ; then
if [ -d ../../build/lib/ ]; then
QP_JAR_PATH=`abs_path "../../build/lib/"`
diff --git a/qpid/java/testkit/testkit.py b/qpid/java/testkit/testkit.py
index 5f83156c38..c308b90d77 100755
--- a/qpid/java/testkit/testkit.py
+++ b/qpid/java/testkit/testkit.py
@@ -56,6 +56,13 @@ class JavaClientTest(BrokerTest):
err_watcher = ssn.receiver("control; {create:always}", capacity=1)
ssn.close()
+ def store_module_args(self):
+ if BrokerTest.store_lib:
+ return ["--load-module", BrokerTest.store_lib]
+ else:
+ print "Store module not present."
+ return [""]
+
def client(self,**options):
cmd = ["java","-cp",_cp]
@@ -65,7 +72,7 @@ class JavaClientTest(BrokerTest):
cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
- cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",True))]
+ cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
cmd += ["-Ddurable=" + str(options.get("durable",False))]
cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
@@ -73,7 +80,10 @@ class JavaClientTest(BrokerTest):
cmd += ["-Dsender=" + str(options.get("sender",False))]
cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
- cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))]
+ cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
+ cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
+ cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
+ cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
@@ -113,140 +123,72 @@ class JavaClientTest(BrokerTest):
self.assertTrue(receiver_running,"Receiver has exited prematually")
self.assertTrue(sender_running,"Sender has exited prematually")
+ def start_sender_and_receiver(self,**options):
+
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver_opts = options
+ receiver_opts["receiver"]=True
+ receiver = self.popen(self.client(**receiver_opts),
+ expect=EXPECT_RUNNING)
+
+ sender_opts = options
+ sender_opts["sender"]=True
+ sender = self.popen(self.client(**sender_opts),
+ expect=EXPECT_RUNNING)
+
+ return receiver, sender
class ConcurrencyTest(JavaClientTest):
"""A concurrency test suite for the JMS client"""
+ skip = False
- def test_multiplexing_con(self):
- """Tests multiple sessions on a single connection"""
+ def base_case(self,**options):
+ if self.skip :
+ print "Skipping test"
+ return
- cluster = Cluster(self, 2)
- p = cluster[0].port()
-
+ if options["durable"]==True:
+ cluster = Cluster(self, count=2,args=self.store_module_args())
+ else:
+ cluster = Cluster(self, count=2)
self.start_error_watcher(broker=cluster[0])
-
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=25,
- port=p,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=25,
- port=p,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
+ options["port"] = port=cluster[0].port()
+ receiver, sender = self.start_sender_and_receiver(**options)
self.monitor_clients(broker=cluster[0],run_time=180)
self.verify(receiver,sender)
+ def test_multiplexing_con(self):
+ """Tests multiple sessions on a single connection"""
+
+ self.base_case(ssn_per_con=25,test_name=self.id())
- def test_multiplexing_con_tx(self):
+ def test_multiplexing_con_with_tx(self):
"""Tests multiple transacted sessions on a single connection"""
- cluster = Cluster(self,2)
- ssn = cluster[0].connect().session()
- p = cluster[0].port()
-
- self.start_error_watcher(broker=cluster[0])
+ self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=25,
- port=p,
- transacted=True,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=25,
- port=p,
- transacted=True,
- address="amq.topic",
- use_unique_dests=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- self.monitor_clients(broker=cluster[0],run_time=60)
- ssn.close();
- self.verify(receiver,sender)
+ def test_multiplexing_con_with_sync_rcv(self):
+ """Tests multiple sessions with sync receive"""
-class SoakTest(JavaClientTest):
- """A soak test suite for the JMS client"""
+ self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
- def test_failover(self):
- cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL)
- p = cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=1,
- port=p,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=1,
- port=p,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg= None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- print "iteration : " + str(i)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+ def test_multiplexing_con_with_durable_sub(self):
+ """Tests multiple sessions with durable subs"""
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
-
- def test_failover_with_durablesub(self):
- cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL)
- p = cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- receiver = self.popen(self.client(receiver=True,
- ssn_per_con=1,
- port=p,
- jms_durable_sub=True,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- sender = self.popen(self.client(sender=True,
- ssn_per_con=1,
- port=p,
- test_name=self.id()),
- expect=EXPECT_RUNNING)
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg=None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+ self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id())
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
+ def test_multiplexing_con_with_sync_ack(self):
+ """Tests multiple sessions with sync ack"""
+
+ self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_pub(self):
+ """Tests multiple sessions with sync pub"""
+
+ self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
+
+ def test_multiple_cons_and_ssns(self):
+ """Tests multiple connections and sessions"""
+
+ self.base_case(con_count=25,ssn_per_con=25,test_name=self.id())