From cf00c9069b84d6de78b7507ac2fa4b0f6770fb6f Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 10 Apr 2012 11:15:58 +0000 Subject: QPID-3941 Renamed existing files as the first step. The existing distributed testing framework is now named Mercury, and the related files are prefixed with that name for easy identification. The existing perf-report was renamed as jms-quick-perf-report, as that is exactly what it does. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1311673 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/tools/bin/controller | 132 ------ qpid/java/tools/bin/jms-quick-perf-report | 137 +++++++ qpid/java/tools/bin/mercury-controller | 132 ++++++ qpid/java/tools/bin/mercury-start-consumers | 119 ++++++ qpid/java/tools/bin/mercury-start-producers | 136 +++++++ qpid/java/tools/bin/perf-report | 137 ------- qpid/java/tools/bin/start-consumers | 119 ------ qpid/java/tools/bin/start-producers | 136 ------- .../org/apache/qpid/tools/JVMArgConfiguration.java | 214 ++++++++++ .../java/org/apache/qpid/tools/MercuryBase.java | 226 +++++++++++ .../qpid/tools/MercuryConsumerController.java | 325 +++++++++++++++ .../qpid/tools/MercuryProducerController.java | 358 +++++++++++++++++ .../apache/qpid/tools/MercuryTestController.java | 442 +++++++++++++++++++++ .../main/java/org/apache/qpid/tools/PerfBase.java | 226 ----------- .../java/org/apache/qpid/tools/PerfConsumer.java | 325 --------------- .../java/org/apache/qpid/tools/PerfProducer.java | 358 ----------------- .../org/apache/qpid/tools/PerfTestController.java | 442 --------------------- .../java/org/apache/qpid/tools/TestParams.java | 214 ---------- 18 files changed, 2089 insertions(+), 2089 deletions(-) delete mode 100644 qpid/java/tools/bin/controller create mode 100755 qpid/java/tools/bin/jms-quick-perf-report create mode 100644 qpid/java/tools/bin/mercury-controller create mode 100644 qpid/java/tools/bin/mercury-start-consumers create mode 100644 qpid/java/tools/bin/mercury-start-producers delete mode 100755 qpid/java/tools/bin/perf-report delete mode 100644 qpid/java/tools/bin/start-consumers delete mode 100644 qpid/java/tools/bin/start-producers create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java create mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java delete mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java delete mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java delete mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java delete mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java delete mode 100644 qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java diff --git a/qpid/java/tools/bin/controller b/qpid/java/tools/bin/controller deleted file mode 100644 index fab8614039..0000000000 --- a/qpid/java/tools/bin/controller +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME=controller -CONSUMER_COUNT=1 -PRODUCER_COUNT=1 -DURATION=-1 -TEST_NAME="TEST_NAME" -EXTRA_JVM_ARGS="" - -TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: controller [option].." - - printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test" - - printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test" - - printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration." - - printf "\n%27s\n%32s\n" "-n, --name=" "The name of the test." - - printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -c|--consumer-count) - CONSUMER_COUNT="$2"; shift; shift; continue - ;; - -p|--producer-count) - PRODUCER_COUNT="$2"; shift; shift; continue - ;; - -d|--duration) - DURATION="$2"; shift; shift; continue - ;; - -n|--name) - TEST_NAME="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}" - - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -run_controller() -{ - TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS" - echo "Running controller with : $TEST_ARGS" > test.out - $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out & - waitfor test.out "Controller: Completed the test" - sleep 2 #give a grace period to shutdown - print_result $TEST_NAME -} - -print_result() -{ - prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'` - std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev - echo "--------------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT - -rm -rf *.out - -if [ "$DURATION" = -1 ]; then - echo "Test report on " `date +%F` - echo "========================================================================================================" - echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|" - echo "--------------------------------------------------------------------------------------------------------" -else - echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration." -fi - -run_controller diff --git a/qpid/java/tools/bin/jms-quick-perf-report b/qpid/java/tools/bin/jms-quick-perf-report new file mode 100755 index 0000000000..7de3f2b602 --- /dev/null +++ b/qpid/java/tools/bin/jms-quick-perf-report @@ -0,0 +1,137 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This will run the following test cases defined below and produce +# a report in tabular format. + +QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" +DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" +TOPIC="amq.topic/test" +DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" + +COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } +cleanup() +{ + pids=`ps aux | grep java | grep Perf | awk '{print $2}'` + if [ "$pids" != "" ]; then + kill -3 $pids + kill -9 $pids >/dev/null 2>&1 + fi +} + +# $1 test name +# $2 consumer options +# $3 producer options +run_testcase() +{ + sh run-sub $COMMON_CONFIG $2 > sub.out & + sh run-pub $COMMON_CONFIG $3 > pub.out & + waitfor pub.out "Controller: Completed the test" + sleep 2 #give a grace period to shutdown + print_result $1 + mv pub.out $1.pub.out + mv sub.out $1.sub.out +} + +print_result() +{ + prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` + sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` + avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` + min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` + max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency + echo "------------------------------------------------------------------------------------------------" +} + +trap cleanup EXIT +rm -rf *.out #cleanup old files. + +echo "Test report on " `date +%F` +echo "================================================================================================" +echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" +echo "------------------------------------------------------------------------------------------------" + +# The message counts and warmup counts are set to very low values for quick testing of the script. +# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k +# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend +# setting very low values to start with and experiment while increasing them slowly. + +# Test 1 Trans Queue +run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" + +# Test 2 Dura Queue +run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 3 Dura Queue Sync +run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 4 Dura Queue Sync Publish and Ack +run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 5 Topic +run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" + +# Test 6 Durable Topic +run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 7 Fanout +run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" + +# Test 8 Small TX +run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" + +# Test 9 Large TX +run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" + +# Test 10 256 MSG +run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 11 512 MSG +run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 12 2048 MSG +run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 13 Random size MSG +run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 14 Random size MSG Durable +run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 15 64K MSG +run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 16 Durable 64K MSG +run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 17 500K MSG +run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 18 Durable 500K MSG +run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/qpid/java/tools/bin/mercury-controller b/qpid/java/tools/bin/mercury-controller new file mode 100644 index 0000000000..fab8614039 --- /dev/null +++ b/qpid/java/tools/bin/mercury-controller @@ -0,0 +1,132 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME=controller +CONSUMER_COUNT=1 +PRODUCER_COUNT=1 +DURATION=-1 +TEST_NAME="TEST_NAME" +EXTRA_JVM_ARGS="" + +TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: controller [option].." + + printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test" + + printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test" + + printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration." + + printf "\n%27s\n%32s\n" "-n, --name=" "The name of the test." + + printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -c|--consumer-count) + CONSUMER_COUNT="$2"; shift; shift; continue + ;; + -p|--producer-count) + PRODUCER_COUNT="$2"; shift; shift; continue + ;; + -d|--duration) + DURATION="$2"; shift; shift; continue + ;; + -n|--name) + TEST_NAME="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}" + + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } +cleanup() +{ + pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'` + if [ "$pids" != "" ]; then + kill -3 $pids + kill -9 $pids >/dev/null 2>&1 + fi +} + +run_controller() +{ + TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS" + echo "Running controller with : $TEST_ARGS" > test.out + $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out & + waitfor test.out "Controller: Completed the test" + sleep 2 #give a grace period to shutdown + print_result $TEST_NAME +} + +print_result() +{ + prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'` + sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'` + avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'` + min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'` + max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'` + std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev + echo "--------------------------------------------------------------------------------------------------------" +} + +trap cleanup EXIT + +rm -rf *.out + +if [ "$DURATION" = -1 ]; then + echo "Test report on " `date +%F` + echo "========================================================================================================" + echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|" + echo "--------------------------------------------------------------------------------------------------------" +else + echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration." +fi + +run_controller diff --git a/qpid/java/tools/bin/mercury-start-consumers b/qpid/java/tools/bin/mercury-start-consumers new file mode 100644 index 0000000000..c71fc0c21f --- /dev/null +++ b/qpid/java/tools/bin/mercury-start-consumers @@ -0,0 +1,119 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="start-consumers" +PROCESS_COUNT=1 +CON_COUNT=1 +MSG_COUNT=10000 +ADDRESS="queue;{create:always}" +UNIQUE_DEST="false" + +EXTRA_JVM_ARGS=" -Dmax_prefetch=500 " + +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ + --long connection-count:,process-count:,create-unique-queues-topics,\ +jvm-args:,queue:,topic:,address:,\ +msg-count:,help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: start-producers [option].." + + printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" + + printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" + + printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" + + printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" + + printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" + + printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" + + printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" + + printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -C|--connection-count) + CON_COUNT="$2"; shift; shift; continue + ;; + -P|--process-count) + PROCESS_COUNT="$2"; shift; shift; continue + ;; + -u|--create-unique-queues-topics) + UNIQUE_DEST="true"; shift; continue + ;; + --queue) + ADDRESS="$2;{create: always}"; shift; shift; continue + ;; + --topic) + ADDRESS="amq.topic/$2"; shift; shift; continue + ;; + --address) + ADDRESS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -c|--msg-count) + MSG_COUNT="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true" + +start_consumers() +{ + for ((i=0; i<$PROCESS_COUNT; i++)) + do + if [ "$UNIQUE_DEST" = "true" ]; then + sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 & + else + sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 & + fi + done +} + +start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" + diff --git a/qpid/java/tools/bin/mercury-start-producers b/qpid/java/tools/bin/mercury-start-producers new file mode 100644 index 0000000000..7ba0286f7c --- /dev/null +++ b/qpid/java/tools/bin/mercury-start-producers @@ -0,0 +1,136 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="start-producers" +PROCESS_COUNT=1 +CON_COUNT=1 +MSG_TYPE="bytes" +WARMUP_MSG_COUNT=1000 +MSG_COUNT=10000 +MSG_SIZE=1024 +ADDRESS="queue;{create:always}" +UNIQUE_DEST="false" + +EXTRA_JVM_ARGS="" +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ + --long connection-count:,process-count:,create-unique-queues-topics,\ +jvm-args:,queue:,topic:,address:,\ +msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: start-producers [option].." + + printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" + + printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" + + printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" + + printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" + + printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" + + printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" + + printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)" + + printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" + + printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)" + + printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)" + + printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -C|--connection-count) + CON_COUNT="$2"; shift; shift; continue + ;; + -P|--process-count) + PROCESS_COUNT="$2"; shift; shift; continue + ;; + -u|--create-unique-queues-topics) + UNIQUE_DEST="true"; shift; continue + ;; + --queue) + ADDRESS="$2;{create: always}"; shift; shift; continue + ;; + --topic) + ADDRESS="amq.topic/$2"; shift; shift; continue + ;; + --address) + ADDRESS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -s|--msg-size) + MSG_SIZE="$2"; shift; shift; continue + ;; + -c|--msg-count) + MSG_COUNT="$2"; shift; shift; continue + ;; + -t|--msg_type) + MSG_TYPE="$2"; shift; shift; continue + ;; + -w|--warmup-msg-count) + WARMUP_MSG_COUNT="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT" + +start_producers() +{ + for ((i=0; i<$PROCESS_COUNT; i++)) + do + if [ "$UNIQUE_DEST" = "true" ]; then + sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 & + else + sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 & + fi + done +} + +start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" + diff --git a/qpid/java/tools/bin/perf-report b/qpid/java/tools/bin/perf-report deleted file mode 100755 index 7de3f2b602..0000000000 --- a/qpid/java/tools/bin/perf-report +++ /dev/null @@ -1,137 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This will run the following test cases defined below and produce -# a report in tabular format. - -QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" -DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" -TOPIC="amq.topic/test" -DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" - -COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep Perf | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -# $1 test name -# $2 consumer options -# $3 producer options -run_testcase() -{ - sh run-sub $COMMON_CONFIG $2 > sub.out & - sh run-pub $COMMON_CONFIG $3 > pub.out & - waitfor pub.out "Controller: Completed the test" - sleep 2 #give a grace period to shutdown - print_result $1 - mv pub.out $1.pub.out - mv sub.out $1.sub.out -} - -print_result() -{ - prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency - echo "------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT -rm -rf *.out #cleanup old files. - -echo "Test report on " `date +%F` -echo "================================================================================================" -echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" -echo "------------------------------------------------------------------------------------------------" - -# The message counts and warmup counts are set to very low values for quick testing of the script. -# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k -# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend -# setting very low values to start with and experiment while increasing them slowly. - -# Test 1 Trans Queue -run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" - -# Test 2 Dura Queue -run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 3 Dura Queue Sync -run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 4 Dura Queue Sync Publish and Ack -run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 5 Topic -run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" - -# Test 6 Durable Topic -run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 7 Fanout -run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" - -# Test 8 Small TX -run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" - -# Test 9 Large TX -run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" - -# Test 10 256 MSG -run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 11 512 MSG -run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 12 2048 MSG -run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 13 Random size MSG -run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 14 Random size MSG Durable -run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 15 64K MSG -run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 16 Durable 64K MSG -run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 17 500K MSG -run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 18 Durable 500K MSG -run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/qpid/java/tools/bin/start-consumers b/qpid/java/tools/bin/start-consumers deleted file mode 100644 index c71fc0c21f..0000000000 --- a/qpid/java/tools/bin/start-consumers +++ /dev/null @@ -1,119 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-consumers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_COUNT=10000 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS=" -Dmax_prefetch=500 " - -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true" - -start_consumers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 & - else - sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 & - fi - done -} - -start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/qpid/java/tools/bin/start-producers b/qpid/java/tools/bin/start-producers deleted file mode 100644 index 7ba0286f7c..0000000000 --- a/qpid/java/tools/bin/start-producers +++ /dev/null @@ -1,136 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-producers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_TYPE="bytes" -WARMUP_MSG_COUNT=1000 -MSG_COUNT=10000 -MSG_SIZE=1024 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS="" -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)" - - printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -s|--msg-size) - MSG_SIZE="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - -t|--msg_type) - MSG_TYPE="$2"; shift; shift; continue - ;; - -w|--warmup-msg-count) - WARMUP_MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT" - -start_producers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 & - else - sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 & - fi - done -} - -start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java new file mode 100644 index 0000000000..d73be0181b --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import javax.jms.Session; + +public class TestParams +{ + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + + private String host = ""; + + private int port = -1; + + private String address = "queue; {create : always}"; + + private int msg_size = 1024; + + private int random_msg_size_start_from = 1; + + private boolean cacheMessage = false; + + private boolean disableMessageID = false; + + private boolean disableTimestamp = false; + + private boolean durable = false; + + private boolean transacted = false; + + private int transaction_size = 1000; + + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + + private int msg_count = 10; + + private int warmup_count = 1; + + private boolean random_msg_size = false; + + private String msgType = "bytes"; + + private boolean printStdDev = false; + + private long rate = -1; + + private boolean externalController = false; + + private boolean useUniqueDest = false; // useful when using multiple connections. + + public TestParams() + { + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address",address); + + msg_size = Integer.getInteger("msg_size", 1024); + cacheMessage = Boolean.getBoolean("cache_msg"); + disableMessageID = Boolean.getBoolean("disableMessageID"); + disableTimestamp = Boolean.getBoolean("disableTimestamp"); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + transaction_size = Integer.getInteger("trans_size",1000); + ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg_count",msg_count); + warmup_count = Integer.getInteger("warmup_count",warmup_count); + random_msg_size = Boolean.getBoolean("random_msg_size"); + msgType = System.getProperty("msg_type","bytes"); + printStdDev = Boolean.getBoolean("print_std_dev"); + rate = Long.getLong("rate",-1); + externalController = Boolean.getBoolean("ext_controller"); + useUniqueDest = Boolean.getBoolean("use_unique_dest"); + random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); + } + + public String getUrl() + { + return url; + } + + public String getHost() + { + return host; + } + + public int getPort() + { + return port; + } + + public String getAddress() + { + return address; + } + + public int getAckMode() + { + return ack_mode; + } + + public int getMsgCount() + { + return msg_count; + } + + public int getMsgSize() + { + return msg_size; + } + + public int getRandomMsgSizeStartFrom() + { + return random_msg_size_start_from; + } + + public boolean isDurable() + { + return durable; + } + + public boolean isTransacted() + { + return transacted; + } + + public int getTransactionSize() + { + return transaction_size; + } + + public int getWarmupCount() + { + return warmup_count; + } + + public boolean isCacheMessage() + { + return cacheMessage; + } + + public boolean isDisableMessageID() + { + return disableMessageID; + } + + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + public boolean isRandomMsgSize() + { + return random_msg_size; + } + + public String getMessageType() + { + return msgType; + } + + public boolean isPrintStdDev() + { + return printStdDev; + } + + public long getRate() + { + return rate; + } + + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } + + public boolean isUseUniqueDests() + { + return useUniqueDest; + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java new file mode 100644 index 0000000000..121e94cea1 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java @@ -0,0 +1,226 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.net.InetAddress; +import java.text.DecimalFormat; +import java.util.UUID; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; + +public class PerfBase +{ + public final static String CODE = "CODE"; + public final static String ID = "ID"; + public final static String REPLY_ADDR = "REPLY_ADDR"; + public final static String MAX_LATENCY = "MAX_LATENCY"; + public final static String MIN_LATENCY = "MIN_LATENCY"; + public final static String AVG_LATENCY = "AVG_LATENCY"; + public final static String STD_DEV = "STD_DEV"; + public final static String CONS_RATE = "CONS_RATE"; + public final static String PROD_RATE = "PROD_RATE"; + public final static String MSG_COUNT = "MSG_COUNT"; + public final static String TIMESTAMP = "Timestamp"; + + String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); + + TestParams params; + Connection con; + Session session; + Session controllerSession; + Destination dest; + Destination myControlQueue; + Destination controllerQueue; + DecimalFormat df = new DecimalFormat("###.##"); + String id; + String myControlQueueAddr; + + MessageProducer sendToController; + MessageConsumer receiveFromController; + String prefix = ""; + + enum OPCode { + REGISTER_CONSUMER, REGISTER_PRODUCER, + PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, + CONSUMER_READY, PRODUCER_READY, + PRODUCER_START, + RECEIVED_END_MSG, CONSUMER_STOP, + RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, + CONTINUE_TEST, STOP_TEST + }; + + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) + { + return MAP; + } + else if ("object".equalsIgnoreCase(s)) + { + return OBJECT; + }*/ + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + MessageType msgType = MessageType.BYTES; + + public PerfBase(String prefix) + { + params = new TestParams(); + String host = ""; + try + { + host = InetAddress.getLocalHost().getHostName(); + } + catch (Exception e) + { + } + id = host + "-" + UUID.randomUUID().toString(); + this.prefix = prefix; + this.myControlQueueAddr = id + ";{create: always}"; + } + + public void setUp() throws Exception + { + if (params.getHost().equals("") || params.getPort() == -1) + { + con = new AMQConnection(params.getUrl()); + } + else + { + con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); + } + con.start(); + session = con.createSession(params.isTransacted(), + params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); + + controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + dest = createDestination(); + controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); + myControlQueue = session.createQueue(myControlQueueAddr); + msgType = MessageType.getType(params.getMessageType()); + System.out.println("Using " + msgType + " messages"); + + sendToController = controllerSession.createProducer(controllerQueue); + receiveFromController = controllerSession.createConsumer(myControlQueue); + } + + private Destination createDestination() throws Exception + { + if (params.isUseUniqueDests()) + { + System.out.println("Prefix : " + prefix); + Address addr = Address.parse(params.getAddress()); + AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); + int type = ((AMQSession_0_10)session).resolveAddressType(temp); + + if ( type == AMQDestination.TOPIC_TYPE) + { + addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); + System.out.println("Setting subject : " + addr); + } + else + { + addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); + System.out.println("Setting name : " + addr); + } + + return new AMQAnyDestination(addr); + } + else + { + return new AMQAnyDestination(params.getAddress()); + } + } + + public synchronized void sendMessageToController(MapMessage m) throws Exception + { + m.setString(ID, id); + m.setString(REPLY_ADDR,myControlQueueAddr); + sendToController.send(m); + } + + public void receiveFromController(OPCode expected) throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + if (expected != code) + { + throw new Exception("Expected OPCode : " + expected + " but received : " + code); + } + + } + + public boolean continueTest() throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + return (code == OPCode.CONTINUE_TEST); + } + + public void tearDown() throws Exception + { + session.close(); + controllerSession.close(); + con.close(); + } + + public void handleError(Exception e,String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + } +} + diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java new file mode 100644 index 0000000000..b63892bb51 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java @@ -0,0 +1,325 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.thread.Threading; + +/** + * PerfConsumer will receive x no of messages in warmup mode. + * Once it receives the Start message it will then signal the PerfProducer. + * It will start recording stats from the first message it receives after + * the warmup mode is done. + * + * The following calculations are done. + * The important numbers to look at is + * a) Avg Latency + * b) System throughput. + * + * Latency. + * ========= + * Currently this test is written with the assumption that either + * a) The Perf Producer and Consumer are on the same machine + * b) They are on separate machines that have their time synced via a Time Server + * + * In order to calculate latency the producer inserts a timestamp + * when the message is sent. The consumer will note the current time the message is + * received and will calculate the latency as follows + * latency = rcvdTime - msg.getJMSTimestamp() + * + * Through out the test it will keep track of the max and min latency to show the + * variance in latencies. + * + * Avg latency is measured by adding all latencies and dividing by the total msgs. + * + * Throughput + * =========== + * Consumer rate is calculated as + * rcvdMsgCount/(rcvdTime - startTime) + * + * Note that the testStartTime referes to when the producer sent the first message + * and startTime is when the consumer first received a message. + * + * rcvdTime keeps track of when the last message is received. + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + */ + +public class PerfConsumer extends PerfBase implements MessageListener +{ + MessageConsumer consumer; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + long startTime = 0; // to measure consumer throughput + long rcvdTime = 0; + boolean transacted = false; + int transSize = 0; + + boolean printStdDev = false; + List sample; + + final Object lock = new Object(); + + public PerfConsumer(String prefix) + { + super(prefix); + System.out.println("Consumer ID : " + id); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); + + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + printStdDev = params.isPrintStdDev(); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); + sendMessageToController(m); + } + + public void warmup()throws Exception + { + receiveFromController(OPCode.CONSUMER_STARTWARMUP); + Message msg = consumer.receive(); + // This is to ensure we drain the queue before we start the actual test. + while ( msg != null) + { + if (msg.getBooleanProperty("End") == true) + { + // It's more realistic for the consumer to signal this. + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); + sendMessageToController(m); + } + msg = consumer.receive(1000); + } + + if (params.isTransacted()) + { + session.commit(); + } + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); + sendMessageToController(m); + consumer.setMessageListener(this); + } + + public void startTest() throws Exception + { + System.out.println("Consumer: " + id + " Starting test......" + "\n"); + resetCounters(); + } + + public void resetCounters() + { + rcvdMsgCount = 0; + maxLatency = 0; + minLatency = Long.MAX_VALUE; + totalLatency = 0; + if (printStdDev) + { + sample = null; + sample = new ArrayList(params.getMsgCount()); + } + } + + public void sendResults() throws Exception + { + receiveFromController(OPCode.CONSUMER_STOP); + + double avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); + double stdDev = 0.0; + if (printStdDev) + { + stdDev = calculateStdDev(avgLatency); + } + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); + m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); + m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); + m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); + m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); + m.setDouble(CONS_RATE, consRate); + m.setLong(MSG_COUNT, rcvdMsgCount); + sendMessageToController(m); + + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Consumer rate : "). + append(df.format(consRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency/Clock.convertToMiliSecs())). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(df.format(minLatency/Clock.convertToMiliSecs())). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(df.format(maxLatency/Clock.convertToMiliSecs())). + append(" ms").toString()); + if (printStdDev) + { + System.out.println(new StringBuilder("Std Dev : "). + append(stdDev/Clock.convertToMiliSecs()).toString()); + } + } + + public double calculateStdDev(double mean) + { + double v = 0; + for (double latency: sample) + { + v = v + Math.pow((latency-mean), 2); + } + v = v/sample.size(); + return Math.round(Math.sqrt(v)); + } + + public void onMessage(Message msg) + { + try + { + // To figure out the decoding overhead of text + if (msgType == MessageType.TEXT) + { + ((TextMessage)msg).getText(); + } + + if (msg.getBooleanProperty("End")) + { + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); + sendMessageToController(m); + } + else + { + rcvdTime = Clock.getTime(); + rcvdMsgCount ++; + + if (rcvdMsgCount == 1) + { + startTime = rcvdTime; + } + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + totalLatency = totalLatency + latency; + if (printStdDev) + { + sample.add(latency); + } + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + public void run() + { + try + { + setUp(); + warmup(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Consumer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public static void main(String[] args) throws InterruptedException + { + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) + { + + final PerfConsumer cons = new PerfConsumer(scriptId + i); + Runnable r = new Runnable() + { + public void run() + { + cons.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + + } + testCompleted.await(); + System.out.println("Consumers have completed the test......\n"); + } +} \ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java new file mode 100644 index 0000000000..ac6129ab68 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java @@ -0,0 +1,358 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.thread.Threading; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs + * I have done so far, it seems quite useful to compute the producer rate as it gives an + * indication of how the system behaves. For ex if there is a gap between producer and consumer rates + * you could clearly see the higher latencies and when producer and consumer rates are very close, + * latency is good. + * + */ +public class PerfProducer extends PerfBase +{ + private static long SEC = 60000; + + MessageProducer producer; + Message msg; + Object payload; + List payloads; + boolean cacheMsg = false; + boolean randomMsgSize = false; + boolean durable = false; + Random random; + int msgSizeRange = 1024; + boolean rateLimitProducer = false; + double rateFactor = 0.4; + double rate = 0.0; + + public PerfProducer(String prefix) + { + super(prefix); + System.out.println("Producer ID : " + id); + } + + public void setUp() throws Exception + { + super.setUp(); + durable = params.isDurable(); + rateLimitProducer = params.getRate() > 0 ? true : false; + if (rateLimitProducer) + { + System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); + } + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + cacheMsg = true; + msg = createMessage(createPayload(params.getMsgSize())); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (params.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = params.getMsgSize(); + payloads = new ArrayList(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(createPayload(i)); + } + } + else + { + payload = createPayload(params.getMsgSize()); + } + + producer = session.createProducer(dest); + System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); + sendMessageToController(m); + } + + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } + + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + Message m; + + if (!randomMsgSize) + { + m = createMessage(payload); + } + else + { + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + } + m.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return m; + } + } + + public void warmup()throws Exception + { + receiveFromController(OPCode.PRODUCER_STARTWARMUP); + System.out.println("Producer: " + id + " Warming up......"); + + for (int i=0; i < params.getWarmupCount() -1; i++) + { + producer.send(getNextMessage()); + } + sendEndMessage(); + + if (params.isTransacted()) + { + session.commit(); + } + } + + public void startTest() throws Exception + { + resetCounters(); + receiveFromController(OPCode.PRODUCER_START); + int count = params.getMsgCount(); + boolean transacted = params.isTransacted(); + int tranSize = params.getTransactionSize(); + + long limit = (long)(params.getRate() * rateFactor); // in msecs + long timeLimit = (long)(SEC * rateFactor); // in msecs + + long start = Clock.getTime(); // defaults to nano secs + long interval = start; + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setLongProperty(TIMESTAMP, Clock.getTime()); + producer.send(msg); + if ( transacted && ((i+1) % tranSize == 0)) + { + session.commit(); + } + + if (rateLimitProducer && i%limit == 0) + { + long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs + if (elapsed < timeLimit) + { + Thread.sleep(elapsed); + } + interval = Clock.getTime(); + + } + } + sendEndMessage(); + if ( transacted) + { + session.commit(); + } + long time = Clock.getTime() - start; + rate = (double)count*Clock.convertToSecs()/(double)time; + System.out.println(new StringBuilder("Producer rate: "). + append(df.format(rate)). + append(" msg/sec"). + toString()); + } + + public void resetCounters() + { + + } + + public void sendEndMessage() throws Exception + { + Message msg = session.createMessage(); + msg.setBooleanProperty("End", true); + producer.send(msg); + } + + public void sendResults() throws Exception + { + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); + msg.setDouble(PROD_RATE, rate); + sendMessageToController(msg); + } + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public void run() + { + try + { + setUp(); + warmup(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Producer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public void startControllerIfNeeded() + { + if (!params.isExternalController()) + { + final PerfTestController controller = new PerfTestController(); + Runnable r = new Runnable() + { + public void run() + { + controller.run(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating controller thread",e); + } + t.start(); + } + } + + + public static void main(String[] args) throws InterruptedException + { + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) + { + final PerfProducer prod = new PerfProducer(scriptId + i); + prod.startControllerIfNeeded(); + Runnable r = new Runnable() + { + public void run() + { + prod.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); + } + testCompleted.await(); + System.out.println("Producers have completed the test......"); + } +} \ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java new file mode 100644 index 0000000000..5fca1fa4bd --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java @@ -0,0 +1,442 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.io.FileWriter; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +import org.apache.qpid.client.message.AMQPEncodedMapMessage; + +/** + * The Controller coordinates a test run between a number + * of producers and consumers, configured via -Dprod_count and -Dcons_count. + * + * It waits till all the producers and consumers have registered and then + * conducts a warmup run. Once all consumers and producers have completed + * the warmup run and is ready, it will conduct the actual test run and + * collect all stats from the participants and calculates the system + * throughput, the avg/min/max for producer rates, consumer rates and latency. + * + * These stats are then printed to std out. + * The Controller also prints events to std out to give a running account + * of the test run in progress. Ex registering of participants, starting warmup ..etc. + * This allows a scripting tool to monitor the progress. + * + * The Controller can be run in two modes. + * 1. A single test run (default) where it just runs until the message count specified + * for the producers via -Dmsg_count is sent and received. + * + * 2. Time based, configured via -Dduration=x, where x is in mins. + * In this mode, the Controller repeatedly cycles through the tests (after an initial + * warmup run) until the desired time is reached. If a test run is in progress + * and the time is up, it will allow the run the complete. + * + * After each iteration, the stats will be printed out in csv format to a separate log file. + * System throughput is calculated as follows + * totalMsgCount/(totalTestTime) + */ +public class PerfTestController extends PerfBase implements MessageListener +{ + enum TestMode { SINGLE_RUN, TIME_BASED }; + + TestMode testMode = TestMode.SINGLE_RUN; + + long totalTestTime; + + private double avgSystemLatency = 0.0; + private double minSystemLatency = Double.MAX_VALUE; + private double maxSystemLatency = 0; + private double avgSystemLatencyStdDev = 0.0; + + private double avgSystemConsRate = 0.0; + private double maxSystemConsRate = 0.0; + private double minSystemConsRate = Double.MAX_VALUE; + + private double avgSystemProdRate = 0.0; + private double maxSystemProdRate = 0.0; + private double minSystemProdRate = Double.MAX_VALUE; + + private long totalMsgCount = 0; + private double totalSystemThroughput = 0.0; + + private int consumerCount = Integer.getInteger("cons_count", 1); + private int producerCount = Integer.getInteger("prod_count", 1); + private int duration = Integer.getInteger("duration", -1); // in mins + private Map consumers; + private Map producers; + + private CountDownLatch consRegistered; + private CountDownLatch prodRegistered; + private CountDownLatch consReady; + private CountDownLatch prodReady; + private CountDownLatch receivedEndMsg; + private CountDownLatch receivedConsStats; + private CountDownLatch receivedProdStats; + + private MessageConsumer consumer; + private boolean printStdDev = false; + FileWriter writer; + + public PerfTestController() + { + super(""); + consumers = new ConcurrentHashMap(consumerCount); + producers = new ConcurrentHashMap(producerCount); + + consRegistered = new CountDownLatch(consumerCount); + prodRegistered = new CountDownLatch(producerCount); + consReady = new CountDownLatch(consumerCount); + prodReady = new CountDownLatch(producerCount); + printStdDev = params.isPrintStdDev(); + testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; + } + + public void setUp() throws Exception + { + super.setUp(); + if (testMode == TestMode.TIME_BASED) + { + writer = new FileWriter("stats-csv.log"); + } + consumer = controllerSession.createConsumer(controllerQueue); + System.out.println("\nController: " + producerCount + " producers are expected"); + System.out.println("Controller: " + consumerCount + " consumers are expected \n"); + consumer.setMessageListener(this); + consRegistered.await(); + prodRegistered.await(); + System.out.println("\nController: All producers and consumers have registered......\n"); + } + + public void warmup() throws Exception + { + System.out.println("Controller initiating warm up sequence......"); + sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); + sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); + prodReady.await(); + consReady.await(); + System.out.println("\nController : All producers and consumers are ready to start the test......\n"); + } + + public void startTest() throws Exception + { + resetCounters(); + System.out.println("\nController Starting test......"); + long start = Clock.getTime(); + sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); + receivedEndMsg.await(); + totalTestTime = Clock.getTime() - start; + sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); + receivedProdStats.await(); + receivedConsStats.await(); + } + + public void resetCounters() + { + minSystemLatency = Double.MAX_VALUE; + maxSystemLatency = 0; + maxSystemConsRate = 0.0; + minSystemConsRate = Double.MAX_VALUE; + maxSystemProdRate = 0.0; + minSystemProdRate = Double.MAX_VALUE; + + totalMsgCount = 0; + + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + } + + public void calcStats() throws Exception + { + double totLatency = 0.0; + double totStdDev = 0.0; + double totalConsRate = 0.0; + double totalProdRate = 0.0; + + MapMessage conStat = null; // for error handling + try + { + for (MapMessage m: consumers.values()) + { + conStat = m; + minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); + maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); + totLatency = totLatency + m.getDouble(AVG_LATENCY); + totStdDev = totStdDev + m.getDouble(STD_DEV); + + minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); + maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); + totalConsRate = totalConsRate + m.getDouble(CONS_RATE); + + totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Consumer : " + conStat); + } + + + MapMessage prodStat = null; // for error handling + try + { + for (MapMessage m: producers.values()) + { + prodStat = m; + minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); + maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); + totalProdRate = totalProdRate + m.getDouble(PROD_RATE); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Producer : " + conStat); + } + + avgSystemLatency = totLatency/consumers.size(); + avgSystemLatencyStdDev = totStdDev/consumers.size(); + avgSystemConsRate = totalConsRate/consumers.size(); + avgSystemProdRate = totalProdRate/producers.size(); + + System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + + totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); + } + + public void printResults() throws Exception + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(totalSystemThroughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Consumer rate : "). + append(df.format(avgSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Consumer rate : "). + append(df.format(minSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Consumer rate : "). + append(df.format(maxSystemConsRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg Producer rate : "). + append(df.format(avgSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Producer rate : "). + append(df.format(minSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Producer rate : "). + append(df.format(maxSystemProdRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg System Latency : "). + append(df.format(avgSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min System Latency : "). + append(df.format(minSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Max System Latency : "). + append(df.format(maxSystemLatency)). + append(" ms").toString()); + if (printStdDev) + { + System.out.println(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev)); + } + } + + private synchronized void sendMessageToNodes(OPCode code,Collection nodes) throws Exception + { + System.out.println("\nController: Sending code " + code); + MessageProducer tmpProd = controllerSession.createProducer(null); + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, code.ordinal()); + for (MapMessage node : nodes) + { + if (node.getString(REPLY_ADDR) == null) + { + System.out.println("REPLY_ADDR is null " + node); + } + else + { + System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + } + tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); + } + } + + public void onMessage(Message msg) + { + try + { + MapMessage m = (MapMessage)msg; + OPCode code = OPCode.values()[m.getInt(CODE)]; + + System.out.println("\n---------Controller Received Code : " + code); + System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + + switch (code) + { + case REGISTER_CONSUMER : + if (consRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); + break; + } + consumers.put(m.getString(ID),m); + consRegistered.countDown(); + break; + + case REGISTER_PRODUCER : + if (prodRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); + break; + } + producers.put(m.getString(ID),m); + prodRegistered.countDown(); + break; + + case CONSUMER_READY : + consReady.countDown(); + break; + + case PRODUCER_READY : + prodReady.countDown(); + break; + + case RECEIVED_END_MSG : + receivedEndMsg.countDown(); + break; + + case RECEIVED_CONSUMER_STATS : + consumers.put(m.getString(ID),m); + receivedConsStats.countDown(); + break; + + case RECEIVED_PRODUCER_STATS : + producers.put(m.getString(ID),m); + receivedProdStats.countDown(); + break; + + default: + throw new Exception("Invalid OPCode " + code); + } + } + catch (Exception e) + { + handleError(e,"Error when receiving messages " + msg); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + if (testMode == TestMode.SINGLE_RUN) + { + startTest(); + calcStats(); + printResults(); + } + else + { + long startTime = Clock.getTime(); + long timeLimit = duration * 60 * 1000; // duration is in mins. + boolean nextIteration = true; + while (nextIteration) + { + startTest(); + calcStats(); + writeStatsToFile(); + if (Clock.getTime() - startTime < timeLimit) + { + sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); + sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); + nextIteration = true; + } + else + { + nextIteration = false; + } + } + } + tearDown(); + + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception { + System.out.println("Controller: Completed the test......\n"); + if (testMode == TestMode.TIME_BASED) + { + writer.close(); + } + sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); + sendMessageToNodes(OPCode.STOP_TEST,producers.values()); + super.tearDown(); + } + + public void writeStatsToFile() throws Exception + { + writer.append(String.valueOf(totalMsgCount)).append(","); + writer.append(df.format(totalSystemThroughput)).append(","); + writer.append(df.format(avgSystemConsRate)).append(","); + writer.append(df.format(minSystemConsRate)).append(","); + writer.append(df.format(maxSystemConsRate)).append(","); + writer.append(df.format(avgSystemProdRate)).append(","); + writer.append(df.format(minSystemProdRate)).append(","); + writer.append(df.format(maxSystemProdRate)).append(","); + writer.append(df.format(avgSystemLatency)).append(","); + writer.append(df.format(minSystemLatency)).append(","); + writer.append(df.format(maxSystemLatency)); + if (printStdDev) + { + writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); + } + writer.append("\n"); + writer.flush(); + } + + public static void main(String[] args) + { + PerfTestController controller = new PerfTestController(); + controller.run(); + } +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java deleted file mode 100644 index 121e94cea1..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.net.InetAddress; -import java.text.DecimalFormat; -import java.util.UUID; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; - -public class PerfBase -{ - public final static String CODE = "CODE"; - public final static String ID = "ID"; - public final static String REPLY_ADDR = "REPLY_ADDR"; - public final static String MAX_LATENCY = "MAX_LATENCY"; - public final static String MIN_LATENCY = "MIN_LATENCY"; - public final static String AVG_LATENCY = "AVG_LATENCY"; - public final static String STD_DEV = "STD_DEV"; - public final static String CONS_RATE = "CONS_RATE"; - public final static String PROD_RATE = "PROD_RATE"; - public final static String MSG_COUNT = "MSG_COUNT"; - public final static String TIMESTAMP = "Timestamp"; - - String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); - - TestParams params; - Connection con; - Session session; - Session controllerSession; - Destination dest; - Destination myControlQueue; - Destination controllerQueue; - DecimalFormat df = new DecimalFormat("###.##"); - String id; - String myControlQueueAddr; - - MessageProducer sendToController; - MessageConsumer receiveFromController; - String prefix = ""; - - enum OPCode { - REGISTER_CONSUMER, REGISTER_PRODUCER, - PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, - CONSUMER_READY, PRODUCER_READY, - PRODUCER_START, - RECEIVED_END_MSG, CONSUMER_STOP, - RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, - CONTINUE_TEST, STOP_TEST - }; - - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - - MessageType msgType = MessageType.BYTES; - - public PerfBase(String prefix) - { - params = new TestParams(); - String host = ""; - try - { - host = InetAddress.getLocalHost().getHostName(); - } - catch (Exception e) - { - } - id = host + "-" + UUID.randomUUID().toString(); - this.prefix = prefix; - this.myControlQueueAddr = id + ";{create: always}"; - } - - public void setUp() throws Exception - { - if (params.getHost().equals("") || params.getPort() == -1) - { - con = new AMQConnection(params.getUrl()); - } - else - { - con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); - } - con.start(); - session = con.createSession(params.isTransacted(), - params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - - controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - dest = createDestination(); - controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); - myControlQueue = session.createQueue(myControlQueueAddr); - msgType = MessageType.getType(params.getMessageType()); - System.out.println("Using " + msgType + " messages"); - - sendToController = controllerSession.createProducer(controllerQueue); - receiveFromController = controllerSession.createConsumer(myControlQueue); - } - - private Destination createDestination() throws Exception - { - if (params.isUseUniqueDests()) - { - System.out.println("Prefix : " + prefix); - Address addr = Address.parse(params.getAddress()); - AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); - int type = ((AMQSession_0_10)session).resolveAddressType(temp); - - if ( type == AMQDestination.TOPIC_TYPE) - { - addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); - System.out.println("Setting subject : " + addr); - } - else - { - addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); - System.out.println("Setting name : " + addr); - } - - return new AMQAnyDestination(addr); - } - else - { - return new AMQAnyDestination(params.getAddress()); - } - } - - public synchronized void sendMessageToController(MapMessage m) throws Exception - { - m.setString(ID, id); - m.setString(REPLY_ADDR,myControlQueueAddr); - sendToController.send(m); - } - - public void receiveFromController(OPCode expected) throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - if (expected != code) - { - throw new Exception("Expected OPCode : " + expected + " but received : " + code); - } - - } - - public boolean continueTest() throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - return (code == OPCode.CONTINUE_TEST); - } - - public void tearDown() throws Exception - { - session.close(); - controllerSession.close(); - con.close(); - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} - diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java deleted file mode 100644 index b63892bb51..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.thread.Threading; - -/** - * PerfConsumer will receive x no of messages in warmup mode. - * Once it receives the Start message it will then signal the PerfProducer. - * It will start recording stats from the first message it receives after - * the warmup mode is done. - * - * The following calculations are done. - * The important numbers to look at is - * a) Avg Latency - * b) System throughput. - * - * Latency. - * ========= - * Currently this test is written with the assumption that either - * a) The Perf Producer and Consumer are on the same machine - * b) They are on separate machines that have their time synced via a Time Server - * - * In order to calculate latency the producer inserts a timestamp - * when the message is sent. The consumer will note the current time the message is - * received and will calculate the latency as follows - * latency = rcvdTime - msg.getJMSTimestamp() - * - * Through out the test it will keep track of the max and min latency to show the - * variance in latencies. - * - * Avg latency is measured by adding all latencies and dividing by the total msgs. - * - * Throughput - * =========== - * Consumer rate is calculated as - * rcvdMsgCount/(rcvdTime - startTime) - * - * Note that the testStartTime referes to when the producer sent the first message - * and startTime is when the consumer first received a message. - * - * rcvdTime keeps track of when the last message is received. - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - */ - -public class PerfConsumer extends PerfBase implements MessageListener -{ - MessageConsumer consumer; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - long startTime = 0; // to measure consumer throughput - long rcvdTime = 0; - boolean transacted = false; - int transSize = 0; - - boolean printStdDev = false; - List sample; - - final Object lock = new Object(); - - public PerfConsumer(String prefix) - { - super(prefix); - System.out.println("Consumer ID : " + id); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); - - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - printStdDev = params.isPrintStdDev(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - sendMessageToController(m); - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.CONSUMER_STARTWARMUP); - Message msg = consumer.receive(); - // This is to ensure we drain the queue before we start the actual test. - while ( msg != null) - { - if (msg.getBooleanProperty("End") == true) - { - // It's more realistic for the consumer to signal this. - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m); - } - msg = consumer.receive(1000); - } - - if (params.isTransacted()) - { - session.commit(); - } - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m); - consumer.setMessageListener(this); - } - - public void startTest() throws Exception - { - System.out.println("Consumer: " + id + " Starting test......" + "\n"); - resetCounters(); - } - - public void resetCounters() - { - rcvdMsgCount = 0; - maxLatency = 0; - minLatency = Long.MAX_VALUE; - totalLatency = 0; - if (printStdDev) - { - sample = null; - sample = new ArrayList(params.getMsgCount()); - } - } - - public void sendResults() throws Exception - { - receiveFromController(OPCode.CONSUMER_STOP); - - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); - double stdDev = 0.0; - if (printStdDev) - { - stdDev = calculateStdDev(avgLatency); - } - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); - m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); - m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); - m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); - m.setDouble(CONS_RATE, consRate); - m.setLong(MSG_COUNT, rcvdMsgCount); - sendMessageToController(m); - - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Consumer rate : "). - append(df.format(consRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(df.format(minLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(df.format(maxLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Std Dev : "). - append(stdDev/Clock.convertToMiliSecs()).toString()); - } - } - - public double calculateStdDev(double mean) - { - double v = 0; - for (double latency: sample) - { - v = v + Math.pow((latency-mean), 2); - } - v = v/sample.size(); - return Math.round(Math.sqrt(v)); - } - - public void onMessage(Message msg) - { - try - { - // To figure out the decoding overhead of text - if (msgType == MessageType.TEXT) - { - ((TextMessage)msg).getText(); - } - - if (msg.getBooleanProperty("End")) - { - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); - } - else - { - rcvdTime = Clock.getTime(); - rcvdMsgCount ++; - - if (rcvdMsgCount == 1) - { - startTime = rcvdTime; - } - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - totalLatency = totalLatency + latency; - if (printStdDev) - { - sample.add(latency); - } - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - - final PerfConsumer cons = new PerfConsumer(scriptId + i); - Runnable r = new Runnable() - { - public void run() - { - cons.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - - } - testCompleted.await(); - System.out.println("Consumers have completed the test......\n"); - } -} \ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java deleted file mode 100644 index ac6129ab68..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.thread.Threading; - -/** - * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y no of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * This is done with the assumption that both consumer and producer are running on - * the same machine or different machines which have time synced using a time server. - * - * This test also calculates the producer rate as follows. - * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - * Rajith - Producer rate is not an accurate perf metric IMO. - * It is heavily inlfuenced by any in memory buffering. - * System throughput and latencies calculated by the PerfConsumer are more realistic - * numbers. - * - * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs - * I have done so far, it seems quite useful to compute the producer rate as it gives an - * indication of how the system behaves. For ex if there is a gap between producer and consumer rates - * you could clearly see the higher latencies and when producer and consumer rates are very close, - * latency is good. - * - */ -public class PerfProducer extends PerfBase -{ - private static long SEC = 60000; - - MessageProducer producer; - Message msg; - Object payload; - List payloads; - boolean cacheMsg = false; - boolean randomMsgSize = false; - boolean durable = false; - Random random; - int msgSizeRange = 1024; - boolean rateLimitProducer = false; - double rateFactor = 0.4; - double rate = 0.0; - - public PerfProducer(String prefix) - { - super(prefix); - System.out.println("Producer ID : " + id); - } - - public void setUp() throws Exception - { - super.setUp(); - durable = params.isDurable(); - rateLimitProducer = params.getRate() > 0 ? true : false; - if (rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); - } - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - cacheMsg = true; - msg = createMessage(createPayload(params.getMsgSize())); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (params.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = params.getMsgSize(); - payloads = new ArrayList(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(createPayload(i)); - } - } - else - { - payload = createPayload(params.getMsgSize()); - } - - producer = session.createProducer(dest); - System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - sendMessageToController(m); - } - - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - Message m; - - if (!randomMsgSize) - { - m = createMessage(payload); - } - else - { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); - } - m.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return m; - } - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer: " + id + " Warming up......"); - - for (int i=0; i < params.getWarmupCount() -1; i++) - { - producer.send(getNextMessage()); - } - sendEndMessage(); - - if (params.isTransacted()) - { - session.commit(); - } - } - - public void startTest() throws Exception - { - resetCounters(); - receiveFromController(OPCode.PRODUCER_START); - int count = params.getMsgCount(); - boolean transacted = params.isTransacted(); - int tranSize = params.getTransactionSize(); - - long limit = (long)(params.getRate() * rateFactor); // in msecs - long timeLimit = (long)(SEC * rateFactor); // in msecs - - long start = Clock.getTime(); // defaults to nano secs - long interval = start; - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setLongProperty(TIMESTAMP, Clock.getTime()); - producer.send(msg); - if ( transacted && ((i+1) % tranSize == 0)) - { - session.commit(); - } - - if (rateLimitProducer && i%limit == 0) - { - long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs - if (elapsed < timeLimit) - { - Thread.sleep(elapsed); - } - interval = Clock.getTime(); - - } - } - sendEndMessage(); - if ( transacted) - { - session.commit(); - } - long time = Clock.getTime() - start; - rate = (double)count*Clock.convertToSecs()/(double)time; - System.out.println(new StringBuilder("Producer rate: "). - append(df.format(rate)). - append(" msg/sec"). - toString()); - } - - public void resetCounters() - { - - } - - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - producer.send(msg); - } - - public void sendResults() throws Exception - { - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, rate); - sendMessageToController(msg); - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Producer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - public void startControllerIfNeeded() - { - if (!params.isExternalController()) - { - final PerfTestController controller = new PerfTestController(); - Runnable r = new Runnable() - { - public void run() - { - controller.run(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating controller thread",e); - } - t.start(); - } - } - - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - final PerfProducer prod = new PerfProducer(scriptId + i); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() - { - public void run() - { - prod.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); - } - testCompleted.await(); - System.out.println("Producers have completed the test......"); - } -} \ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java deleted file mode 100644 index 5fca1fa4bd..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.io.FileWriter; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.message.AMQPEncodedMapMessage; - -/** - * The Controller coordinates a test run between a number - * of producers and consumers, configured via -Dprod_count and -Dcons_count. - * - * It waits till all the producers and consumers have registered and then - * conducts a warmup run. Once all consumers and producers have completed - * the warmup run and is ready, it will conduct the actual test run and - * collect all stats from the participants and calculates the system - * throughput, the avg/min/max for producer rates, consumer rates and latency. - * - * These stats are then printed to std out. - * The Controller also prints events to std out to give a running account - * of the test run in progress. Ex registering of participants, starting warmup ..etc. - * This allows a scripting tool to monitor the progress. - * - * The Controller can be run in two modes. - * 1. A single test run (default) where it just runs until the message count specified - * for the producers via -Dmsg_count is sent and received. - * - * 2. Time based, configured via -Dduration=x, where x is in mins. - * In this mode, the Controller repeatedly cycles through the tests (after an initial - * warmup run) until the desired time is reached. If a test run is in progress - * and the time is up, it will allow the run the complete. - * - * After each iteration, the stats will be printed out in csv format to a separate log file. - * System throughput is calculated as follows - * totalMsgCount/(totalTestTime) - */ -public class PerfTestController extends PerfBase implements MessageListener -{ - enum TestMode { SINGLE_RUN, TIME_BASED }; - - TestMode testMode = TestMode.SINGLE_RUN; - - long totalTestTime; - - private double avgSystemLatency = 0.0; - private double minSystemLatency = Double.MAX_VALUE; - private double maxSystemLatency = 0; - private double avgSystemLatencyStdDev = 0.0; - - private double avgSystemConsRate = 0.0; - private double maxSystemConsRate = 0.0; - private double minSystemConsRate = Double.MAX_VALUE; - - private double avgSystemProdRate = 0.0; - private double maxSystemProdRate = 0.0; - private double minSystemProdRate = Double.MAX_VALUE; - - private long totalMsgCount = 0; - private double totalSystemThroughput = 0.0; - - private int consumerCount = Integer.getInteger("cons_count", 1); - private int producerCount = Integer.getInteger("prod_count", 1); - private int duration = Integer.getInteger("duration", -1); // in mins - private Map consumers; - private Map producers; - - private CountDownLatch consRegistered; - private CountDownLatch prodRegistered; - private CountDownLatch consReady; - private CountDownLatch prodReady; - private CountDownLatch receivedEndMsg; - private CountDownLatch receivedConsStats; - private CountDownLatch receivedProdStats; - - private MessageConsumer consumer; - private boolean printStdDev = false; - FileWriter writer; - - public PerfTestController() - { - super(""); - consumers = new ConcurrentHashMap(consumerCount); - producers = new ConcurrentHashMap(producerCount); - - consRegistered = new CountDownLatch(consumerCount); - prodRegistered = new CountDownLatch(producerCount); - consReady = new CountDownLatch(consumerCount); - prodReady = new CountDownLatch(producerCount); - printStdDev = params.isPrintStdDev(); - testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; - } - - public void setUp() throws Exception - { - super.setUp(); - if (testMode == TestMode.TIME_BASED) - { - writer = new FileWriter("stats-csv.log"); - } - consumer = controllerSession.createConsumer(controllerQueue); - System.out.println("\nController: " + producerCount + " producers are expected"); - System.out.println("Controller: " + consumerCount + " consumers are expected \n"); - consumer.setMessageListener(this); - consRegistered.await(); - prodRegistered.await(); - System.out.println("\nController: All producers and consumers have registered......\n"); - } - - public void warmup() throws Exception - { - System.out.println("Controller initiating warm up sequence......"); - sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); - sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); - prodReady.await(); - consReady.await(); - System.out.println("\nController : All producers and consumers are ready to start the test......\n"); - } - - public void startTest() throws Exception - { - resetCounters(); - System.out.println("\nController Starting test......"); - long start = Clock.getTime(); - sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); - receivedEndMsg.await(); - totalTestTime = Clock.getTime() - start; - sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); - receivedProdStats.await(); - receivedConsStats.await(); - } - - public void resetCounters() - { - minSystemLatency = Double.MAX_VALUE; - maxSystemLatency = 0; - maxSystemConsRate = 0.0; - minSystemConsRate = Double.MAX_VALUE; - maxSystemProdRate = 0.0; - minSystemProdRate = Double.MAX_VALUE; - - totalMsgCount = 0; - - receivedConsStats = new CountDownLatch(consumerCount); - receivedProdStats = new CountDownLatch(producerCount); - receivedEndMsg = new CountDownLatch(producerCount); - } - - public void calcStats() throws Exception - { - double totLatency = 0.0; - double totStdDev = 0.0; - double totalConsRate = 0.0; - double totalProdRate = 0.0; - - MapMessage conStat = null; // for error handling - try - { - for (MapMessage m: consumers.values()) - { - conStat = m; - minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); - maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); - totLatency = totLatency + m.getDouble(AVG_LATENCY); - totStdDev = totStdDev + m.getDouble(STD_DEV); - - minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); - maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); - totalConsRate = totalConsRate + m.getDouble(CONS_RATE); - - totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Consumer : " + conStat); - } - - - MapMessage prodStat = null; // for error handling - try - { - for (MapMessage m: producers.values()) - { - prodStat = m; - minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); - maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); - totalProdRate = totalProdRate + m.getDouble(PROD_RATE); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Producer : " + conStat); - } - - avgSystemLatency = totLatency/consumers.size(); - avgSystemLatencyStdDev = totStdDev/consumers.size(); - avgSystemConsRate = totalConsRate/consumers.size(); - avgSystemProdRate = totalProdRate/producers.size(); - - System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); - - totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); - } - - public void printResults() throws Exception - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(totalSystemThroughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Consumer rate : "). - append(df.format(avgSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Consumer rate : "). - append(df.format(minSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Consumer rate : "). - append(df.format(maxSystemConsRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg Producer rate : "). - append(df.format(avgSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Producer rate : "). - append(df.format(minSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Producer rate : "). - append(df.format(maxSystemProdRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg System Latency : "). - append(df.format(avgSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min System Latency : "). - append(df.format(minSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Max System Latency : "). - append(df.format(maxSystemLatency)). - append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Avg System Std Dev : "). - append(avgSystemLatencyStdDev)); - } - } - - private synchronized void sendMessageToNodes(OPCode code,Collection nodes) throws Exception - { - System.out.println("\nController: Sending code " + code); - MessageProducer tmpProd = controllerSession.createProducer(null); - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, code.ordinal()); - for (MapMessage node : nodes) - { - if (node.getString(REPLY_ADDR) == null) - { - System.out.println("REPLY_ADDR is null " + node); - } - else - { - System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); - } - tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); - } - } - - public void onMessage(Message msg) - { - try - { - MapMessage m = (MapMessage)msg; - OPCode code = OPCode.values()[m.getInt(CODE)]; - - System.out.println("\n---------Controller Received Code : " + code); - System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); - - switch (code) - { - case REGISTER_CONSUMER : - if (consRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of consumers have already registered," + - "ignoring extra consumer"); - break; - } - consumers.put(m.getString(ID),m); - consRegistered.countDown(); - break; - - case REGISTER_PRODUCER : - if (prodRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of producers have already registered," + - "ignoring extra producer"); - break; - } - producers.put(m.getString(ID),m); - prodRegistered.countDown(); - break; - - case CONSUMER_READY : - consReady.countDown(); - break; - - case PRODUCER_READY : - prodReady.countDown(); - break; - - case RECEIVED_END_MSG : - receivedEndMsg.countDown(); - break; - - case RECEIVED_CONSUMER_STATS : - consumers.put(m.getString(ID),m); - receivedConsStats.countDown(); - break; - - case RECEIVED_PRODUCER_STATS : - producers.put(m.getString(ID),m); - receivedProdStats.countDown(); - break; - - default: - throw new Exception("Invalid OPCode " + code); - } - } - catch (Exception e) - { - handleError(e,"Error when receiving messages " + msg); - } - } - - public void run() - { - try - { - setUp(); - warmup(); - if (testMode == TestMode.SINGLE_RUN) - { - startTest(); - calcStats(); - printResults(); - } - else - { - long startTime = Clock.getTime(); - long timeLimit = duration * 60 * 1000; // duration is in mins. - boolean nextIteration = true; - while (nextIteration) - { - startTest(); - calcStats(); - writeStatsToFile(); - if (Clock.getTime() - startTime < timeLimit) - { - sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); - sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); - nextIteration = true; - } - else - { - nextIteration = false; - } - } - } - tearDown(); - - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception { - System.out.println("Controller: Completed the test......\n"); - if (testMode == TestMode.TIME_BASED) - { - writer.close(); - } - sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); - sendMessageToNodes(OPCode.STOP_TEST,producers.values()); - super.tearDown(); - } - - public void writeStatsToFile() throws Exception - { - writer.append(String.valueOf(totalMsgCount)).append(","); - writer.append(df.format(totalSystemThroughput)).append(","); - writer.append(df.format(avgSystemConsRate)).append(","); - writer.append(df.format(minSystemConsRate)).append(","); - writer.append(df.format(maxSystemConsRate)).append(","); - writer.append(df.format(avgSystemProdRate)).append(","); - writer.append(df.format(minSystemProdRate)).append(","); - writer.append(df.format(maxSystemProdRate)).append(","); - writer.append(df.format(avgSystemLatency)).append(","); - writer.append(df.format(minSystemLatency)).append(","); - writer.append(df.format(maxSystemLatency)); - if (printStdDev) - { - writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); - } - writer.append("\n"); - writer.flush(); - } - - public static void main(String[] args) - { - PerfTestController controller = new PerfTestController(); - controller.run(); - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java deleted file mode 100644 index d73be0181b..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import javax.jms.Session; - -public class TestParams -{ - /* - * By default the connection URL is used. - * This allows a user to easily specify a fully fledged URL any given property. - * Ex. SSL parameters - * - * By providing a host & port allows a user to simply override the URL. - * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. - */ - private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - - private String host = ""; - - private int port = -1; - - private String address = "queue; {create : always}"; - - private int msg_size = 1024; - - private int random_msg_size_start_from = 1; - - private boolean cacheMessage = false; - - private boolean disableMessageID = false; - - private boolean disableTimestamp = false; - - private boolean durable = false; - - private boolean transacted = false; - - private int transaction_size = 1000; - - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - - private int msg_count = 10; - - private int warmup_count = 1; - - private boolean random_msg_size = false; - - private String msgType = "bytes"; - - private boolean printStdDev = false; - - private long rate = -1; - - private boolean externalController = false; - - private boolean useUniqueDest = false; // useful when using multiple connections. - - public TestParams() - { - - url = System.getProperty("url",url); - host = System.getProperty("host",""); - port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); - - msg_size = Integer.getInteger("msg_size", 1024); - cacheMessage = Boolean.getBoolean("cache_msg"); - disableMessageID = Boolean.getBoolean("disableMessageID"); - disableTimestamp = Boolean.getBoolean("disableTimestamp"); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - transaction_size = Integer.getInteger("trans_size",1000); - ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg_count",msg_count); - warmup_count = Integer.getInteger("warmup_count",warmup_count); - random_msg_size = Boolean.getBoolean("random_msg_size"); - msgType = System.getProperty("msg_type","bytes"); - printStdDev = Boolean.getBoolean("print_std_dev"); - rate = Long.getLong("rate",-1); - externalController = Boolean.getBoolean("ext_controller"); - useUniqueDest = Boolean.getBoolean("use_unique_dest"); - random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); - } - - public String getUrl() - { - return url; - } - - public String getHost() - { - return host; - } - - public int getPort() - { - return port; - } - - public String getAddress() - { - return address; - } - - public int getAckMode() - { - return ack_mode; - } - - public int getMsgCount() - { - return msg_count; - } - - public int getMsgSize() - { - return msg_size; - } - - public int getRandomMsgSizeStartFrom() - { - return random_msg_size_start_from; - } - - public boolean isDurable() - { - return durable; - } - - public boolean isTransacted() - { - return transacted; - } - - public int getTransactionSize() - { - return transaction_size; - } - - public int getWarmupCount() - { - return warmup_count; - } - - public boolean isCacheMessage() - { - return cacheMessage; - } - - public boolean isDisableMessageID() - { - return disableMessageID; - } - - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - public boolean isRandomMsgSize() - { - return random_msg_size; - } - - public String getMessageType() - { - return msgType; - } - - public boolean isPrintStdDev() - { - return printStdDev; - } - - public long getRate() - { - return rate; - } - - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - public boolean isUseUniqueDests() - { - return useUniqueDest; - } -} -- cgit v1.2.1