From 12c8ec5bc9c89993e87e11b626e9d9f88bdd5cdc Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 23 Jan 2007 09:39:56 +0000 Subject: Added ability to cause failover before/after commit/sends Added batch size ability. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@498965 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/perftests/distribution/pom.xml | 16 +- .../distribution/src/main/assembly/performance.xml | 7 + .../org/apache/qpid/ping/AbstractPingClient.java | 47 ++++- .../org/apache/qpid/ping/AbstractPingProducer.java | 183 ++++++++++++++---- .../java/org/apache/qpid/ping/TestPingClient.java | 53 +++++- .../java/org/apache/qpid/ping/TestPingItself.java | 64 +++++-- .../org/apache/qpid/ping/TestPingProducer.java | 93 +++++++-- .../apache/qpid/requestreply/PingPongProducer.java | 208 +++++++++++++++------ .../java/org/apache/qpid/ping/PingTestPerf.java | 158 +++++++++++----- .../apache/qpid/requestreply/PingPongTestPerf.java | 117 ++++++++---- 10 files changed, 726 insertions(+), 220 deletions(-) diff --git a/qpid/java/perftests/distribution/pom.xml b/qpid/java/perftests/distribution/pom.xml index 4f637715d1..010f19c9f0 100644 --- a/qpid/java/perftests/distribution/pom.xml +++ b/qpid/java/perftests/distribution/pom.xml @@ -44,8 +44,22 @@ org.apache.qpid qpid-perftests + jar + ${pom.version} - + + org.apache.qpid + qpid-perftests + test-jar + ${pom.version} + + + uk.co.thebadgerset + junit-toolkit + 0.4 + runtime + + diff --git a/qpid/java/perftests/distribution/src/main/assembly/performance.xml b/qpid/java/perftests/distribution/src/main/assembly/performance.xml index 0bf7efa21e..a564261a24 100644 --- a/qpid/java/perftests/distribution/src/main/assembly/performance.xml +++ b/qpid/java/perftests/distribution/src/main/assembly/performance.xml @@ -74,6 +74,13 @@ **/*.log4j + + ../src/test + qpid-${qpid.version}/src + + **/*.java + + diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java index 7c82710a3f..c04a8a7d96 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java @@ -1,6 +1,7 @@ package org.apache.qpid.ping; import java.text.SimpleDateFormat; +import java.io.IOException; import javax.jms.Connection; import javax.jms.JMSException; @@ -13,7 +14,7 @@ import org.apache.qpid.jms.Session; /** * Provides functionality common to all ping clients. Provides the ability to manage a session and a convenience method * to commit on the current transaction. - * + *

*

*
CRC Card
Responsibilities Collaborations *
Commit the current transcation. @@ -29,6 +30,9 @@ public abstract class AbstractPingClient private static final Logger _logger = Logger.getLogger(TestPingClient.class); private AMQConnection _connection; + protected boolean _failBeforeCommit = false; + protected boolean _failAfterCommit = false; + public AMQConnection getConnection() { return _connection; @@ -50,7 +54,20 @@ public abstract class AbstractPingClient { try { + if (_failBeforeCommit) + { + _logger.trace("Failing Before Commit"); + doFailover(); + } + session.commit(); + + if (_failAfterCommit) + { + _logger.trace("Failing After Commit"); + doFailover(); + } + _logger.trace("Session Commited."); } catch (JMSException e) @@ -72,4 +89,32 @@ public abstract class AbstractPingClient } } } + + protected void doFailover(String broker) + { + System.out.println("Kill Broker " + broker + " now."); + try + { + System.in.read(); + } + catch (IOException e) + { + } + System.out.println("Continuing."); + } + + protected void doFailover() + { + System.out.println("Kill Broker now."); + try + { + System.in.read(); + } + catch (IOException e) + { + } + System.out.println("Continuing."); + + } + } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java index 1877a23056..4cca77a70e 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java @@ -2,17 +2,23 @@ package org.apache.qpid.ping; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.io.IOException; import java.util.List; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.*; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.message.TestMessageFactory; +import org.apache.qpid.jms.*; import org.apache.qpid.jms.Session; import org.apache.qpid.framing.AMQShortString; @@ -20,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString; * This abstract class captures functionality that is common to all ping producers. It provides functionality to * manage a session, and a convenience method to commit a transaction on the session. It also provides a framework * for running a ping loop, and terminating that loop on exceptions or a shutdown handler. - * + *

*

*
CRC Card
Responsibilities Collaborations *
Manage the connection. @@ -35,24 +41,48 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene { private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class); - /** Used to format time stamping output. */ + /** + * Used to format time stamping output. + */ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + /** + * Used to tell the ping loop when to terminate, it only runs while this is true. + */ protected boolean _publish = true; - /** Holds the connection handle to the broker. */ + /** + * Holds the connection handle to the broker. + */ private Connection _connection; - /** Holds the producer session, need to create test messages. */ + /** + * Holds the producer session, need to create test messages. + */ private Session _producerSession; - /** holds the no of queues the tests will be using to send messages. By default it will be 1 */ - private int _queueCount; + /** + * holds the no of queues the tests will be using to send messages. By default it will be 1 + */ + protected int _queueCount; private static AtomicInteger _queueSequenceID = new AtomicInteger(); private List _queues = new ArrayList(); + /** + * Holds the message producer to send the pings through. + */ + protected org.apache.qpid.jms.MessageProducer _producer; + + protected boolean _failBeforeCommit = false; + protected boolean _failAfterCommit = false; + protected boolean _failBeforeSend = false; + protected boolean _failAfterSend = false; + + protected int _sentMessages = 0; + protected int _batchSize = 1; + + /** * Convenience method for a short pause. * @@ -67,7 +97,8 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene Thread.sleep(sleepTime); } catch (InterruptedException ie) - { } + { + } } } @@ -78,9 +109,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene * * @param replyQueue The reply-to destination for the message. * @param messageSize The desired size of the message in bytes. - * * @return A freshly generated test message. - * * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. */ public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException @@ -153,12 +182,12 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene public Thread getShutdownHook() { return new Thread(new Runnable() + { + public void run() { - public void run() - { - stop(); - } - }); + stop(); + } + }); } public Connection getConnection() @@ -181,6 +210,12 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene this._producerSession = session; } + + protected void commitTx() throws JMSException + { + commitTx(getProducerSession()); + } + public int getQueueCount() { return _queueCount; @@ -194,6 +229,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene /** * Creates queues dynamically and adds to the queues list. This is when the test is being done with * multiple queues. + * * @param queueCount */ protected void createQueues(int queueCount) @@ -202,7 +238,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene { AMQShortString name = new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis()); AMQQueue queue = new AMQQueue(name, name, false, false, false); - + _queues.add(queue); } } @@ -219,36 +255,107 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene */ protected void commitTx(Session session) throws JMSException { - if (session.getTransacted()) + if ((++_sentMessages % _batchSize) == 0) { - try + if (session.getTransacted()) { - session.commit(); - _logger.trace("Session Commited."); - } - catch (JMSException e) - { - _logger.trace("JMSException on commit:" + e.getMessage(), e); - - // Warn that the bounce back client is not available. - if (e.getLinkedException() instanceof AMQNoConsumersException) - { - _logger.debug("No consumers on queue."); - } - try { - session.rollback(); - _logger.trace("Message rolled back."); + if (_failBeforeCommit) + { + _logger.trace("Failing Before Commit"); + doFailover(); + } + + session.commit(); + + if (_failAfterCommit) + { + _logger.trace("Failing After Commit"); + doFailover(); + } + _logger.trace("Session Commited."); } - catch (JMSException jmse) + catch (JMSException e) { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; + _logger.trace("JMSException on commit:" + e.getMessage(), e); + + // Warn that the bounce back client is not available. + if (e.getLinkedException() instanceof AMQNoConsumersException) + { + _logger.debug("No consumers on queue."); + } + + try + { + session.rollback(); + _logger.trace("Message rolled back."); + } + catch (JMSException jmse) + { + _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } } } } } + + protected void sendMessage(Message message) throws JMSException + { + sendMessage(null, message); + } + + protected void sendMessage(Queue q, Message message) throws JMSException + { + if (_failBeforeSend) + { + _logger.trace("Failing Before Send"); + doFailover(); + } + + if (q == null) + { + _producer.send(message); + } + else + { + _producer.send(q, message); + } + + if (_failAfterSend) + { + _logger.trace("Failing After Send"); + doFailover(); + } + } + + protected void doFailover(String broker) + { + System.out.println("Kill Broker " + broker + " now then press return"); + try + { + System.in.read(); + } + catch (IOException e) + { + } + System.out.println("Continuing."); + } + + protected void doFailover() + { + System.out.println("Kill Broker now then press return"); + try + { + System.in.read(); + } + catch (IOException e) + { + } + System.out.println("Continuing."); + + } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java index 43a010d8ef..949ace20e1 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java @@ -34,11 +34,11 @@ import org.apache.qpid.jms.Session; * PingClient is a message listener that received time stamped ping messages. It can work out how long a ping took, * provided that its clokc is synchronized to that of the ping producer, or by running it on the same machine (or jvm) * as the ping producer. - * + *

*

There is a verbose mode flag which causes information about each ping to be output to the console * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should * be disabled for real timing tests as writing to the console will slow things down. - * + *

*

*
CRC Card
Responsibilities Collaborations *
Provide command line invocation to start the ping consumer on a configurable broker url. @@ -50,10 +50,14 @@ class TestPingClient extends AbstractPingClient implements MessageListener { private static final Logger _logger = Logger.getLogger(TestPingClient.class); - /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ + /** + * Used to indicate that the reply generator should log timing info to the console (logger info level). + */ private boolean _verbose = false; - /** The producer session. */ + /** + * The producer session. + */ private Session _consumerSession; /** @@ -67,11 +71,11 @@ class TestPingClient extends AbstractPingClient implements MessageListener * @param transacted * @param selector * @param verbose - * - * @throws Exception All underlying exceptions allowed to fall through. This is only test code... + * @param afterCommit + *@param beforeCommit @throws Exception All underlying exceptions allowed to fall through. This is only test code... */ public TestPingClient(String brokerDetails, String username, String password, String queueName, String virtualpath, - boolean transacted, String selector, boolean verbose) throws Exception + boolean transacted, String selector, boolean verbose, boolean afterCommit, boolean beforeCommit) throws Exception { // Create a connection to the broker. InetAddress address = InetAddress.getLocalHost(); @@ -85,10 +89,15 @@ class TestPingClient extends AbstractPingClient implements MessageListener // Connect a consumer to the ping queue and register this to be called back by it. Queue q = new AMQQueue(queueName); MessageConsumer consumer = _consumerSession.createConsumer(q, 1, false, false, selector); + consumer.setMessageListener(this); // Hang on to the verbose flag setting. _verbose = verbose; + + // Set failover interrupts + _failAfterCommit = afterCommit; + _failBeforeCommit = beforeCommit; } /** @@ -104,7 +113,7 @@ class TestPingClient extends AbstractPingClient implements MessageListener if (args.length < 4) { System.out.println( - "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector]"); + "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector] [failover::commit]"); System.exit(1); } @@ -118,14 +127,38 @@ class TestPingClient extends AbstractPingClient implements MessageListener boolean transacted = (args.length >= 7) ? Boolean.parseBoolean(args[6]) : false; String selector = (args.length == 8) ? args[7] : null; + boolean afterCommit = false; + boolean beforeCommit = false; + + for (String arg : args) + { + if (arg.startsWith("failover:")) + { + //failover:: + String[] parts = arg.split(":"); + if (parts.length == 3) + { + if (parts[2].equals("commit")) + { + afterCommit = parts[1].equals("after"); + beforeCommit = parts[1].equals("before"); + } + } + else + { + System.out.println("Unrecognized failover request:" + arg); + } + } + } + // Create the test ping client and set it running. TestPingClient pingClient = - new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose); + new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose, afterCommit, beforeCommit); + pingClient.getConnection().start(); System.out.println("Waiting..."); } - /** * This is a callback method that is notified of all messages for which this has been registered as a message * listener on a message consumer. diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java index 3c6c42d92b..579816870f 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java @@ -45,18 +45,24 @@ public class TestPingItself extends PingPongProducer private static final Logger _logger = Logger.getLogger(TestPingItself.class); public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName, - String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) + String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose, + boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, + int batchSize) throws Exception { - super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize, verbose); + super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize, + verbose, afterCommit, beforeCommit, afterSend, beforeSend, batchSize, 0); } - public TestPingItself(String brokerDetails, String username, String password, String virtualpath, int queueCount, - String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) + public TestPingItself(String brokerDetails, String username, String password, String virtualpath, + String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose, + boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, + int batchSize, int queueCount) throws Exception { - super(brokerDetails, username, password, virtualpath, transacted); - setQueueCount(queueCount); + super(brokerDetails, username, password, virtualpath, null, null, transacted, persistent, messageSize, + verbose, afterCommit, beforeCommit, afterSend, beforeSend, batchSize, queueCount); + createQueues(queueCount); _persistent = persistent; @@ -82,7 +88,7 @@ public class TestPingItself extends PingPongProducer /** * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs * to be started to bounce the pings back again. - * + *

*

The command line takes from 2 to 4 arguments: *

*
brokerDetails The broker connection string. @@ -99,7 +105,7 @@ public class TestPingItself extends PingPongProducer if (args.length < 2) { System.err.println("Usage: TestPingPublisher [verbose (true/false)] " + - "[transacted (true/false)] [persistent (true/false)] [message size in bytes]"); + "[transacted (true/false)] [persistent (true/false)] [message size in bytes]"); System.exit(0); } @@ -109,14 +115,50 @@ public class TestPingItself extends PingPongProducer boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE; + int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1; - String queue = "ping_"+ System.currentTimeMillis(); - _logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:"+ persistent + + String queue = "ping_" + System.currentTimeMillis(); + _logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:" + persistent + ",MessageSize:" + messageSize + " bytes"); + + boolean afterCommit = false; + boolean beforeCommit = false; + boolean afterSend = false; + boolean beforeSend = false; + + for (String arg : args) + { + if (arg.startsWith("failover:")) + { + //failover:: + String[] parts = arg.split(":"); + if (parts.length == 3) + { + if (parts[2].equals("commit")) + { + afterCommit = parts[1].equals("after"); + beforeCommit = parts[1].equals("before"); + } + + if (parts[2].equals("send")) + { + afterSend = parts[1].equals("after"); + beforeSend = parts[1].equals("before"); + } + } + else + { + System.out.println("Unrecognized failover request:" + arg); + } + } + } + // Create a ping producer to handle the request/wait/reply cycle. TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null, - transacted, persistent, messageSize, verbose); + transacted, persistent, messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, + batchSize); pingItself.getConnection().start(); // Run a few priming pings to remove warm up time from test results. diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java index 37f6f7518e..e53d7bb521 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java @@ -37,12 +37,12 @@ import org.apache.qpid.jms.Session; * PingProducer is a client that sends timestamped pings to a queue. It is designed to be run from the command line * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session and * configured message producer. - * + *

*

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop * does all its work through helper methods, so that code wishing to run a ping cycle is not forced to do so * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is * also registered to terminate the ping loop cleanly. - * + *

*

*
CRC Card
Responsibilities Collaborations *
Provide a ping cycle. @@ -53,31 +53,42 @@ class TestPingProducer extends AbstractPingProducer { private static final Logger _logger = Logger.getLogger(TestPingProducer.class); - /** Used to set up a default message size. */ + /** + * Used to set up a default message size. + */ private static final int DEFAULT_MESSAGE_SIZE = 0; - /** Used to define how long to wait between pings. */ + /** + * Used to define how long to wait between pings. + */ private static final long SLEEP_TIME = 250; - /** Holds the name of the queue to send pings on. */ + /** + * Holds the name of the queue to send pings on. + */ private static final String PING_QUEUE_NAME = "ping"; private static TestPingProducer _pingProducer; - /** Holds the message producer to send the pings through. */ - private MessageProducer _producer; - - /** Determines whether this producer sends persistent messages from the run method. */ + /** + * Determines whether this producer sends persistent messages from the run method. + */ private boolean _persistent = false; - /** Holds the message size to send, from the run method. */ + /** + * Holds the message size to send, from the run method. + */ private int _messageSize = DEFAULT_MESSAGE_SIZE; - /** Used to indicate that the ping loop should print out whenever it pings. */ + /** + * Used to indicate that the ping loop should print out whenever it pings. + */ private boolean _verbose = false; + public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName, - boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception + boolean transacted, boolean persistent, int messageSize, boolean verbose, boolean afterCommit, + boolean beforeCommit, boolean afterSend, boolean beforeSend,int batchSize) throws Exception { // Create a connection to the broker. InetAddress address = InetAddress.getLocalHost(); @@ -96,6 +107,14 @@ class TestPingProducer extends AbstractPingProducer _messageSize = messageSize; _verbose = verbose; + + // Set failover interrupts + _failAfterCommit = afterCommit; + _failBeforeCommit = beforeCommit; + _failAfterSend = afterSend; + _failBeforeSend = beforeSend; + _sentMessages = 0; + _batchSize = batchSize; } /** @@ -110,7 +129,8 @@ class TestPingProducer extends AbstractPingProducer if (args.length < 2) { System.err.println( - "Usage: TestPingPublisher [verbose] [transacted] [persistent] [message size in bytes]"); + "Usage: TestPingPublisher "+ + "[ "); System.exit(0); } @@ -120,10 +140,46 @@ class TestPingProducer extends AbstractPingProducer boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE; + int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1; + + + boolean afterCommit = false; + boolean beforeCommit = false; + boolean afterSend = false; + boolean beforeSend = false; + + for (String arg : args) + { + if (arg.startsWith("failover:")) + { + //failover:: + String[] parts = arg.split(":"); + if (parts.length == 3) + { + if (parts[2].equals("commit")) + { + afterCommit = parts[1].equals("after"); + beforeCommit = parts[1].equals("before"); + } + + if (parts[2].equals("send")) + { + afterSend = parts[1].equals("after"); + beforeSend = parts[1].equals("before"); + } + } + else + { + System.out.println("Unrecognized failover request:" + arg); + } + } + } // Create a ping producer to generate the pings. - _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted, - persistent, messageSize, verbose); + _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, + transacted, persistent, messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, + batchSize); // Start the connection running. _pingProducer.getConnection().start(); @@ -144,19 +200,18 @@ class TestPingProducer extends AbstractPingProducer * Sends the specified ping message. * * @param message The message to send. - * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public void ping(Message message) throws JMSException { - _producer.send(message); + sendMessage(message); // Keep the messageId to correlate with the reply. String messageId = message.getJMSMessageID(); // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of - // this method, as the message will not be sent until the transaction is committed. - commitTx(getProducerSession()); + // this method, as the message will not be sent until the transaction is committed. + commitTx(); } /** diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 0b5f040b90..3c3e31dd55 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -44,17 +44,17 @@ import org.apache.qpid.ping.AbstractPingProducer; * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session, * message producer and message consumer to run the ping-pong cycle on. - * + *

*

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. * This means that this class has to do some work to correlate pings with pongs; it expectes the original message * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then * this correlation would not need to be done. - * + *

*

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is * also registered to terminate the ping-pong loop cleanly. - * + *

*

*
CRC Card
Responsibilities Collaborations *
Provide a ping and wait for response cycle. @@ -62,65 +62,91 @@ import org.apache.qpid.ping.AbstractPingProducer; *
* * @todo Make temp queue per ping a command line option. - * * @todo Make the queue name a command line option. */ public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener { private static final Logger _logger = Logger.getLogger(PingPongProducer.class); - /** Used to set up a default message size. */ + /** + * Used to set up a default message size. + */ protected static final int DEFAULT_MESSAGE_SIZE = 0; - /** Used to define how long to wait between pings. */ + /** + * Used to define how long to wait between pings. + */ protected static final long SLEEP_TIME = 250; - /** Used to define how long to wait before assuming that a ping has timed out. */ + /** + * Used to define how long to wait before assuming that a ping has timed out. + */ protected static final long TIMEOUT = 9000; - /** Holds the name of the queue to send pings on. */ + /** + * Holds the name of the queue to send pings on. + */ protected static final String PING_QUEUE_NAME = "ping"; - /** The batch size. */ + /** + * The batch size. + */ protected static final int BATCH_SIZE = 100; - /** Keeps track of the ping producer instance used in the run loop. */ + /** + * Keeps track of the ping producer instance used in the run loop. + */ private static PingPongProducer _pingProducer; protected static final int PREFETCH = 100; protected static final boolean NO_LOCAL = true; protected static final boolean EXCLUSIVE = false; - /** The number of priming loops to run. */ + /** + * The number of priming loops to run. + */ protected static final int PRIMING_LOOPS = 3; - /** A source for providing sequential unique correlation ids. */ + /** + * A source for providing sequential unique correlation ids. + */ private AtomicLong idGenerator = new AtomicLong(0L); - /** Holds the message producer to send the pings through. */ - private MessageProducer _producer; - - /** Holds the queue to send the ping replies to. */ + /** + * Holds the queue to send the ping replies to. + */ private Queue _replyQueue; - /** Hold the known Queue where the producer will be sending message to*/ + /** + * Hold the known Queue where the producer will be sending message to + */ private Queue _pingQueue; - /** Determines whether this producer sends persistent messages from the run method. */ + /** + * Determines whether this producer sends persistent messages from the run method. + */ protected boolean _persistent; - /** Holds the message size to send, from the run method. */ + /** + * Holds the message size to send, from the run method. + */ protected int _messageSize; - /** Holds a map from message ids to latches on which threads wait for replies. */ + /** + * Holds a map from message ids to latches on which threads wait for replies. + */ private Map trafficLights = new HashMap(); - /** Used to indicate that the ping loop should print out whenever it pings. */ + /** + * Used to indicate that the ping loop should print out whenever it pings. + */ protected boolean _verbose = false; protected Session _consumerSession; - protected PingPongProducer(String brokerDetails, String username, String password, String virtualpath, - boolean transacted) + private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, + boolean transacted, boolean persistent, int messageSize, boolean verbose, + boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, + int batchSize) throws Exception { // Create a connection to the broker. @@ -132,6 +158,18 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, // Create transactional or non-transactional sessions, based on the command line arguments. setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE)); _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + _persistent = persistent; + _messageSize = messageSize; + _verbose = verbose; + + // Set failover interrupts + _failAfterCommit = afterCommit; + _failBeforeCommit = beforeCommit; + _failAfterSend = afterSend; + _failBeforeSend = beforeSend; + _batchSize = batchSize; + _sentMessages = 0; } /** @@ -142,31 +180,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, * @param password * @param virtualpath * @param transacted - * @param persistent - * @param messageSize - * @param verbose - * * @throws Exception All allowed to fall through. This is only test code... */ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName, - String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) - throws Exception + String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose, + boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, + int batchSize, int queueCount) + throws Exception { - this(brokerDetails, username, password, virtualpath, transacted); + this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, batchSize); - _pingQueue = new AMQQueue(queueName); - _persistent = persistent; - _messageSize = messageSize; - _verbose = verbose; - - // Create producer and the consumer - createProducer(); - createConsumer(selector); + if (queueName != null) + { + _pingQueue = new AMQQueue(queueName); + // Create producer and the consumer + createProducer(); + createConsumer(selector); + } + else if (queueCount > 0) + { + _queueCount = queueCount; + } + else + { + _logger.error("Queue Count is zero and no queueName specified. One must be set."); + throw new IllegalArgumentException("Queue Count is zero and no queueName specified. One must be set."); + } } /** * Creates the producer to send the pings on. If the tests are with nultiple queues, then producer * is created with null destination, so that any destination can be specified while sending + * * @throws JMSException */ public void createProducer() throws JMSException @@ -189,6 +235,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /** * Creates the temporary queue to listen to the responses + * * @param selector * @throws JMSException */ @@ -204,6 +251,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /** * Creates consumer instances for each queue. This is used when test is being done with multiple queues. + * * @param selector * @throws JMSException */ @@ -234,7 +282,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /** * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs * to be started to bounce the pings back again. - * + *

*

The command line takes from 2 to 4 arguments: *

*
brokerDetails The broker connection string. @@ -251,7 +299,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, if (args.length < 2) { System.err.println("Usage: TestPingPublisher [verbose (true/false)] " + - "[transacted (true/false)] [persistent (true/false)] [message size in bytes]"); + "[transacted (true/false)] [persistent (true/false)] [message size in bytes]"); System.exit(0); } @@ -261,10 +309,46 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE; + int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1; + + boolean afterCommit = false; + boolean beforeCommit = false; + boolean afterSend = false; + boolean beforeSend = false; + + for (String arg : args) + { + if (arg.startsWith("failover:")) + { + //failover:: + String[] parts = arg.split(":"); + if (parts.length == 3) + { + if (parts[2].equals("commit")) + { + afterCommit = parts[1].equals("after"); + beforeCommit = parts[1].equals("before"); + } + + if (parts[2].equals("send")) + { + afterSend = parts[1].equals("after"); + beforeSend = parts[1].equals("before"); + } + } + else + { + System.out.println("Unrecognized failover request:" + arg); + } + } + } // Create a ping producer to handle the request/wait/reply cycle. _pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted, - persistent, messageSize, verbose); + persistent, messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, + batchSize, 0); + _pingProducer.getConnection().start(); // Run a few priming pings to remove warm up time from test results. @@ -287,7 +371,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, * this a few times, in order to prime the JVMs JIT compilation. * * @param x The number of priming loops to run. - * * @throws JMSException All underlying exceptions are allowed to fall through. */ public void prime(int x) throws JMSException @@ -296,15 +379,18 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, { // Create and send a small message. Message first = getTestMessage(_replyQueue, 0, false); - _producer.send(first); - commitTx(getProducerSession()); + + sendMessage(first); + + commitTx(); try { Thread.sleep(100); } catch (InterruptedException ignore) - { } + { + } } } @@ -365,10 +451,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, * @param message The message to send. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. - * * @return The number of replies received. This may be less than the number sent if the timeout terminated the * wait for all prematurely. - * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException @@ -394,13 +478,13 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, { // Re-timestamp the message. message.setLongProperty("timestamp", System.currentTimeMillis()); - _producer.send(message); + sendMessage(message); } } // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of // this method, as the message will not be sent until the transaction is committed. - commitTx(getProducerSession()); + commitTx(); // Keep the messageId to correlate with the reply. //String messageId = message.getJMSMessageID(); @@ -429,7 +513,8 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /** * When the test is being performed with multiple queues, then this method will be used, which has a loop to - * pick up the next queue from the queues list and sends message to it. + * pick up the next queue from the queues list and sends message to it. + * * @param message * @param numPings * @throws JMSException @@ -441,31 +526,30 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, { // Re-timestamp the message. message.setLongProperty("timestamp", System.currentTimeMillis()); - _producer.send(getQueue(queueIndex++), message); + + sendMessage(getQueue(queueIndex++), message); // reset the counter to get the first queue - if (queueIndex == getQueueCount() -1) + if (queueIndex == getQueueCount() - 1) { queueIndex = 0; } } } - + /** * Sends the specified ping message but does not wait for a correlating reply. * * @param message The message to send. * @param numPings The number of pings to send. - * * @return The reply, or null if no reply arrives before the timeout. - * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException { for (int i = 0; i < numPings; i++) { - _producer.send(message); + sendMessage(message); if (_verbose) { @@ -474,7 +558,7 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, } // Commit the transaction if running in transactional mode, to force the send now. - commitTx(getProducerSession()); + commitTx(); } /** @@ -524,19 +608,21 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, public static class FailoverNotifier implements ConnectionListener { public void bytesSent(long count) - { } + { + } public void bytesReceived(long count) - { } + { + } public boolean preFailover(boolean redirect) { - return true; + return true; //Allow failover } public boolean preResubscribe() { - return true; + return true; // Allow resubscription } public void failoverComplete() diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index ef34b92265..e416d31031 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -13,23 +13,22 @@ import org.apache.log4j.Logger; import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; /** - * * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times * simultaneously to simluate many clients/producers/connections. - * + *

*

A single run of the test using the default JUnit test runner will result in the sending and timing of a single * full round trip ping. This test may be scaled up using a suitable JUnit test runner. - * + *

*

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, * except if the connection is lost in which case an attempt to re-establish the setup is made. - * + *

*

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the * temporary queue. - * + *

*

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * + *

*

*
CRC Card
Responsibilities Collaborations *
@@ -40,65 +39,95 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll { private static Logger _logger = Logger.getLogger(PingTestPerf.class); - /** Holds the name of the property to get the test message size from. */ + /** + * Holds the name of the property to get the test message size from. + */ private static final String MESSAGE_SIZE_PROPNAME = "messageSize"; - /** Holds the name of the property to get the ping queue name from. */ + /** + * Holds the name of the property to get the ping queue name from. + */ private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue"; - /** holds the queue count, if the test is being performed with multiple queues */ + /** + * holds the queue count, if the test is being performed with multiple queues + */ private static final String PING_QUEUE_COUNT_PROPNAME = "queues"; - /** Holds the name of the property to get the test delivery mode from. */ + /** + * Holds the name of the property to get the test delivery mode from. + */ private static final String PERSISTENT_MODE_PROPNAME = "persistent"; - /** Holds the name of the property to get the test transactional mode from. */ + /** + * Holds the name of the property to get the test transactional mode from. + */ private static final String TRANSACTED_PROPNAME = "transacted"; - /** Holds the name of the property to get the test broker url from. */ + /** + * Holds the name of the property to get the test broker url from. + */ private static final String BROKER_PROPNAME = "broker"; - /** Holds the name of the property to get the test broker virtual path. */ + /** + * Holds the name of the property to get the test broker virtual path. + */ private static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - /** Holds the waiting timeout for response messages */ + /** + * Holds the waiting timeout for response messages + */ private static final String TIMEOUT_PROPNAME = "timeout"; - /** Holds the size of message body to attach to the ping messages. */ + /** + * Holds the size of message body to attach to the ping messages. + */ private static final int MESSAGE_SIZE_DEFAULT = 0; - /** Holds the name of the queue to which pings are sent. */ + private static final int BATCH_SIZE_DEFAULT = 2; + + /** + * Holds the name of the queue to which pings are sent. + */ private static final String PING_QUEUE_NAME_DEFAULT = "ping"; - /** Holds the message delivery mode to use for the test. */ + /** + * Holds the message delivery mode to use for the test. + */ private static final boolean PERSISTENT_MODE_DEFAULT = false; - /** Holds the transactional mode to use for the test. */ + /** + * Holds the transactional mode to use for the test. + */ private static final boolean TRANSACTED_DEFAULT = false; - /** Holds the default broker url for the test. */ + /** + * Holds the default broker url for the test. + */ private static final String BROKER_DEFAULT = "tcp://localhost:5672"; - /** Holds the default virtual path for the test. */ + /** + * Holds the default virtual path for the test. + */ private static final String VIRTUAL_PATH_DEFAULT = "/test"; - /** Sets a default ping timeout. */ + /** + * Sets a default ping timeout. + */ private static final long TIMEOUT_DEFAULT = 3000; - // Sets up the test parameters with defaults. - static - { - setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); - setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); - setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); - setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT)); - setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); - setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT)); - setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1)); - } - /** Thread local to hold the per-thread test setup fields. */ + private static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; + private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; + private static final String FAIL_AFTER_SEND = "FailAfterSend"; + private static final String FAIL_BEFORE_SEND = "FailBeforeSend"; + private static final String BATCH_SIZE = "BatchSize"; + + + + /** + * Thread local to hold the per-thread test setup fields. + */ ThreadLocal threadSetup = new ThreadLocal(); // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in @@ -107,9 +136,27 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll private Properties testParameters = System.getProperties(); //private Properties testParameters = new ContextualProperties(System.getProperties()); + public PingTestPerf(String name) { super(name); + // Sets up the test parameters with defaults. + + + setSystemPropertyIfNull(FAIL_AFTER_COMMIT, "false"); + setSystemPropertyIfNull(FAIL_BEFORE_COMMIT, "false"); + setSystemPropertyIfNull(FAIL_AFTER_SEND, "false"); + setSystemPropertyIfNull(FAIL_BEFORE_SEND, "false"); + + setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT)); + setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); + setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); + setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); + setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT)); + setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); + setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT)); + setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1)); } /** @@ -124,7 +171,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll suite.addTest(new PingTestPerf("testPingOk")); return suite; - //return new junit.framework.TestSuite(PingTestPerf.class); + //return new junit.framework.TestSuite(PingTestPerf.class); } private static void setSystemPropertyIfNull(String propName, String propValue) @@ -135,18 +182,28 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll } } + public void testPing(int jim) throws Exception + { + testPingOk(1); + } + public void testPingOk(int numPings) throws Exception { // Get the per thread test setup to run the test through. PerThreadSetup perThreadSetup = threadSetup.get(); + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._pingItselfClient.getTestMessage(null, - Integer.parseInt(testParameters.getProperty( - MESSAGE_SIZE_PROPNAME)), - Boolean.parseBoolean(testParameters.getProperty( - PERSISTENT_MODE_PROPNAME))); + perThreadSetup._pingItselfClient.getTestMessage(null, + Integer.parseInt(testParameters.getProperty( + MESSAGE_SIZE_PROPNAME)), + Boolean.parseBoolean(testParameters.getProperty( + PERSISTENT_MODE_PROPNAME))); // start the test long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); @@ -185,20 +242,31 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll boolean verbose = false; int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); + boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); + boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); + boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); + boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); + + int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + // Establish a client to ping a Queue and listen the reply back from same Queue if (queueCount > 1) { // test client with multiple queues perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, - queueCount, selector, transacted, persistent, - messageSize, verbose); + selector, transacted, persistent, + messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, + batchSize, queueCount); } else { // Establish a client to ping a Queue and listen the reply back from same Queue perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, - messageSize, verbose); + messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, + batchSize); } // Start the client connection @@ -228,7 +296,9 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll private static class PerThreadSetup { - /** Holds the test ping client. */ + /** + * Holds the test ping client. + */ private TestPingItself _pingItselfClient; } } diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index f553faf302..3e1035ce05 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -18,22 +18,22 @@ import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from * a producer to a conumer, then the consumer replies to the message on a temporary queue. - * + *

*

A single run of the test using the default JUnit test runner will result in the sending and timing of the number * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled * up using a suitable JUnit test runner. See {@link TKTestRunner} or {@link PPTestRunner} for more information on how * to do this. - * + *

*

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads * gets its own connection/producer/consumer, this is only re-established if the connection is lost. - * + *

*

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come * back on the temporary queue. - * + *

*

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * + *

*

*
CRC Card
Responsibilities Collaborations *
@@ -44,43 +44,69 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont { private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); - /** Holds the name of the property to get the test message size from. */ + /** + * Holds the name of the property to get the test message size from. + */ private static final String MESSAGE_SIZE_PROPNAME = "messageSize"; - /** Holds the name of the property to get the ping queue name from. */ + /** + * Holds the name of the property to get the ping queue name from. + */ private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue"; - /** Holds the name of the property to get the test delivery mode from. */ + /** + * Holds the name of the property to get the test delivery mode from. + */ private static final String PERSISTENT_MODE_PROPNAME = "persistent"; - /** Holds the name of the property to get the test transactional mode from. */ + /** + * Holds the name of the property to get the test transactional mode from. + */ private static final String TRANSACTED_PROPNAME = "transacted"; - /** Holds the name of the property to get the test broker url from. */ + /** + * Holds the name of the property to get the test broker url from. + */ private static final String BROKER_PROPNAME = "broker"; - /** Holds the name of the property to get the test broker virtual path. */ + /** + * Holds the name of the property to get the test broker virtual path. + */ private static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - /** Holds the size of message body to attach to the ping messages. */ + /** + * Holds the size of message body to attach to the ping messages. + */ private static final int MESSAGE_SIZE_DEFAULT = 0; - /** Holds the name of the queue to which pings are sent. */ + /** + * Holds the name of the queue to which pings are sent. + */ private static final String PING_QUEUE_NAME_DEFAULT = "ping"; - /** Holds the message delivery mode to use for the test. */ + /** + * Holds the message delivery mode to use for the test. + */ private static final boolean PERSISTENT_MODE_DEFAULT = false; - /** Holds the transactional mode to use for the test. */ + /** + * Holds the transactional mode to use for the test. + */ private static final boolean TRANSACTED_DEFAULT = false; - /** Holds the default broker url for the test. */ + /** + * Holds the default broker url for the test. + */ private static final String BROKER_DEFAULT = "tcp://localhost:5672"; - /** Holds the default virtual path for the test. */ + /** + * Holds the default virtual path for the test. + */ private static final String VIRTUAL_PATH_DEFAULT = "/test"; - /** Sets a default ping timeout. */ + /** + * Sets a default ping timeout. + */ private static final long TIMEOUT = 15000; // Sets up the test parameters with defaults. @@ -94,7 +120,9 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); } - /** Thread local to hold the per-thread test setup fields. */ + /** + * Thread local to hold the per-thread test setup fields. + */ ThreadLocal threadSetup = new ThreadLocal(); // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in @@ -103,6 +131,13 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont private Properties testParameters = System.getProperties(); //private Properties testParameters = new ContextualProperties(System.getProperties()); + private static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; + private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; + private static final String FAIL_AFTER_SEND = "FailAfterSend"; + private static final String FAIL_BEFORE_SEND = "FailBeforeSend"; + private static final String BATCH_SIZE = "BatchSize"; + + public PingPongTestPerf(String name) { super(name); @@ -137,11 +172,11 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(), - Integer.parseInt(testParameters.getProperty( - MESSAGE_SIZE_PROPNAME)), - Boolean.parseBoolean(testParameters.getProperty( - PERSISTENT_MODE_PROPNAME))); + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(), + Integer.parseInt(testParameters.getProperty( + MESSAGE_SIZE_PROPNAME)), + Boolean.parseBoolean(testParameters.getProperty( + PERSISTENT_MODE_PROPNAME))); // Use the test timing controller to reset the test timer now and obtain the current time. // This can be used to remove the message creation time from the test. @@ -181,6 +216,12 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont boolean verbose = false; int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); + boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); + boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); + boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); + boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); + int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + // Establish a bounce back client on the ping queue to bounce back the pings. perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted, selector, verbose); @@ -191,7 +232,9 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont // Establish a ping-pong client on the ping queue to send the pings with. perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize, - verbose); + verbose, + afterCommit, beforeCommit, afterSend, beforeSend, + batchSize, 0); perThreadSetup._testPingProducer.getConnection().start(); @@ -205,14 +248,14 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont try { /**if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null)) - { - _testPingBouncer.getConnection().close(); - } - - if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null)) - { - _testPingProducer.getConnection().close(); - }*/ + { + _testPingBouncer.getConnection().close(); + } + + if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null)) + { + _testPingProducer.getConnection().close(); + }*/ } finally { @@ -222,10 +265,14 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont private static class PerThreadSetup { - /** Holds the test ping-pong producer. */ + /** + * Holds the test ping-pong producer. + */ private PingPongProducer _testPingProducer; - /** Holds the test ping client. */ + /** + * Holds the test ping client. + */ private PingPongBouncer _testPingBouncer; } } -- cgit v1.2.1