diff options
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java')
-rw-r--r-- | qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java | 256 |
1 files changed, 80 insertions, 176 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index ac6129ab68..015d1e6205 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -23,15 +23,13 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; -import javax.jms.MapMessage; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -53,52 +51,38 @@ import org.apache.qpid.thread.Threading; * System throughput and latencies calculated by the PerfConsumer are more realistic * numbers. * - * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs - * I have done so far, it seems quite useful to compute the producer rate as it gives an - * indication of how the system behaves. For ex if there is a gap between producer and consumer rates - * you could clearly see the higher latencies and when producer and consumer rates are very close, - * latency is good. - * */ public class PerfProducer extends PerfBase { - private static long SEC = 60000; - MessageProducer producer; Message msg; - Object payload; - List<Object> payloads; + byte[] payload; + List<byte[]> payloads; boolean cacheMsg = false; boolean randomMsgSize = false; boolean durable = false; Random random; int msgSizeRange = 1024; - boolean rateLimitProducer = false; - double rateFactor = 0.4; - double rate = 0.0; - - public PerfProducer(String prefix) + + public PerfProducer() { - super(prefix); - System.out.println("Producer ID : " + id); + super(); } public void setUp() throws Exception { super.setUp(); - durable = params.isDurable(); - rateLimitProducer = params.getRate() > 0 ? true : false; - if (rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); - } + feedbackDest = session.createTemporaryQueue(); + durable = params.isDurable(); + // if message caching is enabled we pre create the message // else we pre create the payload if (params.isCacheMessage()) { cacheMsg = true; - msg = createMessage(createPayload(params.getMsgSize())); + + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT @@ -109,52 +93,21 @@ public class PerfProducer extends PerfBase random = new Random(20080921); randomMsgSize = true; msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); - + payloads = new ArrayList<byte[]>(msgSizeRange); + for (int i=0; i < msgSizeRange; i++) { - payloads.add(createPayload(i)); + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); } - } + } else { - payload = createPayload(params.getMsgSize()); + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); } producer = session.createProducer(dest); - System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - sendMessageToController(m); - } - - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } } protected Message getNextMessage() throws Exception @@ -164,130 +117,117 @@ public class PerfProducer extends PerfBase return msg; } else - { - Message m; - + { + msg = session.createBytesMessage(); + if (!randomMsgSize) { - m = createMessage(payload); + ((BytesMessage)msg).writeBytes(payload); } else { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); } - m.setJMSDeliveryMode(durable? + msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT ); - return m; + return msg; } } public void warmup()throws Exception { - receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer: " + id + " Warming up......"); + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); for (int i=0; i < params.getWarmupCount() -1; i++) { producer.send(getNextMessage()); } - sendEndMessage(); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); if (params.isTransacted()) { session.commit(); } + + tmp.close(); } public void startTest() throws Exception { - resetCounters(); - receiveFromController(OPCode.PRODUCER_START); + System.out.println("Starting test......"); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); int tranSize = params.getTransactionSize(); - long limit = (long)(params.getRate() * rateFactor); // in msecs - long timeLimit = (long)(SEC * rateFactor); // in msecs - - long start = Clock.getTime(); // defaults to nano secs - long interval = start; + long start = System.currentTimeMillis(); for(int i=0; i < count; i++ ) { Message msg = getNextMessage(); - msg.setLongProperty(TIMESTAMP, Clock.getTime()); + msg.setJMSTimestamp(System.currentTimeMillis()); producer.send(msg); if ( transacted && ((i+1) % tranSize == 0)) { session.commit(); } - - if (rateLimitProducer && i%limit == 0) - { - long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs - if (elapsed < timeLimit) - { - Thread.sleep(elapsed); - } - interval = Clock.getTime(); - - } - } - sendEndMessage(); - if ( transacted) - { - session.commit(); } - long time = Clock.getTime() - start; - rate = (double)count*Clock.convertToSecs()/(double)time; + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; System.out.println(new StringBuilder("Producer rate: "). append(df.format(rate)). append(" msg/sec"). toString()); } - public void resetCounters() + public void waitForCompletion() throws Exception { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); - } + if (params.isTransacted()) + { + session.commit(); + } - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - producer.send(msg); - } + tmp.receive(); - public void sendResults() throws Exception - { - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, rate); - sendMessageToController(msg); + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); } - @Override public void tearDown() throws Exception { - super.tearDown(); + producer.close(); + session.close(); + con.close(); } - public void run() + public void test() { try { setUp(); warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Producer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } + startTest(); + waitForCompletion(); tearDown(); } catch(Exception e) @@ -296,63 +236,27 @@ public class PerfProducer extends PerfBase } } - public void startControllerIfNeeded() + + public static void main(String[] args) { - if (!params.isExternalController()) + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() { - final PerfTestController controller = new PerfTestController(); - Runnable r = new Runnable() - { - public void run() - { - controller.run(); - } - }; - - Thread t; - try + public void run() { - t = Threading.getThreadFactory().createThread(r); + prod.test(); } - catch(Exception e) - { - throw new Error("Error creating controller thread",e); - } - t.start(); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); } - } - - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) + catch(Exception e) { - final PerfProducer prod = new PerfProducer(scriptId + i); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() - { - public void run() - { - prod.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); + throw new Error("Error creating producer thread",e); } - testCompleted.await(); - System.out.println("Producers have completed the test......"); + t.start(); } }
\ No newline at end of file |