diff options
Diffstat (limited to 'M4-RCs/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java')
-rw-r--r-- | M4-RCs/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java | 453 |
1 files changed, 0 insertions, 453 deletions
diff --git a/M4-RCs/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/M4-RCs/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java deleted file mode 100644 index 8e010ccf07..0000000000 --- a/M4-RCs/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ /dev/null @@ -1,453 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import java.io.IOException; -import java.net.InetAddress; -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.jms.*; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.Session; -import org.apache.qpid.topic.Config; -import org.apache.qpid.exchange.ExchangeDefaults; - -/** - * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return - * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes - * too. - * - * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages - * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique - * temporary queue or the correlation id to correlate the original message to the reply. - * - * <p/>There is a verbose mode flag which causes information about each ping to be output to the console - * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should - * be disabled for real timing tests as writing to the console will slow things down. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Bounce back messages to their reply to destination. - * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url. - * </table> - * - * @todo Replace the command line parsing with a neater tool. - * - * @todo Make verbose accept a number of messages, only prints to console every X messages. - */ -public class PingPongBouncer implements MessageListener -{ - private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); - - /** The default prefetch size for the message consumer. */ - private static final int PREFETCH = 1; - - /** The default no local flag for the message consumer. */ - private static final boolean NO_LOCAL = true; - - private static final String DEFAULT_DESTINATION_NAME = "ping"; - - /** The default exclusive flag for the message consumer. */ - private static final boolean EXCLUSIVE = false; - - /** A convenient formatter to use when time stamping output. */ - protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ - private boolean _verbose = false; - - /** Determines whether this bounce back client bounces back messages persistently. */ - private boolean _persistent = false; - - private Destination _consumerDestination; - - /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ - private Destination _lastResponseDest; - - /** The producer for sending replies with. */ - private MessageProducer _replyProducer; - - /** The consumer controlSession. */ - private Session _consumerSession; - - /** The producer controlSession. */ - private Session _producerSession; - - /** Holds the connection to the broker. */ - private AMQConnection _connection; - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - private boolean _isPubSub = false; - - /** - * This flag is used to indicate that the user should be prompted to kill a broker, in order to test - * failover, immediately before committing a transaction. - */ - protected boolean _failBeforeCommit = false; - - /** - * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test - * failover, immediate after committing a transaction. - */ - protected boolean _failAfterCommit = false; - - /** - * Creates a PingPongBouncer on the specified producer and consumer sessions. - * - * @param brokerDetails The addresses of the brokers to connect to. - * @param username The broker username. - * @param password The broker password. - * @param virtualpath The virtual host name within the broker. - * @param destinationName The name of the queue to receive pings on - * (or root of the queue name where many queues are generated). - * @param persistent A flag to indicate that persistent message should be used. - * @param transacted A flag to indicate that pings should be sent within transactions. - * @param selector A message selector to filter received pings with. - * @param verbose A flag to indicate that message timings should be sent to the console. - * - * @throws Exception All underlying exceptions allowed to fall through. This is only test code... - */ - public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, - String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, - boolean pubsub) throws Exception - { - // Create a client id to uniquely identify this client. - InetAddress address = InetAddress.getLocalHost(); - String clientId = address.getHostName() + System.currentTimeMillis(); - _verbose = verbose; - _persistent = persistent; - setPubSub(pubsub); - // Connect to the broker. - setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); - _logger.info("Connected with URL:" + getConnection().toURL()); - - // Set up the failover notifier. - getConnection().setConnectionListener(new FailoverNotifier()); - - // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the - // command line option. - _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - - // Create the queue to listen for message on. - createConsumerDestination(destinationName); - MessageConsumer consumer = - _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); - - // Create a producer for the replies, without a default destination. - _replyProducer = _producerSession.createProducer(null); - _replyProducer.setDisableMessageTimestamp(true); - _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // Set this up to listen for messages on the queue. - consumer.setMessageListener(this); - } - - /** - * Starts a stand alone ping-pong client running in verbose mode. - * - * @param args - */ - public static void main(String[] args) throws Exception - { - System.out.println("Starting..."); - - // Display help on the command line. - if (args.length == 0) - { - _logger.info("Running test with default values..."); - //usage(); - //System.exit(0); - } - - // Extract all command line parameters. - Config config = new Config(); - config.setOptions(args); - String brokerDetails = config.getHost() + ":" + config.getPort(); - String virtualpath = "test"; - String destinationName = config.getDestination(); - if (destinationName == null) - { - destinationName = DEFAULT_DESTINATION_NAME; - } - - String selector = config.getSelector(); - boolean transacted = config.isTransacted(); - boolean persistent = config.usePersistentMessages(); - boolean pubsub = config.isPubSub(); - boolean verbose = true; - - //String selector = null; - - // Instantiate the ping pong client with the command line options and start it running. - PingPongBouncer pingBouncer = - new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, - selector, verbose, pubsub); - pingBouncer.getConnection().start(); - - System.out.println("Waiting..."); - } - - private static void usage() - { - System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" - + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" - + "-persistent : (true/false). Default is false\n" - + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); - } - - /** - * This is a callback method that is notified of all messages for which this has been registered as a message - * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to - * destination of the message. - * - * @param message The message that triggered this callback. - */ - public void onMessage(Message message) - { - try - { - String messageCorrelationId = message.getJMSCorrelationID(); - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " - + messageCorrelationId); - } - - // Get the reply to destination from the message and check it is set. - Destination responseDest = message.getJMSReplyTo(); - - if (responseDest == null) - { - _logger.debug("Cannot send reply because reply-to destination is null."); - - return; - } - - // Spew out some timing information if verbose mode is on. - if (_verbose) - { - Long timestamp = message.getLongProperty("timestamp"); - - if (timestamp != null) - { - long diff = System.currentTimeMillis() - timestamp; - _logger.info("Time to bounce point: " + diff); - } - } - - // Correlate the reply to the original. - message.setJMSCorrelationID(messageCorrelationId); - - // Send the receieved message as the pong reply. - _replyProducer.send(responseDest, message); - - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " - + messageCorrelationId); - } - - // Commit the transaction if running in transactional mode. - commitTx(_producerSession); - } - catch (JMSException e) - { - _logger.debug("There was a JMSException: " + e.getMessage(), e); - } - } - - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public AMQConnection getConnection() - { - return _connection; - } - - /** - * Sets the connection that this ping client is using. - * - * @param connection The ping connection. - */ - public void setConnection(AMQConnection connection) - { - this._connection = connection; - } - - /** - * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. - * - * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue. - */ - public void setPubSub(boolean pubsub) - { - _isPubSub = pubsub; - } - - /** - * Checks whether this client is a p2p or pub/sub ping client. - * - * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue. - */ - public boolean isPubSub() - { - return _isPubSub; - } - - /** - * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not - * a transactional controlSession, this method does nothing. - * - * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the - * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker - * after the commit is applied. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - */ - protected void commitTx(Session session) throws JMSException - { - if (session.getTransacted()) - { - try - { - if (_failBeforeCommit) - { - _logger.debug("Failing Before Commit"); - doFailover(); - } - - session.commit(); - - if (_failAfterCommit) - { - _logger.debug("Failing After Commit"); - doFailover(); - } - - _logger.debug("Session Commited."); - } - catch (JMSException e) - { - _logger.trace("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - _logger.debug("Message rolled back."); - } - catch (JMSException jmse) - { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - } - - /** - * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - * - * @param broker The name of the broker to terminate. - */ - protected void doFailover(String broker) - { - System.out.println("Kill Broker " + broker + " now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - } - - /** - * Prompts the user to terminate the broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - */ - protected void doFailover() - { - System.out.println("Kill Broker now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - - } - - private void createConsumerDestination(String name) - { - if (isPubSub()) - { - _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name); - } - else - { - _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name); - } - } - - /** - * A connection listener that logs out any failover complete events. Could do more interesting things with this - * at some point... - */ - public static class FailoverNotifier implements ConnectionListener - { - public void bytesSent(long count) - { } - - public void bytesReceived(long count) - { } - - public boolean preFailover(boolean redirect) - { - return true; - } - - public boolean preResubscribe() - { - return true; - } - - public void failoverComplete() - { - _logger.info("App got failover complete callback."); - } - } -} |