diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-07-13 22:42:46 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-07-13 22:42:46 +0000 |
commit | 05ebb6d1625b94e240648c2d251f3c524c7ab7d7 (patch) | |
tree | 94b029d92d15d80ba71e4c0df0b9e2511c7aa601 | |
parent | 6f97615e2ed577dd12f6ed677680feb24ce350dc (diff) | |
download | qpid-python-05ebb6d1625b94e240648c2d251f3c524c7ab7d7.tar.gz |
QPID-3358 Added the ability to specify a message type (ex bytes, text) and also calculate standard deviation for the latency sample.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1146508 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 129 insertions, 59 deletions
diff --git a/java/tools/bin/perf_report.sh b/java/tools/bin/perf_report.sh index e6b4c987e5..dd80f8c72c 100755 --- a/java/tools/bin/perf_report.sh +++ b/java/tools/bin/perf_report.sh @@ -18,9 +18,8 @@ # under the License. # -# This will run the 8 use cases defined below and produce -# a report in tabular format. Refer to the documentation -# for more details. +# This will run the following test cases defined below and produce +# a report in tabular format. SUB_MEM=-Xmx1024M PUB_MEM=-Xmx1024M @@ -82,7 +81,7 @@ echo "-------------------------------------------------------------------------- # setting very low values to start with and experiment while increasing them slowly. # Test 1 Trans Queue -#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" # Test 2 Dura Queue run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index ac597d17de..c78c752eca 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -21,14 +21,10 @@ package org.apache.qpid.tools; import java.text.DecimalFormat; -import java.util.Hashtable; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; @@ -42,13 +38,43 @@ public class PerfBase Destination feedbackDest; DecimalFormat df = new DecimalFormat("###.##"); + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) + { + return MAP; + } + else if ("object".equalsIgnoreCase(s)) + { + return OBJECT; + }*/ + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + MessageType msgType = MessageType.BYTES; + public PerfBase() { params = new TestParams(); } public void setUp() throws Exception - { + { if (params.getHost().equals("") || params.getPort() == -1) { @@ -63,6 +89,8 @@ public class PerfBase params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); dest = new AMQAnyDestination(params.getAddress()); + msgType = MessageType.getType(params.getMessageType()); + System.out.println("Using " + msgType + " messages"); } public void handleError(Exception e,String msg) diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index 0ef0455a64..43ffce23cc 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.tools; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -87,6 +91,9 @@ public class PerfConsumer extends PerfBase implements MessageListener boolean transacted = false; int transSize = 0; + boolean printStdDev = false; + List<Long> sample; + final Object lock = new Object(); public PerfConsumer() @@ -102,6 +109,11 @@ public class PerfConsumer extends PerfBase implements MessageListener // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); + printStdDev = params.isPrintStdDev(); + if (printStdDev) + { + sample = new ArrayList<Long>(params.getMsgCount()); + } } public void warmup()throws Exception @@ -112,19 +124,16 @@ public class PerfConsumer extends PerfBase implements MessageListener while (!start) { Message msg = consumer.receive(); - if (msg instanceof TextMessage) + if (msg.getBooleanProperty("End")) { - if (((TextMessage)msg).getText().equals("End")) + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) { - start = true; - MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); - temp.send(session.createMessage()); - if (params.isTransacted()) - { - session.commit(); - } - temp.close(); + session.commit(); } + temp.close(); } } } @@ -161,9 +170,25 @@ public class PerfConsumer extends PerfBase implements MessageListener System.out.println(new StringBuilder("Max Latency : "). append(maxLatency). append(" ms").toString()); + if (printStdDev) + { + System.out.println(new StringBuilder("Std Dev : "). + append(calculateStdDev(avgLatency)).toString()); + } System.out.println("Completed the test......\n"); } + public double calculateStdDev(double mean) + { + double v = 0; + for (double latency: sample) + { + v = v + Math.pow((latency-mean), 2); + } + v = v/sample.size(); + return Math.round(Math.sqrt(v)); + } + public void notifyCompletion(Destination replyTo) throws Exception { MessageProducer tmp = session.createProducer(replyTo); @@ -187,7 +212,13 @@ public class PerfConsumer extends PerfBase implements MessageListener { try { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + // To figure out the decoding overhead of text + if (msgType == MessageType.TEXT) + { + ((TextMessage)msg).getText(); + } + + if (msg.getBooleanProperty("End")) { notifyCompletion(msg.getJMSReplyTo()); @@ -216,6 +247,10 @@ public class PerfConsumer extends PerfBase implements MessageListener maxLatency = Math.max(maxLatency, latency); minLatency = Math.min(minLatency, latency); totalLatency = totalLatency + latency; + if (printStdDev) + { + sample.add(latency); + } } } @@ -252,16 +287,16 @@ public class PerfConsumer extends PerfBase implements MessageListener cons.test(); } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating consumer thread",e); } - t.start(); + t.start(); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index 6172a95c3d..6b8ba25d7f 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -54,6 +54,8 @@ import org.apache.qpid.thread.Threading; */ public class PerfProducer extends PerfBase { + private static long SEC = 60000; + MessageProducer producer; Message msg; Object payload; @@ -63,36 +65,8 @@ public class PerfProducer extends PerfBase boolean durable = false; Random random; int msgSizeRange = 1024; - - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - - MessageType msgType = MessageType.BYTES; + boolean rateLimitProducer = false; + double rateFactor = 0.4; public PerfProducer() { @@ -105,9 +79,11 @@ public class PerfProducer extends PerfBase feedbackDest = session.createTemporaryQueue(); durable = params.isDurable(); - msgType = MessageType.getType(params.getMessageType()); - - System.out.println("Using " + msgType + " messages"); + rateLimitProducer = params.getRate() > 0 ? true : false; + if (rateLimitProducer) + { + System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); + } // if message caching is enabled we pre create the message // else we pre create the payload @@ -204,7 +180,8 @@ public class PerfProducer extends PerfBase { producer.send(getNextMessage()); } - Message msg = session.createTextMessage("End"); + Message msg = session.createMessage(); + msg.setBooleanProperty("End", true); msg.setJMSReplyTo(feedbackDest); producer.send(msg); @@ -230,16 +207,30 @@ public class PerfProducer extends PerfBase boolean transacted = params.isTransacted(); int tranSize = params.getTransactionSize(); + long limit = (long)(params.getRate() * rateFactor); + long timeLimit = (long)(SEC * rateFactor); + long start = System.currentTimeMillis(); + long interval = start; for(int i=0; i < count; i++ ) { Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); producer.send(msg); if ( transacted && ((i+1) % tranSize == 0)) { session.commit(); } + + if (rateLimitProducer && i%limit == 0) + { + long elapsed = System.currentTimeMillis() - interval; + if (elapsed < timeLimit) + { + Thread.sleep(elapsed); + } + interval = System.currentTimeMillis(); + + } } long time = System.currentTimeMillis() - start; double rate = ((double)count/(double)time)*1000; @@ -252,7 +243,8 @@ public class PerfProducer extends PerfBase public void waitForCompletion() throws Exception { MessageConsumer tmp = session.createConsumer(feedbackDest); - Message msg = session.createTextMessage("End"); + Message msg = session.createMessage(); + msg.setBooleanProperty("End", true); msg.setJMSReplyTo(feedbackDest); producer.send(msg); diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index 80014d8ac0..c656ba3cfc 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -67,6 +67,10 @@ public class TestParams private String msgType = "bytes"; + private boolean printStdDev = false; + + private long rate = -1; + public TestParams() { @@ -88,6 +92,8 @@ public class TestParams warmup_count = Integer.getInteger("warmup_count",warmup_count); random_msg_size = Boolean.getBoolean("random_msg_size"); msgType = System.getProperty("msg_type","bytes"); + printStdDev = Boolean.getBoolean("print_std_dev"); + rate = Long.getLong("rate",-1); } public String getUrl() @@ -174,4 +180,14 @@ public class TestParams { return msgType; } + + public boolean isPrintStdDev() + { + return printStdDev; + } + + public long getRate() + { + return rate; + } } |