diff options
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java')
-rw-r--r-- | qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java | 262 |
1 files changed, 262 insertions, 0 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java new file mode 100644 index 0000000000..015d1e6205 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -0,0 +1,262 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; + +import org.apache.qpid.thread.Threading; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + */ +public class PerfProducer extends PerfBase +{ + MessageProducer producer; + Message msg; + byte[] payload; + List<byte[]> payloads; + boolean cacheMsg = false; + boolean randomMsgSize = false; + boolean durable = false; + Random random; + int msgSizeRange = 1024; + + public PerfProducer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + feedbackDest = session.createTemporaryQueue(); + + durable = params.isDurable(); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + cacheMsg = true; + + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (params.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = params.getMsgSize(); + payloads = new ArrayList<byte[]>(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + } + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + + if (!randomMsgSize) + { + ((BytesMessage)msg).writeBytes(payload); + } + else + { + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + } + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); + + for (int i=0; i < params.getWarmupCount() -1; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + boolean transacted = params.isTransacted(); + int tranSize = params.getTransactionSize(); + + long start = System.currentTimeMillis(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % tranSize == 0)) + { + session.commit(); + } + } + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; + System.out.println(new StringBuilder("Producer rate: "). + append(df.format(rate)). + append(" msg/sec"). + toString()); + } + + public void waitForCompletion() throws Exception + { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); + } + + public void tearDown() throws Exception + { + producer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + waitForCompletion(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); + } +}
\ No newline at end of file |