diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-07-30 02:22:35 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-07-30 02:22:35 +0000 |
commit | 2ad056ccb7801606e92778566159c23127125ed3 (patch) | |
tree | 22ae4a8240f7defdf037f3a4346ef06446001867 /java/tools/src | |
parent | 8523ced829d64862cfb8eede5db2172ddef243ff (diff) | |
download | qpid-python-2ad056ccb7801606e92778566159c23127125ed3.tar.gz |
QPID-3358 Modified the producer and consumer to support multiple iterations to ensure we can run the test for longer durations.
Also added support for creating unique destinations based on the default destination.
This makes it easy to run multiple producers and consumers with their
unique queue with little configuration. The code can make use of an externally specified prefix
when creating these destinations, there by allowing scripts to provide meaningful names for identifying
queues for debuging/diagnostic purposes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1152412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/src')
4 files changed, 187 insertions, 71 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java index b88b242e6d..90ee7e28ae 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener public LatencyTest() { - super(); + super(""); warmedUp = lock.newCondition(); testCompleted = lock.newCondition(); // Storing the following two for efficiency @@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); + final LatencyTest latencyTest = new LatencyTest(); Runnable r = new Runnable() { public void run() @@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener } } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating latency test thread",e); } - t.start(); + t.start(); } -}
\ No newline at end of file +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index 340f11f5e4..121e94cea1 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.tools; +import java.net.InetAddress; import java.text.DecimalFormat; import java.util.UUID; @@ -32,6 +33,10 @@ import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; public class PerfBase { @@ -57,11 +62,12 @@ public class PerfBase Destination myControlQueue; Destination controllerQueue; DecimalFormat df = new DecimalFormat("###.##"); - String id = UUID.randomUUID().toString(); - String myControlQueueAddr = id + ";{create: always}"; + String id; + String myControlQueueAddr; MessageProducer sendToController; MessageConsumer receiveFromController; + String prefix = ""; enum OPCode { REGISTER_CONSUMER, REGISTER_PRODUCER, @@ -69,7 +75,8 @@ public class PerfBase CONSUMER_READY, PRODUCER_READY, PRODUCER_START, RECEIVED_END_MSG, CONSUMER_STOP, - RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS + RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, + CONTINUE_TEST, STOP_TEST }; enum MessageType { @@ -102,14 +109,24 @@ public class PerfBase MessageType msgType = MessageType.BYTES; - public PerfBase() + public PerfBase(String prefix) { params = new TestParams(); + String host = ""; + try + { + host = InetAddress.getLocalHost().getHostName(); + } + catch (Exception e) + { + } + id = host + "-" + UUID.randomUUID().toString(); + this.prefix = prefix; + this.myControlQueueAddr = id + ";{create: always}"; } public void setUp() throws Exception { - if (params.getHost().equals("") || params.getPort() == -1) { con = new AMQConnection(params.getUrl()); @@ -124,7 +141,7 @@ public class PerfBase controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - dest = new AMQAnyDestination(params.getAddress()); + dest = createDestination(); controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); myControlQueue = session.createQueue(myControlQueueAddr); msgType = MessageType.getType(params.getMessageType()); @@ -134,9 +151,38 @@ public class PerfBase receiveFromController = controllerSession.createConsumer(myControlQueue); } + private Destination createDestination() throws Exception + { + if (params.isUseUniqueDests()) + { + System.out.println("Prefix : " + prefix); + Address addr = Address.parse(params.getAddress()); + AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); + int type = ((AMQSession_0_10)session).resolveAddressType(temp); + + if ( type == AMQDestination.TOPIC_TYPE) + { + addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); + System.out.println("Setting subject : " + addr); + } + else + { + addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); + System.out.println("Setting name : " + addr); + } + + return new AMQAnyDestination(addr); + } + else + { + return new AMQAnyDestination(params.getAddress()); + } + } + public synchronized void sendMessageToController(MapMessage m) throws Exception { m.setString(ID, id); + m.setString(REPLY_ADDR,myControlQueueAddr); sendToController.send(m); } @@ -152,6 +198,14 @@ public class PerfBase } + public boolean continueTest() throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + return (code == OPCode.CONTINUE_TEST); + } + public void tearDown() throws Exception { session.close(); diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index ae439b7ce0..b63892bb51 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -22,6 +22,7 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import javax.jms.MapMessage; import javax.jms.Message; @@ -29,6 +30,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -49,7 +51,7 @@ import org.apache.qpid.thread.Threading; * b) They are on separate machines that have their time synced via a Time Server * * In order to calculate latency the producer inserts a timestamp - * hen the message is sent. The consumer will note the current time the message is + * when the message is sent. The consumer will note the current time the message is * received and will calculate the latency as follows * latency = rcvdTime - msg.getJMSTimestamp() * @@ -57,13 +59,9 @@ import org.apache.qpid.thread.Threading; * variance in latencies. * * Avg latency is measured by adding all latencies and dividing by the total msgs. - * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount * * Throughput * =========== - * System throughput is calculated as follows - * rcvdMsgCount/(rcvdTime - testStartTime) - * * Consumer rate is calculated as * rcvdMsgCount/(rcvdTime - startTime) * @@ -83,7 +81,6 @@ public class PerfConsumer extends PerfBase implements MessageListener long minLatency = Long.MAX_VALUE; long totalLatency = 0; // to calculate avg latency. int rcvdMsgCount = 0; - long testStartTime = 0; // to measure system throughput long startTime = 0; // to measure consumer throughput long rcvdTime = 0; boolean transacted = false; @@ -94,9 +91,9 @@ public class PerfConsumer extends PerfBase implements MessageListener final Object lock = new Object(); - public PerfConsumer() + public PerfConsumer(String prefix) { - super(); + super(prefix); System.out.println("Consumer ID : " + id); } @@ -104,26 +101,20 @@ public class PerfConsumer extends PerfBase implements MessageListener { super.setUp(); consumer = session.createConsumer(dest); + System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); printStdDev = params.isPrintStdDev(); - if (printStdDev) - { - sample = new ArrayList<Long>(params.getMsgCount()); - } - MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - m.setString(REPLY_ADDR,myControlQueueAddr); sendMessageToController(m); } public void warmup()throws Exception { receiveFromController(OPCode.CONSUMER_STARTWARMUP); - boolean start = false; Message msg = consumer.receive(); // This is to ensure we drain the queue before we start the actual test. while ( msg != null) @@ -146,12 +137,26 @@ public class PerfConsumer extends PerfBase implements MessageListener MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); sendMessageToController(m); + consumer.setMessageListener(this); } public void startTest() throws Exception { - System.out.println("Consumer Starting test......"); - consumer.setMessageListener(this); + System.out.println("Consumer: " + id + " Starting test......" + "\n"); + resetCounters(); + } + + public void resetCounters() + { + rcvdMsgCount = 0; + maxLatency = 0; + minLatency = Long.MAX_VALUE; + totalLatency = 0; + if (printStdDev) + { + sample = null; + sample = new ArrayList<Long>(params.getMsgCount()); + } } public void sendResults() throws Exception @@ -193,7 +198,6 @@ public class PerfConsumer extends PerfBase implements MessageListener System.out.println(new StringBuilder("Std Dev : "). append(stdDev/Clock.convertToMiliSecs()).toString()); } - System.out.println("Consumer has completed the test......\n"); } public double calculateStdDev(double mean) @@ -262,8 +266,15 @@ public class PerfConsumer extends PerfBase implements MessageListener { setUp(); warmup(); - startTest(); - sendResults(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Consumer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -272,26 +283,43 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - public static void main(String[] args) + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public static void main(String[] args) throws InterruptedException { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - public void run() + + final PerfConsumer cons = new PerfConsumer(scriptId + i); + Runnable r = new Runnable() + { + public void run() + { + cons.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) { - cons.run(); + throw new Error("Error creating consumer thread",e); } - }; + t.start(); - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); } - t.start(); + testCompleted.await(); + System.out.println("Consumers have completed the test......\n"); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index 4cecd6f4df..ac6129ab68 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -23,6 +23,7 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; @@ -30,6 +31,7 @@ import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -51,6 +53,12 @@ import org.apache.qpid.thread.Threading; * System throughput and latencies calculated by the PerfConsumer are more realistic * numbers. * + * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs + * I have done so far, it seems quite useful to compute the producer rate as it gives an + * indication of how the system behaves. For ex if there is a gap between producer and consumer rates + * you could clearly see the higher latencies and when producer and consumer rates are very close, + * latency is good. + * */ public class PerfProducer extends PerfBase { @@ -69,9 +77,9 @@ public class PerfProducer extends PerfBase double rateFactor = 0.4; double rate = 0.0; - public PerfProducer() + public PerfProducer(String prefix) { - super(); + super(prefix); System.out.println("Producer ID : " + id); } @@ -114,12 +122,12 @@ public class PerfProducer extends PerfBase } producer = session.createProducer(dest); + System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - m.setString(REPLY_ADDR,myControlQueueAddr); sendMessageToController(m); } @@ -178,7 +186,7 @@ public class PerfProducer extends PerfBase public void warmup()throws Exception { receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer Warming up......"); + System.out.println("Producer: " + id + " Warming up......"); for (int i=0; i < params.getWarmupCount() -1; i++) { @@ -194,6 +202,7 @@ public class PerfProducer extends PerfBase public void startTest() throws Exception { + resetCounters(); receiveFromController(OPCode.PRODUCER_START); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); @@ -236,8 +245,11 @@ public class PerfProducer extends PerfBase append(df.format(rate)). append(" msg/sec"). toString()); + } + + public void resetCounters() + { - System.out.println("Producer has completed the test......"); } public void sendEndMessage() throws Exception @@ -255,14 +267,27 @@ public class PerfProducer extends PerfBase sendMessageToController(msg); } + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + public void run() { try { setUp(); warmup(); - startTest(); - sendResults(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Producer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -298,27 +323,36 @@ public class PerfProducer extends PerfBase } - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { - final PerfProducer prod = new PerfProducer(); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - public void run() + final PerfProducer prod = new PerfProducer(scriptId + i); + prod.startControllerIfNeeded(); + Runnable r = new Runnable() { - prod.run(); - } - }; + public void run() + { + prod.run(); + testCompleted.countDown(); + } + }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } - t.start(); + testCompleted.await(); + System.out.println("Producers have completed the test......"); } }
\ No newline at end of file |