summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-25 10:59:36 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-25 10:59:36 +0000
commit156219089552d8dd6a89e55b82b57b2687af5f2a (patch)
tree630a0e8c4eedff52fdf60b05d666b2eeb5ef1a3c
parent48332965a98fb8ac51323205ac644ce8b9e3757e (diff)
downloadqpid-python-156219089552d8dd6a89e55b82b57b2687af5f2a.tar.gz
Refactored to use CountDownLatch as using local count was wrong in multi threaded case.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499733 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java551
1 files changed, 268 insertions, 283 deletions
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 1a63518ef4..b30ecc1a44 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.ping;
-import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.TimingController;
+//import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+//import uk.co.thebadgerset.junit.extensions.TimingController;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
@@ -35,297 +35,282 @@ import org.apache.log4j.Logger;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
-public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware
+public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware
{
- private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
- private TimingController _timingController;
+// private TimingController _timingController;
- private AsyncMessageListener _listener;
-
- private volatile boolean _done = false;
+// private AsyncMessageListener _listener;
public PingAsyncTestPerf(String name)
{
super(name);
}
- /**
- * Compile all the tests into a test suite.
- */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping Performance Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
-
- return suite;
- }
-
- protected void setUp() throws Exception
- {
- // Create the test setups on a per thread basis, only if they have not already been created.
-
- if (threadSetup.get() == null)
- {
- PerThreadSetup perThreadSetup = new PerThreadSetup();
-
- // Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
- String username = "guest";
- String password = "guest";
- String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
- int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
- String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
- boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
- boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
- String selector = null;
- boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
- int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
- int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
- boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-
-
- boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
- boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
- boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
- boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
- boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
-
- int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
- int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
-
- // This is synchronized because there is a race condition, which causes one connection to sleep if
- // all threads try to create connection concurrently
- synchronized (this)
- {
- // Establish a client to ping a Queue and listen the reply back from same Queue
- perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
- destinationname, selector, transacted, persistent,
- messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
- commitbatchSize, destinationscount, rate, pubsub);
-
-
- _listener = new AsyncMessageListener(batchSize);
-
- perThreadSetup._pingItselfClient.setMessageListener(_listener);
- // Start the client connection
- perThreadSetup._pingItselfClient.getConnection().start();
-
- // Attach the per-thread set to the thread.
- threadSetup.set(perThreadSetup);
- }
- }
- }
-
-
- public void testAsyncPingOk(int numPings)
- {
- _timingController = this.getTimingController();
-
- _listener.setTotalMessages(numPings);
-
- PerThreadSetup perThreadSetup = threadSetup.get();
- if (numPings == 0)
- {
- _logger.error("Number of pings requested was zero.");
- }
-
- // Generate a sample message. This message is already time stamped and has its reply-to destination set.
- ObjectMessage msg = null;
-
- try
- {
- msg = perThreadSetup._pingItselfClient.getTestMessage(null,
- Integer.parseInt(testParameters.getProperty(
- MESSAGE_SIZE_PROPNAME)),
- Boolean.parseBoolean(testParameters.getProperty(
- PERSISTENT_MODE_PROPNAME)));
- }
- catch (JMSException e)
- {
-
- }
-
- // start the test
- long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
-
- String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID());
-
- try
- {
- _logger.debug("Sending messages");
-
- perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID);
-
- _logger.debug("All sent");
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- Assert.fail("JMS Exception Recevied" + e);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- while (!_done)
- {
- try
- {
- _logger.debug("Awating test finish");
-
- perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS);
-
- if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0)
- {
- _logger.error("Timeout occured");
- _done = true;
- }
- else
- {
- _logger.error("Countdown reached Done?" + _done);
- }
- //Allow the time out to exit the loop.
- }
- catch (InterruptedException e)
- {
- //ignore
- _logger.error("Awaiting test end interrupted.");
-
- }
- }
-
- perThreadSetup._pingItselfClient.removeLock(correlationID);
-
- // Fail the test if the timeout was exceeded.
- int numReplies = _listener.getReplyCount();
-
- _logger.info("Test Finished");
-
- if (numReplies != numPings)
- {
-
- try
- {
- perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
- }
- catch (JMSException e)
- {
- _logger.error("Error commiting recevied messages", e);
- }
- try
- {
- _timingController.completeTest(false, numPings - numReplies);
- }
- catch (InterruptedException e)
- {
- //ignore
- }
- Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
- }
- }
-
- public void setTimingController(TimingController timingController)
- {
- _timingController = timingController;
- }
-
- public TimingController getTimingController()
- {
- return _timingController;
- }
-
-
- private class AsyncMessageListener implements MessageListener
- {
- private AtomicInteger _messageReceived;
- private volatile int _totalMessages;
- private int _batchSize;
-
- public AsyncMessageListener(int batchSize, int totalMessages)
- {
- _batchSize = batchSize;
- _totalMessages = totalMessages;
- _messageReceived = new AtomicInteger(0);
- }
-
- public AsyncMessageListener(int batchSize)
- {
- _batchSize = batchSize;
- _totalMessages = -1;
- _messageReceived = new AtomicInteger(0);
- }
-
- public void setTotalMessages(int newTotal)
- {
- _totalMessages = newTotal;
- _messageReceived.set(0);
- }
-
- public void onMessage(Message message)
- {
- _logger.trace("Message Recevied");
-
- int messagesReceived = _messageReceived.incrementAndGet();
-
- try
- {
- if (messagesReceived % _batchSize == 0)
- {
- if (_timingController != null)
- {
- _timingController.completeTest(true, _batchSize);
- }
-
- if (messagesReceived == _totalMessages)
- {
- _done = true;
- }
- }
- else if (messagesReceived == _totalMessages)
- {
- _logger.info("Test Completed.. signalling");
- doDone();
- }
-
- }
- catch (InterruptedException e)
- {
- _logger.error("Interupted Test");
-// doDone();
- }
-
- }
-
- private void doDone()
- {
- _done = true;
-
- _logger.trace("Messages received:" + _messageReceived.get());
- _logger.trace("Total Messages :" + _totalMessages);
-
- try
- {
- _timingController.completeTest(true, _totalMessages - _messageReceived.get());
- }
- catch (InterruptedException e)
- {
- //ignore
- }
- }
-
- public int getReplyCount()
- {
- return _messageReceived.get();
- }
-
- }
+// /**
+// * Compile all the tests into a test suite.
+// */
+// public static Test suite()
+// {
+// // Build a new test suite
+// TestSuite suite = new TestSuite("Ping Performance Tests");
+//
+// // Run performance tests in read committed mode.
+// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+//
+// return suite;
+// }
+//
+// protected void setUp() throws Exception
+// {
+// // Create the test setups on a per thread basis, only if they have not already been created.
+//
+// if (threadSetup.get() == null)
+// {
+// PerThreadSetup perThreadSetup = new PerThreadSetup();
+//
+// // Extract the test set up paramaeters.
+// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+// String username = "guest";
+// String password = "guest";
+// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
+// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+// String selector = null;
+// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
+// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
+//
+//
+// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+//
+// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
+//
+// // This is synchronized because there is a race condition, which causes one connection to sleep if
+// // all threads try to create connection concurrently
+// synchronized (this)
+// {
+// // Establish a client to ping a Queue and listen the reply back from same Queue
+// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
+// destinationname, selector, transacted, persistent,
+// messageSize, verbose,
+// afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
+// commitbatchSize, destinationscount, rate, pubsub);
+// }
+//
+// // Attach the per-thread set to the thread.
+// threadSetup.set(perThreadSetup);
+//
+// _listener = new AsyncMessageListener(batchSize);
+//
+// perThreadSetup._pingItselfClient.setMessageListener(_listener);
+// // Start the client connection
+// perThreadSetup._pingItselfClient.getConnection().start();
+//
+// }
+// }
+//
+//
+// public void testAsyncPingOk(int numPings)
+// {
+// _timingController = this.getTimingController();
+//
+// _listener.setTotalMessages(numPings);
+//
+// PerThreadSetup perThreadSetup = threadSetup.get();
+// if (numPings == 0)
+// {
+// _logger.error("Number of pings requested was zero.");
+// fail("Number of pings requested was zero.");
+// }
+//
+// // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+// ObjectMessage msg = null;
+//
+// try
+// {
+// msg = perThreadSetup._pingItselfClient.getTestMessage(null,
+// Integer.parseInt(testParameters.getProperty(
+// MESSAGE_SIZE_PROPNAME)),
+// Boolean.parseBoolean(testParameters.getProperty(
+// PERSISTENT_MODE_PROPNAME)));
+// }
+// catch (JMSException e)
+// {
+//
+// }
+//
+// // start the test
+// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
+//
+// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID());
+//
+// try
+// {
+// _logger.debug("Sending messages");
+//
+// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID);
+//
+// _logger.debug("All sent");
+// }
+// catch (JMSException e)
+// {
+// e.printStackTrace();
+// Assert.fail("JMS Exception Recevied" + e);
+// }
+// catch (InterruptedException e)
+// {
+// e.printStackTrace();
+// }
+//
+// try
+// {
+// _logger.debug("Awating test finish");
+//
+// perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS);
+//
+// if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0)
+// {
+// _logger.error("Timeout occured");
+// }
+// //Allow the time out to exit the loop.
+// }
+// catch (InterruptedException e)
+// {
+// //ignore
+// _logger.error("Awaiting test end was interrupted.");
+//
+// }
+//
+// // Fail the test if the timeout was exceeded.
+// int numReplies = numPings - (int) perThreadSetup._pingItselfClient.removeLock(correlationID).getCount();
+//
+// _logger.info("Test Finished");
+//
+// if (numReplies != numPings)
+// {
+// try
+// {
+// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession());
+// }
+// catch (JMSException e)
+// {
+// _logger.error("Error commiting recevied messages", e);
+// }
+// try
+// {
+// _timingController.completeTest(false, numPings - numReplies);
+// }
+// catch (InterruptedException e)
+// {
+// //ignore
+// }
+// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
+// }
+// }
+//
+// public void setTimingController(TimingController timingController)
+// {
+// _timingController = timingController;
+// }
+//
+// public TimingController getTimingController()
+// {
+// return _timingController;
+// }
+//
+//
+// private class AsyncMessageListener implements MessageListener
+// {
+// private volatile int _totalMessages;
+// private int _batchSize;
+// PerThreadSetup _perThreadSetup;
+//
+// public AsyncMessageListener(int batchSize)
+// {
+// this(batchSize, -1);
+// }
+//
+// public AsyncMessageListener(int batchSize, int totalMessages)
+// {
+// _batchSize = batchSize;
+// _totalMessages = totalMessages;
+// _perThreadSetup = threadSetup.get();
+// }
+//
+// public void setTotalMessages(int newTotal)
+// {
+// _totalMessages = newTotal;
+// }
+//
+// public void onMessage(Message message)
+// {
+// try
+// {
+// _logger.trace("Message Recevied");
+//
+// CountDownLatch count = _perThreadSetup._pingItselfClient.getEndLock(message.getJMSCorrelationID());
+//
+// int messagesLeft = (int) count.getCount();
+//
+// int messagesReceived = _totalMessages - messagesLeft;
+//
+// try
+// {
+// if (messagesReceived % _batchSize == 0)
+// {
+// if (_timingController != null)
+// {
+// _timingController.completeTest(true, _batchSize);
+// }
+// }
+// else if (messagesReceived == _totalMessages)
+// {
+// _logger.info("Test Completed.. signalling");
+// doDone(messagesReceived);
+// }
+//
+// }
+// catch (InterruptedException e)
+// {
+// _logger.error("Interupted Test");
+//// doDone(messagesReceived);
+// }
+// }
+// catch (JMSException e)
+// {
+// _logger.warn("There was a JMSException", e);
+// }
+//
+// }
+//
+// private void doDone(int messageCount)
+// {
+// _logger.trace("Messages received:" + messageCount);
+// _logger.trace("Total Messages :" + _totalMessages);
+//
+// try
+// {
+// _timingController.completeTest(true, messageCount);
+// }
+// catch (InterruptedException e)
+// {
+// //ignore
+// }
+// }
+//
+// }
}