summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java')
-rw-r--r--trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java262
1 files changed, 0 insertions, 262 deletions
diff --git a/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
deleted file mode 100644
index 015d1e6205..0000000000
--- a/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- */
-public class PerfProducer extends PerfBase
-{
- MessageProducer producer;
- Message msg;
- byte[] payload;
- List<byte[]> payloads;
- boolean cacheMsg = false;
- boolean randomMsgSize = false;
- boolean durable = false;
- Random random;
- int msgSizeRange = 1024;
-
- public PerfProducer()
- {
- super();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
- durable = params.isDurable();
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- cacheMsg = true;
-
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (params.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<byte[]>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(MessageFactory.createMessagePayload(i).getBytes());
- }
- }
- else
- {
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
- }
-
- producer = session.createProducer(dest);
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- msg = session.createBytesMessage();
-
- if (!randomMsgSize)
- {
- ((BytesMessage)msg).writeBytes(payload);
- }
- else
- {
- ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
- }
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return msg;
- }
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
-
- for (int i=0; i < params.getWarmupCount() -1; i++)
- {
- producer.send(getNextMessage());
- }
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- int count = params.getMsgCount();
- boolean transacted = params.isTransacted();
- int tranSize = params.getTransactionSize();
-
- long start = System.currentTimeMillis();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- producer.send(msg);
- if ( transacted && ((i+1) % tranSize == 0))
- {
- session.commit();
- }
- }
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
- System.out.println(new StringBuilder("Producer rate: ").
- append(df.format(rate)).
- append(" msg/sec").
- toString());
- }
-
- public void waitForCompletion() throws Exception
- {
- MessageConsumer tmp = session.createConsumer(feedbackDest);
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- System.out.println("Consumer has completed the test......");
- }
-
- public void tearDown() throws Exception
- {
- producer.close();
- session.close();
- con.close();
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- waitForCompletion();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
-
- public static void main(String[] args)
- {
- final PerfProducer prod = new PerfProducer();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.test();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
- }
-} \ No newline at end of file