diff options
Diffstat (limited to 'java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java')
-rw-r--r-- | java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java | 450 |
1 files changed, 450 insertions, 0 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java new file mode 100644 index 0000000000..8c66a1e44d --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java @@ -0,0 +1,450 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.io.FileWriter; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.tools.report.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Controller coordinates a test run between a number + * of producers and consumers, configured via -Dprod_count and -Dcons_count. + * + * It waits till all the producers and consumers have registered and then + * conducts a warmup run. Once all consumers and producers have completed + * the warmup run and is ready, it will conduct the actual test run and + * collect all stats from the participants and calculates the system + * throughput, the avg/min/max for producer rates, consumer rates and latency. + * + * These stats are then printed to std out. + * The Controller also prints events to std out to give a running account + * of the test run in progress. Ex registering of participants, starting warmup ..etc. + * This allows a scripting tool to monitor the progress. + * + * The Controller can be run in two modes. + * 1. A single test run (default) where it just runs until the message count specified + * for the producers via -Dmsg_count is sent and received. + * + * 2. Time based, configured via -Dduration=x, where x is in mins. + * In this mode, the Controller repeatedly cycles through the tests (after an initial + * warmup run) until the desired time is reached. If a test run is in progress + * and the time is up, it will allow the run the complete. + * + * After each iteration, the stats will be printed out in csv format to a separate log file. + * System throughput is calculated as follows + * totalMsgCount/(totalTestTime) + */ +public class MercuryTestController extends MercuryBase implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); + + enum TestMode { SINGLE_RUN, TIME_BASED }; + + TestMode testMode = TestMode.SINGLE_RUN; + + long totalTestTime; + + private double avgSystemLatency = 0.0; + private double minSystemLatency = Double.MAX_VALUE; + private double maxSystemLatency = 0; + private double avgSystemLatencyStdDev = 0.0; + + private double avgSystemConsRate = 0.0; + private double maxSystemConsRate = 0.0; + private double minSystemConsRate = Double.MAX_VALUE; + + private double avgSystemProdRate = 0.0; + private double maxSystemProdRate = 0.0; + private double minSystemProdRate = Double.MAX_VALUE; + + private long totalMsgCount = 0; + private double totalSystemThroughput = 0.0; + + private int consumerCount = Integer.getInteger("cons_count", 1); + private int producerCount = Integer.getInteger("prod_count", 1); + private int duration = Integer.getInteger("duration", -1); // in mins + private Map<String,MapMessage> consumers; + private Map<String,MapMessage> producers; + + private CountDownLatch consRegistered; + private CountDownLatch prodRegistered; + private CountDownLatch consReady; + private CountDownLatch prodReady; + private CountDownLatch receivedEndMsg; + private CountDownLatch receivedConsStats; + private CountDownLatch receivedProdStats; + + private MessageConsumer consumer; + private boolean printStdDev = false; + private FileWriter writer; + private Reporter report; + + public MercuryTestController(TestConfiguration config) + { + super(config,""); + + consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); + producers = new ConcurrentHashMap<String,MapMessage>(producerCount); + + consRegistered = new CountDownLatch(consumerCount); + prodRegistered = new CountDownLatch(producerCount); + consReady = new CountDownLatch(consumerCount); + prodReady = new CountDownLatch(producerCount); + printStdDev = config.isPrintStdDev(); + testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; + } + + public void setUp() throws Exception + { + super.setUp(); + if (testMode == TestMode.TIME_BASED) + { + writer = new FileWriter("stats-csv.log"); + } + consumer = controllerSession.createConsumer(controllerQueue); + report.log("\nController: " + producerCount + " producers are expected"); + report.log("Controller: " + consumerCount + " consumers are expected \n"); + consumer.setMessageListener(this); + consRegistered.await(); + prodRegistered.await(); + report.log("\nController: All producers and consumers have registered......\n"); + } + + public void warmup() throws Exception + { + report.log("Controller initiating warm up sequence......"); + sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); + sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); + prodReady.await(); + consReady.await(); + report.log("\nController : All producers and consumers are ready to start the test......\n"); + } + + public void startTest() throws Exception + { + resetCounters(); + report.log("\nController Starting test......"); + long start = Clock.getTime(); + sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); + receivedEndMsg.await(); + totalTestTime = Clock.getTime() - start; + sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); + receivedProdStats.await(); + receivedConsStats.await(); + } + + public void resetCounters() + { + minSystemLatency = Double.MAX_VALUE; + maxSystemLatency = 0; + maxSystemConsRate = 0.0; + minSystemConsRate = Double.MAX_VALUE; + maxSystemProdRate = 0.0; + minSystemProdRate = Double.MAX_VALUE; + + totalMsgCount = 0; + + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + } + + public void calcStats() throws Exception + { + double totLatency = 0.0; + double totStdDev = 0.0; + double totalConsRate = 0.0; + double totalProdRate = 0.0; + + MapMessage conStat = null; // for error handling + try + { + for (MapMessage m: consumers.values()) + { + conStat = m; + minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); + maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); + totLatency = totLatency + m.getDouble(AVG_LATENCY); + totStdDev = totStdDev + m.getDouble(STD_DEV); + + minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); + maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); + totalConsRate = totalConsRate + m.getDouble(CONS_RATE); + + totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); + } + } + catch(Exception e) + { + System.err.println("Error calculating stats from Consumer : " + conStat); + } + + + MapMessage prodStat = null; // for error handling + try + { + for (MapMessage m: producers.values()) + { + prodStat = m; + minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); + maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); + totalProdRate = totalProdRate + m.getDouble(PROD_RATE); + } + } + catch(Exception e) + { + System.err.println("Error calculating stats from Producer : " + conStat); + } + + avgSystemLatency = totLatency/consumers.size(); + avgSystemLatencyStdDev = totStdDev/consumers.size(); + avgSystemConsRate = totalConsRate/consumers.size(); + avgSystemProdRate = totalProdRate/producers.size(); + + report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + + totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); + } + + public void printResults() throws Exception + { + report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + report.log(new StringBuilder("System Throughput : "). + append(config.getDecimalFormat().format(totalSystemThroughput)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Avg Consumer rate : "). + append(config.getDecimalFormat().format(avgSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Consumer rate : "). + append(config.getDecimalFormat().format(minSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Consumer rate : "). + append(config.getDecimalFormat().format(maxSystemConsRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg Producer rate : "). + append(config.getDecimalFormat().format(avgSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Producer rate : "). + append(config.getDecimalFormat().format(minSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Producer rate : "). + append(config.getDecimalFormat().format(maxSystemProdRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg System Latency : "). + append(config.getDecimalFormat().format(avgSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Min System Latency : "). + append(config.getDecimalFormat().format(minSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Max System Latency : "). + append(config.getDecimalFormat().format(maxSystemLatency)). + append(" ms").toString()); + if (printStdDev) + { + report.log(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev).toString()); + } + } + + private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception + { + report.log("\nController: Sending code " + code); + MessageProducer tmpProd = controllerSession.createProducer(null); + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, code.ordinal()); + for (MapMessage node : nodes) + { + if (node.getString(REPLY_ADDR) == null) + { + report.log("REPLY_ADDR is null " + node); + } + else + { + report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + } + tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); + } + } + + public void onMessage(Message msg) + { + try + { + MapMessage m = (MapMessage)msg; + OPCode code = OPCode.values()[m.getInt(CODE)]; + + report.log("\n---------Controller Received Code : " + code); + report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + + switch (code) + { + case REGISTER_CONSUMER : + if (consRegistered.getCount() == 0) + { + report.log("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); + break; + } + consumers.put(m.getString(ID),m); + consRegistered.countDown(); + break; + + case REGISTER_PRODUCER : + if (prodRegistered.getCount() == 0) + { + report.log("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); + break; + } + producers.put(m.getString(ID),m); + prodRegistered.countDown(); + break; + + case CONSUMER_READY : + consReady.countDown(); + break; + + case PRODUCER_READY : + prodReady.countDown(); + break; + + case RECEIVED_END_MSG : + receivedEndMsg.countDown(); + break; + + case RECEIVED_CONSUMER_STATS : + consumers.put(m.getString(ID),m); + receivedConsStats.countDown(); + break; + + case RECEIVED_PRODUCER_STATS : + producers.put(m.getString(ID),m); + receivedProdStats.countDown(); + break; + + default: + throw new Exception("Invalid OPCode " + code); + } + } + catch (Exception e) + { + handleError(e,"Error when receiving messages " + msg); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + if (testMode == TestMode.SINGLE_RUN) + { + startTest(); + calcStats(); + printResults(); + } + else + { + long startTime = Clock.getTime(); + long timeLimit = duration * 60 * 1000; // duration is in mins. + boolean nextIteration = true; + while (nextIteration) + { + startTest(); + calcStats(); + writeStatsToFile(); + if (Clock.getTime() - startTime < timeLimit) + { + sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); + sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); + nextIteration = true; + } + else + { + nextIteration = false; + } + } + } + tearDown(); + + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception { + report.log("Controller: Completed the test......\n"); + if (testMode == TestMode.TIME_BASED) + { + writer.close(); + } + sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); + sendMessageToNodes(OPCode.STOP_TEST,producers.values()); + super.tearDown(); + } + + public void writeStatsToFile() throws Exception + { + writer.append(String.valueOf(totalMsgCount)).append(","); + writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(minSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemLatency)); + if (printStdDev) + { + writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); + } + writer.append("\n"); + writer.flush(); + } + + public static void main(String[] args) + { + TestConfiguration config = new JVMArgConfiguration(); + MercuryTestController controller = new MercuryTestController(config); + controller.run(); + } +} |