summaryrefslogtreecommitdiff
path: root/qpid/java/tools/bin
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools/bin')
-rwxr-xr-xqpid/java/tools/bin/perf_report.sh140
-rw-r--r--qpid/java/tools/bin/qpid-bench35
-rwxr-xr-xqpid/java/tools/bin/qpid-python-testkit30
-rw-r--r--qpid/java/tools/bin/run_pub.sh24
-rw-r--r--qpid/java/tools/bin/run_sub.sh25
-rw-r--r--qpid/java/tools/bin/set-testkit-env.sh88
-rw-r--r--qpid/java/tools/bin/setenv.sh49
-rwxr-xr-xqpid/java/tools/bin/testkit.py278
8 files changed, 669 insertions, 0 deletions
diff --git a/qpid/java/tools/bin/perf_report.sh b/qpid/java/tools/bin/perf_report.sh
new file mode 100755
index 0000000000..e6b4c987e5
--- /dev/null
+++ b/qpid/java/tools/bin/perf_report.sh
@@ -0,0 +1,140 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This will run the 8 use cases defined below and produce
+# a report in tabular format. Refer to the documentation
+# for more details.
+
+SUB_MEM=-Xmx1024M
+PUB_MEM=-Xmx1024M
+LOG_CONFIG="-Damqj.logging.level=WARN"
+QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
+DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
+TOPIC="amq.topic/test"
+DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
+
+. setenv.sh
+
+waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
+cleanup()
+{
+ pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
+ if [ "$pids" != "" ]; then
+ kill -3 $pids
+ kill -9 $pids >/dev/null 2>&1
+ fi
+}
+
+# $1 test name
+# $2 consumer options
+# $3 producer options
+run_testcase()
+{
+ sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out &
+ waitfor sub.out "Warming up"
+ sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out &
+ waitfor sub.out "Completed the test"
+ waitfor pub.out "Consumer has completed the test"
+ sleep 2 #give a grace period to shutdown
+ print_result $1
+}
+
+print_result()
+{
+ prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
+ sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
+ cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
+ avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
+ min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
+ max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
+
+ printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+ echo "------------------------------------------------------------------------------------------------"
+}
+
+trap cleanup EXIT
+
+echo "Test report on " `date +%F`
+echo "================================================================================================"
+echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
+echo "------------------------------------------------------------------------------------------------"
+
+# The message counts and warmup counts are set to very low values for quick testing of the script.
+# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k
+# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend
+# setting very low values to start with and experiment while increasing them slowly.
+
+# Test 1 Trans Queue
+#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 2 Dura Queue
+run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 3 Dura Queue Sync
+run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
+
+# Test 4 Dura Queue Sync Publish and Ack
+run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
+
+# Test 5 Topic
+run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 6 Durable Topic
+run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 7 Fanout
+run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 8 Small TX
+run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \
+ "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
+
+# Test 9 Large TX
+run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \
+ "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
+
+# Test 10 256 MSG
+run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 11 512 MSG
+run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 12 2048 MSG
+run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 13 Random size MSG
+run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 14 Random size MSG Durable
+run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 15 64K MSG
+run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 16 Durable 64K MSG
+run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \
+ "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 17 500K MSG
+run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 18 Durable 500K MSG
+run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \
+ "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/qpid/java/tools/bin/qpid-bench b/qpid/java/tools/bin/qpid-bench
new file mode 100644
index 0000000000..c982e64efd
--- /dev/null
+++ b/qpid/java/tools/bin/qpid-bench
@@ -0,0 +1,35 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+ export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_MEM=-Xmx1024m \
+ QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run org.apache.qpid.tools.QpidBench "$@"
diff --git a/qpid/java/tools/bin/qpid-python-testkit b/qpid/java/tools/bin/qpid-python-testkit
new file mode 100755
index 0000000000..cbe7972421
--- /dev/null
+++ b/qpid/java/tools/bin/qpid-python-testkit
@@ -0,0 +1,30 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This is wrapper script to run the tests defined in testkit.py
+# via the python test runner. The defaults are set for a running
+# from an svn checkout
+
+. ./set-testkit-env.sh
+
+export PYTHONPATH=./:$PYTHONPATH
+rm -rf $OUTDIR
+qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
+
diff --git a/qpid/java/tools/bin/run_pub.sh b/qpid/java/tools/bin/run_pub.sh
new file mode 100644
index 0000000000..91b9287dea
--- /dev/null
+++ b/qpid/java/tools/bin/run_pub.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer
diff --git a/qpid/java/tools/bin/run_sub.sh b/qpid/java/tools/bin/run_sub.sh
new file mode 100644
index 0000000000..c9ad2fed74
--- /dev/null
+++ b/qpid/java/tools/bin/run_sub.sh
@@ -0,0 +1,25 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer
+
diff --git a/qpid/java/tools/bin/set-testkit-env.sh b/qpid/java/tools/bin/set-testkit-env.sh
new file mode 100644
index 0000000000..051dad8179
--- /dev/null
+++ b/qpid/java/tools/bin/set-testkit-env.sh
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# If QPIDD_EXEC ..etc is not set, it will first check to see
+# if this is run from a qpid svn check out, if not it will look
+# for installed rpms.
+
+abs_path()
+{
+ D=`dirname "$1"`
+ B=`basename "$1"`
+ echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B"
+}
+
+# Environment for python tests
+
+if [ -d ../../../python ] ; then
+ PYTHON_DIR=../../../python
+ PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid
+elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then
+ echo "WARNING: skipping test, no qpid python scripts found ."; exit 0;
+fi
+
+
+if [ "$QPIDD_EXEC" = "" ] ; then
+ if [ -x ../../../cpp/src/qpidd ]; then
+ QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"`
+ elif [ -n "$(which qpidd)" ] ; then
+ QPIDD_EXEC=$(which qpidd)
+ else
+ echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0;
+ fi
+fi
+
+if [ "$CLUSTER_LIB" = "" ] ; then
+ if [ -x ../../../cpp/src/.libs/cluster.so ]; then
+ CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"`
+ elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then
+ CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so"
+ elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then
+ CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so"
+ else
+ echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0;
+ fi
+fi
+
+if [ "$STORE_LIB" = "" ] ; then
+ if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then
+ STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so"
+ elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then
+ STORE_LIB="/usr/lib/qpid/daemon/msgstore.so"
+ #else
+ # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0;
+ fi
+fi
+
+if [ "$QP_CP" = "" ] ; then
+ if [ -d ../../build/lib/ ]; then
+ QP_JAR_PATH=`abs_path "../../build/lib/"`
+ elif [ -d /usr/share/java/qpid-deps ]; then
+ QP_JAR_PATH=`abs_path "/usr/share/java"`
+ else
+ "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0;
+ fi
+ QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'`
+fi
+
+if [ "$OUTDIR" = "" ] ; then
+ OUTDIR=`abs_path "./output"`
+fi
+
+export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR
diff --git a/qpid/java/tools/bin/setenv.sh b/qpid/java/tools/bin/setenv.sh
new file mode 100644
index 0000000000..24135e711b
--- /dev/null
+++ b/qpid/java/tools/bin/setenv.sh
@@ -0,0 +1,49 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Compiles the test classes and sets the CLASSPATH
+
+# check for QPID_TEST_HOME
+if [ "$QPID_TEST_HOME" = "" ] ; then
+ echo "ERROR: Please set QPID_TEST_HOME ...."
+ exit 1
+fi
+
+# check for JAVA_HOME
+if [ "$JAVA_HOME" = "" ] ; then
+ echo "ERROR: Please set JAVA_HOME ...."
+ exit 1
+fi
+
+# VENDOR_LIB path needs to be set
+# for Qpid set this to {qpid_checkout}/java/build/lib
+if [ "$VENDOR_LIB" = "" ] ; then
+ echo "ERROR: Please set VENDOR_LIB path in the script ...."
+ exit 1
+fi
+
+
+[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
+
+CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
+$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
+
+export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
+
diff --git a/qpid/java/tools/bin/testkit.py b/qpid/java/tools/bin/testkit.py
new file mode 100755
index 0000000000..1c2ad598b8
--- /dev/null
+++ b/qpid/java/tools/bin/testkit.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time, string, traceback
+from brokertest import *
+from qpid.messaging import *
+
+
+try:
+ import java.lang.System
+ _cp = java.lang.System.getProperty("java.class.path");
+except ImportError:
+ _cp = checkenv("QP_CP")
+
+class Formatter:
+
+ def __init__(self, message):
+ self.message = message
+ self.environ = {"M": self.message,
+ "P": self.message.properties,
+ "C": self.message.content}
+
+ def __getitem__(self, st):
+ return eval(st, self.environ)
+
+# The base test case has support for launching the generic
+# receiver and sender through the TestLauncher with all the options.
+#
+class JavaClientTest(BrokerTest):
+ """Base Case for Java Test cases"""
+
+ client_class = "org.apache.qpid.testkit.TestLauncher"
+
+ # currently there is no transparent reconnection.
+ # temp hack: just creating the queue here and closing it.
+ def start_error_watcher(self,broker=None):
+ ssn = broker.connect().session()
+ err_watcher = ssn.receiver("control; {create:always}", capacity=1)
+ ssn.close()
+
+ def store_module_args(self):
+ if BrokerTest.store_lib:
+ return ["--load-module", BrokerTest.store_lib]
+ else:
+ print "Store module not present."
+ return [""]
+
+ def client(self,**options):
+ cmd = ["java","-cp",_cp]
+
+ cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
+ cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
+ cmd += ["-Dport=" + str(options.get("port",5672))]
+ cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
+ cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
+ cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
+ cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
+ cmd += ["-Ddurable=" + str(options.get("durable",False))]
+ cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
+ cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
+ cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
+ cmd += ["-Dsender=" + str(options.get("sender",False))]
+ cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
+ cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
+ cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
+ cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
+ cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
+ cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
+ cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
+ cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
+ cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
+ cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
+ cmd += [self.client_class]
+ cmd += [options.get("address", "my_queue; {create: always}")]
+
+ #print str(options.get("port",5672))
+ return cmd
+
+ # currently there is no transparent reconnection.
+ # temp hack: just creating a receiver and closing session soon after.
+ def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
+ ssn = broker.connect().session()
+ err_watcher = ssn.receiver("control; {create:always}", capacity=1)
+ i = run_time/error_ck_freq
+ is_error = False
+ for j in range(i):
+ not_empty = True
+ while not_empty:
+ try:
+ m = err_watcher.fetch(timeout=error_ck_freq)
+ ssn.acknowledge()
+ print "Java process notified of an error"
+ self.print_error(m)
+ is_error = True
+ except messaging.Empty, e:
+ not_empty = False
+
+ ssn.close()
+ return is_error
+
+ def print_error(self,msg):
+ print msg.properties.get("exception-trace")
+
+ def verify(self, receiver,sender):
+ sender_running = receiver.is_running()
+ receiver_running = sender.is_running()
+
+ self.assertTrue(receiver_running,"Receiver has exited prematually")
+ self.assertTrue(sender_running,"Sender has exited prematually")
+
+ def start_sender_and_receiver(self,**options):
+
+ receiver_opts = options
+ receiver_opts["receiver"]=True
+ receiver = self.popen(self.client(**receiver_opts),
+ expect=EXPECT_RUNNING)
+
+ sender_opts = options
+ sender_opts["sender"]=True
+ sender = self.popen(self.client(**sender_opts),
+ expect=EXPECT_RUNNING)
+
+ return receiver, sender
+
+ def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
+ if options.get("durable",False)==True:
+ cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
+ else:
+ cluster = Cluster(self, count=count)
+ return cluster
+
+class ConcurrencyTest(JavaClientTest):
+ """A concurrency test suite for the JMS client"""
+ skip = False
+
+ def base_case(self,**options):
+ if self.skip :
+ print "Skipping test"
+ return
+
+ cluster = self.start_cluster(count=2,**options)
+ self.start_error_watcher(broker=cluster[0])
+ options["port"] = port=cluster[0].port()
+
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver, sender = self.start_sender_and_receiver(**options)
+ self.monitor_clients(broker=cluster[0],run_time=180)
+ self.verify(receiver,sender)
+
+ def test_multiplexing_con(self):
+ """Tests multiple sessions on a single connection"""
+
+ self.base_case(ssn_per_con=25,test_name=self.id())
+
+ def test_multiplexing_con_with_tx(self):
+ """Tests multiple transacted sessions on a single connection"""
+
+ self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_rcv(self):
+ """Tests multiple sessions with sync receive"""
+
+ self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
+
+ def test_multiplexing_con_with_durable_sub(self):
+ """Tests multiple sessions with durable subs"""
+
+ self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_ack(self):
+ """Tests multiple sessions with sync ack"""
+
+ self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
+
+ def test_multiplexing_con_with_sync_pub(self):
+ """Tests multiple sessions with sync pub"""
+
+ self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
+
+ def test_multiple_cons_and_ssns(self):
+ """Tests multiple connections and sessions"""
+
+ self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
+
+
+class SoakTest(JavaClientTest):
+ """A soak test suite for the JMS client"""
+
+ def base_case(self,**options):
+ cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
+ options["port"] = port=cluster[0].port()
+ self.start_error_watcher(broker=cluster[0])
+ options["use_unique_dests"]=True
+ options["address"]="amq.topic"
+ receiver,sender = self.start_sender_and_receiver(**options)
+ is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
+
+ if (is_error):
+ print "The sender or receiver didn't start properly. Exiting test."
+ return
+ else:
+ "Print no error !"
+
+ # grace period for java clients to get the failover properly setup.
+ time.sleep(30)
+ error_msg= None
+ # Kill original brokers, start new ones.
+ try:
+ for i in range(8):
+ cluster[i].kill()
+ b=cluster.start()
+ self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+ print "iteration : " + str(i)
+ except ConnectError, e1:
+ error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
+
+ except SessionError, e2:
+ error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
+
+ self.verify(receiver,sender)
+ if error_msg:
+ raise Exception(error_msg)
+
+
+ def test_failover(self) :
+ """Test basic failover"""
+
+ self.base_case(test_name=self.id())
+
+
+ def test_failover_with_durablesub(self):
+ """Test failover with durable subscriber"""
+
+ self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
+
+
+ def test_failover_with_sync_rcv(self):
+ """Test failover with sync receive"""
+
+ self.base_case(sync_rcv=True,test_name=self.id())
+
+
+ def test_failover_with_sync_ack(self):
+ """Test failover with sync ack"""
+
+ self.base_case(sync_ack=True,test_name=self.id())
+
+
+ def test_failover_with_noprefetch(self):
+ """Test failover with no prefetch"""
+
+ self.base_case(max_prefetch=1,test_name=self.id())
+
+
+ def test_failover_with_multiple_cons_and_ssns(self):
+ """Test failover with multiple connections and sessions"""
+
+ self.base_case(use_unique_dests=True,address="amq.topic",
+ con_count=10,ssn_per_con=25,test_name=self.id())