summaryrefslogtreecommitdiff
path: root/java/tools/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-07-30 02:22:35 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-07-30 02:22:35 +0000
commit2ad056ccb7801606e92778566159c23127125ed3 (patch)
tree22ae4a8240f7defdf037f3a4346ef06446001867 /java/tools/src
parent8523ced829d64862cfb8eede5db2172ddef243ff (diff)
downloadqpid-python-2ad056ccb7801606e92778566159c23127125ed3.tar.gz
QPID-3358 Modified the producer and consumer to support multiple iterations to ensure we can run the test for longer durations.
Also added support for creating unique destinations based on the default destination. This makes it easy to run multiple producers and consumers with their unique queue with little configuration. The code can make use of an externally specified prefix when creating these destinations, there by allowing scripts to provide meaningful names for identifying queues for debuging/diagnostic purposes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1152412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/src')
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java12
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java66
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java98
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java82
4 files changed, 187 insertions, 71 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
index b88b242e6d..90ee7e28ae 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
@@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener
public LatencyTest()
{
- super();
+ super("");
warmedUp = lock.newCondition();
testCompleted = lock.newCondition();
// Storing the following two for efficiency
@@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener
public static void main(String[] args)
{
- final LatencyTest latencyTest = new LatencyTest();
+ final LatencyTest latencyTest = new LatencyTest();
Runnable r = new Runnable()
{
public void run()
@@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener
}
}
};
-
+
Thread t;
try
{
- t = Threading.getThreadFactory().createThread(r);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating latency test thread",e);
}
- t.start();
+ t.start();
}
-} \ No newline at end of file
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index 340f11f5e4..121e94cea1 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.tools;
+import java.net.InetAddress;
import java.text.DecimalFormat;
import java.util.UUID;
@@ -32,6 +33,10 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
public class PerfBase
{
@@ -57,11 +62,12 @@ public class PerfBase
Destination myControlQueue;
Destination controllerQueue;
DecimalFormat df = new DecimalFormat("###.##");
- String id = UUID.randomUUID().toString();
- String myControlQueueAddr = id + ";{create: always}";
+ String id;
+ String myControlQueueAddr;
MessageProducer sendToController;
MessageConsumer receiveFromController;
+ String prefix = "";
enum OPCode {
REGISTER_CONSUMER, REGISTER_PRODUCER,
@@ -69,7 +75,8 @@ public class PerfBase
CONSUMER_READY, PRODUCER_READY,
PRODUCER_START,
RECEIVED_END_MSG, CONSUMER_STOP,
- RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS
+ RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
+ CONTINUE_TEST, STOP_TEST
};
enum MessageType {
@@ -102,14 +109,24 @@ public class PerfBase
MessageType msgType = MessageType.BYTES;
- public PerfBase()
+ public PerfBase(String prefix)
{
params = new TestParams();
+ String host = "";
+ try
+ {
+ host = InetAddress.getLocalHost().getHostName();
+ }
+ catch (Exception e)
+ {
+ }
+ id = host + "-" + UUID.randomUUID().toString();
+ this.prefix = prefix;
+ this.myControlQueueAddr = id + ";{create: always}";
}
public void setUp() throws Exception
{
-
if (params.getHost().equals("") || params.getPort() == -1)
{
con = new AMQConnection(params.getUrl());
@@ -124,7 +141,7 @@ public class PerfBase
controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- dest = new AMQAnyDestination(params.getAddress());
+ dest = createDestination();
controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
myControlQueue = session.createQueue(myControlQueueAddr);
msgType = MessageType.getType(params.getMessageType());
@@ -134,9 +151,38 @@ public class PerfBase
receiveFromController = controllerSession.createConsumer(myControlQueue);
}
+ private Destination createDestination() throws Exception
+ {
+ if (params.isUseUniqueDests())
+ {
+ System.out.println("Prefix : " + prefix);
+ Address addr = Address.parse(params.getAddress());
+ AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+ int type = ((AMQSession_0_10)session).resolveAddressType(temp);
+
+ if ( type == AMQDestination.TOPIC_TYPE)
+ {
+ addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
+ System.out.println("Setting subject : " + addr);
+ }
+ else
+ {
+ addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
+ System.out.println("Setting name : " + addr);
+ }
+
+ return new AMQAnyDestination(addr);
+ }
+ else
+ {
+ return new AMQAnyDestination(params.getAddress());
+ }
+ }
+
public synchronized void sendMessageToController(MapMessage m) throws Exception
{
m.setString(ID, id);
+ m.setString(REPLY_ADDR,myControlQueueAddr);
sendToController.send(m);
}
@@ -152,6 +198,14 @@ public class PerfBase
}
+ public boolean continueTest() throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ return (code == OPCode.CONTINUE_TEST);
+ }
+
public void tearDown() throws Exception
{
session.close();
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
index ae439b7ce0..b63892bb51 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -22,6 +22,7 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -29,6 +30,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -49,7 +51,7 @@ import org.apache.qpid.thread.Threading;
* b) They are on separate machines that have their time synced via a Time Server
*
* In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
+ * when the message is sent. The consumer will note the current time the message is
* received and will calculate the latency as follows
* latency = rcvdTime - msg.getJMSTimestamp()
*
@@ -57,13 +59,9 @@ import org.apache.qpid.thread.Threading;
* variance in latencies.
*
* Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
*
* Throughput
* ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
* Consumer rate is calculated as
* rcvdMsgCount/(rcvdTime - startTime)
*
@@ -83,7 +81,6 @@ public class PerfConsumer extends PerfBase implements MessageListener
long minLatency = Long.MAX_VALUE;
long totalLatency = 0; // to calculate avg latency.
int rcvdMsgCount = 0;
- long testStartTime = 0; // to measure system throughput
long startTime = 0; // to measure consumer throughput
long rcvdTime = 0;
boolean transacted = false;
@@ -94,9 +91,9 @@ public class PerfConsumer extends PerfBase implements MessageListener
final Object lock = new Object();
- public PerfConsumer()
+ public PerfConsumer(String prefix)
{
- super();
+ super(prefix);
System.out.println("Consumer ID : " + id);
}
@@ -104,26 +101,20 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
super.setUp();
consumer = session.createConsumer(dest);
+ System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
printStdDev = params.isPrintStdDev();
- if (printStdDev)
- {
- sample = new ArrayList<Long>(params.getMsgCount());
- }
-
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- m.setString(REPLY_ADDR,myControlQueueAddr);
sendMessageToController(m);
}
public void warmup()throws Exception
{
receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- boolean start = false;
Message msg = consumer.receive();
// This is to ensure we drain the queue before we start the actual test.
while ( msg != null)
@@ -146,12 +137,26 @@ public class PerfConsumer extends PerfBase implements MessageListener
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
sendMessageToController(m);
+ consumer.setMessageListener(this);
}
public void startTest() throws Exception
{
- System.out.println("Consumer Starting test......");
- consumer.setMessageListener(this);
+ System.out.println("Consumer: " + id + " Starting test......" + "\n");
+ resetCounters();
+ }
+
+ public void resetCounters()
+ {
+ rcvdMsgCount = 0;
+ maxLatency = 0;
+ minLatency = Long.MAX_VALUE;
+ totalLatency = 0;
+ if (printStdDev)
+ {
+ sample = null;
+ sample = new ArrayList<Long>(params.getMsgCount());
+ }
}
public void sendResults() throws Exception
@@ -193,7 +198,6 @@ public class PerfConsumer extends PerfBase implements MessageListener
System.out.println(new StringBuilder("Std Dev : ").
append(stdDev/Clock.convertToMiliSecs()).toString());
}
- System.out.println("Consumer has completed the test......\n");
}
public double calculateStdDev(double mean)
@@ -262,8 +266,15 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
setUp();
warmup();
- startTest();
- sendResults();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Consumer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -272,26 +283,43 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
}
- public static void main(String[] args)
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public static void main(String[] args) throws InterruptedException
{
- final PerfConsumer cons = new PerfConsumer();
- Runnable r = new Runnable()
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
{
- public void run()
+
+ final PerfConsumer cons = new PerfConsumer(scriptId + i);
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
{
- cons.run();
+ throw new Error("Error creating consumer thread",e);
}
- };
+ t.start();
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
}
- t.start();
+ testCompleted.await();
+ System.out.println("Consumers have completed the test......\n");
}
} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index 4cecd6f4df..ac6129ab68 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -23,6 +23,7 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -30,6 +31,7 @@ import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -51,6 +53,12 @@ import org.apache.qpid.thread.Threading;
* System throughput and latencies calculated by the PerfConsumer are more realistic
* numbers.
*
+ * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
+ * I have done so far, it seems quite useful to compute the producer rate as it gives an
+ * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
+ * you could clearly see the higher latencies and when producer and consumer rates are very close,
+ * latency is good.
+ *
*/
public class PerfProducer extends PerfBase
{
@@ -69,9 +77,9 @@ public class PerfProducer extends PerfBase
double rateFactor = 0.4;
double rate = 0.0;
- public PerfProducer()
+ public PerfProducer(String prefix)
{
- super();
+ super(prefix);
System.out.println("Producer ID : " + id);
}
@@ -114,12 +122,12 @@ public class PerfProducer extends PerfBase
}
producer = session.createProducer(dest);
+ System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- m.setString(REPLY_ADDR,myControlQueueAddr);
sendMessageToController(m);
}
@@ -178,7 +186,7 @@ public class PerfProducer extends PerfBase
public void warmup()throws Exception
{
receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- System.out.println("Producer Warming up......");
+ System.out.println("Producer: " + id + " Warming up......");
for (int i=0; i < params.getWarmupCount() -1; i++)
{
@@ -194,6 +202,7 @@ public class PerfProducer extends PerfBase
public void startTest() throws Exception
{
+ resetCounters();
receiveFromController(OPCode.PRODUCER_START);
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
@@ -236,8 +245,11 @@ public class PerfProducer extends PerfBase
append(df.format(rate)).
append(" msg/sec").
toString());
+ }
+
+ public void resetCounters()
+ {
- System.out.println("Producer has completed the test......");
}
public void sendEndMessage() throws Exception
@@ -255,14 +267,27 @@ public class PerfProducer extends PerfBase
sendMessageToController(msg);
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
public void run()
{
try
{
setUp();
warmup();
- startTest();
- sendResults();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Producer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -298,27 +323,36 @@ public class PerfProducer extends PerfBase
}
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
- final PerfProducer prod = new PerfProducer();
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
{
- public void run()
+ final PerfProducer prod = new PerfProducer(scriptId + i);
+ prod.startControllerIfNeeded();
+ Runnable r = new Runnable()
{
- prod.run();
- }
- };
+ public void run()
+ {
+ prod.run();
+ testCompleted.countDown();
+ }
+ };
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
}
- t.start();
+ testCompleted.await();
+ System.out.println("Producers have completed the test......");
}
} \ No newline at end of file