summaryrefslogtreecommitdiff
path: root/java/tools
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-07-20 20:46:49 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-07-20 20:46:49 +0000
commit831edb0cb38965f03507ccb89bf0b2d098382446 (patch)
treec4b51bf08d3fbcb5f13adc9849438c7220ce2843 /java/tools
parent338e7808666ec8f9ca8301ccfe4f86d5dc709f0d (diff)
downloadqpid-python-831edb0cb38965f03507ccb89bf0b2d098382446.tar.gz
QPID-3358 Added a controller to coordinate tests run on several jvm's on the same host or multiple host machines.
1. The controller registers participants and waits until the desired numbers of producers and consumers join the test. 2. It then coordinates warmup runs between the participants and starts the actual test run. 3. Once the test is done it collects stats and computes averages,max,minetc.. and prints them out. The collector is designed run by itself, but can be run inline with a producer for simplicity. Infact by default the producer will run a controller unless -Dext_controller=true is specified. This is done to ensure that the simple test cases like the per-report script can be run without additional configuration. The code would need a bit of cleanup later on. The current form was done quite quickly as a POC for an urgent task. I will be reviewing the code and making improvements over the comming days. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1148935 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools')
-rwxr-xr-xjava/tools/bin/perf-report37
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/Clock.java92
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java68
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java119
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java115
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java296
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java13
7 files changed, 607 insertions, 133 deletions
diff --git a/java/tools/bin/perf-report b/java/tools/bin/perf-report
index 228f792a52..7de3f2b602 100755
--- a/java/tools/bin/perf-report
+++ b/java/tools/bin/perf-report
@@ -21,16 +21,16 @@
# This will run the following test cases defined below and produce
# a report in tabular format.
-SUB_MEM=-Xmx1024M
-PUB_MEM=-Xmx1024M
QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
TOPIC="amq.topic/test"
DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
+COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
+
waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
cleanup()
-{
+{
pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
if [ "$pids" != "" ]; then
kill -3 $pids
@@ -42,30 +42,31 @@ cleanup()
# $2 consumer options
# $3 producer options
run_testcase()
-{
- sh run-sub $LOG_CONFIG $SUB_MEM $2 > sub.out &
- waitfor sub.out "Warming up"
- sh run-pub $LOG_CONFIG $PUB_MEM $3 > pub.out &
- waitfor sub.out "Completed the test"
- waitfor pub.out "Consumer has completed the test"
+{
+ sh run-sub $COMMON_CONFIG $2 > sub.out &
+ sh run-pub $COMMON_CONFIG $3 > pub.out &
+ waitfor pub.out "Controller: Completed the test"
sleep 2 #give a grace period to shutdown
- print_result $1
+ print_result $1
+ mv pub.out $1.pub.out
+ mv sub.out $1.sub.out
}
print_result()
{
- prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
- sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
- avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
- min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
- max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+ prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'`
+ sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'`
+ cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'`
+ avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'`
+ min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'`
+ max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'`
+
+ printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
echo "------------------------------------------------------------------------------------------------"
}
trap cleanup EXIT
+rm -rf *.out #cleanup old files.
echo "Test report on " `date +%F`
echo "================================================================================================"
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
new file mode 100644
index 0000000000..37369959a8
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
@@ -0,0 +1,92 @@
+package org.apache.qpid.tools;
+
+/**
+ * In the future this will be replaced by a Clock abstraction
+ * that can utilize a realtime clock when running in RT Java.
+ */
+
+public class Clock
+{
+ private static Precision precision;
+ private static long offset = -1; // in nano secs
+
+ public enum Precision
+ {
+ NANO_SECS, MILI_SECS;
+
+ static Precision getPrecision(String str)
+ {
+ if ("mili".equalsIgnoreCase(str))
+ {
+ return MILI_SECS;
+ }
+ else
+ {
+ return NANO_SECS;
+ }
+ }
+ };
+
+ static
+ {
+ precision = Precision.getPrecision(System.getProperty("precision","mili"));
+ //offset = Long.getLong("offset",-1);
+
+ System.out.println("Using precision : " + precision + " and offset " + offset);
+ }
+
+ public static Precision getPrecision()
+ {
+ return precision;
+ }
+
+ public static long getTime()
+ {
+ if (precision == Precision.NANO_SECS)
+ {
+ if (offset == -1)
+ {
+ return System.nanoTime();
+ }
+ else
+ {
+ return System.nanoTime() + offset;
+ }
+ }
+ else
+ {
+ if (offset == -1)
+ {
+ return System.currentTimeMillis();
+ }
+ else
+ {
+ return System.currentTimeMillis() + offset/convertToMiliSecs();
+ }
+ }
+ }
+
+ public static long convertToSecs()
+ {
+ if (precision == Precision.NANO_SECS)
+ {
+ return 1000000000;
+ }
+ else
+ {
+ return 1000;
+ }
+ }
+
+ public static long convertToMiliSecs()
+ {
+ if (precision == Precision.NANO_SECS)
+ {
+ return 1000000;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index c78c752eca..340f11f5e4 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -21,9 +21,13 @@
package org.apache.qpid.tools;
import java.text.DecimalFormat;
+import java.util.UUID;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
@@ -31,12 +35,42 @@ import org.apache.qpid.client.AMQConnection;
public class PerfBase
{
+ public final static String CODE = "CODE";
+ public final static String ID = "ID";
+ public final static String REPLY_ADDR = "REPLY_ADDR";
+ public final static String MAX_LATENCY = "MAX_LATENCY";
+ public final static String MIN_LATENCY = "MIN_LATENCY";
+ public final static String AVG_LATENCY = "AVG_LATENCY";
+ public final static String STD_DEV = "STD_DEV";
+ public final static String CONS_RATE = "CONS_RATE";
+ public final static String PROD_RATE = "PROD_RATE";
+ public final static String MSG_COUNT = "MSG_COUNT";
+ public final static String TIMESTAMP = "Timestamp";
+
+ String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
+
TestParams params;
Connection con;
Session session;
+ Session controllerSession;
Destination dest;
- Destination feedbackDest;
+ Destination myControlQueue;
+ Destination controllerQueue;
DecimalFormat df = new DecimalFormat("###.##");
+ String id = UUID.randomUUID().toString();
+ String myControlQueueAddr = id + ";{create: always}";
+
+ MessageProducer sendToController;
+ MessageConsumer receiveFromController;
+
+ enum OPCode {
+ REGISTER_CONSUMER, REGISTER_PRODUCER,
+ PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
+ CONSUMER_READY, PRODUCER_READY,
+ PRODUCER_START,
+ RECEIVED_END_MSG, CONSUMER_STOP,
+ RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS
+ };
enum MessageType {
BYTES, TEXT, MAP, OBJECT;
@@ -88,9 +122,41 @@ public class PerfBase
session = con.createSession(params.isTransacted(),
params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
+ controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
dest = new AMQAnyDestination(params.getAddress());
+ controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+ myControlQueue = session.createQueue(myControlQueueAddr);
msgType = MessageType.getType(params.getMessageType());
System.out.println("Using " + msgType + " messages");
+
+ sendToController = controllerSession.createProducer(controllerQueue);
+ receiveFromController = controllerSession.createConsumer(myControlQueue);
+ }
+
+ public synchronized void sendMessageToController(MapMessage m) throws Exception
+ {
+ m.setString(ID, id);
+ sendToController.send(m);
+ }
+
+ public void receiveFromController(OPCode expected) throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ if (expected != code)
+ {
+ throw new Exception("Expected OPCode : " + expected + " but received : " + code);
+ }
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ controllerSession.close();
+ con.close();
}
public void handleError(Exception e,String msg)
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
index 43ffce23cc..ae439b7ce0 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -23,12 +23,10 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.qpid.thread.Threading;
@@ -99,6 +97,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
public PerfConsumer()
{
super();
+ System.out.println("Consumer ID : " + id);
}
public void setUp() throws Exception
@@ -114,68 +113,87 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
sample = new ArrayList<Long>(params.getMsgCount());
}
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
+ m.setString(REPLY_ADDR,myControlQueueAddr);
+ sendMessageToController(m);
}
public void warmup()throws Exception
{
- System.out.println("Warming up......");
-
+ receiveFromController(OPCode.CONSUMER_STARTWARMUP);
boolean start = false;
- while (!start)
+ Message msg = consumer.receive();
+ // This is to ensure we drain the queue before we start the actual test.
+ while ( msg != null)
{
- Message msg = consumer.receive();
- if (msg.getBooleanProperty("End"))
+ if (msg.getBooleanProperty("End") == true)
{
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
+ // It's more realistic for the consumer to signal this.
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+ sendMessageToController(m);
}
+ msg = consumer.receive(1000);
}
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+ sendMessageToController(m);
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
+ System.out.println("Consumer Starting test......");
consumer.setMessageListener(this);
}
- public void printResults() throws Exception
+ public void sendResults() throws Exception
{
- synchronized (lock)
+ receiveFromController(OPCode.CONSUMER_STOP);
+
+ double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
+ double stdDev = 0.0;
+ if (printStdDev)
{
- lock.wait();
+ stdDev = calculateStdDev(avgLatency);
}
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
+ m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
+ m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
+ m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
+ m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
+ m.setDouble(CONS_RATE, consRate);
+ m.setLong(MSG_COUNT, rcvdMsgCount);
+ sendMessageToController(m);
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
- double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
System.out.println(new StringBuilder("Consumer rate : ").
append(df.format(consRate)).
append(" msg/sec").toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(throughput)).
- append(" msg/sec").toString());
System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
+ append(df.format(avgLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
+ append(df.format(minLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
+ append(df.format(maxLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
if (printStdDev)
{
System.out.println(new StringBuilder("Std Dev : ").
- append(calculateStdDev(avgLatency)).toString());
+ append(stdDev/Clock.convertToMiliSecs()).toString());
}
- System.out.println("Completed the test......\n");
+ System.out.println("Consumer has completed the test......\n");
}
public double calculateStdDev(double mean)
@@ -189,25 +207,6 @@ public class PerfConsumer extends PerfBase implements MessageListener
return Math.round(Math.sqrt(v));
}
- public void notifyCompletion(Destination replyTo) throws Exception
- {
- MessageProducer tmp = session.createProducer(replyTo);
- Message endMsg = session.createMessage();
- tmp.send(endMsg);
- if (params.isTransacted())
- {
- session.commit();
- }
- tmp.close();
- }
-
- public void tearDown() throws Exception
- {
- consumer.close();
- session.close();
- con.close();
- }
-
public void onMessage(Message msg)
{
try
@@ -220,22 +219,18 @@ public class PerfConsumer extends PerfBase implements MessageListener
if (msg.getBooleanProperty("End"))
{
- notifyCompletion(msg.getJMSReplyTo());
-
- synchronized (lock)
- {
- lock.notifyAll();
- }
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+ sendMessageToController(m);
}
else
{
- rcvdTime = System.currentTimeMillis();
+ rcvdTime = Clock.getTime();
rcvdMsgCount ++;
if (rcvdMsgCount == 1)
{
startTime = rcvdTime;
- testStartTime = msg.getJMSTimestamp();
}
if (transacted && (rcvdMsgCount % transSize == 0))
@@ -243,7 +238,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
session.commit();
}
- long latency = rcvdTime - msg.getJMSTimestamp();
+ long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
@@ -261,14 +256,14 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
startTest();
- printResults();
+ sendResults();
tearDown();
}
catch(Exception e)
@@ -284,7 +279,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
public void run()
{
- cons.test();
+ cons.run();
}
};
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index 6b8ba25d7f..4cecd6f4df 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -26,8 +26,8 @@ import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import org.apache.qpid.thread.Threading;
@@ -67,17 +67,17 @@ public class PerfProducer extends PerfBase
int msgSizeRange = 1024;
boolean rateLimitProducer = false;
double rateFactor = 0.4;
+ double rate = 0.0;
public PerfProducer()
{
super();
+ System.out.println("Producer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
durable = params.isDurable();
rateLimitProducer = params.getRate() > 0 ? true : false;
if (rateLimitProducer)
@@ -116,6 +116,11 @@ public class PerfProducer extends PerfBase
producer = session.createProducer(dest);
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
+ m.setString(REPLY_ADDR,myControlQueueAddr);
+ sendMessageToController(m);
}
Object createPayload(int size)
@@ -144,7 +149,6 @@ public class PerfProducer extends PerfBase
}
}
-
protected Message getNextMessage() throws Exception
{
if (cacheMsg)
@@ -173,48 +177,37 @@ public class PerfProducer extends PerfBase
public void warmup()throws Exception
{
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
+ receiveFromController(OPCode.PRODUCER_STARTWARMUP);
+ System.out.println("Producer Warming up......");
for (int i=0; i < params.getWarmupCount() -1; i++)
{
producer.send(getNextMessage());
}
- Message msg = session.createMessage();
- msg.setBooleanProperty("End", true);
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
+ sendEndMessage();
if (params.isTransacted())
{
session.commit();
}
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
+ receiveFromController(OPCode.PRODUCER_START);
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
int tranSize = params.getTransactionSize();
- long limit = (long)(params.getRate() * rateFactor);
- long timeLimit = (long)(SEC * rateFactor);
+ long limit = (long)(params.getRate() * rateFactor); // in msecs
+ long timeLimit = (long)(SEC * rateFactor); // in msecs
- long start = System.currentTimeMillis();
+ long start = Clock.getTime(); // defaults to nano secs
long interval = start;
for(int i=0; i < count; i++ )
{
Message msg = getNextMessage();
+ msg.setLongProperty(TIMESTAMP, Clock.getTime());
producer.send(msg);
if ( transacted && ((i+1) % tranSize == 0))
{
@@ -223,62 +216,53 @@ public class PerfProducer extends PerfBase
if (rateLimitProducer && i%limit == 0)
{
- long elapsed = System.currentTimeMillis() - interval;
+ long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
if (elapsed < timeLimit)
{
Thread.sleep(elapsed);
}
- interval = System.currentTimeMillis();
+ interval = Clock.getTime();
}
}
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
+ sendEndMessage();
+ if ( transacted)
+ {
+ session.commit();
+ }
+ long time = Clock.getTime() - start;
+ rate = (double)count*Clock.convertToSecs()/(double)time;
System.out.println(new StringBuilder("Producer rate: ").
append(df.format(rate)).
append(" msg/sec").
toString());
+
+ System.out.println("Producer has completed the test......");
}
- public void waitForCompletion() throws Exception
+ public void sendEndMessage() throws Exception
{
- MessageConsumer tmp = session.createConsumer(feedbackDest);
Message msg = session.createMessage();
msg.setBooleanProperty("End", true);
- msg.setJMSReplyTo(feedbackDest);
producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- System.out.println("Consumer has completed the test......");
}
- public void tearDown() throws Exception
+ public void sendResults() throws Exception
{
- producer.close();
- session.close();
- con.close();
+ MapMessage msg = controllerSession.createMapMessage();
+ msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
+ msg.setDouble(PROD_RATE, rate);
+ sendMessageToController(msg);
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
startTest();
- waitForCompletion();
+ sendResults();
tearDown();
}
catch(Exception e)
@@ -287,15 +271,42 @@ public class PerfProducer extends PerfBase
}
}
+ public void startControllerIfNeeded()
+ {
+ if (!params.isExternalController())
+ {
+ final PerfTestController controller = new PerfTestController();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ controller.run();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating controller thread",e);
+ }
+ t.start();
+ }
+ }
+
public static void main(String[] args)
{
final PerfProducer prod = new PerfProducer();
+ prod.startControllerIfNeeded();
Runnable r = new Runnable()
{
public void run()
{
- prod.test();
+ prod.run();
}
};
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
new file mode 100644
index 0000000000..5f2c1a23dc
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
@@ -0,0 +1,296 @@
+package org.apache.qpid.tools;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+
+public class PerfTestController extends PerfBase implements MessageListener
+{
+ long totalTestTime;
+
+ private double avgSystemLatency = 0.0;
+ private double minSystemLatency = Double.MAX_VALUE;
+ private double maxSystemLatency = 0;
+ private double avgSystemLatencyStdDev = 0.0;
+
+ private double avgSystemConsRate = 0.0;
+ private double maxSystemConsRate = 0.0;
+ private double minSystemConsRate = Double.MAX_VALUE;
+
+ private double avgSystemProdRate = 0.0;
+ private double maxSystemProdRate = 0.0;
+ private double minSystemProdRate = Double.MAX_VALUE;
+
+ private long totalMsgCount = 0;
+ private double totalSystemThroughput = 0.0;
+
+ private int consumerCount = Integer.getInteger("cons_count", 1);
+ private int producerCount = Integer.getInteger("prod_count", 1);
+ private Map<String,MapMessage> consumers;
+ private Map<String,MapMessage> producers;
+
+ private CountDownLatch consRegistered;
+ private CountDownLatch prodRegistered;
+ private CountDownLatch consReady;
+ private CountDownLatch prodReady;
+ private CountDownLatch receivedEndMsg;
+ private CountDownLatch receivedConsStats;
+ private CountDownLatch receivedProdStats;
+
+ private MessageConsumer consumer;
+ private boolean printStdDev = false;
+
+ public PerfTestController()
+ {
+ super();
+ consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
+ producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
+
+ consRegistered = new CountDownLatch(consumerCount);
+ prodRegistered = new CountDownLatch(producerCount);
+ consReady = new CountDownLatch(consumerCount);
+ prodReady = new CountDownLatch(producerCount);
+ receivedConsStats = new CountDownLatch(consumerCount);
+ receivedProdStats = new CountDownLatch(producerCount);
+ receivedEndMsg = new CountDownLatch(producerCount);
+ printStdDev = params.isPrintStdDev();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = controllerSession.createConsumer(controllerQueue);
+ consumer.setMessageListener(this);
+ consRegistered.await();
+ prodRegistered.await();
+ System.out.println("\nController: All producers and consumers have registered......\n");
+ }
+
+ public void warmup() throws Exception
+ {
+ System.out.println("Controller initiating warm up sequence......");
+ sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
+ sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
+ prodReady.await();
+ consReady.await();
+ System.out.println("\nController : All producers and consumers are ready to start the test......\n");
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("\nController Starting test......");
+ long start = Clock.getTime();
+ sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
+ receivedEndMsg.await();
+ totalTestTime = Clock.getTime() - start;
+ sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values());
+ receivedProdStats.await();
+ receivedConsStats.await();
+ }
+
+ public void calcStats() throws Exception
+ {
+ double totLatency = 0.0;
+ double totStdDev = 0.0;
+ double totalConsRate = 0.0;
+ double totalProdRate = 0.0;
+
+ MapMessage conStat = null; // for error handling
+ try
+ {
+ for (MapMessage m: consumers.values())
+ {
+ conStat = m;
+ minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY));
+ maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY));
+ totLatency = totLatency + m.getDouble(AVG_LATENCY);
+ totStdDev = totStdDev + m.getDouble(STD_DEV);
+
+ minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE));
+ maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE));
+ totalConsRate = totalConsRate + m.getDouble(CONS_RATE);
+
+ totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT);
+ }
+ }
+ catch(Exception e)
+ {
+ System.out.println("Error calculating stats from Consumer : " + conStat);
+ }
+
+
+ MapMessage prodStat = null; // for error handling
+ try
+ {
+ for (MapMessage m: producers.values())
+ {
+ prodStat = m;
+ minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE));
+ maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE));
+ totalProdRate = totalProdRate + m.getDouble(PROD_RATE);
+ }
+ }
+ catch(Exception e)
+ {
+ System.out.println("Error calculating stats from Producer : " + conStat);
+ }
+
+ avgSystemLatency = totLatency/consumers.size();
+ avgSystemLatencyStdDev = totStdDev/consumers.size();
+ avgSystemConsRate = totalConsRate/consumers.size();
+ avgSystemProdRate = totalProdRate/producers.size();
+
+ System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
+
+ totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
+ }
+
+ public void printResults() throws Exception
+ {
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(totalSystemThroughput)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Avg Consumer rate : ").
+ append(df.format(avgSystemConsRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Min Consumer rate : ").
+ append(df.format(minSystemConsRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Max Consumer rate : ").
+ append(df.format(maxSystemConsRate)).
+ append(" msg/sec").toString());
+
+ System.out.println(new StringBuilder("Avg Producer rate : ").
+ append(df.format(avgSystemProdRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Min Producer rate : ").
+ append(df.format(minSystemProdRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Max Producer rate : ").
+ append(df.format(maxSystemProdRate)).
+ append(" msg/sec").toString());
+
+ System.out.println(new StringBuilder("Avg System Latency : ").
+ append(df.format(avgSystemLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min System Latency : ").
+ append(df.format(minSystemLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max System Latency : ").
+ append(df.format(maxSystemLatency)).
+ append(" ms").toString());
+ if (printStdDev)
+ {
+ System.out.println(new StringBuilder("Avg System Std Dev : ").
+ append(avgSystemLatencyStdDev));
+ }
+ System.out.println("Controller: Completed the test......\n");
+ }
+
+ private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
+ {
+ System.out.println("\nController: Sending code " + code);
+ MessageProducer tmpProd = controllerSession.createProducer(null);
+ MapMessage msg = controllerSession.createMapMessage();
+ msg.setInt(CODE, code.ordinal());
+ for (MapMessage node : nodes)
+ {
+ if (node.getString(REPLY_ADDR) == null)
+ {
+ System.out.println("REPLY_ADDR is null " + node);
+ }
+ else
+ {
+ System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
+ }
+ tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
+ }
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ MapMessage m = (MapMessage)msg;
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+
+ System.out.println("\n---------Controller Received Code : " + code);
+ System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
+
+ switch (code)
+ {
+ case REGISTER_CONSUMER :
+ consumers.put(m.getString(ID),m);
+ consRegistered.countDown();
+ break;
+
+ case REGISTER_PRODUCER :
+ producers.put(m.getString(ID),m);
+ prodRegistered.countDown();
+ break;
+
+ case CONSUMER_READY :
+ consReady.countDown();
+ break;
+
+ case PRODUCER_READY :
+ prodReady.countDown();
+ break;
+
+ case RECEIVED_END_MSG :
+ receivedEndMsg.countDown();
+ break;
+
+ case RECEIVED_CONSUMER_STATS :
+ consumers.put(m.getString(ID),m);
+ receivedConsStats.countDown();
+ break;
+
+ case RECEIVED_PRODUCER_STATS :
+ producers.put(m.getString(ID),m);
+ receivedProdStats.countDown();
+ break;
+
+ default:
+ throw new Exception("Invalid OPCode " + code);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Error when receiving messages " + msg);
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ calcStats();
+ printResults();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ PerfTestController controller = new PerfTestController();
+ controller.run();
+ }
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index c656ba3cfc..a563aef6cc 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -71,6 +71,8 @@ public class TestParams
private long rate = -1;
+ private boolean externalController = false;
+
public TestParams()
{
@@ -94,6 +96,7 @@ public class TestParams
msgType = System.getProperty("msg_type","bytes");
printStdDev = Boolean.getBoolean("print_std_dev");
rate = Long.getLong("rate",-1);
+ externalController = Boolean.getBoolean("ext_controller");
}
public String getUrl()
@@ -190,4 +193,14 @@ public class TestParams
{
return rate;
}
+
+ public boolean isExternalController()
+ {
+ return externalController;
+ }
+
+ public void setAddress(String addr)
+ {
+ address = addr;
+ }
}