diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-11-11 00:13:13 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-11-11 00:13:13 +0000 |
commit | 1f74efa4eefa85382ce9f8d55d50f81c274abb05 (patch) | |
tree | 363a8e654004bbb67bea829771ebe1f5b7c6335a | |
parent | 0caf9647078392d464386d030c6a014b642ef35f (diff) | |
download | qpid-python-1f74efa4eefa85382ce9f8d55d50f81c274abb05.tar.gz |
Removed the following files as they will be replaced by a generic Sender and Receiver.
The bin files and the files under o/a/qpid/teskit/perf are moved under tools.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@834721 13f79535-47bb-0310-9956-ffa450edef68
15 files changed, 51 insertions, 2211 deletions
diff --git a/java/testkit/bin/perf_report.sh b/java/testkit/bin/perf_report.sh deleted file mode 100644 index 22c839e08c..0000000000 --- a/java/testkit/bin/perf_report.sh +++ /dev/null @@ -1,131 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This will run the 8 use cases defined below and produce -# a report in tabular format. Refer to the documentation -# for more details. - -SUB_MEM=-Xmx1024M -PUB_MEM=-Xmx1024M -LOG_CONFIG="-Damqj.logging.level=WARN" - -. setenv.sh - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep Perf | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -# $1 test name -# $2 consumer options -# $3 producer options -run_testcase() -{ - sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out & - waitfor sub.out "Warming up" - sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out & - waitfor sub.out "Completed the test" - waitfor pub.out "Consumer has completed the test" - sleep 2 #give a grace period to shutdown - print_result $1 -} - -print_result() -{ - prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'` - sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` - avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'` - min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'` - max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency - echo "------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT - -echo "Test report on " `date +%F` -echo "================================================================================================" -echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" -echo "------------------------------------------------------------------------------------------------" - -# Test 1 Trans Queue -run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10" - -# Test 2 Dura Queue -run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 3 Dura Queue Sync -run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true" - -# Test 4 Dura Queue Sync Publish and Ack -run_testcase "Dura_SyncPubAck" "-Ddurable=true -Dsync_ack=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 5 Topic -run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10" - -# Test 6 Durable Topic -run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10" - -# Test 7 Fanout -run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10" - -# Test 8 Small TX -run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \ - "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" - -# Test 9 Large TX -run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size=10" \ - "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" - -# Test 10 256 MSG -run_testcase "Msg_256b" "" "-Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 11 512 MSG -run_testcase "Msg_512b" "" "-Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 12 2048 MSG -run_testcase "Msg_2048b" "" "-Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 13 Random size MSG -run_testcase "Random_Msg_Size" "" "-Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 14 Random size MSG Durable -run_testcase "Rand_Msg_Dura" "-Ddurable=true" "-Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 15 64K MSG -run_testcase "Msg_64K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 16 Durable 64K MSG -run_testcase "Msg_Durable_64K" "-Ddurable=true -Damqj.tcpNoDelay=true" \ - "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 17 500K MSG -run_testcase "Msg_500K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 18 Durable 500K MSG -run_testcase "Msg_Dura_500K" "-Damqj.tcpNoDelay=true -Ddurable=true" \ - "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/java/testkit/bin/run_pub.sh b/java/testkit/bin/run_pub.sh deleted file mode 100644 index 0702a55de9..0000000000 --- a/java/testkit/bin/run_pub.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -. $QPID_TEST_HOME/bin/setenv.sh - -echo "$@" -$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfProducer diff --git a/java/testkit/bin/run_sub.sh b/java/testkit/bin/run_sub.sh deleted file mode 100644 index f7e687de38..0000000000 --- a/java/testkit/bin/run_sub.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -. $QPID_TEST_HOME/bin/setenv.sh - -echo "$@" -$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfConsumer - diff --git a/java/testkit/bin/setenv.sh b/java/testkit/bin/setenv.sh deleted file mode 100644 index 24135e711b..0000000000 --- a/java/testkit/bin/setenv.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Compiles the test classes and sets the CLASSPATH - -# check for QPID_TEST_HOME -if [ "$QPID_TEST_HOME" = "" ] ; then - echo "ERROR: Please set QPID_TEST_HOME ...." - exit 1 -fi - -# check for JAVA_HOME -if [ "$JAVA_HOME" = "" ] ; then - echo "ERROR: Please set JAVA_HOME ...." - exit 1 -fi - -# VENDOR_LIB path needs to be set -# for Qpid set this to {qpid_checkout}/java/build/lib -if [ "$VENDOR_LIB" = "" ] ; then - echo "ERROR: Please set VENDOR_LIB path in the script ...." - exit 1 -fi - - -[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes - -CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"` -$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'` - -export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH - diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java deleted file mode 100644 index 4a4f3d124b..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import java.io.FileOutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; - -/** - * Latency test sends an x number of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y number of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * It is important to have a sufficiently large number for the warmup count to - * ensure the system is in steady state before the test is started. - * - * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) - * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 - * - * The idea is to get a latency sample for the system once it achieves steady state. - * - */ - -public class LatencyTest extends PerfBase implements MessageListener -{ - MessageProducer producer; - MessageConsumer consumer; - Message msg; - byte[] payload; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - double stdDev = 0; - double avgLatency = 0; - boolean warmup_mode = true; - boolean transacted = false; - int transSize = 0; - - final List<Long> latencies; - final Lock lock = new ReentrantLock(); - final Condition warmedUp; - final Condition testCompleted; - - public LatencyTest() - { - super(); - warmedUp = lock.newCondition(); - testCompleted = lock.newCondition(); - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - latencies = new ArrayList <Long>(params.getMsgCount()); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(params.isDurable()? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (params.isCacheMessage()) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - ((BytesMessage)msg).writeBytes(payload); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - int count = params.getWarmupCount(); - for (int i=0; i < count; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - try - { - lock.lock(); - warmedUp.await(); - } - finally - { - lock.unlock(); - } - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - if (warmup_mode) - { - warmup_mode = false; - try - { - lock.lock(); - warmedUp.signal(); - } - finally - { - lock.unlock(); - } - } - else - { - computeStats(); - } - } - else if (!warmup_mode) - { - long time = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = time - msg.getJMSTimestamp(); - latencies.add(latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - private void computeStats() - { - avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double sigma = 0; - - for (long latency: latencies) - { - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - sigma = sigma + Math.pow(latency - avgLatency,2); - } - - stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); - - try - { - lock.lock(); - testCompleted.signal(); - } - finally - { - lock.unlock(); - } - } - - public void writeToFile() throws Exception - { - String fileName = System.getProperty("file"); - PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); - for (long latency: latencies) - { - writer.println(String.valueOf(latency)); - } - writer.flush(); - writer.close(); - } - - public void printToConsole() - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Standard Deviation : "). - append(df.format(stdDev)). - append(" ms").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % transSize == 0)) - { - session.commit(); - } - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - if (params.isTransacted()) - { - session.commit(); - } - } - - public void tearDown() throws Exception - { - try - { - lock.lock(); - testCompleted.await(); - } - finally - { - lock.unlock(); - } - - producer.close(); - consumer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final LatencyTest latencyTest = new LatencyTest(); - Runnable r = new Runnable() - { - public void run() - { - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) - { - try - { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating latency test thread",e); - } - t.start(); - } -}
\ No newline at end of file diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java deleted file mode 100644 index 95670f0507..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import java.text.DecimalFormat; -import java.util.Hashtable; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; - -public class PerfBase -{ - TestParams params; - Connection con; - Session session; - Destination dest; - Destination feedbackDest; - DecimalFormat df = new DecimalFormat("###.##"); - - public PerfBase() - { - params = new TestParams(); - } - - public void setUp() throws Exception - { - Hashtable<String,String> env = new Hashtable<String,String>(); - env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory()); - env.put(Context.PROVIDER_URL, params.getProviderURL()); - - Context ctx = null; - try - { - ctx = new InitialContext(env); - } - catch(Exception e) - { - throw new Exception("Error initializing JNDI",e); - - } - - ConnectionFactory conFac = null; - try - { - conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory()); - } - catch(Exception e) - { - throw new Exception("Error looking up connection factory",e); - } - - con = conFac.createConnection(); - con.start(); - session = con.createSession(params.isTransacted(), - params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - - try - { - dest = (Destination)ctx.lookup( params.isDurable()? - params.getDurableDestination(): - params.getTransientDestination() - ); - } - catch(Exception e) - { - throw new Exception("Error looking up destination",e); - } - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} - diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java deleted file mode 100644 index 9781a7e839..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * PerfConsumer will receive x no of messages in warmup mode. - * Once it receives the Start message it will then signal the PerfProducer. - * It will start recording stats from the first message it receives after - * the warmup mode is done. - * - * The following calculations are done. - * The important numbers to look at is - * a) Avg Latency - * b) System throughput. - * - * Latency. - * ========= - * Currently this test is written with the assumption that either - * a) The Perf Producer and Consumer are on the same machine - * b) They are on separate machines that have their time synced via a Time Server - * - * In order to calculate latency the producer inserts a timestamp - * hen the message is sent. The consumer will note the current time the message is - * received and will calculate the latency as follows - * latency = rcvdTime - msg.getJMSTimestamp() - * - * Through out the test it will keep track of the max and min latency to show the - * variance in latencies. - * - * Avg latency is measured by adding all latencies and dividing by the total msgs. - * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount - * - * Throughput - * =========== - * System throughput is calculated as follows - * rcvdMsgCount/(rcvdTime - testStartTime) - * - * Consumer rate is calculated as - * rcvdMsgCount/(rcvdTime - startTime) - * - * Note that the testStartTime referes to when the producer sent the first message - * and startTime is when the consumer first received a message. - * - * rcvdTime keeps track of when the last message is received. - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - */ - -public class PerfConsumer extends PerfBase implements MessageListener -{ - MessageConsumer consumer; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - long testStartTime = 0; // to measure system throughput - long startTime = 0; // to measure consumer throughput - long rcvdTime = 0; - boolean transacted = false; - int transSize = 0; - - final Object lock = new Object(); - - public PerfConsumer() - { - super(); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - - boolean start = false; - while (!start) - { - Message msg = consumer.receive(); - if (msg instanceof TextMessage) - { - if (((TextMessage)msg).getText().equals("End")) - { - start = true; - MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); - temp.send(session.createMessage()); - if (params.isTransacted()) - { - session.commit(); - } - temp.close(); - } - } - } - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - consumer.setMessageListener(this); - } - - public void printResults() throws Exception - { - synchronized (lock) - { - lock.wait(); - } - - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; - double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Consumer rate : "). - append(df.format(consRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(throughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void notifyCompletion(Destination replyTo) throws Exception - { - MessageProducer tmp = session.createProducer(replyTo); - Message endMsg = session.createMessage(); - tmp.send(endMsg); - if (params.isTransacted()) - { - session.commit(); - } - tmp.close(); - } - - public void tearDown() throws Exception - { - consumer.close(); - session.close(); - con.close(); - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - notifyCompletion(msg.getJMSReplyTo()); - - synchronized (lock) - { - lock.notifyAll(); - } - } - else - { - rcvdTime = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (rcvdMsgCount == 1) - { - startTime = rcvdTime; - testStartTime = msg.getJMSTimestamp(); - } - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = rcvdTime - msg.getJMSTimestamp(); - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - printResults(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - public static void main(String[] args) - { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() - { - public void run() - { - cons.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - } -}
\ No newline at end of file diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java deleted file mode 100644 index 62392e0e83..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; - -import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; - -/** - * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y no of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * This is done with the assumption that both consumer and producer are running on - * the same machine or different machines which have time synced using a time server. - * - * This test also calculates the producer rate as follows. - * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - * Rajith - Producer rate is not an accurate perf metric IMO. - * It is heavily inlfuenced by any in memory buffering. - * System throughput and latencies calculated by the PerfConsumer are more realistic - * numbers. - * - */ -public class PerfProducer extends PerfBase -{ - MessageProducer producer; - Message msg; - byte[] payload; - List<byte[]> payloads; - boolean cacheMsg = false; - boolean randomMsgSize = false; - boolean durable = false; - Random random; - int msgSizeRange = 1024; - - public PerfProducer() - { - super(); - } - - public void setUp() throws Exception - { - super.setUp(); - feedbackDest = session.createTemporaryQueue(); - - durable = params.isDurable(); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - cacheMsg = true; - - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (params.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<byte[]>(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(MessageFactory.createMessagePayload(i).getBytes()); - } - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - - if (!randomMsgSize) - { - ((BytesMessage)msg).writeBytes(payload); - } - else - { - ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); - } - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - MessageConsumer tmp = session.createConsumer(feedbackDest); - - for (int i=0; i < params.getWarmupCount() -1; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - boolean transacted = params.isTransacted(); - int tranSize = params.getTransactionSize(); - - long start = System.currentTimeMillis(); - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % tranSize == 0)) - { - session.commit(); - } - } - long time = System.currentTimeMillis() - start; - double rate = ((double)count/(double)time)*1000; - System.out.println(new StringBuilder("Producer rate: "). - append(df.format(rate)). - append(" msg/sec"). - toString()); - } - - public void waitForCompletion() throws Exception - { - MessageConsumer tmp = session.createConsumer(feedbackDest); - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); - System.out.println("Consumer has completed the test......"); - } - - public void tearDown() throws Exception - { - producer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - waitForCompletion(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final PerfProducer prod = new PerfProducer(); - Runnable r = new Runnable() - { - public void run() - { - prod.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); - } -}
\ No newline at end of file diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java deleted file mode 100644 index 2612af36e1..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import javax.jms.Session; - -public class TestParams -{ - private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties"; - - private String connectionFactory = "connectionFactory"; - - private String transientDest = "transientQueue"; - - private String durableDest = "durableQueue"; - - private int msg_size = 1024; - - private int msg_type = 1; // not used yet - - private boolean cacheMessage = false; - - private boolean disableMessageID = false; - - private boolean disableTimestamp = false; - - private boolean durable = false; - - private boolean transacted = false; - - private int transaction_size = 1000; - - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - - private int msg_count = 10; - - private int warmup_count = 1; - - private boolean random_msg_size = false; - - public TestParams() - { - initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory); - providerURL = System.getProperty("java.naming.provider.url",providerURL); - - transientDest = System.getProperty("transDest",transientDest); - durableDest = System.getProperty("durableDest",durableDest); - - msg_size = Integer.getInteger("msg_size", 1024); - msg_type = Integer.getInteger("msg_type",1); - cacheMessage = Boolean.getBoolean("cache_msg"); - disableMessageID = Boolean.getBoolean("disableMessageID"); - disableTimestamp = Boolean.getBoolean("disableTimestamp"); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - transaction_size = Integer.getInteger("trans_size",1000); - ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg_count",msg_count); - warmup_count = Integer.getInteger("warmup_count",warmup_count); - random_msg_size = Boolean.getBoolean("random_msg_size"); - } - - public int getAckMode() - { - return ack_mode; - } - - public String getConnectionFactory() - { - return connectionFactory; - } - - public String getTransientDestination() - { - return transientDest; - } - - public String getDurableDestination() - { - return durableDest; - } - - public String getInitialContextFactory() - { - return initialContextFactory; - } - - public int getMsgCount() - { - return msg_count; - } - - public int getMsgSize() - { - return msg_size; - } - - public int getMsgType() - { - return msg_type; - } - - public boolean isDurable() - { - return durable; - } - - public String getProviderURL() - { - return providerURL; - } - - public boolean isTransacted() - { - return transacted; - } - - public int getTransactionSize() - { - return transaction_size; - } - - public int getWarmupCount() - { - return warmup_count; - } - - public boolean isCacheMessage() - { - return cacheMessage; - } - - public boolean isDisableMessageID() - { - return disableMessageID; - } - - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - public boolean isRandomMsgSize() - { - return random_msg_size; - } - -} diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java deleted file mode 100644 index 0c3a17b3d8..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.Session; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.testkit.MessageFactory; - -public class BaseTest -{ - protected String host = "127.0.0.1"; - protected int msg_size = 100; - protected int msg_count = 10; - protected int session_count = 1; - protected boolean durable = false; - protected String queue_name = "message_queue"; - protected String exchange_name = "amq.direct"; - protected String routing_key = "routing_key"; - protected String contentType = "application/octet-stream"; - protected int port = 5672; - protected String url; - protected Message[] msgArray; - - protected AMQConnection con; - protected Destination dest = null; - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - - public BaseTest() - { - host = System.getProperty("host", "127.0.0.1"); - port = Integer.getInteger("port", 5672); - msg_size = Integer.getInteger("msg_size", 100); - msg_count = Integer.getInteger("msg_count", 10); - session_count = Integer.getInteger("session_count", 1); - durable = Boolean.getBoolean("durable"); - queue_name = System.getProperty("queue_name", "message_queue"); - exchange_name = System.getProperty("exchange_name", "amq.direct"); - routing_key = System.getProperty("routing_key", "routing_key"); - contentType = System.getProperty("content_type","application/octet-stream"); - - - - url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'"; - } - - public void setUp() - { - try - { - con = new AMQConnection(url); - con.start(); - - - if (exchange_name.equals("amq.topic")) - { - dest = new AMQTopic(new AMQShortString(exchange_name), - new AMQShortString(routing_key), - false, //auto-delete - null, //queue name - durable); - } - else - { - dest = new AMQQueue(new AMQShortString(exchange_name), - new AMQShortString(routing_key), - new AMQShortString(queue_name), - false, //exclusive - false, //auto-delete - durable); - } - - // Create the session to setup the messages - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (msg_size == -1) - { - // This creates an array of 1000 messages from 500-1500 bytes - // During the tests a message will be picked randomly - msgArray = new Message[1000]; - for (int i = 0; i < 1000; i++) - { - Message msg = (contentType.equals("text/plain")) ? - MessageFactory.createTextMessage(session,500 + i) : - MessageFactory.createBytesMessage(session, 500 + i); - msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - msgArray[i] = msg; - } - } - else - { - Message msg = (contentType.equals("text/plain")) ? - MessageFactory.createTextMessage(session, msg_size): - MessageFactory.createBytesMessage(session, msg_size); - msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - msgArray = new Message[] - { msg }; - } - - session.close(); - - } - catch (Exception e) - { - handleError(e,"Error while setting up the test"); - } - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" @ "); - sb.append(df.format(new Date(System.currentTimeMillis()))); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java deleted file mode 100644 index d5514873e6..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * The difference between this test and the - * LongDurationConsumer is that each Session runs - * in it's own Thread and the ability to receive - * messages transactionally. - * - * All consumers will still share the same destination. - * - */ -public class MultiThreadedConsumer extends BaseTest -{ - protected final boolean transacted; - - public MultiThreadedConsumer() - { - super(); - transacted = Boolean.getBoolean("transacted"); - // needed only to calculate throughput. - // If msg_count is different set it via -Dmsg_count - msg_count = 10; - } - - /** - * Creates a Session and a consumer that runs in its - * own thread. - * It can also consume transactionally. - * - */ - public void test() - { - try - { - for (int i = 0; i < session_count; i++) - { - - final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() - { - public void run() - { - try - { - MessageConsumer consumer = session.createConsumer(dest); - - consumer.setMessageListener(new MessageListener() - { - - private boolean startIteration = true; - private long startTime = 0; - private long iterations = 0; - - public void onMessage(Message m) - { - try - { - long now = System.currentTimeMillis(); - if (startIteration) - { - startTime = m.getJMSTimestamp(); - startIteration = false; - } - - if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) - { - startIteration = true; - long totalIterationTime = now - startTime; - double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; - long latencySample = now - m.getJMSTimestamp(); - iterations++; - - StringBuilder sb = new StringBuilder(); - sb.append(iterations).append(","). - append(nf.format(throughput)).append(",").append(latencySample); - - System.out.println(sb.toString()); - - MessageProducer temp = session.createProducer(m.getJMSReplyTo()); - Message controlMsg = session.createTextMessage(); - temp.send(controlMsg); - if (transacted) - { - session.commit(); - } - temp.close(); - } - } - catch (JMSException e) - { - handleError(e,"Exception receiving messages"); - } - } - }); - } - catch (Exception e) - { - handleError(e,"Exception creating a consumer"); - } - - } - - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - - t.setName("session-" + i); - t.start(); - } // for loop - } - catch (Exception e) - { - handleError(e,"Exception while setting up the test"); - } - - } - - public static void main(String[] args) - { - MultiThreadedConsumer test = new MultiThreadedConsumer(); - test.setUp(); - test.test(); - } - -} diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java deleted file mode 100644 index 1cf4ee28ca..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import java.util.Random; -import java.util.UUID; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * - * This test creats x number of sessions, where each session - * runs in it's own thread. Each session creates a producer - * and it's own feedback queue. - * - * A producer will send n-1 messages, followed by the n-th - * message which contains "End" in it's payload to signal - * that this is the last message message in the sequence. - * The end message has the feedback queue as it's replyTo. - * It will then listen on the feedback queue waiting for the - * confirmation and then sleeps for 1000 ms before proceeding - * with the next n messages. - * - * This hand shaking mechanism ensures that all of the - * messages sent are consumed by some consumer. This prevents - * the producers from saturating the broker especially when - * the consumers are slow. - * - * All producers send to a single destination - * If using transactions it's best to use smaller message count - * as the test only commits after sending all messages in a batch. - * - */ - -public class MultiThreadedProducer extends SimpleProducer -{ - protected final boolean transacted; - - public MultiThreadedProducer() - { - super(); - transacted = Boolean.getBoolean("transacted"); - } - - public void test() - { - try - { - final int msg_count_per_session = msg_count/session_count; - - for (int i = 0; i < session_count; i++) - { - final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() - { - private Random gen = new Random(); - - private Message getNextMessage() - { - if (msg_size == -1) - { - int index = gen.nextInt(1000); - return msgArray[index]; - } - else - { - return msgArray[0]; - } - } - - public void run() - { - try - { - MessageProducer prod = session.createProducer(dest); - // this will ensure that the producer will not overun the consumer. - feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString(UUID - .randomUUID().toString()), new AMQShortString("control")); - - MessageConsumer feedbackConsumer = session.createConsumer(feedbackQueue); - - while (true) - { - for (int i = 0; i < msg_count_per_session; i++) - { - Message msg = getNextMessage(); - msg.setJMSMessageID("ID:" + UUID.randomUUID()); - prod.send(msg); - } - - TextMessage m = session.createTextMessage("End"); - m.setJMSReplyTo(feedbackQueue); - prod.send(m); - - if (transacted) - { - session.commit(); - } - - System.out.println(df.format(System.currentTimeMillis())); - feedbackConsumer.receive(); - if (transacted) - { - session.commit(); - } - Thread.sleep(1000); - } - - } - catch (Exception e) - { - handleError(e,"Exception in producing message"); - } - - } - - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.setName("session-" + i); - t.start(); - - } - - } - catch (Exception e) - { - handleError(e,"Exception while setting up the test"); - } - - } - - public static void main(String[] args) - { - MultiThreadedProducer test = new MultiThreadedProducer(); - test.setUp(); - test.test(); - } - -} diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index 1ae2c35970..c240ecdf2e 100644 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.testkit.soak; +import java.util.Random; + import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.MessageConsumer; @@ -29,7 +31,11 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.testkit.TestLauncher; import org.apache.qpid.thread.Threading; /** @@ -37,8 +43,9 @@ import org.apache.qpid.thread.Threading; * ================ * This test will open x number of connections where each * connection will create a session and a producer/consumer pair, - * and then send configurable no of messages. - * It will them sleep for configurable time interval and + * and then a randomly selected set of connections (about 1/4th) + * will send a configurable no of messages and try to receive them. + * It will then sleep for configurable time interval and * tear down the connections/sessions/consumers. * It will then repeat the process again until the test is stopped. * @@ -47,16 +54,14 @@ import org.apache.qpid.thread.Threading; * To find if the broker has leaks when cleaning resources. * To find if the client has leaks with resources. */ -public class ResourceLeakTest extends BaseTest +public class ResourceLeakTest extends TestLauncher { - protected int connection_count = 10; - protected long connection_idle_time = 5000; - + /* protected long connection_idle_time = 5000; + protected Random rand = new Random(); + public ResourceLeakTest() { - super(); - connection_count = Integer.getInteger("con_count",10); - connection_idle_time = Long.getLong("con_idle_time", 5000); + super(); } public void test() @@ -68,13 +73,7 @@ public class ResourceLeakTest extends BaseTest Session[] sessions = new Session[connection_count]; MessageConsumer[] msgCons = new MessageConsumer[connection_count]; MessageProducer [] msgProds = new MessageProducer[connection_count]; - Destination dest = new AMQQueue(new AMQShortString(exchange_name), - new AMQShortString(routing_key), - new AMQShortString(queue_name), - true, //exclusive - true // auto delete - ); - + while (true) { for (int i = 0; i < connection_count; i++) @@ -84,23 +83,36 @@ public class ResourceLeakTest extends BaseTest cons[i] = con; Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); sessions[i] = ssn; + Destination dest = new AMQQueue(new AMQShortString(exchange_name), + new AMQShortString(routing_key + i), + new AMQShortString(queue_name + i), + true, //exclusive + true // auto delete + ); MessageConsumer msgCon = ssn.createConsumer(dest); msgCons[i] = msgCon; MessageProducer msgProd = ssn.createProducer(dest); msgProds[i] = msgProd; - - BytesMessage msg = ssn.createBytesMessage(); + } + + // Select some connections randomly and send/recv messages + // Exercise around quarter of the connections + for (int i=0; i < connection_count/4; i++) + { + int k = rand.nextInt(connection_count); + + BytesMessage msg = sessions[k].createBytesMessage(); msg.writeBytes("Test Msg".getBytes()); for (int j = 0; j < msg_count;j++) { - msgProd.send(msg); + msgProds[k].send(msg); } int j = 0; while (j < msg_count) { - msgCon.receive(); + msgCons[k].receive(); j++; } } @@ -111,10 +123,24 @@ public class ResourceLeakTest extends BaseTest { for (int i = 0; i < connection_count; i++) { - msgCons[i].close(); - msgProds[i].close(); - sessions[i].close(); - cons[i].close(); + if (!((BasicMessageConsumer)msgCons[i]).isClosed()) + { + msgCons[i].close(); + } + + if (!((BasicMessageProducer)msgProds[i]).isClosed()) + { + msgProds[i].close(); + } + + if (!((AMQSession)sessions[i]).isClosed()) + { + sessions[i].close(); + } + if (!((AMQConnection)cons[i]).isClosed()) + { + cons[i].close(); + } } } catch (Exception e) @@ -149,6 +175,6 @@ public class ResourceLeakTest extends BaseTest { throw new Error("Error creating test thread",e); } - } + }*/ } diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java deleted file mode 100644 index cd6d9013f8..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * This test will create x number of sessions. - * Each session will have it's own consumer. - * Once a consumer receives the "End" message it - * will send a message to the destination indicated - * by the replyTo field in the End message. - * This will signal the producer that all the previous - * messages have been consumed. The producer will - * then start sending messages again. - * - * This prevents the producer from overruning the - * consumer. - * * - * All consumers share a single destination - * - */ - -public class SimpleConsumer extends BaseTest -{ - public SimpleConsumer() - { - super(); - //needed only to calculate throughput. - // If msg_count is different set it via -Dmsg_count - msg_count = 10; - } - - public void test() - { - try - { - final Session[] sessions = new Session[session_count]; - MessageConsumer[] cons = new MessageConsumer[session_count]; - - for (int i = 0; i < session_count; i++) - { - sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - cons[i] = sessions[i].createConsumer(dest); - cons[i].setMessageListener(new MessageListener() - { - - private boolean startIteration = true; - private long startTime = 0; - private long iterations = 0; - - public void onMessage(Message m) - { - try - { - long now = System.currentTimeMillis(); - if (startIteration) - { - startTime = m.getJMSTimestamp(); - startIteration = false; - } - - if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) - { - - long totalIterationTime = now - startTime; - startIteration = true; - double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; - long latencySample = now - m.getJMSTimestamp(); - iterations++; - - StringBuilder sb = new StringBuilder(); - sb.append(iterations).append(","). - append(nf.format(throughput)).append(",").append(latencySample); - - System.out.println(sb.toString()); - - MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo()); - Message controlMsg = sessions[0].createTextMessage(); - temp.send(controlMsg); - temp.close(); - } - } - catch (JMSException e) - { - handleError(e,"Exception when receiving the message"); - } - } - }); - } - - } - catch (Exception e) - { - handleError(e,"Exception when setting up the consumers"); - } - - } - - public static void main(String[] args) - { - final SimpleConsumer test = new SimpleConsumer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - } - -} diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java deleted file mode 100644 index 805ce7ac29..0000000000 --- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import java.util.Random; -import java.util.UUID; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * This test will send n-1 messages, followed by the n-th - * message which contains "End" in it's payload to signal - * that this is the last message message in the sequence. - * The end message has the feedback queue as it's replyTo. - * It will then listen on the feedback queue waiting for the - * confirmation and then sleeps for 1000 ms before proceeding - * with the next n messages. - * - * This hand shaking mechanism ensures that all of the - * messages sent are consumed by some consumer. This prevents - * the producers from saturating the broker especially when - * the consumers are slow. - * - * It creates a producer per session. - * If session_count is > 1 it will round robin the messages - * btw the producers. - * - * All producers send to a single destination - * - */ - -public class SimpleProducer extends BaseTest -{ - protected Destination feedbackQueue; - Random gen = new Random(); - - public SimpleProducer() - { - super(); - } - - protected Message getNextMessage() - { - if (msg_size == -1) - { - int index = gen.nextInt(1000); - return msgArray[index]; - } - else - { - return msgArray[0]; - } - } - - public void test() - { - try - { - Session[] sessions = new Session[session_count]; - MessageProducer[] prods = new MessageProducer[session_count]; - - for (int i = 0; i < session_count; i++) - { - sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - prods[i] = sessions[i].createProducer(dest); - } - - // this will ensure that the producer will not overun the consumer. - feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), - new AMQShortString(UUID.randomUUID().toString()), - new AMQShortString("control")); - - MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue); - - int prod_pointer = 0; - boolean multi_session = session_count > 1 ? true : false; - - while (true) - { - for (int i = 0; i < msg_count - 1; i++) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - prods[prod_pointer].send(msg); - if (multi_session) - { - prod_pointer++; - if (prod_pointer == session_count) - { - prod_pointer = 0; - } - } - } - - TextMessage m = sessions[0].createTextMessage("End"); - m.setJMSReplyTo(feedbackQueue); - prods[prod_pointer].send(m); - System.out.println(df.format(System.currentTimeMillis())); - feedbackConsumer.receive(); - Thread.sleep(1000); - } - } - catch (Exception e) - { - handleError(e,"Exception while setting up the producer"); - } - - } - - public static void main(String[] args) - { - final SimpleProducer test = new SimpleProducer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - } - -} |