summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-07-13 22:42:46 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-07-13 22:42:46 +0000
commit05ebb6d1625b94e240648c2d251f3c524c7ab7d7 (patch)
tree94b029d92d15d80ba71e4c0df0b9e2511c7aa601
parent6f97615e2ed577dd12f6ed677680feb24ce350dc (diff)
downloadqpid-python-05ebb6d1625b94e240648c2d251f3c524c7ab7d7.tar.gz
QPID-3358 Added the ability to specify a message type (ex bytes, text) and also calculate standard deviation for the latency sample.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1146508 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xjava/tools/bin/perf_report.sh7
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java38
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java63
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java64
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java16
5 files changed, 129 insertions, 59 deletions
diff --git a/java/tools/bin/perf_report.sh b/java/tools/bin/perf_report.sh
index e6b4c987e5..dd80f8c72c 100755
--- a/java/tools/bin/perf_report.sh
+++ b/java/tools/bin/perf_report.sh
@@ -18,9 +18,8 @@
# under the License.
#
-# This will run the 8 use cases defined below and produce
-# a report in tabular format. Refer to the documentation
-# for more details.
+# This will run the following test cases defined below and produce
+# a report in tabular format.
SUB_MEM=-Xmx1024M
PUB_MEM=-Xmx1024M
@@ -82,7 +81,7 @@ echo "--------------------------------------------------------------------------
# setting very low values to start with and experiment while increasing them slowly.
# Test 1 Trans Queue
-#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
# Test 2 Dura Queue
run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index ac597d17de..c78c752eca 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -21,14 +21,10 @@
package org.apache.qpid.tools;
import java.text.DecimalFormat;
-import java.util.Hashtable;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
@@ -42,13 +38,43 @@ public class PerfBase
Destination feedbackDest;
DecimalFormat df = new DecimalFormat("###.##");
+ enum MessageType {
+ BYTES, TEXT, MAP, OBJECT;
+
+ public static MessageType getType(String s) throws Exception
+ {
+ if ("text".equalsIgnoreCase(s))
+ {
+ return TEXT;
+ }
+ else if ("bytes".equalsIgnoreCase(s))
+ {
+ return BYTES;
+ }
+ /*else if ("map".equalsIgnoreCase(s))
+ {
+ return MAP;
+ }
+ else if ("object".equalsIgnoreCase(s))
+ {
+ return OBJECT;
+ }*/
+ else
+ {
+ throw new Exception("Unsupported message type");
+ }
+ }
+ };
+
+ MessageType msgType = MessageType.BYTES;
+
public PerfBase()
{
params = new TestParams();
}
public void setUp() throws Exception
- {
+ {
if (params.getHost().equals("") || params.getPort() == -1)
{
@@ -63,6 +89,8 @@ public class PerfBase
params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
dest = new AMQAnyDestination(params.getAddress());
+ msgType = MessageType.getType(params.getMessageType());
+ System.out.println("Using " + msgType + " messages");
}
public void handleError(Exception e,String msg)
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
index 0ef0455a64..43ffce23cc 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.tools;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -87,6 +91,9 @@ public class PerfConsumer extends PerfBase implements MessageListener
boolean transacted = false;
int transSize = 0;
+ boolean printStdDev = false;
+ List<Long> sample;
+
final Object lock = new Object();
public PerfConsumer()
@@ -102,6 +109,11 @@ public class PerfConsumer extends PerfBase implements MessageListener
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
+ printStdDev = params.isPrintStdDev();
+ if (printStdDev)
+ {
+ sample = new ArrayList<Long>(params.getMsgCount());
+ }
}
public void warmup()throws Exception
@@ -112,19 +124,16 @@ public class PerfConsumer extends PerfBase implements MessageListener
while (!start)
{
Message msg = consumer.receive();
- if (msg instanceof TextMessage)
+ if (msg.getBooleanProperty("End"))
{
- if (((TextMessage)msg).getText().equals("End"))
+ start = true;
+ MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+ temp.send(session.createMessage());
+ if (params.isTransacted())
{
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
+ session.commit();
}
+ temp.close();
}
}
}
@@ -161,9 +170,25 @@ public class PerfConsumer extends PerfBase implements MessageListener
System.out.println(new StringBuilder("Max Latency : ").
append(maxLatency).
append(" ms").toString());
+ if (printStdDev)
+ {
+ System.out.println(new StringBuilder("Std Dev : ").
+ append(calculateStdDev(avgLatency)).toString());
+ }
System.out.println("Completed the test......\n");
}
+ public double calculateStdDev(double mean)
+ {
+ double v = 0;
+ for (double latency: sample)
+ {
+ v = v + Math.pow((latency-mean), 2);
+ }
+ v = v/sample.size();
+ return Math.round(Math.sqrt(v));
+ }
+
public void notifyCompletion(Destination replyTo) throws Exception
{
MessageProducer tmp = session.createProducer(replyTo);
@@ -187,7 +212,13 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
try
{
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ // To figure out the decoding overhead of text
+ if (msgType == MessageType.TEXT)
+ {
+ ((TextMessage)msg).getText();
+ }
+
+ if (msg.getBooleanProperty("End"))
{
notifyCompletion(msg.getJMSReplyTo());
@@ -216,6 +247,10 @@ public class PerfConsumer extends PerfBase implements MessageListener
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
+ if (printStdDev)
+ {
+ sample.add(latency);
+ }
}
}
@@ -252,16 +287,16 @@ public class PerfConsumer extends PerfBase implements MessageListener
cons.test();
}
};
-
+
Thread t;
try
{
- t = Threading.getThreadFactory().createThread(r);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating consumer thread",e);
}
- t.start();
+ t.start();
}
} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index 6172a95c3d..6b8ba25d7f 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -54,6 +54,8 @@ import org.apache.qpid.thread.Threading;
*/
public class PerfProducer extends PerfBase
{
+ private static long SEC = 60000;
+
MessageProducer producer;
Message msg;
Object payload;
@@ -63,36 +65,8 @@ public class PerfProducer extends PerfBase
boolean durable = false;
Random random;
int msgSizeRange = 1024;
-
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
- MessageType msgType = MessageType.BYTES;
+ boolean rateLimitProducer = false;
+ double rateFactor = 0.4;
public PerfProducer()
{
@@ -105,9 +79,11 @@ public class PerfProducer extends PerfBase
feedbackDest = session.createTemporaryQueue();
durable = params.isDurable();
- msgType = MessageType.getType(params.getMessageType());
-
- System.out.println("Using " + msgType + " messages");
+ rateLimitProducer = params.getRate() > 0 ? true : false;
+ if (rateLimitProducer)
+ {
+ System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
+ }
// if message caching is enabled we pre create the message
// else we pre create the payload
@@ -204,7 +180,8 @@ public class PerfProducer extends PerfBase
{
producer.send(getNextMessage());
}
- Message msg = session.createTextMessage("End");
+ Message msg = session.createMessage();
+ msg.setBooleanProperty("End", true);
msg.setJMSReplyTo(feedbackDest);
producer.send(msg);
@@ -230,16 +207,30 @@ public class PerfProducer extends PerfBase
boolean transacted = params.isTransacted();
int tranSize = params.getTransactionSize();
+ long limit = (long)(params.getRate() * rateFactor);
+ long timeLimit = (long)(SEC * rateFactor);
+
long start = System.currentTimeMillis();
+ long interval = start;
for(int i=0; i < count; i++ )
{
Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
producer.send(msg);
if ( transacted && ((i+1) % tranSize == 0))
{
session.commit();
}
+
+ if (rateLimitProducer && i%limit == 0)
+ {
+ long elapsed = System.currentTimeMillis() - interval;
+ if (elapsed < timeLimit)
+ {
+ Thread.sleep(elapsed);
+ }
+ interval = System.currentTimeMillis();
+
+ }
}
long time = System.currentTimeMillis() - start;
double rate = ((double)count/(double)time)*1000;
@@ -252,7 +243,8 @@ public class PerfProducer extends PerfBase
public void waitForCompletion() throws Exception
{
MessageConsumer tmp = session.createConsumer(feedbackDest);
- Message msg = session.createTextMessage("End");
+ Message msg = session.createMessage();
+ msg.setBooleanProperty("End", true);
msg.setJMSReplyTo(feedbackDest);
producer.send(msg);
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index 80014d8ac0..c656ba3cfc 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -67,6 +67,10 @@ public class TestParams
private String msgType = "bytes";
+ private boolean printStdDev = false;
+
+ private long rate = -1;
+
public TestParams()
{
@@ -88,6 +92,8 @@ public class TestParams
warmup_count = Integer.getInteger("warmup_count",warmup_count);
random_msg_size = Boolean.getBoolean("random_msg_size");
msgType = System.getProperty("msg_type","bytes");
+ printStdDev = Boolean.getBoolean("print_std_dev");
+ rate = Long.getLong("rate",-1);
}
public String getUrl()
@@ -174,4 +180,14 @@ public class TestParams
{
return msgType;
}
+
+ public boolean isPrintStdDev()
+ {
+ return printStdDev;
+ }
+
+ public long getRate()
+ {
+ return rate;
+ }
}