summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-25 10:04:52 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-25 10:04:52 +0000
commit48332965a98fb8ac51323205ac644ce8b9e3757e (patch)
tree740b5f547c813a1c999745e42a4b16e8a3850e87
parent8601949672c7ed0e53c4a99a6e5d285682b65a74 (diff)
downloadqpid-python-48332965a98fb8ac51323205ac644ce8b9e3757e.tar.gz
Race condition fixed fro AsyncTestPerf
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499716 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java85
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java542
2 files changed, 332 insertions, 295 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index be46c1805b..1368f631fb 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -76,7 +76,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
*/
protected static final int DEFAULT_MESSAGE_SIZE = 0;
- /** This is set and used when the test is for multiple-destinations */
+ /**
+ * This is set and used when the test is for multiple-destinations
+ */
protected static final int DEFAULT_DESTINATION_COUNT = 0;
protected static final int DEFAULT_RATE = 0;
@@ -202,10 +204,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
_throttleBatchSize = (int) Math.pow(100, x);
int throttleRate = rate / _throttleBatchSize;
- _logger.info("rate = " + rate);
- _logger.info("x = " + x);
- _logger.info("_throttleBatchSize = " + _throttleBatchSize);
- _logger.info("throttleRate = " + throttleRate);
+ _logger.debug("rate = " + rate);
+ _logger.debug("x = " + x);
+ _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
+ _logger.debug("throttleRate = " + throttleRate);
rateLimiter = new Throttle();
rateLimiter.setRate(throttleRate);
@@ -519,6 +521,11 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
if (trafficLight != null)
{
+ if (_messageListener != null)
+ {
+ _messageListener.onMessage(message);
+ }
+
_logger.trace("Reply was expected, decrementing the latch for the id.");
trafficLight.countDown();
@@ -529,11 +536,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
commitTx(getConsumerSession());
}
- if (_messageListener != null)
- {
- _messageListener.onMessage(message);
- }
-
}
else
{
@@ -570,32 +572,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
- // Put a unique correlation id on the message before sending it.
- String messageCorrelationId = Long.toString(getNewID());
+ String messageCorrelationId = null;
- pingNoWaitForReply(message, numPings, messageCorrelationId);
+ try
+ {
+ // Put a unique correlation id on the message before sending it.
+ messageCorrelationId = Long.toString(getNewID());
- CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
- // Block the current thread until a reply to the message is received, or it times out.
- trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
- trafficLights.remove(messageCorrelationId);
+ CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+ // Block the current thread until a reply to the message is received, or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
- // Work out how many replies were receieved.
- int numReplies = numPings - (int) trafficLight.getCount();
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
- if ((numReplies < numPings) && _verbose)
- {
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+
+ commitTx(getConsumerSession());
+
+ return numReplies;
}
- else if (_verbose)
+ finally
{
- _logger.info("Got all replies on id, " + messageCorrelationId);
+ removeLock(messageCorrelationId);
}
-
- commitTx(getConsumerSession());
-
- return numReplies;
}
public long getNewID()
@@ -603,14 +612,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
return idGenerator.incrementAndGet();
}
+ public CountDownLatch removeLock(String correlationID)
+ {
+ return trafficLights.remove(correlationID);
+ }
+
+
/*
- * Sends the specified ping message but does not wait for a correlating reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
+ * Sends the specified ping message but does not wait for a correlating reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @return The reply, or null if no reply arrives before the timeout.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
{
// Create a count down latch to count the number of replies with. This is created before the message is sent
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index c76f361f99..1a63518ef4 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.ping;
-//import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-//import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.TimingController;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
@@ -33,15 +33,17 @@ import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.log4j.Logger;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
-public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware
+public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware
{
-// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+ private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
-// private TimingController _timingController;
+ private TimingController _timingController;
-// private AsyncMessageListener _listener;
+ private AsyncMessageListener _listener;
private volatile boolean _done = false;
@@ -50,260 +52,280 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle
super(name);
}
-// /**
-// * Compile all the tests into a test suite.
-// */
-// public static Test suite()
-// {
-// // Build a new test suite
-// TestSuite suite = new TestSuite("Ping Performance Tests");
-//
-// // Run performance tests in read committed mode.
-// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
-//
-// return suite;
-// }
-//
-// protected void setUp() throws Exception
-// {
-// // Create the test setups on a per thread basis, only if they have not already been created.
-//
-// if (threadSetup.get() == null)
-// {
-// PerThreadSetup perThreadSetup = new PerThreadSetup();
-//
-// // Extract the test set up paramaeters.
-// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
-// String username = "guest";
-// String password = "guest";
-// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
-// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
-// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
-// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
-// String selector = null;
-// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
-// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
-// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
-// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-//
-//
-// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
-// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
-// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
-// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
-// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
-//
-// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
-// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
-//
-// // This is synchronized because there is a race condition, which causes one connection to sleep if
-// // all threads try to create connection concurrently
-// synchronized (this)
-// {
-// // Establish a client to ping a Queue and listen the reply back from same Queue
-// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
-// destinationname, selector, transacted, persistent,
-// messageSize, verbose,
-// afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-// commitbatchSize, destinationscount, rate, pubsub);
-//
-//
-// _listener = new AsyncMessageListener(batchSize);
-//
-// perThreadSetup._pingItselfClient.setMessageListener(_listener);
-// // Start the client connection
-// perThreadSetup._pingItselfClient.getConnection().start();
-//
-// // Attach the per-thread set to the thread.
-// threadSetup.set(perThreadSetup);
-// }
-// }
-// }
-//
-//
-// public void testAsyncPingOk(int numPings)
-// {
-// _timingController = this.getTimingController();
-//
-// _listener.setTotalMessages(numPings);
-//
-// PerThreadSetup perThreadSetup = threadSetup.get();
-// if (numPings == 0)
-// {
-// _logger.error("Number of pings requested was zero.");
-// }
-//
-// // Generate a sample message. This message is already time stamped and has its reply-to destination set.
-// ObjectMessage msg = null;
-//
-// try
-// {
-// msg = perThreadSetup._pingItselfClient.getTestMessage(null,
-// Integer.parseInt(testParameters.getProperty(
-// MESSAGE_SIZE_PROPNAME)),
-// Boolean.parseBoolean(testParameters.getProperty(
-// PERSISTENT_MODE_PROPNAME)));
-// }
-// catch (JMSException e)
-// {
-//
-// }
-//
-// // start the test
-// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
-//
-// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID());
-//
-// try
-// {
-// _logger.debug("Sending messages");
-//
-// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID);
-//
-// _logger.debug("All sent");
-// }
-// catch (JMSException e)
-// {
-// e.printStackTrace();
-// Assert.fail("JMS Exception Recevied" + e);
-// }
-// catch (InterruptedException e)
-// {
-// e.printStackTrace();
-// }
-//
-// while (!_done)
-// {
-// try
-// {
-// _logger.debug("Awating test finish");
-//
-// perThreadSetup._pingItselfClient.getEndLock(correlationID).await();
-//
-//
-// }
-// catch (InterruptedException e)
-// {
-// //ignore
-// }
-// }
-//
-// // Fail the test if the timeout was exceeded.
-// int numReplies = _listener.getReplyCount();
-//
-// _logger.info("Test Finished");
-//
-// if (numReplies != numPings)
-// {
-//
-// try
-// {
-// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
-// }
-// catch (JMSException e)
-// {
-// _logger.error("Error commiting recevied messages", e);
-// }
-// try
-// {
-// _timingController.completeTest(false, numPings - numReplies);
-// }
-// catch (InterruptedException e)
-// {
-// //ignore
-// }
-// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
-// }
-// }
-//
-// public void setTimingController(TimingController timingController)
-// {
-// _timingController = timingController;
-// }
-//
-// public TimingController getTimingController()
-// {
-// return _timingController;
-// }
-//
-//
-// private class AsyncMessageListener implements MessageListener
-// {
-// private int _messageReceived;
-// private int _totalMessages;
-// private int _batchSize;
-//
-// public AsyncMessageListener(int batchSize, int totalMessages)
-// {
-// _batchSize = batchSize;
-// _totalMessages = totalMessages;
-// _messageReceived = 0;
-// }
-//
-// public AsyncMessageListener(int batchSize)
-// {
-// _batchSize = batchSize;
-// _totalMessages = -1;
-// _messageReceived = 0;
-// }
-//
-// public void setTotalMessages(int newTotal)
-// {
-// _totalMessages = newTotal;
-// }
-//
-// public void onMessage(Message message)
-// {
-// _logger.trace("Message Recevied");
-//
-// _messageReceived++;
-//
-// try
-// {
-// if (_messageReceived % _batchSize == 0)
-// {
-// if (_timingController != null)
-// {
-// _timingController.completeTest(true, _batchSize);
-// }
-// }
-// }
-// catch (InterruptedException e)
-// {
-//// _logger.error("Interupted");
-//// doDone();
-// }
-//
-// if (_totalMessages == -1 || _messageReceived == _totalMessages)
-// {
-// _logger.info("Test Completed.. signalling");
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+
+ return suite;
+ }
+
+ protected void setUp() throws Exception
+ {
+ // Create the test setups on a per thread basis, only if they have not already been created.
+
+ if (threadSetup.get() == null)
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+ String username = "guest";
+ String password = "guest";
+ String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+ int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+ String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
+ boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+ boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+ String selector = null;
+ boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
+ int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+ int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+ boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
+
+
+ boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+ boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+ boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+ boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+ boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+
+ int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+ int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
+
+ // This is synchronized because there is a race condition, which causes one connection to sleep if
+ // all threads try to create connection concurrently
+ synchronized (this)
+ {
+ // Establish a client to ping a Queue and listen the reply back from same Queue
+ perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
+ destinationname, selector, transacted, persistent,
+ messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
+ commitbatchSize, destinationscount, rate, pubsub);
+
+
+ _listener = new AsyncMessageListener(batchSize);
+
+ perThreadSetup._pingItselfClient.setMessageListener(_listener);
+ // Start the client connection
+ perThreadSetup._pingItselfClient.getConnection().start();
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ }
+ }
+
+
+ public void testAsyncPingOk(int numPings)
+ {
+ _timingController = this.getTimingController();
+
+ _listener.setTotalMessages(numPings);
+
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg = null;
+
+ try
+ {
+ msg = perThreadSetup._pingItselfClient.getTestMessage(null,
+ Integer.parseInt(testParameters.getProperty(
+ MESSAGE_SIZE_PROPNAME)),
+ Boolean.parseBoolean(testParameters.getProperty(
+ PERSISTENT_MODE_PROPNAME)));
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ // start the test
+ long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
+
+ String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID());
+
+ try
+ {
+ _logger.debug("Sending messages");
+
+ perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID);
+
+ _logger.debug("All sent");
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ Assert.fail("JMS Exception Recevied" + e);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+
+ while (!_done)
+ {
+ try
+ {
+ _logger.debug("Awating test finish");
+
+ perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS);
+
+ if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0)
+ {
+ _logger.error("Timeout occured");
+ _done = true;
+ }
+ else
+ {
+ _logger.error("Countdown reached Done?" + _done);
+ }
+ //Allow the time out to exit the loop.
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ _logger.error("Awaiting test end interrupted.");
+
+ }
+ }
+
+ perThreadSetup._pingItselfClient.removeLock(correlationID);
+
+ // Fail the test if the timeout was exceeded.
+ int numReplies = _listener.getReplyCount();
+
+ _logger.info("Test Finished");
+
+ if (numReplies != numPings)
+ {
+
+ try
+ {
+ perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error commiting recevied messages", e);
+ }
+ try
+ {
+ _timingController.completeTest(false, numPings - numReplies);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
+ }
+ }
+
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+
+ private class AsyncMessageListener implements MessageListener
+ {
+ private AtomicInteger _messageReceived;
+ private volatile int _totalMessages;
+ private int _batchSize;
+
+ public AsyncMessageListener(int batchSize, int totalMessages)
+ {
+ _batchSize = batchSize;
+ _totalMessages = totalMessages;
+ _messageReceived = new AtomicInteger(0);
+ }
+
+ public AsyncMessageListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ _totalMessages = -1;
+ _messageReceived = new AtomicInteger(0);
+ }
+
+ public void setTotalMessages(int newTotal)
+ {
+ _totalMessages = newTotal;
+ _messageReceived.set(0);
+ }
+
+ public void onMessage(Message message)
+ {
+ _logger.trace("Message Recevied");
+
+ int messagesReceived = _messageReceived.incrementAndGet();
+
+ try
+ {
+ if (messagesReceived % _batchSize == 0)
+ {
+ if (_timingController != null)
+ {
+ _timingController.completeTest(true, _batchSize);
+ }
+
+ if (messagesReceived == _totalMessages)
+ {
+ _done = true;
+ }
+ }
+ else if (messagesReceived == _totalMessages)
+ {
+ _logger.info("Test Completed.. signalling");
+ doDone();
+ }
+
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error("Interupted Test");
// doDone();
-// }
-// }
-//
-// private void doDone()
-// {
-// _done = true;
-//
-// _logger.error("Messages received:" + _messageReceived);
-// _logger.error("Total Messages :" + _totalMessages);
-//
-// try
-// {
-// _timingController.completeTest(true, _totalMessages - _messageReceived);
-// }
-// catch (InterruptedException e)
-// {
-// //ignore
-// }
-// }
-//
-// public int getReplyCount()
-// {
-// return _messageReceived;
-// }
-//
-// }
+ }
+
+ }
+
+ private void doDone()
+ {
+ _done = true;
+
+ _logger.trace("Messages received:" + _messageReceived.get());
+ _logger.trace("Total Messages :" + _totalMessages);
+
+ try
+ {
+ _timingController.completeTest(true, _totalMessages - _messageReceived.get());
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
+ public int getReplyCount()
+ {
+ return _messageReceived.get();
+ }
+
+ }
}