diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /java/tools | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools')
24 files changed, 0 insertions, 4135 deletions
diff --git a/java/tools/README.txt b/java/tools/README.txt deleted file mode 100644 index fdde734027..0000000000 --- a/java/tools/README.txt +++ /dev/null @@ -1,153 +0,0 @@ -Introduction -============ - -The Test kit for the java client consists of 2 components. - -1) A Simple Perf Test that can be used to, - a) Run a predefined perf report consisting of 8 use cases (see below) - b) Run a producer and a consumer with a number of different options - -2) Soak tests that can be run for longer durations (hours or days). - -I am planning to add some stress tests to this module as well. -Please note this is not a replacement for the existing perf/systests etc. -But rather a small test kit thats focused on providing a packaged set of tests that can be quickly deployed on an environment to do quick smoke testing or easily setup a soak test. - -Table of Contents -================= -1. Perf Kit -2. Soak Kit -3. Perf Test use cases -4. Soak Test use cases -5. Running the sample perf test report -6. Running the sample soak test report - -1.0 Perf Kit ------------- -1.1 The perf kit can be packaged as an RPM or a tar file and deploy on a target environment and run the perf report. -Or else a perf report can be automated to run every day or so an record numbers to catch perf regressions. - -1.2 It calculates the following results in msg/sec. - - System throuhgput : no_of_msgs / (time_last_msg_rcvd - time_first_msg_send) - - Producer rate : no_of_msgs / (time_after_sending - time_before_sending) - - Producer rate : no_of_msgs / (time_last_msg_rcvd - time_first_msg_rcvd) - - Latency : time_msg_rcvd - time_msg_sent - -The test will print min, max and avg latency. - -1.3 The test assume that both producer and consumer are run on the same machine or different machines that are time synced. - -1.4 You can also use run_sub.sh and run_pub.sh to run different use cases with several options. - Please look at TestParams.java for all the configurable options. - -1.5 You can also use the test kit to benchmark against any vendor. - - -2.0 Soak tests --------------- -2.0 This includes a set of soak tests that can be run for a longer duration. - -2.1 A typical test will send x-1 messages and the xth message will contain an "End" marker. - The producer will print the timestamp as soon as it sends the xth message. - The consumer will reply with an empty message to the replyTo destination given in the xth message. - The consumer prints the throuhgput for the iteration and the latency for the xth message. - A typical value for x is 100k - -2.2 The feedback loop prevents the producer from overrunning the consumer. - And the printout for every xth message will let you know how many iterations been completed at any given time. - (Ex a simple cat log | wc -l will give you the how many iterations have been completed so far). - -2.2 The following results can be calculated for these tests. - - Memory, CPU for each producer/consumer - look at testkit/bin/run_soak_client.sh for an example - - You can find the Avg, Min & Max for throughput, latency, CPU and memory for the entire test run. - (look at testkit/bin/soak_report.sh) for an example). - - You could also graph throughput, latency, CPU and memory using the comma separated log files. - -2.2 If you use different machines for producer and consumer the machines have to be time synced to have meaningful latency samples. - -3.0 Perf Test report use cases -------------------------------- -3.1 Please check testkit/bin/perf_report.sh for more details - -3.2 A typical test run will send 1000 msgs during warmup and 200k msgs for result calculation. - -Test 1 Trans Queue - -Test 2 Dura Queue - -Test 3 Dura Queue Sync - -Test 4 Topic - -Test 5 Durable Topic - -Test 6 Fanout - -Test 7 Small TX (about 2 msgs per tx) - -Test 8 Large TX (about 1000 msgs per tx) - - -4.0 Soak tests use cases -------------------------- -4.1 Following are the current tests available in the test kit. - -4.2 Please refer to the source to see the javadoc and options - - -1. SimpleProducer/Consumer sends X messages at a time and will wait for confirmation from producer before proceeding with the next iteration. A no of options can be configured. - -2. MultiThreadedProducer/Consumer does the same thing as above but runs each session in a separate thread. - It can also send messages transactionally. Again a no of options can be configured. - -3. ResourceLeakTest will setup consumer/producers sends x messages and then teard down everything and continue again. - - -5.0 Running the sample perf test report ---------------------------------------- -The testkit/bin contains perf_report.sh. -It runs the above 8 use cases against a broker and print the results in a tabular format. - -For example -================================================================================================ -|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency| ------------------------------------------------------------------------------------------------- -|Trans_Queue | xxxxx.xx| xxxxx.xx| xxxxx.xx| xx.xx| x| xx| - - -5.1 running perf_report.sh - -5.1.1 set JAVA_HOME to point to Java 1.5 and above -5.1.2 set QPID_TEST_HOME to point to the testkit dir -5.1.3 set VENDOR_LIB to point to the Qpid (or other JMS providers) jar files. -5.1.4 start a broker -5.1.5 update the testkit/etc/jndi.properties to point to the correct broker -5.1.6 execute perf_report.sh - - -6.0 Running the sample soak test report ---------------------------------------- -The testkit/bin contains soak_report.sh -It runs MultiThreadedProducer/Consumer for the duration specified and prints a report for the following stats. -Avg, Min and Max for System Throughput, letency, CPU and memory. - -6.1 running soak_report.sh - -5.1.1 set JAVA_HOME to point to Java 1.5 and above -5.1.2 set QPID_TEST_HOME to point to the testkit dir -5.1.3 set JAR_PATH to point to the Qpid jars -5.1.4 start a broker -5.1.5 execute soak_report.sh with correct params. - Ex sh soak_report.sh 1 36000 will run for 10 hours colllecting CPU, memory every second. - -5.1.6 Please note the total duration for the test is log_freq * log_iterations - So if you want to run the test for 10 hours and collect 10 second samples then do the following - sh soak_report.sh 10 3600 - diff --git a/java/tools/bin/perf_report.sh b/java/tools/bin/perf_report.sh deleted file mode 100755 index e6b4c987e5..0000000000 --- a/java/tools/bin/perf_report.sh +++ /dev/null @@ -1,140 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This will run the 8 use cases defined below and produce -# a report in tabular format. Refer to the documentation -# for more details. - -SUB_MEM=-Xmx1024M -PUB_MEM=-Xmx1024M -LOG_CONFIG="-Damqj.logging.level=WARN" -QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" -DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" -TOPIC="amq.topic/test" -DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" - -. setenv.sh - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep Perf | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -# $1 test name -# $2 consumer options -# $3 producer options -run_testcase() -{ - sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out & - waitfor sub.out "Warming up" - sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out & - waitfor sub.out "Completed the test" - waitfor pub.out "Consumer has completed the test" - sleep 2 #give a grace period to shutdown - print_result $1 -} - -print_result() -{ - prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'` - sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` - avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'` - min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'` - max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency - echo "------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT - -echo "Test report on " `date +%F` -echo "================================================================================================" -echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" -echo "------------------------------------------------------------------------------------------------" - -# The message counts and warmup counts are set to very low values for quick testing of the script. -# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k -# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend -# setting very low values to start with and experiment while increasing them slowly. - -# Test 1 Trans Queue -#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" - -# Test 2 Dura Queue -run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 3 Dura Queue Sync -run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 4 Dura Queue Sync Publish and Ack -run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 5 Topic -run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" - -# Test 6 Durable Topic -run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 7 Fanout -run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" - -# Test 8 Small TX -run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" - -# Test 9 Large TX -run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" - -# Test 10 256 MSG -run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 11 512 MSG -run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 12 2048 MSG -run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 13 Random size MSG -run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 14 Random size MSG Durable -run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 15 64K MSG -run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 16 Durable 64K MSG -run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 17 500K MSG -run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 18 Durable 500K MSG -run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/java/tools/bin/qpid-bench b/java/tools/bin/qpid-bench deleted file mode 100644 index c982e64efd..0000000000 --- a/java/tools/bin/qpid-bench +++ /dev/null @@ -1,35 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) - export PATH=${PATH}:${QPID_HOME}/bin -fi - -# Set classpath to include Qpid jar with all required jars in manifest -QPID_LIBS=$QPID_HOME/lib/qpid-all.jar - -# Set other variables used by the qpid-run script before calling -export JAVA=java \ - JAVA_VM=-server \ - JAVA_MEM=-Xmx1024m \ - QPID_CLASSPATH=$QPID_LIBS - -. qpid-run org.apache.qpid.tools.QpidBench "$@" diff --git a/java/tools/bin/qpid-python-testkit b/java/tools/bin/qpid-python-testkit deleted file mode 100755 index cbe7972421..0000000000 --- a/java/tools/bin/qpid-python-testkit +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This is wrapper script to run the tests defined in testkit.py -# via the python test runner. The defaults are set for a running -# from an svn checkout - -. ./set-testkit-env.sh - -export PYTHONPATH=./:$PYTHONPATH -rm -rf $OUTDIR -qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" - diff --git a/java/tools/bin/run_pub.sh b/java/tools/bin/run_pub.sh deleted file mode 100644 index 91b9287dea..0000000000 --- a/java/tools/bin/run_pub.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -. $QPID_TEST_HOME/bin/setenv.sh - -echo "$@" -$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer diff --git a/java/tools/bin/run_sub.sh b/java/tools/bin/run_sub.sh deleted file mode 100644 index c9ad2fed74..0000000000 --- a/java/tools/bin/run_sub.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -. $QPID_TEST_HOME/bin/setenv.sh - -echo "$@" -$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer - diff --git a/java/tools/bin/set-testkit-env.sh b/java/tools/bin/set-testkit-env.sh deleted file mode 100644 index 051dad8179..0000000000 --- a/java/tools/bin/set-testkit-env.sh +++ /dev/null @@ -1,88 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# If QPIDD_EXEC ..etc is not set, it will first check to see -# if this is run from a qpid svn check out, if not it will look -# for installed rpms. - -abs_path() -{ - D=`dirname "$1"` - B=`basename "$1"` - echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B" -} - -# Environment for python tests - -if [ -d ../../../python ] ; then - PYTHON_DIR=../../../python - PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid -elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then - echo "WARNING: skipping test, no qpid python scripts found ."; exit 0; -fi - - -if [ "$QPIDD_EXEC" = "" ] ; then - if [ -x ../../../cpp/src/qpidd ]; then - QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"` - elif [ -n "$(which qpidd)" ] ; then - QPIDD_EXEC=$(which qpidd) - else - echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0; - fi -fi - -if [ "$CLUSTER_LIB" = "" ] ; then - if [ -x ../../../cpp/src/.libs/cluster.so ]; then - CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"` - elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then - CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so" - elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then - CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so" - else - echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0; - fi -fi - -if [ "$STORE_LIB" = "" ] ; then - if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then - STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so" - elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then - STORE_LIB="/usr/lib/qpid/daemon/msgstore.so" - #else - # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; - fi -fi - -if [ "$QP_CP" = "" ] ; then - if [ -d ../../build/lib/ ]; then - QP_JAR_PATH=`abs_path "../../build/lib/"` - elif [ -d /usr/share/java/qpid-deps ]; then - QP_JAR_PATH=`abs_path "/usr/share/java"` - else - "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0; - fi - QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'` -fi - -if [ "$OUTDIR" = "" ] ; then - OUTDIR=`abs_path "./output"` -fi - -export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR diff --git a/java/tools/bin/setenv.sh b/java/tools/bin/setenv.sh deleted file mode 100644 index 24135e711b..0000000000 --- a/java/tools/bin/setenv.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Compiles the test classes and sets the CLASSPATH - -# check for QPID_TEST_HOME -if [ "$QPID_TEST_HOME" = "" ] ; then - echo "ERROR: Please set QPID_TEST_HOME ...." - exit 1 -fi - -# check for JAVA_HOME -if [ "$JAVA_HOME" = "" ] ; then - echo "ERROR: Please set JAVA_HOME ...." - exit 1 -fi - -# VENDOR_LIB path needs to be set -# for Qpid set this to {qpid_checkout}/java/build/lib -if [ "$VENDOR_LIB" = "" ] ; then - echo "ERROR: Please set VENDOR_LIB path in the script ...." - exit 1 -fi - - -[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes - -CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"` -$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'` - -export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH - diff --git a/java/tools/bin/testkit.py b/java/tools/bin/testkit.py deleted file mode 100755 index 1c2ad598b8..0000000000 --- a/java/tools/bin/testkit.py +++ /dev/null @@ -1,278 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time, string, traceback -from brokertest import * -from qpid.messaging import * - - -try: - import java.lang.System - _cp = java.lang.System.getProperty("java.class.path"); -except ImportError: - _cp = checkenv("QP_CP") - -class Formatter: - - def __init__(self, message): - self.message = message - self.environ = {"M": self.message, - "P": self.message.properties, - "C": self.message.content} - - def __getitem__(self, st): - return eval(st, self.environ) - -# The base test case has support for launching the generic -# receiver and sender through the TestLauncher with all the options. -# -class JavaClientTest(BrokerTest): - """Base Case for Java Test cases""" - - client_class = "org.apache.qpid.testkit.TestLauncher" - - # currently there is no transparent reconnection. - # temp hack: just creating the queue here and closing it. - def start_error_watcher(self,broker=None): - ssn = broker.connect().session() - err_watcher = ssn.receiver("control; {create:always}", capacity=1) - ssn.close() - - def store_module_args(self): - if BrokerTest.store_lib: - return ["--load-module", BrokerTest.store_lib] - else: - print "Store module not present." - return [""] - - def client(self,**options): - cmd = ["java","-cp",_cp] - - cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] - cmd += ["-Dhost=" + options.get("host","127.0.0.1")] - cmd += ["-Dport=" + str(options.get("port",5672))] - cmd += ["-Dcon_count=" + str(options.get("con_count",1))] - cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] - cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] - cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))] - cmd += ["-Ddurable=" + str(options.get("durable",False))] - cmd += ["-Dtransacted=" + str(options.get("transacted",False))] - cmd += ["-Dreceiver=" + str(options.get("receiver",False))] - cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))] - cmd += ["-Dsender=" + str(options.get("sender",False))] - cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] - cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] - cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))] - cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))] - cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))] - cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))] - cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] - cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] - cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] - cmd += ["-Dlog.level=" + options.get("log.level", "warn")] - cmd += [self.client_class] - cmd += [options.get("address", "my_queue; {create: always}")] - - #print str(options.get("port",5672)) - return cmd - - # currently there is no transparent reconnection. - # temp hack: just creating a receiver and closing session soon after. - def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60): - ssn = broker.connect().session() - err_watcher = ssn.receiver("control; {create:always}", capacity=1) - i = run_time/error_ck_freq - is_error = False - for j in range(i): - not_empty = True - while not_empty: - try: - m = err_watcher.fetch(timeout=error_ck_freq) - ssn.acknowledge() - print "Java process notified of an error" - self.print_error(m) - is_error = True - except messaging.Empty, e: - not_empty = False - - ssn.close() - return is_error - - def print_error(self,msg): - print msg.properties.get("exception-trace") - - def verify(self, receiver,sender): - sender_running = receiver.is_running() - receiver_running = sender.is_running() - - self.assertTrue(receiver_running,"Receiver has exited prematually") - self.assertTrue(sender_running,"Sender has exited prematually") - - def start_sender_and_receiver(self,**options): - - receiver_opts = options - receiver_opts["receiver"]=True - receiver = self.popen(self.client(**receiver_opts), - expect=EXPECT_RUNNING) - - sender_opts = options - sender_opts["sender"]=True - sender = self.popen(self.client(**sender_opts), - expect=EXPECT_RUNNING) - - return receiver, sender - - def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): - if options.get("durable",False)==True: - cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) - else: - cluster = Cluster(self, count=count) - return cluster - -class ConcurrencyTest(JavaClientTest): - """A concurrency test suite for the JMS client""" - skip = False - - def base_case(self,**options): - if self.skip : - print "Skipping test" - return - - cluster = self.start_cluster(count=2,**options) - self.start_error_watcher(broker=cluster[0]) - options["port"] = port=cluster[0].port() - - options["use_unique_dests"]=True - options["address"]="amq.topic" - receiver, sender = self.start_sender_and_receiver(**options) - self.monitor_clients(broker=cluster[0],run_time=180) - self.verify(receiver,sender) - - def test_multiplexing_con(self): - """Tests multiple sessions on a single connection""" - - self.base_case(ssn_per_con=25,test_name=self.id()) - - def test_multiplexing_con_with_tx(self): - """Tests multiple transacted sessions on a single connection""" - - self.base_case(ssn_per_con=25,transacted=True,test_name=self.id()) - - def test_multiplexing_con_with_sync_rcv(self): - """Tests multiple sessions with sync receive""" - - self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id()) - - def test_multiplexing_con_with_durable_sub(self): - """Tests multiple sessions with durable subs""" - - self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) - - def test_multiplexing_con_with_sync_ack(self): - """Tests multiple sessions with sync ack""" - - self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id()) - - def test_multiplexing_con_with_sync_pub(self): - """Tests multiple sessions with sync pub""" - - self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id()) - - def test_multiple_cons_and_ssns(self): - """Tests multiple connections and sessions""" - - self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) - - -class SoakTest(JavaClientTest): - """A soak test suite for the JMS client""" - - def base_case(self,**options): - cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) - options["port"] = port=cluster[0].port() - self.start_error_watcher(broker=cluster[0]) - options["use_unique_dests"]=True - options["address"]="amq.topic" - receiver,sender = self.start_sender_and_receiver(**options) - is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) - - if (is_error): - print "The sender or receiver didn't start properly. Exiting test." - return - else: - "Print no error !" - - # grace period for java clients to get the failover properly setup. - time.sleep(30) - error_msg= None - # Kill original brokers, start new ones. - try: - for i in range(8): - cluster[i].kill() - b=cluster.start() - self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) - print "iteration : " + str(i) - except ConnectError, e1: - error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) - - except SessionError, e2: - error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) - - self.verify(receiver,sender) - if error_msg: - raise Exception(error_msg) - - - def test_failover(self) : - """Test basic failover""" - - self.base_case(test_name=self.id()) - - - def test_failover_with_durablesub(self): - """Test failover with durable subscriber""" - - self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) - - - def test_failover_with_sync_rcv(self): - """Test failover with sync receive""" - - self.base_case(sync_rcv=True,test_name=self.id()) - - - def test_failover_with_sync_ack(self): - """Test failover with sync ack""" - - self.base_case(sync_ack=True,test_name=self.id()) - - - def test_failover_with_noprefetch(self): - """Test failover with no prefetch""" - - self.base_case(max_prefetch=1,test_name=self.id()) - - - def test_failover_with_multiple_cons_and_ssns(self): - """Test failover with multiple connections and sessions""" - - self.base_case(use_unique_dests=True,address="amq.topic", - con_count=10,ssn_per_con=25,test_name=self.id()) diff --git a/java/tools/build.xml b/java/tools/build.xml deleted file mode 100644 index 7cd1b1172c..0000000000 --- a/java/tools/build.xml +++ /dev/null @@ -1,27 +0,0 @@ -<!-- - - - - Licensed to the Apache Software Foundation (ASF) under one - - or more contributor license agreements. See the NOTICE file - - distributed with this work for additional information - - regarding copyright ownership. The ASF licenses this file - - to you under the Apache License, Version 2.0 (the - - "License"); you may not use this file except in compliance - - with the License. You may obtain a copy of the License at - - - - http://www.apache.org/licenses/LICENSE-2.0 - - - - Unless required by applicable law or agreed to in writing, - - software distributed under the License is distributed on an - - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - - KIND, either express or implied. See the License for the - - specific language governing permissions and limitations - - under the License. - - - --> -<project name="Qpid Tools" default="build"> - - <property name="module.depends" value="client common"/> - - <import file="../module.xml"/> - -</project> diff --git a/java/tools/etc/test.log4j b/java/tools/etc/test.log4j deleted file mode 100644 index b574a7b5b7..0000000000 --- a/java/tools/etc/test.log4j +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -log4j.rootLogger=${root.logging.level} - -log4j.logger.org.apache.qpid=ERROR, console -log4j.additivity.org.apache.qpid=false - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=all -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n - diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/java/tools/src/main/java/org/apache/qpid/testkit/Client.java deleted file mode 100644 index b10129d855..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/testkit/Client.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Session; - -public abstract class Client implements ExceptionListener -{ - private Connection con; - private Session ssn; - private boolean durable = false; - private boolean transacted = false; - private int txSize = 10; - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - private String contentType = "application/octet-stream"; - - private long reportFrequency = 60000; // every min - - private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - private NumberFormat nf = new DecimalFormat("##.00"); - - private long startTime = System.currentTimeMillis(); - private ErrorHandler errorHandler = null; - - public Client(Connection con) throws Exception - { - this.con = con; - this.con.setExceptionListener(this); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - txSize = Integer.getInteger("tx_size",10); - contentType = System.getProperty("content_type","application/octet-stream"); - reportFrequency = Long.getLong("report_frequency", 60000); - } - - public void close() - { - try - { - con.close(); - } - catch (Exception e) - { - handleError("Error closing connection",e); - } - } - - public void onException(JMSException e) - { - handleError("Connection error",e); - } - - public void setErrorHandler(ErrorHandler h) - { - this.errorHandler = h; - } - - public void handleError(String msg,Exception e) - { - if (errorHandler != null) - { - errorHandler.handleError(msg, e); - } - else - { - System.err.println(msg); - e.printStackTrace(); - } - } - - protected Session getSsn() - { - return ssn; - } - - protected void setSsn(Session ssn) - { - this.ssn = ssn; - } - - protected boolean isDurable() - { - return durable; - } - - protected boolean isTransacted() - { - return transacted; - } - - protected int getTxSize() - { - return txSize; - } - - protected int getAck_mode() - { - return ack_mode; - } - - protected String getContentType() - { - return contentType; - } - - protected long getReportFrequency() - { - return reportFrequency; - } - - protected long getStartTime() - { - return startTime; - } - - protected void setStartTime(long startTime) - { - this.startTime = startTime; - } - - public DateFormat getDf() - { - return df; - } - -} diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java deleted file mode 100644 index dbc73c404f..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.qpid.testkit; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -public interface ErrorHandler { - - public void handleError(String msg,Exception e); -} diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java deleted file mode 100644 index b4294ee4cc..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; - -/** - * A generic receiver which consumes messages - * from a given address in a broker (host/port) - * until told to stop by killing it. - * - * It participates in a feedback loop to ensure the producer - * doesn't fill up the queue. If it receives an "End" msg - * it sends a reply to the replyTo address in that msg. - * - * It doesn't check for correctness or measure anything - * leaving those concerns to another entity. - * However it prints a timestamp every x secs(-Dreport_frequency) - * as checkpoint to figure out how far the test has progressed if - * a failure occurred. - * - * It also takes in an optional Error handler to - * pass out any error in addition to writing them to std err. - * - * This is intended more as building block to create - * more complex test cases. However there is a main method - * provided to use this standalone. - * - * The following options are available and configurable - * via jvm args. - * - * sync_rcv - Whether to consume sync (instead of using a listener). - * report_frequency - how often a timestamp is printed - * durable - * transacted - * tx_size - size of transaction batch in # msgs. * - * check_for_dups - check for duplicate messages and out of order messages. - * jms_durable_sub - create a durable subscription instead of a regular subscription. - */ -public class Receiver extends Client implements MessageListener -{ - long msg_count = 0; - int sequence = 0; - boolean syncRcv = Boolean.getBoolean("sync_rcv"); - boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); - boolean checkForDups = Boolean.getBoolean("check_for_dups"); - MessageConsumer consumer; - List<Integer> duplicateMessages = new ArrayList<Integer>(); - - public Receiver(Connection con,String addr) throws Exception - { - super(con); - setSsn(con.createSession(isTransacted(), getAck_mode())); - consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); - if (!syncRcv) - { - consumer.setMessageListener(this); - } - - System.out.println("Receiving messages from : " + addr); - } - - public void onMessage(Message msg) - { - handleMessage(msg); - } - - public void run() throws Exception - { - long sleepTime = getReportFrequency(); - while(true) - { - if(syncRcv) - { - long t = sleepTime; - while (t > 0) - { - long start = System.currentTimeMillis(); - Message msg = consumer.receive(t); - t = t - (System.currentTimeMillis() - start); - handleMessage(msg); - } - } - Thread.sleep(sleepTime); - System.out.println(getDf().format(System.currentTimeMillis()) - + " - messages received : " + msg_count); - } - } - - private void handleMessage(Message m) - { - if (m == null) { return; } - - try - { - if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) - { - MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); - Message controlMsg = getSsn().createTextMessage(); - temp.send(controlMsg); - if (isTransacted()) - { - getSsn().commit(); - } - temp.close(); - } - else - { - - int seq = m.getIntProperty("sequence"); - if (checkForDups) - { - if (seq == 0) - { - sequence = 0; // wrap around for each iteration - System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); - duplicateMessages.clear(); - } - - if (seq < sequence) - { - duplicateMessages.add(seq); - } - else if (seq == sequence) - { - sequence++; - msg_count ++; - } - else - { - // Multiple publishers are not allowed in this test case. - // So out of order messages are not allowed. - throw new Exception(": Received an out of order message (expected=" - + sequence + ",received=" + seq + ")" ); - } - } - else - { - msg_count ++; - } - - // Please note that this test case doesn't expect duplicates - // When testing for transactions. - if (isTransacted() && msg_count % getTxSize() == 0) - { - getSsn().commit(); - } - } - } - catch (Exception e) - { - e.printStackTrace(); - handleError("Exception receiving messages",e); - } - } - - // Receiver host port address - public static void main(String[] args) throws Exception - { - String host = "127.0.0.1"; - int port = 5672; - String addr = "message_queue"; - - if (args.length > 0) - { - host = args[0]; - } - if (args.length > 1) - { - port = Integer.parseInt(args[1]); - } - if (args.length > 2) - { - addr = args[2]; - } - - AMQConnection con = new AMQConnection( - "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "'"); - - Receiver rcv = new Receiver(con,addr); - rcv.run(); - } - -} diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java deleted file mode 100644 index 14b9b7302f..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.Random; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.tools.MessageFactory; - -/** - * A generic sender which sends a stream of messages - * to a given address in a broker (host/port) - * until told to stop by killing it. - * - * It has a feedback loop to ensure it doesn't fill - * up queues due to a slow consumer. - * - * It doesn't check for correctness or measure anything - * leaving those concerns to another entity. - * However it prints a timestamp every x secs(-Dreport_frequency) - * as checkpoint to figure out how far the test has progressed if - * a failure occurred. - * - * It also takes in an optional Error handler to - * pass out any error in addition to writing them to std err. - * - * This is intended more as building block to create - * more complex test cases. However there is a main method - * provided to use this standalone. - * - * The following options are available and configurable - * via jvm args. - * - * msg_size (256) - * msg_count (10) - # messages before waiting for feedback - * sleep_time (1000 ms) - sleep time btw each iteration - * report_frequency - how often a timestamp is printed - * durable - * transacted - * tx_size - size of transaction batch in # msgs. - */ -public class Sender extends Client -{ - protected int msg_size = 256; - protected int msg_count = 10; - protected int iterations = -1; - protected long sleep_time = 1000; - - protected Destination dest = null; - protected Destination replyTo = null; - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - - protected MessageProducer producer; - Random gen = new Random(19770905); - - public Sender(Connection con,String addr) throws Exception - { - super(con); - this.msg_size = Integer.getInteger("msg_size", 100); - this.msg_count = Integer.getInteger("msg_count", 10); - this.iterations = Integer.getInteger("iterations", -1); - this.sleep_time = Long.getLong("sleep_time", 1000); - this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); - this.dest = new AMQAnyDestination(addr); - this.producer = getSsn().createProducer(dest); - this.replyTo = getSsn().createTemporaryQueue(); - - System.out.println("Sending messages to : " + addr); - } - - /* - * If msg_size not specified it generates a message - * between 500-1500 bytes. - */ - protected Message getNextMessage() throws Exception - { - int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; - Message msg = (getContentType().equals("text/plain")) ? - MessageFactory.createTextMessage(getSsn(), s): - MessageFactory.createBytesMessage(getSsn(), s); - - msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT - : DeliveryMode.NON_PERSISTENT); - return msg; - } - - public void run() - { - try - { - boolean infinite = (iterations == -1); - for (int x=0; infinite || x < iterations; x++) - { - long now = System.currentTimeMillis(); - if (now - getStartTime() >= getReportFrequency()) - { - System.out.println(df.format(now) + " - iterations : " + x); - setStartTime(now); - } - - for (int i = 0; i < msg_count; i++) - { - Message msg = getNextMessage(); - msg.setIntProperty("sequence",i); - producer.send(msg); - if (isTransacted() && msg_count % getTxSize() == 0) - { - getSsn().commit(); - } - } - TextMessage m = getSsn().createTextMessage("End"); - m.setJMSReplyTo(replyTo); - producer.send(m); - - if (isTransacted()) - { - getSsn().commit(); - } - - MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo); - feedbackConsumer.receive(); - feedbackConsumer.close(); - if (isTransacted()) - { - getSsn().commit(); - } - Thread.sleep(sleep_time); - } - } - catch (Exception e) - { - handleError("Exception sending messages",e); - } - } - - // Receiver host port address - public static void main(String[] args) throws Exception - { - String host = "127.0.0.1"; - int port = 5672; - String addr = "message_queue"; - - if (args.length > 0) - { - host = args[0]; - } - if (args.length > 1) - { - port = Integer.parseInt(args[1]); - } - if (args.length > 2) - { - addr = args[2]; - } - - AMQConnection con = new AMQConnection( - "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "'"); - - Sender sender = new Sender(con,addr); - sender.run(); - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java deleted file mode 100644 index 72ca48e1c9..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; - -/** - * A basic test case class that could launch a Sender/Receiver - * or both, each on it's own separate thread. - * - * If con_count == ssn_count, then each entity created will have - * it's own Connection. Else if con_count < ssn_count, then - * a connection will be shared by ssn_count/con_count # of entities. - * - * The if both sender and receiver options are set, it will - * share a connection. - * - * The following options are available as jvm args - * host, port - * con_count,ssn_count - * con_idle_time - which determines heartbeat - * sender, receiver - booleans which indicate which entity to create. - * Setting them both is also a valid option. - */ -public class TestLauncher implements ErrorHandler -{ - protected String host = "127.0.0.1"; - protected int port = 5672; - protected int sessions_per_con = 1; - protected int connection_count = 1; - protected long heartbeat = 5000; - protected boolean sender = false; - protected boolean receiver = false; - protected boolean useUniqueDests = false; - protected String url; - - protected String address = "my_queue; {create: always}"; - protected boolean durable = false; - protected String failover = ""; - protected AMQConnection controlCon; - protected Destination controlDest = null; - protected Session controlSession = null; - protected MessageProducer statusSender; - protected List<AMQConnection> clients = new ArrayList<AMQConnection>(); - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - protected String testName; - - public TestLauncher() - { - testName = System.getProperty("test_name","UNKNOWN"); - host = System.getProperty("host", "127.0.0.1"); - port = Integer.getInteger("port", 5672); - sessions_per_con = Integer.getInteger("ssn_per_con", 1); - connection_count = Integer.getInteger("con_count", 1); - heartbeat = Long.getLong("heartbeat", 5); - sender = Boolean.getBoolean("sender"); - receiver = Boolean.getBoolean("receiver"); - useUniqueDests = Boolean.getBoolean("use_unique_dests"); - - failover = System.getProperty("failover", ""); - durable = Boolean.getBoolean("durable"); - - url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; - - if (failover.equalsIgnoreCase("failover_exchange")) - { - url += "&failover='failover_exchange'"; - - System.out.println("Failover exchange " + url ); - } - - configureLogging(); - } - - protected void configureLogging() - { - PatternLayout layout = new PatternLayout(); - layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); - BasicConfigurator.configure(new ConsoleAppender(layout)); - - String logLevel = System.getProperty("log.level","warn"); - String logComponent = System.getProperty("log.comp","org.apache.qpid"); - - Logger logger = Logger.getLogger(logComponent); - logger.setLevel(Level.toLevel(logLevel, Level.WARN)); - - System.out.println("Level " + logger.getLevel()); - - } - - public void setUpControlChannel() - { - try - { - controlCon = new AMQConnection(url); - controlCon.start(); - - controlDest = new AMQAnyDestination("control; {create: always}"); // durable - - // Create the session to setup the messages - controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); - statusSender = controlSession.createProducer(controlDest); - - } - catch (Exception e) - { - handleError("Error while setting up the test",e); - } - } - - public void cleanup() - { - try - { - controlSession.close(); - controlCon.close(); - for (AMQConnection con : clients) - { - con.close(); - } - } - catch (Exception e) - { - handleError("Error while tearing down the test",e); - } - } - - public void start(String addr) - { - try - { - if (addr == null) - { - addr = address; - } - - int ssn_per_con = sessions_per_con; - String addrTemp = addr; - for (int i = 0; i< connection_count; i++) - { - AMQConnection con = new AMQConnection(url); - con.start(); - clients.add(con); - for (int j = 0; j< ssn_per_con; j++) - { - String index = createPrefix(i,j); - if (useUniqueDests) - { - addrTemp = modifySubject(index,addr); - } - - if (sender) - { - createSender(index,con,addrTemp,this); - } - - if (receiver) - { - System.out.println("########## Creating receiver ##################"); - - createReceiver(index,con,addrTemp,this); - } - } - } - } - catch (Exception e) - { - handleError("Exception while setting up the test",e); - } - - } - - protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) - { - Runnable r = new Runnable() - { - public void run() - { - try - { - Receiver rcv = new Receiver(con,addr); - rcv.setErrorHandler(h); - rcv.run(); - } - catch (Exception e) - { - h.handleError("Error Starting Receiver", e); - } - } - }; - - Thread t = null; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - handleError("Error creating Receive thread",e); - } - - t.setName("ReceiverThread-" + index); - t.start(); - } - - protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) - { - Runnable r = new Runnable() - { - public void run() - { - try - { - Sender sender = new Sender(con, addr); - sender.setErrorHandler(h); - sender.run(); - } - catch (Exception e) - { - h.handleError("Error Starting Sender", e); - } - } - }; - - Thread t = null; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - handleError("Error creating Sender thread",e); - } - - t.setName("SenderThread-" + index); - t.start(); - } - - public synchronized void handleError(String msg,Exception e) - { - // In case sending the message fails - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" @ "); - sb.append(df.format(new Date(System.currentTimeMillis()))); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - - try - { - TextMessage errorMsg = controlSession.createTextMessage(); - errorMsg.setStringProperty("status", "error"); - errorMsg.setStringProperty("desc", msg); - errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); - errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); - - System.out.println("Msg " + errorMsg); - - statusSender.send(errorMsg); - } - catch (JMSException e1) - { - e1.printStackTrace(); - } - } - - private String serializeStackTrace(Exception e) - { - ByteArrayOutputStream bOut = new ByteArrayOutputStream(); - PrintStream printStream = new PrintStream(bOut); - e.printStackTrace(printStream); - printStream.close(); - return bOut.toString(); - } - - private String createPrefix(int i, int j) - { - return String.valueOf(i).concat(String.valueOf(j)); - } - - /** - * A basic helper function to modify the subjects by - * appending an index. - */ - private String modifySubject(String index,String addr) - { - if (addr.indexOf("/") > 0) - { - addr = addr.substring(0,addr.indexOf("/")+1) + - index + - addr.substring(addr.indexOf("/")+1,addr.length()); - } - else if (addr.indexOf(";") > 0) - { - addr = addr.substring(0,addr.indexOf(";")) + - "/" + index + - addr.substring(addr.indexOf(";"),addr.length()); - } - else - { - addr = addr + "/" + index; - } - - return addr; - } - - public static void main(String[] args) - { - final TestLauncher test = new TestLauncher(); - test.setUpControlChannel(); - System.out.println("args.length " + args.length); - System.out.println("args [0] " + args [0]); - test.start(args.length > 0 ? args [0] : null); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { test.cleanup(); } - }); - - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java deleted file mode 100644 index 2390516ef0..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.tools; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.jms.FailoverPolicy; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.util.Properties; -import java.util.Hashtable; -import java.util.Enumeration; -import java.util.List; -import java.util.LinkedList; -import java.io.IOException; -import java.io.File; -import java.io.FileInputStream; - -public class JNDICheck -{ - private static final String QUEUE = "queue."; - private static final String TOPIC = "topic."; - private static final String DESTINATION = "destination."; - private static final String CONNECTION_FACTORY = "connectionfactory."; - - public static void main(String[] args) - { - - if (args.length != 1) - { - usage(); - } - - String propertyFile = args[0]; - - new JNDICheck(propertyFile); - } - - private static void usage() - { - exit("Usage: JNDICheck <JNDI Config file>", 0); - } - - private static void exit(String message, int exitCode) - { - System.err.println(message); - System.exit(exitCode); - } - - private static String JAVA_NAMING = "java.naming.factory.initial"; - - Context _context = null; - Hashtable _environment = null; - - public JNDICheck(String propertyFile) - { - - // Load JNDI properties - Properties properties = new Properties(); - - try - { - properties.load(new FileInputStream(new File(propertyFile))); - } - catch (IOException e) - { - exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1); - } - - //Create the initial context - try - { - - System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING)); - - _context = new InitialContext(properties); - - _environment = _context.getEnvironment(); - - Enumeration keys = _environment.keys(); - - List<String> queues = new LinkedList<String>(); - List<String> topics = new LinkedList<String>(); - List<String> destinations = new LinkedList<String>(); - List<String> connectionFactories = new LinkedList<String>(); - - while (keys.hasMoreElements()) - { - String key = keys.nextElement().toString(); - - if (key.startsWith(QUEUE)) - { - queues.add(key); - } - else if (key.startsWith(TOPIC)) - { - topics.add(key); - } - else if (key.startsWith(DESTINATION)) - { - destinations.add(key); - } - else if (key.startsWith(CONNECTION_FACTORY)) - { - connectionFactories.add(key); - } - } - - printHeader(propertyFile); - printEntries(QUEUE, queues); - printEntries(TOPIC, topics); - printEntries(DESTINATION, destinations); - printEntries(CONNECTION_FACTORY, connectionFactories); - - } - catch (NamingException e) - { - exit("Unable to load JNDI Context due to:" + e.getMessage(), 1); - } - - } - - private void printHeader(String file) - { - print("JNDI file :" + file); - } - - private void printEntries(String type, List<String> list) - { - if (list.size() > 0) - { - String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1); - print(name + " elements in file:"); - printList(list); - print(""); - } - } - - private void printList(List<String> list) - { - for (String item : list) - { - String key = item.substring(item.indexOf('.') + 1); - - try - { - print(key, _context.lookup(key)); - } - catch (NamingException e) - { - exit("Error: item " + key + " no longer in context.", 1); - } - } - } - - private void print(String key, Object object) - { - if (object instanceof AMQDestination) - { - print(key + ":" + object); - } - else if (object instanceof AMQConnectionFactory) - { - AMQConnectionFactory factory = (AMQConnectionFactory) object; - print(key + ":Connection"); - print("ConnectionURL:"); - print(factory.getConnectionURL().toString()); - print("FailoverPolicy"); - print(new FailoverPolicy(factory.getConnectionURL(),null).toString()); - print(""); - } - } - - private void print(String msg) - { - System.out.println(msg); - } - -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java deleted file mode 100644 index b88b242e6d..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.io.FileOutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * Latency test sends an x number of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y number of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * It is important to have a sufficiently large number for the warmup count to - * ensure the system is in steady state before the test is started. - * - * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) - * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 - * - * The idea is to get a latency sample for the system once it achieves steady state. - * - */ - -public class LatencyTest extends PerfBase implements MessageListener -{ - MessageProducer producer; - MessageConsumer consumer; - Message msg; - byte[] payload; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - double stdDev = 0; - double avgLatency = 0; - boolean warmup_mode = true; - boolean transacted = false; - int transSize = 0; - - final List<Long> latencies; - final Lock lock = new ReentrantLock(); - final Condition warmedUp; - final Condition testCompleted; - - public LatencyTest() - { - super(); - warmedUp = lock.newCondition(); - testCompleted = lock.newCondition(); - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - latencies = new ArrayList <Long>(params.getMsgCount()); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(params.isDurable()? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (params.isCacheMessage()) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - ((BytesMessage)msg).writeBytes(payload); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - int count = params.getWarmupCount(); - for (int i=0; i < count; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - try - { - lock.lock(); - warmedUp.await(); - } - finally - { - lock.unlock(); - } - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - if (warmup_mode) - { - warmup_mode = false; - try - { - lock.lock(); - warmedUp.signal(); - } - finally - { - lock.unlock(); - } - } - else - { - computeStats(); - } - } - else if (!warmup_mode) - { - long time = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = time - msg.getJMSTimestamp(); - latencies.add(latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - private void computeStats() - { - avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double sigma = 0; - - for (long latency: latencies) - { - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - sigma = sigma + Math.pow(latency - avgLatency,2); - } - - stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); - - try - { - lock.lock(); - testCompleted.signal(); - } - finally - { - lock.unlock(); - } - } - - public void writeToFile() throws Exception - { - String fileName = System.getProperty("file"); - PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); - for (long latency: latencies) - { - writer.println(String.valueOf(latency)); - } - writer.flush(); - writer.close(); - } - - public void printToConsole() - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Standard Deviation : "). - append(df.format(stdDev)). - append(" ms").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % transSize == 0)) - { - session.commit(); - } - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - if (params.isTransacted()) - { - session.commit(); - } - } - - public void tearDown() throws Exception - { - try - { - lock.lock(); - testCompleted.await(); - } - finally - { - lock.unlock(); - } - - producer.close(); - consumer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final LatencyTest latencyTest = new LatencyTest(); - Runnable r = new Runnable() - { - public void run() - { - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) - { - try - { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating latency test thread",e); - } - t.start(); - } -}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java deleted file mode 100644 index 8ab1379fce..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.apache.qpid.tools; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; -import javax.jms.TextMessage; - -public class MessageFactory -{ - public static Message createBytesMessage(Session ssn, int size) throws JMSException - { - BytesMessage msg = ssn.createBytesMessage(); - msg.writeBytes(createMessagePayload(size).getBytes()); - return msg; - } - - public static Message createTextMessage(Session ssn, int size) throws JMSException - { - TextMessage msg = ssn.createTextMessage(); - msg.setText(createMessagePayload(size)); - return msg; - } - - public static String createMessagePayload(int size) - { - String msgData = "Qpid Test Message"; - - StringBuffer buf = new StringBuffer(size); - int count = 0; - while (count <= (size - msgData.length())) - { - buf.append(msgData); - count += msgData.length(); - } - if (count < size) - { - buf.append(msgData, 0, size - count); - } - - return buf.toString(); - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java deleted file mode 100644 index ac597d17de..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.text.DecimalFormat; -import java.util.Hashtable; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; - -public class PerfBase -{ - TestParams params; - Connection con; - Session session; - Destination dest; - Destination feedbackDest; - DecimalFormat df = new DecimalFormat("###.##"); - - public PerfBase() - { - params = new TestParams(); - } - - public void setUp() throws Exception - { - - if (params.getHost().equals("") || params.getPort() == -1) - { - con = new AMQConnection(params.getUrl()); - } - else - { - con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); - } - con.start(); - session = con.createSession(params.isTransacted(), - params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - - dest = new AMQAnyDestination(params.getAddress()); - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} - diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java deleted file mode 100644 index 0ef0455a64..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * PerfConsumer will receive x no of messages in warmup mode. - * Once it receives the Start message it will then signal the PerfProducer. - * It will start recording stats from the first message it receives after - * the warmup mode is done. - * - * The following calculations are done. - * The important numbers to look at is - * a) Avg Latency - * b) System throughput. - * - * Latency. - * ========= - * Currently this test is written with the assumption that either - * a) The Perf Producer and Consumer are on the same machine - * b) They are on separate machines that have their time synced via a Time Server - * - * In order to calculate latency the producer inserts a timestamp - * hen the message is sent. The consumer will note the current time the message is - * received and will calculate the latency as follows - * latency = rcvdTime - msg.getJMSTimestamp() - * - * Through out the test it will keep track of the max and min latency to show the - * variance in latencies. - * - * Avg latency is measured by adding all latencies and dividing by the total msgs. - * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount - * - * Throughput - * =========== - * System throughput is calculated as follows - * rcvdMsgCount/(rcvdTime - testStartTime) - * - * Consumer rate is calculated as - * rcvdMsgCount/(rcvdTime - startTime) - * - * Note that the testStartTime referes to when the producer sent the first message - * and startTime is when the consumer first received a message. - * - * rcvdTime keeps track of when the last message is received. - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - */ - -public class PerfConsumer extends PerfBase implements MessageListener -{ - MessageConsumer consumer; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - long testStartTime = 0; // to measure system throughput - long startTime = 0; // to measure consumer throughput - long rcvdTime = 0; - boolean transacted = false; - int transSize = 0; - - final Object lock = new Object(); - - public PerfConsumer() - { - super(); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - - boolean start = false; - while (!start) - { - Message msg = consumer.receive(); - if (msg instanceof TextMessage) - { - if (((TextMessage)msg).getText().equals("End")) - { - start = true; - MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); - temp.send(session.createMessage()); - if (params.isTransacted()) - { - session.commit(); - } - temp.close(); - } - } - } - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - consumer.setMessageListener(this); - } - - public void printResults() throws Exception - { - synchronized (lock) - { - lock.wait(); - } - - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; - double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Consumer rate : "). - append(df.format(consRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(throughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void notifyCompletion(Destination replyTo) throws Exception - { - MessageProducer tmp = session.createProducer(replyTo); - Message endMsg = session.createMessage(); - tmp.send(endMsg); - if (params.isTransacted()) - { - session.commit(); - } - tmp.close(); - } - - public void tearDown() throws Exception - { - consumer.close(); - session.close(); - con.close(); - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - notifyCompletion(msg.getJMSReplyTo()); - - synchronized (lock) - { - lock.notifyAll(); - } - } - else - { - rcvdTime = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (rcvdMsgCount == 1) - { - startTime = rcvdTime; - testStartTime = msg.getJMSTimestamp(); - } - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = rcvdTime - msg.getJMSTimestamp(); - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - printResults(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - public static void main(String[] args) - { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() - { - public void run() - { - cons.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - } -}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java deleted file mode 100644 index 015d1e6205..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; - -import org.apache.qpid.thread.Threading; - -/** - * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y no of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * This is done with the assumption that both consumer and producer are running on - * the same machine or different machines which have time synced using a time server. - * - * This test also calculates the producer rate as follows. - * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - * Rajith - Producer rate is not an accurate perf metric IMO. - * It is heavily inlfuenced by any in memory buffering. - * System throughput and latencies calculated by the PerfConsumer are more realistic - * numbers. - * - */ -public class PerfProducer extends PerfBase -{ - MessageProducer producer; - Message msg; - byte[] payload; - List<byte[]> payloads; - boolean cacheMsg = false; - boolean randomMsgSize = false; - boolean durable = false; - Random random; - int msgSizeRange = 1024; - - public PerfProducer() - { - super(); - } - - public void setUp() throws Exception - { - super.setUp(); - feedbackDest = session.createTemporaryQueue(); - - durable = params.isDurable(); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - cacheMsg = true; - - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (params.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<byte[]>(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(MessageFactory.createMessagePayload(i).getBytes()); - } - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - - if (!randomMsgSize) - { - ((BytesMessage)msg).writeBytes(payload); - } - else - { - ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); - } - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - MessageConsumer tmp = session.createConsumer(feedbackDest); - - for (int i=0; i < params.getWarmupCount() -1; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - boolean transacted = params.isTransacted(); - int tranSize = params.getTransactionSize(); - - long start = System.currentTimeMillis(); - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % tranSize == 0)) - { - session.commit(); - } - } - long time = System.currentTimeMillis() - start; - double rate = ((double)count/(double)time)*1000; - System.out.println(new StringBuilder("Producer rate: "). - append(df.format(rate)). - append(" msg/sec"). - toString()); - } - - public void waitForCompletion() throws Exception - { - MessageConsumer tmp = session.createConsumer(feedbackDest); - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); - System.out.println("Consumer has completed the test......"); - } - - public void tearDown() throws Exception - { - producer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - waitForCompletion(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final PerfProducer prod = new PerfProducer(); - Runnable r = new Runnable() - { - public void run() - { - prod.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); - } -}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java deleted file mode 100644 index 602fcc6321..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ /dev/null @@ -1,904 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import static org.apache.qpid.tools.QpidBench.Mode.BOTH; -import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; -import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; - -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.ExchangeBind; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageSubscribe; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.QueueDeclare; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.util.UUIDGen; -import org.apache.qpid.util.UUIDs; - -/** - * QpidBench - * - */ - -public class QpidBench -{ - - static enum Mode - { - PUBLISH, CONSUME, BOTH - } - - private static class Options - { - private StringBuilder usage = new StringBuilder("qpid-bench <options>"); - - void usage(String name, String description, Object def) - { - String defval = ""; - if (def != null) - { - defval = String.format(" (%s)", def); - } - usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); - } - - public String broker = "localhost"; - public int port = 5672; - public long count = 1000000; - public long window = 100000; - public long sample = window; - public int size = 1024; - public Mode mode = BOTH; - public boolean timestamp = false; - public boolean message_id = false; - public boolean message_cache = false; - public boolean persistent = false; - public boolean jms_publish = false; - public boolean jms_consume = false; - public boolean help = false; - - { - usage("-b, --broker", "the broker hostname", broker); - } - - public void parse__broker(String b) - { - this.broker = b; - } - - public void parse_b(String b) - { - parse__broker(b); - } - - { - usage("-p, --port", "the broker port", port); - } - - public void parse__port(String p) - { - this.port = Integer.parseInt(p); - } - - public void parse_p(String p) - { - parse__port(p); - } - - { - usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); - } - - public void parse__count(String c) - { - this.count = Long.parseLong(c); - } - - public void parse_c(String c) - { - parse__count(c); - } - - { - usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); - } - - public void parse__window(String w) - { - this.window = Long.parseLong(w); - } - - public void parse_w(String w) - { - parse__window(w); - } - - { - usage("--sample", "print stats after this many messages, 0 disables", sample); - } - - public void parse__sample(String s) - { - this.sample = Long.parseLong(s); - } - - { - usage("-i, --interval", "sets both --window and --sample", window); - } - - public void parse__interval(String i) - { - this.window = Long.parseLong(i); - this.sample = window; - } - - public void parse_i(String i) - { - parse__interval(i); - } - - { - usage("-s, --size", "the message size", size); - } - - public void parse__size(String s) - { - this.size = Integer.parseInt(s); - } - - public void parse_s(String s) - { - parse__size(s); - } - - { - usage("-m, --mode", "one of publish, consume, or both", mode); - } - - public void parse__mode(String m) - { - if (m.equalsIgnoreCase("publish")) - { - this.mode = PUBLISH; - } - else if (m.equalsIgnoreCase("consume")) - { - this.mode = CONSUME; - } - else if (m.equalsIgnoreCase("both")) - { - this.mode = BOTH; - } - else - { - throw new IllegalArgumentException - ("must be one of 'publish', 'consume', or 'both'"); - } - } - - public void parse_m(String m) - { - parse__mode(m); - } - - { - usage("--timestamp", "set timestamps on each message if true", timestamp); - } - - public void parse__timestamp(String t) - { - this.timestamp = Boolean.parseBoolean(t); - } - - { - usage("--mesage-id", "set the message-id on each message if true", message_id); - } - - public void parse__message_id(String m) - { - this.message_id = Boolean.parseBoolean(m); - } - - { - usage("--message-cache", "reuse the same message for each send if true", message_cache); - } - - public void parse__message_cache(String c) - { - this.message_cache = Boolean.parseBoolean(c); - } - - { - usage("--persistent", "set the delivery-mode to persistent if true", persistent); - } - - public void parse__persistent(String p) - { - this.persistent = Boolean.parseBoolean(p); - } - - { - usage("--jms-publish", "use the jms client for publish", jms_publish); - } - - public void parse__jms_publish(String jp) - { - this.jms_publish = Boolean.parseBoolean(jp); - } - - { - usage("--jms-consume", "use the jms client for consume", jms_consume); - } - - public void parse__jms_consume(String jc) - { - this.jms_consume = Boolean.parseBoolean(jc); - } - - { - usage("--jms", "sets both --jms-publish and --jms-consume", false); - } - - public void parse__jms(String j) - { - this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); - } - - { - usage("-h, --help", "prints this message", null); - } - - public void parse__help() - { - this.help = true; - } - - public void parse_h() - { - parse__help(); - } - - public String parse(String ... args) - { - Class klass = getClass(); - List<String> arguments = new ArrayList<String>(); - for (int i = 0; i < args.length; i++) - { - String option = args[i]; - - if (!option.startsWith("-")) - { - arguments.add(option); - continue; - } - - String method = "parse" + option.replace('-', '_'); - try - { - try - { - Method parser = klass.getMethod(method); - parser.invoke(this); - } - catch (NoSuchMethodException e) - { - try - { - Method parser = klass.getMethod(method, String.class); - - String value = null; - if (i + 1 < args.length) - { - value = args[i+1]; - i++; - } - else - { - return option + " requires a value"; - } - - parser.invoke(this, value); - } - catch (NoSuchMethodException e2) - { - return "no such option: " + option; - } - } - } - catch (InvocationTargetException e) - { - Throwable t = e.getCause(); - return String.format - ("error parsing %s: %s: %s", option, t.getClass().getName(), - t.getMessage()); - } - catch (IllegalAccessException e) - { - throw new RuntimeException - ("unable to access parse method: " + option, e); - } - } - - return parseArguments(arguments); - } - - public String parseArguments(List<String> arguments) - { - if (arguments.size() > 0) - { - String args = arguments.toString(); - return "unrecognized arguments: " + args.substring(1, args.length() - 1); - } - else - { - return null; - } - } - - public String toString() - { - Class klass = getClass(); - Field[] fields = klass.getFields(); - StringBuilder str = new StringBuilder(); - for (int i = 0; i < fields.length; i++) - { - if (i > 0) - { - str.append("\n"); - } - - String name = fields[i].getName(); - str.append(name); - str.append(" = "); - Object value; - try - { - value = fields[i].get(this); - } - catch (IllegalAccessException e) - { - throw new RuntimeException - ("unable to access field: " + name, e); - } - str.append(value); - } - - return str.toString(); - } - } - - public static final void main(String[] args) throws Exception - { - final Options opts = new Options(); - String error = opts.parse(args); - if (error != null) - { - System.err.println(error); - System.exit(-1); - return; - } - - if (opts.help) - { - System.out.println(opts.usage); - return; - } - - System.out.println(opts); - - switch (opts.mode) - { - case CONSUME: - case BOTH: - Runnable r = new Runnable() - { - public void run() - { - try - { - if (opts.jms_consume) - { - jms_consumer(opts); - } - else - { - native_consumer(opts); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - System.out.println("Consumer Completed"); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - break; - } - - switch (opts.mode) - { - case PUBLISH: - case BOTH: - Runnable r = new Runnable() - { - public void run() - { - try - { - if (opts.jms_publish) - { - jms_publisher(opts); - } - else - { - native_publisher(opts); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - System.out.println("Producer Completed"); - } - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating publisher thread",e); - } - t.start(); - break; - } - } - - private static enum Column - { - LEFT, RIGHT - } - - private static final void sample(Options opts, Column col, String name, long count, - long start, long time, long lastTime) - { - String pfx = ""; - String sfx = ""; - if (opts.mode == BOTH) - { - if (col == Column.RIGHT) - { - pfx = " -- "; - } - else - { - sfx = " --"; - } - } - - if (count == 0) - { - String stats = String.format("%s: %tc", name, start); - System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); - return; - } - - double cumulative = 1000 * (double) count / (double) (time - start); - double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); - - String stats = String.format - ("%s: %d %.2f %.2f", name, count, cumulative, interval); - System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); - } - - private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception - { - String url = String.format - ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", - opts.broker, opts.port); - return new AMQConnection(url); - } - - private static final void jms_publisher(Options opts) throws Exception - { - javax.jms.Connection conn = getJMSConnection(opts); - - javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createQueue("test-queue"); - Destination echo_dest = ssn.createQueue("echo-queue"); - MessageProducer prod = ssn.createProducer(dest); - MessageConsumer cons = ssn.createConsumer(echo_dest); - prod.setDisableMessageID(!opts.message_id); - prod.setDisableMessageTimestamp(!opts.timestamp); - prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - StringBuilder str = new StringBuilder(); - for (int i = 0; i < opts.size; i++) - { - str.append((char) (i % 128)); - } - - String body = str.toString(); - - TextMessage cached = ssn.createTextMessage(); - cached.setText(body); - - conn.start(); - - long count = 0; - long lastTime = 0; - long start = System.currentTimeMillis(); - while (opts.count == 0 || count < opts.count) - { - if (opts.window > 0 && (count % opts.window) == 0 && count > 0) - { - Message echo = cons.receive(); - } - - if (opts.sample > 0 && (count % opts.sample) == 0) - { - long time = System.currentTimeMillis(); - sample(opts, Column.LEFT, "JP", count, start, time, lastTime); - lastTime = time; - } - - TextMessage m; - if (opts.message_cache) - { - m = cached; - } - else - { - m = ssn.createTextMessage(); - m.setText(body); - } - - prod.send(m); - count++; - } - - conn.close(); - } - - private static final void jms_consumer(final Options opts) throws Exception - { - final javax.jms.Connection conn = getJMSConnection(opts); - javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createQueue("test-queue"); - Destination echo_dest = ssn.createQueue("echo-queue"); - MessageConsumer cons = ssn.createConsumer(dest); - final MessageProducer prod = ssn.createProducer(echo_dest); - prod.setDisableMessageID(true); - prod.setDisableMessageTimestamp(true); - prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - final TextMessage echo = ssn.createTextMessage(); - echo.setText("ECHO"); - - final Object done = new Object(); - cons.setMessageListener(new MessageListener() - { - private long count = 0; - private long lastTime = 0; - private long start; - - public void onMessage(Message m) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - try - { - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - prod.send(echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); - lastTime = time; - } - } - catch (JMSException e) - { - throw new RuntimeException(e); - } - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - conn.start(); - synchronized (done) - { - done.wait(); - } - conn.close(); - } - - private static final org.apache.qpid.transport.Connection getConnection - (Options opts) - { - org.apache.qpid.transport.Connection conn = - new org.apache.qpid.transport.Connection(); - conn.connect(opts.broker, opts.port, null, "guest", "guest",false); - return conn; - } - - private static abstract class NativeListener implements SessionListener - { - - public void opened(org.apache.qpid.transport.Session ssn) {} - - public void resumed(org.apache.qpid.transport.Session ssn) {} - - public void exception(org.apache.qpid.transport.Session ssn, - SessionException exc) - { - exc.printStackTrace(); - } - - public void closed(org.apache.qpid.transport.Session ssn) {} - - } - - private static final void native_publisher(Options opts) throws Exception - { - final long[] echos = { 0 }; - org.apache.qpid.transport.Connection conn = getConnection(opts); - org.apache.qpid.transport.Session ssn = conn.createSession(); - ssn.setSessionListener(new NativeListener() - { - public void message(org.apache.qpid.transport.Session ssn, - MessageTransfer xfr) - { - synchronized (echos) - { - echos[0]++; - echos.notify(); - } - ssn.processed(xfr); - } - }); - - ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); - ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); - - MessageProperties cached_mp = new MessageProperties(); - DeliveryProperties cached_dp = new DeliveryProperties(); - cached_dp.setRoutingKey("test-queue"); - cached_dp.setDeliveryMode - (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); - - int size = opts.size; - ByteBuffer body = ByteBuffer.allocate(size); - for (int i = 0; i < size; i++) - { - body.put((byte) i); - } - body.flip(); - - ssn.invoke(new MessageSubscribe() - .queue("echo-queue") - .destination("echo-queue") - .acceptMode(MessageAcceptMode.NONE) - .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); - ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); - ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); - ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); - - UUIDGen gen = UUIDs.newGenerator(); - - long count = 0; - long lastTime = 0; - long start = System.currentTimeMillis(); - while (opts.count == 0 || count < opts.count) - { - if (opts.window > 0 && (count % opts.window) == 0 && count > 0) - { - synchronized (echos) - { - while (echos[0] < (count/opts.window)) - { - echos.wait(); - } - } - } - - if (opts.sample > 0 && (count % opts.sample) == 0) - { - long time = System.currentTimeMillis(); - sample(opts, Column.LEFT, "NP", count, start, time, lastTime); - lastTime = time; - } - - MessageProperties mp; - DeliveryProperties dp; - if (opts.message_cache) - { - mp = cached_mp; - dp = cached_dp; - } - else - { - mp = new MessageProperties(); - dp = new DeliveryProperties(); - dp.setRoutingKey("test-queue"); - dp.setDeliveryMode - (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); - - } - - if (opts.message_id) - { - mp.setMessageId(gen.generate()); - } - - if (opts.timestamp) - { - dp.setTimestamp(System.currentTimeMillis()); - } - - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), body.slice()); - count++; - } - - ssn.messageCancel("echo-queue"); - - ssn.sync(); - ssn.close(); - conn.close(); - } - - private static final void native_consumer(final Options opts) throws Exception - { - final DeliveryProperties dp = new DeliveryProperties(); - final byte[] echo = new byte[0]; - dp.setRoutingKey("echo-queue"); - dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); - final MessageProperties mp = new MessageProperties(); - final Object done = new Object(); - org.apache.qpid.transport.Connection conn = getConnection(opts); - org.apache.qpid.transport.Session ssn = conn.createSession(); - ssn.setSessionListener(new NativeListener() - { - private long count = 0; - private long lastTime = 0; - private long start; - - public void message(org.apache.qpid.transport.Session ssn, - MessageTransfer xfr) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - ssn.messageTransfer("amq.direct", - MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), - echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); - lastTime = time; - } - ssn.processed(xfr); - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); - ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); - - ssn.invoke(new MessageSubscribe() - .queue("test-queue") - .destination("test-queue") - .acceptMode(MessageAcceptMode.NONE) - .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); - ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); - ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); - ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); - - synchronized (done) - { - done.wait(); - } - - ssn.messageCancel("test-queue"); - - ssn.sync(); - ssn.close(); - conn.close(); - } - -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java deleted file mode 100644 index 89d6462a39..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import javax.jms.Session; - -public class TestParams -{ - /* - * By default the connection URL is used. - * This allows a user to easily specify a fully fledged URL any given property. - * Ex. SSL parameters - * - * By providing a host & port allows a user to simply override the URL. - * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. - */ - private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - - private String host = ""; - - private int port = -1; - - private String address = "queue; {create : always}"; - - private int msg_size = 1024; - - private int msg_type = 1; // not used yet - - private boolean cacheMessage = false; - - private boolean disableMessageID = false; - - private boolean disableTimestamp = false; - - private boolean durable = false; - - private boolean transacted = false; - - private int transaction_size = 1000; - - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - - private int msg_count = 10; - - private int warmup_count = 1; - - private boolean random_msg_size = false; - - public TestParams() - { - - url = System.getProperty("url",url); - host = System.getProperty("host",""); - port = Integer.getInteger("port", -1); - address = System.getProperty("address","queue"); - - msg_size = Integer.getInteger("msg_size", 1024); - msg_type = Integer.getInteger("msg_type",1); - cacheMessage = Boolean.getBoolean("cache_msg"); - disableMessageID = Boolean.getBoolean("disableMessageID"); - disableTimestamp = Boolean.getBoolean("disableTimestamp"); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - transaction_size = Integer.getInteger("trans_size",1000); - ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg_count",msg_count); - warmup_count = Integer.getInteger("warmup_count",warmup_count); - random_msg_size = Boolean.getBoolean("random_msg_size"); - } - - public String getUrl() - { - return url; - } - - public String getHost() - { - return host; - } - - public int getPort() - { - return port; - } - - public String getAddress() - { - return address; - } - - public int getAckMode() - { - return ack_mode; - } - - public int getMsgCount() - { - return msg_count; - } - - public int getMsgSize() - { - return msg_size; - } - - public int getMsgType() - { - return msg_type; - } - - public boolean isDurable() - { - return durable; - } - - public boolean isTransacted() - { - return transacted; - } - - public int getTransactionSize() - { - return transaction_size; - } - - public int getWarmupCount() - { - return warmup_count; - } - - public boolean isCacheMessage() - { - return cacheMessage; - } - - public boolean isDisableMessageID() - { - return disableMessageID; - } - - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - public boolean isRandomMsgSize() - { - return random_msg_size; - } - -} |