summaryrefslogtreecommitdiff
path: root/java/tools
diff options
context:
space:
mode:
Diffstat (limited to 'java/tools')
-rw-r--r--java/tools/README.txt153
-rwxr-xr-xjava/tools/bin/perf_report.sh140
-rw-r--r--java/tools/bin/qpid-bench35
-rwxr-xr-xjava/tools/bin/qpid-python-testkit30
-rw-r--r--java/tools/bin/run_pub.sh24
-rw-r--r--java/tools/bin/run_sub.sh25
-rw-r--r--java/tools/bin/set-testkit-env.sh88
-rw-r--r--java/tools/bin/setenv.sh49
-rwxr-xr-xjava/tools/bin/testkit.py278
-rw-r--r--java/tools/build.xml27
-rw-r--r--java/tools/etc/test.log4j28
-rw-r--r--java/tools/src/main/java/org/apache/qpid/testkit/Client.java154
-rw-r--r--java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java27
-rw-r--r--java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java216
-rw-r--r--java/tools/src/main/java/org/apache/qpid/testkit/Sender.java197
-rw-r--r--java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java370
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java200
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java349
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java64
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java78
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java267
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java262
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java904
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java170
24 files changed, 0 insertions, 4135 deletions
diff --git a/java/tools/README.txt b/java/tools/README.txt
deleted file mode 100644
index fdde734027..0000000000
--- a/java/tools/README.txt
+++ /dev/null
@@ -1,153 +0,0 @@
-Introduction
-============
-
-The Test kit for the java client consists of 2 components.
-
-1) A Simple Perf Test that can be used to,
- a) Run a predefined perf report consisting of 8 use cases (see below)
- b) Run a producer and a consumer with a number of different options
-
-2) Soak tests that can be run for longer durations (hours or days).
-
-I am planning to add some stress tests to this module as well.
-Please note this is not a replacement for the existing perf/systests etc.
-But rather a small test kit thats focused on providing a packaged set of tests that can be quickly deployed on an environment to do quick smoke testing or easily setup a soak test.
-
-Table of Contents
-=================
-1. Perf Kit
-2. Soak Kit
-3. Perf Test use cases
-4. Soak Test use cases
-5. Running the sample perf test report
-6. Running the sample soak test report
-
-1.0 Perf Kit
-------------
-1.1 The perf kit can be packaged as an RPM or a tar file and deploy on a target environment and run the perf report.
-Or else a perf report can be automated to run every day or so an record numbers to catch perf regressions.
-
-1.2 It calculates the following results in msg/sec.
-
- System throuhgput : no_of_msgs / (time_last_msg_rcvd - time_first_msg_send)
-
- Producer rate : no_of_msgs / (time_after_sending - time_before_sending)
-
- Producer rate : no_of_msgs / (time_last_msg_rcvd - time_first_msg_rcvd)
-
- Latency : time_msg_rcvd - time_msg_sent
-
-The test will print min, max and avg latency.
-
-1.3 The test assume that both producer and consumer are run on the same machine or different machines that are time synced.
-
-1.4 You can also use run_sub.sh and run_pub.sh to run different use cases with several options.
- Please look at TestParams.java for all the configurable options.
-
-1.5 You can also use the test kit to benchmark against any vendor.
-
-
-2.0 Soak tests
---------------
-2.0 This includes a set of soak tests that can be run for a longer duration.
-
-2.1 A typical test will send x-1 messages and the xth message will contain an "End" marker.
- The producer will print the timestamp as soon as it sends the xth message.
- The consumer will reply with an empty message to the replyTo destination given in the xth message.
- The consumer prints the throuhgput for the iteration and the latency for the xth message.
- A typical value for x is 100k
-
-2.2 The feedback loop prevents the producer from overrunning the consumer.
- And the printout for every xth message will let you know how many iterations been completed at any given time.
- (Ex a simple cat log | wc -l will give you the how many iterations have been completed so far).
-
-2.2 The following results can be calculated for these tests.
-
- Memory, CPU for each producer/consumer - look at testkit/bin/run_soak_client.sh for an example
-
- You can find the Avg, Min & Max for throughput, latency, CPU and memory for the entire test run.
- (look at testkit/bin/soak_report.sh) for an example).
-
- You could also graph throughput, latency, CPU and memory using the comma separated log files.
-
-2.2 If you use different machines for producer and consumer the machines have to be time synced to have meaningful latency samples.
-
-3.0 Perf Test report use cases
--------------------------------
-3.1 Please check testkit/bin/perf_report.sh for more details
-
-3.2 A typical test run will send 1000 msgs during warmup and 200k msgs for result calculation.
-
-Test 1 Trans Queue
-
-Test 2 Dura Queue
-
-Test 3 Dura Queue Sync
-
-Test 4 Topic
-
-Test 5 Durable Topic
-
-Test 6 Fanout
-
-Test 7 Small TX (about 2 msgs per tx)
-
-Test 8 Large TX (about 1000 msgs per tx)
-
-
-4.0 Soak tests use cases
--------------------------
-4.1 Following are the current tests available in the test kit.
-
-4.2 Please refer to the source to see the javadoc and options
-
-
-1. SimpleProducer/Consumer sends X messages at a time and will wait for confirmation from producer before proceeding with the next iteration. A no of options can be configured.
-
-2. MultiThreadedProducer/Consumer does the same thing as above but runs each session in a separate thread.
- It can also send messages transactionally. Again a no of options can be configured.
-
-3. ResourceLeakTest will setup consumer/producers sends x messages and then teard down everything and continue again.
-
-
-5.0 Running the sample perf test report
----------------------------------------
-The testkit/bin contains perf_report.sh.
-It runs the above 8 use cases against a broker and print the results in a tabular format.
-
-For example
-================================================================================================
-|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|
-------------------------------------------------------------------------------------------------
-|Trans_Queue | xxxxx.xx| xxxxx.xx| xxxxx.xx| xx.xx| x| xx|
-
-
-5.1 running perf_report.sh
-
-5.1.1 set JAVA_HOME to point to Java 1.5 and above
-5.1.2 set QPID_TEST_HOME to point to the testkit dir
-5.1.3 set VENDOR_LIB to point to the Qpid (or other JMS providers) jar files.
-5.1.4 start a broker
-5.1.5 update the testkit/etc/jndi.properties to point to the correct broker
-5.1.6 execute perf_report.sh
-
-
-6.0 Running the sample soak test report
----------------------------------------
-The testkit/bin contains soak_report.sh
-It runs MultiThreadedProducer/Consumer for the duration specified and prints a report for the following stats.
-Avg, Min and Max for System Throughput, letency, CPU and memory.
-
-6.1 running soak_report.sh
-
-5.1.1 set JAVA_HOME to point to Java 1.5 and above
-5.1.2 set QPID_TEST_HOME to point to the testkit dir
-5.1.3 set JAR_PATH to point to the Qpid jars
-5.1.4 start a broker
-5.1.5 execute soak_report.sh with correct params.
- Ex sh soak_report.sh 1 36000 will run for 10 hours colllecting CPU, memory every second.
-
-5.1.6 Please note the total duration for the test is log_freq * log_iterations
- So if you want to run the test for 10 hours and collect 10 second samples then do the following
- sh soak_report.sh 10 3600
-
diff --git a/java/tools/bin/perf_report.sh b/java/tools/bin/perf_report.sh
deleted file mode 100755
index e6b4c987e5..0000000000
--- a/java/tools/bin/perf_report.sh
+++ /dev/null
@@ -1,140 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This will run the 8 use cases defined below and produce
-# a report in tabular format. Refer to the documentation
-# for more details.
-
-SUB_MEM=-Xmx1024M
-PUB_MEM=-Xmx1024M
-LOG_CONFIG="-Damqj.logging.level=WARN"
-QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
-DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
-TOPIC="amq.topic/test"
-DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
-
-. setenv.sh
-
-waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
-cleanup()
-{
- pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
- if [ "$pids" != "" ]; then
- kill -3 $pids
- kill -9 $pids >/dev/null 2>&1
- fi
-}
-
-# $1 test name
-# $2 consumer options
-# $3 producer options
-run_testcase()
-{
- sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out &
- waitfor sub.out "Warming up"
- sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out &
- waitfor sub.out "Completed the test"
- waitfor pub.out "Consumer has completed the test"
- sleep 2 #give a grace period to shutdown
- print_result $1
-}
-
-print_result()
-{
- prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
- sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
- avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
- min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
- max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
- echo "------------------------------------------------------------------------------------------------"
-}
-
-trap cleanup EXIT
-
-echo "Test report on " `date +%F`
-echo "================================================================================================"
-echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
-echo "------------------------------------------------------------------------------------------------"
-
-# The message counts and warmup counts are set to very low values for quick testing of the script.
-# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k
-# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend
-# setting very low values to start with and experiment while increasing them slowly.
-
-# Test 1 Trans Queue
-#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 2 Dura Queue
-run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 3 Dura Queue Sync
-run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
-
-# Test 4 Dura Queue Sync Publish and Ack
-run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
-
-# Test 5 Topic
-run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 6 Durable Topic
-run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 7 Fanout
-run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 8 Small TX
-run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \
- "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
-
-# Test 9 Large TX
-run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \
- "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
-
-# Test 10 256 MSG
-run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 11 512 MSG
-run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 12 2048 MSG
-run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 13 Random size MSG
-run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 14 Random size MSG Durable
-run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 15 64K MSG
-run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 16 Durable 64K MSG
-run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \
- "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 17 500K MSG
-run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 18 Durable 500K MSG
-run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \
- "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/java/tools/bin/qpid-bench b/java/tools/bin/qpid-bench
deleted file mode 100644
index c982e64efd..0000000000
--- a/java/tools/bin/qpid-bench
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-if [ -z "$QPID_HOME" ]; then
- export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
- export PATH=${PATH}:${QPID_HOME}/bin
-fi
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java \
- JAVA_VM=-server \
- JAVA_MEM=-Xmx1024m \
- QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run org.apache.qpid.tools.QpidBench "$@"
diff --git a/java/tools/bin/qpid-python-testkit b/java/tools/bin/qpid-python-testkit
deleted file mode 100755
index cbe7972421..0000000000
--- a/java/tools/bin/qpid-python-testkit
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This is wrapper script to run the tests defined in testkit.py
-# via the python test runner. The defaults are set for a running
-# from an svn checkout
-
-. ./set-testkit-env.sh
-
-export PYTHONPATH=./:$PYTHONPATH
-rm -rf $OUTDIR
-qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
-
diff --git a/java/tools/bin/run_pub.sh b/java/tools/bin/run_pub.sh
deleted file mode 100644
index 91b9287dea..0000000000
--- a/java/tools/bin/run_pub.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-. $QPID_TEST_HOME/bin/setenv.sh
-
-echo "$@"
-$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer
diff --git a/java/tools/bin/run_sub.sh b/java/tools/bin/run_sub.sh
deleted file mode 100644
index c9ad2fed74..0000000000
--- a/java/tools/bin/run_sub.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-. $QPID_TEST_HOME/bin/setenv.sh
-
-echo "$@"
-$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer
-
diff --git a/java/tools/bin/set-testkit-env.sh b/java/tools/bin/set-testkit-env.sh
deleted file mode 100644
index 051dad8179..0000000000
--- a/java/tools/bin/set-testkit-env.sh
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# If QPIDD_EXEC ..etc is not set, it will first check to see
-# if this is run from a qpid svn check out, if not it will look
-# for installed rpms.
-
-abs_path()
-{
- D=`dirname "$1"`
- B=`basename "$1"`
- echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B"
-}
-
-# Environment for python tests
-
-if [ -d ../../../python ] ; then
- PYTHON_DIR=../../../python
- PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid
-elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then
- echo "WARNING: skipping test, no qpid python scripts found ."; exit 0;
-fi
-
-
-if [ "$QPIDD_EXEC" = "" ] ; then
- if [ -x ../../../cpp/src/qpidd ]; then
- QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"`
- elif [ -n "$(which qpidd)" ] ; then
- QPIDD_EXEC=$(which qpidd)
- else
- echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0;
- fi
-fi
-
-if [ "$CLUSTER_LIB" = "" ] ; then
- if [ -x ../../../cpp/src/.libs/cluster.so ]; then
- CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"`
- elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then
- CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so"
- elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then
- CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so"
- else
- echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0;
- fi
-fi
-
-if [ "$STORE_LIB" = "" ] ; then
- if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then
- STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so"
- elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then
- STORE_LIB="/usr/lib/qpid/daemon/msgstore.so"
- #else
- # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0;
- fi
-fi
-
-if [ "$QP_CP" = "" ] ; then
- if [ -d ../../build/lib/ ]; then
- QP_JAR_PATH=`abs_path "../../build/lib/"`
- elif [ -d /usr/share/java/qpid-deps ]; then
- QP_JAR_PATH=`abs_path "/usr/share/java"`
- else
- "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0;
- fi
- QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'`
-fi
-
-if [ "$OUTDIR" = "" ] ; then
- OUTDIR=`abs_path "./output"`
-fi
-
-export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR
diff --git a/java/tools/bin/setenv.sh b/java/tools/bin/setenv.sh
deleted file mode 100644
index 24135e711b..0000000000
--- a/java/tools/bin/setenv.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Compiles the test classes and sets the CLASSPATH
-
-# check for QPID_TEST_HOME
-if [ "$QPID_TEST_HOME" = "" ] ; then
- echo "ERROR: Please set QPID_TEST_HOME ...."
- exit 1
-fi
-
-# check for JAVA_HOME
-if [ "$JAVA_HOME" = "" ] ; then
- echo "ERROR: Please set JAVA_HOME ...."
- exit 1
-fi
-
-# VENDOR_LIB path needs to be set
-# for Qpid set this to {qpid_checkout}/java/build/lib
-if [ "$VENDOR_LIB" = "" ] ; then
- echo "ERROR: Please set VENDOR_LIB path in the script ...."
- exit 1
-fi
-
-
-[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
-
-CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
-$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
-
-export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
-
diff --git a/java/tools/bin/testkit.py b/java/tools/bin/testkit.py
deleted file mode 100755
index 1c2ad598b8..0000000000
--- a/java/tools/bin/testkit.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time, string, traceback
-from brokertest import *
-from qpid.messaging import *
-
-
-try:
- import java.lang.System
- _cp = java.lang.System.getProperty("java.class.path");
-except ImportError:
- _cp = checkenv("QP_CP")
-
-class Formatter:
-
- def __init__(self, message):
- self.message = message
- self.environ = {"M": self.message,
- "P": self.message.properties,
- "C": self.message.content}
-
- def __getitem__(self, st):
- return eval(st, self.environ)
-
-# The base test case has support for launching the generic
-# receiver and sender through the TestLauncher with all the options.
-#
-class JavaClientTest(BrokerTest):
- """Base Case for Java Test cases"""
-
- client_class = "org.apache.qpid.testkit.TestLauncher"
-
- # currently there is no transparent reconnection.
- # temp hack: just creating the queue here and closing it.
- def start_error_watcher(self,broker=None):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- ssn.close()
-
- def store_module_args(self):
- if BrokerTest.store_lib:
- return ["--load-module", BrokerTest.store_lib]
- else:
- print "Store module not present."
- return [""]
-
- def client(self,**options):
- cmd = ["java","-cp",_cp]
-
- cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
- cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
- cmd += ["-Dport=" + str(options.get("port",5672))]
- cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
- cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
- cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
- cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
- cmd += ["-Ddurable=" + str(options.get("durable",False))]
- cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
- cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
- cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
- cmd += ["-Dsender=" + str(options.get("sender",False))]
- cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
- cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
- cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
- cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
- cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
- cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
- cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
- cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
- cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
- cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
- cmd += [self.client_class]
- cmd += [options.get("address", "my_queue; {create: always}")]
-
- #print str(options.get("port",5672))
- return cmd
-
- # currently there is no transparent reconnection.
- # temp hack: just creating a receiver and closing session soon after.
- def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- i = run_time/error_ck_freq
- is_error = False
- for j in range(i):
- not_empty = True
- while not_empty:
- try:
- m = err_watcher.fetch(timeout=error_ck_freq)
- ssn.acknowledge()
- print "Java process notified of an error"
- self.print_error(m)
- is_error = True
- except messaging.Empty, e:
- not_empty = False
-
- ssn.close()
- return is_error
-
- def print_error(self,msg):
- print msg.properties.get("exception-trace")
-
- def verify(self, receiver,sender):
- sender_running = receiver.is_running()
- receiver_running = sender.is_running()
-
- self.assertTrue(receiver_running,"Receiver has exited prematually")
- self.assertTrue(sender_running,"Sender has exited prematually")
-
- def start_sender_and_receiver(self,**options):
-
- receiver_opts = options
- receiver_opts["receiver"]=True
- receiver = self.popen(self.client(**receiver_opts),
- expect=EXPECT_RUNNING)
-
- sender_opts = options
- sender_opts["sender"]=True
- sender = self.popen(self.client(**sender_opts),
- expect=EXPECT_RUNNING)
-
- return receiver, sender
-
- def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
- if options.get("durable",False)==True:
- cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
- else:
- cluster = Cluster(self, count=count)
- return cluster
-
-class ConcurrencyTest(JavaClientTest):
- """A concurrency test suite for the JMS client"""
- skip = False
-
- def base_case(self,**options):
- if self.skip :
- print "Skipping test"
- return
-
- cluster = self.start_cluster(count=2,**options)
- self.start_error_watcher(broker=cluster[0])
- options["port"] = port=cluster[0].port()
-
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver, sender = self.start_sender_and_receiver(**options)
- self.monitor_clients(broker=cluster[0],run_time=180)
- self.verify(receiver,sender)
-
- def test_multiplexing_con(self):
- """Tests multiple sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,test_name=self.id())
-
- def test_multiplexing_con_with_tx(self):
- """Tests multiple transacted sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_rcv(self):
- """Tests multiple sessions with sync receive"""
-
- self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
-
- def test_multiplexing_con_with_durable_sub(self):
- """Tests multiple sessions with durable subs"""
-
- self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_ack(self):
- """Tests multiple sessions with sync ack"""
-
- self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_pub(self):
- """Tests multiple sessions with sync pub"""
-
- self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
-
- def test_multiple_cons_and_ssns(self):
- """Tests multiple connections and sessions"""
-
- self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
-
-
-class SoakTest(JavaClientTest):
- """A soak test suite for the JMS client"""
-
- def base_case(self,**options):
- cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
- options["port"] = port=cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver,sender = self.start_sender_and_receiver(**options)
- is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
-
- if (is_error):
- print "The sender or receiver didn't start properly. Exiting test."
- return
- else:
- "Print no error !"
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg= None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- print "iteration : " + str(i)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
-
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
-
-
- def test_failover(self) :
- """Test basic failover"""
-
- self.base_case(test_name=self.id())
-
-
- def test_failover_with_durablesub(self):
- """Test failover with durable subscriber"""
-
- self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
-
-
- def test_failover_with_sync_rcv(self):
- """Test failover with sync receive"""
-
- self.base_case(sync_rcv=True,test_name=self.id())
-
-
- def test_failover_with_sync_ack(self):
- """Test failover with sync ack"""
-
- self.base_case(sync_ack=True,test_name=self.id())
-
-
- def test_failover_with_noprefetch(self):
- """Test failover with no prefetch"""
-
- self.base_case(max_prefetch=1,test_name=self.id())
-
-
- def test_failover_with_multiple_cons_and_ssns(self):
- """Test failover with multiple connections and sessions"""
-
- self.base_case(use_unique_dests=True,address="amq.topic",
- con_count=10,ssn_per_con=25,test_name=self.id())
diff --git a/java/tools/build.xml b/java/tools/build.xml
deleted file mode 100644
index 7cd1b1172c..0000000000
--- a/java/tools/build.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<!--
- -
- - Licensed to the Apache Software Foundation (ASF) under one
- - or more contributor license agreements. See the NOTICE file
- - distributed with this work for additional information
- - regarding copyright ownership. The ASF licenses this file
- - to you under the Apache License, Version 2.0 (the
- - "License"); you may not use this file except in compliance
- - with the License. You may obtain a copy of the License at
- -
- - http://www.apache.org/licenses/LICENSE-2.0
- -
- - Unless required by applicable law or agreed to in writing,
- - software distributed under the License is distributed on an
- - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- - KIND, either express or implied. See the License for the
- - specific language governing permissions and limitations
- - under the License.
- -
- -->
-<project name="Qpid Tools" default="build">
-
- <property name="module.depends" value="client common"/>
-
- <import file="../module.xml"/>
-
-</project>
diff --git a/java/tools/etc/test.log4j b/java/tools/etc/test.log4j
deleted file mode 100644
index b574a7b5b7..0000000000
--- a/java/tools/etc/test.log4j
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-log4j.rootLogger=${root.logging.level}
-
-log4j.logger.org.apache.qpid=ERROR, console
-log4j.additivity.org.apache.qpid=false
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=all
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
-
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
deleted file mode 100644
index b10129d855..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public abstract class Client implements ExceptionListener
-{
- private Connection con;
- private Session ssn;
- private boolean durable = false;
- private boolean transacted = false;
- private int txSize = 10;
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
- private String contentType = "application/octet-stream";
-
- private long reportFrequency = 60000; // every min
-
- private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- private NumberFormat nf = new DecimalFormat("##.00");
-
- private long startTime = System.currentTimeMillis();
- private ErrorHandler errorHandler = null;
-
- public Client(Connection con) throws Exception
- {
- this.con = con;
- this.con.setExceptionListener(this);
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- txSize = Integer.getInteger("tx_size",10);
- contentType = System.getProperty("content_type","application/octet-stream");
- reportFrequency = Long.getLong("report_frequency", 60000);
- }
-
- public void close()
- {
- try
- {
- con.close();
- }
- catch (Exception e)
- {
- handleError("Error closing connection",e);
- }
- }
-
- public void onException(JMSException e)
- {
- handleError("Connection error",e);
- }
-
- public void setErrorHandler(ErrorHandler h)
- {
- this.errorHandler = h;
- }
-
- public void handleError(String msg,Exception e)
- {
- if (errorHandler != null)
- {
- errorHandler.handleError(msg, e);
- }
- else
- {
- System.err.println(msg);
- e.printStackTrace();
- }
- }
-
- protected Session getSsn()
- {
- return ssn;
- }
-
- protected void setSsn(Session ssn)
- {
- this.ssn = ssn;
- }
-
- protected boolean isDurable()
- {
- return durable;
- }
-
- protected boolean isTransacted()
- {
- return transacted;
- }
-
- protected int getTxSize()
- {
- return txSize;
- }
-
- protected int getAck_mode()
- {
- return ack_mode;
- }
-
- protected String getContentType()
- {
- return contentType;
- }
-
- protected long getReportFrequency()
- {
- return reportFrequency;
- }
-
- protected long getStartTime()
- {
- return startTime;
- }
-
- protected void setStartTime(long startTime)
- {
- this.startTime = startTime;
- }
-
- public DateFormat getDf()
- {
- return df;
- }
-
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
deleted file mode 100644
index dbc73c404f..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.qpid.testkit;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-public interface ErrorHandler {
-
- public void handleError(String msg,Exception e);
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
deleted file mode 100644
index b4294ee4cc..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * A generic receiver which consumes messages
- * from a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It participates in a feedback loop to ensure the producer
- * doesn't fill up the queue. If it receives an "End" msg
- * it sends a reply to the replyTo address in that msg.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * sync_rcv - Whether to consume sync (instead of using a listener).
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs. *
- * check_for_dups - check for duplicate messages and out of order messages.
- * jms_durable_sub - create a durable subscription instead of a regular subscription.
- */
-public class Receiver extends Client implements MessageListener
-{
- long msg_count = 0;
- int sequence = 0;
- boolean syncRcv = Boolean.getBoolean("sync_rcv");
- boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
- boolean checkForDups = Boolean.getBoolean("check_for_dups");
- MessageConsumer consumer;
- List<Integer> duplicateMessages = new ArrayList<Integer>();
-
- public Receiver(Connection con,String addr) throws Exception
- {
- super(con);
- setSsn(con.createSession(isTransacted(), getAck_mode()));
- consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
- if (!syncRcv)
- {
- consumer.setMessageListener(this);
- }
-
- System.out.println("Receiving messages from : " + addr);
- }
-
- public void onMessage(Message msg)
- {
- handleMessage(msg);
- }
-
- public void run() throws Exception
- {
- long sleepTime = getReportFrequency();
- while(true)
- {
- if(syncRcv)
- {
- long t = sleepTime;
- while (t > 0)
- {
- long start = System.currentTimeMillis();
- Message msg = consumer.receive(t);
- t = t - (System.currentTimeMillis() - start);
- handleMessage(msg);
- }
- }
- Thread.sleep(sleepTime);
- System.out.println(getDf().format(System.currentTimeMillis())
- + " - messages received : " + msg_count);
- }
- }
-
- private void handleMessage(Message m)
- {
- if (m == null) { return; }
-
- try
- {
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
- MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
- Message controlMsg = getSsn().createTextMessage();
- temp.send(controlMsg);
- if (isTransacted())
- {
- getSsn().commit();
- }
- temp.close();
- }
- else
- {
-
- int seq = m.getIntProperty("sequence");
- if (checkForDups)
- {
- if (seq == 0)
- {
- sequence = 0; // wrap around for each iteration
- System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
- duplicateMessages.clear();
- }
-
- if (seq < sequence)
- {
- duplicateMessages.add(seq);
- }
- else if (seq == sequence)
- {
- sequence++;
- msg_count ++;
- }
- else
- {
- // Multiple publishers are not allowed in this test case.
- // So out of order messages are not allowed.
- throw new Exception(": Received an out of order message (expected="
- + sequence + ",received=" + seq + ")" );
- }
- }
- else
- {
- msg_count ++;
- }
-
- // Please note that this test case doesn't expect duplicates
- // When testing for transactions.
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- handleError("Exception receiving messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Receiver rcv = new Receiver(con,addr);
- rcv.run();
- }
-
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
deleted file mode 100644
index 14b9b7302f..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.tools.MessageFactory;
-
-/**
- * A generic sender which sends a stream of messages
- * to a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It has a feedback loop to ensure it doesn't fill
- * up queues due to a slow consumer.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * msg_size (256)
- * msg_count (10) - # messages before waiting for feedback
- * sleep_time (1000 ms) - sleep time btw each iteration
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs.
- */
-public class Sender extends Client
-{
- protected int msg_size = 256;
- protected int msg_count = 10;
- protected int iterations = -1;
- protected long sleep_time = 1000;
-
- protected Destination dest = null;
- protected Destination replyTo = null;
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
-
- protected MessageProducer producer;
- Random gen = new Random(19770905);
-
- public Sender(Connection con,String addr) throws Exception
- {
- super(con);
- this.msg_size = Integer.getInteger("msg_size", 100);
- this.msg_count = Integer.getInteger("msg_count", 10);
- this.iterations = Integer.getInteger("iterations", -1);
- this.sleep_time = Long.getLong("sleep_time", 1000);
- this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
- this.dest = new AMQAnyDestination(addr);
- this.producer = getSsn().createProducer(dest);
- this.replyTo = getSsn().createTemporaryQueue();
-
- System.out.println("Sending messages to : " + addr);
- }
-
- /*
- * If msg_size not specified it generates a message
- * between 500-1500 bytes.
- */
- protected Message getNextMessage() throws Exception
- {
- int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
- Message msg = (getContentType().equals("text/plain")) ?
- MessageFactory.createTextMessage(getSsn(), s):
- MessageFactory.createBytesMessage(getSsn(), s);
-
- msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT
- : DeliveryMode.NON_PERSISTENT);
- return msg;
- }
-
- public void run()
- {
- try
- {
- boolean infinite = (iterations == -1);
- for (int x=0; infinite || x < iterations; x++)
- {
- long now = System.currentTimeMillis();
- if (now - getStartTime() >= getReportFrequency())
- {
- System.out.println(df.format(now) + " - iterations : " + x);
- setStartTime(now);
- }
-
- for (int i = 0; i < msg_count; i++)
- {
- Message msg = getNextMessage();
- msg.setIntProperty("sequence",i);
- producer.send(msg);
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- TextMessage m = getSsn().createTextMessage("End");
- m.setJMSReplyTo(replyTo);
- producer.send(m);
-
- if (isTransacted())
- {
- getSsn().commit();
- }
-
- MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
- feedbackConsumer.receive();
- feedbackConsumer.close();
- if (isTransacted())
- {
- getSsn().commit();
- }
- Thread.sleep(sleep_time);
- }
- }
- catch (Exception e)
- {
- handleError("Exception sending messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Sender sender = new Sender(con,addr);
- sender.run();
- }
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
deleted file mode 100644
index 72ca48e1c9..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.thread.Threading;
-
-/**
- * A basic test case class that could launch a Sender/Receiver
- * or both, each on it's own separate thread.
- *
- * If con_count == ssn_count, then each entity created will have
- * it's own Connection. Else if con_count < ssn_count, then
- * a connection will be shared by ssn_count/con_count # of entities.
- *
- * The if both sender and receiver options are set, it will
- * share a connection.
- *
- * The following options are available as jvm args
- * host, port
- * con_count,ssn_count
- * con_idle_time - which determines heartbeat
- * sender, receiver - booleans which indicate which entity to create.
- * Setting them both is also a valid option.
- */
-public class TestLauncher implements ErrorHandler
-{
- protected String host = "127.0.0.1";
- protected int port = 5672;
- protected int sessions_per_con = 1;
- protected int connection_count = 1;
- protected long heartbeat = 5000;
- protected boolean sender = false;
- protected boolean receiver = false;
- protected boolean useUniqueDests = false;
- protected String url;
-
- protected String address = "my_queue; {create: always}";
- protected boolean durable = false;
- protected String failover = "";
- protected AMQConnection controlCon;
- protected Destination controlDest = null;
- protected Session controlSession = null;
- protected MessageProducer statusSender;
- protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
- protected String testName;
-
- public TestLauncher()
- {
- testName = System.getProperty("test_name","UNKNOWN");
- host = System.getProperty("host", "127.0.0.1");
- port = Integer.getInteger("port", 5672);
- sessions_per_con = Integer.getInteger("ssn_per_con", 1);
- connection_count = Integer.getInteger("con_count", 1);
- heartbeat = Long.getLong("heartbeat", 5);
- sender = Boolean.getBoolean("sender");
- receiver = Boolean.getBoolean("receiver");
- useUniqueDests = Boolean.getBoolean("use_unique_dests");
-
- failover = System.getProperty("failover", "");
- durable = Boolean.getBoolean("durable");
-
- url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
-
- if (failover.equalsIgnoreCase("failover_exchange"))
- {
- url += "&failover='failover_exchange'";
-
- System.out.println("Failover exchange " + url );
- }
-
- configureLogging();
- }
-
- protected void configureLogging()
- {
- PatternLayout layout = new PatternLayout();
- layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
- BasicConfigurator.configure(new ConsoleAppender(layout));
-
- String logLevel = System.getProperty("log.level","warn");
- String logComponent = System.getProperty("log.comp","org.apache.qpid");
-
- Logger logger = Logger.getLogger(logComponent);
- logger.setLevel(Level.toLevel(logLevel, Level.WARN));
-
- System.out.println("Level " + logger.getLevel());
-
- }
-
- public void setUpControlChannel()
- {
- try
- {
- controlCon = new AMQConnection(url);
- controlCon.start();
-
- controlDest = new AMQAnyDestination("control; {create: always}"); // durable
-
- // Create the session to setup the messages
- controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
- statusSender = controlSession.createProducer(controlDest);
-
- }
- catch (Exception e)
- {
- handleError("Error while setting up the test",e);
- }
- }
-
- public void cleanup()
- {
- try
- {
- controlSession.close();
- controlCon.close();
- for (AMQConnection con : clients)
- {
- con.close();
- }
- }
- catch (Exception e)
- {
- handleError("Error while tearing down the test",e);
- }
- }
-
- public void start(String addr)
- {
- try
- {
- if (addr == null)
- {
- addr = address;
- }
-
- int ssn_per_con = sessions_per_con;
- String addrTemp = addr;
- for (int i = 0; i< connection_count; i++)
- {
- AMQConnection con = new AMQConnection(url);
- con.start();
- clients.add(con);
- for (int j = 0; j< ssn_per_con; j++)
- {
- String index = createPrefix(i,j);
- if (useUniqueDests)
- {
- addrTemp = modifySubject(index,addr);
- }
-
- if (sender)
- {
- createSender(index,con,addrTemp,this);
- }
-
- if (receiver)
- {
- System.out.println("########## Creating receiver ##################");
-
- createReceiver(index,con,addrTemp,this);
- }
- }
- }
- }
- catch (Exception e)
- {
- handleError("Exception while setting up the test",e);
- }
-
- }
-
- protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Receiver rcv = new Receiver(con,addr);
- rcv.setErrorHandler(h);
- rcv.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Receiver", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Receive thread",e);
- }
-
- t.setName("ReceiverThread-" + index);
- t.start();
- }
-
- protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Sender sender = new Sender(con, addr);
- sender.setErrorHandler(h);
- sender.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Sender", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Sender thread",e);
- }
-
- t.setName("SenderThread-" + index);
- t.start();
- }
-
- public synchronized void handleError(String msg,Exception e)
- {
- // In case sending the message fails
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" @ ");
- sb.append(df.format(new Date(System.currentTimeMillis())));
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
-
- try
- {
- TextMessage errorMsg = controlSession.createTextMessage();
- errorMsg.setStringProperty("status", "error");
- errorMsg.setStringProperty("desc", msg);
- errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
- errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
-
- System.out.println("Msg " + errorMsg);
-
- statusSender.send(errorMsg);
- }
- catch (JMSException e1)
- {
- e1.printStackTrace();
- }
- }
-
- private String serializeStackTrace(Exception e)
- {
- ByteArrayOutputStream bOut = new ByteArrayOutputStream();
- PrintStream printStream = new PrintStream(bOut);
- e.printStackTrace(printStream);
- printStream.close();
- return bOut.toString();
- }
-
- private String createPrefix(int i, int j)
- {
- return String.valueOf(i).concat(String.valueOf(j));
- }
-
- /**
- * A basic helper function to modify the subjects by
- * appending an index.
- */
- private String modifySubject(String index,String addr)
- {
- if (addr.indexOf("/") > 0)
- {
- addr = addr.substring(0,addr.indexOf("/")+1) +
- index +
- addr.substring(addr.indexOf("/")+1,addr.length());
- }
- else if (addr.indexOf(";") > 0)
- {
- addr = addr.substring(0,addr.indexOf(";")) +
- "/" + index +
- addr.substring(addr.indexOf(";"),addr.length());
- }
- else
- {
- addr = addr + "/" + index;
- }
-
- return addr;
- }
-
- public static void main(String[] args)
- {
- final TestLauncher test = new TestLauncher();
- test.setUpControlChannel();
- System.out.println("args.length " + args.length);
- System.out.println("args [0] " + args [0]);
- test.start(args.length > 0 ? args [0] : null);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() { test.cleanup(); }
- });
-
- }
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
deleted file mode 100644
index 2390516ef0..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tools;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.jms.FailoverPolicy;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import java.util.Properties;
-import java.util.Hashtable;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.LinkedList;
-import java.io.IOException;
-import java.io.File;
-import java.io.FileInputStream;
-
-public class JNDICheck
-{
- private static final String QUEUE = "queue.";
- private static final String TOPIC = "topic.";
- private static final String DESTINATION = "destination.";
- private static final String CONNECTION_FACTORY = "connectionfactory.";
-
- public static void main(String[] args)
- {
-
- if (args.length != 1)
- {
- usage();
- }
-
- String propertyFile = args[0];
-
- new JNDICheck(propertyFile);
- }
-
- private static void usage()
- {
- exit("Usage: JNDICheck <JNDI Config file>", 0);
- }
-
- private static void exit(String message, int exitCode)
- {
- System.err.println(message);
- System.exit(exitCode);
- }
-
- private static String JAVA_NAMING = "java.naming.factory.initial";
-
- Context _context = null;
- Hashtable _environment = null;
-
- public JNDICheck(String propertyFile)
- {
-
- // Load JNDI properties
- Properties properties = new Properties();
-
- try
- {
- properties.load(new FileInputStream(new File(propertyFile)));
- }
- catch (IOException e)
- {
- exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1);
- }
-
- //Create the initial context
- try
- {
-
- System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING));
-
- _context = new InitialContext(properties);
-
- _environment = _context.getEnvironment();
-
- Enumeration keys = _environment.keys();
-
- List<String> queues = new LinkedList<String>();
- List<String> topics = new LinkedList<String>();
- List<String> destinations = new LinkedList<String>();
- List<String> connectionFactories = new LinkedList<String>();
-
- while (keys.hasMoreElements())
- {
- String key = keys.nextElement().toString();
-
- if (key.startsWith(QUEUE))
- {
- queues.add(key);
- }
- else if (key.startsWith(TOPIC))
- {
- topics.add(key);
- }
- else if (key.startsWith(DESTINATION))
- {
- destinations.add(key);
- }
- else if (key.startsWith(CONNECTION_FACTORY))
- {
- connectionFactories.add(key);
- }
- }
-
- printHeader(propertyFile);
- printEntries(QUEUE, queues);
- printEntries(TOPIC, topics);
- printEntries(DESTINATION, destinations);
- printEntries(CONNECTION_FACTORY, connectionFactories);
-
- }
- catch (NamingException e)
- {
- exit("Unable to load JNDI Context due to:" + e.getMessage(), 1);
- }
-
- }
-
- private void printHeader(String file)
- {
- print("JNDI file :" + file);
- }
-
- private void printEntries(String type, List<String> list)
- {
- if (list.size() > 0)
- {
- String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1);
- print(name + " elements in file:");
- printList(list);
- print("");
- }
- }
-
- private void printList(List<String> list)
- {
- for (String item : list)
- {
- String key = item.substring(item.indexOf('.') + 1);
-
- try
- {
- print(key, _context.lookup(key));
- }
- catch (NamingException e)
- {
- exit("Error: item " + key + " no longer in context.", 1);
- }
- }
- }
-
- private void print(String key, Object object)
- {
- if (object instanceof AMQDestination)
- {
- print(key + ":" + object);
- }
- else if (object instanceof AMQConnectionFactory)
- {
- AMQConnectionFactory factory = (AMQConnectionFactory) object;
- print(key + ":Connection");
- print("ConnectionURL:");
- print(factory.getConnectionURL().toString());
- print("FailoverPolicy");
- print(new FailoverPolicy(factory.getConnectionURL(),null).toString());
- print("");
- }
- }
-
- private void print(String msg)
- {
- System.out.println(msg);
- }
-
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
deleted file mode 100644
index b88b242e6d..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.io.FileOutputStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * Latency test sends an x number of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y number of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * It is important to have a sufficiently large number for the warmup count to
- * ensure the system is in steady state before the test is started.
- *
- * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000)
- * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1
- *
- * The idea is to get a latency sample for the system once it achieves steady state.
- *
- */
-
-public class LatencyTest extends PerfBase implements MessageListener
-{
- MessageProducer producer;
- MessageConsumer consumer;
- Message msg;
- byte[] payload;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- double stdDev = 0;
- double avgLatency = 0;
- boolean warmup_mode = true;
- boolean transacted = false;
- int transSize = 0;
-
- final List<Long> latencies;
- final Lock lock = new ReentrantLock();
- final Condition warmedUp;
- final Condition testCompleted;
-
- public LatencyTest()
- {
- super();
- warmedUp = lock.newCondition();
- testCompleted = lock.newCondition();
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- latencies = new ArrayList <Long>(params.getMsgCount());
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
- msg.setJMSDeliveryMode(params.isDurable()?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else
- {
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
- }
-
- producer = session.createProducer(dest);
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (params.isCacheMessage())
- {
- return msg;
- }
- else
- {
- msg = session.createBytesMessage();
- ((BytesMessage)msg).writeBytes(payload);
- return msg;
- }
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
- int count = params.getWarmupCount();
- for (int i=0; i < count; i++)
- {
- producer.send(getNextMessage());
- }
- Message msg = session.createTextMessage("End");
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- try
- {
- lock.lock();
- warmedUp.await();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
- {
- if (warmup_mode)
- {
- warmup_mode = false;
- try
- {
- lock.lock();
- warmedUp.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
- else
- {
- computeStats();
- }
- }
- else if (!warmup_mode)
- {
- long time = System.currentTimeMillis();
- rcvdMsgCount ++;
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = time - msg.getJMSTimestamp();
- latencies.add(latency);
- totalLatency = totalLatency + latency;
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
- private void computeStats()
- {
- avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double sigma = 0;
-
- for (long latency: latencies)
- {
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- sigma = sigma + Math.pow(latency - avgLatency,2);
- }
-
- stdDev = Math.sqrt(sigma/(rcvdMsgCount -1));
-
- try
- {
- lock.lock();
- testCompleted.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public void writeToFile() throws Exception
- {
- String fileName = System.getProperty("file");
- PrintWriter writer = new PrintWriter(new FileOutputStream(fileName));
- for (long latency: latencies)
- {
- writer.println(String.valueOf(latency));
- }
- writer.flush();
- writer.close();
- }
-
- public void printToConsole()
- {
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Standard Deviation : ").
- append(df.format(stdDev)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
- append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- int count = params.getMsgCount();
-
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- producer.send(msg);
- if ( transacted && ((i+1) % transSize == 0))
- {
- session.commit();
- }
- }
- Message msg = session.createTextMessage("End");
- producer.send(msg);
- if (params.isTransacted())
- {
- session.commit();
- }
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- lock.lock();
- testCompleted.await();
- }
- finally
- {
- lock.unlock();
- }
-
- producer.close();
- consumer.close();
- session.close();
- con.close();
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
-
- public static void main(String[] args)
- {
- final LatencyTest latencyTest = new LatencyTest();
- Runnable r = new Runnable()
- {
- public void run()
- {
- latencyTest.test();
- latencyTest.printToConsole();
- if (System.getProperty("file") != null)
- {
- try
- {
- latencyTest.writeToFile();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating latency test thread",e);
- }
- t.start();
- }
-} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
deleted file mode 100644
index 8ab1379fce..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.qpid.tools;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-public class MessageFactory
-{
- public static Message createBytesMessage(Session ssn, int size) throws JMSException
- {
- BytesMessage msg = ssn.createBytesMessage();
- msg.writeBytes(createMessagePayload(size).getBytes());
- return msg;
- }
-
- public static Message createTextMessage(Session ssn, int size) throws JMSException
- {
- TextMessage msg = ssn.createTextMessage();
- msg.setText(createMessagePayload(size));
- return msg;
- }
-
- public static String createMessagePayload(int size)
- {
- String msgData = "Qpid Test Message";
-
- StringBuffer buf = new StringBuffer(size);
- int count = 0;
- while (count <= (size - msgData.length()))
- {
- buf.append(msgData);
- count += msgData.length();
- }
- if (count < size)
- {
- buf.append(msgData, 0, size - count);
- }
-
- return buf.toString();
- }
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
deleted file mode 100644
index ac597d17de..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.text.DecimalFormat;
-import java.util.Hashtable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-
-public class PerfBase
-{
- TestParams params;
- Connection con;
- Session session;
- Destination dest;
- Destination feedbackDest;
- DecimalFormat df = new DecimalFormat("###.##");
-
- public PerfBase()
- {
- params = new TestParams();
- }
-
- public void setUp() throws Exception
- {
-
- if (params.getHost().equals("") || params.getPort() == -1)
- {
- con = new AMQConnection(params.getUrl());
- }
- else
- {
- con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test");
- }
- con.start();
- session = con.createSession(params.isTransacted(),
- params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
-
- dest = new AMQAnyDestination(params.getAddress());
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
-
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
deleted file mode 100644
index 0ef0455a64..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * PerfConsumer will receive x no of messages in warmup mode.
- * Once it receives the Start message it will then signal the PerfProducer.
- * It will start recording stats from the first message it receives after
- * the warmup mode is done.
- *
- * The following calculations are done.
- * The important numbers to look at is
- * a) Avg Latency
- * b) System throughput.
- *
- * Latency.
- * =========
- * Currently this test is written with the assumption that either
- * a) The Perf Producer and Consumer are on the same machine
- * b) They are on separate machines that have their time synced via a Time Server
- *
- * In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
- * received and will calculate the latency as follows
- * latency = rcvdTime - msg.getJMSTimestamp()
- *
- * Through out the test it will keep track of the max and min latency to show the
- * variance in latencies.
- *
- * Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
- *
- * Throughput
- * ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
- * Consumer rate is calculated as
- * rcvdMsgCount/(rcvdTime - startTime)
- *
- * Note that the testStartTime referes to when the producer sent the first message
- * and startTime is when the consumer first received a message.
- *
- * rcvdTime keeps track of when the last message is received.
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- */
-
-public class PerfConsumer extends PerfBase implements MessageListener
-{
- MessageConsumer consumer;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- long testStartTime = 0; // to measure system throughput
- long startTime = 0; // to measure consumer throughput
- long rcvdTime = 0;
- boolean transacted = false;
- int transSize = 0;
-
- final Object lock = new Object();
-
- public PerfConsumer()
- {
- super();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- consumer = session.createConsumer(dest);
-
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
-
- boolean start = false;
- while (!start)
- {
- Message msg = consumer.receive();
- if (msg instanceof TextMessage)
- {
- if (((TextMessage)msg).getText().equals("End"))
- {
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
- }
- }
- }
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- consumer.setMessageListener(this);
- }
-
- public void printResults() throws Exception
- {
- synchronized (lock)
- {
- lock.wait();
- }
-
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
- double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Consumer rate : ").
- append(df.format(consRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(throughput)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
- append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void notifyCompletion(Destination replyTo) throws Exception
- {
- MessageProducer tmp = session.createProducer(replyTo);
- Message endMsg = session.createMessage();
- tmp.send(endMsg);
- if (params.isTransacted())
- {
- session.commit();
- }
- tmp.close();
- }
-
- public void tearDown() throws Exception
- {
- consumer.close();
- session.close();
- con.close();
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
- {
- notifyCompletion(msg.getJMSReplyTo());
-
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- else
- {
- rcvdTime = System.currentTimeMillis();
- rcvdMsgCount ++;
-
- if (rcvdMsgCount == 1)
- {
- startTime = rcvdTime;
- testStartTime = msg.getJMSTimestamp();
- }
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = rcvdTime - msg.getJMSTimestamp();
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- totalLatency = totalLatency + latency;
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- printResults();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- public static void main(String[] args)
- {
- final PerfConsumer cons = new PerfConsumer();
- Runnable r = new Runnable()
- {
- public void run()
- {
- cons.test();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- }
-} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
deleted file mode 100644
index 015d1e6205..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- */
-public class PerfProducer extends PerfBase
-{
- MessageProducer producer;
- Message msg;
- byte[] payload;
- List<byte[]> payloads;
- boolean cacheMsg = false;
- boolean randomMsgSize = false;
- boolean durable = false;
- Random random;
- int msgSizeRange = 1024;
-
- public PerfProducer()
- {
- super();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
- durable = params.isDurable();
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- cacheMsg = true;
-
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (params.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<byte[]>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(MessageFactory.createMessagePayload(i).getBytes());
- }
- }
- else
- {
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
- }
-
- producer = session.createProducer(dest);
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- msg = session.createBytesMessage();
-
- if (!randomMsgSize)
- {
- ((BytesMessage)msg).writeBytes(payload);
- }
- else
- {
- ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
- }
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return msg;
- }
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
-
- for (int i=0; i < params.getWarmupCount() -1; i++)
- {
- producer.send(getNextMessage());
- }
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- int count = params.getMsgCount();
- boolean transacted = params.isTransacted();
- int tranSize = params.getTransactionSize();
-
- long start = System.currentTimeMillis();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- producer.send(msg);
- if ( transacted && ((i+1) % tranSize == 0))
- {
- session.commit();
- }
- }
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
- System.out.println(new StringBuilder("Producer rate: ").
- append(df.format(rate)).
- append(" msg/sec").
- toString());
- }
-
- public void waitForCompletion() throws Exception
- {
- MessageConsumer tmp = session.createConsumer(feedbackDest);
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- System.out.println("Consumer has completed the test......");
- }
-
- public void tearDown() throws Exception
- {
- producer.close();
- session.close();
- con.close();
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- waitForCompletion();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
-
- public static void main(String[] args)
- {
- final PerfProducer prod = new PerfProducer();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.test();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
- }
-} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
deleted file mode 100644
index 602fcc6321..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ /dev/null
@@ -1,904 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
-import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
-import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.ExchangeBind;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageSubscribe;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.QueueDeclare;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.util.UUIDGen;
-import org.apache.qpid.util.UUIDs;
-
-/**
- * QpidBench
- *
- */
-
-public class QpidBench
-{
-
- static enum Mode
- {
- PUBLISH, CONSUME, BOTH
- }
-
- private static class Options
- {
- private StringBuilder usage = new StringBuilder("qpid-bench <options>");
-
- void usage(String name, String description, Object def)
- {
- String defval = "";
- if (def != null)
- {
- defval = String.format(" (%s)", def);
- }
- usage.append(String.format("\n %-15s%-14s %s", name, defval, description));
- }
-
- public String broker = "localhost";
- public int port = 5672;
- public long count = 1000000;
- public long window = 100000;
- public long sample = window;
- public int size = 1024;
- public Mode mode = BOTH;
- public boolean timestamp = false;
- public boolean message_id = false;
- public boolean message_cache = false;
- public boolean persistent = false;
- public boolean jms_publish = false;
- public boolean jms_consume = false;
- public boolean help = false;
-
- {
- usage("-b, --broker", "the broker hostname", broker);
- }
-
- public void parse__broker(String b)
- {
- this.broker = b;
- }
-
- public void parse_b(String b)
- {
- parse__broker(b);
- }
-
- {
- usage("-p, --port", "the broker port", port);
- }
-
- public void parse__port(String p)
- {
- this.port = Integer.parseInt(p);
- }
-
- public void parse_p(String p)
- {
- parse__port(p);
- }
-
- {
- usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count);
- }
-
- public void parse__count(String c)
- {
- this.count = Long.parseLong(c);
- }
-
- public void parse_c(String c)
- {
- parse__count(c);
- }
-
- {
- usage("-w, --window", "the number of messages to send before blocking, 0 disables", window);
- }
-
- public void parse__window(String w)
- {
- this.window = Long.parseLong(w);
- }
-
- public void parse_w(String w)
- {
- parse__window(w);
- }
-
- {
- usage("--sample", "print stats after this many messages, 0 disables", sample);
- }
-
- public void parse__sample(String s)
- {
- this.sample = Long.parseLong(s);
- }
-
- {
- usage("-i, --interval", "sets both --window and --sample", window);
- }
-
- public void parse__interval(String i)
- {
- this.window = Long.parseLong(i);
- this.sample = window;
- }
-
- public void parse_i(String i)
- {
- parse__interval(i);
- }
-
- {
- usage("-s, --size", "the message size", size);
- }
-
- public void parse__size(String s)
- {
- this.size = Integer.parseInt(s);
- }
-
- public void parse_s(String s)
- {
- parse__size(s);
- }
-
- {
- usage("-m, --mode", "one of publish, consume, or both", mode);
- }
-
- public void parse__mode(String m)
- {
- if (m.equalsIgnoreCase("publish"))
- {
- this.mode = PUBLISH;
- }
- else if (m.equalsIgnoreCase("consume"))
- {
- this.mode = CONSUME;
- }
- else if (m.equalsIgnoreCase("both"))
- {
- this.mode = BOTH;
- }
- else
- {
- throw new IllegalArgumentException
- ("must be one of 'publish', 'consume', or 'both'");
- }
- }
-
- public void parse_m(String m)
- {
- parse__mode(m);
- }
-
- {
- usage("--timestamp", "set timestamps on each message if true", timestamp);
- }
-
- public void parse__timestamp(String t)
- {
- this.timestamp = Boolean.parseBoolean(t);
- }
-
- {
- usage("--mesage-id", "set the message-id on each message if true", message_id);
- }
-
- public void parse__message_id(String m)
- {
- this.message_id = Boolean.parseBoolean(m);
- }
-
- {
- usage("--message-cache", "reuse the same message for each send if true", message_cache);
- }
-
- public void parse__message_cache(String c)
- {
- this.message_cache = Boolean.parseBoolean(c);
- }
-
- {
- usage("--persistent", "set the delivery-mode to persistent if true", persistent);
- }
-
- public void parse__persistent(String p)
- {
- this.persistent = Boolean.parseBoolean(p);
- }
-
- {
- usage("--jms-publish", "use the jms client for publish", jms_publish);
- }
-
- public void parse__jms_publish(String jp)
- {
- this.jms_publish = Boolean.parseBoolean(jp);
- }
-
- {
- usage("--jms-consume", "use the jms client for consume", jms_consume);
- }
-
- public void parse__jms_consume(String jc)
- {
- this.jms_consume = Boolean.parseBoolean(jc);
- }
-
- {
- usage("--jms", "sets both --jms-publish and --jms-consume", false);
- }
-
- public void parse__jms(String j)
- {
- this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
- }
-
- {
- usage("-h, --help", "prints this message", null);
- }
-
- public void parse__help()
- {
- this.help = true;
- }
-
- public void parse_h()
- {
- parse__help();
- }
-
- public String parse(String ... args)
- {
- Class klass = getClass();
- List<String> arguments = new ArrayList<String>();
- for (int i = 0; i < args.length; i++)
- {
- String option = args[i];
-
- if (!option.startsWith("-"))
- {
- arguments.add(option);
- continue;
- }
-
- String method = "parse" + option.replace('-', '_');
- try
- {
- try
- {
- Method parser = klass.getMethod(method);
- parser.invoke(this);
- }
- catch (NoSuchMethodException e)
- {
- try
- {
- Method parser = klass.getMethod(method, String.class);
-
- String value = null;
- if (i + 1 < args.length)
- {
- value = args[i+1];
- i++;
- }
- else
- {
- return option + " requires a value";
- }
-
- parser.invoke(this, value);
- }
- catch (NoSuchMethodException e2)
- {
- return "no such option: " + option;
- }
- }
- }
- catch (InvocationTargetException e)
- {
- Throwable t = e.getCause();
- return String.format
- ("error parsing %s: %s: %s", option, t.getClass().getName(),
- t.getMessage());
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException
- ("unable to access parse method: " + option, e);
- }
- }
-
- return parseArguments(arguments);
- }
-
- public String parseArguments(List<String> arguments)
- {
- if (arguments.size() > 0)
- {
- String args = arguments.toString();
- return "unrecognized arguments: " + args.substring(1, args.length() - 1);
- }
- else
- {
- return null;
- }
- }
-
- public String toString()
- {
- Class klass = getClass();
- Field[] fields = klass.getFields();
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < fields.length; i++)
- {
- if (i > 0)
- {
- str.append("\n");
- }
-
- String name = fields[i].getName();
- str.append(name);
- str.append(" = ");
- Object value;
- try
- {
- value = fields[i].get(this);
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException
- ("unable to access field: " + name, e);
- }
- str.append(value);
- }
-
- return str.toString();
- }
- }
-
- public static final void main(String[] args) throws Exception
- {
- final Options opts = new Options();
- String error = opts.parse(args);
- if (error != null)
- {
- System.err.println(error);
- System.exit(-1);
- return;
- }
-
- if (opts.help)
- {
- System.out.println(opts.usage);
- return;
- }
-
- System.out.println(opts);
-
- switch (opts.mode)
- {
- case CONSUME:
- case BOTH:
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- if (opts.jms_consume)
- {
- jms_consumer(opts);
- }
- else
- {
- native_consumer(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Consumer Completed");
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- break;
- }
-
- switch (opts.mode)
- {
- case PUBLISH:
- case BOTH:
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- if (opts.jms_publish)
- {
- jms_publisher(opts);
- }
- else
- {
- native_publisher(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Producer Completed");
- }
- };
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating publisher thread",e);
- }
- t.start();
- break;
- }
- }
-
- private static enum Column
- {
- LEFT, RIGHT
- }
-
- private static final void sample(Options opts, Column col, String name, long count,
- long start, long time, long lastTime)
- {
- String pfx = "";
- String sfx = "";
- if (opts.mode == BOTH)
- {
- if (col == Column.RIGHT)
- {
- pfx = " -- ";
- }
- else
- {
- sfx = " --";
- }
- }
-
- if (count == 0)
- {
- String stats = String.format("%s: %tc", name, start);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- return;
- }
-
- double cumulative = 1000 * (double) count / (double) (time - start);
- double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
-
- String stats = String.format
- ("%s: %d %.2f %.2f", name, count, cumulative, interval);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- }
-
- private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
- {
- String url = String.format
- ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
- opts.broker, opts.port);
- return new AMQConnection(url);
- }
-
- private static final void jms_publisher(Options opts) throws Exception
- {
- javax.jms.Connection conn = getJMSConnection(opts);
-
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageProducer prod = ssn.createProducer(dest);
- MessageConsumer cons = ssn.createConsumer(echo_dest);
- prod.setDisableMessageID(!opts.message_id);
- prod.setDisableMessageTimestamp(!opts.timestamp);
- prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < opts.size; i++)
- {
- str.append((char) (i % 128));
- }
-
- String body = str.toString();
-
- TextMessage cached = ssn.createTextMessage();
- cached.setText(body);
-
- conn.start();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- Message echo = cons.receive();
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
- lastTime = time;
- }
-
- TextMessage m;
- if (opts.message_cache)
- {
- m = cached;
- }
- else
- {
- m = ssn.createTextMessage();
- m.setText(body);
- }
-
- prod.send(m);
- count++;
- }
-
- conn.close();
- }
-
- private static final void jms_consumer(final Options opts) throws Exception
- {
- final javax.jms.Connection conn = getJMSConnection(opts);
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageConsumer cons = ssn.createConsumer(dest);
- final MessageProducer prod = ssn.createProducer(echo_dest);
- prod.setDisableMessageID(true);
- prod.setDisableMessageTimestamp(true);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- final TextMessage echo = ssn.createTextMessage();
- echo.setText("ECHO");
-
- final Object done = new Object();
- cons.setMessageListener(new MessageListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- public void onMessage(Message m)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- try
- {
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- prod.send(echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
- lastTime = time;
- }
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- conn.start();
- synchronized (done)
- {
- done.wait();
- }
- conn.close();
- }
-
- private static final org.apache.qpid.transport.Connection getConnection
- (Options opts)
- {
- org.apache.qpid.transport.Connection conn =
- new org.apache.qpid.transport.Connection();
- conn.connect(opts.broker, opts.port, null, "guest", "guest",false);
- return conn;
- }
-
- private static abstract class NativeListener implements SessionListener
- {
-
- public void opened(org.apache.qpid.transport.Session ssn) {}
-
- public void resumed(org.apache.qpid.transport.Session ssn) {}
-
- public void exception(org.apache.qpid.transport.Session ssn,
- SessionException exc)
- {
- exc.printStackTrace();
- }
-
- public void closed(org.apache.qpid.transport.Session ssn) {}
-
- }
-
- private static final void native_publisher(Options opts) throws Exception
- {
- final long[] echos = { 0 };
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- synchronized (echos)
- {
- echos[0]++;
- echos.notify();
- }
- ssn.processed(xfr);
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- MessageProperties cached_mp = new MessageProperties();
- DeliveryProperties cached_dp = new DeliveryProperties();
- cached_dp.setRoutingKey("test-queue");
- cached_dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- int size = opts.size;
- ByteBuffer body = ByteBuffer.allocate(size);
- for (int i = 0; i < size; i++)
- {
- body.put((byte) i);
- }
- body.flip();
-
- ssn.invoke(new MessageSubscribe()
- .queue("echo-queue")
- .destination("echo-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- UUIDGen gen = UUIDs.newGenerator();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- synchronized (echos)
- {
- while (echos[0] < (count/opts.window))
- {
- echos.wait();
- }
- }
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
- lastTime = time;
- }
-
- MessageProperties mp;
- DeliveryProperties dp;
- if (opts.message_cache)
- {
- mp = cached_mp;
- dp = cached_dp;
- }
- else
- {
- mp = new MessageProperties();
- dp = new DeliveryProperties();
- dp.setRoutingKey("test-queue");
- dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- }
-
- if (opts.message_id)
- {
- mp.setMessageId(gen.generate());
- }
-
- if (opts.timestamp)
- {
- dp.setTimestamp(System.currentTimeMillis());
- }
-
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp), body.slice());
- count++;
- }
-
- ssn.messageCancel("echo-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
- private static final void native_consumer(final Options opts) throws Exception
- {
- final DeliveryProperties dp = new DeliveryProperties();
- final byte[] echo = new byte[0];
- dp.setRoutingKey("echo-queue");
- dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- final MessageProperties mp = new MessageProperties();
- final Object done = new Object();
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- ssn.messageTransfer("amq.direct",
- MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp),
- echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
- lastTime = time;
- }
- ssn.processed(xfr);
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- ssn.invoke(new MessageSubscribe()
- .queue("test-queue")
- .destination("test-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- synchronized (done)
- {
- done.wait();
- }
-
- ssn.messageCancel("test-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
deleted file mode 100644
index 89d6462a39..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import javax.jms.Session;
-
-public class TestParams
-{
- /*
- * By default the connection URL is used.
- * This allows a user to easily specify a fully fledged URL any given property.
- * Ex. SSL parameters
- *
- * By providing a host & port allows a user to simply override the URL.
- * This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
- */
- private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
- private String host = "";
-
- private int port = -1;
-
- private String address = "queue; {create : always}";
-
- private int msg_size = 1024;
-
- private int msg_type = 1; // not used yet
-
- private boolean cacheMessage = false;
-
- private boolean disableMessageID = false;
-
- private boolean disableTimestamp = false;
-
- private boolean durable = false;
-
- private boolean transacted = false;
-
- private int transaction_size = 1000;
-
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
-
- private int msg_count = 10;
-
- private int warmup_count = 1;
-
- private boolean random_msg_size = false;
-
- public TestParams()
- {
-
- url = System.getProperty("url",url);
- host = System.getProperty("host","");
- port = Integer.getInteger("port", -1);
- address = System.getProperty("address","queue");
-
- msg_size = Integer.getInteger("msg_size", 1024);
- msg_type = Integer.getInteger("msg_type",1);
- cacheMessage = Boolean.getBoolean("cache_msg");
- disableMessageID = Boolean.getBoolean("disableMessageID");
- disableTimestamp = Boolean.getBoolean("disableTimestamp");
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- transaction_size = Integer.getInteger("trans_size",1000);
- ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
- msg_count = Integer.getInteger("msg_count",msg_count);
- warmup_count = Integer.getInteger("warmup_count",warmup_count);
- random_msg_size = Boolean.getBoolean("random_msg_size");
- }
-
- public String getUrl()
- {
- return url;
- }
-
- public String getHost()
- {
- return host;
- }
-
- public int getPort()
- {
- return port;
- }
-
- public String getAddress()
- {
- return address;
- }
-
- public int getAckMode()
- {
- return ack_mode;
- }
-
- public int getMsgCount()
- {
- return msg_count;
- }
-
- public int getMsgSize()
- {
- return msg_size;
- }
-
- public int getMsgType()
- {
- return msg_type;
- }
-
- public boolean isDurable()
- {
- return durable;
- }
-
- public boolean isTransacted()
- {
- return transacted;
- }
-
- public int getTransactionSize()
- {
- return transaction_size;
- }
-
- public int getWarmupCount()
- {
- return warmup_count;
- }
-
- public boolean isCacheMessage()
- {
- return cacheMessage;
- }
-
- public boolean isDisableMessageID()
- {
- return disableMessageID;
- }
-
- public boolean isDisableTimestamp()
- {
- return disableTimestamp;
- }
-
- public boolean isRandomMsgSize()
- {
- return random_msg_size;
- }
-
-}