summaryrefslogtreecommitdiff
path: root/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java452
1 files changed, 0 insertions, 452 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
deleted file mode 100644
index a15897c82b..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.requestreply.PingPongProducer;
-import org.apache.qpid.util.CommandLineParser;
-
-import org.apache.qpid.junit.extensions.util.MathUtils;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-
-import javax.jms.*;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
- * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
- * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the
- * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with
- * failure conditions when using durable messaging.
- *
- * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to
- * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases
- * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings
- * with.
- *
- * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console.
- *
- * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and
- * additionally accepts the following parameters:
- *
- * <p/><table><caption>Parameters</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> numMessages <td> 100 <td> The total number of messages to send.
- * <tr><td> numMessagesToAction <td> -1 <td> The number of messages to send before taking a custom 'action'.
- * <tr><td> duration <td> 30S <td> The length of time to ping for. (Format dDhHmMsS, for d days, h hours,
- * m minutes and s seconds).
- * </table>
- *
- * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up
- * when no parameters are specified.
- *
- * <p/><table><caption>Parameters</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
- * <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
- * <tr><td> persistent <td> true <td> Only makes sense to test persistent.
- * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages.
- * <tr><td> commitBatchSize <td> 10
- * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
- * </table>
- *
- * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits
- * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will
- * wait for the second signal before receiving its pings.
- *
- * <p/>This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages
- * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method,
- * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide
- * custom behaviour with alternative implementations of this method (for example taking a backup).
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Send and receive pings.
- * <tr><td> Accept user input to signal stop sending.
- * <tr><td> Accept user input to signal start receiving.
- * <tr><td> Provide feedback on pings sent versus pings received.
- * <tr><td> Provide extension point for arbitrary action on a particular message count.
- * </table>
- */
-public class PingDurableClient extends PingPongProducer implements ExceptionListener
-{
- private static final Logger log = Logger.getLogger(PingDurableClient.class);
-
- public static final String NUM_MESSAGES_PROPNAME = "numMessages";
- public static final String NUM_MESSAGES_DEFAULT = "100";
- public static final String DURATION_PROPNAME = "duration";
- public static final String DURATION_DEFAULT = "30S";
- public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction";
- public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1";
-
- /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */
- private static final long TIME_OUT = 3000;
-
- static
- {
- defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT);
- defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT);
- defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false");
- defaults.setProperty(TRANSACTED_PROPNAME, "true");
- defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
- defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
- defaults.setProperty(RATE_PROPNAME, "20");
- defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
- defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT);
- }
-
- /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
- private int numMessages;
-
- /** Holds the number of messages to send before taking triggering the action. */
- private int numMessagesToAction;
-
- /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */
- private long duration;
-
- /** Used to indciate that this application should terminate. Set by the shutdown hook. */
- private boolean terminate = false;
-
- /**
- * @throws Exception Any exceptions are allowed to fall through.
- */
- public PingDurableClient(Properties overrides) throws Exception
- {
- super(overrides);
- log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called");
-
- // Extract the additional configuration parameters.
- ParsedProperties properties = new ParsedProperties(defaults);
- properties.putAll(overrides);
-
- numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME);
- String durationSpec = properties.getProperty(DURATION_PROPNAME);
- numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME);
-
- if (durationSpec != null)
- {
- duration = MathUtils.parseDuration(durationSpec) * 1000000;
- }
- }
-
- /**
- * Starts the ping/wait/receive process.
- *
- * @param args The command line arguments.
- */
- public static void main(String[] args)
- {
- try
- {
- // Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options =
- CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
- PingDurableClient pingProducer = new PingDurableClient(options);
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- // pingProducer.getConnection().setExceptionListener(pingProducer);
-
- // Run the test procedure.
- int sent = pingProducer.send();
- pingProducer.closeConnection();
- pingProducer.waitForUser("Press return to begin receiving the pings.");
- pingProducer.receive(sent);
-
- System.exit(0);
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- log.error("Top level handler caught execption.", e);
- System.exit(1);
- }
- }
-
- /**
- * Performs the main test procedure implemented by this ping client. See the class level comment for details.
- */
- protected int send() throws Exception
- {
- log.debug("public void sendWaitReceive(): called");
-
- log.debug("duration = " + duration);
- log.debug("numMessages = " + numMessages);
-
- if (duration > 0)
- {
- System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds.");
- }
-
- if (_rate > 0)
- {
- System.out.println("Sending at " + _rate + " messages per second.");
- }
-
- if (numMessages > 0)
- {
- System.out.println("Sending up to " + numMessages + " messages.");
- }
-
- // Establish the connection and the message producer.
- establishConnection(true, false);
- _connection.start();
-
- Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
-
- // Send pings until a terminating condition is received.
- boolean endCondition = false;
- int messagesSent = 0;
- int messagesCommitted = 0;
- int messagesNotCommitted = 0;
- long start = System.nanoTime();
-
- // Clear console in.
- clearConsole();
-
- while (!endCondition)
- {
- boolean committed = false;
-
- try
- {
- committed = sendMessage(messagesSent, message) && _transacted;
-
- messagesSent++;
- messagesNotCommitted++;
-
- // Keep count of the number of messsages currently committed and pending commit.
- if (committed)
- {
- log.debug("Adding " + messagesNotCommitted + " messages to the committed count.");
- messagesCommitted += messagesNotCommitted;
- messagesNotCommitted = 0;
-
- System.out.println("Commited: " + messagesCommitted);
- }
- }
- catch (JMSException e)
- {
- log.debug("Got JMSException whilst sending.");
- _publish = false;
- }
-
- // Perform the arbitrary action if the number of messages sent has reached the right number.
- if (messagesSent == numMessagesToAction)
- {
- System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = "
- + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted);
- takeAction();
- }
-
- // Determine if the end condition has been met, based on the number of messages, time passed, errors on
- // the connection or user input.
- long now = System.nanoTime();
-
- if ((duration != 0) && ((now - start) > duration))
- {
- System.out.println("Send halted because duration expired.");
- endCondition = true;
- }
- else if ((numMessages != 0) && (messagesSent >= numMessages))
- {
- System.out.println("Send halted because # messages completed.");
- endCondition = true;
- }
- else if (System.in.available() > 0)
- {
- System.out.println("Send halted by user input.");
- endCondition = true;
-
- clearConsole();
- }
- else if (!_publish)
- {
- System.out.println("Send halted by error on the connection.");
- endCondition = true;
- }
- }
-
- log.debug("messagesSent = " + messagesSent);
- log.debug("messagesCommitted = " + messagesCommitted);
- log.debug("messagesNotCommitted = " + messagesNotCommitted);
-
- System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
- + ", Messages not Committed = " + messagesNotCommitted);
-
- return messagesSent;
- }
-
- protected void closeConnection()
- {
- // Clean up the connection.
- try
- {
- close();
- }
- catch (JMSException e)
- {
- log.debug("There was an error whilst closing the connection: " + e, e);
- System.out.println("There was an error whilst closing the connection.");
-
- // Ignore as did best could manage to clean up.
- }
- }
-
- protected void receive(int messagesSent) throws Exception
- {
- // Re-establish the connection and the message consumer.
- _queueJVMSequenceID = new AtomicInteger();
- _queueSharedID = new AtomicInteger();
-
- establishConnection(false, true);
- _consumer[0].setMessageListener(null);
- _consumerConnection[0].start();
-
- // Try to receive all of the pings that were successfully sent.
- int messagesReceived = 0;
- boolean endCondition = false;
-
- while (!endCondition)
- {
- // Message received = _consumer.receiveNoWait();
- Message received = _consumer[0].receive(TIME_OUT);
- log.debug("received = " + received);
-
- if (received != null)
- {
- messagesReceived++;
- }
-
- // Determine if the end condition has been met, based on the number of messages and time passed since last
- // receiving a message.
- if (received == null)
- {
- System.out.println("Timed out.");
- endCondition = true;
- }
- else if (messagesReceived >= messagesSent)
- {
- System.out.println("Got all messages.");
- endCondition = true;
- }
- }
-
- // Ensure messages received are committed.
- if (_consTransacted)
- {
- try
- {
- _consumerSession[0].commit();
- System.out.println("Committed for all messages received.");
- }
- catch (JMSException e)
- {
- log.debug("Error during commit: " + e, e);
- System.out.println("Error during commit.");
- try
- {
- _consumerSession[0].rollback();
- System.out.println("Rolled back on all messages received.");
- }
- catch (JMSException e2)
- {
- log.debug("Error during rollback: " + e, e);
- System.out.println("Error on roll back of all messages received.");
- }
-
- }
- }
-
- log.debug("messagesReceived = " + messagesReceived);
-
- System.out.println("Messages received: " + messagesReceived);
-
- // Clean up the connection.
- close();
- }
-
- /**
- * Clears any pending input from the console.
- */
- private void clearConsole()
- {
- try
- {
- BufferedReader bis = new BufferedReader(new InputStreamReader(System.in));
-
- // System.in.skip(System.in.available());
- while (bis.ready())
- {
- bis.readLine();
- }
- }
- catch (IOException e)
- { }
- }
-
- /**
- * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
- * effect of making this pinger listen to its own pings.
- *
- * @return The ping destinations.
- */
- public List<Destination> getReplyDestinations()
- {
- return _pingDestinations;
- }
-
- /**
- * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
- * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the
- * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving
- * message should stop, not that the application should termiante.
- *
- * @return A shutdown hook for the ping loop.
- */
- public Thread getShutdownHook()
- {
- return new Thread(new Runnable()
- {
- public void run()
- {
- stop();
- terminate = true;
- }
- });
- }
-
- /**
- * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default
- * implementation does nothing.
- */
- public void takeAction()
- { }
-}