diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-01-22 16:41:23 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-01-22 16:41:23 +0000 |
commit | ec5bff29cb62c537eebca943d5647de3b994dd40 (patch) | |
tree | a0a0f01dc75d6112c4a91717df94afb37ae11b85 /qpid | |
parent | 1ee7712d55033e1ec39929b9c48dd1e399b9e52a (diff) | |
download | qpid-python-ec5bff29cb62c537eebca943d5647de3b994dd40.tar.gz |
performance Ping tests modified for scalability test. Now tests with multiple queues can be performed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@498687 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
5 files changed, 174 insertions, 36 deletions
diff --git a/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh b/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh index 56abe0b1da..c078caf7d1 100755 --- a/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh +++ b/qpid/java/perftests/bin/serviceRequestingClient-createLogFile.sh @@ -20,7 +20,8 @@ ##LOGDIR=$QPID_HOME/logs LOGDIR=../logs -LOGFILE=$LOGDIR/perftest.log +date=`date +"%y%m%d%H%M%S"` +LOGFILE=$LOGDIR/perftest.log.$date ## create the log dir if [ ! -d $LOGDIR ]; then diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java index bedd6e3d16..513e1609aa 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java @@ -1,14 +1,19 @@ package org.apache.qpid.ping;
import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.*;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.framing.AMQShortString;
/**
* This abstract class captures functionality that is common to all ping producers. It provides functionality to
@@ -41,6 +46,12 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene /** Holds the producer session, need to create test messages. */
private Session _producerSession;
+
+ /** holds the no of queues the tests will be using to send messages. By default it will be 1 */
+ private int _queueCount;
+ private static AtomicInteger _queueSequenceID = new AtomicInteger();
+ private List<Queue> _queues = new ArrayList<Queue>();
+
/**
* Convenience method for a short pause.
*
@@ -169,6 +180,37 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene this._producerSession = session;
}
+ public int getQueueCount()
+ {
+ return _queueCount;
+ }
+
+ public void setQueueCount(int queueCount)
+ {
+ this._queueCount = queueCount;
+ }
+
+ /**
+ * Creates queues dynamically and adds to the queues list. This is when the test is being done with
+ * multiple queues.
+ * @param queueCount
+ */
+ protected void createQueues(int queueCount)
+ {
+ for (int i = 0; i < queueCount; i++)
+ {
+ AMQShortString name = new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+ AMQQueue queue = new AMQQueue(name, name, false, false, false);
+
+ _queues.add(queue);
+ }
+ }
+
+ protected Queue getQueue(int index)
+ {
+ return _queues.get(index);
+ }
+
/**
* Convenience method to commit the transaction on the session associated with this pinger.
*
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java index 6bb4c08e6d..3c6c42d92b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java @@ -31,6 +31,8 @@ import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Queue; import java.net.InetAddress; +import java.util.List; +import java.util.ArrayList; /** * This class is used to test sending and receiving messages to (pingQueue) and from a queue (replyQueue). @@ -41,7 +43,7 @@ import java.net.InetAddress; public class TestPingItself extends PingPongProducer { private static final Logger _logger = Logger.getLogger(TestPingItself.class); - + public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName, String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception @@ -49,7 +51,26 @@ public class TestPingItself extends PingPongProducer super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize, verbose); } + public TestPingItself(String brokerDetails, String username, String password, String virtualpath, int queueCount, + String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) + throws Exception + { + super(brokerDetails, username, password, virtualpath, transacted); + setQueueCount(queueCount); + createQueues(queueCount); + + _persistent = persistent; + _messageSize = messageSize; + _verbose = verbose; + + createConsumers(selector); + createProducer(); + } + @Override + /** + * Sets the replyQueue to be the same as ping queue. + */ public void createConsumer(String selector) throws JMSException { // Create a message consumer to get the replies with and register this to be called back by it. @@ -58,8 +79,6 @@ public class TestPingItself extends PingPongProducer consumer.setMessageListener(this); } - - /** * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs * to be started to bounce the pings back again. diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 031a5c5299..0b5f040b90 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -118,7 +118,22 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, protected boolean _verbose = false;
protected Session _consumerSession;
-
+
+ protected PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+ boolean transacted)
+ throws Exception
+ {
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+
+ // Create transactional or non-transactional sessions, based on the command line arguments.
+ setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ }
+
/**
* Creates a ping pong producer with the specified connection details and type.
*
@@ -137,39 +152,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose)
throws Exception
{
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
-
- setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
-
- // Create transactional or non-transactional sessions, based on the command line arguments.
- setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
-
- // Create producer and the consumer
- createProducer(queueName, persistent);
- createConsumer(selector);
+ this(brokerDetails, username, password, virtualpath, transacted);
+ _pingQueue = new AMQQueue(queueName);
_persistent = persistent;
_messageSize = messageSize;
_verbose = verbose;
+
+ // Create producer and the consumer
+ createProducer();
+ createConsumer(selector);
}
/**
- * Creates the queue and producer to send the pings on
- * @param queueName
- * @param persistent
+ * Creates the producer to send the pings on. If the tests are with nultiple queues, then producer
+ * is created with null destination, so that any destination can be specified while sending
* @throws JMSException
*/
- public void createProducer(String queueName, boolean persistent) throws JMSException
+ public void createProducer() throws JMSException
{
- // Create a queue and producer to send the pings on.
- if (_pingQueue == null)
- _pingQueue = new AMQQueue(queueName);
- _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue);
+ if (getQueueCount() > 1)
+ {
+ // create producer with initial destination as null for test with multiple queues
+ // In this case, a different destination will be used while sending the message
+ _producer = (MessageProducer) getProducerSession().createProducer(null);
+ }
+ else
+ {
+ // Create a queue and producer to send the pings on.
+ _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue);
+
+ }
_producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
}
/**
@@ -187,6 +202,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, consumer.setMessageListener(this);
}
+ /**
+ * Creates consumer instances for each queue. This is used when test is being done with multiple queues.
+ * @param selector
+ * @throws JMSException
+ */
+ public void createConsumers(String selector) throws JMSException
+ {
+ for (int i = 0; i < getQueueCount(); i++)
+ {
+ MessageConsumer consumer = getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE, selector);
+ consumer.setMessageListener(this);
+ }
+ }
+
protected Session getConsumerSession()
{
return _consumerSession;
@@ -296,6 +325,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, if (_verbose)
{
_logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
+ //_logger.debug("Received from : " + message.getJMSDestination());
}
// Turn the traffic light to green.
@@ -352,12 +382,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, CountDownLatch trafficLight = new CountDownLatch(numPings);
trafficLights.put(messageCorrelationId, trafficLight);
- for (int i = 0; i < numPings; i++)
+ if (getQueueCount() > 1)
{
- // Re-timestamp the message.
- message.setLongProperty("timestamp", System.currentTimeMillis());
-
- _producer.send(message);
+ // If test is with multiple queues
+ pingMultipleQueues(message, numPings);
+ }
+ else
+ {
+ // If test is with one Queue only
+ for (int i = 0; i < numPings; i++)
+ {
+ // Re-timestamp the message.
+ message.setLongProperty("timestamp", System.currentTimeMillis());
+ _producer.send(message);
+ }
}
// Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
@@ -390,6 +428,30 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, }
/**
+ * When the test is being performed with multiple queues, then this method will be used, which has a loop to
+ * pick up the next queue from the queues list and sends message to it.
+ * @param message
+ * @param numPings
+ * @throws JMSException
+ */
+ private void pingMultipleQueues(Message message, int numPings) throws JMSException
+ {
+ int queueIndex = 0;
+ for (int i = 0; i < numPings; i++)
+ {
+ // Re-timestamp the message.
+ message.setLongProperty("timestamp", System.currentTimeMillis());
+ _producer.send(getQueue(queueIndex++), message);
+
+ // reset the counter to get the first queue
+ if (queueIndex == getQueueCount() -1)
+ {
+ queueIndex = 0;
+ }
+ }
+ }
+
+ /**
* Sends the specified ping message but does not wait for a correlating reply.
*
* @param message The message to send.
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 7cdfd29120..af3accf530 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -46,6 +46,9 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll /** Holds the name of the property to get the ping queue name from. */
private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+ /** holds the queue count, if the test is being performed with multiple queues */
+ private static final String PING_QUEUE_COUNT_PROPNAME = "queues";
+
/** Holds the name of the property to get the test delivery mode from. */
private static final String PERSISTENT_MODE_PROPNAME = "persistent";
@@ -92,6 +95,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
+ setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1));
}
/** Holds the test ping client. */
@@ -130,7 +134,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll // Fail the test if the timeout was exceeded.
if (numReplies != numPings)
{
- Assert.fail("The ping timed out. Messages Sent = " + numReplies + ", MessagesReceived = " + numPings);
+ Assert.fail("The ping timed out. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
}
}
@@ -147,6 +151,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll String username = "guest";
String password = "guest";
String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+ int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
@@ -154,9 +159,18 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll boolean verbose = false;
int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
- // Establish a client to ping a Queue and listen the reply back from same Queue
- _pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, queueName,
- selector, transacted, persistent, messageSize, verbose);
+ if (queueCount > 1)
+ {
+ // test client with multiple queues
+ _pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, queueCount,
+ selector, transacted, persistent, messageSize, verbose);
+ }
+ else
+ {
+ // Establish a client to ping a Queue and listen the reply back from same Queue
+ _pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, queueName,
+ selector, transacted, persistent, messageSize, verbose);
+ }
// Start the client connection
_pingItselfClient.getConnection().start();
|