diff options
5 files changed, 210 insertions, 477 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java index 979d2ef76f..4e79dd62a8 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -27,6 +27,8 @@ package org.apache.qpid.tools; public class Clock { + public final static long SEC = 60000; + private static Precision precision; private static long offset = -1; // in nano secs diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java index 121e94cea1..097b021b3e 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java @@ -21,7 +21,6 @@ package org.apache.qpid.tools; import java.net.InetAddress; -import java.text.DecimalFormat; import java.util.UUID; import javax.jms.Connection; @@ -32,14 +31,17 @@ import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.messaging.Address; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class PerfBase +public class MercuryBase { + private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class); + public final static String CODE = "CODE"; public final static String ID = "ID"; public final static String REPLY_ADDR = "REPLY_ADDR"; @@ -54,14 +56,13 @@ public class PerfBase String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); - TestParams params; + TestConfiguration config; Connection con; Session session; Session controllerSession; Destination dest; Destination myControlQueue; Destination controllerQueue; - DecimalFormat df = new DecimalFormat("###.##"); String id; String myControlQueueAddr; @@ -69,7 +70,8 @@ public class PerfBase MessageConsumer receiveFromController; String prefix = ""; - enum OPCode { + enum OPCode + { REGISTER_CONSUMER, REGISTER_PRODUCER, PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, CONSUMER_READY, PRODUCER_READY, @@ -79,39 +81,11 @@ public class PerfBase CONTINUE_TEST, STOP_TEST }; - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - MessageType msgType = MessageType.BYTES; - public PerfBase(String prefix) + public MercuryBase(TestConfiguration config,String prefix) { - params = new TestParams(); + this.config = config; String host = ""; try { @@ -127,25 +101,16 @@ public class PerfBase public void setUp() throws Exception { - if (params.getHost().equals("") || params.getPort() == -1) - { - con = new AMQConnection(params.getUrl()); - } - else - { - con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); - } + con = config.createConnection(); con.start(); - session = con.createSession(params.isTransacted(), - params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); dest = createDestination(); - controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); + controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR); myControlQueue = session.createQueue(myControlQueueAddr); - msgType = MessageType.getType(params.getMessageType()); - System.out.println("Using " + msgType + " messages"); + msgType = MessageType.getType(config.getMessageType()); + _logger.debug("Using " + msgType + " messages"); sendToController = controllerSession.createProducer(controllerQueue); receiveFromController = controllerSession.createConsumer(myControlQueue); @@ -153,11 +118,11 @@ public class PerfBase private Destination createDestination() throws Exception { - if (params.isUseUniqueDests()) + if (config.isUseUniqueDests()) { - System.out.println("Prefix : " + prefix); - Address addr = Address.parse(params.getAddress()); - AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); + _logger.debug("Prefix : " + prefix); + Address addr = Address.parse(config.getAddress()); + AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress()); int type = ((AMQSession_0_10)session).resolveAddressType(temp); if ( type == AMQDestination.TOPIC_TYPE) @@ -171,11 +136,11 @@ public class PerfBase System.out.println("Setting name : " + addr); } - return new AMQAnyDestination(addr); + return AMQDestination.createDestination(addr.toString()); } else { - return new AMQAnyDestination(params.getAddress()); + return AMQDestination.createDestination(config.getAddress()); } } @@ -190,7 +155,7 @@ public class PerfBase { MapMessage m = (MapMessage)receiveFromController.receive(); OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); + _logger.debug("Received Code : " + code); if (expected != code) { throw new Exception("Expected OPCode : " + expected + " but received : " + code); @@ -202,7 +167,7 @@ public class PerfBase { MapMessage m = (MapMessage)receiveFromController.receive(); OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); + _logger.debug("Received Code : " + code); return (code == OPCode.CONTINUE_TEST); } diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java index b63892bb51..b35adc45d6 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java @@ -20,18 +20,15 @@ */ package org.apache.qpid.tools; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CountDownLatch; import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.TextMessage; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; +import org.apache.qpid.tools.report.MercuryReporter; +import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * PerfConsumer will receive x no of messages in warmup mode. @@ -74,39 +71,28 @@ import org.apache.qpid.thread.Threading; * */ -public class PerfConsumer extends PerfBase implements MessageListener +public class MercuryConsumerController extends MercuryBase { - MessageConsumer consumer; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - long startTime = 0; // to measure consumer throughput - long rcvdTime = 0; - boolean transacted = false; - int transSize = 0; + private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class); + MercuryReporter reporter; + TestConfiguration config; + QpidReceive receiver; - boolean printStdDev = false; - List<Long> sample; - - final Object lock = new Object(); - - public PerfConsumer(String prefix) + public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix) { - super(prefix); - System.out.println("Consumer ID : " + id); + super(config,prefix); + this.reporter = reporter; + if (_logger.isInfoEnabled()) + { + _logger.info("Consumer ID : " + id); + } } public void setUp() throws Exception { super.setUp(); - consumer = session.createConsumer(dest); - System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); - - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - printStdDev = params.isPrintStdDev(); + receiver = new QpidReceive(reporter,config, con,dest); + receiver.setUp(); MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); sendMessageToController(m); @@ -115,151 +101,71 @@ public class PerfConsumer extends PerfBase implements MessageListener public void warmup()throws Exception { receiveFromController(OPCode.CONSUMER_STARTWARMUP); - Message msg = consumer.receive(); - // This is to ensure we drain the queue before we start the actual test. - while ( msg != null) - { - if (msg.getBooleanProperty("End") == true) - { - // It's more realistic for the consumer to signal this. - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m); - } - msg = consumer.receive(1000); - } + receiver.waitforCompletion(config.getWarmupCount()); - if (params.isTransacted()) - { - session.commit(); - } + // It's more realistic for the consumer to signal this. + MapMessage m1 = controllerSession.createMapMessage(); + m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); + sendMessageToController(m1); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m); - consumer.setMessageListener(this); + MapMessage m2 = controllerSession.createMapMessage(); + m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); + sendMessageToController(m2); } - public void startTest() throws Exception + public void runReceiver() throws Exception { - System.out.println("Consumer: " + id + " Starting test......" + "\n"); + if (_logger.isInfoEnabled()) + { + _logger.info("Consumer: " + id + " Starting iteration......" + "\n"); + } resetCounters(); + receiver.waitforCompletion(config.getMsgCount()); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); + sendMessageToController(m); } public void resetCounters() { - rcvdMsgCount = 0; - maxLatency = 0; - minLatency = Long.MAX_VALUE; - totalLatency = 0; - if (printStdDev) - { - sample = null; - sample = new ArrayList<Long>(params.getMsgCount()); - } + reporter.clear(); } public void sendResults() throws Exception { receiveFromController(OPCode.CONSUMER_STOP); + reporter.report(); - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); - double stdDev = 0.0; - if (printStdDev) - { - stdDev = calculateStdDev(avgLatency); - } MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); - m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); - m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); - m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); - m.setDouble(CONS_RATE, consRate); - m.setLong(MSG_COUNT, rcvdMsgCount); + m.setDouble(AVG_LATENCY, reporter.getAvgLatency()); + m.setDouble(MIN_LATENCY, reporter.getMinLatency()); + m.setDouble(MAX_LATENCY, reporter.getMaxLatency()); + m.setDouble(STD_DEV, reporter.getStdDev()); + m.setDouble(CONS_RATE, reporter.getRate()); + m.setLong(MSG_COUNT, reporter.getSampleSize()); sendMessageToController(m); - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Consumer rate : "). - append(df.format(consRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(df.format(minLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(df.format(maxLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - if (printStdDev) + reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString()); + reporter.log(new StringBuilder("Consumer rate : "). + append(config.getDecimalFormat().format(reporter.getRate())). + append(" msg/sec").toString()); + reporter.log(new StringBuilder("Avg Latency : "). + append(config.getDecimalFormat().format(reporter.getAvgLatency())). + append(" ms").toString()); + reporter.log(new StringBuilder("Min Latency : "). + append(config.getDecimalFormat().format(reporter.getMinLatency())). + append(" ms").toString()); + reporter.log(new StringBuilder("Max Latency : "). + append(config.getDecimalFormat().format(reporter.getMaxLatency())). + append(" ms").toString()); + if (config.isPrintStdDev()) { - System.out.println(new StringBuilder("Std Dev : "). - append(stdDev/Clock.convertToMiliSecs()).toString()); + reporter.log(new StringBuilder("Std Dev : "). + append(reporter.getStdDev()).toString()); } } - public double calculateStdDev(double mean) - { - double v = 0; - for (double latency: sample) - { - v = v + Math.pow((latency-mean), 2); - } - v = v/sample.size(); - return Math.round(Math.sqrt(v)); - } - - public void onMessage(Message msg) - { - try - { - // To figure out the decoding overhead of text - if (msgType == MessageType.TEXT) - { - ((TextMessage)msg).getText(); - } - - if (msg.getBooleanProperty("End")) - { - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); - } - else - { - rcvdTime = Clock.getTime(); - rcvdMsgCount ++; - - if (rcvdMsgCount == 1) - { - startTime = rcvdTime; - } - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - totalLatency = totalLatency + latency; - if (printStdDev) - { - sample.add(latency); - } - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - public void run() { try @@ -271,7 +177,7 @@ public class PerfConsumer extends PerfBase implements MessageListener { System.out.println("=========================================================\n"); System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - startTest(); + runReceiver(); sendResults(); nextIteration = continueTest(); } @@ -283,21 +189,22 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - @Override + @Override public void tearDown() throws Exception { super.tearDown(); } - public static void main(String[] args) throws InterruptedException + public static void main(String[] args) throws Exception { + TestConfiguration config = new JVMArgConfiguration(); + MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true); String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); + int conCount = config.getConnectionCount(); final CountDownLatch testCompleted = new CountDownLatch(conCount); for (int i=0; i < conCount; i++) { - - final PerfConsumer cons = new PerfConsumer(scriptId + i); + final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i); Runnable r = new Runnable() { public void run() @@ -317,9 +224,8 @@ public class PerfConsumer extends PerfBase implements MessageListener throw new Error("Error creating consumer thread",e); } t.start(); - } testCompleted.await(); - System.out.println("Consumers have completed the test......\n"); + reporter.log("Consumers have completed the test......\n"); } }
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java index ac6129ab68..02377bb853 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java @@ -20,19 +20,15 @@ */ package org.apache.qpid.tools; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; import java.util.concurrent.CountDownLatch; -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; +import org.apache.qpid.tools.report.MercuryReporter; +import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation @@ -60,216 +56,68 @@ import org.apache.qpid.thread.Threading; * latency is good. * */ -public class PerfProducer extends PerfBase +public class MercuryProducerController extends MercuryBase { - private static long SEC = 60000; + private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); + MercuryReporter reporter; + QpidSend sender; - MessageProducer producer; - Message msg; - Object payload; - List<Object> payloads; - boolean cacheMsg = false; - boolean randomMsgSize = false; - boolean durable = false; - Random random; - int msgSizeRange = 1024; - boolean rateLimitProducer = false; - double rateFactor = 0.4; - double rate = 0.0; - - public PerfProducer(String prefix) + public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix) { - super(prefix); + super(config,prefix); + this.reporter = reporter; System.out.println("Producer ID : " + id); } public void setUp() throws Exception { super.setUp(); - durable = params.isDurable(); - rateLimitProducer = params.getRate() > 0 ? true : false; - if (rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); - } - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - cacheMsg = true; - msg = createMessage(createPayload(params.getMsgSize())); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (params.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(createPayload(i)); - } - } - else - { - payload = createPayload(params.getMsgSize()); - } - - producer = session.createProducer(dest); - System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - + sender = new QpidSend(reporter,config, con,dest); + sender.setUp(); MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); sendMessageToController(m); } - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - Message m; - - if (!randomMsgSize) - { - m = createMessage(payload); - } - else - { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); - } - m.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return m; - } - } - public void warmup()throws Exception { receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer: " + id + " Warming up......"); - - for (int i=0; i < params.getWarmupCount() -1; i++) + if (_logger.isInfoEnabled()) { - producer.send(getNextMessage()); - } - sendEndMessage(); - - if (params.isTransacted()) - { - session.commit(); + _logger.info("Producer: " + id + " Warming up......"); } + sender.send(config.getWarmupCount()); + sender.sendEndMessage(); } - public void startTest() throws Exception + public void runSender() throws Exception { resetCounters(); receiveFromController(OPCode.PRODUCER_START); - int count = params.getMsgCount(); - boolean transacted = params.isTransacted(); - int tranSize = params.getTransactionSize(); - - long limit = (long)(params.getRate() * rateFactor); // in msecs - long timeLimit = (long)(SEC * rateFactor); // in msecs - - long start = Clock.getTime(); // defaults to nano secs - long interval = start; - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setLongProperty(TIMESTAMP, Clock.getTime()); - producer.send(msg); - if ( transacted && ((i+1) % tranSize == 0)) - { - session.commit(); - } - - if (rateLimitProducer && i%limit == 0) - { - long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs - if (elapsed < timeLimit) - { - Thread.sleep(elapsed); - } - interval = Clock.getTime(); - - } - } - sendEndMessage(); - if ( transacted) - { - session.commit(); - } - long time = Clock.getTime() - start; - rate = (double)count*Clock.convertToSecs()/(double)time; - System.out.println(new StringBuilder("Producer rate: "). - append(df.format(rate)). - append(" msg/sec"). - toString()); + sender.send(config.getMsgCount()); } public void resetCounters() { - - } - - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - producer.send(msg); + sender.resetCounters(); } public void sendResults() throws Exception { MapMessage msg = controllerSession.createMapMessage(); msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, rate); + msg.setDouble(PROD_RATE, reporter.getRate()); sendMessageToController(msg); + reporter.log(new StringBuilder("Producer rate: "). + append(config.getDecimalFormat().format(reporter.getRate())). + append(" msg/sec"). + toString()); } @Override public void tearDown() throws Exception { + sender.tearDown(); super.tearDown(); } @@ -282,9 +130,12 @@ public class PerfProducer extends PerfBase boolean nextIteration = true; while (nextIteration) { - System.out.println("=========================================================\n"); - System.out.println("Producer: " + id + " starting a new iteration ......\n"); - startTest(); + if(_logger.isInfoEnabled()) + { + _logger.info("=========================================================\n"); + _logger.info("Producer: " + id + " starting a new iteration ......\n"); + } + runSender(); sendResults(); nextIteration = continueTest(); } @@ -298,9 +149,9 @@ public class PerfProducer extends PerfBase public void startControllerIfNeeded() { - if (!params.isExternalController()) + if (!config.isExternalController()) { - final PerfTestController controller = new PerfTestController(); + final MercuryTestController controller = new MercuryTestController(config); Runnable r = new Runnable() { public void run() @@ -322,15 +173,16 @@ public class PerfProducer extends PerfBase } } - - public static void main(String[] args) throws InterruptedException + public static void main(String[] args) throws Exception { + TestConfiguration config = new JVMArgConfiguration(); + MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true); String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); + int conCount = config.getConnectionCount(); final CountDownLatch testCompleted = new CountDownLatch(conCount); for (int i=0; i < conCount; i++) { - final PerfProducer prod = new PerfProducer(scriptId + i); + final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i); prod.startControllerIfNeeded(); Runnable r = new Runnable() { @@ -353,6 +205,6 @@ public class PerfProducer extends PerfBase t.start(); } testCompleted.await(); - System.out.println("Producers have completed the test......"); + reporter.log("Producers have completed the test......"); } }
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java index 5fca1fa4bd..8c66a1e44d 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java @@ -33,6 +33,9 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.tools.report.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The Controller coordinates a test run between a number @@ -62,8 +65,10 @@ import org.apache.qpid.client.message.AMQPEncodedMapMessage; * System throughput is calculated as follows * totalMsgCount/(totalTestTime) */ -public class PerfTestController extends PerfBase implements MessageListener +public class MercuryTestController extends MercuryBase implements MessageListener { + private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); + enum TestMode { SINGLE_RUN, TIME_BASED }; TestMode testMode = TestMode.SINGLE_RUN; @@ -102,11 +107,13 @@ public class PerfTestController extends PerfBase implements MessageListener private MessageConsumer consumer; private boolean printStdDev = false; - FileWriter writer; + private FileWriter writer; + private Reporter report; - public PerfTestController() + public MercuryTestController(TestConfiguration config) { - super(""); + super(config,""); + consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); producers = new ConcurrentHashMap<String,MapMessage>(producerCount); @@ -114,7 +121,7 @@ public class PerfTestController extends PerfBase implements MessageListener prodRegistered = new CountDownLatch(producerCount); consReady = new CountDownLatch(consumerCount); prodReady = new CountDownLatch(producerCount); - printStdDev = params.isPrintStdDev(); + printStdDev = config.isPrintStdDev(); testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; } @@ -126,28 +133,28 @@ public class PerfTestController extends PerfBase implements MessageListener writer = new FileWriter("stats-csv.log"); } consumer = controllerSession.createConsumer(controllerQueue); - System.out.println("\nController: " + producerCount + " producers are expected"); - System.out.println("Controller: " + consumerCount + " consumers are expected \n"); + report.log("\nController: " + producerCount + " producers are expected"); + report.log("Controller: " + consumerCount + " consumers are expected \n"); consumer.setMessageListener(this); consRegistered.await(); prodRegistered.await(); - System.out.println("\nController: All producers and consumers have registered......\n"); + report.log("\nController: All producers and consumers have registered......\n"); } public void warmup() throws Exception { - System.out.println("Controller initiating warm up sequence......"); + report.log("Controller initiating warm up sequence......"); sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); prodReady.await(); consReady.await(); - System.out.println("\nController : All producers and consumers are ready to start the test......\n"); + report.log("\nController : All producers and consumers are ready to start the test......\n"); } public void startTest() throws Exception { resetCounters(); - System.out.println("\nController Starting test......"); + report.log("\nController Starting test......"); long start = Clock.getTime(); sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); receivedEndMsg.await(); @@ -200,7 +207,7 @@ public class PerfTestController extends PerfBase implements MessageListener } catch(Exception e) { - System.out.println("Error calculating stats from Consumer : " + conStat); + System.err.println("Error calculating stats from Consumer : " + conStat); } @@ -217,7 +224,7 @@ public class PerfTestController extends PerfBase implements MessageListener } catch(Exception e) { - System.out.println("Error calculating stats from Producer : " + conStat); + System.err.println("Error calculating stats from Producer : " + conStat); } avgSystemLatency = totLatency/consumers.size(); @@ -225,56 +232,56 @@ public class PerfTestController extends PerfBase implements MessageListener avgSystemConsRate = totalConsRate/consumers.size(); avgSystemProdRate = totalProdRate/producers.size(); - System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); } public void printResults() throws Exception { - System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(totalSystemThroughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Consumer rate : "). - append(df.format(avgSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Consumer rate : "). - append(df.format(minSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Consumer rate : "). - append(df.format(maxSystemConsRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg Producer rate : "). - append(df.format(avgSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Producer rate : "). - append(df.format(minSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Producer rate : "). - append(df.format(maxSystemProdRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg System Latency : "). - append(df.format(avgSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min System Latency : "). - append(df.format(minSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Max System Latency : "). - append(df.format(maxSystemLatency)). - append(" ms").toString()); + report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + report.log(new StringBuilder("System Throughput : "). + append(config.getDecimalFormat().format(totalSystemThroughput)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Avg Consumer rate : "). + append(config.getDecimalFormat().format(avgSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Consumer rate : "). + append(config.getDecimalFormat().format(minSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Consumer rate : "). + append(config.getDecimalFormat().format(maxSystemConsRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg Producer rate : "). + append(config.getDecimalFormat().format(avgSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Producer rate : "). + append(config.getDecimalFormat().format(minSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Producer rate : "). + append(config.getDecimalFormat().format(maxSystemProdRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg System Latency : "). + append(config.getDecimalFormat().format(avgSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Min System Latency : "). + append(config.getDecimalFormat().format(minSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Max System Latency : "). + append(config.getDecimalFormat().format(maxSystemLatency)). + append(" ms").toString()); if (printStdDev) { - System.out.println(new StringBuilder("Avg System Std Dev : "). - append(avgSystemLatencyStdDev)); + report.log(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev).toString()); } } private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception { - System.out.println("\nController: Sending code " + code); + report.log("\nController: Sending code " + code); MessageProducer tmpProd = controllerSession.createProducer(null); MapMessage msg = controllerSession.createMapMessage(); msg.setInt(CODE, code.ordinal()); @@ -282,11 +289,11 @@ public class PerfTestController extends PerfBase implements MessageListener { if (node.getString(REPLY_ADDR) == null) { - System.out.println("REPLY_ADDR is null " + node); + report.log("REPLY_ADDR is null " + node); } else { - System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); } tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); } @@ -299,16 +306,16 @@ public class PerfTestController extends PerfBase implements MessageListener MapMessage m = (MapMessage)msg; OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("\n---------Controller Received Code : " + code); - System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + report.log("\n---------Controller Received Code : " + code); + report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); switch (code) { case REGISTER_CONSUMER : if (consRegistered.getCount() == 0) { - System.out.println("Warning : Expected number of consumers have already registered," + - "ignoring extra consumer"); + report.log("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); break; } consumers.put(m.getString(ID),m); @@ -318,8 +325,8 @@ public class PerfTestController extends PerfBase implements MessageListener case REGISTER_PRODUCER : if (prodRegistered.getCount() == 0) { - System.out.println("Warning : Expected number of producers have already registered," + - "ignoring extra producer"); + report.log("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); break; } producers.put(m.getString(ID),m); @@ -403,7 +410,7 @@ public class PerfTestController extends PerfBase implements MessageListener @Override public void tearDown() throws Exception { - System.out.println("Controller: Completed the test......\n"); + report.log("Controller: Completed the test......\n"); if (testMode == TestMode.TIME_BASED) { writer.close(); @@ -416,16 +423,16 @@ public class PerfTestController extends PerfBase implements MessageListener public void writeStatsToFile() throws Exception { writer.append(String.valueOf(totalMsgCount)).append(","); - writer.append(df.format(totalSystemThroughput)).append(","); - writer.append(df.format(avgSystemConsRate)).append(","); - writer.append(df.format(minSystemConsRate)).append(","); - writer.append(df.format(maxSystemConsRate)).append(","); - writer.append(df.format(avgSystemProdRate)).append(","); - writer.append(df.format(minSystemProdRate)).append(","); - writer.append(df.format(maxSystemProdRate)).append(","); - writer.append(df.format(avgSystemLatency)).append(","); - writer.append(df.format(minSystemLatency)).append(","); - writer.append(df.format(maxSystemLatency)); + writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(minSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemLatency)); if (printStdDev) { writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); @@ -436,7 +443,8 @@ public class PerfTestController extends PerfBase implements MessageListener public static void main(String[] args) { - PerfTestController controller = new PerfTestController(); + TestConfiguration config = new JVMArgConfiguration(); + MercuryTestController controller = new MercuryTestController(config); controller.run(); } } |