diff options
Diffstat (limited to 'qpid/java/tools/bin')
-rwxr-xr-x | qpid/java/tools/bin/perf_report.sh | 140 | ||||
-rw-r--r-- | qpid/java/tools/bin/qpid-bench | 35 | ||||
-rwxr-xr-x | qpid/java/tools/bin/qpid-python-testkit | 30 | ||||
-rw-r--r-- | qpid/java/tools/bin/run_pub.sh | 24 | ||||
-rw-r--r-- | qpid/java/tools/bin/run_sub.sh | 25 | ||||
-rw-r--r-- | qpid/java/tools/bin/set-testkit-env.sh | 88 | ||||
-rw-r--r-- | qpid/java/tools/bin/setenv.sh | 49 | ||||
-rwxr-xr-x | qpid/java/tools/bin/testkit.py | 278 |
8 files changed, 669 insertions, 0 deletions
diff --git a/qpid/java/tools/bin/perf_report.sh b/qpid/java/tools/bin/perf_report.sh new file mode 100755 index 0000000000..e6b4c987e5 --- /dev/null +++ b/qpid/java/tools/bin/perf_report.sh @@ -0,0 +1,140 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This will run the 8 use cases defined below and produce +# a report in tabular format. Refer to the documentation +# for more details. + +SUB_MEM=-Xmx1024M +PUB_MEM=-Xmx1024M +LOG_CONFIG="-Damqj.logging.level=WARN" +QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" +DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" +TOPIC="amq.topic/test" +DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" + +. setenv.sh + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } +cleanup() +{ + pids=`ps aux | grep java | grep Perf | awk '{print $2}'` + if [ "$pids" != "" ]; then + kill -3 $pids + kill -9 $pids >/dev/null 2>&1 + fi +} + +# $1 test name +# $2 consumer options +# $3 producer options +run_testcase() +{ + sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out & + waitfor sub.out "Warming up" + sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out & + waitfor sub.out "Completed the test" + waitfor pub.out "Consumer has completed the test" + sleep 2 #give a grace period to shutdown + print_result $1 +} + +print_result() +{ + prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'` + sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` + avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'` + min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'` + max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency + echo "------------------------------------------------------------------------------------------------" +} + +trap cleanup EXIT + +echo "Test report on " `date +%F` +echo "================================================================================================" +echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" +echo "------------------------------------------------------------------------------------------------" + +# The message counts and warmup counts are set to very low values for quick testing of the script. +# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k +# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend +# setting very low values to start with and experiment while increasing them slowly. + +# Test 1 Trans Queue +#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" + +# Test 2 Dura Queue +run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 3 Dura Queue Sync +run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 4 Dura Queue Sync Publish and Ack +run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 5 Topic +run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" + +# Test 6 Durable Topic +run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 7 Fanout +run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" + +# Test 8 Small TX +run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" + +# Test 9 Large TX +run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" + +# Test 10 256 MSG +run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 11 512 MSG +run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 12 2048 MSG +run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 13 Random size MSG +run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 14 Random size MSG Durable +run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 15 64K MSG +run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 16 Durable 64K MSG +run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 17 500K MSG +run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 18 Durable 500K MSG +run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/qpid/java/tools/bin/qpid-bench b/qpid/java/tools/bin/qpid-bench new file mode 100644 index 0000000000..c982e64efd --- /dev/null +++ b/qpid/java/tools/bin/qpid-bench @@ -0,0 +1,35 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java \ + JAVA_VM=-server \ + JAVA_MEM=-Xmx1024m \ + QPID_CLASSPATH=$QPID_LIBS + +. qpid-run org.apache.qpid.tools.QpidBench "$@" diff --git a/qpid/java/tools/bin/qpid-python-testkit b/qpid/java/tools/bin/qpid-python-testkit new file mode 100755 index 0000000000..cbe7972421 --- /dev/null +++ b/qpid/java/tools/bin/qpid-python-testkit @@ -0,0 +1,30 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This is wrapper script to run the tests defined in testkit.py +# via the python test runner. The defaults are set for a running +# from an svn checkout + +. ./set-testkit-env.sh + +export PYTHONPATH=./:$PYTHONPATH +rm -rf $OUTDIR +qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" + diff --git a/qpid/java/tools/bin/run_pub.sh b/qpid/java/tools/bin/run_pub.sh new file mode 100644 index 0000000000..91b9287dea --- /dev/null +++ b/qpid/java/tools/bin/run_pub.sh @@ -0,0 +1,24 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +. $QPID_TEST_HOME/bin/setenv.sh + +echo "$@" +$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer diff --git a/qpid/java/tools/bin/run_sub.sh b/qpid/java/tools/bin/run_sub.sh new file mode 100644 index 0000000000..c9ad2fed74 --- /dev/null +++ b/qpid/java/tools/bin/run_sub.sh @@ -0,0 +1,25 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +. $QPID_TEST_HOME/bin/setenv.sh + +echo "$@" +$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer + diff --git a/qpid/java/tools/bin/set-testkit-env.sh b/qpid/java/tools/bin/set-testkit-env.sh new file mode 100644 index 0000000000..051dad8179 --- /dev/null +++ b/qpid/java/tools/bin/set-testkit-env.sh @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# If QPIDD_EXEC ..etc is not set, it will first check to see +# if this is run from a qpid svn check out, if not it will look +# for installed rpms. + +abs_path() +{ + D=`dirname "$1"` + B=`basename "$1"` + echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B" +} + +# Environment for python tests + +if [ -d ../../../python ] ; then + PYTHON_DIR=../../../python + PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid +elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then + echo "WARNING: skipping test, no qpid python scripts found ."; exit 0; +fi + + +if [ "$QPIDD_EXEC" = "" ] ; then + if [ -x ../../../cpp/src/qpidd ]; then + QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"` + elif [ -n "$(which qpidd)" ] ; then + QPIDD_EXEC=$(which qpidd) + else + echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0; + fi +fi + +if [ "$CLUSTER_LIB" = "" ] ; then + if [ -x ../../../cpp/src/.libs/cluster.so ]; then + CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"` + elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then + CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so" + elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then + CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so" + else + echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0; + fi +fi + +if [ "$STORE_LIB" = "" ] ; then + if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then + STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so" + elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then + STORE_LIB="/usr/lib/qpid/daemon/msgstore.so" + #else + # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; + fi +fi + +if [ "$QP_CP" = "" ] ; then + if [ -d ../../build/lib/ ]; then + QP_JAR_PATH=`abs_path "../../build/lib/"` + elif [ -d /usr/share/java/qpid-deps ]; then + QP_JAR_PATH=`abs_path "/usr/share/java"` + else + "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0; + fi + QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'` +fi + +if [ "$OUTDIR" = "" ] ; then + OUTDIR=`abs_path "./output"` +fi + +export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR diff --git a/qpid/java/tools/bin/setenv.sh b/qpid/java/tools/bin/setenv.sh new file mode 100644 index 0000000000..24135e711b --- /dev/null +++ b/qpid/java/tools/bin/setenv.sh @@ -0,0 +1,49 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Compiles the test classes and sets the CLASSPATH + +# check for QPID_TEST_HOME +if [ "$QPID_TEST_HOME" = "" ] ; then + echo "ERROR: Please set QPID_TEST_HOME ...." + exit 1 +fi + +# check for JAVA_HOME +if [ "$JAVA_HOME" = "" ] ; then + echo "ERROR: Please set JAVA_HOME ...." + exit 1 +fi + +# VENDOR_LIB path needs to be set +# for Qpid set this to {qpid_checkout}/java/build/lib +if [ "$VENDOR_LIB" = "" ] ; then + echo "ERROR: Please set VENDOR_LIB path in the script ...." + exit 1 +fi + + +[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes + +CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"` +$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'` + +export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH + diff --git a/qpid/java/tools/bin/testkit.py b/qpid/java/tools/bin/testkit.py new file mode 100755 index 0000000000..1c2ad598b8 --- /dev/null +++ b/qpid/java/tools/bin/testkit.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time, string, traceback +from brokertest import * +from qpid.messaging import * + + +try: + import java.lang.System + _cp = java.lang.System.getProperty("java.class.path"); +except ImportError: + _cp = checkenv("QP_CP") + +class Formatter: + + def __init__(self, message): + self.message = message + self.environ = {"M": self.message, + "P": self.message.properties, + "C": self.message.content} + + def __getitem__(self, st): + return eval(st, self.environ) + +# The base test case has support for launching the generic +# receiver and sender through the TestLauncher with all the options. +# +class JavaClientTest(BrokerTest): + """Base Case for Java Test cases""" + + client_class = "org.apache.qpid.testkit.TestLauncher" + + # currently there is no transparent reconnection. + # temp hack: just creating the queue here and closing it. + def start_error_watcher(self,broker=None): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control; {create:always}", capacity=1) + ssn.close() + + def store_module_args(self): + if BrokerTest.store_lib: + return ["--load-module", BrokerTest.store_lib] + else: + print "Store module not present." + return [""] + + def client(self,**options): + cmd = ["java","-cp",_cp] + + cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] + cmd += ["-Dhost=" + options.get("host","127.0.0.1")] + cmd += ["-Dport=" + str(options.get("port",5672))] + cmd += ["-Dcon_count=" + str(options.get("con_count",1))] + cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] + cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] + cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))] + cmd += ["-Ddurable=" + str(options.get("durable",False))] + cmd += ["-Dtransacted=" + str(options.get("transacted",False))] + cmd += ["-Dreceiver=" + str(options.get("receiver",False))] + cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))] + cmd += ["-Dsender=" + str(options.get("sender",False))] + cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] + cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] + cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))] + cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))] + cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))] + cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))] + cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] + cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] + cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] + cmd += ["-Dlog.level=" + options.get("log.level", "warn")] + cmd += [self.client_class] + cmd += [options.get("address", "my_queue; {create: always}")] + + #print str(options.get("port",5672)) + return cmd + + # currently there is no transparent reconnection. + # temp hack: just creating a receiver and closing session soon after. + def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control; {create:always}", capacity=1) + i = run_time/error_ck_freq + is_error = False + for j in range(i): + not_empty = True + while not_empty: + try: + m = err_watcher.fetch(timeout=error_ck_freq) + ssn.acknowledge() + print "Java process notified of an error" + self.print_error(m) + is_error = True + except messaging.Empty, e: + not_empty = False + + ssn.close() + return is_error + + def print_error(self,msg): + print msg.properties.get("exception-trace") + + def verify(self, receiver,sender): + sender_running = receiver.is_running() + receiver_running = sender.is_running() + + self.assertTrue(receiver_running,"Receiver has exited prematually") + self.assertTrue(sender_running,"Sender has exited prematually") + + def start_sender_and_receiver(self,**options): + + receiver_opts = options + receiver_opts["receiver"]=True + receiver = self.popen(self.client(**receiver_opts), + expect=EXPECT_RUNNING) + + sender_opts = options + sender_opts["sender"]=True + sender = self.popen(self.client(**sender_opts), + expect=EXPECT_RUNNING) + + return receiver, sender + + def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): + if options.get("durable",False)==True: + cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) + else: + cluster = Cluster(self, count=count) + return cluster + +class ConcurrencyTest(JavaClientTest): + """A concurrency test suite for the JMS client""" + skip = False + + def base_case(self,**options): + if self.skip : + print "Skipping test" + return + + cluster = self.start_cluster(count=2,**options) + self.start_error_watcher(broker=cluster[0]) + options["port"] = port=cluster[0].port() + + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver, sender = self.start_sender_and_receiver(**options) + self.monitor_clients(broker=cluster[0],run_time=180) + self.verify(receiver,sender) + + def test_multiplexing_con(self): + """Tests multiple sessions on a single connection""" + + self.base_case(ssn_per_con=25,test_name=self.id()) + + def test_multiplexing_con_with_tx(self): + """Tests multiple transacted sessions on a single connection""" + + self.base_case(ssn_per_con=25,transacted=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_rcv(self): + """Tests multiple sessions with sync receive""" + + self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id()) + + def test_multiplexing_con_with_durable_sub(self): + """Tests multiple sessions with durable subs""" + + self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_ack(self): + """Tests multiple sessions with sync ack""" + + self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_pub(self): + """Tests multiple sessions with sync pub""" + + self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id()) + + def test_multiple_cons_and_ssns(self): + """Tests multiple connections and sessions""" + + self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) + + +class SoakTest(JavaClientTest): + """A soak test suite for the JMS client""" + + def base_case(self,**options): + cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) + options["port"] = port=cluster[0].port() + self.start_error_watcher(broker=cluster[0]) + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver,sender = self.start_sender_and_receiver(**options) + is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) + + if (is_error): + print "The sender or receiver didn't start properly. Exiting test." + return + else: + "Print no error !" + + # grace period for java clients to get the failover properly setup. + time.sleep(30) + error_msg= None + # Kill original brokers, start new ones. + try: + for i in range(8): + cluster[i].kill() + b=cluster.start() + self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) + print "iteration : " + str(i) + except ConnectError, e1: + error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) + + except SessionError, e2: + error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) + + self.verify(receiver,sender) + if error_msg: + raise Exception(error_msg) + + + def test_failover(self) : + """Test basic failover""" + + self.base_case(test_name=self.id()) + + + def test_failover_with_durablesub(self): + """Test failover with durable subscriber""" + + self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) + + + def test_failover_with_sync_rcv(self): + """Test failover with sync receive""" + + self.base_case(sync_rcv=True,test_name=self.id()) + + + def test_failover_with_sync_ack(self): + """Test failover with sync ack""" + + self.base_case(sync_ack=True,test_name=self.id()) + + + def test_failover_with_noprefetch(self): + """Test failover with no prefetch""" + + self.base_case(max_prefetch=1,test_name=self.id()) + + + def test_failover_with_multiple_cons_and_ssns(self): + """Test failover with multiple connections and sessions""" + + self.base_case(use_unique_dests=True,address="amq.topic", + con_count=10,ssn_per_con=25,test_name=self.id()) |