summaryrefslogtreecommitdiff
path: root/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java')
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java224
1 files changed, 83 insertions, 141 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
index b63892bb51..0ef0455a64 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -20,17 +20,13 @@
*/
package org.apache.qpid.tools;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.TextMessage;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -51,7 +47,7 @@ import org.apache.qpid.thread.Threading;
* b) They are on separate machines that have their time synced via a Time Server
*
* In order to calculate latency the producer inserts a timestamp
- * when the message is sent. The consumer will note the current time the message is
+ * hen the message is sent. The consumer will note the current time the message is
* received and will calculate the latency as follows
* latency = rcvdTime - msg.getJMSTimestamp()
*
@@ -59,9 +55,13 @@ import org.apache.qpid.thread.Threading;
* variance in latencies.
*
* Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
*
* Throughput
* ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
* Consumer rate is calculated as
* rcvdMsgCount/(rcvdTime - startTime)
*
@@ -81,160 +81,130 @@ public class PerfConsumer extends PerfBase implements MessageListener
long minLatency = Long.MAX_VALUE;
long totalLatency = 0; // to calculate avg latency.
int rcvdMsgCount = 0;
+ long testStartTime = 0; // to measure system throughput
long startTime = 0; // to measure consumer throughput
long rcvdTime = 0;
boolean transacted = false;
int transSize = 0;
- boolean printStdDev = false;
- List<Long> sample;
-
final Object lock = new Object();
- public PerfConsumer(String prefix)
+ public PerfConsumer()
{
- super(prefix);
- System.out.println("Consumer ID : " + id);
+ super();
}
public void setUp() throws Exception
{
super.setUp();
consumer = session.createConsumer(dest);
- System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
- printStdDev = params.isPrintStdDev();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- sendMessageToController(m);
}
public void warmup()throws Exception
{
- receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- Message msg = consumer.receive();
- // This is to ensure we drain the queue before we start the actual test.
- while ( msg != null)
+ System.out.println("Warming up......");
+
+ boolean start = false;
+ while (!start)
{
- if (msg.getBooleanProperty("End") == true)
+ Message msg = consumer.receive();
+ if (msg instanceof TextMessage)
{
- // It's more realistic for the consumer to signal this.
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m);
+ if (((TextMessage)msg).getText().equals("End"))
+ {
+ start = true;
+ MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+ temp.send(session.createMessage());
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ temp.close();
+ }
}
- msg = consumer.receive(1000);
- }
-
- if (params.isTransacted())
- {
- session.commit();
}
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m);
- consumer.setMessageListener(this);
}
public void startTest() throws Exception
{
- System.out.println("Consumer: " + id + " Starting test......" + "\n");
- resetCounters();
+ System.out.println("Starting test......");
+ consumer.setMessageListener(this);
}
- public void resetCounters()
+ public void printResults() throws Exception
{
- rcvdMsgCount = 0;
- maxLatency = 0;
- minLatency = Long.MAX_VALUE;
- totalLatency = 0;
- if (printStdDev)
+ synchronized (lock)
{
- sample = null;
- sample = new ArrayList<Long>(params.getMsgCount());
+ lock.wait();
}
- }
-
- public void sendResults() throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STOP);
double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
- double stdDev = 0.0;
- if (printStdDev)
- {
- stdDev = calculateStdDev(avgLatency);
- }
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
- m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
- m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
- m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
- m.setDouble(CONS_RATE, consRate);
- m.setLong(MSG_COUNT, rcvdMsgCount);
- sendMessageToController(m);
-
+ double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+ double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
System.out.println(new StringBuilder("Consumer rate : ").
append(df.format(consRate)).
append(" msg/sec").toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(throughput)).
+ append(" msg/sec").toString());
System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency/Clock.convertToMiliSecs())).
+ append(df.format(avgLatency)).
append(" ms").toString());
System.out.println(new StringBuilder("Min Latency : ").
- append(df.format(minLatency/Clock.convertToMiliSecs())).
+ append(minLatency).
append(" ms").toString());
System.out.println(new StringBuilder("Max Latency : ").
- append(df.format(maxLatency/Clock.convertToMiliSecs())).
+ append(maxLatency).
append(" ms").toString());
- if (printStdDev)
- {
- System.out.println(new StringBuilder("Std Dev : ").
- append(stdDev/Clock.convertToMiliSecs()).toString());
- }
+ System.out.println("Completed the test......\n");
}
- public double calculateStdDev(double mean)
+ public void notifyCompletion(Destination replyTo) throws Exception
{
- double v = 0;
- for (double latency: sample)
+ MessageProducer tmp = session.createProducer(replyTo);
+ Message endMsg = session.createMessage();
+ tmp.send(endMsg);
+ if (params.isTransacted())
{
- v = v + Math.pow((latency-mean), 2);
+ session.commit();
}
- v = v/sample.size();
- return Math.round(Math.sqrt(v));
+ tmp.close();
+ }
+
+ public void tearDown() throws Exception
+ {
+ consumer.close();
+ session.close();
+ con.close();
}
public void onMessage(Message msg)
{
try
{
- // To figure out the decoding overhead of text
- if (msgType == MessageType.TEXT)
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
{
- ((TextMessage)msg).getText();
- }
+ notifyCompletion(msg.getJMSReplyTo());
- if (msg.getBooleanProperty("End"))
- {
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
}
else
{
- rcvdTime = Clock.getTime();
+ rcvdTime = System.currentTimeMillis();
rcvdMsgCount ++;
if (rcvdMsgCount == 1)
{
startTime = rcvdTime;
+ testStartTime = msg.getJMSTimestamp();
}
if (transacted && (rcvdMsgCount % transSize == 0))
@@ -242,14 +212,10 @@ public class PerfConsumer extends PerfBase implements MessageListener
session.commit();
}
- long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
+ long latency = rcvdTime - msg.getJMSTimestamp();
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
- if (printStdDev)
- {
- sample.add(latency);
- }
}
}
@@ -260,21 +226,14 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
- public void run()
+ public void test()
{
try
{
setUp();
warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- startTest();
- sendResults();
- nextIteration = continueTest();
- }
+ startTest();
+ printResults();
tearDown();
}
catch(Exception e)
@@ -283,43 +242,26 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
}
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public static void main(String[] args) throws InterruptedException
+ public static void main(String[] args)
{
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
+ final PerfConsumer cons = new PerfConsumer();
+ Runnable r = new Runnable()
{
-
- final PerfConsumer cons = new PerfConsumer(scriptId + i);
- Runnable r = new Runnable()
- {
- public void run()
- {
- cons.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
+ public void run()
{
- throw new Error("Error creating consumer thread",e);
+ cons.test();
}
- t.start();
-
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
}
- testCompleted.await();
- System.out.println("Consumers have completed the test......\n");
+ t.start();
}
} \ No newline at end of file