diff options
Diffstat (limited to 'qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java')
-rw-r--r-- | qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java | 452 |
1 files changed, 452 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java new file mode 100644 index 0000000000..a15897c82b --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -0,0 +1,452 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; +import org.apache.qpid.util.CommandLineParser; + +import org.apache.qpid.junit.extensions.util.MathUtils; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and + * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop + * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the + * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with + * failure conditions when using durable messaging. + * + * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to + * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases + * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings + * with. + * + * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console. + * + * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and + * additionally accepts the following parameters: + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> numMessages <td> 100 <td> The total number of messages to send. + * <tr><td> numMessagesToAction <td> -1 <td> The number of messages to send before taking a custom 'action'. + * <tr><td> duration <td> 30S <td> The length of time to ping for. (Format dDhHmMsS, for d days, h hours, + * m minutes and s seconds). + * </table> + * + * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up + * when no parameters are specified. + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped. + * <tr><td> transacted <td> true <td> Only makes sense to test with transactions. + * <tr><td> persistent <td> true <td> Only makes sense to test persistent. + * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages. + * <tr><td> commitBatchSize <td> 10 + * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds. + * </table> + * + * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits + * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will + * wait for the second signal before receiving its pings. + * + * <p/>This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages + * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method, + * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide + * custom behaviour with alternative implementations of this method (for example taking a backup). + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Send and receive pings. + * <tr><td> Accept user input to signal stop sending. + * <tr><td> Accept user input to signal start receiving. + * <tr><td> Provide feedback on pings sent versus pings received. + * <tr><td> Provide extension point for arbitrary action on a particular message count. + * </table> + */ +public class PingDurableClient extends PingPongProducer implements ExceptionListener +{ + private static final Logger log = Logger.getLogger(PingDurableClient.class); + + public static final String NUM_MESSAGES_PROPNAME = "numMessages"; + public static final String NUM_MESSAGES_DEFAULT = "100"; + public static final String DURATION_PROPNAME = "duration"; + public static final String DURATION_DEFAULT = "30S"; + public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction"; + public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1"; + + /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */ + private static final long TIME_OUT = 3000; + + static + { + defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT); + defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT); + defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false"); + defaults.setProperty(TRANSACTED_PROPNAME, "true"); + defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); + defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); + defaults.setProperty(RATE_PROPNAME, "20"); + defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); + defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); + } + + /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ + private int numMessages; + + /** Holds the number of messages to send before taking triggering the action. */ + private int numMessagesToAction; + + /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */ + private long duration; + + /** Used to indciate that this application should terminate. Set by the shutdown hook. */ + private boolean terminate = false; + + /** + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingDurableClient(Properties overrides) throws Exception + { + super(overrides); + log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called"); + + // Extract the additional configuration parameters. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME); + String durationSpec = properties.getProperty(DURATION_PROPNAME); + numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME); + + if (durationSpec != null) + { + duration = MathUtils.parseDuration(durationSpec) * 1000000; + } + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + PingDurableClient pingProducer = new PingDurableClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.closeConnection(); + pingProducer.waitForUser("Press return to begin receiving the pings."); + pingProducer.receive(sent); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Performs the main test procedure implemented by this ping client. See the class level comment for details. + */ + protected int send() throws Exception + { + log.debug("public void sendWaitReceive(): called"); + + log.debug("duration = " + duration); + log.debug("numMessages = " + numMessages); + + if (duration > 0) + { + System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds."); + } + + if (_rate > 0) + { + System.out.println("Sending at " + _rate + " messages per second."); + } + + if (numMessages > 0) + { + System.out.println("Sending up to " + numMessages + " messages."); + } + + // Establish the connection and the message producer. + establishConnection(true, false); + _connection.start(); + + Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + + // Send pings until a terminating condition is received. + boolean endCondition = false; + int messagesSent = 0; + int messagesCommitted = 0; + int messagesNotCommitted = 0; + long start = System.nanoTime(); + + // Clear console in. + clearConsole(); + + while (!endCondition) + { + boolean committed = false; + + try + { + committed = sendMessage(messagesSent, message) && _transacted; + + messagesSent++; + messagesNotCommitted++; + + // Keep count of the number of messsages currently committed and pending commit. + if (committed) + { + log.debug("Adding " + messagesNotCommitted + " messages to the committed count."); + messagesCommitted += messagesNotCommitted; + messagesNotCommitted = 0; + + System.out.println("Commited: " + messagesCommitted); + } + } + catch (JMSException e) + { + log.debug("Got JMSException whilst sending."); + _publish = false; + } + + // Perform the arbitrary action if the number of messages sent has reached the right number. + if (messagesSent == numMessagesToAction) + { + System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = " + + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted); + takeAction(); + } + + // Determine if the end condition has been met, based on the number of messages, time passed, errors on + // the connection or user input. + long now = System.nanoTime(); + + if ((duration != 0) && ((now - start) > duration)) + { + System.out.println("Send halted because duration expired."); + endCondition = true; + } + else if ((numMessages != 0) && (messagesSent >= numMessages)) + { + System.out.println("Send halted because # messages completed."); + endCondition = true; + } + else if (System.in.available() > 0) + { + System.out.println("Send halted by user input."); + endCondition = true; + + clearConsole(); + } + else if (!_publish) + { + System.out.println("Send halted by error on the connection."); + endCondition = true; + } + } + + log.debug("messagesSent = " + messagesSent); + log.debug("messagesCommitted = " + messagesCommitted); + log.debug("messagesNotCommitted = " + messagesNotCommitted); + + System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted + + ", Messages not Committed = " + messagesNotCommitted); + + return messagesSent; + } + + protected void closeConnection() + { + // Clean up the connection. + try + { + close(); + } + catch (JMSException e) + { + log.debug("There was an error whilst closing the connection: " + e, e); + System.out.println("There was an error whilst closing the connection."); + + // Ignore as did best could manage to clean up. + } + } + + protected void receive(int messagesSent) throws Exception + { + // Re-establish the connection and the message consumer. + _queueJVMSequenceID = new AtomicInteger(); + _queueSharedID = new AtomicInteger(); + + establishConnection(false, true); + _consumer[0].setMessageListener(null); + _consumerConnection[0].start(); + + // Try to receive all of the pings that were successfully sent. + int messagesReceived = 0; + boolean endCondition = false; + + while (!endCondition) + { + // Message received = _consumer.receiveNoWait(); + Message received = _consumer[0].receive(TIME_OUT); + log.debug("received = " + received); + + if (received != null) + { + messagesReceived++; + } + + // Determine if the end condition has been met, based on the number of messages and time passed since last + // receiving a message. + if (received == null) + { + System.out.println("Timed out."); + endCondition = true; + } + else if (messagesReceived >= messagesSent) + { + System.out.println("Got all messages."); + endCondition = true; + } + } + + // Ensure messages received are committed. + if (_consTransacted) + { + try + { + _consumerSession[0].commit(); + System.out.println("Committed for all messages received."); + } + catch (JMSException e) + { + log.debug("Error during commit: " + e, e); + System.out.println("Error during commit."); + try + { + _consumerSession[0].rollback(); + System.out.println("Rolled back on all messages received."); + } + catch (JMSException e2) + { + log.debug("Error during rollback: " + e, e); + System.out.println("Error on roll back of all messages received."); + } + + } + } + + log.debug("messagesReceived = " + messagesReceived); + + System.out.println("Messages received: " + messagesReceived); + + // Clean up the connection. + close(); + } + + /** + * Clears any pending input from the console. + */ + private void clearConsole() + { + try + { + BufferedReader bis = new BufferedReader(new InputStreamReader(System.in)); + + // System.in.skip(System.in.available()); + while (bis.ready()) + { + bis.readLine(); + } + } + catch (IOException e) + { } + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List<Destination> getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with + * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the + * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving + * message should stop, not that the application should termiante. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + terminate = true; + } + }); + } + + /** + * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default + * implementation does nothing. + */ + public void takeAction() + { } +} |