diff options
Diffstat (limited to 'java/tools/src')
-rw-r--r-- | java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java | 142 | ||||
-rw-r--r-- | java/tools/src/main/java/org/apache/qpid/tools/TestParams.java | 16 |
2 files changed, 146 insertions, 12 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java index 5f2c1a23dc..5c98c645f4 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java @@ -1,5 +1,6 @@ package org.apache.qpid.tools; +import java.io.FileWriter; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -13,8 +14,40 @@ import javax.jms.MessageProducer; import org.apache.qpid.client.message.AMQPEncodedMapMessage; +/** + * The Controller coordinates a test run between a number + * of producers and consumers, configured via -Dprod_count and -Dcons_count. + * + * It waits till all the producers and consumers have registered and then + * conducts a warmup run. Once all consumers and producers have completed + * the warmup run and is ready, it will conduct the actual test run and + * collect all stats from the participants and calculates the system + * throughput, the avg/min/max for producer rates, consumer rates and latency. + * + * These stats are then printed to std out. + * The Controller also prints events to std out to give a running account + * of the test run in progress. Ex registering of participants, starting warmup ..etc. + * This allows a scripting tool to monitor the progress. + * + * The Controller can be run in two modes. + * 1. A single test run (default) where it just runs until the message count specified + * for the producers via -Dmsg_count is sent and received. + * + * 2. Time based, configured via -Dduration=x, where x is in mins. + * In this mode, the Controller repeatedly cycles through the tests (after an initial + * warmup run) until the desired time is reached. If a test run is in progress + * and the time is up, it will allow the run the complete. + * + * After each iteration, the stats will be printed out in csv format to a separate log file. + * System throughput is calculated as follows + * totalMsgCount/(totalTestTime) + */ public class PerfTestController extends PerfBase implements MessageListener { + enum TestMode { SINGLE_RUN, TIME_BASED }; + + TestMode testMode = TestMode.SINGLE_RUN; + long totalTestTime; private double avgSystemLatency = 0.0; @@ -35,6 +68,7 @@ public class PerfTestController extends PerfBase implements MessageListener private int consumerCount = Integer.getInteger("cons_count", 1); private int producerCount = Integer.getInteger("prod_count", 1); + private int duration = Integer.getInteger("duration", -1); // in mins private Map<String,MapMessage> consumers; private Map<String,MapMessage> producers; @@ -48,10 +82,11 @@ public class PerfTestController extends PerfBase implements MessageListener private MessageConsumer consumer; private boolean printStdDev = false; + FileWriter writer; public PerfTestController() { - super(); + super(""); consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); producers = new ConcurrentHashMap<String,MapMessage>(producerCount); @@ -59,16 +94,20 @@ public class PerfTestController extends PerfBase implements MessageListener prodRegistered = new CountDownLatch(producerCount); consReady = new CountDownLatch(consumerCount); prodReady = new CountDownLatch(producerCount); - receivedConsStats = new CountDownLatch(consumerCount); - receivedProdStats = new CountDownLatch(producerCount); - receivedEndMsg = new CountDownLatch(producerCount); printStdDev = params.isPrintStdDev(); + testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; } public void setUp() throws Exception { super.setUp(); + if (testMode == TestMode.TIME_BASED) + { + writer = new FileWriter("stats-csv.log"); + } consumer = controllerSession.createConsumer(controllerQueue); + System.out.println("\nController: " + producerCount + " producers are expected"); + System.out.println("Controller: " + consumerCount + " consumers are expected \n"); consumer.setMessageListener(this); consRegistered.await(); prodRegistered.await(); @@ -87,6 +126,7 @@ public class PerfTestController extends PerfBase implements MessageListener public void startTest() throws Exception { + resetCounters(); System.out.println("\nController Starting test......"); long start = Clock.getTime(); sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); @@ -97,6 +137,22 @@ public class PerfTestController extends PerfBase implements MessageListener receivedConsStats.await(); } + public void resetCounters() + { + minSystemLatency = Double.MAX_VALUE; + maxSystemLatency = 0; + maxSystemConsRate = 0.0; + minSystemConsRate = Double.MAX_VALUE; + maxSystemProdRate = 0.0; + minSystemProdRate = Double.MAX_VALUE; + + totalMsgCount = 0; + + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + } + public void calcStats() throws Exception { double totLatency = 0.0; @@ -194,7 +250,6 @@ public class PerfTestController extends PerfBase implements MessageListener System.out.println(new StringBuilder("Avg System Std Dev : "). append(avgSystemLatencyStdDev)); } - System.out.println("Controller: Completed the test......\n"); } private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception @@ -230,11 +285,23 @@ public class PerfTestController extends PerfBase implements MessageListener switch (code) { case REGISTER_CONSUMER : + if (consRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); + break; + } consumers.put(m.getString(ID),m); consRegistered.countDown(); break; case REGISTER_PRODUCER : + if (prodRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); + break; + } producers.put(m.getString(ID),m); prodRegistered.countDown(); break; @@ -277,10 +344,36 @@ public class PerfTestController extends PerfBase implements MessageListener { setUp(); warmup(); - startTest(); - calcStats(); - printResults(); + if (testMode == TestMode.SINGLE_RUN) + { + startTest(); + calcStats(); + printResults(); + } + else + { + long startTime = Clock.getTime(); + long timeLimit = duration * 60 * 1000; // duration is in mins. + boolean nextIteration = true; + while (nextIteration) + { + startTest(); + calcStats(); + writeStatsToFile(); + if (Clock.getTime() - startTime < timeLimit) + { + sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); + sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); + nextIteration = true; + } + else + { + nextIteration = false; + } + } + } tearDown(); + } catch(Exception e) { @@ -288,6 +381,39 @@ public class PerfTestController extends PerfBase implements MessageListener } } + @Override + public void tearDown() throws Exception { + System.out.println("Controller: Completed the test......\n"); + if (testMode == TestMode.TIME_BASED) + { + writer.close(); + } + sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); + sendMessageToNodes(OPCode.STOP_TEST,producers.values()); + super.tearDown(); + } + + public void writeStatsToFile() throws Exception + { + writer.append(String.valueOf(totalMsgCount)).append(","); + writer.append(df.format(totalSystemThroughput)).append(","); + writer.append(df.format(avgSystemConsRate)).append(","); + writer.append(df.format(minSystemConsRate)).append(","); + writer.append(df.format(maxSystemConsRate)).append(","); + writer.append(df.format(avgSystemProdRate)).append(","); + writer.append(df.format(minSystemProdRate)).append(","); + writer.append(df.format(maxSystemProdRate)).append(","); + writer.append(df.format(avgSystemLatency)).append(","); + writer.append(df.format(minSystemLatency)).append(","); + writer.append(df.format(maxSystemLatency)); + if (printStdDev) + { + writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); + } + writer.append("\n"); + writer.flush(); + } + public static void main(String[] args) { PerfTestController controller = new PerfTestController(); diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index a563aef6cc..d73be0181b 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -43,7 +43,7 @@ public class TestParams private int msg_size = 1024; - private int msg_type = 1; // not used yet + private int random_msg_size_start_from = 1; private boolean cacheMessage = false; @@ -73,6 +73,8 @@ public class TestParams private boolean externalController = false; + private boolean useUniqueDest = false; // useful when using multiple connections. + public TestParams() { @@ -82,7 +84,6 @@ public class TestParams address = System.getProperty("address",address); msg_size = Integer.getInteger("msg_size", 1024); - msg_type = Integer.getInteger("msg_type",1); cacheMessage = Boolean.getBoolean("cache_msg"); disableMessageID = Boolean.getBoolean("disableMessageID"); disableTimestamp = Boolean.getBoolean("disableTimestamp"); @@ -97,6 +98,8 @@ public class TestParams printStdDev = Boolean.getBoolean("print_std_dev"); rate = Long.getLong("rate",-1); externalController = Boolean.getBoolean("ext_controller"); + useUniqueDest = Boolean.getBoolean("use_unique_dest"); + random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); } public String getUrl() @@ -134,9 +137,9 @@ public class TestParams return msg_size; } - public int getMsgType() + public int getRandomMsgSizeStartFrom() { - return msg_type; + return random_msg_size_start_from; } public boolean isDurable() @@ -203,4 +206,9 @@ public class TestParams { address = addr; } + + public boolean isUseUniqueDests() + { + return useUniqueDest; + } } |