From 553a5374ebca0fa86fe927e18a5288644508a50a Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 18 Jan 2007 13:11:39 +0000 Subject: (Patch submitted by Rupert Smith) Restructured the ping tests, they now share common base classes to avoid cut and paste coding. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@497425 13f79535-47bb-0310-9956-ffa450edef68 --- java/etc/log4j.xml | 43 +++ java/perftests/jar-with-dependencies.xml | 29 ++ java/perftests/pom.xml | 4 +- .../org/apache/qpid/ping/AbstractPingClient.java | 31 +- .../org/apache/qpid/ping/AbstractPingProducer.java | 92 +++--- .../java/org/apache/qpid/ping/TestPingClient.java | 93 +++--- .../org/apache/qpid/ping/TestPingProducer.java | 111 +++---- .../apache/qpid/requestreply/PingPongBouncer.java | 283 ++++++++++++++++++ .../apache/qpid/requestreply/PingPongClient.java | 175 ------------ .../apache/qpid/requestreply/PingPongProducer.java | 318 +++++++++++++++------ .../java/org/apache/qpid/ping/PingTestPerf.java | 180 ++++++++++++ .../apache/qpid/requestreply/PingPongTestPerf.java | 120 +++----- java/pom.xml | 4 +- 13 files changed, 970 insertions(+), 513 deletions(-) create mode 100644 java/etc/log4j.xml create mode 100644 java/perftests/jar-with-dependencies.xml create mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java create mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (limited to 'java') diff --git a/java/etc/log4j.xml b/java/etc/log4j.xml new file mode 100644 index 0000000000..d314d8fb23 --- /dev/null +++ b/java/etc/log4j.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/java/perftests/jar-with-dependencies.xml b/java/perftests/jar-with-dependencies.xml new file mode 100644 index 0000000000..62978ee864 --- /dev/null +++ b/java/perftests/jar-with-dependencies.xml @@ -0,0 +1,29 @@ + + + all-test-deps + + jar + + false + + + + + true + test + + + + + target/classes + + + + target/test-classes + + + + diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 2f79654e6a..a37e3f0946 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -97,7 +97,7 @@ ./bin/script_name or ./bin/script_name.bat - These scripts can find everything in the 'all test dependencies' jar. + These scripts can find everything in the 'all test dependencies' jar created by the assembly:assembly goal. --> jar-with-dependencies.xml diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java index 3c1a476d51..7c82710a3f 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java @@ -1,9 +1,13 @@ package org.apache.qpid.ping; +import java.text.SimpleDateFormat; + +import javax.jms.Connection; import javax.jms.JMSException; import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.jms.Session; /** @@ -19,19 +23,20 @@ import org.apache.qpid.jms.Session; */ public abstract class AbstractPingClient { + /** Used to format time stamping output. */ + protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + private static final Logger _logger = Logger.getLogger(TestPingClient.class); + private AMQConnection _connection; - /** Used to keep a handle on the JMS session to send replies using. */ - protected Session _session; + public AMQConnection getConnection() + { + return _connection; + } - /** - * Creates an abstract ping client to manage the specified transcation. - * - * @param session The session. - */ - public AbstractPingClient(Session session) + public void setConnection(AMQConnection _connection) { - _session = session; + this._connection = _connection; } /** @@ -39,13 +44,13 @@ public abstract class AbstractPingClient * * @throws javax.jms.JMSException If the commit fails and then the rollback fails. */ - protected void commitTx() throws JMSException + protected void commitTx(Session session) throws JMSException { - if (_session.getTransacted()) + if (session.getTransacted()) { try { - _session.commit(); + session.commit(); _logger.trace("Session Commited."); } catch (JMSException e) @@ -54,7 +59,7 @@ public abstract class AbstractPingClient try { - _session.rollback(); + session.rollback(); _logger.debug("Message rolled back."); } catch (JMSException jmse) diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java index e2c2d5b440..bedd6e3d16 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java @@ -1,5 +1,7 @@ package org.apache.qpid.ping; +import java.text.SimpleDateFormat; + import javax.jms.*; import org.apache.log4j.Logger; @@ -15,9 +17,10 @@ import org.apache.qpid.jms.Session; * *

*
CRC Card
Responsibilities Collaborations - *
Manage session. + *
Manage the connection. *
Provide clean shutdown on exception or shutdown hook. *
Provide useable shutdown hook implementation. + *
Run a ping loop. *
* * @author Rupert Smith @@ -26,51 +29,66 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene { private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class); - /** Holds the current Qpid session to send and receive pings on. */ - protected Session _session; + /** Used to format time stamping output. */ + protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); /** Used to tell the ping loop when to terminate, it only runs while this is true. */ protected boolean _publish = true; + /** Holds the connection handle to the broker. */ + private Connection _connection; + + /** Holds the producer session, need to create test messages. */ + private Session _producerSession; + /** - * Creates an AbstractPingProducer on a session. + * Convenience method for a short pause. + * + * @param sleepTime The time in milliseconds to pause for. */ - public AbstractPingProducer(Session session) + public static void pause(long sleepTime) { - _session = session; + if (sleepTime > 0) + { + try + { + Thread.sleep(sleepTime); + } + catch (InterruptedException ie) + { } + } } + public abstract void pingLoop(); + /** * Generates a test message of the specified size. * - * @param session The Qpid session under which to generate the message. * @param replyQueue The reply-to destination for the message. * @param messageSize The desired size of the message in bytes. - * @param currentTime The timestamp to add to the message as a "timestamp" property. * * @return A freshly generated test message. * * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. */ - public static ObjectMessage getTestMessage(Session session, Queue replyQueue, int messageSize, long currentTime, - boolean persistent) throws JMSException + public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException { ObjectMessage msg; if (messageSize != 0) { - msg = TestMessageFactory.newObjectMessage(session, messageSize); + msg = TestMessageFactory.newObjectMessage(_producerSession, messageSize); } else { - msg = session.createObjectMessage(); + msg = _producerSession.createObjectMessage(); } // Set the messages persistent delivery flag. msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); // Timestamp the message. - msg.setLongProperty("timestamp", currentTime); + msg.setLongProperty("timestamp", System.currentTimeMillis()); // Ensure that the temporary reply queue is set as the reply to destination for the message. if (replyQueue != null) @@ -81,26 +99,6 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene return msg; } - /** - * Convenience method for a short pause. - * - * @param sleepTime The time in milliseconds to pause for. - */ - public static void pause(long sleepTime) - { - if (sleepTime > 0) - { - try - { - Thread.sleep(sleepTime); - } - catch (InterruptedException ie) - { } - } - } - - public abstract void pingLoop(); - /** * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this * flag has been cleared. @@ -151,18 +149,38 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene }); } + public Connection getConnection() + { + return _connection; + } + + public void setConnection(Connection connection) + { + this._connection = connection; + } + + public Session getProducerSession() + { + return _producerSession; + } + + public void setProducerSession(Session session) + { + this._producerSession = session; + } + /** * Convenience method to commit the transaction on the session associated with this pinger. * * @throws javax.jms.JMSException If the commit fails and then the rollback fails. */ - protected void commitTx() throws JMSException + protected void commitTx(Session session) throws JMSException { - if (_session.getTransacted()) + if (session.getTransacted()) { try { - _session.commit(); + session.commit(); _logger.trace("Session Commited."); } catch (JMSException e) @@ -177,7 +195,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene try { - _session.rollback(); + session.rollback(); _logger.trace("Message rolled back."); } catch (JMSException jmse) diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java index 3063e83127..db89b0d38a 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java @@ -43,6 +43,8 @@ import org.apache.qpid.jms.Session; * Responsibilities Collaborations * Provide command line invocation to start the ping consumer on a configurable broker url. * + * + * @todo Add a better command line interpreter to the main method. The command line is not very nice... */ class TestPingClient extends AbstractPingClient implements MessageListener { @@ -51,20 +53,42 @@ class TestPingClient extends AbstractPingClient implements MessageListener /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ private boolean _verbose = false; + /** The producer session. */ + private Session _consumerSession; + /** - * Creates a PingPongClient on the specified session. + * Creates a TestPingClient on the specified session. + * + * @param brokerDetails + * @param username + * @param password + * @param queueName + * @param virtualpath + * @param transacted + * @param selector + * @param verbose * - * @param session The JMS Session for the ping pon client to run on. - * @param consumer The message consumer to receive the messages with. - * @param verbose If set to true will output timing information on every message. + * @throws Exception All underlying exceptions allowed to fall through. This is only test code... */ - public TestPingClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException + public TestPingClient(String brokerDetails, String username, String password, String queueName, String virtualpath, + boolean transacted, String selector, boolean verbose) throws Exception { - // Hang on to the session for the replies. - super(session); + // Create a connection to the broker. + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); - // Set this up to listen for messages on the queue. + setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); + + // Create a transactional or non-transactional session depending on the command line parameter. + _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + // Connect a consumer to the ping queue and register this to be called back by it. + Queue q = new AMQQueue(queueName); + MessageConsumer consumer = _consumerSession.createConsumer(q, 1, false, false, selector); consumer.setMessageListener(this); + + // Hang on to the verbose flag setting. + _verbose = verbose; } /** @@ -72,57 +96,32 @@ class TestPingClient extends AbstractPingClient implements MessageListener * * @param args */ - public static void main(String[] args) + public static void main(String[] args) throws Exception { _logger.info("Starting..."); // Display help on the command line. if (args.length < 4) { - System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]"); + System.out.println( + "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector]"); System.exit(1); } - // Extract all comman line parameters. + // Extract all command line parameters. String brokerDetails = args[0]; String username = args[1]; String password = args[2]; String virtualpath = args[3]; - boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; - String selector = (args.length == 6) ? args[5] : null; - - try - { - InetAddress address = InetAddress.getLocalHost(); - - AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath); - - _logger.info("Connected with URL:" + con1.toURL()); + String queueName = (args.length >= 5) ? args[4] : "ping"; + boolean verbose = (args.length >= 6) ? Boolean.parseBoolean(args[5]) : true; + boolean transacted = (args.length >= 7) ? Boolean.parseBoolean(args[6]) : false; + String selector = (args.length == 8) ? args[7] : null; - // Create a transactional or non-transactional session depending on the command line parameter. - Session session = null; - - if (transacted) - { - session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED); - } - else if (!transacted) - { - session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - Queue q = new AMQQueue("ping"); - - MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector); - new TestPingClient(session, consumer, true); - - con1.start(); - } - catch (Throwable t) - { - System.err.println("Fatal error: " + t); - t.printStackTrace(); - } + // Create the test ping client and set it running. + TestPingClient pingClient = + new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose); + pingClient.getConnection().start(); System.out.println("Waiting..."); } @@ -145,12 +144,12 @@ class TestPingClient extends AbstractPingClient implements MessageListener if (timestamp != null) { long diff = System.currentTimeMillis() - timestamp; - _logger.info("Ping time: " + diff); + System.out.println("Ping time: " + diff); } } // Commit the transaction if running in transactional mode. - commitTx(); + commitTx(_consumerSession); } catch (JMSException e) { diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java index d47650d049..182a6228b1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java @@ -21,6 +21,8 @@ package org.apache.qpid.ping; import java.net.InetAddress; +import java.text.SimpleDateFormat; +import java.util.Date; import javax.jms.*; @@ -57,49 +59,49 @@ class TestPingProducer extends AbstractPingProducer /** Used to define how long to wait between pings. */ private static final long SLEEP_TIME = 250; - /** Used to define how long to wait before assuming that a ping has timed out. */ - private static final long TIMEOUT = 3000; - /** Holds the name of the queue to send pings on. */ private static final String PING_QUEUE_NAME = "ping"; + private static TestPingProducer _pingProducer; /** Holds the message producer to send the pings through. */ private MessageProducer _producer; /** Determines whether this producer sends persistent messages from the run method. */ - private boolean _persistent; + private boolean _persistent = false; /** Holds the message size to send, from the run method. */ - private int _messageSize; + private int _messageSize = DEFAULT_MESSAGE_SIZE; - public TestPingProducer(Session session, MessageProducer producer) throws JMSException - { - super(session); - _producer = producer; - } + /** Used to indicate that the ping loop should print out whenever it pings. */ + private boolean _verbose = false; - public TestPingProducer(Session session, MessageProducer producer, boolean persistent, int messageSize) - throws JMSException + public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName, + boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception { - this(session, producer); + // Create a connection to the broker. + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); + + setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); + + // Create a transactional or non-transactional session, based on the command line arguments. + setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE)); + + // Create a queue to send the pings on. + Queue pingQueue = new AMQQueue(queueName); + _producer = (MessageProducer) getProducerSession().createProducer(pingQueue); _persistent = persistent; _messageSize = messageSize; + + _verbose = verbose; } /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs + * Starts a ping-pong loop running from the command line. The bounce back client {@link TestPingClient} also needs * to be started to bounce the pings back again. * - *

The command line takes from 2 to 4 arguments: - *

- *
brokerDetails The broker connection string. - *
virtualPath The virtual path. - *
transacted A boolean flag, telling this client whether or not to use transactions. - *
size The size of ping messages to use, in bytes. - *
- * * @param args The command line arguments as defined above. */ public static void main(String[] args) throws Exception @@ -108,53 +110,33 @@ class TestPingProducer extends AbstractPingProducer if (args.length < 2) { System.err.println( - "Usage: TestPingPublisher [transacted] [persistent] [message size in bytes]"); + "Usage: TestPingPublisher [verbose] [transacted] [persistent] [message size in bytes]"); System.exit(0); } String brokerDetails = args[0]; String virtualpath = args[1]; - boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false; - boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; - int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE; + boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true; + boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; + boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; + int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE; - // Create a connection to the broker. - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); + // Create a ping producer to generate the pings. + _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted, + persistent, messageSize, verbose); - Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath); - - // Create a transactional or non-transactional session, based on the command line arguments. - Session session; - - if (transacted) - { - session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - // Create a queue to send the pings on. - Queue pingQueue = new AMQQueue(PING_QUEUE_NAME); - MessageProducer producer = (MessageProducer) session.createProducer(pingQueue); - - // Create a ping producer to handle the request/wait/reply cycle. - _pingProducer = new TestPingProducer(session, producer, persistent, messageSize); - - // Start the message consumers running. - _connection.start(); + // Start the connection running. + _pingProducer.getConnection().start(); // Create a shutdown hook to terminate the ping-pong producer. Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook()); - // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too. - _connection.setExceptionListener(_pingProducer); + // Ensure the ping loop execption listener is registered on the connection to terminate it on error. + _pingProducer.getConnection().setExceptionListener(_pingProducer); + + // Start the ping loop running until it is interrupted. Thread pingThread = new Thread(_pingProducer); pingThread.run(); - - // Run until the ping loop is terminated. pingThread.join(); } @@ -174,16 +156,7 @@ class TestPingProducer extends AbstractPingProducer // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of // this method, as the message will not be sent until the transaction is committed. - commitTx(); - } - - /** - * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this - * flag has been cleared. - */ - public void stop() - { - _publish = false; + commitTx(getProducerSession()); } /** @@ -195,12 +168,16 @@ class TestPingProducer extends AbstractPingProducer try { // Generate a sample message and time stamp it. - ObjectMessage msg = getTestMessage(_session, null, _messageSize, System.currentTimeMillis(), _persistent); + ObjectMessage msg = getTestMessage(null, _messageSize, _persistent); msg.setLongProperty("timestamp", System.currentTimeMillis()); // Send the message. ping(msg); + if (_verbose) + { + System.out.println("Pinged at: " + timestampFormatter.format(new Date())); //" + " with id: " + msg.getJMSMessageID()); + } // Introduce a short pause if desired. pause(SLEEP_TIME); } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java new file mode 100644 index 0000000000..d2a376fff0 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -0,0 +1,283 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import java.net.InetAddress; +import java.util.Date; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.Session; +import org.apache.qpid.ping.AbstractPingClient; + +/** + * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return + * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes + * too. + * + *

The message id from the received message is extracted, and placed into the reply as the correlation id. Messages + * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique + * temporary queue or the correlation id to correlate the original message to the reply. + * + *

There is a verbose mode flag which causes information about each ping to be output to the console + * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should + * be disabled for real timing tests as writing to the console will slow things down. + * + *

When the a message is received, a reply to producer is created for it if it is not the same as the previous + * message. All subsequent replies are sent using that producer until a different reply to destination is + * encountered; effectively a last used cache of size 1. Fast because it saves creating the reply producer over and + * over again when the destination does not change. For a larger fixed set of reply to destinations could turn this + * into a cache with more elements. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Bounce back messages to their reply to destination. + *
Provide command line invocation to start the bounce back on a configurable broker url. + *
+ * + * @todo Replace the command line parsing with a neater tool. + * + * @todo Make verbose accept a number of messages, only prints to console every X messages. + */ +public class PingPongBouncer extends AbstractPingClient implements MessageListener +{ + private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); + + /** The default prefetch size for the message consumer. */ + private static final int PREFETCH = 1; + + /** The default no local flag for the message consumer. */ + private static final boolean NO_LOCAL = true; + + /** The default exclusive flag for the message consumer. */ + private static final boolean EXCLUSIVE = false; + + /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ + private boolean _verbose = false; + + /** Determines whether this bounce back client bounces back messages persistently. */ + private boolean _persistent = false; + + /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ + private Destination _lastResponseDest; + + /** The cached, most recently used reply producer. */ + private MessageProducer _cachedReplyProducer; + + /** The consumer session. */ + private Session _consumerSession; + + /** The producer session. */ + private Session _producerSession; + + /** + * Creates a PingPongBouncer on the specified producer and consumer sessions. + * + * @param brokerDetails + * @param username + * @param password + * @param virtualpath + * @param queueName + * @param persistent + * @param transacted + * @param selector + * @param verbose + * @throws JMSException + * + * @throws Exception All underlying exceptions allowed to fall through. This is only test code... + */ + public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, String queueName, + boolean persistent, boolean transacted, String selector, boolean verbose) throws Exception + { + // Create a client id to uniquely identify this client. + InetAddress address = InetAddress.getLocalHost(); + String clientId = address.getHostName() + System.currentTimeMillis(); + + // Connect to the broker. + setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); + _logger.info("Connected with URL:" + getConnection().toURL()); + + // Set up the failover notifier. + getConnection().setConnectionListener(new FailoverNotifier()); + + // Create a session to listen for messages on and one to send replies on, transactional depending on the + // command line option. + Session consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Session producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + // Create the queue to listen for message on. + Queue q = new AMQQueue(queueName); + MessageConsumer consumer = consumerSession.createConsumer(q, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); + + // Hang on to the sessions for the messages and replies. + _consumerSession = consumerSession; + _producerSession = producerSession; + + _verbose = verbose; + _persistent = persistent; + + // Set this up to listen for messages on the queue. + consumer.setMessageListener(this); + } + + /** + * Starts a stand alone ping-pong client running in verbose mode. + * + * @param args + */ + public static void main(String[] args) throws Exception + { + System.out.println("Starting..."); + + // Display help on the command line. + if (args.length < 5) + { + System.err.println("Usage: " + + "[ ] [selector]"); + System.exit(1); + } + + // Extract all command line parameters. + String brokerDetails = args[0]; + String username = args[1]; + String password = args[2]; + String virtualpath = args[3]; + String queueName = args[4]; + boolean persistent = ((args.length >= 6) && (args[5].toUpperCase().charAt(0) == 'P')); + boolean transacted = ((args.length >= 7) && (args[6].toUpperCase().charAt(0) == 'T')); + String selector = (args.length == 8) ? args[5] : null; + + // Instantiate the ping pong client with the command line options and start it running. + PingPongBouncer pingBouncer = + new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted, selector, + true); + pingBouncer.getConnection().start(); + + System.out.println("Waiting..."); + } + + /** + * This is a callback method that is notified of all messages for which this has been registered as a message + * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to + * destination of the message. + * + * @param message The message that triggered this callback. + */ + public void onMessage(Message message) + { + try + { + String messageCorrelationId = message.getJMSCorrelationID(); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " + + messageCorrelationId); + } + + // Get the reply to destination from the message and check it is set. + Destination responseDest = message.getJMSReplyTo(); + + if (responseDest == null) + { + _logger.debug("Producer not created because the response destination is null."); + + return; + } + + // Check if the reply to destination is different to the last message and create a new producer if so. + if (!responseDest.equals(_lastResponseDest)) + { + _lastResponseDest = responseDest; + + _logger.debug("About to create a producer."); + _cachedReplyProducer = _producerSession.createProducer(responseDest); + _cachedReplyProducer.setDisableMessageTimestamp(true); + _cachedReplyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + _logger.debug("After create a producer."); + } + + // Spew out some timing information if verbose mode is on. + if (_verbose) + { + Long timestamp = message.getLongProperty("timestamp"); + + if (timestamp != null) + { + long diff = System.currentTimeMillis() - timestamp; + _logger.info("Time to bounce point: " + diff); + } + } + + // Correlate the reply to the original. + message.setJMSCorrelationID(messageCorrelationId); + + // Send the receieved message as the pong reply. + _cachedReplyProducer.send(message); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " + + messageCorrelationId); + } + + // Commit the transaction if running in transactional mode. + commitTx(_producerSession); + } + catch (JMSException e) + { + _logger.debug("There was a JMSException: " + e.getMessage(), e); + } + } + + /** + * A connection listener that logs out any failover complete events. Could do more interesting things with this + * at some point... + */ + public static class FailoverNotifier implements ConnectionListener + { + public void bytesSent(long count) + { } + + public void bytesReceived(long count) + { } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback."); + } + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java deleted file mode 100644 index bee75bb1eb..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import java.net.InetAddress; - -import javax.jms.*; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.Session; -import org.apache.qpid.ping.AbstractPingClient; - -/** - * PingPongClient is a message listener the bounces back messages to their reply to destination. This is used to return - * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes too. - * - *

The message id from the received message is extracted, and placed into the reply as the correlation id. Messages - * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique - * temporary queue or the correlation id to correlate the original message to the reply. - * - *

There is a verbose mode flag which causes information about each ping to be output to the console - * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should - * be disabled for real timing tests as writing to the console will slow things down. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Bounce back messages to their reply to destination. - *
Provide command line invocation to start the bounce back on a configurable broker url. - *
- * - * @todo Rename this to BounceBackClient or something similar. - */ -public class PingPongClient extends AbstractPingClient implements MessageListener -{ - private static final Logger _logger = Logger.getLogger(PingPongClient.class); - - /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ - private boolean _verbose = false; - - /** - * Creates a PingPongClient on the specified session. - * - * @param session The JMS Session for the ping pon client to run on. - * @param consumer The message consumer to receive the messages with. - * @param verbose If set to true will output timing information on every message. - */ - public PingPongClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException - { - // Hang on to the session for the replies. - super(session); - - // Set this up to listen for messages on the queue. - consumer.setMessageListener(this); - } - - /** - * Starts a stand alone ping-pong client running in verbose mode. - * - * @param args - */ - public static void main(String[] args) - { - _logger.info("Starting..."); - - // Display help on the command line. - if (args.length < 4) - { - System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]"); - System.exit(1); - } - - // Extract all comman line parameters. - String brokerDetails = args[0]; - String username = args[1]; - String password = args[2]; - String virtualpath = args[3]; - boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; - String selector = (args.length == 6) ? args[5] : null; - - try - { - InetAddress address = InetAddress.getLocalHost(); - - AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath); - - _logger.info("Connected with URL:" + con1.toURL()); - - // Create a transactional or non-transactional session depending on the command line parameter. - Session session = null; - - if (transacted) - { - session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED); - } - else if (!transacted) - { - session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - Queue q = new AMQQueue("ping"); - - MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector); - new PingPongClient(session, consumer, true); - - con1.start(); - } - catch (Throwable t) - { - System.err.println("Fatal error: " + t); - t.printStackTrace(); - } - - System.out.println("Waiting..."); - } - - /** - * This is a callback method that is notified of all messages for which this has been registered as a message - * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to - * destination of the message. - * - * @param message The message that triggered this callback. - */ - public void onMessage(javax.jms.Message message) - { - try - { - // Spew out some timing information if verbose mode is on. - if (_verbose) - { - Long timestamp = message.getLongProperty("timestamp"); - - if (timestamp != null) - { - long diff = System.currentTimeMillis() - timestamp; - _logger.info("Ping time: " + diff); - } - } - - // Correlate the reply to the original. - message.setJMSCorrelationID(message.getJMSMessageID()); - - // Send the receieved message as the pong reply. - MessageProducer producer = _session.createProducer(message.getJMSReplyTo()); - producer.send(message); - - // Commit the transaction if running in transactional mode. - commitTx(); - } - catch (JMSException e) - { - _logger.debug("There was a JMSException: " + e.getMessage(), e); - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index c1d42eeed4..6956187b66 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -20,22 +20,28 @@ */ package org.apache.qpid.requestreply; +import java.net.InetAddress; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.*; + import org.apache.log4j.Logger; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; import org.apache.qpid.ping.AbstractPingProducer; -import org.apache.qpid.util.concurrent.BooleanLatch; - -import javax.jms.*; -import java.net.InetAddress; -import java.util.HashMap; -import java.util.Map; /** * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back - * client (see {@link org.apache.qpid.requestreply.PingPongClient} for the bounce back client). It is designed to be run from the command line + * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session, * message producer and message consumer to run the ping-pong cycle on. * @@ -75,8 +81,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, /** Holds the name of the queue to send pings on. */ private static final String PING_QUEUE_NAME = "ping"; + /** The batch size. */ + private static final int BATCH_SIZE = 100; + /** Keeps track of the ping producer instance used in the run loop. */ private static PingPongProducer _pingProducer; + private static final int PREFETCH = 100; + private static final boolean NO_LOCAL = true; + private static final boolean EXCLUSIVE = false; + + /** The number of priming loops to run. */ + private static final int PRIMING_LOOPS = 3; + + /** A source for providing sequential unique correlation ids. */ + private AtomicLong idGenerator = new AtomicLong(0L); /** Holds the message producer to send the pings through. */ private MessageProducer _producer; @@ -91,32 +109,65 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, private int _messageSize; /** Holds a map from message ids to latches on which threads wait for replies. */ - private Map trafficLights = new HashMap(); + private Map trafficLights = new HashMap(); + + /** Used to indicate that the ping loop should print out whenever it pings. */ + private boolean _verbose = false; - /** Holds a map from message ids to correlated replies. */ - private Map replies = new HashMap(); + private Session _consumerSession; - public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer) - throws JMSException + /** + * Creates a ping pong producer with the specified connection details and type. + * + * @param brokerDetails + * @param username + * @param password + * @param virtualpath + * @param transacted + * @param persistent + * @param messageSize + * @param verbose + * + * @throws Exception All allowed to fall through. This is only test code... + */ + public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName, + String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose) + throws Exception { - super(session); - _producer = producer; - _replyQueue = replyQueue; + // Create a connection to the broker. + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); + + setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath)); + + // Create transactional or non-transactional sessions, based on the command line arguments. + setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE)); + _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + // Create a queue and producer to send the pings on. + Queue pingQueue = new AMQQueue(queueName); + _producer = (MessageProducer) getProducerSession().createProducer(pingQueue); + _producer.setDisableMessageTimestamp(true); + _producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // Create a temporary queue to get the pongs on. + _replyQueue = _consumerSession.createTemporaryQueue(); + + // Create a message consumer to get the replies with and register this to be called back by it. + MessageConsumer consumer = _consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); consumer.setMessageListener(this); - } - public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer, - boolean persistent, int messageSize) throws JMSException - { - this(session, replyQueue, producer, consumer); + // Run a few priming pings to remove warm up time from test results. + prime(PRIMING_LOOPS); _persistent = persistent; _messageSize = messageSize; + + _verbose = verbose; } /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs + * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs * to be started to bounce the pings back again. * *

The command line takes from 2 to 4 arguments: @@ -141,59 +192,59 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, String brokerDetails = args[0]; String virtualpath = args[1]; - boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false; - boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; - int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE; - - // Create a connection to the broker. - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - - Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath); - - // Create a transactional or non-transactional session, based on the command line arguments. - Session session; - - if (transacted) - { - session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - // Create a queue to send the pings on. - Queue pingQueue = new AMQQueue(PING_QUEUE_NAME); - MessageProducer producer = (MessageProducer) session.createProducer(pingQueue); - - // Create a temporary queue to reply with the pongs on. - Queue replyQueue = session.createTemporaryQueue(); - - // Create a message consumer to get the replies with. - MessageConsumer consumer = session.createConsumer(replyQueue); + boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true; + boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; + boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; + int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE; // Create a ping producer to handle the request/wait/reply cycle. - _pingProducer = new PingPongProducer(session, replyQueue, producer, consumer, persistent, messageSize); - - // Start the message consumers running. - _connection.start(); + _pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted, + persistent, messageSize, verbose); + _pingProducer.getConnection().start(); // Create a shutdown hook to terminate the ping-pong producer. Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook()); - // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too. - _connection.setExceptionListener(_pingProducer); + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + _pingProducer.getConnection().setExceptionListener(_pingProducer); + + // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. Thread pingThread = new Thread(_pingProducer); pingThread.run(); - - // Run until the ping loop is terminated. pingThread.join(); } + /** + * Primes the test loop by sending a few messages, then introducing a short wait. This allows the bounce back client + * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling + * this a few times, in order to prime the JVMs JIT compilation. + * + * @param x The number of priming loops to run. + * + * @throws JMSException All underlying exceptions are allowed to fall through. + */ + public void prime(int x) throws JMSException + { + for (int i = 0; i < x; i++) + { + // Create and send a small message. + Message first = getTestMessage(_replyQueue, 0, false); + _producer.send(first); + commitTx(getProducerSession()); + + try + { + Thread.sleep(100); + } + catch (InterruptedException ignore) + { } + } + } + /** * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a - * correlating reply may be waiting on. + * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected + * in the replies map. * * @param message The received message. */ @@ -201,21 +252,37 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, { try { - // Store the reply. + // Store the reply, if it has a correlation id that is expected. String correlationID = message.getJMSCorrelationID(); - replies.put(correlationID, message); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID); + } // Turn the traffic light to green. - BooleanLatch trafficLight = trafficLights.get(correlationID); + CountDownLatch trafficLight = trafficLights.get(correlationID); if (trafficLight != null) { - trafficLight.signal(); + _logger.debug("Reply was expected, decrementing the latch for the id."); + trafficLight.countDown(); } else { _logger.debug("There was no thread waiting for reply: " + correlationID); } + + if (_verbose) + { + Long timestamp = message.getLongProperty("timestamp"); + + if (timestamp != null) + { + long diff = System.currentTimeMillis() - timestamp; + _logger.info("Time for round trip: " + diff); + } + } } catch (JMSException e) { @@ -224,50 +291,90 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, } /** - * Sends the specified ping message and then waits for a correlating reply. If the wait times out before a reply - * arrives, then a null reply is returned from this method. + * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out + * before a reply arrives, then a null reply is returned from this method. * - * @param message The message to send. - * @param timeout The timeout in milliseconds. + * @param message The message to send. + * @param numPings The number of ping messages to send. + * @param timeout The timeout in milliseconds. * - * @return The reply, or null if no reply arrives before the timeout. + * @return The number of replies received. This may be less than the number sent if the timeout terminated the + * wait for all prematurely. * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ - public Message pingAndWaitForReply(Message message, long timeout) throws JMSException, InterruptedException + public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException { - _producer.send(message); + // Put a unique correlation id on the message before sending it. + String messageCorrelationId = Long.toString(idGenerator.incrementAndGet()); + message.setJMSCorrelationID(messageCorrelationId); - // Keep the messageId to correlate with the reply. - String messageId = message.getJMSMessageID(); + for (int i = 0; i < numPings; i++) + { + // Re-timestamp the message. + message.setLongProperty("timestamp", System.currentTimeMillis()); + + _producer.send(message); + } // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of // this method, as the message will not be sent until the transaction is committed. - commitTx(); + commitTx(getProducerSession()); + + // Keep the messageId to correlate with the reply. + //String messageId = message.getJMSMessageID(); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); + } // Block the current thread until a reply to the message is received, or it times out. - BooleanLatch trafficLight = new BooleanLatch(); - trafficLights.put(messageId, trafficLight); + CountDownLatch trafficLight = new CountDownLatch(numPings); + trafficLights.put(messageCorrelationId, trafficLight); // Note that this call expects a timeout in nanoseconds, millisecond timeout is multiplied up. - trafficLight.await(timeout * 1000); + trafficLight.await(timeout, TimeUnit.MILLISECONDS); - // Check the replies to see if one was generated, if not then the reply timed out. - Message result = replies.get(messageId); + // Work out how many replies were receieved. + int numReplies = numPings - (int) trafficLight.getCount(); - return result; + if ((numReplies < numPings) && _verbose) + { + _logger.info("Timed out before all replies received on id, " + messageCorrelationId); + } + else if (_verbose) + { + _logger.info("Got all replies on id, " + messageCorrelationId); + } + + return numReplies; } /** - * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the - * connection, this clears the publish flag which in turn will halt the ping loop. + * Sends the specified ping message but does not wait for a correlating reply. + * + * @param message The message to send. + * @param numPings The number of pings to send. * - * @param e The exception that triggered this callback method. + * @return The reply, or null if no reply arrives before the timeout. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ - public void onException(JMSException e) + public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException { - _publish = false; - _logger.debug("There was a JMSException: " + e.getMessage(), e); + for (int i = 0; i < numPings; i++) + { + _producer.send(message); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Pinged at."); + } + } + + // Commit the transaction if running in transactional mode, to force the send now. + commitTx(getProducerSession()); } /** @@ -279,11 +386,11 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, try { // Generate a sample message and time stamp it. - ObjectMessage msg = getTestMessage(_session, _replyQueue, _messageSize, System.currentTimeMillis(), _persistent); + ObjectMessage msg = getTestMessage(_replyQueue, _messageSize, _persistent); msg.setLongProperty("timestamp", System.currentTimeMillis()); // Send the message and wait for a reply. - pingAndWaitForReply(msg, TIMEOUT); + pingAndWaitForReply(msg, BATCH_SIZE, TIMEOUT); // Introduce a short pause if desired. pause(SLEEP_TIME); @@ -299,4 +406,37 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, _logger.debug("There was an interruption: " + e.getMessage(), e); } } + + public Queue getReplyQueue() + { + return _replyQueue; + } + + /** + * A connection listener that logs out any failover complete events. Could do more interesting things with this + * at some point... + */ + public static class FailoverNotifier implements ConnectionListener + { + public void bytesSent(long count) + { } + + public void bytesReceived(long count) + { } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback."); + } + } } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java new file mode 100644 index 0000000000..3576a088d7 --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -0,0 +1,180 @@ +package org.apache.qpid.ping; + +import java.net.InetAddress; +import java.util.Properties; + +import javax.jms.*; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.jms.Connection; +import org.apache.qpid.jms.MessageProducer; +import org.apache.qpid.jms.Session; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
+ * + * @author Rupert Smith + */ +public class PingTestPerf extends TestCase //implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingTestPerf.class); + + /** Holds the name of the property to get the test message size from. */ + private static final String MESSAGE_SIZE_PROPNAME = "messageSize"; + + /** Holds the name of the property to get the ping queue name from. */ + private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue"; + + /** Holds the name of the property to get the test delivery mode from. */ + private static final String PERSISTENT_MODE_PROPNAME = "persistent"; + + /** Holds the name of the property to get the test transactional mode from. */ + private static final String TRANSACTED_PROPNAME = "transacted"; + + /** Holds the name of the property to get the test broker url from. */ + private static final String BROKER_PROPNAME = "broker"; + + /** Holds the name of the property to get the test broker virtual path. */ + private static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; + + /** Holds the size of message body to attach to the ping messages. */ + private static final int MESSAGE_SIZE_DEFAULT = 0; + + /** Holds the name of the queue to which pings are sent. */ + private static final String PING_QUEUE_NAME_DEFAULT = "ping"; + + /** Holds the message delivery mode to use for the test. */ + private static final boolean PERSISTENT_MODE_DEFAULT = false; + + /** Holds the transactional mode to use for the test. */ + private static final boolean TRANSACTED_DEFAULT = false; + + /** Holds the default broker url for the test. */ + private static final String BROKER_DEFAULT = "tcp://localhost:5672"; + + /** Holds the default virtual path for the test. */ + private static final String VIRTUAL_PATH_DEFAULT = "/test"; + + /** Sets a default ping timeout. */ + private static final long TIMEOUT = 3000; + + // Sets up the test parameters with defaults. + static + { + setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); + setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); + setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); + setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT)); + setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); + } + + /** Holds the test ping-pong producer. */ + private TestPingProducer _testPingProducer; + + /** Holds the test ping client. */ + private TestPingClient _testPingClient; + + // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in + // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner + // of the test parameters to log with the results. + private Properties testParameters = System.getProperties(); + //private Properties testParameters = new ContextualProperties(System.getProperties()); + + public PingTestPerf(String name) + { + super(name); + } + + private static void setSystemPropertyIfNull(String propName, String propValue) + { + if (System.getProperty(propName) == null) + { + System.setProperty(propName, propValue); + } + } + + public void testPingOk() throws Exception + { + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + ObjectMessage msg = + _testPingProducer.getTestMessage(null, Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)), + Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME))); + + // Use the test timing controller to reset the test timer now and obtain the current time. + // This can be used to remove the message creation time from the test. + //TestTimingController timingUtils = getTimingController(); + //long startTime = timingUtils.restart(); + + // Send the message. + _testPingProducer.ping(msg); + + // Fail the test if the timeout was exceeded. + /*if (reply == null) + { + Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID()); + }*/ + } + + protected void setUp() throws Exception + { + // Log4j will propagate the test name as a thread local in all log output. + NDC.push(getName()); + + // Ensure that the connection, session and ping queue are established, if they have not already been. + if (_testPingProducer == null) + { + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); + String username = "guest"; + String password = "guest"; + String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); + String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME); + boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); + boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); + String selector = null; + boolean verbose = false; + int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); + + // Establish a bounce back client on the ping queue to bounce back the pings. + _testPingClient = new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, + selector, verbose); + + // Establish a ping-pong client on the ping queue to send the pings with. + _testPingProducer = new TestPingProducer(brokerDetails, username, password, virtualpath, queueName, transacted, + persistent, messageSize, verbose); + + // Start the connections for client and producer running. + _testPingClient.getConnection().start(); + _testPingProducer.getConnection().start(); + } + } + + protected void tearDown() throws Exception + { + try + { + if ((_testPingClient != null) && (_testPingClient.getConnection() != null)) + { + _testPingClient.getConnection().close(); + } + + if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null)) + { + _testPingProducer.getConnection().close(); + } + } + finally + { + NDC.pop(); + } + } +} diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index 74f1a899cf..336a727cdb 100644 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -16,6 +16,7 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; /** * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run @@ -42,7 +43,7 @@ import org.apache.qpid.jms.Session; * * @author Rupert Smith */ -public class PingPongTestPerf extends TestCase implements ExceptionListener //, TimingControllerAware +public class PingPongTestPerf extends TestCase //implements TimingControllerAware { private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); @@ -99,36 +100,15 @@ public class PingPongTestPerf extends TestCase implements ExceptionListener //, /** Holds the test ping-pong producer. */ private PingPongProducer _testPingProducer; + /** Holds the test ping client. */ + private PingPongBouncer _testPingBouncer; + // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner - // of the test parameters to log with the results. + // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. private Properties testParameters = System.getProperties(); //private Properties testParameters = new ContextualProperties(System.getProperties()); - /** Holds the connection to the broker. */ - private Connection _connection = null; - - /** Holds the current session to the broker. */ - private Session _session; - - /** Holds the destination to send the ping messages to. */ - private Queue _pingQueue; - - /** Holds the destination to send replies to. */ - private Queue _replyQueue; - - /** Holds a message producer, set up on the ping destination, to send messages through. */ - private MessageProducer _producer; - - /** Holds a message consumer, set up on the ping destination, to receive pings through. */ - private MessageConsumer _pingConsumer; - - /** Holds a message consumer, set up on the pong destination, to receive replies through. */ - private MessageConsumer _pongConsumer; - - /** Holds a failure flag, which gets set if the connection to the broker goes down. */ - private boolean _failure; - public PingPongTestPerf(String name) { super(name); @@ -146,9 +126,8 @@ public class PingPongTestPerf extends TestCase implements ExceptionListener //, { // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - PingPongProducer.getTestMessage(_session, _replyQueue, + _testPingProducer.getTestMessage(_testPingProducer.getReplyQueue(), Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)), - System.currentTimeMillis(), Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME))); // Use the test timing controller to reset the test timer now and obtain the current time. @@ -157,77 +136,46 @@ public class PingPongTestPerf extends TestCase implements ExceptionListener //, //long startTime = timingUtils.restart(); // Send the message and wait for a reply. - Message reply = _testPingProducer.pingAndWaitForReply(msg, TIMEOUT); + int numReplies = _testPingProducer.pingAndWaitForReply(msg, 1, TIMEOUT); // Fail the test if the timeout was exceeded. - if (reply == null) + if (numReplies != 1) { Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID()); } } - /** - * This is a callback method that is registered to receive any JMSExceptions that occurr on the connection to - * the broker. It sets a failure flag to indicate that there is an error condition. - * - * @param e The JMSException that triggered this callback method. - * - * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) - */ - public void onException(JMSException e) - { - // Set the failure flag. - _failure = true; - - _logger.debug("There was a JMSException: " + e.getMessage(), e); - } - protected void setUp() throws Exception { // Log4j will propagate the test name as a thread local in all log output. NDC.push(getName()); // Ensure that the connection, session and ping queue are established, if they have not already been. - if (_connection == null) + if (_testPingProducer == null) { - // Create a client id that identifies the client machine. - String clientID = InetAddress.getLocalHost().getHostName() + System.currentTimeMillis(); - - // Connect to the broker. - _connection = new AMQConnection(testParameters.getProperty(BROKER_PROPNAME), "guest", "guest", clientID, - testParameters.getProperty(VIRTUAL_PATH_PROPNAME)); - _connection.setExceptionListener(this); - - // Create a transactional or non-transactional session, based on the test properties, if a session has not - // already been created. - if (Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME))) - { - _session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - // Create a queue to send the pings on. - _pingQueue = new AMQQueue(testParameters.getProperty(PING_QUEUE_NAME_PROPNAME)); - _producer = (MessageProducer) _session.createProducer(_pingQueue); - - // Create a temporary queue to reply with the pongs on. - _replyQueue = _session.createTemporaryQueue(); - - // Create the ping and pong consumers on their respective destinations. - _pingConsumer = _session.createConsumer(_pingQueue); - _pongConsumer = _session.createConsumer(_replyQueue); + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); + String username = "guest"; + String password = "guest"; + String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); + String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME); + boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); + boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); + String selector = null; + boolean verbose = false; + int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); // Establish a bounce back client on the ping queue to bounce back the pings. - new org.apache.qpid.requestreply.PingPongClient(_session, _pingConsumer, false); + _testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, + transacted, selector, verbose); - // Establish a ping-pong client on the ping queue to send pings and wait for replies. - _testPingProducer = new org.apache.qpid.requestreply.PingPongProducer(_session, _replyQueue, _producer, - _pongConsumer); + // Establish a ping-pong client on the ping queue to send the pings with. + _testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath, queueName, selector, + transacted, persistent, messageSize, verbose); - _connection.start(); + // Start the connections for client and producer running. + _testPingBouncer.getConnection().start(); + _testPingProducer.getConnection().start(); } } @@ -235,7 +183,15 @@ public class PingPongTestPerf extends TestCase implements ExceptionListener //, { try { - _connection.close(); + if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null)) + { + _testPingBouncer.getConnection().close(); + } + + if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null)) + { + _testPingProducer.getConnection().close(); + } } finally { diff --git a/java/pom.xml b/java/pom.xml index fdaba94ce6..380b0b8cd1 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -99,7 +99,7 @@ 1.2-SNAPSHOT - 2.1 + 2.2-SNAPSHOT 2.0 2.0.1 1.0 @@ -284,6 +284,8 @@ amqj.logging.level ${amqj.logging.level} + -- cgit v1.2.1