diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-25 10:04:52 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-25 10:04:52 +0000 |
commit | 48332965a98fb8ac51323205ac644ce8b9e3757e (patch) | |
tree | 740b5f547c813a1c999745e42a4b16e8a3850e87 | |
parent | 8601949672c7ed0e53c4a99a6e5d285682b65a74 (diff) | |
download | qpid-python-48332965a98fb8ac51323205ac644ce8b9e3757e.tar.gz |
Race condition fixed fro AsyncTestPerf
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499716 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java | 85 | ||||
-rw-r--r-- | qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java | 542 |
2 files changed, 332 insertions, 295 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index be46c1805b..1368f631fb 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -76,7 +76,9 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, */
protected static final int DEFAULT_MESSAGE_SIZE = 0;
- /** This is set and used when the test is for multiple-destinations */
+ /**
+ * This is set and used when the test is for multiple-destinations
+ */
protected static final int DEFAULT_DESTINATION_COUNT = 0;
protected static final int DEFAULT_RATE = 0;
@@ -202,10 +204,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, _throttleBatchSize = (int) Math.pow(100, x);
int throttleRate = rate / _throttleBatchSize;
- _logger.info("rate = " + rate);
- _logger.info("x = " + x);
- _logger.info("_throttleBatchSize = " + _throttleBatchSize);
- _logger.info("throttleRate = " + throttleRate);
+ _logger.debug("rate = " + rate);
+ _logger.debug("x = " + x);
+ _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
+ _logger.debug("throttleRate = " + throttleRate);
rateLimiter = new Throttle();
rateLimiter.setRate(throttleRate);
@@ -519,6 +521,11 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, if (trafficLight != null)
{
+ if (_messageListener != null)
+ {
+ _messageListener.onMessage(message);
+ }
+
_logger.trace("Reply was expected, decrementing the latch for the id.");
trafficLight.countDown();
@@ -529,11 +536,6 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, commitTx(getConsumerSession());
}
- if (_messageListener != null)
- {
- _messageListener.onMessage(message);
- }
-
}
else
{
@@ -570,32 +572,39 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, */
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
- // Put a unique correlation id on the message before sending it.
- String messageCorrelationId = Long.toString(getNewID());
+ String messageCorrelationId = null;
- pingNoWaitForReply(message, numPings, messageCorrelationId);
+ try
+ {
+ // Put a unique correlation id on the message before sending it.
+ messageCorrelationId = Long.toString(getNewID());
- CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
- // Block the current thread until a reply to the message is received, or it times out.
- trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
- trafficLights.remove(messageCorrelationId);
+ CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+ // Block the current thread until a reply to the message is received, or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
- // Work out how many replies were receieved.
- int numReplies = numPings - (int) trafficLight.getCount();
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
- if ((numReplies < numPings) && _verbose)
- {
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+
+ commitTx(getConsumerSession());
+
+ return numReplies;
}
- else if (_verbose)
+ finally
{
- _logger.info("Got all replies on id, " + messageCorrelationId);
+ removeLock(messageCorrelationId);
}
-
- commitTx(getConsumerSession());
-
- return numReplies;
}
public long getNewID()
@@ -603,14 +612,20 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable, return idGenerator.incrementAndGet();
}
+ public CountDownLatch removeLock(String correlationID)
+ {
+ return trafficLights.remove(correlationID);
+ }
+
+
/*
- * Sends the specified ping message but does not wait for a correlating reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
+ * Sends the specified ping message but does not wait for a correlating reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @return The reply, or null if no reply arrives before the timeout.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
{
// Create a count down latch to count the number of replies with. This is created before the message is sent
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index c76f361f99..1a63518ef4 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.ping; -//import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -//import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.TimingController; import javax.jms.MessageListener; import javax.jms.ObjectMessage; @@ -33,15 +33,17 @@ import junit.framework.Test; import junit.framework.TestSuite; import org.apache.log4j.Logger; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; -public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware { -// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); -// private TimingController _timingController; + private TimingController _timingController; -// private AsyncMessageListener _listener; + private AsyncMessageListener _listener; private volatile boolean _done = false; @@ -50,260 +52,280 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle super(name); } -// /** -// * Compile all the tests into a test suite. -// */ -// public static Test suite() -// { -// // Build a new test suite -// TestSuite suite = new TestSuite("Ping Performance Tests"); -// -// // Run performance tests in read committed mode. -// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); -// -// return suite; -// } -// -// protected void setUp() throws Exception -// { -// // Create the test setups on a per thread basis, only if they have not already been created. -// -// if (threadSetup.get() == null) -// { -// PerThreadSetup perThreadSetup = new PerThreadSetup(); -// -// // Extract the test set up paramaeters. -// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); -// String username = "guest"; -// String password = "guest"; -// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); -// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); -// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); -// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); -// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); -// String selector = null; -// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); -// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); -// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); -// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); -// -// -// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); -// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); -// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); -// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); -// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); -// -// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); -// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); -// -// // This is synchronized because there is a race condition, which causes one connection to sleep if -// // all threads try to create connection concurrently -// synchronized (this) -// { -// // Establish a client to ping a Queue and listen the reply back from same Queue -// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, -// destinationname, selector, transacted, persistent, -// messageSize, verbose, -// afterCommit, beforeCommit, afterSend, beforeSend, failOnce, -// commitbatchSize, destinationscount, rate, pubsub); -// -// -// _listener = new AsyncMessageListener(batchSize); -// -// perThreadSetup._pingItselfClient.setMessageListener(_listener); -// // Start the client connection -// perThreadSetup._pingItselfClient.getConnection().start(); -// -// // Attach the per-thread set to the thread. -// threadSetup.set(perThreadSetup); -// } -// } -// } -// -// -// public void testAsyncPingOk(int numPings) -// { -// _timingController = this.getTimingController(); -// -// _listener.setTotalMessages(numPings); -// -// PerThreadSetup perThreadSetup = threadSetup.get(); -// if (numPings == 0) -// { -// _logger.error("Number of pings requested was zero."); -// } -// -// // Generate a sample message. This message is already time stamped and has its reply-to destination set. -// ObjectMessage msg = null; -// -// try -// { -// msg = perThreadSetup._pingItselfClient.getTestMessage(null, -// Integer.parseInt(testParameters.getProperty( -// MESSAGE_SIZE_PROPNAME)), -// Boolean.parseBoolean(testParameters.getProperty( -// PERSISTENT_MODE_PROPNAME))); -// } -// catch (JMSException e) -// { -// -// } -// -// // start the test -// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); -// -// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); -// -// try -// { -// _logger.debug("Sending messages"); -// -// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); -// -// _logger.debug("All sent"); -// } -// catch (JMSException e) -// { -// e.printStackTrace(); -// Assert.fail("JMS Exception Recevied" + e); -// } -// catch (InterruptedException e) -// { -// e.printStackTrace(); -// } -// -// while (!_done) -// { -// try -// { -// _logger.debug("Awating test finish"); -// -// perThreadSetup._pingItselfClient.getEndLock(correlationID).await(); -// -// -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// } -// -// // Fail the test if the timeout was exceeded. -// int numReplies = _listener.getReplyCount(); -// -// _logger.info("Test Finished"); -// -// if (numReplies != numPings) -// { -// -// try -// { -// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); -// } -// catch (JMSException e) -// { -// _logger.error("Error commiting recevied messages", e); -// } -// try -// { -// _timingController.completeTest(false, numPings - numReplies); -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); -// } -// } -// -// public void setTimingController(TimingController timingController) -// { -// _timingController = timingController; -// } -// -// public TimingController getTimingController() -// { -// return _timingController; -// } -// -// -// private class AsyncMessageListener implements MessageListener -// { -// private int _messageReceived; -// private int _totalMessages; -// private int _batchSize; -// -// public AsyncMessageListener(int batchSize, int totalMessages) -// { -// _batchSize = batchSize; -// _totalMessages = totalMessages; -// _messageReceived = 0; -// } -// -// public AsyncMessageListener(int batchSize) -// { -// _batchSize = batchSize; -// _totalMessages = -1; -// _messageReceived = 0; -// } -// -// public void setTotalMessages(int newTotal) -// { -// _totalMessages = newTotal; -// } -// -// public void onMessage(Message message) -// { -// _logger.trace("Message Recevied"); -// -// _messageReceived++; -// -// try -// { -// if (_messageReceived % _batchSize == 0) -// { -// if (_timingController != null) -// { -// _timingController.completeTest(true, _batchSize); -// } -// } -// } -// catch (InterruptedException e) -// { -//// _logger.error("Interupted"); -//// doDone(); -// } -// -// if (_totalMessages == -1 || _messageReceived == _totalMessages) -// { -// _logger.info("Test Completed.. signalling"); + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + protected void setUp() throws Exception + { + // Create the test setups on a per thread basis, only if they have not already been created. + + if (threadSetup.get() == null) + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); + String username = "guest"; + String password = "guest"; + String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); + int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); + String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); + boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); + boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); + String selector = null; + boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); + int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); + int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); + boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); + + + boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); + boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); + boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); + boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); + boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); + + int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently + synchronized (this) + { + // Establish a client to ping a Queue and listen the reply back from same Queue + perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, + destinationname, selector, transacted, persistent, + messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, failOnce, + commitbatchSize, destinationscount, rate, pubsub); + + + _listener = new AsyncMessageListener(batchSize); + + perThreadSetup._pingItselfClient.setMessageListener(_listener); + // Start the client connection + perThreadSetup._pingItselfClient.getConnection().start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + } + } + + + public void testAsyncPingOk(int numPings) + { + _timingController = this.getTimingController(); + + _listener.setTotalMessages(numPings); + + PerThreadSetup perThreadSetup = threadSetup.get(); + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + ObjectMessage msg = null; + + try + { + msg = perThreadSetup._pingItselfClient.getTestMessage(null, + Integer.parseInt(testParameters.getProperty( + MESSAGE_SIZE_PROPNAME)), + Boolean.parseBoolean(testParameters.getProperty( + PERSISTENT_MODE_PROPNAME))); + } + catch (JMSException e) + { + + } + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); + + String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); + + try + { + _logger.debug("Sending messages"); + + perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); + + _logger.debug("All sent"); + } + catch (JMSException e) + { + e.printStackTrace(); + Assert.fail("JMS Exception Recevied" + e); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + + while (!_done) + { + try + { + _logger.debug("Awating test finish"); + + perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS); + + if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0) + { + _logger.error("Timeout occured"); + _done = true; + } + else + { + _logger.error("Countdown reached Done?" + _done); + } + //Allow the time out to exit the loop. + } + catch (InterruptedException e) + { + //ignore + _logger.error("Awaiting test end interrupted."); + + } + } + + perThreadSetup._pingItselfClient.removeLock(correlationID); + + // Fail the test if the timeout was exceeded. + int numReplies = _listener.getReplyCount(); + + _logger.info("Test Finished"); + + if (numReplies != numPings) + { + + try + { + perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); + } + catch (JMSException e) + { + _logger.error("Error commiting recevied messages", e); + } + try + { + _timingController.completeTest(false, numPings - numReplies); + } + catch (InterruptedException e) + { + //ignore + } + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); + } + } + + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + public TimingController getTimingController() + { + return _timingController; + } + + + private class AsyncMessageListener implements MessageListener + { + private AtomicInteger _messageReceived; + private volatile int _totalMessages; + private int _batchSize; + + public AsyncMessageListener(int batchSize, int totalMessages) + { + _batchSize = batchSize; + _totalMessages = totalMessages; + _messageReceived = new AtomicInteger(0); + } + + public AsyncMessageListener(int batchSize) + { + _batchSize = batchSize; + _totalMessages = -1; + _messageReceived = new AtomicInteger(0); + } + + public void setTotalMessages(int newTotal) + { + _totalMessages = newTotal; + _messageReceived.set(0); + } + + public void onMessage(Message message) + { + _logger.trace("Message Recevied"); + + int messagesReceived = _messageReceived.incrementAndGet(); + + try + { + if (messagesReceived % _batchSize == 0) + { + if (_timingController != null) + { + _timingController.completeTest(true, _batchSize); + } + + if (messagesReceived == _totalMessages) + { + _done = true; + } + } + else if (messagesReceived == _totalMessages) + { + _logger.info("Test Completed.. signalling"); + doDone(); + } + + } + catch (InterruptedException e) + { + _logger.error("Interupted Test"); // doDone(); -// } -// } -// -// private void doDone() -// { -// _done = true; -// -// _logger.error("Messages received:" + _messageReceived); -// _logger.error("Total Messages :" + _totalMessages); -// -// try -// { -// _timingController.completeTest(true, _totalMessages - _messageReceived); -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// } -// -// public int getReplyCount() -// { -// return _messageReceived; -// } -// -// } + } + + } + + private void doDone() + { + _done = true; + + _logger.trace("Messages received:" + _messageReceived.get()); + _logger.trace("Total Messages :" + _totalMessages); + + try + { + _timingController.completeTest(true, _totalMessages - _messageReceived.get()); + } + catch (InterruptedException e) + { + //ignore + } + } + + public int getReplyCount() + { + return _messageReceived.get(); + } + + } } |