summaryrefslogtreecommitdiff
path: root/java/tools/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/tools/src')
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java142
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java16
2 files changed, 146 insertions, 12 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
index 5f2c1a23dc..5c98c645f4 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
@@ -1,5 +1,6 @@
package org.apache.qpid.tools;
+import java.io.FileWriter;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -13,8 +14,40 @@ import javax.jms.MessageProducer;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+/**
+ * The Controller coordinates a test run between a number
+ * of producers and consumers, configured via -Dprod_count and -Dcons_count.
+ *
+ * It waits till all the producers and consumers have registered and then
+ * conducts a warmup run. Once all consumers and producers have completed
+ * the warmup run and is ready, it will conduct the actual test run and
+ * collect all stats from the participants and calculates the system
+ * throughput, the avg/min/max for producer rates, consumer rates and latency.
+ *
+ * These stats are then printed to std out.
+ * The Controller also prints events to std out to give a running account
+ * of the test run in progress. Ex registering of participants, starting warmup ..etc.
+ * This allows a scripting tool to monitor the progress.
+ *
+ * The Controller can be run in two modes.
+ * 1. A single test run (default) where it just runs until the message count specified
+ * for the producers via -Dmsg_count is sent and received.
+ *
+ * 2. Time based, configured via -Dduration=x, where x is in mins.
+ * In this mode, the Controller repeatedly cycles through the tests (after an initial
+ * warmup run) until the desired time is reached. If a test run is in progress
+ * and the time is up, it will allow the run the complete.
+ *
+ * After each iteration, the stats will be printed out in csv format to a separate log file.
+ * System throughput is calculated as follows
+ * totalMsgCount/(totalTestTime)
+ */
public class PerfTestController extends PerfBase implements MessageListener
{
+ enum TestMode { SINGLE_RUN, TIME_BASED };
+
+ TestMode testMode = TestMode.SINGLE_RUN;
+
long totalTestTime;
private double avgSystemLatency = 0.0;
@@ -35,6 +68,7 @@ public class PerfTestController extends PerfBase implements MessageListener
private int consumerCount = Integer.getInteger("cons_count", 1);
private int producerCount = Integer.getInteger("prod_count", 1);
+ private int duration = Integer.getInteger("duration", -1); // in mins
private Map<String,MapMessage> consumers;
private Map<String,MapMessage> producers;
@@ -48,10 +82,11 @@ public class PerfTestController extends PerfBase implements MessageListener
private MessageConsumer consumer;
private boolean printStdDev = false;
+ FileWriter writer;
public PerfTestController()
{
- super();
+ super("");
consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
@@ -59,16 +94,20 @@ public class PerfTestController extends PerfBase implements MessageListener
prodRegistered = new CountDownLatch(producerCount);
consReady = new CountDownLatch(consumerCount);
prodReady = new CountDownLatch(producerCount);
- receivedConsStats = new CountDownLatch(consumerCount);
- receivedProdStats = new CountDownLatch(producerCount);
- receivedEndMsg = new CountDownLatch(producerCount);
printStdDev = params.isPrintStdDev();
+ testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
}
public void setUp() throws Exception
{
super.setUp();
+ if (testMode == TestMode.TIME_BASED)
+ {
+ writer = new FileWriter("stats-csv.log");
+ }
consumer = controllerSession.createConsumer(controllerQueue);
+ System.out.println("\nController: " + producerCount + " producers are expected");
+ System.out.println("Controller: " + consumerCount + " consumers are expected \n");
consumer.setMessageListener(this);
consRegistered.await();
prodRegistered.await();
@@ -87,6 +126,7 @@ public class PerfTestController extends PerfBase implements MessageListener
public void startTest() throws Exception
{
+ resetCounters();
System.out.println("\nController Starting test......");
long start = Clock.getTime();
sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
@@ -97,6 +137,22 @@ public class PerfTestController extends PerfBase implements MessageListener
receivedConsStats.await();
}
+ public void resetCounters()
+ {
+ minSystemLatency = Double.MAX_VALUE;
+ maxSystemLatency = 0;
+ maxSystemConsRate = 0.0;
+ minSystemConsRate = Double.MAX_VALUE;
+ maxSystemProdRate = 0.0;
+ minSystemProdRate = Double.MAX_VALUE;
+
+ totalMsgCount = 0;
+
+ receivedConsStats = new CountDownLatch(consumerCount);
+ receivedProdStats = new CountDownLatch(producerCount);
+ receivedEndMsg = new CountDownLatch(producerCount);
+ }
+
public void calcStats() throws Exception
{
double totLatency = 0.0;
@@ -194,7 +250,6 @@ public class PerfTestController extends PerfBase implements MessageListener
System.out.println(new StringBuilder("Avg System Std Dev : ").
append(avgSystemLatencyStdDev));
}
- System.out.println("Controller: Completed the test......\n");
}
private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
@@ -230,11 +285,23 @@ public class PerfTestController extends PerfBase implements MessageListener
switch (code)
{
case REGISTER_CONSUMER :
+ if (consRegistered.getCount() == 0)
+ {
+ System.out.println("Warning : Expected number of consumers have already registered," +
+ "ignoring extra consumer");
+ break;
+ }
consumers.put(m.getString(ID),m);
consRegistered.countDown();
break;
case REGISTER_PRODUCER :
+ if (prodRegistered.getCount() == 0)
+ {
+ System.out.println("Warning : Expected number of producers have already registered," +
+ "ignoring extra producer");
+ break;
+ }
producers.put(m.getString(ID),m);
prodRegistered.countDown();
break;
@@ -277,10 +344,36 @@ public class PerfTestController extends PerfBase implements MessageListener
{
setUp();
warmup();
- startTest();
- calcStats();
- printResults();
+ if (testMode == TestMode.SINGLE_RUN)
+ {
+ startTest();
+ calcStats();
+ printResults();
+ }
+ else
+ {
+ long startTime = Clock.getTime();
+ long timeLimit = duration * 60 * 1000; // duration is in mins.
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ startTest();
+ calcStats();
+ writeStatsToFile();
+ if (Clock.getTime() - startTime < timeLimit)
+ {
+ sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values());
+ sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values());
+ nextIteration = true;
+ }
+ else
+ {
+ nextIteration = false;
+ }
+ }
+ }
tearDown();
+
}
catch(Exception e)
{
@@ -288,6 +381,39 @@ public class PerfTestController extends PerfBase implements MessageListener
}
}
+ @Override
+ public void tearDown() throws Exception {
+ System.out.println("Controller: Completed the test......\n");
+ if (testMode == TestMode.TIME_BASED)
+ {
+ writer.close();
+ }
+ sendMessageToNodes(OPCode.STOP_TEST,consumers.values());
+ sendMessageToNodes(OPCode.STOP_TEST,producers.values());
+ super.tearDown();
+ }
+
+ public void writeStatsToFile() throws Exception
+ {
+ writer.append(String.valueOf(totalMsgCount)).append(",");
+ writer.append(df.format(totalSystemThroughput)).append(",");
+ writer.append(df.format(avgSystemConsRate)).append(",");
+ writer.append(df.format(minSystemConsRate)).append(",");
+ writer.append(df.format(maxSystemConsRate)).append(",");
+ writer.append(df.format(avgSystemProdRate)).append(",");
+ writer.append(df.format(minSystemProdRate)).append(",");
+ writer.append(df.format(maxSystemProdRate)).append(",");
+ writer.append(df.format(avgSystemLatency)).append(",");
+ writer.append(df.format(minSystemLatency)).append(",");
+ writer.append(df.format(maxSystemLatency));
+ if (printStdDev)
+ {
+ writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
+ }
+ writer.append("\n");
+ writer.flush();
+ }
+
public static void main(String[] args)
{
PerfTestController controller = new PerfTestController();
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index a563aef6cc..d73be0181b 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -43,7 +43,7 @@ public class TestParams
private int msg_size = 1024;
- private int msg_type = 1; // not used yet
+ private int random_msg_size_start_from = 1;
private boolean cacheMessage = false;
@@ -73,6 +73,8 @@ public class TestParams
private boolean externalController = false;
+ private boolean useUniqueDest = false; // useful when using multiple connections.
+
public TestParams()
{
@@ -82,7 +84,6 @@ public class TestParams
address = System.getProperty("address",address);
msg_size = Integer.getInteger("msg_size", 1024);
- msg_type = Integer.getInteger("msg_type",1);
cacheMessage = Boolean.getBoolean("cache_msg");
disableMessageID = Boolean.getBoolean("disableMessageID");
disableTimestamp = Boolean.getBoolean("disableTimestamp");
@@ -97,6 +98,8 @@ public class TestParams
printStdDev = Boolean.getBoolean("print_std_dev");
rate = Long.getLong("rate",-1);
externalController = Boolean.getBoolean("ext_controller");
+ useUniqueDest = Boolean.getBoolean("use_unique_dest");
+ random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1);
}
public String getUrl()
@@ -134,9 +137,9 @@ public class TestParams
return msg_size;
}
- public int getMsgType()
+ public int getRandomMsgSizeStartFrom()
{
- return msg_type;
+ return random_msg_size_start_from;
}
public boolean isDurable()
@@ -203,4 +206,9 @@ public class TestParams
{
address = addr;
}
+
+ public boolean isUseUniqueDests()
+ {
+ return useUniqueDest;
+ }
}