diff options
Diffstat (limited to 'qpid/java/tools')
24 files changed, 4135 insertions, 0 deletions
diff --git a/qpid/java/tools/README.txt b/qpid/java/tools/README.txt new file mode 100644 index 0000000000..fdde734027 --- /dev/null +++ b/qpid/java/tools/README.txt @@ -0,0 +1,153 @@ +Introduction +============ + +The Test kit for the java client consists of 2 components. + +1) A Simple Perf Test that can be used to, + a) Run a predefined perf report consisting of 8 use cases (see below) + b) Run a producer and a consumer with a number of different options + +2) Soak tests that can be run for longer durations (hours or days). + +I am planning to add some stress tests to this module as well. +Please note this is not a replacement for the existing perf/systests etc. +But rather a small test kit thats focused on providing a packaged set of tests that can be quickly deployed on an environment to do quick smoke testing or easily setup a soak test. + +Table of Contents +================= +1. Perf Kit +2. Soak Kit +3. Perf Test use cases +4. Soak Test use cases +5. Running the sample perf test report +6. Running the sample soak test report + +1.0 Perf Kit +------------ +1.1 The perf kit can be packaged as an RPM or a tar file and deploy on a target environment and run the perf report. +Or else a perf report can be automated to run every day or so an record numbers to catch perf regressions. + +1.2 It calculates the following results in msg/sec. + + System throuhgput : no_of_msgs / (time_last_msg_rcvd - time_first_msg_send) + + Producer rate : no_of_msgs / (time_after_sending - time_before_sending) + + Producer rate : no_of_msgs / (time_last_msg_rcvd - time_first_msg_rcvd) + + Latency : time_msg_rcvd - time_msg_sent + +The test will print min, max and avg latency. + +1.3 The test assume that both producer and consumer are run on the same machine or different machines that are time synced. + +1.4 You can also use run_sub.sh and run_pub.sh to run different use cases with several options. + Please look at TestParams.java for all the configurable options. + +1.5 You can also use the test kit to benchmark against any vendor. + + +2.0 Soak tests +-------------- +2.0 This includes a set of soak tests that can be run for a longer duration. + +2.1 A typical test will send x-1 messages and the xth message will contain an "End" marker. + The producer will print the timestamp as soon as it sends the xth message. + The consumer will reply with an empty message to the replyTo destination given in the xth message. + The consumer prints the throuhgput for the iteration and the latency for the xth message. + A typical value for x is 100k + +2.2 The feedback loop prevents the producer from overrunning the consumer. + And the printout for every xth message will let you know how many iterations been completed at any given time. + (Ex a simple cat log | wc -l will give you the how many iterations have been completed so far). + +2.2 The following results can be calculated for these tests. + + Memory, CPU for each producer/consumer - look at testkit/bin/run_soak_client.sh for an example + + You can find the Avg, Min & Max for throughput, latency, CPU and memory for the entire test run. + (look at testkit/bin/soak_report.sh) for an example). + + You could also graph throughput, latency, CPU and memory using the comma separated log files. + +2.2 If you use different machines for producer and consumer the machines have to be time synced to have meaningful latency samples. + +3.0 Perf Test report use cases +------------------------------- +3.1 Please check testkit/bin/perf_report.sh for more details + +3.2 A typical test run will send 1000 msgs during warmup and 200k msgs for result calculation. + +Test 1 Trans Queue + +Test 2 Dura Queue + +Test 3 Dura Queue Sync + +Test 4 Topic + +Test 5 Durable Topic + +Test 6 Fanout + +Test 7 Small TX (about 2 msgs per tx) + +Test 8 Large TX (about 1000 msgs per tx) + + +4.0 Soak tests use cases +------------------------- +4.1 Following are the current tests available in the test kit. + +4.2 Please refer to the source to see the javadoc and options + + +1. SimpleProducer/Consumer sends X messages at a time and will wait for confirmation from producer before proceeding with the next iteration. A no of options can be configured. + +2. MultiThreadedProducer/Consumer does the same thing as above but runs each session in a separate thread. + It can also send messages transactionally. Again a no of options can be configured. + +3. ResourceLeakTest will setup consumer/producers sends x messages and then teard down everything and continue again. + + +5.0 Running the sample perf test report +--------------------------------------- +The testkit/bin contains perf_report.sh. +It runs the above 8 use cases against a broker and print the results in a tabular format. + +For example +================================================================================================ +|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency| +------------------------------------------------------------------------------------------------ +|Trans_Queue | xxxxx.xx| xxxxx.xx| xxxxx.xx| xx.xx| x| xx| + + +5.1 running perf_report.sh + +5.1.1 set JAVA_HOME to point to Java 1.5 and above +5.1.2 set QPID_TEST_HOME to point to the testkit dir +5.1.3 set VENDOR_LIB to point to the Qpid (or other JMS providers) jar files. +5.1.4 start a broker +5.1.5 update the testkit/etc/jndi.properties to point to the correct broker +5.1.6 execute perf_report.sh + + +6.0 Running the sample soak test report +--------------------------------------- +The testkit/bin contains soak_report.sh +It runs MultiThreadedProducer/Consumer for the duration specified and prints a report for the following stats. +Avg, Min and Max for System Throughput, letency, CPU and memory. + +6.1 running soak_report.sh + +5.1.1 set JAVA_HOME to point to Java 1.5 and above +5.1.2 set QPID_TEST_HOME to point to the testkit dir +5.1.3 set JAR_PATH to point to the Qpid jars +5.1.4 start a broker +5.1.5 execute soak_report.sh with correct params. + Ex sh soak_report.sh 1 36000 will run for 10 hours colllecting CPU, memory every second. + +5.1.6 Please note the total duration for the test is log_freq * log_iterations + So if you want to run the test for 10 hours and collect 10 second samples then do the following + sh soak_report.sh 10 3600 + diff --git a/qpid/java/tools/bin/perf_report.sh b/qpid/java/tools/bin/perf_report.sh new file mode 100755 index 0000000000..e6b4c987e5 --- /dev/null +++ b/qpid/java/tools/bin/perf_report.sh @@ -0,0 +1,140 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This will run the 8 use cases defined below and produce +# a report in tabular format. Refer to the documentation +# for more details. + +SUB_MEM=-Xmx1024M +PUB_MEM=-Xmx1024M +LOG_CONFIG="-Damqj.logging.level=WARN" +QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" +DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" +TOPIC="amq.topic/test" +DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" + +. setenv.sh + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } +cleanup() +{ + pids=`ps aux | grep java | grep Perf | awk '{print $2}'` + if [ "$pids" != "" ]; then + kill -3 $pids + kill -9 $pids >/dev/null 2>&1 + fi +} + +# $1 test name +# $2 consumer options +# $3 producer options +run_testcase() +{ + sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out & + waitfor sub.out "Warming up" + sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out & + waitfor sub.out "Completed the test" + waitfor pub.out "Consumer has completed the test" + sleep 2 #give a grace period to shutdown + print_result $1 +} + +print_result() +{ + prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'` + sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` + avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'` + min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'` + max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency + echo "------------------------------------------------------------------------------------------------" +} + +trap cleanup EXIT + +echo "Test report on " `date +%F` +echo "================================================================================================" +echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" +echo "------------------------------------------------------------------------------------------------" + +# The message counts and warmup counts are set to very low values for quick testing of the script. +# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k +# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend +# setting very low values to start with and experiment while increasing them slowly. + +# Test 1 Trans Queue +#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" + +# Test 2 Dura Queue +run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 3 Dura Queue Sync +run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 4 Dura Queue Sync Publish and Ack +run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 5 Topic +run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" + +# Test 6 Durable Topic +run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 7 Fanout +run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" + +# Test 8 Small TX +run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" + +# Test 9 Large TX +run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" + +# Test 10 256 MSG +run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 11 512 MSG +run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 12 2048 MSG +run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 13 Random size MSG +run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 14 Random size MSG Durable +run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 15 64K MSG +run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 16 Durable 64K MSG +run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 17 500K MSG +run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 18 Durable 500K MSG +run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/qpid/java/tools/bin/qpid-bench b/qpid/java/tools/bin/qpid-bench new file mode 100644 index 0000000000..c982e64efd --- /dev/null +++ b/qpid/java/tools/bin/qpid-bench @@ -0,0 +1,35 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java \ + JAVA_VM=-server \ + JAVA_MEM=-Xmx1024m \ + QPID_CLASSPATH=$QPID_LIBS + +. qpid-run org.apache.qpid.tools.QpidBench "$@" diff --git a/qpid/java/tools/bin/qpid-python-testkit b/qpid/java/tools/bin/qpid-python-testkit new file mode 100755 index 0000000000..cbe7972421 --- /dev/null +++ b/qpid/java/tools/bin/qpid-python-testkit @@ -0,0 +1,30 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This is wrapper script to run the tests defined in testkit.py +# via the python test runner. The defaults are set for a running +# from an svn checkout + +. ./set-testkit-env.sh + +export PYTHONPATH=./:$PYTHONPATH +rm -rf $OUTDIR +qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" + diff --git a/qpid/java/tools/bin/run_pub.sh b/qpid/java/tools/bin/run_pub.sh new file mode 100644 index 0000000000..91b9287dea --- /dev/null +++ b/qpid/java/tools/bin/run_pub.sh @@ -0,0 +1,24 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +. $QPID_TEST_HOME/bin/setenv.sh + +echo "$@" +$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer diff --git a/qpid/java/tools/bin/run_sub.sh b/qpid/java/tools/bin/run_sub.sh new file mode 100644 index 0000000000..c9ad2fed74 --- /dev/null +++ b/qpid/java/tools/bin/run_sub.sh @@ -0,0 +1,25 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +. $QPID_TEST_HOME/bin/setenv.sh + +echo "$@" +$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer + diff --git a/qpid/java/tools/bin/set-testkit-env.sh b/qpid/java/tools/bin/set-testkit-env.sh new file mode 100644 index 0000000000..051dad8179 --- /dev/null +++ b/qpid/java/tools/bin/set-testkit-env.sh @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# If QPIDD_EXEC ..etc is not set, it will first check to see +# if this is run from a qpid svn check out, if not it will look +# for installed rpms. + +abs_path() +{ + D=`dirname "$1"` + B=`basename "$1"` + echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B" +} + +# Environment for python tests + +if [ -d ../../../python ] ; then + PYTHON_DIR=../../../python + PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid +elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then + echo "WARNING: skipping test, no qpid python scripts found ."; exit 0; +fi + + +if [ "$QPIDD_EXEC" = "" ] ; then + if [ -x ../../../cpp/src/qpidd ]; then + QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"` + elif [ -n "$(which qpidd)" ] ; then + QPIDD_EXEC=$(which qpidd) + else + echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0; + fi +fi + +if [ "$CLUSTER_LIB" = "" ] ; then + if [ -x ../../../cpp/src/.libs/cluster.so ]; then + CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"` + elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then + CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so" + elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then + CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so" + else + echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0; + fi +fi + +if [ "$STORE_LIB" = "" ] ; then + if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then + STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so" + elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then + STORE_LIB="/usr/lib/qpid/daemon/msgstore.so" + #else + # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; + fi +fi + +if [ "$QP_CP" = "" ] ; then + if [ -d ../../build/lib/ ]; then + QP_JAR_PATH=`abs_path "../../build/lib/"` + elif [ -d /usr/share/java/qpid-deps ]; then + QP_JAR_PATH=`abs_path "/usr/share/java"` + else + "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0; + fi + QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'` +fi + +if [ "$OUTDIR" = "" ] ; then + OUTDIR=`abs_path "./output"` +fi + +export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR diff --git a/qpid/java/tools/bin/setenv.sh b/qpid/java/tools/bin/setenv.sh new file mode 100644 index 0000000000..24135e711b --- /dev/null +++ b/qpid/java/tools/bin/setenv.sh @@ -0,0 +1,49 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Compiles the test classes and sets the CLASSPATH + +# check for QPID_TEST_HOME +if [ "$QPID_TEST_HOME" = "" ] ; then + echo "ERROR: Please set QPID_TEST_HOME ...." + exit 1 +fi + +# check for JAVA_HOME +if [ "$JAVA_HOME" = "" ] ; then + echo "ERROR: Please set JAVA_HOME ...." + exit 1 +fi + +# VENDOR_LIB path needs to be set +# for Qpid set this to {qpid_checkout}/java/build/lib +if [ "$VENDOR_LIB" = "" ] ; then + echo "ERROR: Please set VENDOR_LIB path in the script ...." + exit 1 +fi + + +[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes + +CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"` +$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'` + +export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH + diff --git a/qpid/java/tools/bin/testkit.py b/qpid/java/tools/bin/testkit.py new file mode 100755 index 0000000000..1c2ad598b8 --- /dev/null +++ b/qpid/java/tools/bin/testkit.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time, string, traceback +from brokertest import * +from qpid.messaging import * + + +try: + import java.lang.System + _cp = java.lang.System.getProperty("java.class.path"); +except ImportError: + _cp = checkenv("QP_CP") + +class Formatter: + + def __init__(self, message): + self.message = message + self.environ = {"M": self.message, + "P": self.message.properties, + "C": self.message.content} + + def __getitem__(self, st): + return eval(st, self.environ) + +# The base test case has support for launching the generic +# receiver and sender through the TestLauncher with all the options. +# +class JavaClientTest(BrokerTest): + """Base Case for Java Test cases""" + + client_class = "org.apache.qpid.testkit.TestLauncher" + + # currently there is no transparent reconnection. + # temp hack: just creating the queue here and closing it. + def start_error_watcher(self,broker=None): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control; {create:always}", capacity=1) + ssn.close() + + def store_module_args(self): + if BrokerTest.store_lib: + return ["--load-module", BrokerTest.store_lib] + else: + print "Store module not present." + return [""] + + def client(self,**options): + cmd = ["java","-cp",_cp] + + cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] + cmd += ["-Dhost=" + options.get("host","127.0.0.1")] + cmd += ["-Dport=" + str(options.get("port",5672))] + cmd += ["-Dcon_count=" + str(options.get("con_count",1))] + cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] + cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] + cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))] + cmd += ["-Ddurable=" + str(options.get("durable",False))] + cmd += ["-Dtransacted=" + str(options.get("transacted",False))] + cmd += ["-Dreceiver=" + str(options.get("receiver",False))] + cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))] + cmd += ["-Dsender=" + str(options.get("sender",False))] + cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] + cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] + cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))] + cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))] + cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))] + cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))] + cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] + cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] + cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] + cmd += ["-Dlog.level=" + options.get("log.level", "warn")] + cmd += [self.client_class] + cmd += [options.get("address", "my_queue; {create: always}")] + + #print str(options.get("port",5672)) + return cmd + + # currently there is no transparent reconnection. + # temp hack: just creating a receiver and closing session soon after. + def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60): + ssn = broker.connect().session() + err_watcher = ssn.receiver("control; {create:always}", capacity=1) + i = run_time/error_ck_freq + is_error = False + for j in range(i): + not_empty = True + while not_empty: + try: + m = err_watcher.fetch(timeout=error_ck_freq) + ssn.acknowledge() + print "Java process notified of an error" + self.print_error(m) + is_error = True + except messaging.Empty, e: + not_empty = False + + ssn.close() + return is_error + + def print_error(self,msg): + print msg.properties.get("exception-trace") + + def verify(self, receiver,sender): + sender_running = receiver.is_running() + receiver_running = sender.is_running() + + self.assertTrue(receiver_running,"Receiver has exited prematually") + self.assertTrue(sender_running,"Sender has exited prematually") + + def start_sender_and_receiver(self,**options): + + receiver_opts = options + receiver_opts["receiver"]=True + receiver = self.popen(self.client(**receiver_opts), + expect=EXPECT_RUNNING) + + sender_opts = options + sender_opts["sender"]=True + sender = self.popen(self.client(**sender_opts), + expect=EXPECT_RUNNING) + + return receiver, sender + + def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): + if options.get("durable",False)==True: + cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) + else: + cluster = Cluster(self, count=count) + return cluster + +class ConcurrencyTest(JavaClientTest): + """A concurrency test suite for the JMS client""" + skip = False + + def base_case(self,**options): + if self.skip : + print "Skipping test" + return + + cluster = self.start_cluster(count=2,**options) + self.start_error_watcher(broker=cluster[0]) + options["port"] = port=cluster[0].port() + + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver, sender = self.start_sender_and_receiver(**options) + self.monitor_clients(broker=cluster[0],run_time=180) + self.verify(receiver,sender) + + def test_multiplexing_con(self): + """Tests multiple sessions on a single connection""" + + self.base_case(ssn_per_con=25,test_name=self.id()) + + def test_multiplexing_con_with_tx(self): + """Tests multiple transacted sessions on a single connection""" + + self.base_case(ssn_per_con=25,transacted=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_rcv(self): + """Tests multiple sessions with sync receive""" + + self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id()) + + def test_multiplexing_con_with_durable_sub(self): + """Tests multiple sessions with durable subs""" + + self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_ack(self): + """Tests multiple sessions with sync ack""" + + self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id()) + + def test_multiplexing_con_with_sync_pub(self): + """Tests multiple sessions with sync pub""" + + self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id()) + + def test_multiple_cons_and_ssns(self): + """Tests multiple connections and sessions""" + + self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) + + +class SoakTest(JavaClientTest): + """A soak test suite for the JMS client""" + + def base_case(self,**options): + cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) + options["port"] = port=cluster[0].port() + self.start_error_watcher(broker=cluster[0]) + options["use_unique_dests"]=True + options["address"]="amq.topic" + receiver,sender = self.start_sender_and_receiver(**options) + is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) + + if (is_error): + print "The sender or receiver didn't start properly. Exiting test." + return + else: + "Print no error !" + + # grace period for java clients to get the failover properly setup. + time.sleep(30) + error_msg= None + # Kill original brokers, start new ones. + try: + for i in range(8): + cluster[i].kill() + b=cluster.start() + self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) + print "iteration : " + str(i) + except ConnectError, e1: + error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) + + except SessionError, e2: + error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) + + self.verify(receiver,sender) + if error_msg: + raise Exception(error_msg) + + + def test_failover(self) : + """Test basic failover""" + + self.base_case(test_name=self.id()) + + + def test_failover_with_durablesub(self): + """Test failover with durable subscriber""" + + self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) + + + def test_failover_with_sync_rcv(self): + """Test failover with sync receive""" + + self.base_case(sync_rcv=True,test_name=self.id()) + + + def test_failover_with_sync_ack(self): + """Test failover with sync ack""" + + self.base_case(sync_ack=True,test_name=self.id()) + + + def test_failover_with_noprefetch(self): + """Test failover with no prefetch""" + + self.base_case(max_prefetch=1,test_name=self.id()) + + + def test_failover_with_multiple_cons_and_ssns(self): + """Test failover with multiple connections and sessions""" + + self.base_case(use_unique_dests=True,address="amq.topic", + con_count=10,ssn_per_con=25,test_name=self.id()) diff --git a/qpid/java/tools/build.xml b/qpid/java/tools/build.xml new file mode 100644 index 0000000000..7cd1b1172c --- /dev/null +++ b/qpid/java/tools/build.xml @@ -0,0 +1,27 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="Qpid Tools" default="build"> + + <property name="module.depends" value="client common"/> + + <import file="../module.xml"/> + +</project> diff --git a/qpid/java/tools/etc/test.log4j b/qpid/java/tools/etc/test.log4j new file mode 100644 index 0000000000..b574a7b5b7 --- /dev/null +++ b/qpid/java/tools/etc/test.log4j @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +log4j.rootLogger=${root.logging.level} + +log4j.logger.org.apache.qpid=ERROR, console +log4j.additivity.org.apache.qpid=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=all +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n + diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java new file mode 100644 index 0000000000..b10129d855 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; + +public abstract class Client implements ExceptionListener +{ + private Connection con; + private Session ssn; + private boolean durable = false; + private boolean transacted = false; + private int txSize = 10; + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + private String contentType = "application/octet-stream"; + + private long reportFrequency = 60000; // every min + + private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + private NumberFormat nf = new DecimalFormat("##.00"); + + private long startTime = System.currentTimeMillis(); + private ErrorHandler errorHandler = null; + + public Client(Connection con) throws Exception + { + this.con = con; + this.con.setExceptionListener(this); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + txSize = Integer.getInteger("tx_size",10); + contentType = System.getProperty("content_type","application/octet-stream"); + reportFrequency = Long.getLong("report_frequency", 60000); + } + + public void close() + { + try + { + con.close(); + } + catch (Exception e) + { + handleError("Error closing connection",e); + } + } + + public void onException(JMSException e) + { + handleError("Connection error",e); + } + + public void setErrorHandler(ErrorHandler h) + { + this.errorHandler = h; + } + + public void handleError(String msg,Exception e) + { + if (errorHandler != null) + { + errorHandler.handleError(msg, e); + } + else + { + System.err.println(msg); + e.printStackTrace(); + } + } + + protected Session getSsn() + { + return ssn; + } + + protected void setSsn(Session ssn) + { + this.ssn = ssn; + } + + protected boolean isDurable() + { + return durable; + } + + protected boolean isTransacted() + { + return transacted; + } + + protected int getTxSize() + { + return txSize; + } + + protected int getAck_mode() + { + return ack_mode; + } + + protected String getContentType() + { + return contentType; + } + + protected long getReportFrequency() + { + return reportFrequency; + } + + protected long getStartTime() + { + return startTime; + } + + protected void setStartTime(long startTime) + { + this.startTime = startTime; + } + + public DateFormat getDf() + { + return df; + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java new file mode 100644 index 0000000000..dbc73c404f --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java @@ -0,0 +1,27 @@ +package org.apache.qpid.testkit; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +public interface ErrorHandler { + + public void handleError(String msg,Exception e); +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java new file mode 100644 index 0000000000..b4294ee4cc --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +/** + * A generic receiver which consumes messages + * from a given address in a broker (host/port) + * until told to stop by killing it. + * + * It participates in a feedback loop to ensure the producer + * doesn't fill up the queue. If it receives an "End" msg + * it sends a reply to the replyTo address in that msg. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * sync_rcv - Whether to consume sync (instead of using a listener). + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. * + * check_for_dups - check for duplicate messages and out of order messages. + * jms_durable_sub - create a durable subscription instead of a regular subscription. + */ +public class Receiver extends Client implements MessageListener +{ + long msg_count = 0; + int sequence = 0; + boolean syncRcv = Boolean.getBoolean("sync_rcv"); + boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); + boolean checkForDups = Boolean.getBoolean("check_for_dups"); + MessageConsumer consumer; + List<Integer> duplicateMessages = new ArrayList<Integer>(); + + public Receiver(Connection con,String addr) throws Exception + { + super(con); + setSsn(con.createSession(isTransacted(), getAck_mode())); + consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); + if (!syncRcv) + { + consumer.setMessageListener(this); + } + + System.out.println("Receiving messages from : " + addr); + } + + public void onMessage(Message msg) + { + handleMessage(msg); + } + + public void run() throws Exception + { + long sleepTime = getReportFrequency(); + while(true) + { + if(syncRcv) + { + long t = sleepTime; + while (t > 0) + { + long start = System.currentTimeMillis(); + Message msg = consumer.receive(t); + t = t - (System.currentTimeMillis() - start); + handleMessage(msg); + } + } + Thread.sleep(sleepTime); + System.out.println(getDf().format(System.currentTimeMillis()) + + " - messages received : " + msg_count); + } + } + + private void handleMessage(Message m) + { + if (m == null) { return; } + + try + { + if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) + { + MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); + Message controlMsg = getSsn().createTextMessage(); + temp.send(controlMsg); + if (isTransacted()) + { + getSsn().commit(); + } + temp.close(); + } + else + { + + int seq = m.getIntProperty("sequence"); + if (checkForDups) + { + if (seq == 0) + { + sequence = 0; // wrap around for each iteration + System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); + duplicateMessages.clear(); + } + + if (seq < sequence) + { + duplicateMessages.add(seq); + } + else if (seq == sequence) + { + sequence++; + msg_count ++; + } + else + { + // Multiple publishers are not allowed in this test case. + // So out of order messages are not allowed. + throw new Exception(": Received an out of order message (expected=" + + sequence + ",received=" + seq + ")" ); + } + } + else + { + msg_count ++; + } + + // Please note that this test case doesn't expect duplicates + // When testing for transactions. + if (isTransacted() && msg_count % getTxSize() == 0) + { + getSsn().commit(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + handleError("Exception receiving messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + String addr = "message_queue"; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + if (args.length > 2) + { + addr = args[2]; + } + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + Receiver rcv = new Receiver(con,addr); + rcv.run(); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java new file mode 100644 index 0000000000..14b9b7302f --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.tools.MessageFactory; + +/** + * A generic sender which sends a stream of messages + * to a given address in a broker (host/port) + * until told to stop by killing it. + * + * It has a feedback loop to ensure it doesn't fill + * up queues due to a slow consumer. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * msg_size (256) + * msg_count (10) - # messages before waiting for feedback + * sleep_time (1000 ms) - sleep time btw each iteration + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. + */ +public class Sender extends Client +{ + protected int msg_size = 256; + protected int msg_count = 10; + protected int iterations = -1; + protected long sleep_time = 1000; + + protected Destination dest = null; + protected Destination replyTo = null; + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + + protected MessageProducer producer; + Random gen = new Random(19770905); + + public Sender(Connection con,String addr) throws Exception + { + super(con); + this.msg_size = Integer.getInteger("msg_size", 100); + this.msg_count = Integer.getInteger("msg_count", 10); + this.iterations = Integer.getInteger("iterations", -1); + this.sleep_time = Long.getLong("sleep_time", 1000); + this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); + this.dest = new AMQAnyDestination(addr); + this.producer = getSsn().createProducer(dest); + this.replyTo = getSsn().createTemporaryQueue(); + + System.out.println("Sending messages to : " + addr); + } + + /* + * If msg_size not specified it generates a message + * between 500-1500 bytes. + */ + protected Message getNextMessage() throws Exception + { + int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; + Message msg = (getContentType().equals("text/plain")) ? + MessageFactory.createTextMessage(getSsn(), s): + MessageFactory.createBytesMessage(getSsn(), s); + + msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT + : DeliveryMode.NON_PERSISTENT); + return msg; + } + + public void run() + { + try + { + boolean infinite = (iterations == -1); + for (int x=0; infinite || x < iterations; x++) + { + long now = System.currentTimeMillis(); + if (now - getStartTime() >= getReportFrequency()) + { + System.out.println(df.format(now) + " - iterations : " + x); + setStartTime(now); + } + + for (int i = 0; i < msg_count; i++) + { + Message msg = getNextMessage(); + msg.setIntProperty("sequence",i); + producer.send(msg); + if (isTransacted() && msg_count % getTxSize() == 0) + { + getSsn().commit(); + } + } + TextMessage m = getSsn().createTextMessage("End"); + m.setJMSReplyTo(replyTo); + producer.send(m); + + if (isTransacted()) + { + getSsn().commit(); + } + + MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo); + feedbackConsumer.receive(); + feedbackConsumer.close(); + if (isTransacted()) + { + getSsn().commit(); + } + Thread.sleep(sleep_time); + } + } + catch (Exception e) + { + handleError("Exception sending messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + String addr = "message_queue"; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + if (args.length > 2) + { + addr = args[2]; + } + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + Sender sender = new Sender(con,addr); + sender.run(); + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java new file mode 100644 index 0000000000..72ca48e1c9 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -0,0 +1,370 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; + +/** + * A basic test case class that could launch a Sender/Receiver + * or both, each on it's own separate thread. + * + * If con_count == ssn_count, then each entity created will have + * it's own Connection. Else if con_count < ssn_count, then + * a connection will be shared by ssn_count/con_count # of entities. + * + * The if both sender and receiver options are set, it will + * share a connection. + * + * The following options are available as jvm args + * host, port + * con_count,ssn_count + * con_idle_time - which determines heartbeat + * sender, receiver - booleans which indicate which entity to create. + * Setting them both is also a valid option. + */ +public class TestLauncher implements ErrorHandler +{ + protected String host = "127.0.0.1"; + protected int port = 5672; + protected int sessions_per_con = 1; + protected int connection_count = 1; + protected long heartbeat = 5000; + protected boolean sender = false; + protected boolean receiver = false; + protected boolean useUniqueDests = false; + protected String url; + + protected String address = "my_queue; {create: always}"; + protected boolean durable = false; + protected String failover = ""; + protected AMQConnection controlCon; + protected Destination controlDest = null; + protected Session controlSession = null; + protected MessageProducer statusSender; + protected List<AMQConnection> clients = new ArrayList<AMQConnection>(); + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + protected String testName; + + public TestLauncher() + { + testName = System.getProperty("test_name","UNKNOWN"); + host = System.getProperty("host", "127.0.0.1"); + port = Integer.getInteger("port", 5672); + sessions_per_con = Integer.getInteger("ssn_per_con", 1); + connection_count = Integer.getInteger("con_count", 1); + heartbeat = Long.getLong("heartbeat", 5); + sender = Boolean.getBoolean("sender"); + receiver = Boolean.getBoolean("receiver"); + useUniqueDests = Boolean.getBoolean("use_unique_dests"); + + failover = System.getProperty("failover", ""); + durable = Boolean.getBoolean("durable"); + + url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; + + if (failover.equalsIgnoreCase("failover_exchange")) + { + url += "&failover='failover_exchange'"; + + System.out.println("Failover exchange " + url ); + } + + configureLogging(); + } + + protected void configureLogging() + { + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); + BasicConfigurator.configure(new ConsoleAppender(layout)); + + String logLevel = System.getProperty("log.level","warn"); + String logComponent = System.getProperty("log.comp","org.apache.qpid"); + + Logger logger = Logger.getLogger(logComponent); + logger.setLevel(Level.toLevel(logLevel, Level.WARN)); + + System.out.println("Level " + logger.getLevel()); + + } + + public void setUpControlChannel() + { + try + { + controlCon = new AMQConnection(url); + controlCon.start(); + + controlDest = new AMQAnyDestination("control; {create: always}"); // durable + + // Create the session to setup the messages + controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + statusSender = controlSession.createProducer(controlDest); + + } + catch (Exception e) + { + handleError("Error while setting up the test",e); + } + } + + public void cleanup() + { + try + { + controlSession.close(); + controlCon.close(); + for (AMQConnection con : clients) + { + con.close(); + } + } + catch (Exception e) + { + handleError("Error while tearing down the test",e); + } + } + + public void start(String addr) + { + try + { + if (addr == null) + { + addr = address; + } + + int ssn_per_con = sessions_per_con; + String addrTemp = addr; + for (int i = 0; i< connection_count; i++) + { + AMQConnection con = new AMQConnection(url); + con.start(); + clients.add(con); + for (int j = 0; j< ssn_per_con; j++) + { + String index = createPrefix(i,j); + if (useUniqueDests) + { + addrTemp = modifySubject(index,addr); + } + + if (sender) + { + createSender(index,con,addrTemp,this); + } + + if (receiver) + { + System.out.println("########## Creating receiver ##################"); + + createReceiver(index,con,addrTemp,this); + } + } + } + } + catch (Exception e) + { + handleError("Exception while setting up the test",e); + } + + } + + protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Receiver rcv = new Receiver(con,addr); + rcv.setErrorHandler(h); + rcv.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Receiver", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Receive thread",e); + } + + t.setName("ReceiverThread-" + index); + t.start(); + } + + protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Sender sender = new Sender(con, addr); + sender.setErrorHandler(h); + sender.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Sender", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Sender thread",e); + } + + t.setName("SenderThread-" + index); + t.start(); + } + + public synchronized void handleError(String msg,Exception e) + { + // In case sending the message fails + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" @ "); + sb.append(df.format(new Date(System.currentTimeMillis()))); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + + try + { + TextMessage errorMsg = controlSession.createTextMessage(); + errorMsg.setStringProperty("status", "error"); + errorMsg.setStringProperty("desc", msg); + errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); + errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); + + System.out.println("Msg " + errorMsg); + + statusSender.send(errorMsg); + } + catch (JMSException e1) + { + e1.printStackTrace(); + } + } + + private String serializeStackTrace(Exception e) + { + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + PrintStream printStream = new PrintStream(bOut); + e.printStackTrace(printStream); + printStream.close(); + return bOut.toString(); + } + + private String createPrefix(int i, int j) + { + return String.valueOf(i).concat(String.valueOf(j)); + } + + /** + * A basic helper function to modify the subjects by + * appending an index. + */ + private String modifySubject(String index,String addr) + { + if (addr.indexOf("/") > 0) + { + addr = addr.substring(0,addr.indexOf("/")+1) + + index + + addr.substring(addr.indexOf("/")+1,addr.length()); + } + else if (addr.indexOf(";") > 0) + { + addr = addr.substring(0,addr.indexOf(";")) + + "/" + index + + addr.substring(addr.indexOf(";"),addr.length()); + } + else + { + addr = addr + "/" + index; + } + + return addr; + } + + public static void main(String[] args) + { + final TestLauncher test = new TestLauncher(); + test.setUpControlChannel(); + System.out.println("args.length " + args.length); + System.out.println("args [0] " + args [0]); + test.start(args.length > 0 ? args [0] : null); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { test.cleanup(); } + }); + + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java new file mode 100644 index 0000000000..2390516ef0 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tools; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.jms.FailoverPolicy; + +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.util.Hashtable; +import java.util.Enumeration; +import java.util.List; +import java.util.LinkedList; +import java.io.IOException; +import java.io.File; +import java.io.FileInputStream; + +public class JNDICheck +{ + private static final String QUEUE = "queue."; + private static final String TOPIC = "topic."; + private static final String DESTINATION = "destination."; + private static final String CONNECTION_FACTORY = "connectionfactory."; + + public static void main(String[] args) + { + + if (args.length != 1) + { + usage(); + } + + String propertyFile = args[0]; + + new JNDICheck(propertyFile); + } + + private static void usage() + { + exit("Usage: JNDICheck <JNDI Config file>", 0); + } + + private static void exit(String message, int exitCode) + { + System.err.println(message); + System.exit(exitCode); + } + + private static String JAVA_NAMING = "java.naming.factory.initial"; + + Context _context = null; + Hashtable _environment = null; + + public JNDICheck(String propertyFile) + { + + // Load JNDI properties + Properties properties = new Properties(); + + try + { + properties.load(new FileInputStream(new File(propertyFile))); + } + catch (IOException e) + { + exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1); + } + + //Create the initial context + try + { + + System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING)); + + _context = new InitialContext(properties); + + _environment = _context.getEnvironment(); + + Enumeration keys = _environment.keys(); + + List<String> queues = new LinkedList<String>(); + List<String> topics = new LinkedList<String>(); + List<String> destinations = new LinkedList<String>(); + List<String> connectionFactories = new LinkedList<String>(); + + while (keys.hasMoreElements()) + { + String key = keys.nextElement().toString(); + + if (key.startsWith(QUEUE)) + { + queues.add(key); + } + else if (key.startsWith(TOPIC)) + { + topics.add(key); + } + else if (key.startsWith(DESTINATION)) + { + destinations.add(key); + } + else if (key.startsWith(CONNECTION_FACTORY)) + { + connectionFactories.add(key); + } + } + + printHeader(propertyFile); + printEntries(QUEUE, queues); + printEntries(TOPIC, topics); + printEntries(DESTINATION, destinations); + printEntries(CONNECTION_FACTORY, connectionFactories); + + } + catch (NamingException e) + { + exit("Unable to load JNDI Context due to:" + e.getMessage(), 1); + } + + } + + private void printHeader(String file) + { + print("JNDI file :" + file); + } + + private void printEntries(String type, List<String> list) + { + if (list.size() > 0) + { + String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1); + print(name + " elements in file:"); + printList(list); + print(""); + } + } + + private void printList(List<String> list) + { + for (String item : list) + { + String key = item.substring(item.indexOf('.') + 1); + + try + { + print(key, _context.lookup(key)); + } + catch (NamingException e) + { + exit("Error: item " + key + " no longer in context.", 1); + } + } + } + + private void print(String key, Object object) + { + if (object instanceof AMQDestination) + { + print(key + ":" + object); + } + else if (object instanceof AMQConnectionFactory) + { + AMQConnectionFactory factory = (AMQConnectionFactory) object; + print(key + ":Connection"); + print("ConnectionURL:"); + print(factory.getConnectionURL().toString()); + print("FailoverPolicy"); + print(new FailoverPolicy(factory.getConnectionURL(),null).toString()); + print(""); + } + } + + private void print(String msg) + { + System.out.println(msg); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java new file mode 100644 index 0000000000..b88b242e6d --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -0,0 +1,349 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.io.FileOutputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.thread.Threading; + +/** + * Latency test sends an x number of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y number of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * It is important to have a sufficiently large number for the warmup count to + * ensure the system is in steady state before the test is started. + * + * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) + * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 + * + * The idea is to get a latency sample for the system once it achieves steady state. + * + */ + +public class LatencyTest extends PerfBase implements MessageListener +{ + MessageProducer producer; + MessageConsumer consumer; + Message msg; + byte[] payload; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + double stdDev = 0; + double avgLatency = 0; + boolean warmup_mode = true; + boolean transacted = false; + int transSize = 0; + + final List<Long> latencies; + final Lock lock = new ReentrantLock(); + final Condition warmedUp; + final Condition testCompleted; + + public LatencyTest() + { + super(); + warmedUp = lock.newCondition(); + testCompleted = lock.newCondition(); + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + latencies = new ArrayList <Long>(params.getMsgCount()); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(params.isDurable()? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (params.isCacheMessage()) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + ((BytesMessage)msg).writeBytes(payload); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + int count = params.getWarmupCount(); + for (int i=0; i < count; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + try + { + lock.lock(); + warmedUp.await(); + } + finally + { + lock.unlock(); + } + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + if (warmup_mode) + { + warmup_mode = false; + try + { + lock.lock(); + warmedUp.signal(); + } + finally + { + lock.unlock(); + } + } + else + { + computeStats(); + } + } + else if (!warmup_mode) + { + long time = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = time - msg.getJMSTimestamp(); + latencies.add(latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + private void computeStats() + { + avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double sigma = 0; + + for (long latency: latencies) + { + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + sigma = sigma + Math.pow(latency - avgLatency,2); + } + + stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); + + try + { + lock.lock(); + testCompleted.signal(); + } + finally + { + lock.unlock(); + } + } + + public void writeToFile() throws Exception + { + String fileName = System.getProperty("file"); + PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); + for (long latency: latencies) + { + writer.println(String.valueOf(latency)); + } + writer.flush(); + writer.close(); + } + + public void printToConsole() + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Standard Deviation : "). + append(df.format(stdDev)). + append(" ms").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % transSize == 0)) + { + session.commit(); + } + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + if (params.isTransacted()) + { + session.commit(); + } + } + + public void tearDown() throws Exception + { + try + { + lock.lock(); + testCompleted.await(); + } + finally + { + lock.unlock(); + } + + producer.close(); + consumer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + final LatencyTest latencyTest = new LatencyTest(); + Runnable r = new Runnable() + { + public void run() + { + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating latency test thread",e); + } + t.start(); + } +}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java new file mode 100644 index 0000000000..8ab1379fce --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java @@ -0,0 +1,64 @@ +package org.apache.qpid.tools; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class MessageFactory +{ + public static Message createBytesMessage(Session ssn, int size) throws JMSException + { + BytesMessage msg = ssn.createBytesMessage(); + msg.writeBytes(createMessagePayload(size).getBytes()); + return msg; + } + + public static Message createTextMessage(Session ssn, int size) throws JMSException + { + TextMessage msg = ssn.createTextMessage(); + msg.setText(createMessagePayload(size)); + return msg; + } + + public static String createMessagePayload(int size) + { + String msgData = "Qpid Test Message"; + + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count <= (size - msgData.length())) + { + buf.append(msgData); + count += msgData.length(); + } + if (count < size) + { + buf.append(msgData, 0, size - count); + } + + return buf.toString(); + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java new file mode 100644 index 0000000000..ac597d17de --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.text.DecimalFormat; +import java.util.Hashtable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +public class PerfBase +{ + TestParams params; + Connection con; + Session session; + Destination dest; + Destination feedbackDest; + DecimalFormat df = new DecimalFormat("###.##"); + + public PerfBase() + { + params = new TestParams(); + } + + public void setUp() throws Exception + { + + if (params.getHost().equals("") || params.getPort() == -1) + { + con = new AMQConnection(params.getUrl()); + } + else + { + con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); + } + con.start(); + session = con.createSession(params.isTransacted(), + params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); + + dest = new AMQAnyDestination(params.getAddress()); + } + + public void handleError(Exception e,String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + } +} + diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java new file mode 100644 index 0000000000..0ef0455a64 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -0,0 +1,267 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.thread.Threading; + +/** + * PerfConsumer will receive x no of messages in warmup mode. + * Once it receives the Start message it will then signal the PerfProducer. + * It will start recording stats from the first message it receives after + * the warmup mode is done. + * + * The following calculations are done. + * The important numbers to look at is + * a) Avg Latency + * b) System throughput. + * + * Latency. + * ========= + * Currently this test is written with the assumption that either + * a) The Perf Producer and Consumer are on the same machine + * b) They are on separate machines that have their time synced via a Time Server + * + * In order to calculate latency the producer inserts a timestamp + * hen the message is sent. The consumer will note the current time the message is + * received and will calculate the latency as follows + * latency = rcvdTime - msg.getJMSTimestamp() + * + * Through out the test it will keep track of the max and min latency to show the + * variance in latencies. + * + * Avg latency is measured by adding all latencies and dividing by the total msgs. + * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount + * + * Throughput + * =========== + * System throughput is calculated as follows + * rcvdMsgCount/(rcvdTime - testStartTime) + * + * Consumer rate is calculated as + * rcvdMsgCount/(rcvdTime - startTime) + * + * Note that the testStartTime referes to when the producer sent the first message + * and startTime is when the consumer first received a message. + * + * rcvdTime keeps track of when the last message is received. + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + */ + +public class PerfConsumer extends PerfBase implements MessageListener +{ + MessageConsumer consumer; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + long testStartTime = 0; // to measure system throughput + long startTime = 0; // to measure consumer throughput + long rcvdTime = 0; + boolean transacted = false; + int transSize = 0; + + final Object lock = new Object(); + + public PerfConsumer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + + boolean start = false; + while (!start) + { + Message msg = consumer.receive(); + if (msg instanceof TextMessage) + { + if (((TextMessage)msg).getText().equals("End")) + { + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) + { + session.commit(); + } + temp.close(); + } + } + } + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + consumer.setMessageListener(this); + } + + public void printResults() throws Exception + { + synchronized (lock) + { + lock.wait(); + } + + double avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; + double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Consumer rate : "). + append(df.format(consRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(throughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void notifyCompletion(Destination replyTo) throws Exception + { + MessageProducer tmp = session.createProducer(replyTo); + Message endMsg = session.createMessage(); + tmp.send(endMsg); + if (params.isTransacted()) + { + session.commit(); + } + tmp.close(); + } + + public void tearDown() throws Exception + { + consumer.close(); + session.close(); + con.close(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + notifyCompletion(msg.getJMSReplyTo()); + + synchronized (lock) + { + lock.notifyAll(); + } + } + else + { + rcvdTime = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (rcvdMsgCount == 1) + { + startTime = rcvdTime; + testStartTime = msg.getJMSTimestamp(); + } + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = rcvdTime - msg.getJMSTimestamp(); + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + printResults(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public static void main(String[] args) + { + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() + { + public void run() + { + cons.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + } +}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java new file mode 100644 index 0000000000..015d1e6205 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; + +import org.apache.qpid.thread.Threading; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + */ +public class PerfProducer extends PerfBase +{ + MessageProducer producer; + Message msg; + byte[] payload; + List<byte[]> payloads; + boolean cacheMsg = false; + boolean randomMsgSize = false; + boolean durable = false; + Random random; + int msgSizeRange = 1024; + + public PerfProducer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + feedbackDest = session.createTemporaryQueue(); + + durable = params.isDurable(); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + cacheMsg = true; + + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (params.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = params.getMsgSize(); + payloads = new ArrayList<byte[]>(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + } + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + + if (!randomMsgSize) + { + ((BytesMessage)msg).writeBytes(payload); + } + else + { + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + } + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); + + for (int i=0; i < params.getWarmupCount() -1; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + boolean transacted = params.isTransacted(); + int tranSize = params.getTransactionSize(); + + long start = System.currentTimeMillis(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % tranSize == 0)) + { + session.commit(); + } + } + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; + System.out.println(new StringBuilder("Producer rate: "). + append(df.format(rate)). + append(" msg/sec"). + toString()); + } + + public void waitForCompletion() throws Exception + { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); + } + + public void tearDown() throws Exception + { + producer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + waitForCompletion(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); + } +}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java new file mode 100644 index 0000000000..602fcc6321 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -0,0 +1,904 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import static org.apache.qpid.tools.QpidBench.Mode.BOTH; +import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; +import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; + +/** + * QpidBench + * + */ + +public class QpidBench +{ + + static enum Mode + { + PUBLISH, CONSUME, BOTH + } + + private static class Options + { + private StringBuilder usage = new StringBuilder("qpid-bench <options>"); + + void usage(String name, String description, Object def) + { + String defval = ""; + if (def != null) + { + defval = String.format(" (%s)", def); + } + usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); + } + + public String broker = "localhost"; + public int port = 5672; + public long count = 1000000; + public long window = 100000; + public long sample = window; + public int size = 1024; + public Mode mode = BOTH; + public boolean timestamp = false; + public boolean message_id = false; + public boolean message_cache = false; + public boolean persistent = false; + public boolean jms_publish = false; + public boolean jms_consume = false; + public boolean help = false; + + { + usage("-b, --broker", "the broker hostname", broker); + } + + public void parse__broker(String b) + { + this.broker = b; + } + + public void parse_b(String b) + { + parse__broker(b); + } + + { + usage("-p, --port", "the broker port", port); + } + + public void parse__port(String p) + { + this.port = Integer.parseInt(p); + } + + public void parse_p(String p) + { + parse__port(p); + } + + { + usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); + } + + public void parse__count(String c) + { + this.count = Long.parseLong(c); + } + + public void parse_c(String c) + { + parse__count(c); + } + + { + usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); + } + + public void parse__window(String w) + { + this.window = Long.parseLong(w); + } + + public void parse_w(String w) + { + parse__window(w); + } + + { + usage("--sample", "print stats after this many messages, 0 disables", sample); + } + + public void parse__sample(String s) + { + this.sample = Long.parseLong(s); + } + + { + usage("-i, --interval", "sets both --window and --sample", window); + } + + public void parse__interval(String i) + { + this.window = Long.parseLong(i); + this.sample = window; + } + + public void parse_i(String i) + { + parse__interval(i); + } + + { + usage("-s, --size", "the message size", size); + } + + public void parse__size(String s) + { + this.size = Integer.parseInt(s); + } + + public void parse_s(String s) + { + parse__size(s); + } + + { + usage("-m, --mode", "one of publish, consume, or both", mode); + } + + public void parse__mode(String m) + { + if (m.equalsIgnoreCase("publish")) + { + this.mode = PUBLISH; + } + else if (m.equalsIgnoreCase("consume")) + { + this.mode = CONSUME; + } + else if (m.equalsIgnoreCase("both")) + { + this.mode = BOTH; + } + else + { + throw new IllegalArgumentException + ("must be one of 'publish', 'consume', or 'both'"); + } + } + + public void parse_m(String m) + { + parse__mode(m); + } + + { + usage("--timestamp", "set timestamps on each message if true", timestamp); + } + + public void parse__timestamp(String t) + { + this.timestamp = Boolean.parseBoolean(t); + } + + { + usage("--mesage-id", "set the message-id on each message if true", message_id); + } + + public void parse__message_id(String m) + { + this.message_id = Boolean.parseBoolean(m); + } + + { + usage("--message-cache", "reuse the same message for each send if true", message_cache); + } + + public void parse__message_cache(String c) + { + this.message_cache = Boolean.parseBoolean(c); + } + + { + usage("--persistent", "set the delivery-mode to persistent if true", persistent); + } + + public void parse__persistent(String p) + { + this.persistent = Boolean.parseBoolean(p); + } + + { + usage("--jms-publish", "use the jms client for publish", jms_publish); + } + + public void parse__jms_publish(String jp) + { + this.jms_publish = Boolean.parseBoolean(jp); + } + + { + usage("--jms-consume", "use the jms client for consume", jms_consume); + } + + public void parse__jms_consume(String jc) + { + this.jms_consume = Boolean.parseBoolean(jc); + } + + { + usage("--jms", "sets both --jms-publish and --jms-consume", false); + } + + public void parse__jms(String j) + { + this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); + } + + { + usage("-h, --help", "prints this message", null); + } + + public void parse__help() + { + this.help = true; + } + + public void parse_h() + { + parse__help(); + } + + public String parse(String ... args) + { + Class klass = getClass(); + List<String> arguments = new ArrayList<String>(); + for (int i = 0; i < args.length; i++) + { + String option = args[i]; + + if (!option.startsWith("-")) + { + arguments.add(option); + continue; + } + + String method = "parse" + option.replace('-', '_'); + try + { + try + { + Method parser = klass.getMethod(method); + parser.invoke(this); + } + catch (NoSuchMethodException e) + { + try + { + Method parser = klass.getMethod(method, String.class); + + String value = null; + if (i + 1 < args.length) + { + value = args[i+1]; + i++; + } + else + { + return option + " requires a value"; + } + + parser.invoke(this, value); + } + catch (NoSuchMethodException e2) + { + return "no such option: " + option; + } + } + } + catch (InvocationTargetException e) + { + Throwable t = e.getCause(); + return String.format + ("error parsing %s: %s: %s", option, t.getClass().getName(), + t.getMessage()); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access parse method: " + option, e); + } + } + + return parseArguments(arguments); + } + + public String parseArguments(List<String> arguments) + { + if (arguments.size() > 0) + { + String args = arguments.toString(); + return "unrecognized arguments: " + args.substring(1, args.length() - 1); + } + else + { + return null; + } + } + + public String toString() + { + Class klass = getClass(); + Field[] fields = klass.getFields(); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < fields.length; i++) + { + if (i > 0) + { + str.append("\n"); + } + + String name = fields[i].getName(); + str.append(name); + str.append(" = "); + Object value; + try + { + value = fields[i].get(this); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access field: " + name, e); + } + str.append(value); + } + + return str.toString(); + } + } + + public static final void main(String[] args) throws Exception + { + final Options opts = new Options(); + String error = opts.parse(args); + if (error != null) + { + System.err.println(error); + System.exit(-1); + return; + } + + if (opts.help) + { + System.out.println(opts.usage); + return; + } + + System.out.println(opts); + + switch (opts.mode) + { + case CONSUME: + case BOTH: + Runnable r = new Runnable() + { + public void run() + { + try + { + if (opts.jms_consume) + { + jms_consumer(opts); + } + else + { + native_consumer(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + System.out.println("Consumer Completed"); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + break; + } + + switch (opts.mode) + { + case PUBLISH: + case BOTH: + Runnable r = new Runnable() + { + public void run() + { + try + { + if (opts.jms_publish) + { + jms_publisher(opts); + } + else + { + native_publisher(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + System.out.println("Producer Completed"); + } + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating publisher thread",e); + } + t.start(); + break; + } + } + + private static enum Column + { + LEFT, RIGHT + } + + private static final void sample(Options opts, Column col, String name, long count, + long start, long time, long lastTime) + { + String pfx = ""; + String sfx = ""; + if (opts.mode == BOTH) + { + if (col == Column.RIGHT) + { + pfx = " -- "; + } + else + { + sfx = " --"; + } + } + + if (count == 0) + { + String stats = String.format("%s: %tc", name, start); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + return; + } + + double cumulative = 1000 * (double) count / (double) (time - start); + double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); + + String stats = String.format + ("%s: %d %.2f %.2f", name, count, cumulative, interval); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + } + + private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception + { + String url = String.format + ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", + opts.broker, opts.port); + return new AMQConnection(url); + } + + private static final void jms_publisher(Options opts) throws Exception + { + javax.jms.Connection conn = getJMSConnection(opts); + + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageProducer prod = ssn.createProducer(dest); + MessageConsumer cons = ssn.createConsumer(echo_dest); + prod.setDisableMessageID(!opts.message_id); + prod.setDisableMessageTimestamp(!opts.timestamp); + prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + StringBuilder str = new StringBuilder(); + for (int i = 0; i < opts.size; i++) + { + str.append((char) (i % 128)); + } + + String body = str.toString(); + + TextMessage cached = ssn.createTextMessage(); + cached.setText(body); + + conn.start(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + Message echo = cons.receive(); + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "JP", count, start, time, lastTime); + lastTime = time; + } + + TextMessage m; + if (opts.message_cache) + { + m = cached; + } + else + { + m = ssn.createTextMessage(); + m.setText(body); + } + + prod.send(m); + count++; + } + + conn.close(); + } + + private static final void jms_consumer(final Options opts) throws Exception + { + final javax.jms.Connection conn = getJMSConnection(opts); + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageConsumer cons = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(echo_dest); + prod.setDisableMessageID(true); + prod.setDisableMessageTimestamp(true); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + final TextMessage echo = ssn.createTextMessage(); + echo.setText("ECHO"); + + final Object done = new Object(); + cons.setMessageListener(new MessageListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void onMessage(Message m) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + try + { + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + prod.send(echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); + lastTime = time; + } + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + conn.start(); + synchronized (done) + { + done.wait(); + } + conn.close(); + } + + private static final org.apache.qpid.transport.Connection getConnection + (Options opts) + { + org.apache.qpid.transport.Connection conn = + new org.apache.qpid.transport.Connection(); + conn.connect(opts.broker, opts.port, null, "guest", "guest",false); + return conn; + } + + private static abstract class NativeListener implements SessionListener + { + + public void opened(org.apache.qpid.transport.Session ssn) {} + + public void resumed(org.apache.qpid.transport.Session ssn) {} + + public void exception(org.apache.qpid.transport.Session ssn, + SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(org.apache.qpid.transport.Session ssn) {} + + } + + private static final void native_publisher(Options opts) throws Exception + { + final long[] echos = { 0 }; + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(xfr); + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + MessageProperties cached_mp = new MessageProperties(); + DeliveryProperties cached_dp = new DeliveryProperties(); + cached_dp.setRoutingKey("test-queue"); + cached_dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + int size = opts.size; + ByteBuffer body = ByteBuffer.allocate(size); + for (int i = 0; i < size; i++) + { + body.put((byte) i); + } + body.flip(); + + ssn.invoke(new MessageSubscribe() + .queue("echo-queue") + .destination("echo-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + UUIDGen gen = UUIDs.newGenerator(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + synchronized (echos) + { + while (echos[0] < (count/opts.window)) + { + echos.wait(); + } + } + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "NP", count, start, time, lastTime); + lastTime = time; + } + + MessageProperties mp; + DeliveryProperties dp; + if (opts.message_cache) + { + mp = cached_mp; + dp = cached_dp; + } + else + { + mp = new MessageProperties(); + dp = new DeliveryProperties(); + dp.setRoutingKey("test-queue"); + dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + } + + if (opts.message_id) + { + mp.setMessageId(gen.generate()); + } + + if (opts.timestamp) + { + dp.setTimestamp(System.currentTimeMillis()); + } + + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), body.slice()); + count++; + } + + ssn.messageCancel("echo-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + + private static final void native_consumer(final Options opts) throws Exception + { + final DeliveryProperties dp = new DeliveryProperties(); + final byte[] echo = new byte[0]; + dp.setRoutingKey("echo-queue"); + dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); + final MessageProperties mp = new MessageProperties(); + final Object done = new Object(); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(xfr); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + ssn.invoke(new MessageSubscribe() + .queue("test-queue") + .destination("test-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + synchronized (done) + { + done.wait(); + } + + ssn.messageCancel("test-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java new file mode 100644 index 0000000000..89d6462a39 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import javax.jms.Session; + +public class TestParams +{ + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + + private String host = ""; + + private int port = -1; + + private String address = "queue; {create : always}"; + + private int msg_size = 1024; + + private int msg_type = 1; // not used yet + + private boolean cacheMessage = false; + + private boolean disableMessageID = false; + + private boolean disableTimestamp = false; + + private boolean durable = false; + + private boolean transacted = false; + + private int transaction_size = 1000; + + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + + private int msg_count = 10; + + private int warmup_count = 1; + + private boolean random_msg_size = false; + + public TestParams() + { + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address","queue"); + + msg_size = Integer.getInteger("msg_size", 1024); + msg_type = Integer.getInteger("msg_type",1); + cacheMessage = Boolean.getBoolean("cache_msg"); + disableMessageID = Boolean.getBoolean("disableMessageID"); + disableTimestamp = Boolean.getBoolean("disableTimestamp"); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + transaction_size = Integer.getInteger("trans_size",1000); + ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg_count",msg_count); + warmup_count = Integer.getInteger("warmup_count",warmup_count); + random_msg_size = Boolean.getBoolean("random_msg_size"); + } + + public String getUrl() + { + return url; + } + + public String getHost() + { + return host; + } + + public int getPort() + { + return port; + } + + public String getAddress() + { + return address; + } + + public int getAckMode() + { + return ack_mode; + } + + public int getMsgCount() + { + return msg_count; + } + + public int getMsgSize() + { + return msg_size; + } + + public int getMsgType() + { + return msg_type; + } + + public boolean isDurable() + { + return durable; + } + + public boolean isTransacted() + { + return transacted; + } + + public int getTransactionSize() + { + return transaction_size; + } + + public int getWarmupCount() + { + return warmup_count; + } + + public boolean isCacheMessage() + { + return cacheMessage; + } + + public boolean isDisableMessageID() + { + return disableMessageID; + } + + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + public boolean isRandomMsgSize() + { + return random_msg_size; + } + +} |