From ecf56d8f28474ef8609a2b815fd1a8e46458b6b6 Mon Sep 17 00:00:00 2001 From: Bhupendra Bhusman Bhardwaj Date: Mon, 22 Jan 2007 16:41:23 +0000 Subject: performance Ping tests modified for scalability test. Now tests with multiple queues can be performed. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@498687 13f79535-47bb-0310-9956-ffa450edef68 --- .../bin/serviceRequestingClient-createLogFile.sh | 3 +- .../org/apache/qpid/ping/AbstractPingProducer.java | 42 ++++++++ .../java/org/apache/qpid/ping/TestPingItself.java | 25 ++++- .../apache/qpid/requestreply/PingPongProducer.java | 118 ++++++++++++++++----- .../java/org/apache/qpid/ping/PingTestPerf.java | 22 +++- 5 files changed, 174 insertions(+), 36 deletions(-) (limited to 'java') diff --git a/java/perftests/bin/serviceRequestingClient-createLogFile.sh b/java/perftests/bin/serviceRequestingClient-createLogFile.sh index 56abe0b1da..c078caf7d1 100755 --- a/java/perftests/bin/serviceRequestingClient-createLogFile.sh +++ b/java/perftests/bin/serviceRequestingClient-createLogFile.sh @@ -20,7 +20,8 @@ ##LOGDIR=$QPID_HOME/logs LOGDIR=../logs -LOGFILE=$LOGDIR/perftest.log +date=`date +"%y%m%d%H%M%S"` +LOGFILE=$LOGDIR/perftest.log.$date ## create the log dir if [ ! -d $LOGDIR ]; then diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java index bedd6e3d16..513e1609aa 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java @@ -1,14 +1,19 @@ package org.apache.qpid.ping; import java.text.SimpleDateFormat; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.*; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.jms.Session; +import org.apache.qpid.framing.AMQShortString; /** * This abstract class captures functionality that is common to all ping producers. It provides functionality to @@ -41,6 +46,12 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene /** Holds the producer session, need to create test messages. */ private Session _producerSession; + + /** holds the no of queues the tests will be using to send messages. By default it will be 1 */ + private int _queueCount; + private static AtomicInteger _queueSequenceID = new AtomicInteger(); + private List _queues = new ArrayList(); + /** * Convenience method for a short pause. * @@ -169,6 +180,37 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene this._producerSession = session; } + public int getQueueCount() + { + return _queueCount; + } + + public void setQueueCount(int queueCount) + { + this._queueCount = queueCount; + } + + /** + * Creates queues dynamically and adds to the queues list. This is when the test is being done with + * multiple queues. + * @param queueCount + */ + protected void createQueues(int queueCount) + { + for (int i = 0; i < queueCount; i++) + { + AMQShortString name = new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); + AMQQueue queue = new AMQQueue(name, name, false, false, false); + + _queues.add(queue); + } + } + + protected Queue getQueue(int index) + { + return _queues.get(index); + } + /** * Convenience method to commit the transaction on the session associated with this pinger. * diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java index 6bb4c08e6d..3c6c42d92b 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java @@ -31,6 +31,8 @@ import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Queue; import java.net.InetAddress; +import java.util.List; +import java.util.ArrayList; /** * This class is used to test sending and receiving messages to (pingQueue) and from a queue (replyQueue). @@ -41,7 +43,7 @@ import java.net.InetAddress; public class TestPingItself extends PingPongProducer { private static final Logger _logger = Logger.getLogger(TestPingItself.class); - + public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName, String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception @@ -49,7 +51,26 @@ public class TestPingItself extends PingPongProducer super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize, verbose); } + public TestPingItself(String brokerDetails, String username, String password, String virtualpath, int queueCount, + String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) + throws Exception + { + super(brokerDetails, username, password, virtualpath, transacted); + setQueueCount(queueCount); + createQueues(queueCount); + + _persistent = persistent; + _messageSize = messageSize; + _verbose = verbose; + + createConsumers(selector); + createProducer(); + } + @Override + /** + * Sets the replyQueue to be the same as ping queue. + */ public void createConsumer(String selector) throws JMSException { // Create a message consumer to get the replies with and register this to be called back by it. @@ -58,8 +79,6 @@ public class TestPingItself extends PingPongProducer consumer.setMessageListener(this); } - - /** * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs * to be started to bounce the pings back again. diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 031a5c5299..0b5f040b90 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -118,7 +118,22 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, protected boolean _verbose = false; protected Session _consumerSession; - + + protected PingPongProducer(String brokerDetails, String username, String password, String virtualpath, + boolean transacted) + throws Exception + { + // Create a connection to the broker. + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); + + setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); + + // Create transactional or non-transactional sessions, based on the command line arguments. + setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE)); + _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + } + /** * Creates a ping pong producer with the specified connection details and type. * @@ -137,39 +152,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception { - // Create a connection to the broker. - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - - setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); - - // Create transactional or non-transactional sessions, based on the command line arguments. - setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE)); - _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - - // Create producer and the consumer - createProducer(queueName, persistent); - createConsumer(selector); + this(brokerDetails, username, password, virtualpath, transacted); + _pingQueue = new AMQQueue(queueName); _persistent = persistent; _messageSize = messageSize; _verbose = verbose; + + // Create producer and the consumer + createProducer(); + createConsumer(selector); } /** - * Creates the queue and producer to send the pings on - * @param queueName - * @param persistent + * Creates the producer to send the pings on. If the tests are with nultiple queues, then producer + * is created with null destination, so that any destination can be specified while sending * @throws JMSException */ - public void createProducer(String queueName, boolean persistent) throws JMSException + public void createProducer() throws JMSException { - // Create a queue and producer to send the pings on. - if (_pingQueue == null) - _pingQueue = new AMQQueue(queueName); - _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue); + if (getQueueCount() > 1) + { + // create producer with initial destination as null for test with multiple queues + // In this case, a different destination will be used while sending the message + _producer = (MessageProducer) getProducerSession().createProducer(null); + } + else + { + // Create a queue and producer to send the pings on. + _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue); + + } _producer.setDisableMessageTimestamp(true); - _producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); } /** @@ -187,6 +202,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, consumer.setMessageListener(this); } + /** + * Creates consumer instances for each queue. This is used when test is being done with multiple queues. + * @param selector + * @throws JMSException + */ + public void createConsumers(String selector) throws JMSException + { + for (int i = 0; i < getQueueCount(); i++) + { + MessageConsumer consumer = getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE, selector); + consumer.setMessageListener(this); + } + } + protected Session getConsumerSession() { return _consumerSession; @@ -296,6 +325,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, if (_verbose) { _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID); + //_logger.debug("Received from : " + message.getJMSDestination()); } // Turn the traffic light to green. @@ -352,12 +382,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, CountDownLatch trafficLight = new CountDownLatch(numPings); trafficLights.put(messageCorrelationId, trafficLight); - for (int i = 0; i < numPings; i++) + if (getQueueCount() > 1) { - // Re-timestamp the message. - message.setLongProperty("timestamp", System.currentTimeMillis()); - - _producer.send(message); + // If test is with multiple queues + pingMultipleQueues(message, numPings); + } + else + { + // If test is with one Queue only + for (int i = 0; i < numPings; i++) + { + // Re-timestamp the message. + message.setLongProperty("timestamp", System.currentTimeMillis()); + _producer.send(message); + } } // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of @@ -389,6 +427,30 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, return numReplies; } + /** + * When the test is being performed with multiple queues, then this method will be used, which has a loop to + * pick up the next queue from the queues list and sends message to it. + * @param message + * @param numPings + * @throws JMSException + */ + private void pingMultipleQueues(Message message, int numPings) throws JMSException + { + int queueIndex = 0; + for (int i = 0; i < numPings; i++) + { + // Re-timestamp the message. + message.setLongProperty("timestamp", System.currentTimeMillis()); + _producer.send(getQueue(queueIndex++), message); + + // reset the counter to get the first queue + if (queueIndex == getQueueCount() -1) + { + queueIndex = 0; + } + } + } + /** * Sends the specified ping message but does not wait for a correlating reply. * diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 7cdfd29120..af3accf530 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -46,6 +46,9 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll /** Holds the name of the property to get the ping queue name from. */ private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue"; + /** holds the queue count, if the test is being performed with multiple queues */ + private static final String PING_QUEUE_COUNT_PROPNAME = "queues"; + /** Holds the name of the property to get the test delivery mode from. */ private static final String PERSISTENT_MODE_PROPNAME = "persistent"; @@ -92,6 +95,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT)); + setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1)); } /** Holds the test ping client. */ @@ -130,7 +134,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll // Fail the test if the timeout was exceeded. if (numReplies != numPings) { - Assert.fail("The ping timed out. Messages Sent = " + numReplies + ", MessagesReceived = " + numPings); + Assert.fail("The ping timed out. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); } } @@ -147,6 +151,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll String username = "guest"; String password = "guest"; String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); + int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME)); String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME); boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); @@ -154,9 +159,18 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll boolean verbose = false; int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); - // Establish a client to ping a Queue and listen the reply back from same Queue - _pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, queueName, - selector, transacted, persistent, messageSize, verbose); + if (queueCount > 1) + { + // test client with multiple queues + _pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, queueCount, + selector, transacted, persistent, messageSize, verbose); + } + else + { + // Establish a client to ping a Queue and listen the reply back from same Queue + _pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, queueName, + selector, transacted, persistent, messageSize, verbose); + } // Start the client connection _pingItselfClient.getConnection().start(); -- cgit v1.2.1