diff options
7 files changed, 676 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/client.log4j b/qpid/java/client/src/main/java/client.log4j index c7b6630987..7cab6895dc 100644 --- a/qpid/java/client/src/main/java/client.log4j +++ b/qpid/java/client/src/main/java/client.log4j @@ -23,11 +23,10 @@ log4j.rootLogger=${root.logging.level} #log4j.additivity.org.apache.qpid=false
#log4j.logger.org.apache.qpidity.transport=TRACE, console
-log4j.logger.org.apache.qpid=MAJOR, console
+log4j.logger.org.apache.qpid=ERROR, console
log4j.additivity.org.apache.qpid=false
-log4j.logger.org.apache.qpidity.transport=MAJOR, console
-log4j.logger.org.apache.qpid.testutil.QpidTestCase=MAJOR, console
-
+#log4j.logger.org.apache.qpidity.transport=DEBUG, console
+#log4j.logger.org.apache.qpid.client.message.AbstractBytesTypedMessage=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=all
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java new file mode 100644 index 0000000000..133ef5f854 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java @@ -0,0 +1,50 @@ +package org.apache.qpid.client.perf; + +import javax.naming.InitialContext; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionUtility +{ + private static final Logger _logger = LoggerFactory.getLogger(ConnectionUtility.class); + + private InitialContext _initialContext; + private AMQConnectionFactory _connectionFactory; + + private static ConnectionUtility _instance = new ConnectionUtility(); + + public static ConnectionUtility getInstance() + { + return _instance; + } + + private InitialContext getInitialContext() throws Exception + { + _logger.info("get InitialContext"); + if (_initialContext == null) + { + _initialContext = new InitialContext(); + } + return _initialContext; + } + + private AMQConnectionFactory getConnectionFactory() throws Exception + { + _logger.info("get ConnectionFactory"); + if (_connectionFactory == null) + { + _connectionFactory = (AMQConnectionFactory) getInitialContext().lookup("local"); + } + return _connectionFactory; + } + + public AMQConnection getConnection() throws Exception + { + _logger.info("get Connection"); + return (AMQConnection)getConnectionFactory().createConnection(); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java new file mode 100644 index 0000000000..6b6a8a509c --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java @@ -0,0 +1,101 @@ +package org.apache.qpid.client.perf; + +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JMSConsumer implements Runnable +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSConsumer.class); + + private String _id; + private Connection _connection; + private Session _session; + private MessageConsumer _consumer; + private Destination _destination; + private boolean _transacted; + private int _ackMode = Session.AUTO_ACKNOWLEDGE; + private AtomicBoolean _run = new AtomicBoolean(true); + private long _currentMsgCount; + + /* Not implementing transactions for first phase */ + public JMSConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception + { + _id = id; + _connection = connection; + _destination = destination; + _transacted = transacted; + _ackMode = ackMode; + } + + public void run() + { + _run.set(true); + + try + { + _session = _connection.createSession(_transacted, _ackMode); + _consumer = _session.createConsumer(_destination); + } + catch(Exception e) + { + _logger.error("Error Setting up JMSProducer:"+ _id, e); + } + + while (_run.get()) + { + try + { + BytesMessage msg = (BytesMessage)_consumer.receive(); + if (msg != null) + { + long msgId = Integer.parseInt(msg.getJMSCorrelationID()); + if (_currentMsgCount+1 != msgId) + { + _logger.error("Error : Message received out of order in JMSConsumer:" + _id + " message id was " + msgId); + } + _currentMsgCount ++; + } + } + catch(Exception e) + { + _logger.error("Error Receiving message from JMSConsumer:" + _id, e); + } + } + try + { + _session.close(); + _connection.close(); + } + catch(Exception e) + { + _logger.error("Error Closing JMSConsumer:"+ _id, e); + } + } + + public void stopConsuming() + { + _run.set(false); + } + + public String getId() + { + return _id; + } + + /* Not worried about synchronizing as accuracy here is not that important. + * So if this method is invoked the count maybe off by a few digits. + * But when the test stops, this will always return the proper count. + */ + public long getCurrentMessageCount() + { + return _currentMsgCount; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java new file mode 100644 index 0000000000..c9c5b2b308 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java @@ -0,0 +1,96 @@ +package org.apache.qpid.client.perf; + +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.message.TestMessageFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JMSProducer implements Runnable +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSProducer.class); + + private String _id; + private int _messageSize; + private Connection _connection; + private Session _session; + private MessageProducer _producer; + private Destination _destination; + private BytesMessage _payload; + private boolean _transacted; + private int _ackMode = Session.AUTO_ACKNOWLEDGE; + private AtomicBoolean _run = new AtomicBoolean(true); + private long _currentMsgCount; + + /* Not implementing transactions for first phase */ + public JMSProducer(String id,Connection connection, Destination destination,int messageSize, boolean transacted) throws Exception + { + _id = id; + _connection = connection; + _destination = destination; + _messageSize = messageSize; + _transacted = transacted; + } + + public void run() + { + try + { + _session = _connection.createSession(_transacted, _ackMode); + _payload = TestMessageFactory.newBytesMessage(_session, _messageSize); + _producer = _session.createProducer(_destination); + } + catch(Exception e) + { + _logger.error("Error Setting up JMSProducer:"+ _id, e); + } + + while (_run.get()) + { + try + { + _payload.setJMSCorrelationID(String.valueOf(_currentMsgCount+1)); + _producer.send(_payload); + _currentMsgCount ++; + } + catch(Exception e) + { + _logger.error("Error Sending message from JMSProducer:" + _id, e); + } + } + try + { + _session.close(); + _connection.close(); + } + catch(Exception e) + { + _logger.error("Error Closing JMSProducer:"+ _id, e); + } + } + + public void stopProducing() + { + _run.set(false); + } + + public String getId() + { + return _id; + } + + /* Not worried about synchronizing as accuracy here is not that important. + * So if this method is invoked the count maybe off by a few digits. + * But when the test stops, this will always return the proper count. + */ + public long getCurrentMessageCount() + { + return _currentMsgCount; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java new file mode 100644 index 0000000000..4d6923f0dd --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java @@ -0,0 +1,162 @@ +package org.apache.qpid.client.perf; + +import java.io.FileWriter; +import java.io.RandomAccessFile; +import java.sql.Date; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageConsumerTest extends Options implements Runnable +{ + private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class); + private SimpleDateFormat df = new SimpleDateFormat("h:mm a"); + + private Map<Integer,JMSConsumer> _consumers = new ConcurrentHashMap<Integer,JMSConsumer>(); + private int _count; + String _logFileName; + private long _gracePeriod = 5 * 60 * 1000; + long _startTime; + long totalMsgCount; + + public void start() throws Exception + { + this.parseOptions(); + boolean useSameDest = true; + _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis(); + + // use each destination with a different producer + if (_producerCount == destArray.length) + { + useSameDest = false; + } + for (;_count < _producerCount;_count++) + { + createAndStartProducer(useSameDest?destArray[0]:destArray[_count]); + } + } + + private void createAndStartProducer(String routingKey)throws Exception + { + AMQConnection con = ConnectionUtility.getInstance().getConnection(); + con.start(); + Destination dest = new AMQTopic(con,routingKey); + JMSConsumer prod = new JMSConsumer(String.valueOf(_count),(Connection)con, dest, _transacted,Session.AUTO_ACKNOWLEDGE); + Thread t = new Thread(prod); + t.setName("JMSConsumer-"+_count); + t.start(); + _consumers.put(_count, prod); + } + + private void startTimerThread() + { + Thread t = new Thread(this); + t.setName("MessageConsumerTest-TimerThread"); + t.start(); + } + + public void run() + { + boolean run = true; + _startTime = System.currentTimeMillis(); + try + { + while(run) + { + Thread.sleep(_logDuration); + runReaper(false); + + if(System.currentTimeMillis() + _gracePeriod - _startTime > _expiry ) + { + // time to stop the test. + for (Integer id : _consumers.keySet()) + { + JMSConsumer consumer = _consumers.get(id); + consumer.stopConsuming(); + } + runReaper(true); + run = false; + } + } + } + catch (InterruptedException e) + { + _logger.error("The timer thread exited",e); + } + } + + public void runReaper(boolean printSummary) + { + try + { + FileWriter _logFile = new FileWriter(_logFileName + ".csv",true); + for (Integer id : _consumers.keySet()) + { + JMSConsumer prod = _consumers.get(id); + StringBuffer buf = new StringBuffer("JMSConsumer(").append(prod.getId()).append("),"); + Date d = new Date(System.currentTimeMillis()); + buf.append(df.format(d)).append(","); + buf.append(d.getTime()).append(","); + buf.append(prod.getCurrentMessageCount()).append("\n"); + _logFile.write(buf.toString()); + totalMsgCount = totalMsgCount + prod.getCurrentMessageCount(); + } + _logFile.close(); + + FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv",true); + StringBuffer buf = new StringBuffer("JMSConsumer,"); + Date d = new Date(System.currentTimeMillis()); + buf.append(df.format(d)).append(","); + buf.append(d.getTime()).append(","); + buf.append(totalMsgCount).append(","); + buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n"); + _memoryLog.write(buf.toString()); + _memoryLog.close(); + if (printSummary) + { + double totaltime = (d.getTime() - _startTime)*1000; // trying to get a per sec rate + double dCount = totalMsgCount; + double ratio = dCount/totaltime; + FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); + buf = new StringBuffer("MessageConsumerTest \n Test started at : "); + buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); + d = new Date(System.currentTimeMillis()); + buf.append(df.format(d)).append("\n Total Time taken (ms):"); + buf.append(totaltime).append("\n Total messages received:"); + buf.append(totalMsgCount).append("\n Consumer rate:"); + buf.append(ratio).append("\n"); + _summaryLog.write(buf.toString()); + System.out.println("---------- Test Ended -------------"); + _summaryLog.close(); + } + } + catch(Exception e) + { + _logger.error("Error printing info to the log file",e); + } + } + + public static void main(String[] args) + { + try + { + MessageConsumerTest test = new MessageConsumerTest(); + test.start(); + test.startTimerThread(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java new file mode 100644 index 0000000000..b0caa45845 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java @@ -0,0 +1,159 @@ +package org.apache.qpid.client.perf; + +import java.io.FileWriter; +import java.sql.Date; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.Connection; +import javax.jms.Destination; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageProducerTest extends Options implements Runnable +{ + private static final Logger _logger = LoggerFactory.getLogger(MessageProducerTest.class); + private SimpleDateFormat df = new SimpleDateFormat("h:mm a"); + + private Map<Integer,JMSProducer> _producers = new ConcurrentHashMap<Integer,JMSProducer>(); + private int _count; + String _logFileName; + long _startTime; + long totalMsgCount; + + public void start() throws Exception + { + this.parseOptions(); + boolean useSameDest = true; + _logFileName = _logFilePath + "/MessageProducerTest_" + System.currentTimeMillis(); + + // use each destination with a different producer + if (_producerCount == destArray.length) + { + useSameDest = false; + } + for (;_count < _producerCount;_count++) + { + createAndStartProducer(useSameDest?destArray[0]:destArray[_count]); + } + } + + private void createAndStartProducer(String routingKey)throws Exception + { + AMQConnection con = ConnectionUtility.getInstance().getConnection(); + con.start(); + Destination dest = new AMQTopic(con,routingKey); + JMSProducer prod = new JMSProducer(String.valueOf(_count),(Connection)con, dest,_messageSize, _transacted); + Thread t = new Thread(prod); + t.setName("JMSProducer-"+_count); + t.start(); + _producers.put(_count, prod); + } + + private void startTimerThread() + { + Thread t = new Thread(this); + t.setName("MessageProducerTest-TimerThread"); + t.start(); + } + + public void run() + { + boolean run = true; + _startTime = System.currentTimeMillis(); + try + { + while(run) + { + Thread.sleep(_logDuration); + runReaper(false); + + if(System.currentTimeMillis() - _startTime > _expiry ) + { + // time to stop the test. + for (Integer id : _producers.keySet()) + { + JMSProducer prod = _producers.get(id); + prod.stopProducing(); + } + runReaper(true); + run = false; + } + } + } + catch (InterruptedException e) + { + _logger.error("The timer thread exited",e); + } + } + + public void runReaper(boolean printSummary) + { + try + { + FileWriter _logFile = new FileWriter(_logFileName + ".csv",true); + for (Integer id : _producers.keySet()) + { + JMSProducer prod = _producers.get(id); + StringBuffer buf = new StringBuffer("JMSProducer(").append(prod.getId()).append("),"); + Date d = new Date(System.currentTimeMillis()); + buf.append(df.format(d)).append(","); + buf.append(d.getTime()).append(","); + buf.append(prod.getCurrentMessageCount()).append("\n"); + _logFile.write(buf.toString()); + totalMsgCount = totalMsgCount + prod.getCurrentMessageCount(); + } + _logFile.close(); + + FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv",true); + StringBuffer buf = new StringBuffer("JMSProducer,"); + Date d = new Date(System.currentTimeMillis()); + buf.append(df.format(d)).append(","); + buf.append(d.getTime()).append(","); + buf.append(totalMsgCount).append(","); + buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n"); + _memoryLog.write(buf.toString()); + _memoryLog.close(); + if (printSummary) + { + double totaltime = (d.getTime() - _startTime)*1000; // trying to get a per sec rate + double dCount = totalMsgCount; + double ratio = dCount/totaltime; + FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); + buf = new StringBuffer("MessageProducerTest \n Test started at : "); + buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); + d = new Date(System.currentTimeMillis()); + buf.append(df.format(d)).append("\n Total Time taken (ms):"); + buf.append(totaltime).append("\n Total messages sent:"); + buf.append(totalMsgCount).append("\n Producer rate:"); + buf.append(ratio).append("\n"); + _summaryLog.write(buf.toString()); + System.out.println("---------- Test Ended -------------"); + _summaryLog.close(); + } + } + catch(Exception e) + { + _logger.error("Error printing info to the log file",e); + } + } + + public static void main(String[] args) + { + try + { + MessageProducerTest test = new MessageProducerTest(); + test.start(); + test.startTimerThread(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java new file mode 100644 index 0000000000..1bcf04ae4a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java @@ -0,0 +1,105 @@ +package org.apache.qpid.client.perf; + +public class Options +{ + int _messageSize; + boolean _transacted; + String[] destArray; + int _producerCount; + int _consumerCount; + long _expiry; + long _logDuration; + String _logFilePath; + + /** + * System props + * -DmessageSize + * -Dtransacted + * -DproducerCount + * -DconsumerCount + * -Ddestinations + * -DlogFilePath + * -Duration=1H,30M,10S + * -DlogDuration=10 in mins + */ + public void parseOptions() + { + _messageSize = Integer.parseInt(System.getProperty("messageSize","100")); + _transacted = false; + String destinations = System.getProperty("destinations"); + destArray = destinations.split(","); + _producerCount = Integer.parseInt(System.getProperty("producerCount","1")); + _consumerCount = Integer.parseInt(System.getProperty("consumerCount","1")); + _logDuration = Long.parseLong(System.getProperty("logDuration","10")); + _logDuration = _logDuration*1000*60; + _logFilePath = System.getProperty("logFilePath"); + _expiry = getExpiry(); + + System.out.println("============= Test Data ==================="); + System.out.println("Total no of producers : " + _producerCount); + System.out.println("Total no of consumer : " + _consumerCount); + System.out.println("Log Frequency in mins : " + _logDuration/(1000*60)); + System.out.println("Log file path : " + _logFilePath); + System.out.println("Test Duration : " + printTestDuration()); + System.out.println("============= /Test Data ==================="); + } + + private String printTestDuration() + { + StringBuffer buf = new StringBuffer(); + int hours = (int)_expiry/(60*60*1000); + int mins = (int)_expiry/(60*1000); + int secs = (int)_expiry/1000; + if (hours > 0) + { + buf.append(hours).append(" hours, "); + } + if (mins > 0) + { + buf.append(mins).append(" mins, "); + } + if (secs > 0) + { + buf.append(secs).append(" secs"); + } + + return buf.toString(); + } + + private long getExpiry() + { + // default is 30 mins + long time = 0; + String s = System.getProperty("duration"); + if(s != null) + { + String[] temp = s.split(","); + for (String st:temp) + { + if(st.indexOf("H")>0) + { + int hour = Integer.parseInt(st.substring(0,st.indexOf("H"))); + time = time + hour * 60 * 60 * 1000; + } + else if(st.indexOf("M")>0) + { + int min = Integer.parseInt(st.substring(0,st.indexOf("M"))); + time = time + min * 60 * 1000; + } + else if(st.indexOf("S")>0) + { + int sec = Integer.parseInt(st.substring(0,st.indexOf("S"))); + time = time + sec * 1000; + } + + } + } + if (time == 0) + { + time = 30 * 60 * 1000; + } + + return time; + } + +} |