summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java2
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java81
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java228
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java228
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java148
5 files changed, 210 insertions, 477 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
index 979d2ef76f..4e79dd62a8 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
@@ -27,6 +27,8 @@ package org.apache.qpid.tools;
public class Clock
{
+ public final static long SEC = 60000;
+
private static Precision precision;
private static long offset = -1; // in nano secs
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
index 121e94cea1..097b021b3e 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
@@ -21,7 +21,6 @@
package org.apache.qpid.tools;
import java.net.InetAddress;
-import java.text.DecimalFormat;
import java.util.UUID;
import javax.jms.Connection;
@@ -32,14 +31,17 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.messaging.Address;
+import org.apache.qpid.tools.TestConfiguration.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class PerfBase
+public class MercuryBase
{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class);
+
public final static String CODE = "CODE";
public final static String ID = "ID";
public final static String REPLY_ADDR = "REPLY_ADDR";
@@ -54,14 +56,13 @@ public class PerfBase
String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
- TestParams params;
+ TestConfiguration config;
Connection con;
Session session;
Session controllerSession;
Destination dest;
Destination myControlQueue;
Destination controllerQueue;
- DecimalFormat df = new DecimalFormat("###.##");
String id;
String myControlQueueAddr;
@@ -69,7 +70,8 @@ public class PerfBase
MessageConsumer receiveFromController;
String prefix = "";
- enum OPCode {
+ enum OPCode
+ {
REGISTER_CONSUMER, REGISTER_PRODUCER,
PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
CONSUMER_READY, PRODUCER_READY,
@@ -79,39 +81,11 @@ public class PerfBase
CONTINUE_TEST, STOP_TEST
};
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
MessageType msgType = MessageType.BYTES;
- public PerfBase(String prefix)
+ public MercuryBase(TestConfiguration config,String prefix)
{
- params = new TestParams();
+ this.config = config;
String host = "";
try
{
@@ -127,25 +101,16 @@ public class PerfBase
public void setUp() throws Exception
{
- if (params.getHost().equals("") || params.getPort() == -1)
- {
- con = new AMQConnection(params.getUrl());
- }
- else
- {
- con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test");
- }
+ con = config.createConnection();
con.start();
- session = con.createSession(params.isTransacted(),
- params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = createDestination();
- controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+ controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR);
myControlQueue = session.createQueue(myControlQueueAddr);
- msgType = MessageType.getType(params.getMessageType());
- System.out.println("Using " + msgType + " messages");
+ msgType = MessageType.getType(config.getMessageType());
+ _logger.debug("Using " + msgType + " messages");
sendToController = controllerSession.createProducer(controllerQueue);
receiveFromController = controllerSession.createConsumer(myControlQueue);
@@ -153,11 +118,11 @@ public class PerfBase
private Destination createDestination() throws Exception
{
- if (params.isUseUniqueDests())
+ if (config.isUseUniqueDests())
{
- System.out.println("Prefix : " + prefix);
- Address addr = Address.parse(params.getAddress());
- AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+ _logger.debug("Prefix : " + prefix);
+ Address addr = Address.parse(config.getAddress());
+ AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress());
int type = ((AMQSession_0_10)session).resolveAddressType(temp);
if ( type == AMQDestination.TOPIC_TYPE)
@@ -171,11 +136,11 @@ public class PerfBase
System.out.println("Setting name : " + addr);
}
- return new AMQAnyDestination(addr);
+ return AMQDestination.createDestination(addr.toString());
}
else
{
- return new AMQAnyDestination(params.getAddress());
+ return AMQDestination.createDestination(config.getAddress());
}
}
@@ -190,7 +155,7 @@ public class PerfBase
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
+ _logger.debug("Received Code : " + code);
if (expected != code)
{
throw new Exception("Expected OPCode : " + expected + " but received : " + code);
@@ -202,7 +167,7 @@ public class PerfBase
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
+ _logger.debug("Received Code : " + code);
return (code == OPCode.CONTINUE_TEST);
}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
index b63892bb51..b35adc45d6 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
@@ -20,18 +20,15 @@
*/
package org.apache.qpid.tools;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.tools.report.MercuryReporter;
+import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* PerfConsumer will receive x no of messages in warmup mode.
@@ -74,39 +71,28 @@ import org.apache.qpid.thread.Threading;
*
*/
-public class PerfConsumer extends PerfBase implements MessageListener
+public class MercuryConsumerController extends MercuryBase
{
- MessageConsumer consumer;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- long startTime = 0; // to measure consumer throughput
- long rcvdTime = 0;
- boolean transacted = false;
- int transSize = 0;
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class);
+ MercuryReporter reporter;
+ TestConfiguration config;
+ QpidReceive receiver;
- boolean printStdDev = false;
- List<Long> sample;
-
- final Object lock = new Object();
-
- public PerfConsumer(String prefix)
+ public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix)
{
- super(prefix);
- System.out.println("Consumer ID : " + id);
+ super(config,prefix);
+ this.reporter = reporter;
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Consumer ID : " + id);
+ }
}
public void setUp() throws Exception
{
super.setUp();
- consumer = session.createConsumer(dest);
- System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
-
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- printStdDev = params.isPrintStdDev();
+ receiver = new QpidReceive(reporter,config, con,dest);
+ receiver.setUp();
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
sendMessageToController(m);
@@ -115,151 +101,71 @@ public class PerfConsumer extends PerfBase implements MessageListener
public void warmup()throws Exception
{
receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- Message msg = consumer.receive();
- // This is to ensure we drain the queue before we start the actual test.
- while ( msg != null)
- {
- if (msg.getBooleanProperty("End") == true)
- {
- // It's more realistic for the consumer to signal this.
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m);
- }
- msg = consumer.receive(1000);
- }
+ receiver.waitforCompletion(config.getWarmupCount());
- if (params.isTransacted())
- {
- session.commit();
- }
+ // It's more realistic for the consumer to signal this.
+ MapMessage m1 = controllerSession.createMapMessage();
+ m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+ sendMessageToController(m1);
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m);
- consumer.setMessageListener(this);
+ MapMessage m2 = controllerSession.createMapMessage();
+ m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+ sendMessageToController(m2);
}
- public void startTest() throws Exception
+ public void runReceiver() throws Exception
{
- System.out.println("Consumer: " + id + " Starting test......" + "\n");
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Consumer: " + id + " Starting iteration......" + "\n");
+ }
resetCounters();
+ receiver.waitforCompletion(config.getMsgCount());
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+ sendMessageToController(m);
}
public void resetCounters()
{
- rcvdMsgCount = 0;
- maxLatency = 0;
- minLatency = Long.MAX_VALUE;
- totalLatency = 0;
- if (printStdDev)
- {
- sample = null;
- sample = new ArrayList<Long>(params.getMsgCount());
- }
+ reporter.clear();
}
public void sendResults() throws Exception
{
receiveFromController(OPCode.CONSUMER_STOP);
+ reporter.report();
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
- double stdDev = 0.0;
- if (printStdDev)
- {
- stdDev = calculateStdDev(avgLatency);
- }
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
- m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
- m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
- m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
- m.setDouble(CONS_RATE, consRate);
- m.setLong(MSG_COUNT, rcvdMsgCount);
+ m.setDouble(AVG_LATENCY, reporter.getAvgLatency());
+ m.setDouble(MIN_LATENCY, reporter.getMinLatency());
+ m.setDouble(MAX_LATENCY, reporter.getMaxLatency());
+ m.setDouble(STD_DEV, reporter.getStdDev());
+ m.setDouble(CONS_RATE, reporter.getRate());
+ m.setLong(MSG_COUNT, reporter.getSampleSize());
sendMessageToController(m);
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Consumer rate : ").
- append(df.format(consRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(df.format(minLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(df.format(maxLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- if (printStdDev)
+ reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString());
+ reporter.log(new StringBuilder("Consumer rate : ").
+ append(config.getDecimalFormat().format(reporter.getRate())).
+ append(" msg/sec").toString());
+ reporter.log(new StringBuilder("Avg Latency : ").
+ append(config.getDecimalFormat().format(reporter.getAvgLatency())).
+ append(" ms").toString());
+ reporter.log(new StringBuilder("Min Latency : ").
+ append(config.getDecimalFormat().format(reporter.getMinLatency())).
+ append(" ms").toString());
+ reporter.log(new StringBuilder("Max Latency : ").
+ append(config.getDecimalFormat().format(reporter.getMaxLatency())).
+ append(" ms").toString());
+ if (config.isPrintStdDev())
{
- System.out.println(new StringBuilder("Std Dev : ").
- append(stdDev/Clock.convertToMiliSecs()).toString());
+ reporter.log(new StringBuilder("Std Dev : ").
+ append(reporter.getStdDev()).toString());
}
}
- public double calculateStdDev(double mean)
- {
- double v = 0;
- for (double latency: sample)
- {
- v = v + Math.pow((latency-mean), 2);
- }
- v = v/sample.size();
- return Math.round(Math.sqrt(v));
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- // To figure out the decoding overhead of text
- if (msgType == MessageType.TEXT)
- {
- ((TextMessage)msg).getText();
- }
-
- if (msg.getBooleanProperty("End"))
- {
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
- }
- else
- {
- rcvdTime = Clock.getTime();
- rcvdMsgCount ++;
-
- if (rcvdMsgCount == 1)
- {
- startTime = rcvdTime;
- }
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- totalLatency = totalLatency + latency;
- if (printStdDev)
- {
- sample.add(latency);
- }
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
public void run()
{
try
@@ -271,7 +177,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
System.out.println("=========================================================\n");
System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- startTest();
+ runReceiver();
sendResults();
nextIteration = continueTest();
}
@@ -283,21 +189,22 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
}
- @Override
+ @Override
public void tearDown() throws Exception
{
super.tearDown();
}
- public static void main(String[] args) throws InterruptedException
+ public static void main(String[] args) throws Exception
{
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true);
String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
+ int conCount = config.getConnectionCount();
final CountDownLatch testCompleted = new CountDownLatch(conCount);
for (int i=0; i < conCount; i++)
{
-
- final PerfConsumer cons = new PerfConsumer(scriptId + i);
+ final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i);
Runnable r = new Runnable()
{
public void run()
@@ -317,9 +224,8 @@ public class PerfConsumer extends PerfBase implements MessageListener
throw new Error("Error creating consumer thread",e);
}
t.start();
-
}
testCompleted.await();
- System.out.println("Consumers have completed the test......\n");
+ reporter.log("Consumers have completed the test......\n");
}
} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
index ac6129ab68..02377bb853 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
@@ -20,19 +20,15 @@
*/
package org.apache.qpid.tools;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.tools.report.MercuryReporter;
+import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
@@ -60,216 +56,68 @@ import org.apache.qpid.thread.Threading;
* latency is good.
*
*/
-public class PerfProducer extends PerfBase
+public class MercuryProducerController extends MercuryBase
{
- private static long SEC = 60000;
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
+ MercuryReporter reporter;
+ QpidSend sender;
- MessageProducer producer;
- Message msg;
- Object payload;
- List<Object> payloads;
- boolean cacheMsg = false;
- boolean randomMsgSize = false;
- boolean durable = false;
- Random random;
- int msgSizeRange = 1024;
- boolean rateLimitProducer = false;
- double rateFactor = 0.4;
- double rate = 0.0;
-
- public PerfProducer(String prefix)
+ public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix)
{
- super(prefix);
+ super(config,prefix);
+ this.reporter = reporter;
System.out.println("Producer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
- durable = params.isDurable();
- rateLimitProducer = params.getRate() > 0 ? true : false;
- if (rateLimitProducer)
- {
- System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
- }
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- cacheMsg = true;
- msg = createMessage(createPayload(params.getMsgSize()));
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (params.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(createPayload(i));
- }
- }
- else
- {
- payload = createPayload(params.getMsgSize());
- }
-
- producer = session.createProducer(dest);
- System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
-
+ sender = new QpidSend(reporter,config, con,dest);
+ sender.setUp();
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
sendMessageToController(m);
}
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- Message m;
-
- if (!randomMsgSize)
- {
- m = createMessage(payload);
- }
- else
- {
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
- }
- m.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return m;
- }
- }
-
public void warmup()throws Exception
{
receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- System.out.println("Producer: " + id + " Warming up......");
-
- for (int i=0; i < params.getWarmupCount() -1; i++)
+ if (_logger.isInfoEnabled())
{
- producer.send(getNextMessage());
- }
- sendEndMessage();
-
- if (params.isTransacted())
- {
- session.commit();
+ _logger.info("Producer: " + id + " Warming up......");
}
+ sender.send(config.getWarmupCount());
+ sender.sendEndMessage();
}
- public void startTest() throws Exception
+ public void runSender() throws Exception
{
resetCounters();
receiveFromController(OPCode.PRODUCER_START);
- int count = params.getMsgCount();
- boolean transacted = params.isTransacted();
- int tranSize = params.getTransactionSize();
-
- long limit = (long)(params.getRate() * rateFactor); // in msecs
- long timeLimit = (long)(SEC * rateFactor); // in msecs
-
- long start = Clock.getTime(); // defaults to nano secs
- long interval = start;
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setLongProperty(TIMESTAMP, Clock.getTime());
- producer.send(msg);
- if ( transacted && ((i+1) % tranSize == 0))
- {
- session.commit();
- }
-
- if (rateLimitProducer && i%limit == 0)
- {
- long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
- if (elapsed < timeLimit)
- {
- Thread.sleep(elapsed);
- }
- interval = Clock.getTime();
-
- }
- }
- sendEndMessage();
- if ( transacted)
- {
- session.commit();
- }
- long time = Clock.getTime() - start;
- rate = (double)count*Clock.convertToSecs()/(double)time;
- System.out.println(new StringBuilder("Producer rate: ").
- append(df.format(rate)).
- append(" msg/sec").
- toString());
+ sender.send(config.getMsgCount());
}
public void resetCounters()
{
-
- }
-
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createMessage();
- msg.setBooleanProperty("End", true);
- producer.send(msg);
+ sender.resetCounters();
}
public void sendResults() throws Exception
{
MapMessage msg = controllerSession.createMapMessage();
msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, rate);
+ msg.setDouble(PROD_RATE, reporter.getRate());
sendMessageToController(msg);
+ reporter.log(new StringBuilder("Producer rate: ").
+ append(config.getDecimalFormat().format(reporter.getRate())).
+ append(" msg/sec").
+ toString());
}
@Override
public void tearDown() throws Exception
{
+ sender.tearDown();
super.tearDown();
}
@@ -282,9 +130,12 @@ public class PerfProducer extends PerfBase
boolean nextIteration = true;
while (nextIteration)
{
- System.out.println("=========================================================\n");
- System.out.println("Producer: " + id + " starting a new iteration ......\n");
- startTest();
+ if(_logger.isInfoEnabled())
+ {
+ _logger.info("=========================================================\n");
+ _logger.info("Producer: " + id + " starting a new iteration ......\n");
+ }
+ runSender();
sendResults();
nextIteration = continueTest();
}
@@ -298,9 +149,9 @@ public class PerfProducer extends PerfBase
public void startControllerIfNeeded()
{
- if (!params.isExternalController())
+ if (!config.isExternalController())
{
- final PerfTestController controller = new PerfTestController();
+ final MercuryTestController controller = new MercuryTestController(config);
Runnable r = new Runnable()
{
public void run()
@@ -322,15 +173,16 @@ public class PerfProducer extends PerfBase
}
}
-
- public static void main(String[] args) throws InterruptedException
+ public static void main(String[] args) throws Exception
{
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true);
String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
+ int conCount = config.getConnectionCount();
final CountDownLatch testCompleted = new CountDownLatch(conCount);
for (int i=0; i < conCount; i++)
{
- final PerfProducer prod = new PerfProducer(scriptId + i);
+ final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i);
prod.startControllerIfNeeded();
Runnable r = new Runnable()
{
@@ -353,6 +205,6 @@ public class PerfProducer extends PerfBase
t.start();
}
testCompleted.await();
- System.out.println("Producers have completed the test......");
+ reporter.log("Producers have completed the test......");
}
} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
index 5fca1fa4bd..8c66a1e44d 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
@@ -33,6 +33,9 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.tools.report.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The Controller coordinates a test run between a number
@@ -62,8 +65,10 @@ import org.apache.qpid.client.message.AMQPEncodedMapMessage;
* System throughput is calculated as follows
* totalMsgCount/(totalTestTime)
*/
-public class PerfTestController extends PerfBase implements MessageListener
+public class MercuryTestController extends MercuryBase implements MessageListener
{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
+
enum TestMode { SINGLE_RUN, TIME_BASED };
TestMode testMode = TestMode.SINGLE_RUN;
@@ -102,11 +107,13 @@ public class PerfTestController extends PerfBase implements MessageListener
private MessageConsumer consumer;
private boolean printStdDev = false;
- FileWriter writer;
+ private FileWriter writer;
+ private Reporter report;
- public PerfTestController()
+ public MercuryTestController(TestConfiguration config)
{
- super("");
+ super(config,"");
+
consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
@@ -114,7 +121,7 @@ public class PerfTestController extends PerfBase implements MessageListener
prodRegistered = new CountDownLatch(producerCount);
consReady = new CountDownLatch(consumerCount);
prodReady = new CountDownLatch(producerCount);
- printStdDev = params.isPrintStdDev();
+ printStdDev = config.isPrintStdDev();
testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
}
@@ -126,28 +133,28 @@ public class PerfTestController extends PerfBase implements MessageListener
writer = new FileWriter("stats-csv.log");
}
consumer = controllerSession.createConsumer(controllerQueue);
- System.out.println("\nController: " + producerCount + " producers are expected");
- System.out.println("Controller: " + consumerCount + " consumers are expected \n");
+ report.log("\nController: " + producerCount + " producers are expected");
+ report.log("Controller: " + consumerCount + " consumers are expected \n");
consumer.setMessageListener(this);
consRegistered.await();
prodRegistered.await();
- System.out.println("\nController: All producers and consumers have registered......\n");
+ report.log("\nController: All producers and consumers have registered......\n");
}
public void warmup() throws Exception
{
- System.out.println("Controller initiating warm up sequence......");
+ report.log("Controller initiating warm up sequence......");
sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
prodReady.await();
consReady.await();
- System.out.println("\nController : All producers and consumers are ready to start the test......\n");
+ report.log("\nController : All producers and consumers are ready to start the test......\n");
}
public void startTest() throws Exception
{
resetCounters();
- System.out.println("\nController Starting test......");
+ report.log("\nController Starting test......");
long start = Clock.getTime();
sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
receivedEndMsg.await();
@@ -200,7 +207,7 @@ public class PerfTestController extends PerfBase implements MessageListener
}
catch(Exception e)
{
- System.out.println("Error calculating stats from Consumer : " + conStat);
+ System.err.println("Error calculating stats from Consumer : " + conStat);
}
@@ -217,7 +224,7 @@ public class PerfTestController extends PerfBase implements MessageListener
}
catch(Exception e)
{
- System.out.println("Error calculating stats from Producer : " + conStat);
+ System.err.println("Error calculating stats from Producer : " + conStat);
}
avgSystemLatency = totLatency/consumers.size();
@@ -225,56 +232,56 @@ public class PerfTestController extends PerfBase implements MessageListener
avgSystemConsRate = totalConsRate/consumers.size();
avgSystemProdRate = totalProdRate/producers.size();
- System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
+ report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
}
public void printResults() throws Exception
{
- System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(totalSystemThroughput)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Consumer rate : ").
- append(df.format(avgSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Consumer rate : ").
- append(df.format(minSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Consumer rate : ").
- append(df.format(maxSystemConsRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg Producer rate : ").
- append(df.format(avgSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Producer rate : ").
- append(df.format(minSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Producer rate : ").
- append(df.format(maxSystemProdRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg System Latency : ").
- append(df.format(avgSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min System Latency : ").
- append(df.format(minSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max System Latency : ").
- append(df.format(maxSystemLatency)).
- append(" ms").toString());
+ report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
+ report.log(new StringBuilder("System Throughput : ").
+ append(config.getDecimalFormat().format(totalSystemThroughput)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Avg Consumer rate : ").
+ append(config.getDecimalFormat().format(avgSystemConsRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Min Consumer rate : ").
+ append(config.getDecimalFormat().format(minSystemConsRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Max Consumer rate : ").
+ append(config.getDecimalFormat().format(maxSystemConsRate)).
+ append(" msg/sec").toString());
+
+ report.log(new StringBuilder("Avg Producer rate : ").
+ append(config.getDecimalFormat().format(avgSystemProdRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Min Producer rate : ").
+ append(config.getDecimalFormat().format(minSystemProdRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Max Producer rate : ").
+ append(config.getDecimalFormat().format(maxSystemProdRate)).
+ append(" msg/sec").toString());
+
+ report.log(new StringBuilder("Avg System Latency : ").
+ append(config.getDecimalFormat().format(avgSystemLatency)).
+ append(" ms").toString());
+ report.log(new StringBuilder("Min System Latency : ").
+ append(config.getDecimalFormat().format(minSystemLatency)).
+ append(" ms").toString());
+ report.log(new StringBuilder("Max System Latency : ").
+ append(config.getDecimalFormat().format(maxSystemLatency)).
+ append(" ms").toString());
if (printStdDev)
{
- System.out.println(new StringBuilder("Avg System Std Dev : ").
- append(avgSystemLatencyStdDev));
+ report.log(new StringBuilder("Avg System Std Dev : ").
+ append(avgSystemLatencyStdDev).toString());
}
}
private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
{
- System.out.println("\nController: Sending code " + code);
+ report.log("\nController: Sending code " + code);
MessageProducer tmpProd = controllerSession.createProducer(null);
MapMessage msg = controllerSession.createMapMessage();
msg.setInt(CODE, code.ordinal());
@@ -282,11 +289,11 @@ public class PerfTestController extends PerfBase implements MessageListener
{
if (node.getString(REPLY_ADDR) == null)
{
- System.out.println("REPLY_ADDR is null " + node);
+ report.log("REPLY_ADDR is null " + node);
}
else
{
- System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
+ report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
}
tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
}
@@ -299,16 +306,16 @@ public class PerfTestController extends PerfBase implements MessageListener
MapMessage m = (MapMessage)msg;
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("\n---------Controller Received Code : " + code);
- System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
+ report.log("\n---------Controller Received Code : " + code);
+ report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
switch (code)
{
case REGISTER_CONSUMER :
if (consRegistered.getCount() == 0)
{
- System.out.println("Warning : Expected number of consumers have already registered," +
- "ignoring extra consumer");
+ report.log("Warning : Expected number of consumers have already registered," +
+ "ignoring extra consumer");
break;
}
consumers.put(m.getString(ID),m);
@@ -318,8 +325,8 @@ public class PerfTestController extends PerfBase implements MessageListener
case REGISTER_PRODUCER :
if (prodRegistered.getCount() == 0)
{
- System.out.println("Warning : Expected number of producers have already registered," +
- "ignoring extra producer");
+ report.log("Warning : Expected number of producers have already registered," +
+ "ignoring extra producer");
break;
}
producers.put(m.getString(ID),m);
@@ -403,7 +410,7 @@ public class PerfTestController extends PerfBase implements MessageListener
@Override
public void tearDown() throws Exception {
- System.out.println("Controller: Completed the test......\n");
+ report.log("Controller: Completed the test......\n");
if (testMode == TestMode.TIME_BASED)
{
writer.close();
@@ -416,16 +423,16 @@ public class PerfTestController extends PerfBase implements MessageListener
public void writeStatsToFile() throws Exception
{
writer.append(String.valueOf(totalMsgCount)).append(",");
- writer.append(df.format(totalSystemThroughput)).append(",");
- writer.append(df.format(avgSystemConsRate)).append(",");
- writer.append(df.format(minSystemConsRate)).append(",");
- writer.append(df.format(maxSystemConsRate)).append(",");
- writer.append(df.format(avgSystemProdRate)).append(",");
- writer.append(df.format(minSystemProdRate)).append(",");
- writer.append(df.format(maxSystemProdRate)).append(",");
- writer.append(df.format(avgSystemLatency)).append(",");
- writer.append(df.format(minSystemLatency)).append(",");
- writer.append(df.format(maxSystemLatency));
+ writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemLatency)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemLatency));
if (printStdDev)
{
writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
@@ -436,7 +443,8 @@ public class PerfTestController extends PerfBase implements MessageListener
public static void main(String[] args)
{
- PerfTestController controller = new PerfTestController();
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryTestController controller = new MercuryTestController(config);
controller.run();
}
}