diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-25 10:59:36 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-25 10:59:36 +0000 |
commit | 156219089552d8dd6a89e55b82b57b2687af5f2a (patch) | |
tree | 630a0e8c4eedff52fdf60b05d666b2eeb5ef1a3c | |
parent | 48332965a98fb8ac51323205ac644ce8b9e3757e (diff) | |
download | qpid-python-156219089552d8dd6a89e55b82b57b2687af5f2a.tar.gz |
Refactored to use CountDownLatch as using local count was wrong in multi threaded case.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499733 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java | 551 |
1 files changed, 268 insertions, 283 deletions
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index 1a63518ef4..b30ecc1a44 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.ping; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.TimingController; +//import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +//import uk.co.thebadgerset.junit.extensions.TimingController; import javax.jms.MessageListener; import javax.jms.ObjectMessage; @@ -35,297 +35,282 @@ import org.apache.log4j.Logger; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.TimeUnit; +import java.util.concurrent.CountDownLatch; -public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware { - private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); +// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); - private TimingController _timingController; +// private TimingController _timingController; - private AsyncMessageListener _listener; - - private volatile boolean _done = false; +// private AsyncMessageListener _listener; public PingAsyncTestPerf(String name) { super(name); } - /** - * Compile all the tests into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); - - return suite; - } - - protected void setUp() throws Exception - { - // Create the test setups on a per thread basis, only if they have not already been created. - - if (threadSetup.get() == null) - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); - String username = "guest"; - String password = "guest"; - String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); - int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); - String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); - boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); - boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); - String selector = null; - boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); - int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); - int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); - boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); - - - boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); - boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); - boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); - boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); - boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); - - int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); - int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); - - // This is synchronized because there is a race condition, which causes one connection to sleep if - // all threads try to create connection concurrently - synchronized (this) - { - // Establish a client to ping a Queue and listen the reply back from same Queue - perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, - destinationname, selector, transacted, persistent, - messageSize, verbose, - afterCommit, beforeCommit, afterSend, beforeSend, failOnce, - commitbatchSize, destinationscount, rate, pubsub); - - - _listener = new AsyncMessageListener(batchSize); - - perThreadSetup._pingItselfClient.setMessageListener(_listener); - // Start the client connection - perThreadSetup._pingItselfClient.getConnection().start(); - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - } - } - - - public void testAsyncPingOk(int numPings) - { - _timingController = this.getTimingController(); - - _listener.setTotalMessages(numPings); - - PerThreadSetup perThreadSetup = threadSetup.get(); - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - } - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - ObjectMessage msg = null; - - try - { - msg = perThreadSetup._pingItselfClient.getTestMessage(null, - Integer.parseInt(testParameters.getProperty( - MESSAGE_SIZE_PROPNAME)), - Boolean.parseBoolean(testParameters.getProperty( - PERSISTENT_MODE_PROPNAME))); - } - catch (JMSException e) - { - - } - - // start the test - long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); - - String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); - - try - { - _logger.debug("Sending messages"); - - perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); - - _logger.debug("All sent"); - } - catch (JMSException e) - { - e.printStackTrace(); - Assert.fail("JMS Exception Recevied" + e); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } - - while (!_done) - { - try - { - _logger.debug("Awating test finish"); - - perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS); - - if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0) - { - _logger.error("Timeout occured"); - _done = true; - } - else - { - _logger.error("Countdown reached Done?" + _done); - } - //Allow the time out to exit the loop. - } - catch (InterruptedException e) - { - //ignore - _logger.error("Awaiting test end interrupted."); - - } - } - - perThreadSetup._pingItselfClient.removeLock(correlationID); - - // Fail the test if the timeout was exceeded. - int numReplies = _listener.getReplyCount(); - - _logger.info("Test Finished"); - - if (numReplies != numPings) - { - - try - { - perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); - } - catch (JMSException e) - { - _logger.error("Error commiting recevied messages", e); - } - try - { - _timingController.completeTest(false, numPings - numReplies); - } - catch (InterruptedException e) - { - //ignore - } - Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); - } - } - - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - public TimingController getTimingController() - { - return _timingController; - } - - - private class AsyncMessageListener implements MessageListener - { - private AtomicInteger _messageReceived; - private volatile int _totalMessages; - private int _batchSize; - - public AsyncMessageListener(int batchSize, int totalMessages) - { - _batchSize = batchSize; - _totalMessages = totalMessages; - _messageReceived = new AtomicInteger(0); - } - - public AsyncMessageListener(int batchSize) - { - _batchSize = batchSize; - _totalMessages = -1; - _messageReceived = new AtomicInteger(0); - } - - public void setTotalMessages(int newTotal) - { - _totalMessages = newTotal; - _messageReceived.set(0); - } - - public void onMessage(Message message) - { - _logger.trace("Message Recevied"); - - int messagesReceived = _messageReceived.incrementAndGet(); - - try - { - if (messagesReceived % _batchSize == 0) - { - if (_timingController != null) - { - _timingController.completeTest(true, _batchSize); - } - - if (messagesReceived == _totalMessages) - { - _done = true; - } - } - else if (messagesReceived == _totalMessages) - { - _logger.info("Test Completed.. signalling"); - doDone(); - } - - } - catch (InterruptedException e) - { - _logger.error("Interupted Test"); -// doDone(); - } - - } - - private void doDone() - { - _done = true; - - _logger.trace("Messages received:" + _messageReceived.get()); - _logger.trace("Total Messages :" + _totalMessages); - - try - { - _timingController.completeTest(true, _totalMessages - _messageReceived.get()); - } - catch (InterruptedException e) - { - //ignore - } - } - - public int getReplyCount() - { - return _messageReceived.get(); - } - - } +// /** +// * Compile all the tests into a test suite. +// */ +// public static Test suite() +// { +// // Build a new test suite +// TestSuite suite = new TestSuite("Ping Performance Tests"); +// +// // Run performance tests in read committed mode. +// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); +// +// return suite; +// } +// +// protected void setUp() throws Exception +// { +// // Create the test setups on a per thread basis, only if they have not already been created. +// +// if (threadSetup.get() == null) +// { +// PerThreadSetup perThreadSetup = new PerThreadSetup(); +// +// // Extract the test set up paramaeters. +// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); +// String username = "guest"; +// String password = "guest"; +// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); +// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); +// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); +// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); +// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); +// String selector = null; +// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); +// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); +// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); +// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); +// +// +// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); +// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); +// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); +// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); +// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); +// +// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); +// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); +// +// // This is synchronized because there is a race condition, which causes one connection to sleep if +// // all threads try to create connection concurrently +// synchronized (this) +// { +// // Establish a client to ping a Queue and listen the reply back from same Queue +// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, +// destinationname, selector, transacted, persistent, +// messageSize, verbose, +// afterCommit, beforeCommit, afterSend, beforeSend, failOnce, +// commitbatchSize, destinationscount, rate, pubsub); +// } +// +// // Attach the per-thread set to the thread. +// threadSetup.set(perThreadSetup); +// +// _listener = new AsyncMessageListener(batchSize); +// +// perThreadSetup._pingItselfClient.setMessageListener(_listener); +// // Start the client connection +// perThreadSetup._pingItselfClient.getConnection().start(); +// +// } +// } +// +// +// public void testAsyncPingOk(int numPings) +// { +// _timingController = this.getTimingController(); +// +// _listener.setTotalMessages(numPings); +// +// PerThreadSetup perThreadSetup = threadSetup.get(); +// if (numPings == 0) +// { +// _logger.error("Number of pings requested was zero."); +// fail("Number of pings requested was zero."); +// } +// +// // Generate a sample message. This message is already time stamped and has its reply-to destination set. +// ObjectMessage msg = null; +// +// try +// { +// msg = perThreadSetup._pingItselfClient.getTestMessage(null, +// Integer.parseInt(testParameters.getProperty( +// MESSAGE_SIZE_PROPNAME)), +// Boolean.parseBoolean(testParameters.getProperty( +// PERSISTENT_MODE_PROPNAME))); +// } +// catch (JMSException e) +// { +// +// } +// +// // start the test +// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); +// +// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); +// +// try +// { +// _logger.debug("Sending messages"); +// +// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); +// +// _logger.debug("All sent"); +// } +// catch (JMSException e) +// { +// e.printStackTrace(); +// Assert.fail("JMS Exception Recevied" + e); +// } +// catch (InterruptedException e) +// { +// e.printStackTrace(); +// } +// +// try +// { +// _logger.debug("Awating test finish"); +// +// perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS); +// +// if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0) +// { +// _logger.error("Timeout occured"); +// } +// //Allow the time out to exit the loop. +// } +// catch (InterruptedException e) +// { +// //ignore +// _logger.error("Awaiting test end was interrupted."); +// +// } +// +// // Fail the test if the timeout was exceeded. +// int numReplies = numPings - (int) perThreadSetup._pingItselfClient.removeLock(correlationID).getCount(); +// +// _logger.info("Test Finished"); +// +// if (numReplies != numPings) +// { +// try +// { +// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); +// } +// catch (JMSException e) +// { +// _logger.error("Error commiting recevied messages", e); +// } +// try +// { +// _timingController.completeTest(false, numPings - numReplies); +// } +// catch (InterruptedException e) +// { +// //ignore +// } +// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); +// } +// } +// +// public void setTimingController(TimingController timingController) +// { +// _timingController = timingController; +// } +// +// public TimingController getTimingController() +// { +// return _timingController; +// } +// +// +// private class AsyncMessageListener implements MessageListener +// { +// private volatile int _totalMessages; +// private int _batchSize; +// PerThreadSetup _perThreadSetup; +// +// public AsyncMessageListener(int batchSize) +// { +// this(batchSize, -1); +// } +// +// public AsyncMessageListener(int batchSize, int totalMessages) +// { +// _batchSize = batchSize; +// _totalMessages = totalMessages; +// _perThreadSetup = threadSetup.get(); +// } +// +// public void setTotalMessages(int newTotal) +// { +// _totalMessages = newTotal; +// } +// +// public void onMessage(Message message) +// { +// try +// { +// _logger.trace("Message Recevied"); +// +// CountDownLatch count = _perThreadSetup._pingItselfClient.getEndLock(message.getJMSCorrelationID()); +// +// int messagesLeft = (int) count.getCount(); +// +// int messagesReceived = _totalMessages - messagesLeft; +// +// try +// { +// if (messagesReceived % _batchSize == 0) +// { +// if (_timingController != null) +// { +// _timingController.completeTest(true, _batchSize); +// } +// } +// else if (messagesReceived == _totalMessages) +// { +// _logger.info("Test Completed.. signalling"); +// doDone(messagesReceived); +// } +// +// } +// catch (InterruptedException e) +// { +// _logger.error("Interupted Test"); +//// doDone(messagesReceived); +// } +// } +// catch (JMSException e) +// { +// _logger.warn("There was a JMSException", e); +// } +// +// } +// +// private void doDone(int messageCount) +// { +// _logger.trace("Messages received:" + messageCount); +// _logger.trace("Total Messages :" + _totalMessages); +// +// try +// { +// _timingController.completeTest(true, messageCount); +// } +// catch (InterruptedException e) +// { +// //ignore +// } +// } +// +// } } |