diff options
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java')
-rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java | 1818 |
1 files changed, 0 insertions, 1818 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java deleted file mode 100644 index 639b0c72ba..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ /dev/null @@ -1,1818 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import org.apache.log4j.Logger; -import org.apache.log4j.NDC; - -import org.apache.qpid.test.framework.TestUtils; - -import org.apache.qpid.junit.extensions.BatchedThrottle; -import org.apache.qpid.junit.extensions.Throttle; -import org.apache.qpid.junit.extensions.util.CommandLineParser; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import java.io.*; -import java.net.InetAddress; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -/** - * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may - * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens - * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping - * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour - * configurable. - * - * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This - * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation - * id in the ping to be bounced back in the reply correlation id. - * - * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It - * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within - * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover - * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: - * - * <p/><table><caption>Parameters</caption> - * <tr><th> Parameter <th> Default <th> Comments - * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers. - * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping. - * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used. - * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions. - * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to. - * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over. - * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit. - * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message. - * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default. - * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch. - * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch. - * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send. - * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send. - * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once. - * <tr><td> username <td> guest <td> The username to access the broker with. - * <tr><td> password <td> guest <td> The password to access the broker with. - * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with. - * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to. - * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination. - * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies. - * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode. - * <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all. - * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used. - * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are: - * 0 - SESSION_TRANSACTED - * 1 - AUTO_ACKNOWLEDGE - * 2 - CLIENT_ACKNOWLEDGE - * 3 - DUPS_OK_ACKNOWLEDGE - * 257 - NO_ACKNOWLEDGE - * 258 - PRE_ACKNOWLEDGE - * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value - * as the 'transacted' option if not seperately defined. - * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same - * value as 'ackMode' if not seperately defined. - * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received. - * Limits the volume of messages currently buffered on the client - * or broker. Can help scale test clients by limiting amount of buffered - * data to avoid out of memory errors. - * </table> - * - * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop - * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by - * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also - * registered to terminate the ping-pong loop cleanly. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Provide a ping and wait for all responses cycle. - * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url. - * </table> - * - * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair. - * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a - * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last - * message waits until all other messages have been handled before releasing producers but allows messages to be - * processed concurrently, unlike the current synchronized block. - */ -public class PingPongProducer implements Runnable, ExceptionListener -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(PingPongProducer.class); - - /** Holds the name of the property to determine whether of not client id is overridden at connection time. */ - public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId"; - - /** Holds the default value of the override client id flag. */ - public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false"; - - /** Holds the name of the property to define the JNDI factory name with. */ - public static final String FACTORY_NAME_PROPNAME = "factoryName"; - - /** Holds the default JNDI name of the connection factory. */ - public static final String FACTORY_NAME_DEAFULT = "local"; - - /** Holds the name of the property to set the JNDI initial context properties with. */ - public static final String FILE_PROPERTIES_PROPNAME = "properties"; - - /** Holds the default file name of the JNDI initial context properties. */ - public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties"; - - /** Holds the name of the property to get the test message size from. */ - public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; - - /** Used to set up a default message size. */ - public static final int MESSAGE_SIZE_DEAFULT = 0; - - /** Holds the name of the property to get the ping queue name from. */ - public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; - - /** Holds the name of the default destination to send pings on. */ - public static final String PING_QUEUE_NAME_DEFAULT = "ping"; - - /** Holds the name of the property to get the queue name postfix from. */ - public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix"; - - /** Holds the default queue name postfix value. */ - public static final String QUEUE_NAME_POSTFIX_DEFAULT = ""; - - /** Holds the name of the property to get the test delivery mode from. */ - public static final String PERSISTENT_MODE_PROPNAME = "persistent"; - - /** Holds the message delivery mode to use for the test. */ - public static final boolean PERSISTENT_MODE_DEFAULT = false; - - /** Holds the name of the property to get the test transactional mode from. */ - public static final String TRANSACTED_PROPNAME = "transacted"; - - /** Holds the transactional mode to use for the test. */ - public static final boolean TRANSACTED_DEFAULT = false; - - /** Holds the name of the property to get the test consumer transacted mode from. */ - public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; - - /** Holds the consumer transactional mode default setting. */ - public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; - - /** Holds the name of the property to get the test broker url from. */ - public static final String BROKER_PROPNAME = "broker"; - - /** Holds the default broker url for the test. */ - public static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** Holds the name of the property to get the test broker virtual path. */ - public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; - - /** Holds the default virtual path for the test. */ - public static final String VIRTUAL_HOST_DEFAULT = ""; - - /** Holds the name of the property to get the message rate from. */ - public static final String RATE_PROPNAME = "rate"; - - /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ - public static final int RATE_DEFAULT = 0; - - /** Holds the name of the property to get the verbose mode proeprty from. */ - public static final String VERBOSE_PROPNAME = "verbose"; - - /** Holds the default verbose mode. */ - public static final boolean VERBOSE_DEFAULT = false; - - /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ - public static final String PUBSUB_PROPNAME = "pubsub"; - - /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ - public static final boolean PUBSUB_DEFAULT = false; - - /** Holds the name of the property to get the fail after commit flag from. */ - public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; - - /** Holds the default failover after commit test flag. */ - public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; - - /** Holds the name of the proeprty to get the fail before commit flag from. */ - public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; - - /** Holds the default failover before commit test flag. */ - public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; - - /** Holds the name of the proeprty to get the fail after send flag from. */ - public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; - - /** Holds the default failover after send test flag. */ - public static final boolean FAIL_AFTER_SEND_DEFAULT = false; - - /** Holds the name of the property to get the fail before send flag from. */ - public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; - - /** Holds the default failover before send test flag. */ - public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; - - /** Holds the name of the property to get the fail once flag from. */ - public static final String FAIL_ONCE_PROPNAME = "failOnce"; - - /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ - public static final boolean FAIL_ONCE_DEFAULT = true; - - /** Holds the name of the property to get the broker access username from. */ - public static final String USERNAME_PROPNAME = "username"; - - /** Holds the default broker log on username. */ - public static final String USERNAME_DEFAULT = "guest"; - - /** Holds the name of the property to get the broker access password from. */ - public static final String PASSWORD_PROPNAME = "password"; - - /** Holds the default broker log on password. */ - public static final String PASSWORD_DEFAULT = "guest"; - - /** Holds the name of the proeprty to get the. */ - public static final String SELECTOR_PROPNAME = "selector"; - - /** Holds the default message selector. */ - public static final String SELECTOR_DEFAULT = ""; - - /** Holds the name of the property to get the destination count from. */ - public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; - - /** Defines the default number of destinations to ping. */ - public static final int DESTINATION_COUNT_DEFAULT = 1; - - /** Holds the name of the property to get the number of consumers per destination from. */ - public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; - - /** Defines the default number consumers per destination. */ - public static final int NUM_CONSUMERS_DEFAULT = 1; - - /** Holds the name of the property to get the waiting timeout for response messages. */ - public static final String TIMEOUT_PROPNAME = "timeout"; - - /** Default time to wait before assuming that a ping has timed out. */ - public static final long TIMEOUT_DEFAULT = 30000; - - /** Holds the name of the property to get the commit batch size from. */ - public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; - - /** Defines the default number of pings to send in each transaction when running transactionally. */ - public static final int TX_BATCH_SIZE_DEFAULT = 1; - - /** Holds the name of the property to get the unique destinations flag from. */ - public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; - - /** Defines the default value for the unique destinations property. */ - public static final boolean UNIQUE_DESTS_DEFAULT = true; - - /** Holds the name of the property to get the durable destinations flag from. */ - public static final String DURABLE_DESTS_PROPNAME = "durableDests"; - - /** Defines the default value of the durable destinations flag. */ - public static final boolean DURABLE_DESTS_DEFAULT = false; - - /** Holds the name of the proeprty to get the message acknowledgement mode from. */ - public static final String ACK_MODE_PROPNAME = "ackMode"; - - /** Defines the default message acknowledgement mode. */ - public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; - - /** Holds the name of the property to get the consumers message acknowledgement mode from. */ - public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; - - /** Defines the default consumers message acknowledgement mode. */ - public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; - - /** Holds the name of the property to get the maximum pending message size setting from. */ - public static final String MAX_PENDING_PROPNAME = "maxPending"; - - /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ - public static final int MAX_PENDING_DEFAULT = 0; - - /** Defines the default prefetch size to use when consuming messages. */ - public static final int PREFETCH_DEFAULT = 100; - - /** Defines the default value of the no local flag to use when consuming messages. */ - public static final boolean NO_LOCAL_DEFAULT = false; - - /** Defines the default value of the exclusive flag to use when consuming messages. */ - public static final boolean EXCLUSIVE_DEFAULT = false; - - /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ - public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; - - /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */ - public static final String PREFILL_PROPNAME = "preFill"; - - /** Defines the default value for the number of messages to prefill. 0,default, no messages. */ - public static final int PREFILL_DEFAULT = 0; - - /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */ - public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume"; - - /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */ - public static final long DELAY_BEFORE_CONSUME = 0; - - /** Holds the name of the property to get when no messasges should be sent. */ - public static final String CONSUME_ONLY_PROPNAME = "consumeOnly"; - - /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ - public static final boolean CONSUME_ONLY_DEFAULT = false; - - /** Holds the name of the property to get when no messasges should be sent. */ - public static final String SEND_ONLY_PROPNAME = "sendOnly"; - - /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ - public static final boolean SEND_ONLY_DEFAULT = false; - - /** Holds the default configuration properties. */ - public static ParsedProperties defaults = new ParsedProperties(); - - static - { - defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT); - defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); - defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); - defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); - defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); - defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); - defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); - defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT); - defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); - defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); - defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); - defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); - defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); - defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); - defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); - defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); - defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); - defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); - defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); - defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); - defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); - defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); - defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); - defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); - defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); - defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); - defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); - defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); - defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); - defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); - defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT); - defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME); - defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT); - defaults.setPropertyIfNull(SEND_ONLY_PROPNAME, SEND_ONLY_DEFAULT); - } - - /** Allows setting of client ID on the connection, rather than through the connection URL. */ - protected boolean _overrideClientId; - - /** Holds the JNDI name of the JMS connection factory. */ - protected String _factoryName; - - /** Holds the name of the properties file to configure JNDI with. */ - protected String _fileProperties; - - /** Holds the broker url. */ - protected String _brokerDetails; - - /** Holds the username to access the broker with. */ - protected String _username; - - /** Holds the password to access the broker with. */ - protected String _password; - - /** Holds the virtual host on the broker to run the tests through. */ - protected String _virtualpath; - - /** Holds the root name from which to generate test destination names. */ - protected String _destinationName; - - /** Holds the default queue name postfix value. */ - protected String _queueNamePostfix; - - /** Holds the message selector to filter the pings with. */ - protected String _selector; - - /** Holds the producers transactional mode flag. */ - protected boolean _transacted; - - /** Holds the consumers transactional mode flag. */ - protected boolean _consTransacted; - - /** Determines whether this producer sends persistent messages. */ - protected boolean _persistent; - - /** Holds the acknowledgement mode used for the producers. */ - protected int _ackMode; - - /** Holds the acknowledgement mode setting for the consumers. */ - protected int _consAckMode; - - /** Determines what size of messages this producer sends. */ - protected int _messageSize; - - /** Used to indicate that the ping loop should print out whenever it pings. */ - protected boolean _verbose; - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - protected boolean _isPubSub; - - /** Flag used to indicate if the destinations should be unique client. */ - protected boolean _isUnique; - - /** Flag used to indicate that durable destination should be used. */ - protected boolean _isDurable; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ - protected boolean _failBeforeCommit; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ - protected boolean _failAfterCommit; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ - protected boolean _failBeforeSend; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ - protected boolean _failAfterSend; - - /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ - protected boolean _failOnce; - - /** Holds the number of sends that should be performed in every transaction when using transactions. */ - protected int _txBatchSize; - - /** Holds the number of destinations to ping. */ - protected int _noOfDestinations; - - /** Holds the number of consumers per destination. */ - protected int _noOfConsumers; - - private int[] _consumerBatchCounts; - - /** Holds the maximum send rate in herz. */ - protected int _rate; - - /** - * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended - * if this limit is breached. - */ - protected int _maxPendingSize; - - /** - * Holds the number of messages to send during the setup phase, before the clients start consuming. - */ - private Integer _preFill; - - /** - * Holds the time in ms to wait after preFilling before starting thet test. - */ - private Long _delayBeforeConsume; - - /** - * Holds a boolean value of wither this test should just consume, i.e. skips - * sending messages, but still expects to receive the specified number. - */ - private boolean _consumeOnly; - - /** - * Holds a boolean value of wither this test should just send, i.e. skips - * consuming messages, but still creates clients just doesn't start them. - */ - private boolean _sendOnly; - - - /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ - private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); - - /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */ - private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); - - /** Holds this instances unique id. */ - private int instanceId; - - /** - * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple - * ping producers on the same JVM. - */ - private static Map<String, PerCorrelationId> perCorrelationIds = - Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); - - /** A convenient formatter to use when time stamping output. */ - protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** Holds the connection for the message producer. */ - protected Connection _connection; - - /** Holds the consumer connections. */ - protected Connection[] _consumerConnection; - - /** Holds the controlSession on which ping replies are received. */ - protected Session[] _consumerSession; - - /** Holds the producer controlSession, needed to create ping messages. */ - protected Session _producerSession; - - /** Holds the destination where the response messages will arrive. */ - protected Destination _replyDestination; - - /** Holds the set of destinations that this ping producer pings. */ - protected List<Destination> _pingDestinations; - - /** Used to restrict the sending rate to a specified limit. */ - protected Throttle _rateLimiter; - - /** Holds a message listener that this message listener chains all its messages to. */ - protected ChainedMessageListener _chainedMessageListener = null; - - /** - * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when - * creating multiple ping producers in the same JVM. - */ - protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); - - /** - * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers - * on the same JVM using this id generator will allow them to ping on the same queues. - */ - protected AtomicInteger _queueSharedID = new AtomicInteger(); - - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ - protected boolean _publish = true; - - /** Holds the message producer to send the pings through. */ - protected MessageProducer _producer; - - /** Holds the message consumer to receive the ping replies through. */ - protected MessageConsumer[] _consumer; - - /** The prompt to display when asking the user to kill the broker for failover testing. */ - private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; - - /** Holds the name for this test client to be identified to the broker with. */ - private String _clientID; - - /** Keeps count of the total messages sent purely for debugging purposes. */ - private static AtomicInteger numSent = new AtomicInteger(); - - /** - * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected - * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a - * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an - * equal chance to produce messages. - */ - static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); - - /** Keeps a count of the number of message currently sent but not received. */ - static AtomicInteger _unreceived = new AtomicInteger(0); - - /** - * Creates a ping producer with the specified parameters, of which there are many. See the class level comments - * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on - * it, to send and recieve its pings and replies on. - * - * @param overrides Properties containing any desired overrides to the defaults. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public PingPongProducer(Properties overrides) throws Exception - { - // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); - instanceId = _instanceIdGenerator.getAndIncrement(); - - // Create a set of parsed properties from the defaults overriden by the passed in values. - ParsedProperties properties = new ParsedProperties(defaults); - properties.putAll(overrides); - - // Extract the configuration properties to set the pinger up with. - _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME); - _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); - _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); - _brokerDetails = properties.getProperty(BROKER_PROPNAME); - _username = properties.getProperty(USERNAME_PROPNAME); - _password = properties.getProperty(PASSWORD_PROPNAME); - _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); - _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); - _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME); - _selector = properties.getProperty(SELECTOR_PROPNAME); - _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); - _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); - _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); - _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); - _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); - _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); - _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); - _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); - _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); - _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); - _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); - _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); - _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); - _consumerBatchCounts = new int[_noOfConsumers]; - _rate = properties.getPropertyAsInteger(RATE_PROPNAME); - _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); - _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); - _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); - _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); - _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); - _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); - _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME); - _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME); - _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME); - _sendOnly = properties.getPropertyAsBoolean(SEND_ONLY_PROPNAME); - - // Check that one or more destinations were specified. - if (_noOfDestinations < 1) - { - throw new IllegalArgumentException("There must be at least one destination."); - } - - // Set up a throttle to control the send rate, if a rate > 0 is specified. - if (_rate > 0) - { - _rateLimiter = new BatchedThrottle(); - _rateLimiter.setRate(_rate); - } - - // Create the connection and message producers/consumers. - // establishConnection(true, true); - } - - /** - * Establishes a connection to the broker and creates message consumers and producers based on the parameters - * that this ping client was created with. - * - * @param producer Flag to indicate whether or not the producer should be set up. - * @param consumer Flag to indicate whether or not the consumers should be set up. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public void establishConnection(boolean producer, boolean consumer) throws Exception - { - // log.debug("public void establishConnection(): called"); - - // Generate a unique identifying name for this client, based on it ip address and the current time. - InetAddress address = InetAddress.getLocalHost(); - // _clientID = address.getHostName() + System.currentTimeMillis(); - _clientID = "perftest_" + instanceId; - - // Create a connection to the broker. - createConnection(_clientID); - - // Create transactional or non-transactional sessions, based on the command line arguments. - _producerSession = _connection.createSession(_transacted, _ackMode); - - _consumerSession = new Session[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); - } - - // Create the destinations to send pings to and receive replies from. - if (_noOfConsumers > 0) - { - _replyDestination = _consumerSession[0].createTemporaryQueue(); - } - createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); - - // Create the message producer only if instructed to. - if (producer) - { - createProducer(); - } - - // Create the message consumer only if instructed to. - if (consumer) - { - createReplyConsumers(getReplyDestinations(), _selector); - } - } - - /** - * Establishes a connection to the broker, based on the configuration parameters that this ping client was - * created with. - * - * @param clientID The clients identifier. - * - * @throws JMSException Underlying exceptions allowed to fall through. - * @throws NamingException Underlying exceptions allowed to fall through. - * @throws IOException Underlying exceptions allowed to fall through. - */ - protected void createConnection(String clientID) throws JMSException, NamingException, IOException - { - // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); - - // _log.debug("Creating a connection for the message producer."); - File propsFile = new File(_fileProperties); - InputStream is = new FileInputStream(propsFile); - Properties properties = new Properties(); - properties.load(is); - - Context context = new InitialContext(properties); - ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); - _connection = factory.createConnection(_username, _password); - - if (_overrideClientId) - { - _connection.setClientID(clientID); - } - - // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); - - _consumerConnection = new Connection[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerConnection[i] = factory.createConnection(_username, _password); - // _consumerConnection[i].setClientID(clientID); - } - } - - /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs - * to be started to bounce the pings back again. - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); - - // Create a ping producer overriding its defaults with all options passed on the command line. - PingPongProducer pingProducer = new PingPongProducer(options); - pingProducer.establishConnection(true, true); - - // Start the ping producers dispatch thread running. - pingProducer._connection.start(); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - pingProducer._connection.setExceptionListener(pingProducer); - - // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. - Thread pingThread = new Thread(pingProducer); - pingThread.run(); - pingThread.join(); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Convenience method for a short pause. - * - * @param sleepTime The time in milliseconds to pause for. - */ - public static void pause(long sleepTime) - { - if (sleepTime > 0) - { - try - { - Thread.sleep(sleepTime); - } - catch (InterruptedException ie) - { } - } - } - - /** - * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to - * destination of this pinger. - * - * @return The single reply to destination of this pinger, wrapped in a list. - */ - public List<Destination> getReplyDestinations() - { - // log.debug("public List<Destination> getReplyDestinations(): called"); - - List<Destination> replyDestinations = new ArrayList<Destination>(); - replyDestinations.add(_replyDestination); - - // log.debug("replyDestinations = " + replyDestinations); - - return replyDestinations; - } - - /** - * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery - * flag is set accoring the ping producer creation options. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void createProducer() throws JMSException - { - // log.debug("public void createProducer(): called"); - - _producer = (MessageProducer) _producerSession.createProducer(null); - _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); - } - - /** - * Creates consumers for the specified number of destinations. The destinations themselves are also created by this - * method. - * - * @param noOfDestinations The number of destinations to create consumers for. - * @param selector The message selector to filter the consumers with. - * @param rootName The root of the name, or actual name if only one is being created. - * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the - * numbering with all pingers on the same JVM. - * @param durable If the destinations are durable topics. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, - boolean durable) throws JMSException - { - /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " - + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called");*/ - - _pingDestinations = new ArrayList<Destination>(); - - // Create the desired number of ping destinations and consumers for them. - // log.debug("Creating " + noOfDestinations + " destinations to ping."); - - for (int i = 0; i < noOfDestinations; i++) - { - Destination destination; - String id; - - // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. - if (unique) - { - // log.debug("Creating unique destinations."); - id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); - } - else - { - // log.debug("Creating shared destinations."); - id = "_" + _queueSharedID.incrementAndGet(); - } - - // Check if this is a pub/sub pinger, in which case create topics. - if (_isPubSub) - { - destination = _producerSession.createTopic(rootName + id); - // log.debug("Created non-durable topic " + destination); - - if (durable) - { - _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID()); - } - } - // Otherwise this is a p2p pinger, in which case create queues. - else - { - destination = _producerSession.createQueue(rootName + id + _queueNamePostfix); - // log.debug("Created queue " + destination); - } - - // Keep the destination. - _pingDestinations.add(destination); - } - } - - /** - * Creates consumers for the specified destinations and registers this pinger to listen to their messages. - * - * @param destinations The destinations to listen to. - * @param selector A selector to filter the messages with. - * - * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. - */ - public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException - { - /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations - + ", String selector = " + selector + "): called");*/ - - log.debug("There are " + destinations.size() + " destinations."); - log.debug("Creating " + _noOfConsumers + " consumers on each destination."); - log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); - - for (Destination destination : destinations) - { - _consumer = new MessageConsumer[_noOfConsumers]; - - // If we don't have consumers then ensure we have created the - // destination. - if (_noOfConsumers == 0) - { - _producerSession.createConsumer(destination, selector, - NO_LOCAL_DEFAULT).close(); - } - - for (int i = 0; i < _noOfConsumers; i++) - { - // Create a consumer for the destination and set this pinger to listen to its messages. - _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); - - final int consumerNo = i; - - _consumer[i].setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - onMessageWithConsumerNo(message, consumerNo); - } - }); - - log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); - } - } - } - - /** - * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a - * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the - * replies map. - * - * @param message The received message. - * @param consumerNo The consumer number within this test pinger instance. - */ - public void onMessageWithConsumerNo(Message message, int consumerNo) - { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); - try - { - long now = System.nanoTime(); - long timestamp = getTimestamp(message); - long pingTime = now - timestamp; - - // NDC.push("id" + instanceId + "/cons" + consumerNo); - - // Extract the messages correlation id. - String correlationID = message.getJMSCorrelationID(); - // log.debug("correlationID = " + correlationID); - - // int num = message.getIntProperty("MSG_NUM"); - // log.info("Message " + num + " received."); - - boolean isRedelivered = message.getJMSRedelivered(); - // log.debug("isRedelivered = " + isRedelivered); - - if (!isRedelivered) - { - // Countdown on the traffic light if there is one for the matching correlation id. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); - - if (perCorrelationId != null) - { - CountDownLatch trafficLight = perCorrelationId.trafficLight; - - // Restart the timeout timer on every message. - perCorrelationId.timeOutStart = System.nanoTime(); - - // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); - - // Release waiting senders if there are some and using maxPending limit. - if ((_maxPendingSize > 0)) - { - // Decrement the count of sent but not yet received messages. - int unreceived = _unreceived.decrementAndGet(); - int unreceivedSize = - (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) - / (_isPubSub ? getConsumersPerDestination() : 1); - - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - - // synchronized (_sendPauseMonitor) - // { - if (unreceivedSize < _maxPendingSize) - { - _sendPauseMonitor.poll(); - } - // } - } - - // Decrement the countdown latch. Before this point, it is possible that two threads might enter this - // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block - // ensures that each thread will get a unique value for the remaining messages. - long trueCount; - long remainingCount; - - synchronized (trafficLight) - { - trafficLight.countDown(); - - trueCount = trafficLight.getCount(); - remainingCount = trueCount - 1; - - if (++_consumerBatchCounts[consumerNo] == _txBatchSize) - { - _consumerBatchCounts[consumerNo] = 0; - if (_consAckMode == 2) - { - // log.debug("Doing client ack for consumer " + consumerNo + "."); - message.acknowledge(); - } - else - { - // log.debug("Trying commit for consumer " + consumerNo + "."); - commitTx(_consumerSession[consumerNo]); - // log.info("Tx committed on consumer " + consumerNo); - } - } - - // Forward the message and remaining count to any interested chained message listener. - if (_chainedMessageListener != null) - { - _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); - } - - // Check if this is the last message, in which case release any waiting producers. This is done - // after the transaction has been committed and any listeners notified. - if (trueCount == 1) - { - trafficLight.countDown(); - } - } - } - else - { - log.warn(consumerNo + " Got unexpected message with correlationId: " + correlationID); - log.warn(consumerNo + " Map contains:" + perCorrelationIds.entrySet()); - } - } - else - { - log.warn("Got redelivered message, ignoring."); - } - } - catch (Exception e) - { - log.warn("There was a Exception: " + e.getMessage(), e); - } - finally - { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); - // NDC.clear(); - } - } - - public void setupCorrelationID(String correlationId, int expectedCount) - { - PerCorrelationId perCorrelationId = new PerCorrelationId(); - - // Create a count down latch to count the number of replies with. This is created before the messages are - // sent so that the replies cannot be received before the count down is created. - // One is added to this, so that the last reply becomes a special case. The special case is that the - // chained message listener must be called before this sender can be unblocked, but that decrementing the - // countdown needs to be done before the chained listener can be called. - perCorrelationId.trafficLight = new CountDownLatch(expectedCount + 1); - - perCorrelationIds.put(correlationId, perCorrelationId); - } - - - /** - * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out - * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify - * the correlation id. - * - * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination - * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings. - * - * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur. - * - * @param message The message to send. If this is null, one is generated. - * @param numPings The number of ping messages to send. - * @param timeout The timeout in milliseconds. - * @param messageCorrelationId The message correlation id. If this is null, one is generated. - * - * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait - * for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - * @throws InterruptedException When interrupted by a timeout - */ - public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException - { - return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId); - } - - public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException - { - /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ - - int totalPingsRequested = numPings + preFill; - - // Generate a unique correlation id to put on the messages before sending them, if one was not specified. - if (messageCorrelationId == null) - { - messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); - - setupCorrelationID(messageCorrelationId, getExpectedNumPings(totalPingsRequested)); - } - - try - { - // NDC.push("prod"); - - PerCorrelationId perCorrelationId = perCorrelationIds.get(messageCorrelationId); - - // Set up the current time as the start time for pinging on the correlation id. This is used to determine - // timeouts. - perCorrelationId.timeOutStart = System.nanoTime(); - - // Send the specifed number of messages for this test - pingNoWaitForReply(message, numPings, messageCorrelationId); - - boolean timedOut; - boolean allMessagesReceived; - int numReplies; - - // We don't have a consumer so don't try and wait for the messages. - // this does mean that if the producerSession is !TXed then we may - // get to exit before all msgs have been received. - // - // Return the number of requested messages, this will let the test - // report a pass. - if (_noOfConsumers == 0 || _sendOnly) - { - return getExpectedNumPings(totalPingsRequested); - } - - do - { - // Block the current thread until replies to all the messages are received, or it times out. - perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); - - // Work out how many replies were receieved. - numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount(); - - allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested); - - // log.debug("numReplies = " + numReplies); - // log.debug("allMessagesReceived = " + allMessagesReceived); - - // Recheck the timeout condition. - long now = System.nanoTime(); - long lastMessageReceievedAt = perCorrelationId.timeOutStart; - timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); - - // log.debug("now = " + now); - // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); - } - while (!timedOut && !allMessagesReceived); - - if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose) - { - log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); - } - else if (_verbose) - { - log.info("Got all replies on id, " + messageCorrelationId); - } - - // commitTx(_consumerSession); - - // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); - - return numReplies; - } - // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, - // so will be a memory leak if this is not done. - finally - { - // NDC.pop(); - perCorrelationIds.remove(messageCorrelationId); - } - } - - /** - * Sends the specified number of ping messages and does not wait for correlating replies. - * - * @param message The message to send. - * @param numPings The number of pings to send. - * @param messageCorrelationId A correlation id to place on all messages sent. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException - { - /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ - - // If we are runnning a consumeOnly test then don't send any messages - if (_consumeOnly) - { - return; - } - - if (message == null) - { - message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); - } - - message.setJMSCorrelationID(messageCorrelationId); - - // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the - // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is - // needed. - boolean committed = false; - - // Send all of the ping messages. - for (int i = 0; i < numPings; i++) - { - // Re-timestamp the message. - // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - - // Send the message, passing in the message count. - committed = sendMessage(i, message); - - // Spew out per message timings on every message sonly in verbose mode. - /*if (_verbose) - { - log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); - }*/ - } - - // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. - if (!committed) - { - commitTx(_producerSession); - } - } - - /** - * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of - * messages sent so far must be specified and is used to round robin the ping destinations (where there are more - * than one), and to determine if the transaction batch size has been reached and the sent messages should be - * committed. - * - * @param i The count of messages sent so far in a loop of multiple calls to this send method. - * @param message The message to send. - * - * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise. - * - * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. - */ - protected boolean sendMessage(int i, Message message) throws JMSException - { - try - { - NDC.push("id" + instanceId + "/prod"); - - // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); - // log.debug("_txBatchSize = " + _txBatchSize); - - // Round robin the destinations as the messages are sent. - Destination destination = _pingDestinations.get(i % _pingDestinations.size()); - - // Prompt the user to kill the broker when doing failover testing. - _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); - - // Get the test setup for the correlation id. - String correlationID = message.getJMSCorrelationID(); - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); - - // If necessary, wait until the max pending message size comes within its limit. - if (_maxPendingSize > 0) - { - synchronized (_sendPauseMonitor) - { - // Used to keep track of the number of times that send has to wait. - int numWaits = 0; - - // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with - // the test timeout. - int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); - - while (true) - { - // Get the size estimate of sent but not yet received messages. - int unreceived = _unreceived.get(); - int unreceivedSize = - (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) - / (_isPubSub ? getConsumersPerDestination() : 1); - - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - // log.debug("_maxPendingSize = " + _maxPendingSize); - - if (unreceivedSize > _maxPendingSize) - { - // log.debug("unreceived size estimate over limit = " + unreceivedSize); - - // Fail the test if the send has had to wait more than the maximum allowed number of times. - if (numWaits > waitLimit) - { - String errorMessage = - "Send has had to wait for the unreceivedSize (" + unreceivedSize - + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit - + " times."; - log.warn(errorMessage); - throw new RuntimeException(errorMessage); - } - - // Wait on the send pause barrier for the limit to be re-established. - try - { - long start = System.nanoTime(); - // _sendPauseMonitor.wait(10000); - _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); - long end = System.nanoTime(); - - // Count the wait only if it was for > 99% of the requested wait time. - if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) - { - numWaits++; - } - } - catch (InterruptedException e) - { - // Restore the interrupted status - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - else - { - break; - } - } - } - } - - // Send the message either to its round robin destination, or its default destination. - // int num = numSent.incrementAndGet(); - // message.setIntProperty("MSG_NUM", num); - setTimestamp(message); - - if (destination == null) - { - _producer.send(message); - } - else - { - _producer.send(destination, message); - } - - // Increase the unreceived size, this may actually happen after the message is received. - // The unreceived size is incremented by the number of consumers that will get a copy of the message, - // in pub/sub mode. - if (_maxPendingSize > 0) - { - int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); - // log.debug("newUnreceivedCount = " + newUnreceivedCount); - } - - // Apply message rate throttling if a rate limit has been set up. - if (_rateLimiter != null) - { - _rateLimiter.throttle(); - } - - // Call commit every time the commit batch size is reached. - boolean committed = false; - - // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. - if (((i + 1) % _txBatchSize) == 0) - { - // log.debug("Trying commit on producer session."); - committed = commitTx(_producerSession); - } - - return committed; - } - finally - { - NDC.clear(); - } - } - - /** - * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the - * test that the failure has occurred, before the method returns. - * - * @param failFlag The fail flag to test. - * - * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only - * used once, then reset. - */ - private boolean waitForUserToPromptOnFailure(boolean failFlag) - { - if (failFlag) - { - if (_failOnce) - { - failFlag = false; - } - - // log.debug("Failing Before Send"); - waitForUser(KILL_BROKER_PROMPT); - } - - return failFlag; - } - - /** - * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch - * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will - * terminate the pinger. - */ - public void pingLoop() - { - try - { - // Generate a sample message and time stamp it. - Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); - // setTimestamp(msg); - - // Send the message and wait for a reply. - pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); - } - catch (JMSException e) - { - _publish = false; - // log.debug("There was a JMSException: " + e.getMessage(), e); - } - catch (InterruptedException e) - { - _publish = false; - // log.debug("There was an interruption: " + e.getMessage(), e); - } - } - - /** - * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set - * here. - * - * @param messageListener The chained message listener. - */ - public void setChainedMessageListener(ChainedMessageListener messageListener) - { - _chainedMessageListener = messageListener; - } - - /** Removes any chained message listeners from this pinger. */ - public void removeChainedMessageListener() - { - _chainedMessageListener = null; - } - - /** - * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. - * - * @param replyQueue The reply-to destination for the message. - * @param messageSize The desired size of the message in bytes. - * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise. - * - * @return A freshly generated test message. - * - * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. - */ - public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException - { - // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); - return TestUtils.createTestMessageOfSize(_producerSession, messageSize); - } - - /** - * Sets the current time in nanoseconds as the timestamp on the message. - * - * @param msg The message to timestamp. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - protected void setTimestamp(Message msg) throws JMSException - { - /*if (((AMQSession)_producerSession).isStrictAMQP()) - { - ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); - } - else - {*/ - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - // } - } - - /** - * Extracts the nanosecond timestamp from a message. - * - * @param msg The message to extract the time stamp from. - * - * @return The timestamp in nanos. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - protected long getTimestamp(Message msg) throws JMSException - { - /*if (((AMQSession)_producerSession).isStrictAMQP()) - { - Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); - - return (value == null) ? 0L : value; - } - else - {*/ - return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); - // } - } - - /** - * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag - * has been cleared. - */ - public void stop() - { - _publish = false; - } - - /** - * Starts the producer and consumer connections. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void start() throws JMSException - { - // log.debug("public void start(): called"); - - _connection.start(); - // log.debug("Producer started."); - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerConnection[i].start(); - // log.debug("Consumer " + i + " started."); - } - } - - /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ - public void run() - { - // Keep running until the publish flag is cleared. - while (_publish) - { - pingLoop(); - } - } - - /** - * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the - * connection, this clears the publish flag which in turn will halt the ping loop. - * - * @param e The exception that triggered this callback method. - */ - public void onException(JMSException e) - { - // log.debug("public void onException(JMSException e = " + e + "): called", e); - _publish = false; - } - - /** - * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered - * with the runtime system as a shutdown hook. - * - * @return A shutdown hook for the ping loop. - */ - public Thread getShutdownHook() - { - return new Thread(new Runnable() - { - public void run() - { - stop(); - } - }); - } - - /** - * Closes all of the producer and consumer connections. - * - * @throws JMSException All JMSException are allowed to fall through. - */ - public void close() throws JMSException - { - // log.debug("public void close(): called"); - - try - { - if (_connection != null) - { - // log.debug("Before close producer connection."); - _connection.close(); - // log.debug("Closed producer connection."); - } - - for (int i = 0; i < _noOfConsumers; i++) - { - if (_consumerConnection[i] != null) - { - // log.debug("Before close consumer connection " + i + "."); - _consumerConnection[i].close(); - // log.debug("Closed consumer connection " + i + "."); - } - } - } - finally - { - _connection = null; - _producerSession = null; - _consumerSession = null; - _consumerConnection = null; - _producer = null; - _consumer = null; - _pingDestinations = null; - _replyDestination = null; - } - } - - /** - * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a - * transactional controlSession, this method does nothing (unless the failover after send flag is set). - * - * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is - * applied. This flag applies whether the pinger is transactional or not. - * - * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit - * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the - * commit is applied. These flags will only apply if using a transactional pinger. - * - * @param session The controlSession to commit - * - * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - * - * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit - * method, because commits only apply to transactional pingers, but fail after send applied to transactional and - * non-transactional alike. - */ - protected boolean commitTx(Session session) throws JMSException - { - // log.debug("protected void commitTx(Session session): called"); - - boolean committed = false; - - _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); - - if (session.getTransacted()) - { - // log.debug("Session is transacted."); - - try - { - _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); - - long start = System.nanoTime(); - session.commit(); - committed = true; - // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); - - _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); - - // log.debug("Session Commited."); - } - catch (JMSException e) - { - // log.debug("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - // log.debug("Message rolled back."); - } - catch (JMSException jmse) - { - // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - - return committed; - } - - /** - * Outputs a prompt to the console and waits for the user to press return. - * - * @param prompt The prompt to display on the console. - */ - public void waitForUser(String prompt) - { - System.out.println(prompt); - - try - { - System.in.read(); - } - catch (IOException e) - { - // Ignored. - } - - System.out.println("Continuing."); - } - - /** - * Gets the number of consumers that are listening to each destination in the test. - * - * @return int The number of consumers subscribing to each topic. - */ - public int getConsumersPerDestination() - { - return _noOfConsumers; - } - - /** - * Calculates how many pings are expected to be received for the given number sent. - * - * Note : that if you have set noConsumers to 0 then this will also return 0 - * in the case of PubSub testing. This is correct as without consumers there - * will be no-one to receive the sent messages so they will be unable to respond. - * - * @param numpings The number of pings that will be sent. - * - * @return The number that should be received, for the test to pass. - */ - public int getExpectedNumPings(int numpings) - { - // Wow, I'm freaking sorry about this return here... - return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) * - (_isPubSub ? getConsumersPerDestination() : 1); - } - - /** - * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link - * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link - * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of - * messages with that correlation id. - * - * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be - * given unique message counts. It will always be called while the producer waiting for all messages to arrive is - * still blocked. - */ - public static interface ChainedMessageListener - { - /** - * Notifies interested listeners about message arrival and important test stats, the number of messages - * remaining in the test, and the messages send timestamp. - * - * @param message The newly arrived message. - * @param remainingCount The number of messages left to complete the test. - * @param latency The nanosecond latency of the message. - * - * @throws JMSException Any JMS exceptions is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount, long latency) throws JMSException; - } - - /** - * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be - * added to this: read/write lock to make onMessage more concurrent as described in class header comment. - */ - protected static class PerCorrelationId - { - /** Holds a countdown on number of expected messages. */ - CountDownLatch trafficLight; - - /** Holds the last timestamp that the timeout was reset to. */ - Long timeOutStart; - } -} |