summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java')
-rw-r--r--trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java207
1 files changed, 207 insertions, 0 deletions
diff --git a/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
new file mode 100644
index 0000000000..757b1bfcda
--- /dev/null
+++ b/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.testkit.MessageFactory;
+
+/**
+ * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y no of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * This is done with the assumption that both consumer and producer are running on
+ * the same machine or different machines which have time synced using a time server.
+ *
+ * This test also calculates the producer rate as follows.
+ * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ * Rajith - Producer rate is not an accurate perf metric IMO.
+ * It is heavily inlfuenced by any in memory buffering.
+ * System throughput and latencies calculated by the PerfConsumer are more realistic
+ * numbers.
+ *
+ */
+public class PerfProducer extends PerfBase
+{
+ MessageProducer producer;
+ Message msg;
+ byte[] payload;
+
+ public PerfProducer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ feedbackDest = session.createTemporaryQueue();
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(params.isDurable()?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (params.isCacheMessage())
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+ ((BytesMessage)msg).writeBytes(payload);
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+
+ for (int i=0; i < params.getWarmupCount() -1; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+ boolean transacted = params.isTransacted();
+ int tranSize = params.getTransactionSize();
+
+ long start = System.currentTimeMillis();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % tranSize == 0))
+ {
+ session.commit();
+ }
+ }
+ long time = System.currentTimeMillis() - start;
+ double rate = ((double)count/(double)time)*1000;
+ System.out.println(new StringBuilder("Producer rate: ").
+ append(df.format(rate)).
+ append(" msg/sec").
+ toString());
+ }
+
+ public void waitForCompletion() throws Exception
+ {
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ System.out.println("Consumer has completed the test......");
+ }
+
+ public void tearDown() throws Exception
+ {
+ producer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ waitForCompletion();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ PerfProducer prod = new PerfProducer();
+ prod.test();
+ }
+} \ No newline at end of file