summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2009-11-11 00:13:13 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2009-11-11 00:13:13 +0000
commit1f74efa4eefa85382ce9f8d55d50f81c274abb05 (patch)
tree363a8e654004bbb67bea829771ebe1f5b7c6335a
parent0caf9647078392d464386d030c6a014b642ef35f (diff)
downloadqpid-python-1f74efa4eefa85382ce9f8d55d50f81c274abb05.tar.gz
Removed the following files as they will be replaced by a generic Sender and Receiver.
The bin files and the files under o/a/qpid/teskit/perf are moved under tools. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@834721 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/testkit/bin/perf_report.sh131
-rw-r--r--java/testkit/bin/run_pub.sh24
-rw-r--r--java/testkit/bin/run_sub.sh25
-rw-r--r--java/testkit/bin/setenv.sh49
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java350
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java102
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java267
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java263
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java168
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java152
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java166
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java176
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java76
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java151
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java162
15 files changed, 51 insertions, 2211 deletions
diff --git a/java/testkit/bin/perf_report.sh b/java/testkit/bin/perf_report.sh
deleted file mode 100644
index 22c839e08c..0000000000
--- a/java/testkit/bin/perf_report.sh
+++ /dev/null
@@ -1,131 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This will run the 8 use cases defined below and produce
-# a report in tabular format. Refer to the documentation
-# for more details.
-
-SUB_MEM=-Xmx1024M
-PUB_MEM=-Xmx1024M
-LOG_CONFIG="-Damqj.logging.level=WARN"
-
-. setenv.sh
-
-waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
-cleanup()
-{
- pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
- if [ "$pids" != "" ]; then
- kill -3 $pids
- kill -9 $pids >/dev/null 2>&1
- fi
-}
-
-# $1 test name
-# $2 consumer options
-# $3 producer options
-run_testcase()
-{
- sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out &
- waitfor sub.out "Warming up"
- sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out &
- waitfor sub.out "Completed the test"
- waitfor pub.out "Consumer has completed the test"
- sleep 2 #give a grace period to shutdown
- print_result $1
-}
-
-print_result()
-{
- prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
- sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
- avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
- min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
- max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
- echo "------------------------------------------------------------------------------------------------"
-}
-
-trap cleanup EXIT
-
-echo "Test report on " `date +%F`
-echo "================================================================================================"
-echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
-echo "------------------------------------------------------------------------------------------------"
-
-# Test 1 Trans Queue
-run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 2 Dura Queue
-run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 3 Dura Queue Sync
-run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true"
-
-# Test 4 Dura Queue Sync Publish and Ack
-run_testcase "Dura_SyncPubAck" "-Ddurable=true -Dsync_ack=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
-
-# Test 5 Topic
-run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 6 Durable Topic
-run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 7 Fanout
-run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 8 Small TX
-run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \
- "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
-
-# Test 9 Large TX
-run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size=10" \
- "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
-
-# Test 10 256 MSG
-run_testcase "Msg_256b" "" "-Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 11 512 MSG
-run_testcase "Msg_512b" "" "-Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 12 2048 MSG
-run_testcase "Msg_2048b" "" "-Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 13 Random size MSG
-run_testcase "Random_Msg_Size" "" "-Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 14 Random size MSG Durable
-run_testcase "Rand_Msg_Dura" "-Ddurable=true" "-Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 15 64K MSG
-run_testcase "Msg_64K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 16 Durable 64K MSG
-run_testcase "Msg_Durable_64K" "-Ddurable=true -Damqj.tcpNoDelay=true" \
- "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 17 500K MSG
-run_testcase "Msg_500K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 18 Durable 500K MSG
-run_testcase "Msg_Dura_500K" "-Damqj.tcpNoDelay=true -Ddurable=true" \
- "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/java/testkit/bin/run_pub.sh b/java/testkit/bin/run_pub.sh
deleted file mode 100644
index 0702a55de9..0000000000
--- a/java/testkit/bin/run_pub.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-. $QPID_TEST_HOME/bin/setenv.sh
-
-echo "$@"
-$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfProducer
diff --git a/java/testkit/bin/run_sub.sh b/java/testkit/bin/run_sub.sh
deleted file mode 100644
index f7e687de38..0000000000
--- a/java/testkit/bin/run_sub.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-. $QPID_TEST_HOME/bin/setenv.sh
-
-echo "$@"
-$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfConsumer
-
diff --git a/java/testkit/bin/setenv.sh b/java/testkit/bin/setenv.sh
deleted file mode 100644
index 24135e711b..0000000000
--- a/java/testkit/bin/setenv.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Compiles the test classes and sets the CLASSPATH
-
-# check for QPID_TEST_HOME
-if [ "$QPID_TEST_HOME" = "" ] ; then
- echo "ERROR: Please set QPID_TEST_HOME ...."
- exit 1
-fi
-
-# check for JAVA_HOME
-if [ "$JAVA_HOME" = "" ] ; then
- echo "ERROR: Please set JAVA_HOME ...."
- exit 1
-fi
-
-# VENDOR_LIB path needs to be set
-# for Qpid set this to {qpid_checkout}/java/build/lib
-if [ "$VENDOR_LIB" = "" ] ; then
- echo "ERROR: Please set VENDOR_LIB path in the script ...."
- exit 1
-fi
-
-
-[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
-
-CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
-$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
-
-export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
-
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
deleted file mode 100644
index 4a4f3d124b..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import java.io.FileOutputStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.testkit.MessageFactory;
-import org.apache.qpid.thread.Threading;
-
-/**
- * Latency test sends an x number of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y number of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * It is important to have a sufficiently large number for the warmup count to
- * ensure the system is in steady state before the test is started.
- *
- * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000)
- * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1
- *
- * The idea is to get a latency sample for the system once it achieves steady state.
- *
- */
-
-public class LatencyTest extends PerfBase implements MessageListener
-{
- MessageProducer producer;
- MessageConsumer consumer;
- Message msg;
- byte[] payload;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- double stdDev = 0;
- double avgLatency = 0;
- boolean warmup_mode = true;
- boolean transacted = false;
- int transSize = 0;
-
- final List<Long> latencies;
- final Lock lock = new ReentrantLock();
- final Condition warmedUp;
- final Condition testCompleted;
-
- public LatencyTest()
- {
- super();
- warmedUp = lock.newCondition();
- testCompleted = lock.newCondition();
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- latencies = new ArrayList <Long>(params.getMsgCount());
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
- msg.setJMSDeliveryMode(params.isDurable()?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else
- {
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
- }
-
- producer = session.createProducer(dest);
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (params.isCacheMessage())
- {
- return msg;
- }
- else
- {
- msg = session.createBytesMessage();
- ((BytesMessage)msg).writeBytes(payload);
- return msg;
- }
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
- int count = params.getWarmupCount();
- for (int i=0; i < count; i++)
- {
- producer.send(getNextMessage());
- }
- Message msg = session.createTextMessage("End");
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- try
- {
- lock.lock();
- warmedUp.await();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
- {
- if (warmup_mode)
- {
- warmup_mode = false;
- try
- {
- lock.lock();
- warmedUp.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
- else
- {
- computeStats();
- }
- }
- else if (!warmup_mode)
- {
- long time = System.currentTimeMillis();
- rcvdMsgCount ++;
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = time - msg.getJMSTimestamp();
- latencies.add(latency);
- totalLatency = totalLatency + latency;
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
- private void computeStats()
- {
- avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double sigma = 0;
-
- for (long latency: latencies)
- {
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- sigma = sigma + Math.pow(latency - avgLatency,2);
- }
-
- stdDev = Math.sqrt(sigma/(rcvdMsgCount -1));
-
- try
- {
- lock.lock();
- testCompleted.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public void writeToFile() throws Exception
- {
- String fileName = System.getProperty("file");
- PrintWriter writer = new PrintWriter(new FileOutputStream(fileName));
- for (long latency: latencies)
- {
- writer.println(String.valueOf(latency));
- }
- writer.flush();
- writer.close();
- }
-
- public void printToConsole()
- {
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Standard Deviation : ").
- append(df.format(stdDev)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
- append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- int count = params.getMsgCount();
-
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- producer.send(msg);
- if ( transacted && ((i+1) % transSize == 0))
- {
- session.commit();
- }
- }
- Message msg = session.createTextMessage("End");
- producer.send(msg);
- if (params.isTransacted())
- {
- session.commit();
- }
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- lock.lock();
- testCompleted.await();
- }
- finally
- {
- lock.unlock();
- }
-
- producer.close();
- consumer.close();
- session.close();
- con.close();
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
-
- public static void main(String[] args)
- {
- final LatencyTest latencyTest = new LatencyTest();
- Runnable r = new Runnable()
- {
- public void run()
- {
- latencyTest.test();
- latencyTest.printToConsole();
- if (System.getProperty("file") != null)
- {
- try
- {
- latencyTest.writeToFile();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating latency test thread",e);
- }
- t.start();
- }
-} \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
deleted file mode 100644
index 95670f0507..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import java.text.DecimalFormat;
-import java.util.Hashtable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-
-public class PerfBase
-{
- TestParams params;
- Connection con;
- Session session;
- Destination dest;
- Destination feedbackDest;
- DecimalFormat df = new DecimalFormat("###.##");
-
- public PerfBase()
- {
- params = new TestParams();
- }
-
- public void setUp() throws Exception
- {
- Hashtable<String,String> env = new Hashtable<String,String>();
- env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory());
- env.put(Context.PROVIDER_URL, params.getProviderURL());
-
- Context ctx = null;
- try
- {
- ctx = new InitialContext(env);
- }
- catch(Exception e)
- {
- throw new Exception("Error initializing JNDI",e);
-
- }
-
- ConnectionFactory conFac = null;
- try
- {
- conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory());
- }
- catch(Exception e)
- {
- throw new Exception("Error looking up connection factory",e);
- }
-
- con = conFac.createConnection();
- con.start();
- session = con.createSession(params.isTransacted(),
- params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
-
- try
- {
- dest = (Destination)ctx.lookup( params.isDurable()?
- params.getDurableDestination():
- params.getTransientDestination()
- );
- }
- catch(Exception e)
- {
- throw new Exception("Error looking up destination",e);
- }
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
-
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
deleted file mode 100644
index 9781a7e839..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * PerfConsumer will receive x no of messages in warmup mode.
- * Once it receives the Start message it will then signal the PerfProducer.
- * It will start recording stats from the first message it receives after
- * the warmup mode is done.
- *
- * The following calculations are done.
- * The important numbers to look at is
- * a) Avg Latency
- * b) System throughput.
- *
- * Latency.
- * =========
- * Currently this test is written with the assumption that either
- * a) The Perf Producer and Consumer are on the same machine
- * b) They are on separate machines that have their time synced via a Time Server
- *
- * In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
- * received and will calculate the latency as follows
- * latency = rcvdTime - msg.getJMSTimestamp()
- *
- * Through out the test it will keep track of the max and min latency to show the
- * variance in latencies.
- *
- * Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
- *
- * Throughput
- * ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
- * Consumer rate is calculated as
- * rcvdMsgCount/(rcvdTime - startTime)
- *
- * Note that the testStartTime referes to when the producer sent the first message
- * and startTime is when the consumer first received a message.
- *
- * rcvdTime keeps track of when the last message is received.
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- */
-
-public class PerfConsumer extends PerfBase implements MessageListener
-{
- MessageConsumer consumer;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- long testStartTime = 0; // to measure system throughput
- long startTime = 0; // to measure consumer throughput
- long rcvdTime = 0;
- boolean transacted = false;
- int transSize = 0;
-
- final Object lock = new Object();
-
- public PerfConsumer()
- {
- super();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- consumer = session.createConsumer(dest);
-
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
-
- boolean start = false;
- while (!start)
- {
- Message msg = consumer.receive();
- if (msg instanceof TextMessage)
- {
- if (((TextMessage)msg).getText().equals("End"))
- {
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
- }
- }
- }
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- consumer.setMessageListener(this);
- }
-
- public void printResults() throws Exception
- {
- synchronized (lock)
- {
- lock.wait();
- }
-
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
- double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Consumer rate : ").
- append(df.format(consRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(throughput)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
- append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void notifyCompletion(Destination replyTo) throws Exception
- {
- MessageProducer tmp = session.createProducer(replyTo);
- Message endMsg = session.createMessage();
- tmp.send(endMsg);
- if (params.isTransacted())
- {
- session.commit();
- }
- tmp.close();
- }
-
- public void tearDown() throws Exception
- {
- consumer.close();
- session.close();
- con.close();
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
- {
- notifyCompletion(msg.getJMSReplyTo());
-
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- else
- {
- rcvdTime = System.currentTimeMillis();
- rcvdMsgCount ++;
-
- if (rcvdMsgCount == 1)
- {
- startTime = rcvdTime;
- testStartTime = msg.getJMSTimestamp();
- }
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = rcvdTime - msg.getJMSTimestamp();
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- totalLatency = totalLatency + latency;
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- printResults();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- public static void main(String[] args)
- {
- final PerfConsumer cons = new PerfConsumer();
- Runnable r = new Runnable()
- {
- public void run()
- {
- cons.test();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- }
-} \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
deleted file mode 100644
index 62392e0e83..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.testkit.MessageFactory;
-import org.apache.qpid.thread.Threading;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- */
-public class PerfProducer extends PerfBase
-{
- MessageProducer producer;
- Message msg;
- byte[] payload;
- List<byte[]> payloads;
- boolean cacheMsg = false;
- boolean randomMsgSize = false;
- boolean durable = false;
- Random random;
- int msgSizeRange = 1024;
-
- public PerfProducer()
- {
- super();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
- durable = params.isDurable();
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- cacheMsg = true;
-
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (params.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<byte[]>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(MessageFactory.createMessagePayload(i).getBytes());
- }
- }
- else
- {
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
- }
-
- producer = session.createProducer(dest);
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- msg = session.createBytesMessage();
-
- if (!randomMsgSize)
- {
- ((BytesMessage)msg).writeBytes(payload);
- }
- else
- {
- ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
- }
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return msg;
- }
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
-
- for (int i=0; i < params.getWarmupCount() -1; i++)
- {
- producer.send(getNextMessage());
- }
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- int count = params.getMsgCount();
- boolean transacted = params.isTransacted();
- int tranSize = params.getTransactionSize();
-
- long start = System.currentTimeMillis();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- producer.send(msg);
- if ( transacted && ((i+1) % tranSize == 0))
- {
- session.commit();
- }
- }
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
- System.out.println(new StringBuilder("Producer rate: ").
- append(df.format(rate)).
- append(" msg/sec").
- toString());
- }
-
- public void waitForCompletion() throws Exception
- {
- MessageConsumer tmp = session.createConsumer(feedbackDest);
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- System.out.println("Consumer has completed the test......");
- }
-
- public void tearDown() throws Exception
- {
- producer.close();
- session.close();
- con.close();
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- waitForCompletion();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
-
- public static void main(String[] args)
- {
- final PerfProducer prod = new PerfProducer();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.test();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
- }
-} \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
deleted file mode 100644
index 2612af36e1..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import javax.jms.Session;
-
-public class TestParams
-{
- private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
-
- private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties";
-
- private String connectionFactory = "connectionFactory";
-
- private String transientDest = "transientQueue";
-
- private String durableDest = "durableQueue";
-
- private int msg_size = 1024;
-
- private int msg_type = 1; // not used yet
-
- private boolean cacheMessage = false;
-
- private boolean disableMessageID = false;
-
- private boolean disableTimestamp = false;
-
- private boolean durable = false;
-
- private boolean transacted = false;
-
- private int transaction_size = 1000;
-
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
-
- private int msg_count = 10;
-
- private int warmup_count = 1;
-
- private boolean random_msg_size = false;
-
- public TestParams()
- {
- initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory);
- providerURL = System.getProperty("java.naming.provider.url",providerURL);
-
- transientDest = System.getProperty("transDest",transientDest);
- durableDest = System.getProperty("durableDest",durableDest);
-
- msg_size = Integer.getInteger("msg_size", 1024);
- msg_type = Integer.getInteger("msg_type",1);
- cacheMessage = Boolean.getBoolean("cache_msg");
- disableMessageID = Boolean.getBoolean("disableMessageID");
- disableTimestamp = Boolean.getBoolean("disableTimestamp");
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- transaction_size = Integer.getInteger("trans_size",1000);
- ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
- msg_count = Integer.getInteger("msg_count",msg_count);
- warmup_count = Integer.getInteger("warmup_count",warmup_count);
- random_msg_size = Boolean.getBoolean("random_msg_size");
- }
-
- public int getAckMode()
- {
- return ack_mode;
- }
-
- public String getConnectionFactory()
- {
- return connectionFactory;
- }
-
- public String getTransientDestination()
- {
- return transientDest;
- }
-
- public String getDurableDestination()
- {
- return durableDest;
- }
-
- public String getInitialContextFactory()
- {
- return initialContextFactory;
- }
-
- public int getMsgCount()
- {
- return msg_count;
- }
-
- public int getMsgSize()
- {
- return msg_size;
- }
-
- public int getMsgType()
- {
- return msg_type;
- }
-
- public boolean isDurable()
- {
- return durable;
- }
-
- public String getProviderURL()
- {
- return providerURL;
- }
-
- public boolean isTransacted()
- {
- return transacted;
- }
-
- public int getTransactionSize()
- {
- return transaction_size;
- }
-
- public int getWarmupCount()
- {
- return warmup_count;
- }
-
- public boolean isCacheMessage()
- {
- return cacheMessage;
- }
-
- public boolean isDisableMessageID()
- {
- return disableMessageID;
- }
-
- public boolean isDisableTimestamp()
- {
- return disableTimestamp;
- }
-
- public boolean isRandomMsgSize()
- {
- return random_msg_size;
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
deleted file mode 100644
index 0c3a17b3d8..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.testkit.MessageFactory;
-
-public class BaseTest
-{
- protected String host = "127.0.0.1";
- protected int msg_size = 100;
- protected int msg_count = 10;
- protected int session_count = 1;
- protected boolean durable = false;
- protected String queue_name = "message_queue";
- protected String exchange_name = "amq.direct";
- protected String routing_key = "routing_key";
- protected String contentType = "application/octet-stream";
- protected int port = 5672;
- protected String url;
- protected Message[] msgArray;
-
- protected AMQConnection con;
- protected Destination dest = null;
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
-
- public BaseTest()
- {
- host = System.getProperty("host", "127.0.0.1");
- port = Integer.getInteger("port", 5672);
- msg_size = Integer.getInteger("msg_size", 100);
- msg_count = Integer.getInteger("msg_count", 10);
- session_count = Integer.getInteger("session_count", 1);
- durable = Boolean.getBoolean("durable");
- queue_name = System.getProperty("queue_name", "message_queue");
- exchange_name = System.getProperty("exchange_name", "amq.direct");
- routing_key = System.getProperty("routing_key", "routing_key");
- contentType = System.getProperty("content_type","application/octet-stream");
-
-
-
- url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'";
- }
-
- public void setUp()
- {
- try
- {
- con = new AMQConnection(url);
- con.start();
-
-
- if (exchange_name.equals("amq.topic"))
- {
- dest = new AMQTopic(new AMQShortString(exchange_name),
- new AMQShortString(routing_key),
- false, //auto-delete
- null, //queue name
- durable);
- }
- else
- {
- dest = new AMQQueue(new AMQShortString(exchange_name),
- new AMQShortString(routing_key),
- new AMQShortString(queue_name),
- false, //exclusive
- false, //auto-delete
- durable);
- }
-
- // Create the session to setup the messages
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- if (msg_size == -1)
- {
- // This creates an array of 1000 messages from 500-1500 bytes
- // During the tests a message will be picked randomly
- msgArray = new Message[1000];
- for (int i = 0; i < 1000; i++)
- {
- Message msg = (contentType.equals("text/plain")) ?
- MessageFactory.createTextMessage(session,500 + i) :
- MessageFactory.createBytesMessage(session, 500 + i);
- msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- msgArray[i] = msg;
- }
- }
- else
- {
- Message msg = (contentType.equals("text/plain")) ?
- MessageFactory.createTextMessage(session, msg_size):
- MessageFactory.createBytesMessage(session, msg_size);
- msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- msgArray = new Message[]
- { msg };
- }
-
- session.close();
-
- }
- catch (Exception e)
- {
- handleError(e,"Error while setting up the test");
- }
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" @ ");
- sb.append(df.format(new Date(System.currentTimeMillis())));
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
deleted file mode 100644
index d5514873e6..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * Test Description
- * ================
- * The difference between this test and the
- * LongDurationConsumer is that each Session runs
- * in it's own Thread and the ability to receive
- * messages transactionally.
- *
- * All consumers will still share the same destination.
- *
- */
-public class MultiThreadedConsumer extends BaseTest
-{
- protected final boolean transacted;
-
- public MultiThreadedConsumer()
- {
- super();
- transacted = Boolean.getBoolean("transacted");
- // needed only to calculate throughput.
- // If msg_count is different set it via -Dmsg_count
- msg_count = 10;
- }
-
- /**
- * Creates a Session and a consumer that runs in its
- * own thread.
- * It can also consume transactionally.
- *
- */
- public void test()
- {
- try
- {
- for (int i = 0; i < session_count; i++)
- {
-
- final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- MessageConsumer consumer = session.createConsumer(dest);
-
- consumer.setMessageListener(new MessageListener()
- {
-
- private boolean startIteration = true;
- private long startTime = 0;
- private long iterations = 0;
-
- public void onMessage(Message m)
- {
- try
- {
- long now = System.currentTimeMillis();
- if (startIteration)
- {
- startTime = m.getJMSTimestamp();
- startIteration = false;
- }
-
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
- startIteration = true;
- long totalIterationTime = now - startTime;
- double throughput = ((double)msg_count/(double)totalIterationTime) * 1000;
- long latencySample = now - m.getJMSTimestamp();
- iterations++;
-
- StringBuilder sb = new StringBuilder();
- sb.append(iterations).append(",").
- append(nf.format(throughput)).append(",").append(latencySample);
-
- System.out.println(sb.toString());
-
- MessageProducer temp = session.createProducer(m.getJMSReplyTo());
- Message controlMsg = session.createTextMessage();
- temp.send(controlMsg);
- if (transacted)
- {
- session.commit();
- }
- temp.close();
- }
- }
- catch (JMSException e)
- {
- handleError(e,"Exception receiving messages");
- }
- }
- });
- }
- catch (Exception e)
- {
- handleError(e,"Exception creating a consumer");
- }
-
- }
-
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
-
- t.setName("session-" + i);
- t.start();
- } // for loop
- }
- catch (Exception e)
- {
- handleError(e,"Exception while setting up the test");
- }
-
- }
-
- public static void main(String[] args)
- {
- MultiThreadedConsumer test = new MultiThreadedConsumer();
- test.setUp();
- test.test();
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
deleted file mode 100644
index 1cf4ee28ca..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import java.util.Random;
-import java.util.UUID;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.thread.Threading;
-
-/**
- * Test Description
- * ================
- *
- * This test creats x number of sessions, where each session
- * runs in it's own thread. Each session creates a producer
- * and it's own feedback queue.
- *
- * A producer will send n-1 messages, followed by the n-th
- * message which contains "End" in it's payload to signal
- * that this is the last message message in the sequence.
- * The end message has the feedback queue as it's replyTo.
- * It will then listen on the feedback queue waiting for the
- * confirmation and then sleeps for 1000 ms before proceeding
- * with the next n messages.
- *
- * This hand shaking mechanism ensures that all of the
- * messages sent are consumed by some consumer. This prevents
- * the producers from saturating the broker especially when
- * the consumers are slow.
- *
- * All producers send to a single destination
- * If using transactions it's best to use smaller message count
- * as the test only commits after sending all messages in a batch.
- *
- */
-
-public class MultiThreadedProducer extends SimpleProducer
-{
- protected final boolean transacted;
-
- public MultiThreadedProducer()
- {
- super();
- transacted = Boolean.getBoolean("transacted");
- }
-
- public void test()
- {
- try
- {
- final int msg_count_per_session = msg_count/session_count;
-
- for (int i = 0; i < session_count; i++)
- {
- final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- Runnable r = new Runnable()
- {
- private Random gen = new Random();
-
- private Message getNextMessage()
- {
- if (msg_size == -1)
- {
- int index = gen.nextInt(1000);
- return msgArray[index];
- }
- else
- {
- return msgArray[0];
- }
- }
-
- public void run()
- {
- try
- {
- MessageProducer prod = session.createProducer(dest);
- // this will ensure that the producer will not overun the consumer.
- feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString(UUID
- .randomUUID().toString()), new AMQShortString("control"));
-
- MessageConsumer feedbackConsumer = session.createConsumer(feedbackQueue);
-
- while (true)
- {
- for (int i = 0; i < msg_count_per_session; i++)
- {
- Message msg = getNextMessage();
- msg.setJMSMessageID("ID:" + UUID.randomUUID());
- prod.send(msg);
- }
-
- TextMessage m = session.createTextMessage("End");
- m.setJMSReplyTo(feedbackQueue);
- prod.send(m);
-
- if (transacted)
- {
- session.commit();
- }
-
- System.out.println(df.format(System.currentTimeMillis()));
- feedbackConsumer.receive();
- if (transacted)
- {
- session.commit();
- }
- Thread.sleep(1000);
- }
-
- }
- catch (Exception e)
- {
- handleError(e,"Exception in producing message");
- }
-
- }
-
- };
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.setName("session-" + i);
- t.start();
-
- }
-
- }
- catch (Exception e)
- {
- handleError(e,"Exception while setting up the test");
- }
-
- }
-
- public static void main(String[] args)
- {
- MultiThreadedProducer test = new MultiThreadedProducer();
- test.setUp();
- test.test();
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
index 1ae2c35970..c240ecdf2e 100644
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.testkit.soak;
+import java.util.Random;
+
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
@@ -29,7 +31,11 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.testkit.TestLauncher;
import org.apache.qpid.thread.Threading;
/**
@@ -37,8 +43,9 @@ import org.apache.qpid.thread.Threading;
* ================
* This test will open x number of connections where each
* connection will create a session and a producer/consumer pair,
- * and then send configurable no of messages.
- * It will them sleep for configurable time interval and
+ * and then a randomly selected set of connections (about 1/4th)
+ * will send a configurable no of messages and try to receive them.
+ * It will then sleep for configurable time interval and
* tear down the connections/sessions/consumers.
* It will then repeat the process again until the test is stopped.
*
@@ -47,16 +54,14 @@ import org.apache.qpid.thread.Threading;
* To find if the broker has leaks when cleaning resources.
* To find if the client has leaks with resources.
*/
-public class ResourceLeakTest extends BaseTest
+public class ResourceLeakTest extends TestLauncher
{
- protected int connection_count = 10;
- protected long connection_idle_time = 5000;
-
+ /* protected long connection_idle_time = 5000;
+ protected Random rand = new Random();
+
public ResourceLeakTest()
{
- super();
- connection_count = Integer.getInteger("con_count",10);
- connection_idle_time = Long.getLong("con_idle_time", 5000);
+ super();
}
public void test()
@@ -68,13 +73,7 @@ public class ResourceLeakTest extends BaseTest
Session[] sessions = new Session[connection_count];
MessageConsumer[] msgCons = new MessageConsumer[connection_count];
MessageProducer [] msgProds = new MessageProducer[connection_count];
- Destination dest = new AMQQueue(new AMQShortString(exchange_name),
- new AMQShortString(routing_key),
- new AMQShortString(queue_name),
- true, //exclusive
- true // auto delete
- );
-
+
while (true)
{
for (int i = 0; i < connection_count; i++)
@@ -84,23 +83,36 @@ public class ResourceLeakTest extends BaseTest
cons[i] = con;
Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
sessions[i] = ssn;
+ Destination dest = new AMQQueue(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key + i),
+ new AMQShortString(queue_name + i),
+ true, //exclusive
+ true // auto delete
+ );
MessageConsumer msgCon = ssn.createConsumer(dest);
msgCons[i] = msgCon;
MessageProducer msgProd = ssn.createProducer(dest);
msgProds[i] = msgProd;
-
- BytesMessage msg = ssn.createBytesMessage();
+ }
+
+ // Select some connections randomly and send/recv messages
+ // Exercise around quarter of the connections
+ for (int i=0; i < connection_count/4; i++)
+ {
+ int k = rand.nextInt(connection_count);
+
+ BytesMessage msg = sessions[k].createBytesMessage();
msg.writeBytes("Test Msg".getBytes());
for (int j = 0; j < msg_count;j++)
{
- msgProd.send(msg);
+ msgProds[k].send(msg);
}
int j = 0;
while (j < msg_count)
{
- msgCon.receive();
+ msgCons[k].receive();
j++;
}
}
@@ -111,10 +123,24 @@ public class ResourceLeakTest extends BaseTest
{
for (int i = 0; i < connection_count; i++)
{
- msgCons[i].close();
- msgProds[i].close();
- sessions[i].close();
- cons[i].close();
+ if (!((BasicMessageConsumer)msgCons[i]).isClosed())
+ {
+ msgCons[i].close();
+ }
+
+ if (!((BasicMessageProducer)msgProds[i]).isClosed())
+ {
+ msgProds[i].close();
+ }
+
+ if (!((AMQSession)sessions[i]).isClosed())
+ {
+ sessions[i].close();
+ }
+ if (!((AMQConnection)cons[i]).isClosed())
+ {
+ cons[i].close();
+ }
}
}
catch (Exception e)
@@ -149,6 +175,6 @@ public class ResourceLeakTest extends BaseTest
{
throw new Error("Error creating test thread",e);
}
- }
+ }*/
}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
deleted file mode 100644
index cd6d9013f8..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * Test Description
- * ================
- * This test will create x number of sessions.
- * Each session will have it's own consumer.
- * Once a consumer receives the "End" message it
- * will send a message to the destination indicated
- * by the replyTo field in the End message.
- * This will signal the producer that all the previous
- * messages have been consumed. The producer will
- * then start sending messages again.
- *
- * This prevents the producer from overruning the
- * consumer.
- * *
- * All consumers share a single destination
- *
- */
-
-public class SimpleConsumer extends BaseTest
-{
- public SimpleConsumer()
- {
- super();
- //needed only to calculate throughput.
- // If msg_count is different set it via -Dmsg_count
- msg_count = 10;
- }
-
- public void test()
- {
- try
- {
- final Session[] sessions = new Session[session_count];
- MessageConsumer[] cons = new MessageConsumer[session_count];
-
- for (int i = 0; i < session_count; i++)
- {
- sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- cons[i] = sessions[i].createConsumer(dest);
- cons[i].setMessageListener(new MessageListener()
- {
-
- private boolean startIteration = true;
- private long startTime = 0;
- private long iterations = 0;
-
- public void onMessage(Message m)
- {
- try
- {
- long now = System.currentTimeMillis();
- if (startIteration)
- {
- startTime = m.getJMSTimestamp();
- startIteration = false;
- }
-
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
-
- long totalIterationTime = now - startTime;
- startIteration = true;
- double throughput = ((double)msg_count/(double)totalIterationTime) * 1000;
- long latencySample = now - m.getJMSTimestamp();
- iterations++;
-
- StringBuilder sb = new StringBuilder();
- sb.append(iterations).append(",").
- append(nf.format(throughput)).append(",").append(latencySample);
-
- System.out.println(sb.toString());
-
- MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo());
- Message controlMsg = sessions[0].createTextMessage();
- temp.send(controlMsg);
- temp.close();
- }
- }
- catch (JMSException e)
- {
- handleError(e,"Exception when receiving the message");
- }
- }
- });
- }
-
- }
- catch (Exception e)
- {
- handleError(e,"Exception when setting up the consumers");
- }
-
- }
-
- public static void main(String[] args)
- {
- final SimpleConsumer test = new SimpleConsumer();
- Runnable r = new Runnable(){
- public void run()
- {
- test.setUp();
- test.test();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
deleted file mode 100644
index 805ce7ac29..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import java.util.Random;
-import java.util.UUID;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.thread.Threading;
-
-/**
- * Test Description
- * ================
- * This test will send n-1 messages, followed by the n-th
- * message which contains "End" in it's payload to signal
- * that this is the last message message in the sequence.
- * The end message has the feedback queue as it's replyTo.
- * It will then listen on the feedback queue waiting for the
- * confirmation and then sleeps for 1000 ms before proceeding
- * with the next n messages.
- *
- * This hand shaking mechanism ensures that all of the
- * messages sent are consumed by some consumer. This prevents
- * the producers from saturating the broker especially when
- * the consumers are slow.
- *
- * It creates a producer per session.
- * If session_count is > 1 it will round robin the messages
- * btw the producers.
- *
- * All producers send to a single destination
- *
- */
-
-public class SimpleProducer extends BaseTest
-{
- protected Destination feedbackQueue;
- Random gen = new Random();
-
- public SimpleProducer()
- {
- super();
- }
-
- protected Message getNextMessage()
- {
- if (msg_size == -1)
- {
- int index = gen.nextInt(1000);
- return msgArray[index];
- }
- else
- {
- return msgArray[0];
- }
- }
-
- public void test()
- {
- try
- {
- Session[] sessions = new Session[session_count];
- MessageProducer[] prods = new MessageProducer[session_count];
-
- for (int i = 0; i < session_count; i++)
- {
- sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- prods[i] = sessions[i].createProducer(dest);
- }
-
- // this will ensure that the producer will not overun the consumer.
- feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"),
- new AMQShortString(UUID.randomUUID().toString()),
- new AMQShortString("control"));
-
- MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue);
-
- int prod_pointer = 0;
- boolean multi_session = session_count > 1 ? true : false;
-
- while (true)
- {
- for (int i = 0; i < msg_count - 1; i++)
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- prods[prod_pointer].send(msg);
- if (multi_session)
- {
- prod_pointer++;
- if (prod_pointer == session_count)
- {
- prod_pointer = 0;
- }
- }
- }
-
- TextMessage m = sessions[0].createTextMessage("End");
- m.setJMSReplyTo(feedbackQueue);
- prods[prod_pointer].send(m);
- System.out.println(df.format(System.currentTimeMillis()));
- feedbackConsumer.receive();
- Thread.sleep(1000);
- }
- }
- catch (Exception e)
- {
- handleError(e,"Exception while setting up the producer");
- }
-
- }
-
- public static void main(String[] args)
- {
- final SimpleProducer test = new SimpleProducer();
- Runnable r = new Runnable(){
- public void run()
- {
- test.setUp();
- test.test();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- }
-
-}