summaryrefslogtreecommitdiff
path: root/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java')
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java256
1 files changed, 80 insertions, 176 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index ac6129ab68..015d1e6205 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -23,15 +23,13 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
-import javax.jms.MapMessage;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -53,52 +51,38 @@ import org.apache.qpid.thread.Threading;
* System throughput and latencies calculated by the PerfConsumer are more realistic
* numbers.
*
- * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
- * I have done so far, it seems quite useful to compute the producer rate as it gives an
- * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
- * you could clearly see the higher latencies and when producer and consumer rates are very close,
- * latency is good.
- *
*/
public class PerfProducer extends PerfBase
{
- private static long SEC = 60000;
-
MessageProducer producer;
Message msg;
- Object payload;
- List<Object> payloads;
+ byte[] payload;
+ List<byte[]> payloads;
boolean cacheMsg = false;
boolean randomMsgSize = false;
boolean durable = false;
Random random;
int msgSizeRange = 1024;
- boolean rateLimitProducer = false;
- double rateFactor = 0.4;
- double rate = 0.0;
-
- public PerfProducer(String prefix)
+
+ public PerfProducer()
{
- super(prefix);
- System.out.println("Producer ID : " + id);
+ super();
}
public void setUp() throws Exception
{
super.setUp();
- durable = params.isDurable();
- rateLimitProducer = params.getRate() > 0 ? true : false;
- if (rateLimitProducer)
- {
- System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
- }
+ feedbackDest = session.createTemporaryQueue();
+ durable = params.isDurable();
+
// if message caching is enabled we pre create the message
// else we pre create the payload
if (params.isCacheMessage())
{
cacheMsg = true;
- msg = createMessage(createPayload(params.getMsgSize()));
+
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
@@ -109,52 +93,21 @@ public class PerfProducer extends PerfBase
random = new Random(20080921);
randomMsgSize = true;
msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
+ payloads = new ArrayList<byte[]>(msgSizeRange);
+
for (int i=0; i < msgSizeRange; i++)
{
- payloads.add(createPayload(i));
+ payloads.add(MessageFactory.createMessagePayload(i).getBytes());
}
- }
+ }
else
{
- payload = createPayload(params.getMsgSize());
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
}
producer = session.createProducer(dest);
- System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- sendMessageToController(m);
- }
-
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
}
protected Message getNextMessage() throws Exception
@@ -164,130 +117,117 @@ public class PerfProducer extends PerfBase
return msg;
}
else
- {
- Message m;
-
+ {
+ msg = session.createBytesMessage();
+
if (!randomMsgSize)
{
- m = createMessage(payload);
+ ((BytesMessage)msg).writeBytes(payload);
}
else
{
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
+ ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
}
- m.setJMSDeliveryMode(durable?
+ msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
);
- return m;
+ return msg;
}
}
public void warmup()throws Exception
{
- receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- System.out.println("Producer: " + id + " Warming up......");
+ System.out.println("Warming up......");
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
for (int i=0; i < params.getWarmupCount() -1; i++)
{
producer.send(getNextMessage());
}
- sendEndMessage();
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
if (params.isTransacted())
{
session.commit();
}
+
+ tmp.close();
}
public void startTest() throws Exception
{
- resetCounters();
- receiveFromController(OPCode.PRODUCER_START);
+ System.out.println("Starting test......");
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
int tranSize = params.getTransactionSize();
- long limit = (long)(params.getRate() * rateFactor); // in msecs
- long timeLimit = (long)(SEC * rateFactor); // in msecs
-
- long start = Clock.getTime(); // defaults to nano secs
- long interval = start;
+ long start = System.currentTimeMillis();
for(int i=0; i < count; i++ )
{
Message msg = getNextMessage();
- msg.setLongProperty(TIMESTAMP, Clock.getTime());
+ msg.setJMSTimestamp(System.currentTimeMillis());
producer.send(msg);
if ( transacted && ((i+1) % tranSize == 0))
{
session.commit();
}
-
- if (rateLimitProducer && i%limit == 0)
- {
- long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
- if (elapsed < timeLimit)
- {
- Thread.sleep(elapsed);
- }
- interval = Clock.getTime();
-
- }
- }
- sendEndMessage();
- if ( transacted)
- {
- session.commit();
}
- long time = Clock.getTime() - start;
- rate = (double)count*Clock.convertToSecs()/(double)time;
+ long time = System.currentTimeMillis() - start;
+ double rate = ((double)count/(double)time)*1000;
System.out.println(new StringBuilder("Producer rate: ").
append(df.format(rate)).
append(" msg/sec").
toString());
}
- public void resetCounters()
+ public void waitForCompletion() throws Exception
{
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
- }
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createMessage();
- msg.setBooleanProperty("End", true);
- producer.send(msg);
- }
+ tmp.receive();
- public void sendResults() throws Exception
- {
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, rate);
- sendMessageToController(msg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ System.out.println("Consumer has completed the test......");
}
- @Override
public void tearDown() throws Exception
{
- super.tearDown();
+ producer.close();
+ session.close();
+ con.close();
}
- public void run()
+ public void test()
{
try
{
setUp();
warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Producer: " + id + " starting a new iteration ......\n");
- startTest();
- sendResults();
- nextIteration = continueTest();
- }
+ startTest();
+ waitForCompletion();
tearDown();
}
catch(Exception e)
@@ -296,63 +236,27 @@ public class PerfProducer extends PerfBase
}
}
- public void startControllerIfNeeded()
+
+ public static void main(String[] args)
{
- if (!params.isExternalController())
+ final PerfProducer prod = new PerfProducer();
+ Runnable r = new Runnable()
{
- final PerfTestController controller = new PerfTestController();
- Runnable r = new Runnable()
- {
- public void run()
- {
- controller.run();
- }
- };
-
- Thread t;
- try
+ public void run()
{
- t = Threading.getThreadFactory().createThread(r);
+ prod.test();
}
- catch(Exception e)
- {
- throw new Error("Error creating controller thread",e);
- }
- t.start();
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
}
- }
-
-
- public static void main(String[] args) throws InterruptedException
- {
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
+ catch(Exception e)
{
- final PerfProducer prod = new PerfProducer(scriptId + i);
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
+ throw new Error("Error creating producer thread",e);
}
- testCompleted.await();
- System.out.println("Producers have completed the test......");
+ t.start();
}
} \ No newline at end of file