diff options
Diffstat (limited to 'qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java')
-rw-r--r-- | qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java | 542 |
1 files changed, 282 insertions, 260 deletions
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index c76f361f99..1a63518ef4 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.ping; -//import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -//import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.TimingController; import javax.jms.MessageListener; import javax.jms.ObjectMessage; @@ -33,15 +33,17 @@ import junit.framework.Test; import junit.framework.TestSuite; import org.apache.log4j.Logger; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; -public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware { -// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); -// private TimingController _timingController; + private TimingController _timingController; -// private AsyncMessageListener _listener; + private AsyncMessageListener _listener; private volatile boolean _done = false; @@ -50,260 +52,280 @@ public class PingAsyncTestPerf extends PingTestPerf //implements TimingControlle super(name); } -// /** -// * Compile all the tests into a test suite. -// */ -// public static Test suite() -// { -// // Build a new test suite -// TestSuite suite = new TestSuite("Ping Performance Tests"); -// -// // Run performance tests in read committed mode. -// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); -// -// return suite; -// } -// -// protected void setUp() throws Exception -// { -// // Create the test setups on a per thread basis, only if they have not already been created. -// -// if (threadSetup.get() == null) -// { -// PerThreadSetup perThreadSetup = new PerThreadSetup(); -// -// // Extract the test set up paramaeters. -// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); -// String username = "guest"; -// String password = "guest"; -// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); -// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); -// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); -// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); -// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); -// String selector = null; -// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); -// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); -// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); -// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); -// -// -// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); -// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); -// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); -// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); -// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); -// -// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); -// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); -// -// // This is synchronized because there is a race condition, which causes one connection to sleep if -// // all threads try to create connection concurrently -// synchronized (this) -// { -// // Establish a client to ping a Queue and listen the reply back from same Queue -// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, -// destinationname, selector, transacted, persistent, -// messageSize, verbose, -// afterCommit, beforeCommit, afterSend, beforeSend, failOnce, -// commitbatchSize, destinationscount, rate, pubsub); -// -// -// _listener = new AsyncMessageListener(batchSize); -// -// perThreadSetup._pingItselfClient.setMessageListener(_listener); -// // Start the client connection -// perThreadSetup._pingItselfClient.getConnection().start(); -// -// // Attach the per-thread set to the thread. -// threadSetup.set(perThreadSetup); -// } -// } -// } -// -// -// public void testAsyncPingOk(int numPings) -// { -// _timingController = this.getTimingController(); -// -// _listener.setTotalMessages(numPings); -// -// PerThreadSetup perThreadSetup = threadSetup.get(); -// if (numPings == 0) -// { -// _logger.error("Number of pings requested was zero."); -// } -// -// // Generate a sample message. This message is already time stamped and has its reply-to destination set. -// ObjectMessage msg = null; -// -// try -// { -// msg = perThreadSetup._pingItselfClient.getTestMessage(null, -// Integer.parseInt(testParameters.getProperty( -// MESSAGE_SIZE_PROPNAME)), -// Boolean.parseBoolean(testParameters.getProperty( -// PERSISTENT_MODE_PROPNAME))); -// } -// catch (JMSException e) -// { -// -// } -// -// // start the test -// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); -// -// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); -// -// try -// { -// _logger.debug("Sending messages"); -// -// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); -// -// _logger.debug("All sent"); -// } -// catch (JMSException e) -// { -// e.printStackTrace(); -// Assert.fail("JMS Exception Recevied" + e); -// } -// catch (InterruptedException e) -// { -// e.printStackTrace(); -// } -// -// while (!_done) -// { -// try -// { -// _logger.debug("Awating test finish"); -// -// perThreadSetup._pingItselfClient.getEndLock(correlationID).await(); -// -// -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// } -// -// // Fail the test if the timeout was exceeded. -// int numReplies = _listener.getReplyCount(); -// -// _logger.info("Test Finished"); -// -// if (numReplies != numPings) -// { -// -// try -// { -// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); -// } -// catch (JMSException e) -// { -// _logger.error("Error commiting recevied messages", e); -// } -// try -// { -// _timingController.completeTest(false, numPings - numReplies); -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); -// } -// } -// -// public void setTimingController(TimingController timingController) -// { -// _timingController = timingController; -// } -// -// public TimingController getTimingController() -// { -// return _timingController; -// } -// -// -// private class AsyncMessageListener implements MessageListener -// { -// private int _messageReceived; -// private int _totalMessages; -// private int _batchSize; -// -// public AsyncMessageListener(int batchSize, int totalMessages) -// { -// _batchSize = batchSize; -// _totalMessages = totalMessages; -// _messageReceived = 0; -// } -// -// public AsyncMessageListener(int batchSize) -// { -// _batchSize = batchSize; -// _totalMessages = -1; -// _messageReceived = 0; -// } -// -// public void setTotalMessages(int newTotal) -// { -// _totalMessages = newTotal; -// } -// -// public void onMessage(Message message) -// { -// _logger.trace("Message Recevied"); -// -// _messageReceived++; -// -// try -// { -// if (_messageReceived % _batchSize == 0) -// { -// if (_timingController != null) -// { -// _timingController.completeTest(true, _batchSize); -// } -// } -// } -// catch (InterruptedException e) -// { -//// _logger.error("Interupted"); -//// doDone(); -// } -// -// if (_totalMessages == -1 || _messageReceived == _totalMessages) -// { -// _logger.info("Test Completed.. signalling"); + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + protected void setUp() throws Exception + { + // Create the test setups on a per thread basis, only if they have not already been created. + + if (threadSetup.get() == null) + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); + String username = "guest"; + String password = "guest"; + String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); + int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); + String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); + boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); + boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); + String selector = null; + boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); + int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); + int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); + boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); + + + boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); + boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); + boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); + boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); + boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); + + int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently + synchronized (this) + { + // Establish a client to ping a Queue and listen the reply back from same Queue + perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, + destinationname, selector, transacted, persistent, + messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, failOnce, + commitbatchSize, destinationscount, rate, pubsub); + + + _listener = new AsyncMessageListener(batchSize); + + perThreadSetup._pingItselfClient.setMessageListener(_listener); + // Start the client connection + perThreadSetup._pingItselfClient.getConnection().start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + } + } + + + public void testAsyncPingOk(int numPings) + { + _timingController = this.getTimingController(); + + _listener.setTotalMessages(numPings); + + PerThreadSetup perThreadSetup = threadSetup.get(); + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + ObjectMessage msg = null; + + try + { + msg = perThreadSetup._pingItselfClient.getTestMessage(null, + Integer.parseInt(testParameters.getProperty( + MESSAGE_SIZE_PROPNAME)), + Boolean.parseBoolean(testParameters.getProperty( + PERSISTENT_MODE_PROPNAME))); + } + catch (JMSException e) + { + + } + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); + + String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); + + try + { + _logger.debug("Sending messages"); + + perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); + + _logger.debug("All sent"); + } + catch (JMSException e) + { + e.printStackTrace(); + Assert.fail("JMS Exception Recevied" + e); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + + while (!_done) + { + try + { + _logger.debug("Awating test finish"); + + perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS); + + if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0) + { + _logger.error("Timeout occured"); + _done = true; + } + else + { + _logger.error("Countdown reached Done?" + _done); + } + //Allow the time out to exit the loop. + } + catch (InterruptedException e) + { + //ignore + _logger.error("Awaiting test end interrupted."); + + } + } + + perThreadSetup._pingItselfClient.removeLock(correlationID); + + // Fail the test if the timeout was exceeded. + int numReplies = _listener.getReplyCount(); + + _logger.info("Test Finished"); + + if (numReplies != numPings) + { + + try + { + perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); + } + catch (JMSException e) + { + _logger.error("Error commiting recevied messages", e); + } + try + { + _timingController.completeTest(false, numPings - numReplies); + } + catch (InterruptedException e) + { + //ignore + } + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); + } + } + + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + public TimingController getTimingController() + { + return _timingController; + } + + + private class AsyncMessageListener implements MessageListener + { + private AtomicInteger _messageReceived; + private volatile int _totalMessages; + private int _batchSize; + + public AsyncMessageListener(int batchSize, int totalMessages) + { + _batchSize = batchSize; + _totalMessages = totalMessages; + _messageReceived = new AtomicInteger(0); + } + + public AsyncMessageListener(int batchSize) + { + _batchSize = batchSize; + _totalMessages = -1; + _messageReceived = new AtomicInteger(0); + } + + public void setTotalMessages(int newTotal) + { + _totalMessages = newTotal; + _messageReceived.set(0); + } + + public void onMessage(Message message) + { + _logger.trace("Message Recevied"); + + int messagesReceived = _messageReceived.incrementAndGet(); + + try + { + if (messagesReceived % _batchSize == 0) + { + if (_timingController != null) + { + _timingController.completeTest(true, _batchSize); + } + + if (messagesReceived == _totalMessages) + { + _done = true; + } + } + else if (messagesReceived == _totalMessages) + { + _logger.info("Test Completed.. signalling"); + doDone(); + } + + } + catch (InterruptedException e) + { + _logger.error("Interupted Test"); // doDone(); -// } -// } -// -// private void doDone() -// { -// _done = true; -// -// _logger.error("Messages received:" + _messageReceived); -// _logger.error("Total Messages :" + _totalMessages); -// -// try -// { -// _timingController.completeTest(true, _totalMessages - _messageReceived); -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// } -// -// public int getReplyCount() -// { -// return _messageReceived; -// } -// -// } + } + + } + + private void doDone() + { + _done = true; + + _logger.trace("Messages received:" + _messageReceived.get()); + _logger.trace("Total Messages :" + _totalMessages); + + try + { + _timingController.completeTest(true, _totalMessages - _messageReceived.get()); + } + catch (InterruptedException e) + { + //ignore + } + } + + public int getReplyCount() + { + return _messageReceived.get(); + } + + } } |