summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-23 09:39:56 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-23 09:39:56 +0000
commit12c8ec5bc9c89993e87e11b626e9d9f88bdd5cdc (patch)
treeb27f262399914f32d65fa2f16d0eed249420091b
parent601fa8a026e7488bbb67e3f809f8255e2e6aeedd (diff)
downloadqpid-python-12c8ec5bc9c89993e87e11b626e9d9f88bdd5cdc.tar.gz
Added ability to cause failover before/after commit/sends
Added batch size ability. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@498965 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/distribution/pom.xml16
-rw-r--r--qpid/java/perftests/distribution/src/main/assembly/performance.xml7
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java47
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java183
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java53
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java64
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java93
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java208
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java158
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java117
10 files changed, 726 insertions, 220 deletions
diff --git a/qpid/java/perftests/distribution/pom.xml b/qpid/java/perftests/distribution/pom.xml
index 4f637715d1..010f19c9f0 100644
--- a/qpid/java/perftests/distribution/pom.xml
+++ b/qpid/java/perftests/distribution/pom.xml
@@ -44,8 +44,22 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-perftests</artifactId>
+ <type>jar</type>
+ <version>${pom.version}</version>
</dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-perftests</artifactId>
+ <type>test-jar</type>
+ <version>${pom.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>uk.co.thebadgerset</groupId>
+ <artifactId>junit-toolkit</artifactId>
+ <version>0.4</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
<build>
<pluginManagement>
diff --git a/qpid/java/perftests/distribution/src/main/assembly/performance.xml b/qpid/java/perftests/distribution/src/main/assembly/performance.xml
index 0bf7efa21e..a564261a24 100644
--- a/qpid/java/perftests/distribution/src/main/assembly/performance.xml
+++ b/qpid/java/perftests/distribution/src/main/assembly/performance.xml
@@ -74,6 +74,13 @@
<include>**/*.log4j</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>../src/test</directory>
+ <outputDirectory>qpid-${qpid.version}/src</outputDirectory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </fileSet>
<!-- Metadata Jar -->
<fileSet>
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
index 7c82710a3f..c04a8a7d96 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
@@ -1,6 +1,7 @@
package org.apache.qpid.ping;
import java.text.SimpleDateFormat;
+import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -13,7 +14,7 @@ import org.apache.qpid.jms.Session;
/**
* Provides functionality common to all ping clients. Provides the ability to manage a session and a convenience method
* to commit on the current transaction.
- *
+ * <p/>
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Commit the current transcation.
@@ -29,6 +30,9 @@ public abstract class AbstractPingClient
private static final Logger _logger = Logger.getLogger(TestPingClient.class);
private AMQConnection _connection;
+ protected boolean _failBeforeCommit = false;
+ protected boolean _failAfterCommit = false;
+
public AMQConnection getConnection()
{
return _connection;
@@ -50,7 +54,20 @@ public abstract class AbstractPingClient
{
try
{
+ if (_failBeforeCommit)
+ {
+ _logger.trace("Failing Before Commit");
+ doFailover();
+ }
+
session.commit();
+
+ if (_failAfterCommit)
+ {
+ _logger.trace("Failing After Commit");
+ doFailover();
+ }
+
_logger.trace("Session Commited.");
}
catch (JMSException e)
@@ -72,4 +89,32 @@ public abstract class AbstractPingClient
}
}
}
+
+ protected void doFailover(String broker)
+ {
+ System.out.println("Kill Broker " + broker + " now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ }
+ System.out.println("Continuing.");
+ }
+
+ protected void doFailover()
+ {
+ System.out.println("Kill Broker now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ }
+ System.out.println("Continuing.");
+
+ }
+
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
index 1877a23056..4cca77a70e 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
@@ -2,17 +2,23 @@ package org.apache.qpid.ping;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.jms.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.framing.AMQShortString;
@@ -20,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString;
* This abstract class captures functionality that is common to all ping producers. It provides functionality to
* manage a session, and a convenience method to commit a transaction on the session. It also provides a framework
* for running a ping loop, and terminating that loop on exceptions or a shutdown handler.
- *
+ * <p/>
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Manage the connection.
@@ -35,25 +41,49 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
{
private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
- /** Used to format time stamping output. */
+ /**
+ * Used to format time stamping output.
+ */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
- /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ /**
+ * Used to tell the ping loop when to terminate, it only runs while this is true.
+ */
protected boolean _publish = true;
- /** Holds the connection handle to the broker. */
+ /**
+ * Holds the connection handle to the broker.
+ */
private Connection _connection;
- /** Holds the producer session, need to create test messages. */
+ /**
+ * Holds the producer session, need to create test messages.
+ */
private Session _producerSession;
- /** holds the no of queues the tests will be using to send messages. By default it will be 1 */
- private int _queueCount;
+ /**
+ * holds the no of queues the tests will be using to send messages. By default it will be 1
+ */
+ protected int _queueCount;
private static AtomicInteger _queueSequenceID = new AtomicInteger();
private List<Queue> _queues = new ArrayList<Queue>();
/**
+ * Holds the message producer to send the pings through.
+ */
+ protected org.apache.qpid.jms.MessageProducer _producer;
+
+ protected boolean _failBeforeCommit = false;
+ protected boolean _failAfterCommit = false;
+ protected boolean _failBeforeSend = false;
+ protected boolean _failAfterSend = false;
+
+ protected int _sentMessages = 0;
+ protected int _batchSize = 1;
+
+
+ /**
* Convenience method for a short pause.
*
* @param sleepTime The time in milliseconds to pause for.
@@ -67,7 +97,8 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- { }
+ {
+ }
}
}
@@ -78,9 +109,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
*
* @param replyQueue The reply-to destination for the message.
* @param messageSize The desired size of the message in bytes.
- *
* @return A freshly generated test message.
- *
* @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
*/
public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException
@@ -153,12 +182,12 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
public Thread getShutdownHook()
{
return new Thread(new Runnable()
+ {
+ public void run()
{
- public void run()
- {
- stop();
- }
- });
+ stop();
+ }
+ });
}
public Connection getConnection()
@@ -181,6 +210,12 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
this._producerSession = session;
}
+
+ protected void commitTx() throws JMSException
+ {
+ commitTx(getProducerSession());
+ }
+
public int getQueueCount()
{
return _queueCount;
@@ -194,6 +229,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
/**
* Creates queues dynamically and adds to the queues list. This is when the test is being done with
* multiple queues.
+ *
* @param queueCount
*/
protected void createQueues(int queueCount)
@@ -202,7 +238,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
{
AMQShortString name = new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
AMQQueue queue = new AMQQueue(name, name, false, false, false);
-
+
_queues.add(queue);
}
}
@@ -219,36 +255,107 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
*/
protected void commitTx(Session session) throws JMSException
{
- if (session.getTransacted())
+ if ((++_sentMessages % _batchSize) == 0)
{
- try
+ if (session.getTransacted())
{
- session.commit();
- _logger.trace("Session Commited.");
- }
- catch (JMSException e)
- {
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
-
- // Warn that the bounce back client is not available.
- if (e.getLinkedException() instanceof AMQNoConsumersException)
- {
- _logger.debug("No consumers on queue.");
- }
-
try
{
- session.rollback();
- _logger.trace("Message rolled back.");
+ if (_failBeforeCommit)
+ {
+ _logger.trace("Failing Before Commit");
+ doFailover();
+ }
+
+ session.commit();
+
+ if (_failAfterCommit)
+ {
+ _logger.trace("Failing After Commit");
+ doFailover();
+ }
+ _logger.trace("Session Commited.");
}
- catch (JMSException jmse)
+ catch (JMSException e)
{
- _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
-
- // Both commit and rollback failed. Throw the rollback exception.
- throw jmse;
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ // Warn that the bounce back client is not available.
+ if (e.getLinkedException() instanceof AMQNoConsumersException)
+ {
+ _logger.debug("No consumers on queue.");
+ }
+
+ try
+ {
+ session.rollback();
+ _logger.trace("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
}
}
}
}
+
+ protected void sendMessage(Message message) throws JMSException
+ {
+ sendMessage(null, message);
+ }
+
+ protected void sendMessage(Queue q, Message message) throws JMSException
+ {
+ if (_failBeforeSend)
+ {
+ _logger.trace("Failing Before Send");
+ doFailover();
+ }
+
+ if (q == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(q, message);
+ }
+
+ if (_failAfterSend)
+ {
+ _logger.trace("Failing After Send");
+ doFailover();
+ }
+ }
+
+ protected void doFailover(String broker)
+ {
+ System.out.println("Kill Broker " + broker + " now then press return");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ }
+ System.out.println("Continuing.");
+ }
+
+ protected void doFailover()
+ {
+ System.out.println("Kill Broker now then press return");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ }
+ System.out.println("Continuing.");
+
+ }
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
index 43a010d8ef..949ace20e1 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
@@ -34,11 +34,11 @@ import org.apache.qpid.jms.Session;
* PingClient is a message listener that received time stamped ping messages. It can work out how long a ping took,
* provided that its clokc is synchronized to that of the ping producer, or by running it on the same machine (or jvm)
* as the ping producer.
- *
+ * <p/>
* <p/>There is a verbose mode flag which causes information about each ping to be output to the console
* (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
* be disabled for real timing tests as writing to the console will slow things down.
- *
+ * <p/>
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide command line invocation to start the ping consumer on a configurable broker url.
@@ -50,10 +50,14 @@ class TestPingClient extends AbstractPingClient implements MessageListener
{
private static final Logger _logger = Logger.getLogger(TestPingClient.class);
- /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+ /**
+ * Used to indicate that the reply generator should log timing info to the console (logger info level).
+ */
private boolean _verbose = false;
- /** The producer session. */
+ /**
+ * The producer session.
+ */
private Session _consumerSession;
/**
@@ -67,11 +71,11 @@ class TestPingClient extends AbstractPingClient implements MessageListener
* @param transacted
* @param selector
* @param verbose
- *
- * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
+ * @param afterCommit
+ *@param beforeCommit @throws Exception All underlying exceptions allowed to fall through. This is only test code...
*/
public TestPingClient(String brokerDetails, String username, String password, String queueName, String virtualpath,
- boolean transacted, String selector, boolean verbose) throws Exception
+ boolean transacted, String selector, boolean verbose, boolean afterCommit, boolean beforeCommit) throws Exception
{
// Create a connection to the broker.
InetAddress address = InetAddress.getLocalHost();
@@ -85,10 +89,15 @@ class TestPingClient extends AbstractPingClient implements MessageListener
// Connect a consumer to the ping queue and register this to be called back by it.
Queue q = new AMQQueue(queueName);
MessageConsumer consumer = _consumerSession.createConsumer(q, 1, false, false, selector);
+
consumer.setMessageListener(this);
// Hang on to the verbose flag setting.
_verbose = verbose;
+
+ // Set failover interrupts
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
}
/**
@@ -104,7 +113,7 @@ class TestPingClient extends AbstractPingClient implements MessageListener
if (args.length < 4)
{
System.out.println(
- "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector]");
+ "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector] [failover:<before|after>:commit]");
System.exit(1);
}
@@ -118,14 +127,38 @@ class TestPingClient extends AbstractPingClient implements MessageListener
boolean transacted = (args.length >= 7) ? Boolean.parseBoolean(args[6]) : false;
String selector = (args.length == 8) ? args[7] : null;
+ boolean afterCommit = false;
+ boolean beforeCommit = false;
+
+ for (String arg : args)
+ {
+ if (arg.startsWith("failover:"))
+ {
+ //failover:<before|after>:<send:commit>
+ String[] parts = arg.split(":");
+ if (parts.length == 3)
+ {
+ if (parts[2].equals("commit"))
+ {
+ afterCommit = parts[1].equals("after");
+ beforeCommit = parts[1].equals("before");
+ }
+ }
+ else
+ {
+ System.out.println("Unrecognized failover request:" + arg);
+ }
+ }
+ }
+
// Create the test ping client and set it running.
TestPingClient pingClient =
- new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose);
+ new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose, afterCommit, beforeCommit);
+
pingClient.getConnection().start();
System.out.println("Waiting...");
}
-
/**
* This is a callback method that is notified of all messages for which this has been registered as a message
* listener on a message consumer.
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
index 3c6c42d92b..579816870f 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
@@ -45,18 +45,24 @@ public class TestPingItself extends PingPongProducer
private static final Logger _logger = Logger.getLogger(TestPingItself.class);
public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose)
+ String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
+ boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend,
+ int batchSize)
throws Exception
{
- super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize, verbose);
+ super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize,
+ verbose, afterCommit, beforeCommit, afterSend, beforeSend, batchSize, 0);
}
- public TestPingItself(String brokerDetails, String username, String password, String virtualpath, int queueCount,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose)
+ public TestPingItself(String brokerDetails, String username, String password, String virtualpath,
+ String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
+ boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend,
+ int batchSize, int queueCount)
throws Exception
{
- super(brokerDetails, username, password, virtualpath, transacted);
- setQueueCount(queueCount);
+ super(brokerDetails, username, password, virtualpath, null, null, transacted, persistent, messageSize,
+ verbose, afterCommit, beforeCommit, afterSend, beforeSend, batchSize, queueCount);
+
createQueues(queueCount);
_persistent = persistent;
@@ -82,7 +88,7 @@ public class TestPingItself extends PingPongProducer
/**
* Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
* to be started to bounce the pings back again.
- *
+ * <p/>
* <p/>The command line takes from 2 to 4 arguments:
* <p/><table>
* <tr><td>brokerDetails <td> The broker connection string.
@@ -99,7 +105,7 @@ public class TestPingItself extends PingPongProducer
if (args.length < 2)
{
System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
- "[transacted (true/false)] [persistent (true/false)] [message size in bytes]");
+ "[transacted (true/false)] [persistent (true/false)] [message size in bytes]");
System.exit(0);
}
@@ -109,14 +115,50 @@ public class TestPingItself extends PingPongProducer
boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
+ int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
- String queue = "ping_"+ System.currentTimeMillis();
- _logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:"+ persistent +
+ String queue = "ping_" + System.currentTimeMillis();
+ _logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:" + persistent +
",MessageSize:" + messageSize + " bytes");
+
+ boolean afterCommit = false;
+ boolean beforeCommit = false;
+ boolean afterSend = false;
+ boolean beforeSend = false;
+
+ for (String arg : args)
+ {
+ if (arg.startsWith("failover:"))
+ {
+ //failover:<before|after>:<send:commit>
+ String[] parts = arg.split(":");
+ if (parts.length == 3)
+ {
+ if (parts[2].equals("commit"))
+ {
+ afterCommit = parts[1].equals("after");
+ beforeCommit = parts[1].equals("before");
+ }
+
+ if (parts[2].equals("send"))
+ {
+ afterSend = parts[1].equals("after");
+ beforeSend = parts[1].equals("before");
+ }
+ }
+ else
+ {
+ System.out.println("Unrecognized failover request:" + arg);
+ }
+ }
+ }
+
// Create a ping producer to handle the request/wait/reply cycle.
TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null,
- transacted, persistent, messageSize, verbose);
+ transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend,
+ batchSize);
pingItself.getConnection().start();
// Run a few priming pings to remove warm up time from test results.
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
index 37f6f7518e..e53d7bb521 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
@@ -37,12 +37,12 @@ import org.apache.qpid.jms.Session;
* PingProducer is a client that sends timestamped pings to a queue. It is designed to be run from the command line
* as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session and
* configured message producer.
- *
+ * <p/>
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping cycle is not forced to do so
* by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
* also registered to terminate the ping loop cleanly.
- *
+ * <p/>
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a ping cycle.
@@ -53,31 +53,42 @@ class TestPingProducer extends AbstractPingProducer
{
private static final Logger _logger = Logger.getLogger(TestPingProducer.class);
- /** Used to set up a default message size. */
+ /**
+ * Used to set up a default message size.
+ */
private static final int DEFAULT_MESSAGE_SIZE = 0;
- /** Used to define how long to wait between pings. */
+ /**
+ * Used to define how long to wait between pings.
+ */
private static final long SLEEP_TIME = 250;
- /** Holds the name of the queue to send pings on. */
+ /**
+ * Holds the name of the queue to send pings on.
+ */
private static final String PING_QUEUE_NAME = "ping";
private static TestPingProducer _pingProducer;
- /** Holds the message producer to send the pings through. */
- private MessageProducer _producer;
-
- /** Determines whether this producer sends persistent messages from the run method. */
+ /**
+ * Determines whether this producer sends persistent messages from the run method.
+ */
private boolean _persistent = false;
- /** Holds the message size to send, from the run method. */
+ /**
+ * Holds the message size to send, from the run method.
+ */
private int _messageSize = DEFAULT_MESSAGE_SIZE;
- /** Used to indicate that the ping loop should print out whenever it pings. */
+ /**
+ * Used to indicate that the ping loop should print out whenever it pings.
+ */
private boolean _verbose = false;
+
public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
- boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception
+ boolean transacted, boolean persistent, int messageSize, boolean verbose, boolean afterCommit,
+ boolean beforeCommit, boolean afterSend, boolean beforeSend,int batchSize) throws Exception
{
// Create a connection to the broker.
InetAddress address = InetAddress.getLocalHost();
@@ -96,6 +107,14 @@ class TestPingProducer extends AbstractPingProducer
_messageSize = messageSize;
_verbose = verbose;
+
+ // Set failover interrupts
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _sentMessages = 0;
+ _batchSize = batchSize;
}
/**
@@ -110,7 +129,8 @@ class TestPingProducer extends AbstractPingProducer
if (args.length < 2)
{
System.err.println(
- "Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose] [transacted] [persistent] [message size in bytes]");
+ "Usage: TestPingPublisher <brokerDetails> <virtual path> "+
+ "[<verbose(true|false)> <transacted(true|false))> <persistent(true|false)> <message size in bytes> <batchsize>");
System.exit(0);
}
@@ -120,10 +140,46 @@ class TestPingProducer extends AbstractPingProducer
boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
+ int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
+
+
+ boolean afterCommit = false;
+ boolean beforeCommit = false;
+ boolean afterSend = false;
+ boolean beforeSend = false;
+
+ for (String arg : args)
+ {
+ if (arg.startsWith("failover:"))
+ {
+ //failover:<before|after>:<send:commit>
+ String[] parts = arg.split(":");
+ if (parts.length == 3)
+ {
+ if (parts[2].equals("commit"))
+ {
+ afterCommit = parts[1].equals("after");
+ beforeCommit = parts[1].equals("before");
+ }
+
+ if (parts[2].equals("send"))
+ {
+ afterSend = parts[1].equals("after");
+ beforeSend = parts[1].equals("before");
+ }
+ }
+ else
+ {
+ System.out.println("Unrecognized failover request:" + arg);
+ }
+ }
+ }
// Create a ping producer to generate the pings.
- _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted,
- persistent, messageSize, verbose);
+ _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME,
+ transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend,
+ batchSize);
// Start the connection running.
_pingProducer.getConnection().start();
@@ -144,19 +200,18 @@ class TestPingProducer extends AbstractPingProducer
* Sends the specified ping message.
*
* @param message The message to send.
- *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public void ping(Message message) throws JMSException
{
- _producer.send(message);
+ sendMessage(message);
// Keep the messageId to correlate with the reply.
String messageId = message.getJMSMessageID();
// Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
- // this method, as the message will not be sent until the transaction is committed.
- commitTx(getProducerSession());
+ // this method, as the message will not be sent until the transaction is committed.
+ commitTx();
}
/**
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 0b5f040b90..3c3e31dd55 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -44,17 +44,17 @@ import org.apache.qpid.ping.AbstractPingProducer;
* client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
* as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
* message producer and message consumer to run the ping-pong cycle on.
- *
+ * <p/>
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
* This means that this class has to do some work to correlate pings with pongs; it expectes the original message
* id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
* this correlation would not need to be done.
- *
+ * <p/>
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
* by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
* also registered to terminate the ping-pong loop cleanly.
- *
+ * <p/>
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a ping and wait for response cycle.
@@ -62,65 +62,91 @@ import org.apache.qpid.ping.AbstractPingProducer;
* </table>
*
* @todo Make temp queue per ping a command line option.
- *
* @todo Make the queue name a command line option.
*/
public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
{
private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
- /** Used to set up a default message size. */
+ /**
+ * Used to set up a default message size.
+ */
protected static final int DEFAULT_MESSAGE_SIZE = 0;
- /** Used to define how long to wait between pings. */
+ /**
+ * Used to define how long to wait between pings.
+ */
protected static final long SLEEP_TIME = 250;
- /** Used to define how long to wait before assuming that a ping has timed out. */
+ /**
+ * Used to define how long to wait before assuming that a ping has timed out.
+ */
protected static final long TIMEOUT = 9000;
- /** Holds the name of the queue to send pings on. */
+ /**
+ * Holds the name of the queue to send pings on.
+ */
protected static final String PING_QUEUE_NAME = "ping";
- /** The batch size. */
+ /**
+ * The batch size.
+ */
protected static final int BATCH_SIZE = 100;
- /** Keeps track of the ping producer instance used in the run loop. */
+ /**
+ * Keeps track of the ping producer instance used in the run loop.
+ */
private static PingPongProducer _pingProducer;
protected static final int PREFETCH = 100;
protected static final boolean NO_LOCAL = true;
protected static final boolean EXCLUSIVE = false;
- /** The number of priming loops to run. */
+ /**
+ * The number of priming loops to run.
+ */
protected static final int PRIMING_LOOPS = 3;
- /** A source for providing sequential unique correlation ids. */
+ /**
+ * A source for providing sequential unique correlation ids.
+ */
private AtomicLong idGenerator = new AtomicLong(0L);
- /** Holds the message producer to send the pings through. */
- private MessageProducer _producer;
-
- /** Holds the queue to send the ping replies to. */
+ /**
+ * Holds the queue to send the ping replies to.
+ */
private Queue _replyQueue;
- /** Hold the known Queue where the producer will be sending message to*/
+ /**
+ * Hold the known Queue where the producer will be sending message to
+ */
private Queue _pingQueue;
- /** Determines whether this producer sends persistent messages from the run method. */
+ /**
+ * Determines whether this producer sends persistent messages from the run method.
+ */
protected boolean _persistent;
- /** Holds the message size to send, from the run method. */
+ /**
+ * Holds the message size to send, from the run method.
+ */
protected int _messageSize;
- /** Holds a map from message ids to latches on which threads wait for replies. */
+ /**
+ * Holds a map from message ids to latches on which threads wait for replies.
+ */
private Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
- /** Used to indicate that the ping loop should print out whenever it pings. */
+ /**
+ * Used to indicate that the ping loop should print out whenever it pings.
+ */
protected boolean _verbose = false;
protected Session _consumerSession;
- protected PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
- boolean transacted)
+ private PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+ boolean transacted, boolean persistent, int messageSize, boolean verbose,
+ boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend,
+ int batchSize)
throws Exception
{
// Create a connection to the broker.
@@ -132,6 +158,18 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Create transactional or non-transactional sessions, based on the command line arguments.
setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
_consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+
+ // Set failover interrupts
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _batchSize = batchSize;
+ _sentMessages = 0;
}
/**
@@ -142,31 +180,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
* @param password
* @param virtualpath
* @param transacted
- * @param persistent
- * @param messageSize
- * @param verbose
- *
* @throws Exception All allowed to fall through. This is only test code...
*/
public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose)
- throws Exception
+ String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
+ boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend,
+ int batchSize, int queueCount)
+ throws Exception
{
- this(brokerDetails, username, password, virtualpath, transacted);
+ this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, batchSize);
- _pingQueue = new AMQQueue(queueName);
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
-
- // Create producer and the consumer
- createProducer();
- createConsumer(selector);
+ if (queueName != null)
+ {
+ _pingQueue = new AMQQueue(queueName);
+ // Create producer and the consumer
+ createProducer();
+ createConsumer(selector);
+ }
+ else if (queueCount > 0)
+ {
+ _queueCount = queueCount;
+ }
+ else
+ {
+ _logger.error("Queue Count is zero and no queueName specified. One must be set.");
+ throw new IllegalArgumentException("Queue Count is zero and no queueName specified. One must be set.");
+ }
}
/**
* Creates the producer to send the pings on. If the tests are with nultiple queues, then producer
* is created with null destination, so that any destination can be specified while sending
+ *
* @throws JMSException
*/
public void createProducer() throws JMSException
@@ -189,6 +235,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* Creates the temporary queue to listen to the responses
+ *
* @param selector
* @throws JMSException
*/
@@ -204,6 +251,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* Creates consumer instances for each queue. This is used when test is being done with multiple queues.
+ *
* @param selector
* @throws JMSException
*/
@@ -234,7 +282,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
* to be started to bounce the pings back again.
- *
+ * <p/>
* <p/>The command line takes from 2 to 4 arguments:
* <p/><table>
* <tr><td>brokerDetails <td> The broker connection string.
@@ -251,7 +299,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
if (args.length < 2)
{
System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
- "[transacted (true/false)] [persistent (true/false)] [message size in bytes]");
+ "[transacted (true/false)] [persistent (true/false)] [message size in bytes]");
System.exit(0);
}
@@ -261,10 +309,46 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
+ int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
+
+ boolean afterCommit = false;
+ boolean beforeCommit = false;
+ boolean afterSend = false;
+ boolean beforeSend = false;
+
+ for (String arg : args)
+ {
+ if (arg.startsWith("failover:"))
+ {
+ //failover:<before|after>:<send:commit>
+ String[] parts = arg.split(":");
+ if (parts.length == 3)
+ {
+ if (parts[2].equals("commit"))
+ {
+ afterCommit = parts[1].equals("after");
+ beforeCommit = parts[1].equals("before");
+ }
+
+ if (parts[2].equals("send"))
+ {
+ afterSend = parts[1].equals("after");
+ beforeSend = parts[1].equals("before");
+ }
+ }
+ else
+ {
+ System.out.println("Unrecognized failover request:" + arg);
+ }
+ }
+ }
// Create a ping producer to handle the request/wait/reply cycle.
_pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
- persistent, messageSize, verbose);
+ persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend,
+ batchSize, 0);
+
_pingProducer.getConnection().start();
// Run a few priming pings to remove warm up time from test results.
@@ -287,7 +371,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
* this a few times, in order to prime the JVMs JIT compilation.
*
* @param x The number of priming loops to run.
- *
* @throws JMSException All underlying exceptions are allowed to fall through.
*/
public void prime(int x) throws JMSException
@@ -296,15 +379,18 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
{
// Create and send a small message.
Message first = getTestMessage(_replyQueue, 0, false);
- _producer.send(first);
- commitTx(getProducerSession());
+
+ sendMessage(first);
+
+ commitTx();
try
{
Thread.sleep(100);
}
catch (InterruptedException ignore)
- { }
+ {
+ }
}
}
@@ -365,10 +451,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
* @param message The message to send.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
- *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
- *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
@@ -394,13 +478,13 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
{
// Re-timestamp the message.
message.setLongProperty("timestamp", System.currentTimeMillis());
- _producer.send(message);
+ sendMessage(message);
}
}
// Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
// this method, as the message will not be sent until the transaction is committed.
- commitTx(getProducerSession());
+ commitTx();
// Keep the messageId to correlate with the reply.
//String messageId = message.getJMSMessageID();
@@ -429,7 +513,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
/**
* When the test is being performed with multiple queues, then this method will be used, which has a loop to
- * pick up the next queue from the queues list and sends message to it.
+ * pick up the next queue from the queues list and sends message to it.
+ *
* @param message
* @param numPings
* @throws JMSException
@@ -441,31 +526,30 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
{
// Re-timestamp the message.
message.setLongProperty("timestamp", System.currentTimeMillis());
- _producer.send(getQueue(queueIndex++), message);
+
+ sendMessage(getQueue(queueIndex++), message);
// reset the counter to get the first queue
- if (queueIndex == getQueueCount() -1)
+ if (queueIndex == getQueueCount() - 1)
{
queueIndex = 0;
}
}
}
-
+
/**
* Sends the specified ping message but does not wait for a correlating reply.
*
* @param message The message to send.
* @param numPings The number of pings to send.
- *
* @return The reply, or null if no reply arrives before the timeout.
- *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException
{
for (int i = 0; i < numPings; i++)
{
- _producer.send(message);
+ sendMessage(message);
if (_verbose)
{
@@ -474,7 +558,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
// Commit the transaction if running in transactional mode, to force the send now.
- commitTx(getProducerSession());
+ commitTx();
}
/**
@@ -524,19 +608,21 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
public static class FailoverNotifier implements ConnectionListener
{
public void bytesSent(long count)
- { }
+ {
+ }
public void bytesReceived(long count)
- { }
+ {
+ }
public boolean preFailover(boolean redirect)
{
- return true;
+ return true; //Allow failover
}
public boolean preResubscribe()
{
- return true;
+ return true; // Allow resubscription
}
public void failoverComplete()
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index ef34b92265..e416d31031 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -13,23 +13,22 @@ import org.apache.log4j.Logger;
import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
/**
- *
* PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
* simultaneously to simluate many clients/producers/connections.
- *
+ * <p/>
* <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
* full round trip ping. This test may be scaled up using a suitable JUnit test runner.
- *
+ * <p/>
* <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
* temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
* except if the connection is lost in which case an attempt to re-establish the setup is made.
- *
+ * <p/>
* <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
* is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
* temporary queue.
- *
+ * <p/>
* <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- *
+ * <p/>
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
@@ -40,65 +39,95 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
{
private static Logger _logger = Logger.getLogger(PingTestPerf.class);
- /** Holds the name of the property to get the test message size from. */
+ /**
+ * Holds the name of the property to get the test message size from.
+ */
private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
- /** Holds the name of the property to get the ping queue name from. */
+ /**
+ * Holds the name of the property to get the ping queue name from.
+ */
private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
- /** holds the queue count, if the test is being performed with multiple queues */
+ /**
+ * holds the queue count, if the test is being performed with multiple queues
+ */
private static final String PING_QUEUE_COUNT_PROPNAME = "queues";
- /** Holds the name of the property to get the test delivery mode from. */
+ /**
+ * Holds the name of the property to get the test delivery mode from.
+ */
private static final String PERSISTENT_MODE_PROPNAME = "persistent";
- /** Holds the name of the property to get the test transactional mode from. */
+ /**
+ * Holds the name of the property to get the test transactional mode from.
+ */
private static final String TRANSACTED_PROPNAME = "transacted";
- /** Holds the name of the property to get the test broker url from. */
+ /**
+ * Holds the name of the property to get the test broker url from.
+ */
private static final String BROKER_PROPNAME = "broker";
- /** Holds the name of the property to get the test broker virtual path. */
+ /**
+ * Holds the name of the property to get the test broker virtual path.
+ */
private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
- /** Holds the waiting timeout for response messages */
+ /**
+ * Holds the waiting timeout for response messages
+ */
private static final String TIMEOUT_PROPNAME = "timeout";
- /** Holds the size of message body to attach to the ping messages. */
+ /**
+ * Holds the size of message body to attach to the ping messages.
+ */
private static final int MESSAGE_SIZE_DEFAULT = 0;
- /** Holds the name of the queue to which pings are sent. */
+ private static final int BATCH_SIZE_DEFAULT = 2;
+
+ /**
+ * Holds the name of the queue to which pings are sent.
+ */
private static final String PING_QUEUE_NAME_DEFAULT = "ping";
- /** Holds the message delivery mode to use for the test. */
+ /**
+ * Holds the message delivery mode to use for the test.
+ */
private static final boolean PERSISTENT_MODE_DEFAULT = false;
- /** Holds the transactional mode to use for the test. */
+ /**
+ * Holds the transactional mode to use for the test.
+ */
private static final boolean TRANSACTED_DEFAULT = false;
- /** Holds the default broker url for the test. */
+ /**
+ * Holds the default broker url for the test.
+ */
private static final String BROKER_DEFAULT = "tcp://localhost:5672";
- /** Holds the default virtual path for the test. */
+ /**
+ * Holds the default virtual path for the test.
+ */
private static final String VIRTUAL_PATH_DEFAULT = "/test";
- /** Sets a default ping timeout. */
+ /**
+ * Sets a default ping timeout.
+ */
private static final long TIMEOUT_DEFAULT = 3000;
- // Sets up the test parameters with defaults.
- static
- {
- setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
- setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
- setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
- setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
- setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
- setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
- setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
- setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1));
- }
- /** Thread local to hold the per-thread test setup fields. */
+ private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
+ private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
+ private static final String FAIL_AFTER_SEND = "FailAfterSend";
+ private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
+ private static final String BATCH_SIZE = "BatchSize";
+
+
+
+ /**
+ * Thread local to hold the per-thread test setup fields.
+ */
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
// Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
@@ -107,9 +136,27 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
private Properties testParameters = System.getProperties();
//private Properties testParameters = new ContextualProperties(System.getProperties());
+
public PingTestPerf(String name)
{
super(name);
+ // Sets up the test parameters with defaults.
+
+
+ setSystemPropertyIfNull(FAIL_AFTER_COMMIT, "false");
+ setSystemPropertyIfNull(FAIL_BEFORE_COMMIT, "false");
+ setSystemPropertyIfNull(FAIL_AFTER_SEND, "false");
+ setSystemPropertyIfNull(FAIL_BEFORE_SEND, "false");
+
+ setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
+ setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
+ setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
+ setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
+ setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+ setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
+ setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1));
}
/**
@@ -124,7 +171,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
suite.addTest(new PingTestPerf("testPingOk"));
return suite;
- //return new junit.framework.TestSuite(PingTestPerf.class);
+ //return new junit.framework.TestSuite(PingTestPerf.class);
}
private static void setSystemPropertyIfNull(String propName, String propValue)
@@ -135,18 +182,28 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
}
}
+ public void testPing(int jim) throws Exception
+ {
+ testPingOk(1);
+ }
+
public void testPingOk(int numPings) throws Exception
{
// Get the per thread test setup to run the test through.
PerThreadSetup perThreadSetup = threadSetup.get();
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._pingItselfClient.getTestMessage(null,
- Integer.parseInt(testParameters.getProperty(
- MESSAGE_SIZE_PROPNAME)),
- Boolean.parseBoolean(testParameters.getProperty(
- PERSISTENT_MODE_PROPNAME)));
+ perThreadSetup._pingItselfClient.getTestMessage(null,
+ Integer.parseInt(testParameters.getProperty(
+ MESSAGE_SIZE_PROPNAME)),
+ Boolean.parseBoolean(testParameters.getProperty(
+ PERSISTENT_MODE_PROPNAME)));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
@@ -185,20 +242,31 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
boolean verbose = false;
int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+ boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+ boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+ boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+ boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+
+ int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+
// Establish a client to ping a Queue and listen the reply back from same Queue
if (queueCount > 1)
{
// test client with multiple queues
perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
- queueCount, selector, transacted, persistent,
- messageSize, verbose);
+ selector, transacted, persistent,
+ messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend,
+ batchSize, queueCount);
}
else
{
// Establish a client to ping a Queue and listen the reply back from same Queue
perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
queueName, selector, transacted, persistent,
- messageSize, verbose);
+ messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend,
+ batchSize);
}
// Start the client connection
@@ -228,7 +296,9 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
private static class PerThreadSetup
{
- /** Holds the test ping client. */
+ /**
+ * Holds the test ping client.
+ */
private TestPingItself _pingItselfClient;
}
}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
index f553faf302..3e1035ce05 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -18,22 +18,22 @@ import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
* PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
* many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
* a producer to a conumer, then the consumer replies to the message on a temporary queue.
- *
+ * <p/>
* <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
* of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
* up using a suitable JUnit test runner. See {@link TKTestRunner} or {@link PPTestRunner} for more information on how
* to do this.
- *
+ * <p/>
* <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
* temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
* gets its own connection/producer/consumer, this is only re-established if the connection is lost.
- *
+ * <p/>
* <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
* is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
* back on the temporary queue.
- *
+ * <p/>
* <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- *
+ * <p/>
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
@@ -44,43 +44,69 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
{
private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
- /** Holds the name of the property to get the test message size from. */
+ /**
+ * Holds the name of the property to get the test message size from.
+ */
private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
- /** Holds the name of the property to get the ping queue name from. */
+ /**
+ * Holds the name of the property to get the ping queue name from.
+ */
private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
- /** Holds the name of the property to get the test delivery mode from. */
+ /**
+ * Holds the name of the property to get the test delivery mode from.
+ */
private static final String PERSISTENT_MODE_PROPNAME = "persistent";
- /** Holds the name of the property to get the test transactional mode from. */
+ /**
+ * Holds the name of the property to get the test transactional mode from.
+ */
private static final String TRANSACTED_PROPNAME = "transacted";
- /** Holds the name of the property to get the test broker url from. */
+ /**
+ * Holds the name of the property to get the test broker url from.
+ */
private static final String BROKER_PROPNAME = "broker";
- /** Holds the name of the property to get the test broker virtual path. */
+ /**
+ * Holds the name of the property to get the test broker virtual path.
+ */
private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
- /** Holds the size of message body to attach to the ping messages. */
+ /**
+ * Holds the size of message body to attach to the ping messages.
+ */
private static final int MESSAGE_SIZE_DEFAULT = 0;
- /** Holds the name of the queue to which pings are sent. */
+ /**
+ * Holds the name of the queue to which pings are sent.
+ */
private static final String PING_QUEUE_NAME_DEFAULT = "ping";
- /** Holds the message delivery mode to use for the test. */
+ /**
+ * Holds the message delivery mode to use for the test.
+ */
private static final boolean PERSISTENT_MODE_DEFAULT = false;
- /** Holds the transactional mode to use for the test. */
+ /**
+ * Holds the transactional mode to use for the test.
+ */
private static final boolean TRANSACTED_DEFAULT = false;
- /** Holds the default broker url for the test. */
+ /**
+ * Holds the default broker url for the test.
+ */
private static final String BROKER_DEFAULT = "tcp://localhost:5672";
- /** Holds the default virtual path for the test. */
+ /**
+ * Holds the default virtual path for the test.
+ */
private static final String VIRTUAL_PATH_DEFAULT = "/test";
- /** Sets a default ping timeout. */
+ /**
+ * Sets a default ping timeout.
+ */
private static final long TIMEOUT = 15000;
// Sets up the test parameters with defaults.
@@ -94,7 +120,9 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
}
- /** Thread local to hold the per-thread test setup fields. */
+ /**
+ * Thread local to hold the per-thread test setup fields.
+ */
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
// Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
@@ -103,6 +131,13 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
private Properties testParameters = System.getProperties();
//private Properties testParameters = new ContextualProperties(System.getProperties());
+ private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
+ private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
+ private static final String FAIL_AFTER_SEND = "FailAfterSend";
+ private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
+ private static final String BATCH_SIZE = "BatchSize";
+
+
public PingPongTestPerf(String name)
{
super(name);
@@ -137,11 +172,11 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(),
- Integer.parseInt(testParameters.getProperty(
- MESSAGE_SIZE_PROPNAME)),
- Boolean.parseBoolean(testParameters.getProperty(
- PERSISTENT_MODE_PROPNAME)));
+ perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(),
+ Integer.parseInt(testParameters.getProperty(
+ MESSAGE_SIZE_PROPNAME)),
+ Boolean.parseBoolean(testParameters.getProperty(
+ PERSISTENT_MODE_PROPNAME)));
// Use the test timing controller to reset the test timer now and obtain the current time.
// This can be used to remove the message creation time from the test.
@@ -181,6 +216,12 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
boolean verbose = false;
int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+ boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+ boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+ boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+ boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+ int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+
// Establish a bounce back client on the ping queue to bounce back the pings.
perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName,
persistent, transacted, selector, verbose);
@@ -191,7 +232,9 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
// Establish a ping-pong client on the ping queue to send the pings with.
perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
queueName, selector, transacted, persistent, messageSize,
- verbose);
+ verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend,
+ batchSize, 0);
perThreadSetup._testPingProducer.getConnection().start();
@@ -205,14 +248,14 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
try
{
/**if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null))
- {
- _testPingBouncer.getConnection().close();
- }
-
- if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
- {
- _testPingProducer.getConnection().close();
- }*/
+ {
+ _testPingBouncer.getConnection().close();
+ }
+
+ if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
+ {
+ _testPingProducer.getConnection().close();
+ }*/
}
finally
{
@@ -222,10 +265,14 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont
private static class PerThreadSetup
{
- /** Holds the test ping-pong producer. */
+ /**
+ * Holds the test ping-pong producer.
+ */
private PingPongProducer _testPingProducer;
- /** Holds the test ping client. */
+ /**
+ * Holds the test ping client.
+ */
private PingPongBouncer _testPingBouncer;
}
}