diff options
-rw-r--r-- | qpid/java/testkit/bin/setenv.sh | 10 | ||||
-rwxr-xr-x | qpid/java/testkit/testkit.py | 186 |
2 files changed, 74 insertions, 122 deletions
diff --git a/qpid/java/testkit/bin/setenv.sh b/qpid/java/testkit/bin/setenv.sh index 71bb219dcf..e6a726eef1 100644 --- a/qpid/java/testkit/bin/setenv.sh +++ b/qpid/java/testkit/bin/setenv.sh @@ -60,6 +60,16 @@ if [ "$CLUSTER_LIB" = "" ] ; then fi fi +if [ "$STORE_LIB" = "" ] ; then + if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then + CLUSTER_LIB="/usr/lib64/qpid/daemon/msgstore.so" + elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then + CLUSTER_LIB="/usr/lib/qpid/daemon/msgstore.so" + else + echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; + fi +fi + if [ "$QP_CP" = "" ] ; then if [ -d ../../build/lib/ ]; then QP_JAR_PATH=`abs_path "../../build/lib/"` diff --git a/qpid/java/testkit/testkit.py b/qpid/java/testkit/testkit.py index 5f83156c38..c308b90d77 100755 --- a/qpid/java/testkit/testkit.py +++ b/qpid/java/testkit/testkit.py @@ -56,6 +56,13 @@ class JavaClientTest(BrokerTest): err_watcher = ssn.receiver("control; {create:always}", capacity=1) ssn.close() + def store_module_args(self): + if BrokerTest.store_lib: + return ["--load-module", BrokerTest.store_lib] + else: + print "Store module not present." + return [""] + def client(self,**options): cmd = ["java","-cp",_cp] @@ -65,7 +72,7 @@ class JavaClientTest(BrokerTest): cmd += ["-Dcon_count=" + str(options.get("con_count",1))] cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] - cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",True))] + cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))] cmd += ["-Ddurable=" + str(options.get("durable",False))] cmd += ["-Dtransacted=" + str(options.get("transacted",False))] cmd += ["-Dreceiver=" + str(options.get("receiver",False))] @@ -73,7 +80,10 @@ class JavaClientTest(BrokerTest): cmd += ["-Dsender=" + str(options.get("sender",False))] cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] - cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))] + cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))] + cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))] + cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))] + cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))] cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] @@ -113,140 +123,72 @@ class JavaClientTest(BrokerTest): self.assertTrue(receiver_running,"Receiver has exited prematually") self.assertTrue(sender_running,"Sender has exited prematually") + def start_sender_and_receiver(self,**options): + + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver_opts = options + receiver_opts["receiver"]=True + receiver = self.popen(self.client(**receiver_opts), + expect=EXPECT_RUNNING) + + sender_opts = options + sender_opts["sender"]=True + sender = self.popen(self.client(**sender_opts), + expect=EXPECT_RUNNING) + + return receiver, sender class ConcurrencyTest(JavaClientTest): """A concurrency test suite for the JMS client""" + skip = False - def test_multiplexing_con(self): - """Tests multiple sessions on a single connection""" + def base_case(self,**options): + if self.skip : + print "Skipping test" + return - cluster = Cluster(self, 2) - p = cluster[0].port() - + if options["durable"]==True: + cluster = Cluster(self, count=2,args=self.store_module_args()) + else: + cluster = Cluster(self, count=2) self.start_error_watcher(broker=cluster[0]) - - receiver = self.popen(self.client(receiver=True, - ssn_per_con=25, - port=p, - address="amq.topic", - use_unique_dests=True, - test_name=self.id()), - expect=EXPECT_RUNNING) - - sender = self.popen(self.client(sender=True, - ssn_per_con=25, - port=p, - address="amq.topic", - use_unique_dests=True, - test_name=self.id()), - expect=EXPECT_RUNNING) - + options["port"] = port=cluster[0].port() + receiver, sender = self.start_sender_and_receiver(**options) self.monitor_clients(broker=cluster[0],run_time=180) self.verify(receiver,sender) + def test_multiplexing_con(self): + """Tests multiple sessions on a single connection""" + + self.base_case(ssn_per_con=25,test_name=self.id()) - def test_multiplexing_con_tx(self): + def test_multiplexing_con_with_tx(self): """Tests multiple transacted sessions on a single connection""" - cluster = Cluster(self,2) - ssn = cluster[0].connect().session() - p = cluster[0].port() - - self.start_error_watcher(broker=cluster[0]) + self.base_case(ssn_per_con=25,transacted=True,test_name=self.id()) - receiver = self.popen(self.client(receiver=True, - ssn_per_con=25, - port=p, - transacted=True, - address="amq.topic", - use_unique_dests=True, - test_name=self.id()), - expect=EXPECT_RUNNING) - - sender = self.popen(self.client(sender=True, - ssn_per_con=25, - port=p, - transacted=True, - address="amq.topic", - use_unique_dests=True, - test_name=self.id()), - expect=EXPECT_RUNNING) - - self.monitor_clients(broker=cluster[0],run_time=60) - ssn.close(); - self.verify(receiver,sender) + def test_multiplexing_con_with_sync_rcv(self): + """Tests multiple sessions with sync receive""" -class SoakTest(JavaClientTest): - """A soak test suite for the JMS client""" + self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id()) - def test_failover(self): - cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL) - p = cluster[0].port() - self.start_error_watcher(broker=cluster[0]) - receiver = self.popen(self.client(receiver=True, - ssn_per_con=1, - port=p, - test_name=self.id()), - expect=EXPECT_RUNNING) - - sender = self.popen(self.client(sender=True, - ssn_per_con=1, - port=p, - test_name=self.id()), - expect=EXPECT_RUNNING) - - # grace period for java clients to get the failover properly setup. - time.sleep(30) - error_msg= None - # Kill original brokers, start new ones. - try: - for i in range(8): - cluster[i].kill() - b=cluster.start() - self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) - print "iteration : " + str(i) - except ConnectError, e1: - error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) - - except SessionError, e2: - error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) + def test_multiplexing_con_with_durable_sub(self): + """Tests multiple sessions with durable subs""" - self.verify(receiver,sender) - if error_msg: - raise Exception(error_msg) - - def test_failover_with_durablesub(self): - cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL) - p = cluster[0].port() - self.start_error_watcher(broker=cluster[0]) - receiver = self.popen(self.client(receiver=True, - ssn_per_con=1, - port=p, - jms_durable_sub=True, - test_name=self.id()), - expect=EXPECT_RUNNING) - - sender = self.popen(self.client(sender=True, - ssn_per_con=1, - port=p, - test_name=self.id()), - expect=EXPECT_RUNNING) - - # grace period for java clients to get the failover properly setup. - time.sleep(30) - error_msg=None - # Kill original brokers, start new ones. - try: - for i in range(8): - cluster[i].kill() - b=cluster.start() - self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) - except ConnectError, e1: - error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) - - except SessionError, e2: - error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) + self.base_case(ssn_per_con=25,jms_durable_sub=True,test_name=self.id()) - self.verify(receiver,sender) - if error_msg: - raise Exception(error_msg) + def test_multiplexing_con_with_sync_ack(self): + """Tests multiple sessions with sync ack""" + + self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_pub(self): + """Tests multiple sessions with sync pub""" + + self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id()) + + def test_multiple_cons_and_ssns(self): + """Tests multiple connections and sessions""" + + self.base_case(con_count=25,ssn_per_con=25,test_name=self.id()) |