summaryrefslogtreecommitdiff
path: root/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java484
1 files changed, 279 insertions, 205 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index bd34fd8f20..99ed9f8367 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -46,6 +46,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -119,21 +120,11 @@ import java.util.concurrent.atomic.AtomicLong;
* <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
* </table>
*
- * @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than having
- * one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process messages
- * concurrently for different ids. Needs to be static so that when using a chained message listener and shared
- * destinations between multiple PPPs, it gets notified about all replies, not just those that happen to be picked up
- * by the PPP that it is atteched to.
- *
* @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair.
* Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
* block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
* message waits until all other messages have been handled before releasing producers but allows messages to be
* processed concurrently, unlike the current synchronized block.
- *
- * @todo Get rid of pauses between batches, it will impact the timing statistics, and generate meanigless timings.
- * Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written
- * faster than it can be sent.
*/
public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener
{
@@ -164,7 +155,10 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Holds the transactional mode to use for the test. */
public static final boolean TRANSACTED_DEFAULT = false;
+ /** Holds the name of the property to get the test consumer transacted mode from. */
public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+
+ /** Holds the consumer transactional mode default setting. */
public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
/** Holds the name of the property to get the test broker url from. */
@@ -275,7 +269,10 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Defines the default value for the unique destinations property. */
public static final boolean UNIQUE_DESTS_DEFAULT = true;
+ /** Holds the name of the property to get the durable destinations flag from. */
public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+
+ /** Defines the default value of the durable destinations flag. */
public static final boolean DURABLE_DESTS_DEFAULT = false;
/** Holds the name of the proeprty to get the message acknowledgement mode from. */
@@ -284,10 +281,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the consumers message acknowledgement mode from. */
public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+
+ /** Defines the default consumers message acknowledgement mode. */
public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the maximum pending message size setting from. */
public static final String MAX_PENDING_PROPNAME = "maxPending";
+
+ /** Defines the default value for the maximum pending message size setting. 0 means no limit. */
public static final int MAX_PENDING_DEFAULT = 0;
/** Defines the default prefetch size to use when consuming messages. */
@@ -336,21 +339,37 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
}
+ /** Holds the broker url. */
protected String _brokerDetails;
+
+ /** Holds the username to access the broker with. */
protected String _username;
+
+ /** Holds the password to access the broker with. */
protected String _password;
+
+ /** Holds the virtual host on the broker to run the tests through. */
protected String _virtualpath;
+
+ /** Holds the root name from which to generate test destination names. */
protected String _destinationName;
+
+ /** Holds the message selector to filter the pings with. */
protected String _selector;
+
+ /** Holds the producers transactional mode flag. */
protected boolean _transacted;
+
+ /** Holds the consumers transactional mode flag. */
protected boolean _consTransacted;
/** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
- /** Holds the acknowledgement mode used for sending and receiving messages. */
+ /** Holds the acknowledgement mode used for the producers. */
protected int _ackMode;
+ /** Holds the acknowledgement mode setting for the consumers. */
protected int _consAckMode;
/** Determines what size of messages this producer sends. */
@@ -401,18 +420,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
protected int _maxPendingSize;
- /**
- * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
- * to wait until the number of unreceived message is reduced before continuing to send.
- */
- protected Object _sendPauseMonitor = new Object();
-
- /** Keeps a count of the number of message currently sent but not received. */
- protected AtomicInteger _unreceived = new AtomicInteger(0);
-
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
+ /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */
+ private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
+
+ /** Holds this instances unique id. */
+ private int instanceId;
+
/**
* Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
* ping producers on the same JVM.
@@ -423,7 +439,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
- /** Holds the connection to the broker. */
+ /** Holds the connection for the message producer. */
protected Connection _connection;
/** Holds the consumer connections. */
@@ -470,23 +486,37 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+
+ /** Holds the name for this test client to be identified to the broker with. */
private String _clientID;
/** Keeps count of the total messages sent purely for debugging purposes. */
private static AtomicInteger numSent = new AtomicInteger();
/**
+ * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
+ * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a
+ * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an
+ * equal chance to produce messages.
+ */
+ static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true);
+
+ /** Keeps a count of the number of message currently sent but not received. */
+ static AtomicInteger _unreceived = new AtomicInteger(0);
+
+ /**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
* it, to send and recieve its pings and replies on.
*
- * @param overrides Properties containing any desired overrides to the defaults.
+ * @param overrides Properties containing any desired overrides to the defaults.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
public PingPongProducer(Properties overrides) throws Exception
{
// log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+ instanceId = _instanceIdGenerator.getAndIncrement();
// Create a set of parsed properties from the defaults overriden by the passed in values.
ParsedProperties properties = new ParsedProperties(defaults);
@@ -516,8 +546,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
_isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
- _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
- _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
+ _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
// Check that one or more destinations were specified.
@@ -706,13 +736,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* Creates consumers for the specified number of destinations. The destinations themselves are also created by this
* method.
*
- * @param noOfDestinations The number of destinations to create consumers for.
- * @param selector The message selector to filter the consumers with.
- * @param rootName The root of the name, or actual name if only one is being created.
- * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
- * numbering with all pingers on the same JVM.
+ * @param noOfDestinations The number of destinations to create consumers for.
+ * @param selector The message selector to filter the consumers with.
+ * @param rootName The root of the name, or actual name if only one is being created.
+ * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
+ * numbering with all pingers on the same JVM.
+ * @param durable If the destinations are durable topics.
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
+ * @throws AMQException Any AMQExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
boolean durable) throws JMSException, AMQException
@@ -792,9 +824,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ ", String selector = " + selector + "): called");*/
- // log.debug("There are " + destinations.size() + " destinations.");
- // log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
- // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
+ log.debug("There are " + destinations.size() + " destinations.");
+ log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
+ log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
for (Destination destination : destinations)
{
@@ -817,7 +849,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
});
- // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
+ log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
}
}
}
@@ -827,7 +859,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the
* replies map.
*
- * @param message The received message.
+ * @param message The received message.
+ * @param consumerNo The consumer number within this test pinger instance.
*/
public void onMessageWithConsumerNo(Message message, int consumerNo)
{
@@ -838,13 +871,13 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
long timestamp = getTimestamp(message);
long pingTime = now - timestamp;
- // NDC.push("cons" + consumerNo);
+ // NDC.push("id" + instanceId + "/cons" + consumerNo);
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
// log.debug("correlationID = " + correlationID);
- int num = message.getIntProperty("MSG_NUM");
+ // int num = message.getIntProperty("MSG_NUM");
// log.info("Message " + num + " received.");
boolean isRedelivered = message.getJMSRedelivered();
@@ -864,11 +897,32 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ // Release waiting senders if there are some and using maxPending limit.
+ if ((_maxPendingSize > 0))
+ {
+ // Decrement the count of sent but not yet received messages.
+ int unreceived = _unreceived.decrementAndGet();
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
+ / (_isPubSub ? getConsumersPerDestination() : 1);
+
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+
+ // synchronized (_sendPauseMonitor)
+ // {
+ if (unreceivedSize < _maxPendingSize)
+ {
+ _sendPauseMonitor.poll();
+ }
+ // }
+ }
+
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
// ensures that each thread will get a unique value for the remaining messages.
- long trueCount = -1;
- long remainingCount = -1;
+ long trueCount;
+ long remainingCount;
synchronized (trafficLight)
{
@@ -877,51 +931,33 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
- // Decrement the count of sent but not yet received messages.
- int unreceived = _unreceived.decrementAndGet();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
-
- // Release a waiting sender if there is one.
- synchronized (_sendPauseMonitor)
- {
- if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
- // && (_sendPauseBarrier.getNumberWaiting() == 1))
- {
- // log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- /*try
- {*/
- // _sendPauseBarrier.await();
- _sendPauseMonitor.notify();
- /*}
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
- }
- }
-
// NDC.push("/rem" + remainingCount);
// log.debug("remainingCount = " + remainingCount);
// log.debug("trueCount = " + trueCount);
- // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
- // blocked, even on the last message.
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer
+ // remains blocked, even on the last message.
// Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
// each batch boundary. For pub/sub each consumer gets every message so no division is done.
+ // When running in client ack mode, an ack is done instead of a commit, on the commit batch
+ // size boundaries.
long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
// log.debug("commitCount = " + commitCount);
if ((commitCount % _txBatchSize) == 0)
{
- // log.debug("Trying commit for consumer " + consumerNo + ".");
- commitTx(_consumerSession[consumerNo]);
+ if (_consAckMode == 2)
+ {
+ // log.debug("Doing client ack for consumer " + consumerNo + ".");
+ message.acknowledge();
+ }
+ else
+ {
+ // log.debug("Trying commit for consumer " + consumerNo + ".");
+ commitTx(_consumerSession[consumerNo]);
+ // log.info("Tx committed on consumer " + consumerNo);
+ }
}
// Forward the message and remaining count to any interested chained message listener.
@@ -947,18 +983,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
{
log.warn("Got redelivered message, ignoring.");
}
-
- // Print out ping times for every message in verbose mode only.
- /*if (_verbose)
- {
- Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
-
- if (timestamp != null)
- {
- long diff = System.nanoTime() - timestamp;
- //log.trace("Time for round trip (nanos): " + diff);
- }
- }*/
}
catch (JMSException e)
{
@@ -1020,9 +1044,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Send the specifed number of messages.
pingNoWaitForReply(message, numPings, messageCorrelationId);
- boolean timedOut = false;
- boolean allMessagesReceived = false;
- int numReplies = 0;
+ boolean timedOut;
+ boolean allMessagesReceived;
+ int numReplies;
do
{
@@ -1100,9 +1124,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there may be uncommitted messages.
- committed = false;
-
// Re-timestamp the message.
// message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
@@ -1138,99 +1159,161 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
protected boolean sendMessage(int i, Message message) throws JMSException
{
- // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
- // log.debug("_txBatchSize = " + _txBatchSize);
+ try
+ {
+ NDC.push("id" + instanceId + "/prod");
- // Round robin the destinations as the messages are sent.
- Destination destination = _pingDestinations.get(i % _pingDestinations.size());
+ // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
+ // log.debug("_txBatchSize = " + _txBatchSize);
- // Prompt the user to kill the broker when doing failover testing.
- if (_failBeforeSend)
- {
- if (_failOnce)
- {
- _failBeforeSend = false;
- }
+ // Round robin the destinations as the messages are sent.
+ Destination destination = _pingDestinations.get(i % _pingDestinations.size());
- // log.trace("Failing Before Send");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ // Prompt the user to kill the broker when doing failover testing.
+ _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
- // If necessary, wait until the max pending message size comes within its limit.
- synchronized (_sendPauseMonitor)
- {
- while ((_maxPendingSize > 0))
- {
- // Get the size estimate of sent but not yet received messages.
- int unreceived = _unreceived.get();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+ // Get the test setup for the correlation id.
+ String correlationID = message.getJMSCorrelationID();
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
- if (unreceivedSize > _maxPendingSize)
+ // If necessary, wait until the max pending message size comes within its limit.
+ if (_maxPendingSize > 0)
+ {
+ synchronized (_sendPauseMonitor)
{
- // log.debug("unreceived size estimate over limit = " + unreceivedSize);
+ // Used to keep track of the number of times that send has to wait.
+ int numWaits = 0;
- // Wait on the send pause barrier for the limit to be re-established.
- try
- {
- // _sendPauseBarrier.await();
- _sendPauseMonitor.wait(1000);
- }
- catch (InterruptedException e)
+ // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
+ // the test timeout.
+ int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+
+ while (true)
{
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ // Get the size estimate of sent but not yet received messages.
+ int unreceived = _unreceived.get();
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
+ / (_isPubSub ? getConsumersPerDestination() : 1);
+
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+ // log.debug("_maxPendingSize = " + _maxPendingSize);
+
+ if (unreceivedSize > _maxPendingSize)
+ {
+ // log.debug("unreceived size estimate over limit = " + unreceivedSize);
+
+ // Fail the test if the send has had to wait more than the maximum allowed number of times.
+ if (numWaits > waitLimit)
+ {
+ String errorMessage =
+ "Send has had to wait for the unreceivedSize (" + unreceivedSize
+ + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
+ + " times.";
+ log.warn(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ try
+ {
+ long start = System.nanoTime();
+ // _sendPauseMonitor.wait(10000);
+ _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS);
+ long end = System.nanoTime();
+
+ // Count the wait only if it was for > 99% of the requested wait time.
+ if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99)
+ {
+ numWaits++;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ {
+ break;
+ }
}
- /*catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
- }
- else
- {
- break;
}
}
- }
- // Send the message either to its round robin destination, or its default destination.
- if (destination == null)
- {
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
- setTimestamp(message);
- _producer.send(message);
- // log.info("Message " + num + " sent.");
- }
- else
- {
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
+ // Send the message either to its round robin destination, or its default destination.
+ // int num = numSent.incrementAndGet();
+ // message.setIntProperty("MSG_NUM", num);
setTimestamp(message);
- _producer.send(destination, message);
- // log.info("Message " + num + " sent.");
- }
- // Increase the unreceived size, this may actually happen aftern the message is recevied.
- _unreceived.getAndIncrement();
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
+ // Increase the unreceived size, this may actually happen after the message is received.
+ // The unreceived size is incremented by the number of consumers that will get a copy of the message,
+ // in pub/sub mode.
+ if (_maxPendingSize > 0)
+ {
+ int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+ // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+ }
+
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
+
+ // Call commit every time the commit batch size is reached.
+ boolean committed = false;
+
+ // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
+ if (((i + 1) % _txBatchSize) == 0)
+ {
+ // log.debug("Trying commit on producer session.");
+ committed = commitTx(_producerSession);
+ }
+
+ return committed;
+ }
+ finally
{
- _rateLimiter.throttle();
+ NDC.clear();
}
+ }
- // Call commit every time the commit batch size is reached.
- boolean committed = false;
-
- // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
- if (((i + 1) % _txBatchSize) == 0)
+ /**
+ * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the
+ * test that the failure has occurred, before the method returns.
+ *
+ * @param failFlag The fail flag to test.
+ *
+ * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only
+ * used once, then reset.
+ */
+ private boolean waitForUserToPromptOnFailure(boolean failFlag)
+ {
+ if (failFlag)
{
- // log.debug("Trying commit on producer session.");
- committed = commitTx(_producerSession);
+ if (_failOnce)
+ {
+ failFlag = false;
+ }
+
+ // log.debug("Failing Before Send");
+ waitForUser(KILL_BROKER_PROMPT);
}
- return committed;
+ return failFlag;
}
/**
@@ -1291,15 +1374,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
{
- ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
-
- // Timestamp the message in nanoseconds.
-
- // setTimestamp(msg);
-
- return msg;
+ return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
}
+ /**
+ * Sets the current time in nanoseconds as the timestamp on the message.
+ *
+ * @param msg The message to timestamp.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
protected void setTimestamp(Message msg) throws JMSException
{
if (((AMQSession) _producerSession).isStrictAMQP())
@@ -1312,9 +1396,17 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
}
+ /**
+ * Extracts the nanosecond timestamp from a message.
+ *
+ * @param msg The message to extract the time stamp from.
+ *
+ * @return The timestamp in nanos.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
protected long getTimestamp(Message msg) throws JMSException
{
-
if (((AMQSession) _producerSession).isStrictAMQP())
{
Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
@@ -1328,7 +1420,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
/**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag
+ * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag
* has been cleared.
*/
public void stop()
@@ -1336,13 +1428,22 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
_publish = false;
}
+ /**
+ * Starts the producer and consumer connections.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
public void start() throws JMSException
{
+ // log.debug("public void start(): called");
+
_connection.start();
+ // log.debug("Producer started.");
for (int i = 0; i < _noOfConsumers; i++)
{
_consumerConnection[i].start();
+ // log.debug("Consumer " + i + " started.");
}
}
@@ -1386,7 +1487,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
/**
- * Closes the pingers connection.
+ * Closes all of the producer and consumer connections.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
@@ -1398,16 +1499,18 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
{
if (_connection != null)
{
+ // log.debug("Before close producer connection.");
_connection.close();
- // log.debug("Close connection.");
+ // log.debug("Closed producer connection.");
}
for (int i = 0; i < _noOfConsumers; i++)
{
if (_consumerConnection[i] != null)
{
+ // log.debug("Before close consumer connection " + i + ".");
_consumerConnection[i].close();
- // log.debug("Closed consumer connection.");
+ // log.debug("Closed consumer connection " + i + ".");
}
}
}
@@ -1451,18 +1554,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
boolean committed = false;
- // log.trace("Batch time reached");
- if (_failAfterSend)
- {
- // log.trace("Batch size reached");
- if (_failOnce)
- {
- _failAfterSend = false;
- }
-
- // log.trace("Failing After Send");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend);
if (session.getTransacted())
{
@@ -1470,32 +1562,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
try
{
- if (_failBeforeCommit)
- {
- if (_failOnce)
- {
- _failBeforeCommit = false;
- }
-
- // log.trace("Failing Before Commit");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);
long start = System.nanoTime();
session.commit();
committed = true;
// log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
- if (_failAfterCommit)
- {
- if (_failOnce)
- {
- _failAfterCommit = false;
- }
-
- // log.trace("Failing After Commit");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit);
// log.debug("Session Commited.");
}