summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-04-10 11:17:35 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-04-10 11:17:35 +0000
commit712ae8927d8f384db7b31cbd9f77a0c5cd1a353f (patch)
tree479d6074ac7f5ed0872a265d070c2f7b025116b0
parenta5f02cb6972f84dc116db36bc73c08bdf31d0312 (diff)
downloadqpid-python-712ae8927d8f384db7b31cbd9f77a0c5cd1a353f.tar.gz
QPID-3941 The existing PerfProducer, PerfConsumer and TestController
were refactored to act more as controllers while the actual sending and receiving is now delegated to the QpidSend and QpidReceive classes. The stat collecting and reporting is now handled by the Report and Statistics classes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1311677 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java2
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java81
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java228
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java228
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java148
5 files changed, 210 insertions, 477 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
index 979d2ef76f..4e79dd62a8 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
@@ -27,6 +27,8 @@ package org.apache.qpid.tools;
public class Clock
{
+ public final static long SEC = 60000;
+
private static Precision precision;
private static long offset = -1; // in nano secs
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
index 121e94cea1..097b021b3e 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
@@ -21,7 +21,6 @@
package org.apache.qpid.tools;
import java.net.InetAddress;
-import java.text.DecimalFormat;
import java.util.UUID;
import javax.jms.Connection;
@@ -32,14 +31,17 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.messaging.Address;
+import org.apache.qpid.tools.TestConfiguration.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class PerfBase
+public class MercuryBase
{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class);
+
public final static String CODE = "CODE";
public final static String ID = "ID";
public final static String REPLY_ADDR = "REPLY_ADDR";
@@ -54,14 +56,13 @@ public class PerfBase
String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
- TestParams params;
+ TestConfiguration config;
Connection con;
Session session;
Session controllerSession;
Destination dest;
Destination myControlQueue;
Destination controllerQueue;
- DecimalFormat df = new DecimalFormat("###.##");
String id;
String myControlQueueAddr;
@@ -69,7 +70,8 @@ public class PerfBase
MessageConsumer receiveFromController;
String prefix = "";
- enum OPCode {
+ enum OPCode
+ {
REGISTER_CONSUMER, REGISTER_PRODUCER,
PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
CONSUMER_READY, PRODUCER_READY,
@@ -79,39 +81,11 @@ public class PerfBase
CONTINUE_TEST, STOP_TEST
};
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
MessageType msgType = MessageType.BYTES;
- public PerfBase(String prefix)
+ public MercuryBase(TestConfiguration config,String prefix)
{
- params = new TestParams();
+ this.config = config;
String host = "";
try
{
@@ -127,25 +101,16 @@ public class PerfBase
public void setUp() throws Exception
{
- if (params.getHost().equals("") || params.getPort() == -1)
- {
- con = new AMQConnection(params.getUrl());
- }
- else
- {
- con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test");
- }
+ con = config.createConnection();
con.start();
- session = con.createSession(params.isTransacted(),
- params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = createDestination();
- controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+ controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR);
myControlQueue = session.createQueue(myControlQueueAddr);
- msgType = MessageType.getType(params.getMessageType());
- System.out.println("Using " + msgType + " messages");
+ msgType = MessageType.getType(config.getMessageType());
+ _logger.debug("Using " + msgType + " messages");
sendToController = controllerSession.createProducer(controllerQueue);
receiveFromController = controllerSession.createConsumer(myControlQueue);
@@ -153,11 +118,11 @@ public class PerfBase
private Destination createDestination() throws Exception
{
- if (params.isUseUniqueDests())
+ if (config.isUseUniqueDests())
{
- System.out.println("Prefix : " + prefix);
- Address addr = Address.parse(params.getAddress());
- AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+ _logger.debug("Prefix : " + prefix);
+ Address addr = Address.parse(config.getAddress());
+ AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress());
int type = ((AMQSession_0_10)session).resolveAddressType(temp);
if ( type == AMQDestination.TOPIC_TYPE)
@@ -171,11 +136,11 @@ public class PerfBase
System.out.println("Setting name : " + addr);
}
- return new AMQAnyDestination(addr);
+ return AMQDestination.createDestination(addr.toString());
}
else
{
- return new AMQAnyDestination(params.getAddress());
+ return AMQDestination.createDestination(config.getAddress());
}
}
@@ -190,7 +155,7 @@ public class PerfBase
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
+ _logger.debug("Received Code : " + code);
if (expected != code)
{
throw new Exception("Expected OPCode : " + expected + " but received : " + code);
@@ -202,7 +167,7 @@ public class PerfBase
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
+ _logger.debug("Received Code : " + code);
return (code == OPCode.CONTINUE_TEST);
}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
index b63892bb51..b35adc45d6 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
@@ -20,18 +20,15 @@
*/
package org.apache.qpid.tools;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.tools.report.MercuryReporter;
+import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* PerfConsumer will receive x no of messages in warmup mode.
@@ -74,39 +71,28 @@ import org.apache.qpid.thread.Threading;
*
*/
-public class PerfConsumer extends PerfBase implements MessageListener
+public class MercuryConsumerController extends MercuryBase
{
- MessageConsumer consumer;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- long startTime = 0; // to measure consumer throughput
- long rcvdTime = 0;
- boolean transacted = false;
- int transSize = 0;
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class);
+ MercuryReporter reporter;
+ TestConfiguration config;
+ QpidReceive receiver;
- boolean printStdDev = false;
- List<Long> sample;
-
- final Object lock = new Object();
-
- public PerfConsumer(String prefix)
+ public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix)
{
- super(prefix);
- System.out.println("Consumer ID : " + id);
+ super(config,prefix);
+ this.reporter = reporter;
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Consumer ID : " + id);
+ }
}
public void setUp() throws Exception
{
super.setUp();
- consumer = session.createConsumer(dest);
- System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
-
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- printStdDev = params.isPrintStdDev();
+ receiver = new QpidReceive(reporter,config, con,dest);
+ receiver.setUp();
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
sendMessageToController(m);
@@ -115,151 +101,71 @@ public class PerfConsumer extends PerfBase implements MessageListener
public void warmup()throws Exception
{
receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- Message msg = consumer.receive();
- // This is to ensure we drain the queue before we start the actual test.
- while ( msg != null)
- {
- if (msg.getBooleanProperty("End") == true)
- {
- // It's more realistic for the consumer to signal this.
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m);
- }
- msg = consumer.receive(1000);
- }
+ receiver.waitforCompletion(config.getWarmupCount());
- if (params.isTransacted())
- {
- session.commit();
- }
+ // It's more realistic for the consumer to signal this.
+ MapMessage m1 = controllerSession.createMapMessage();
+ m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+ sendMessageToController(m1);
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m);
- consumer.setMessageListener(this);
+ MapMessage m2 = controllerSession.createMapMessage();
+ m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+ sendMessageToController(m2);
}
- public void startTest() throws Exception
+ public void runReceiver() throws Exception
{
- System.out.println("Consumer: " + id + " Starting test......" + "\n");
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Consumer: " + id + " Starting iteration......" + "\n");
+ }
resetCounters();
+ receiver.waitforCompletion(config.getMsgCount());
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+ sendMessageToController(m);
}
public void resetCounters()
{
- rcvdMsgCount = 0;
- maxLatency = 0;
- minLatency = Long.MAX_VALUE;
- totalLatency = 0;
- if (printStdDev)
- {
- sample = null;
- sample = new ArrayList<Long>(params.getMsgCount());
- }
+ reporter.clear();
}
public void sendResults() throws Exception
{
receiveFromController(OPCode.CONSUMER_STOP);
+ reporter.report();
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
- double stdDev = 0.0;
- if (printStdDev)
- {
- stdDev = calculateStdDev(avgLatency);
- }
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
- m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
- m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
- m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
- m.setDouble(CONS_RATE, consRate);
- m.setLong(MSG_COUNT, rcvdMsgCount);
+ m.setDouble(AVG_LATENCY, reporter.getAvgLatency());
+ m.setDouble(MIN_LATENCY, reporter.getMinLatency());
+ m.setDouble(MAX_LATENCY, reporter.getMaxLatency());
+ m.setDouble(STD_DEV, reporter.getStdDev());
+ m.setDouble(CONS_RATE, reporter.getRate());
+ m.setLong(MSG_COUNT, reporter.getSampleSize());
sendMessageToController(m);
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Consumer rate : ").
- append(df.format(consRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(df.format(minLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(df.format(maxLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- if (printStdDev)
+ reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString());
+ reporter.log(new StringBuilder("Consumer rate : ").
+ append(config.getDecimalFormat().format(reporter.getRate())).
+ append(" msg/sec").toString());
+ reporter.log(new StringBuilder("Avg Latency : ").
+ append(config.getDecimalFormat().format(reporter.getAvgLatency())).
+ append(" ms").toString());
+ reporter.log(new StringBuilder("Min Latency : ").
+ append(config.getDecimalFormat().format(reporter.getMinLatency())).
+ append(" ms").toString());
+ reporter.log(new StringBuilder("Max Latency : ").
+ append(config.getDecimalFormat().format(reporter.getMaxLatency())).
+ append(" ms").toString());
+ if (config.isPrintStdDev())
{
- System.out.println(new StringBuilder("Std Dev : ").
- append(stdDev/Clock.convertToMiliSecs()).toString());
+ reporter.log(new StringBuilder("Std Dev : ").
+ append(reporter.getStdDev()).toString());
}
}
- public double calculateStdDev(double mean)
- {
- double v = 0;
- for (double latency: sample)
- {
- v = v + Math.pow((latency-mean), 2);
- }
- v = v/sample.size();
- return Math.round(Math.sqrt(v));
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- // To figure out the decoding overhead of text
- if (msgType == MessageType.TEXT)
- {
- ((TextMessage)msg).getText();
- }
-
- if (msg.getBooleanProperty("End"))
- {
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
- }
- else
- {
- rcvdTime = Clock.getTime();
- rcvdMsgCount ++;
-
- if (rcvdMsgCount == 1)
- {
- startTime = rcvdTime;
- }
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- totalLatency = totalLatency + latency;
- if (printStdDev)
- {
- sample.add(latency);
- }
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
public void run()
{
try
@@ -271,7 +177,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
System.out.println("=========================================================\n");
System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- startTest();
+ runReceiver();
sendResults();
nextIteration = continueTest();
}
@@ -283,21 +189,22 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
}
- @Override
+ @Override
public void tearDown() throws Exception
{
super.tearDown();
}
- public static void main(String[] args) throws InterruptedException
+ public static void main(String[] args) throws Exception
{
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true);
String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
+ int conCount = config.getConnectionCount();
final CountDownLatch testCompleted = new CountDownLatch(conCount);
for (int i=0; i < conCount; i++)
{
-
- final PerfConsumer cons = new PerfConsumer(scriptId + i);
+ final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i);
Runnable r = new Runnable()
{
public void run()
@@ -317,9 +224,8 @@ public class PerfConsumer extends PerfBase implements MessageListener
throw new Error("Error creating consumer thread",e);
}
t.start();
-
}
testCompleted.await();
- System.out.println("Consumers have completed the test......\n");
+ reporter.log("Consumers have completed the test......\n");
}
} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
index ac6129ab68..02377bb853 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
@@ -20,19 +20,15 @@
*/
package org.apache.qpid.tools;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
+import org.apache.qpid.tools.report.MercuryReporter;
+import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
@@ -60,216 +56,68 @@ import org.apache.qpid.thread.Threading;
* latency is good.
*
*/
-public class PerfProducer extends PerfBase
+public class MercuryProducerController extends MercuryBase
{
- private static long SEC = 60000;
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
+ MercuryReporter reporter;
+ QpidSend sender;
- MessageProducer producer;
- Message msg;
- Object payload;
- List<Object> payloads;
- boolean cacheMsg = false;
- boolean randomMsgSize = false;
- boolean durable = false;
- Random random;
- int msgSizeRange = 1024;
- boolean rateLimitProducer = false;
- double rateFactor = 0.4;
- double rate = 0.0;
-
- public PerfProducer(String prefix)
+ public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix)
{
- super(prefix);
+ super(config,prefix);
+ this.reporter = reporter;
System.out.println("Producer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
- durable = params.isDurable();
- rateLimitProducer = params.getRate() > 0 ? true : false;
- if (rateLimitProducer)
- {
- System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
- }
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- cacheMsg = true;
- msg = createMessage(createPayload(params.getMsgSize()));
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (params.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(createPayload(i));
- }
- }
- else
- {
- payload = createPayload(params.getMsgSize());
- }
-
- producer = session.createProducer(dest);
- System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
-
+ sender = new QpidSend(reporter,config, con,dest);
+ sender.setUp();
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
sendMessageToController(m);
}
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- Message m;
-
- if (!randomMsgSize)
- {
- m = createMessage(payload);
- }
- else
- {
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
- }
- m.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return m;
- }
- }
-
public void warmup()throws Exception
{
receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- System.out.println("Producer: " + id + " Warming up......");
-
- for (int i=0; i < params.getWarmupCount() -1; i++)
+ if (_logger.isInfoEnabled())
{
- producer.send(getNextMessage());
- }
- sendEndMessage();
-
- if (params.isTransacted())
- {
- session.commit();
+ _logger.info("Producer: " + id + " Warming up......");
}
+ sender.send(config.getWarmupCount());
+ sender.sendEndMessage();
}
- public void startTest() throws Exception
+ public void runSender() throws Exception
{
resetCounters();
receiveFromController(OPCode.PRODUCER_START);
- int count = params.getMsgCount();
- boolean transacted = params.isTransacted();
- int tranSize = params.getTransactionSize();
-
- long limit = (long)(params.getRate() * rateFactor); // in msecs
- long timeLimit = (long)(SEC * rateFactor); // in msecs
-
- long start = Clock.getTime(); // defaults to nano secs
- long interval = start;
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setLongProperty(TIMESTAMP, Clock.getTime());
- producer.send(msg);
- if ( transacted && ((i+1) % tranSize == 0))
- {
- session.commit();
- }
-
- if (rateLimitProducer && i%limit == 0)
- {
- long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
- if (elapsed < timeLimit)
- {
- Thread.sleep(elapsed);
- }
- interval = Clock.getTime();
-
- }
- }
- sendEndMessage();
- if ( transacted)
- {
- session.commit();
- }
- long time = Clock.getTime() - start;
- rate = (double)count*Clock.convertToSecs()/(double)time;
- System.out.println(new StringBuilder("Producer rate: ").
- append(df.format(rate)).
- append(" msg/sec").
- toString());
+ sender.send(config.getMsgCount());
}
public void resetCounters()
{
-
- }
-
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createMessage();
- msg.setBooleanProperty("End", true);
- producer.send(msg);
+ sender.resetCounters();
}
public void sendResults() throws Exception
{
MapMessage msg = controllerSession.createMapMessage();
msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, rate);
+ msg.setDouble(PROD_RATE, reporter.getRate());
sendMessageToController(msg);
+ reporter.log(new StringBuilder("Producer rate: ").
+ append(config.getDecimalFormat().format(reporter.getRate())).
+ append(" msg/sec").
+ toString());
}
@Override
public void tearDown() throws Exception
{
+ sender.tearDown();
super.tearDown();
}
@@ -282,9 +130,12 @@ public class PerfProducer extends PerfBase
boolean nextIteration = true;
while (nextIteration)
{
- System.out.println("=========================================================\n");
- System.out.println("Producer: " + id + " starting a new iteration ......\n");
- startTest();
+ if(_logger.isInfoEnabled())
+ {
+ _logger.info("=========================================================\n");
+ _logger.info("Producer: " + id + " starting a new iteration ......\n");
+ }
+ runSender();
sendResults();
nextIteration = continueTest();
}
@@ -298,9 +149,9 @@ public class PerfProducer extends PerfBase
public void startControllerIfNeeded()
{
- if (!params.isExternalController())
+ if (!config.isExternalController())
{
- final PerfTestController controller = new PerfTestController();
+ final MercuryTestController controller = new MercuryTestController(config);
Runnable r = new Runnable()
{
public void run()
@@ -322,15 +173,16 @@ public class PerfProducer extends PerfBase
}
}
-
- public static void main(String[] args) throws InterruptedException
+ public static void main(String[] args) throws Exception
{
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true);
String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
+ int conCount = config.getConnectionCount();
final CountDownLatch testCompleted = new CountDownLatch(conCount);
for (int i=0; i < conCount; i++)
{
- final PerfProducer prod = new PerfProducer(scriptId + i);
+ final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i);
prod.startControllerIfNeeded();
Runnable r = new Runnable()
{
@@ -353,6 +205,6 @@ public class PerfProducer extends PerfBase
t.start();
}
testCompleted.await();
- System.out.println("Producers have completed the test......");
+ reporter.log("Producers have completed the test......");
}
} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
index 5fca1fa4bd..8c66a1e44d 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
@@ -33,6 +33,9 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.tools.report.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The Controller coordinates a test run between a number
@@ -62,8 +65,10 @@ import org.apache.qpid.client.message.AMQPEncodedMapMessage;
* System throughput is calculated as follows
* totalMsgCount/(totalTestTime)
*/
-public class PerfTestController extends PerfBase implements MessageListener
+public class MercuryTestController extends MercuryBase implements MessageListener
{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
+
enum TestMode { SINGLE_RUN, TIME_BASED };
TestMode testMode = TestMode.SINGLE_RUN;
@@ -102,11 +107,13 @@ public class PerfTestController extends PerfBase implements MessageListener
private MessageConsumer consumer;
private boolean printStdDev = false;
- FileWriter writer;
+ private FileWriter writer;
+ private Reporter report;
- public PerfTestController()
+ public MercuryTestController(TestConfiguration config)
{
- super("");
+ super(config,"");
+
consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
@@ -114,7 +121,7 @@ public class PerfTestController extends PerfBase implements MessageListener
prodRegistered = new CountDownLatch(producerCount);
consReady = new CountDownLatch(consumerCount);
prodReady = new CountDownLatch(producerCount);
- printStdDev = params.isPrintStdDev();
+ printStdDev = config.isPrintStdDev();
testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
}
@@ -126,28 +133,28 @@ public class PerfTestController extends PerfBase implements MessageListener
writer = new FileWriter("stats-csv.log");
}
consumer = controllerSession.createConsumer(controllerQueue);
- System.out.println("\nController: " + producerCount + " producers are expected");
- System.out.println("Controller: " + consumerCount + " consumers are expected \n");
+ report.log("\nController: " + producerCount + " producers are expected");
+ report.log("Controller: " + consumerCount + " consumers are expected \n");
consumer.setMessageListener(this);
consRegistered.await();
prodRegistered.await();
- System.out.println("\nController: All producers and consumers have registered......\n");
+ report.log("\nController: All producers and consumers have registered......\n");
}
public void warmup() throws Exception
{
- System.out.println("Controller initiating warm up sequence......");
+ report.log("Controller initiating warm up sequence......");
sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
prodReady.await();
consReady.await();
- System.out.println("\nController : All producers and consumers are ready to start the test......\n");
+ report.log("\nController : All producers and consumers are ready to start the test......\n");
}
public void startTest() throws Exception
{
resetCounters();
- System.out.println("\nController Starting test......");
+ report.log("\nController Starting test......");
long start = Clock.getTime();
sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
receivedEndMsg.await();
@@ -200,7 +207,7 @@ public class PerfTestController extends PerfBase implements MessageListener
}
catch(Exception e)
{
- System.out.println("Error calculating stats from Consumer : " + conStat);
+ System.err.println("Error calculating stats from Consumer : " + conStat);
}
@@ -217,7 +224,7 @@ public class PerfTestController extends PerfBase implements MessageListener
}
catch(Exception e)
{
- System.out.println("Error calculating stats from Producer : " + conStat);
+ System.err.println("Error calculating stats from Producer : " + conStat);
}
avgSystemLatency = totLatency/consumers.size();
@@ -225,56 +232,56 @@ public class PerfTestController extends PerfBase implements MessageListener
avgSystemConsRate = totalConsRate/consumers.size();
avgSystemProdRate = totalProdRate/producers.size();
- System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
+ report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
}
public void printResults() throws Exception
{
- System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(totalSystemThroughput)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Consumer rate : ").
- append(df.format(avgSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Consumer rate : ").
- append(df.format(minSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Consumer rate : ").
- append(df.format(maxSystemConsRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg Producer rate : ").
- append(df.format(avgSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Producer rate : ").
- append(df.format(minSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Producer rate : ").
- append(df.format(maxSystemProdRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg System Latency : ").
- append(df.format(avgSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min System Latency : ").
- append(df.format(minSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max System Latency : ").
- append(df.format(maxSystemLatency)).
- append(" ms").toString());
+ report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
+ report.log(new StringBuilder("System Throughput : ").
+ append(config.getDecimalFormat().format(totalSystemThroughput)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Avg Consumer rate : ").
+ append(config.getDecimalFormat().format(avgSystemConsRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Min Consumer rate : ").
+ append(config.getDecimalFormat().format(minSystemConsRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Max Consumer rate : ").
+ append(config.getDecimalFormat().format(maxSystemConsRate)).
+ append(" msg/sec").toString());
+
+ report.log(new StringBuilder("Avg Producer rate : ").
+ append(config.getDecimalFormat().format(avgSystemProdRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Min Producer rate : ").
+ append(config.getDecimalFormat().format(minSystemProdRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Max Producer rate : ").
+ append(config.getDecimalFormat().format(maxSystemProdRate)).
+ append(" msg/sec").toString());
+
+ report.log(new StringBuilder("Avg System Latency : ").
+ append(config.getDecimalFormat().format(avgSystemLatency)).
+ append(" ms").toString());
+ report.log(new StringBuilder("Min System Latency : ").
+ append(config.getDecimalFormat().format(minSystemLatency)).
+ append(" ms").toString());
+ report.log(new StringBuilder("Max System Latency : ").
+ append(config.getDecimalFormat().format(maxSystemLatency)).
+ append(" ms").toString());
if (printStdDev)
{
- System.out.println(new StringBuilder("Avg System Std Dev : ").
- append(avgSystemLatencyStdDev));
+ report.log(new StringBuilder("Avg System Std Dev : ").
+ append(avgSystemLatencyStdDev).toString());
}
}
private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
{
- System.out.println("\nController: Sending code " + code);
+ report.log("\nController: Sending code " + code);
MessageProducer tmpProd = controllerSession.createProducer(null);
MapMessage msg = controllerSession.createMapMessage();
msg.setInt(CODE, code.ordinal());
@@ -282,11 +289,11 @@ public class PerfTestController extends PerfBase implements MessageListener
{
if (node.getString(REPLY_ADDR) == null)
{
- System.out.println("REPLY_ADDR is null " + node);
+ report.log("REPLY_ADDR is null " + node);
}
else
{
- System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
+ report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
}
tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
}
@@ -299,16 +306,16 @@ public class PerfTestController extends PerfBase implements MessageListener
MapMessage m = (MapMessage)msg;
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("\n---------Controller Received Code : " + code);
- System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
+ report.log("\n---------Controller Received Code : " + code);
+ report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
switch (code)
{
case REGISTER_CONSUMER :
if (consRegistered.getCount() == 0)
{
- System.out.println("Warning : Expected number of consumers have already registered," +
- "ignoring extra consumer");
+ report.log("Warning : Expected number of consumers have already registered," +
+ "ignoring extra consumer");
break;
}
consumers.put(m.getString(ID),m);
@@ -318,8 +325,8 @@ public class PerfTestController extends PerfBase implements MessageListener
case REGISTER_PRODUCER :
if (prodRegistered.getCount() == 0)
{
- System.out.println("Warning : Expected number of producers have already registered," +
- "ignoring extra producer");
+ report.log("Warning : Expected number of producers have already registered," +
+ "ignoring extra producer");
break;
}
producers.put(m.getString(ID),m);
@@ -403,7 +410,7 @@ public class PerfTestController extends PerfBase implements MessageListener
@Override
public void tearDown() throws Exception {
- System.out.println("Controller: Completed the test......\n");
+ report.log("Controller: Completed the test......\n");
if (testMode == TestMode.TIME_BASED)
{
writer.close();
@@ -416,16 +423,16 @@ public class PerfTestController extends PerfBase implements MessageListener
public void writeStatsToFile() throws Exception
{
writer.append(String.valueOf(totalMsgCount)).append(",");
- writer.append(df.format(totalSystemThroughput)).append(",");
- writer.append(df.format(avgSystemConsRate)).append(",");
- writer.append(df.format(minSystemConsRate)).append(",");
- writer.append(df.format(maxSystemConsRate)).append(",");
- writer.append(df.format(avgSystemProdRate)).append(",");
- writer.append(df.format(minSystemProdRate)).append(",");
- writer.append(df.format(maxSystemProdRate)).append(",");
- writer.append(df.format(avgSystemLatency)).append(",");
- writer.append(df.format(minSystemLatency)).append(",");
- writer.append(df.format(maxSystemLatency));
+ writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemLatency)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemLatency));
if (printStdDev)
{
writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
@@ -436,7 +443,8 @@ public class PerfTestController extends PerfBase implements MessageListener
public static void main(String[] args)
{
- PerfTestController controller = new PerfTestController();
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryTestController controller = new MercuryTestController(config);
controller.run();
}
}