diff options
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java')
-rw-r--r-- | qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java | 224 |
1 files changed, 83 insertions, 141 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index b63892bb51..0ef0455a64 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -20,17 +20,13 @@ */ package org.apache.qpid.tools; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.TextMessage; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -51,7 +47,7 @@ import org.apache.qpid.thread.Threading; * b) They are on separate machines that have their time synced via a Time Server * * In order to calculate latency the producer inserts a timestamp - * when the message is sent. The consumer will note the current time the message is + * hen the message is sent. The consumer will note the current time the message is * received and will calculate the latency as follows * latency = rcvdTime - msg.getJMSTimestamp() * @@ -59,9 +55,13 @@ import org.apache.qpid.thread.Threading; * variance in latencies. * * Avg latency is measured by adding all latencies and dividing by the total msgs. + * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount * * Throughput * =========== + * System throughput is calculated as follows + * rcvdMsgCount/(rcvdTime - testStartTime) + * * Consumer rate is calculated as * rcvdMsgCount/(rcvdTime - startTime) * @@ -81,160 +81,130 @@ public class PerfConsumer extends PerfBase implements MessageListener long minLatency = Long.MAX_VALUE; long totalLatency = 0; // to calculate avg latency. int rcvdMsgCount = 0; + long testStartTime = 0; // to measure system throughput long startTime = 0; // to measure consumer throughput long rcvdTime = 0; boolean transacted = false; int transSize = 0; - boolean printStdDev = false; - List<Long> sample; - final Object lock = new Object(); - public PerfConsumer(String prefix) + public PerfConsumer() { - super(prefix); - System.out.println("Consumer ID : " + id); + super(); } public void setUp() throws Exception { super.setUp(); consumer = session.createConsumer(dest); - System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); - printStdDev = params.isPrintStdDev(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - sendMessageToController(m); } public void warmup()throws Exception { - receiveFromController(OPCode.CONSUMER_STARTWARMUP); - Message msg = consumer.receive(); - // This is to ensure we drain the queue before we start the actual test. - while ( msg != null) + System.out.println("Warming up......"); + + boolean start = false; + while (!start) { - if (msg.getBooleanProperty("End") == true) + Message msg = consumer.receive(); + if (msg instanceof TextMessage) { - // It's more realistic for the consumer to signal this. - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m); + if (((TextMessage)msg).getText().equals("End")) + { + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) + { + session.commit(); + } + temp.close(); + } } - msg = consumer.receive(1000); - } - - if (params.isTransacted()) - { - session.commit(); } - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m); - consumer.setMessageListener(this); } public void startTest() throws Exception { - System.out.println("Consumer: " + id + " Starting test......" + "\n"); - resetCounters(); + System.out.println("Starting test......"); + consumer.setMessageListener(this); } - public void resetCounters() + public void printResults() throws Exception { - rcvdMsgCount = 0; - maxLatency = 0; - minLatency = Long.MAX_VALUE; - totalLatency = 0; - if (printStdDev) + synchronized (lock) { - sample = null; - sample = new ArrayList<Long>(params.getMsgCount()); + lock.wait(); } - } - - public void sendResults() throws Exception - { - receiveFromController(OPCode.CONSUMER_STOP); double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); - double stdDev = 0.0; - if (printStdDev) - { - stdDev = calculateStdDev(avgLatency); - } - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); - m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); - m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); - m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); - m.setDouble(CONS_RATE, consRate); - m.setLong(MSG_COUNT, rcvdMsgCount); - sendMessageToController(m); - + double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; + double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); System.out.println(new StringBuilder("Consumer rate : "). append(df.format(consRate)). append(" msg/sec").toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(throughput)). + append(" msg/sec").toString()); System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency/Clock.convertToMiliSecs())). + append(df.format(avgLatency)). append(" ms").toString()); System.out.println(new StringBuilder("Min Latency : "). - append(df.format(minLatency/Clock.convertToMiliSecs())). + append(minLatency). append(" ms").toString()); System.out.println(new StringBuilder("Max Latency : "). - append(df.format(maxLatency/Clock.convertToMiliSecs())). + append(maxLatency). append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Std Dev : "). - append(stdDev/Clock.convertToMiliSecs()).toString()); - } + System.out.println("Completed the test......\n"); } - public double calculateStdDev(double mean) + public void notifyCompletion(Destination replyTo) throws Exception { - double v = 0; - for (double latency: sample) + MessageProducer tmp = session.createProducer(replyTo); + Message endMsg = session.createMessage(); + tmp.send(endMsg); + if (params.isTransacted()) { - v = v + Math.pow((latency-mean), 2); + session.commit(); } - v = v/sample.size(); - return Math.round(Math.sqrt(v)); + tmp.close(); + } + + public void tearDown() throws Exception + { + consumer.close(); + session.close(); + con.close(); } public void onMessage(Message msg) { try { - // To figure out the decoding overhead of text - if (msgType == MessageType.TEXT) + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) { - ((TextMessage)msg).getText(); - } + notifyCompletion(msg.getJMSReplyTo()); - if (msg.getBooleanProperty("End")) - { - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); + synchronized (lock) + { + lock.notifyAll(); + } } else { - rcvdTime = Clock.getTime(); + rcvdTime = System.currentTimeMillis(); rcvdMsgCount ++; if (rcvdMsgCount == 1) { startTime = rcvdTime; + testStartTime = msg.getJMSTimestamp(); } if (transacted && (rcvdMsgCount % transSize == 0)) @@ -242,14 +212,10 @@ public class PerfConsumer extends PerfBase implements MessageListener session.commit(); } - long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); + long latency = rcvdTime - msg.getJMSTimestamp(); maxLatency = Math.max(maxLatency, latency); minLatency = Math.min(minLatency, latency); totalLatency = totalLatency + latency; - if (printStdDev) - { - sample.add(latency); - } } } @@ -260,21 +226,14 @@ public class PerfConsumer extends PerfBase implements MessageListener } - public void run() + public void test() { try { setUp(); warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } + startTest(); + printResults(); tearDown(); } catch(Exception e) @@ -283,43 +242,26 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public static void main(String[] args) throws InterruptedException + public static void main(String[] args) { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() { - - final PerfConsumer cons = new PerfConsumer(scriptId + i); - Runnable r = new Runnable() - { - public void run() - { - cons.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) + public void run() { - throw new Error("Error creating consumer thread",e); + cons.test(); } - t.start(); - + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); } - testCompleted.await(); - System.out.println("Consumers have completed the test......\n"); + t.start(); } }
\ No newline at end of file |