From 4ee487686ace6ec20b46ba135c8ef9e8a88cd1df Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 23 Jan 2007 22:34:25 +0000 Subject: Updated perftests to include an Asynchronous ping sender git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499166 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 288 +++++++++++++++++++++ .../java/org/apache/qpid/ping/PingTestPerf.java | 60 +++-- 2 files changed, 320 insertions(+), 28 deletions(-) create mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (limited to 'java/perftests/src/test') diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java new file mode 100644 index 0000000000..d3ce064831 --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.ping; + +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.TimingController; + +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.JMSException; +import javax.jms.Message; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; +import org.apache.log4j.Logger; + +import java.util.concurrent.CountDownLatch; + +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + private TimingController _timingController; + + private final CountDownLatch _completedLock = new CountDownLatch(1); + + private AsyncMessageListener _listener; + + private volatile boolean _done = false; + + public PingAsyncTestPerf(String name) + { + super(name); + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + protected void setUp() throws Exception + { + // Create the test setups on a per thread basis, only if they have not already been created. + + if (threadSetup.get() == null) + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); + String username = "guest"; + String password = "guest"; + String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); + int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME)); + String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME); + boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); + boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); + String selector = null; + boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); + int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); + + boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); + boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); + boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); + boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); + boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); + + int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); + + int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently + synchronized (this) + { + // Establish a client to ping a Queue and listen the reply back from same Queue + perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, + queueName, selector, transacted, persistent, + messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, failOnce, + commitbatchSize, queueCount, rate); + + + _listener = new AsyncMessageListener(batchSize); + + perThreadSetup._pingItselfClient.setMessageListener(_listener); + // Start the client connection + perThreadSetup._pingItselfClient.getConnection().start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + } + } + + + public void testAsyncPingOk(int numPings) + { + _timingController = this.getTimingController(); + + _listener.setTotalMessages(numPings); + + PerThreadSetup perThreadSetup = threadSetup.get(); + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + ObjectMessage msg = null; + + try + { + msg = perThreadSetup._pingItselfClient.getTestMessage(null, + Integer.parseInt(testParameters.getProperty( + MESSAGE_SIZE_PROPNAME)), + Boolean.parseBoolean(testParameters.getProperty( + PERSISTENT_MODE_PROPNAME))); + } + catch (JMSException e) + { + + } + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); + + try + { + perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings); + } + catch (JMSException e) + { + e.printStackTrace(); + Assert.fail("JMS Exception Recevied" + e); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + + while (!_done) + { + try + { + _logger.info("awating test finish"); + + _completedLock.await(); + } + catch (InterruptedException e) + { + //ignore + } + } + + // Fail the test if the timeout was exceeded. + int numReplies = _listener.getReplyCount(); + + _logger.info("Test Finished"); + + if (numReplies != numPings) + + { + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); + try + { + _timingController.completeTest(false); + } + catch (InterruptedException e) + { + //ignore + } + } + } + + + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + public TimingController getTimingController() + { + return _timingController; + } + + + private class AsyncMessageListener implements MessageListener + { + private int _messageRecevied; + private int _totalMessages; + private int _batchSize; + + public AsyncMessageListener(int batchSize, int totalMessages) + { + _batchSize = batchSize; + _totalMessages = totalMessages; + _messageRecevied = 0; + } + + public AsyncMessageListener(int batchSize) + { + _batchSize = batchSize; + _totalMessages = -1; + _messageRecevied = 0; + } + + public void setTotalMessages(int newTotal) + { + _totalMessages = newTotal; + } + + public void onMessage(Message message) + { + _logger.info("Message Recevied"); + try + { + _messageRecevied++; + if (_messageRecevied == _batchSize) + { + if (_timingController != null) + { + _timingController.completeTest(true); + } + } + } + catch (InterruptedException e) + { + doDone(); + } + + if (_totalMessages == -1 || _messageRecevied == _totalMessages) + { + _logger.info("Test Completed.. signalling"); + doDone(); + } + } + + private void doDone() + { + _done = true; + _completedLock.countDown(); + try + { + _timingController.completeTest(true); + } + catch (InterruptedException e) + { + //ignore + } + } + + public int getReplyCount() + { + return _messageRecevied; + } + } + +} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index c9896ce063..402d72d6db 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -42,94 +42,96 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll /** * Holds the name of the property to get the test message size from. */ - private static final String MESSAGE_SIZE_PROPNAME = "messageSize"; + protected static final String MESSAGE_SIZE_PROPNAME = "messageSize"; /** * Holds the name of the property to get the ping queue name from. */ - private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue"; + protected static final String PING_QUEUE_NAME_PROPNAME = "pingQueue"; /** * holds the queue count, if the test is being performed with multiple queues */ - private static final String PING_QUEUE_COUNT_PROPNAME = "queues"; + protected static final String PING_QUEUE_COUNT_PROPNAME = "queues"; /** * Holds the name of the property to get the test delivery mode from. */ - private static final String PERSISTENT_MODE_PROPNAME = "persistent"; + protected static final String PERSISTENT_MODE_PROPNAME = "persistent"; /** * Holds the name of the property to get the test transactional mode from. */ - private static final String TRANSACTED_PROPNAME = "transacted"; + protected static final String TRANSACTED_PROPNAME = "transacted"; /** * Holds the name of the property to get the test broker url from. */ - private static final String BROKER_PROPNAME = "broker"; + protected static final String BROKER_PROPNAME = "broker"; /** * Holds the name of the property to get the test broker virtual path. */ - private static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; + protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; /** * Holds the name of the property to get the waiting timeout for response messages. */ - private static final String TIMEOUT_PROPNAME = "timeout"; + protected static final String TIMEOUT_PROPNAME = "timeout"; /** Holds the name of the property to get the message rate from. */ - private static final String RATE_PROPNAME = "rate"; + protected static final String RATE_PROPNAME = "rate"; - private static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; + protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; /** * Holds the size of message body to attach to the ping messages. */ - private static final int MESSAGE_SIZE_DEFAULT = 0; + protected static final int MESSAGE_SIZE_DEFAULT = 0; - private static final int BATCH_SIZE_DEFAULT = 2; + protected static final int BATCH_SIZE_DEFAULT = 2; + protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT; /** * Holds the name of the queue to which pings are sent. */ - private static final String PING_QUEUE_NAME_DEFAULT = "ping"; + protected static final String PING_QUEUE_NAME_DEFAULT = "ping"; /** * Holds the message delivery mode to use for the test. */ - private static final boolean PERSISTENT_MODE_DEFAULT = false; + protected static final boolean PERSISTENT_MODE_DEFAULT = false; /** * Holds the transactional mode to use for the test. */ - private static final boolean TRANSACTED_DEFAULT = false; + protected static final boolean TRANSACTED_DEFAULT = false; /** * Holds the default broker url for the test. */ - private static final String BROKER_DEFAULT = "tcp://localhost:5672"; + protected static final String BROKER_DEFAULT = "tcp://localhost:5672"; /** * Holds the default virtual path for the test. */ - private static final String VIRTUAL_PATH_DEFAULT = "/test"; + protected static final String VIRTUAL_PATH_DEFAULT = "/test"; /** * Sets a default ping timeout. */ - private static final long TIMEOUT_DEFAULT = 3000; + protected static final long TIMEOUT_DEFAULT = 3000; /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */ private static final int RATE_DEFAULT = 0; - private static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; - private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; - private static final String FAIL_AFTER_SEND = "FailAfterSend"; - private static final String FAIL_BEFORE_SEND = "FailBeforeSend"; - private static final String BATCH_SIZE = "BatchSize"; - private static final String FAIL_ONCE = "FailOnce"; + protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; + protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; + protected static final String FAIL_AFTER_SEND = "FailAfterSend"; + protected static final String FAIL_BEFORE_SEND = "FailBeforeSend"; + protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize"; + protected static final String BATCH_SIZE = "BatchSize"; + protected static final String FAIL_ONCE = "FailOnce"; /** * Thread local to hold the per-thread test setup fields. @@ -139,7 +141,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner // of the test parameters to log with the results. - private Properties testParameters = System.getProperties(); + protected Properties testParameters = System.getProperties(); //private Properties testParameters = new ContextualProperties(System.getProperties()); public PingTestPerf(String name) @@ -154,6 +156,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll setSystemPropertyIfNull(FAIL_ONCE, "true"); setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT)); + setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT)); setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); @@ -181,7 +184,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll //return new junit.framework.TestSuite(PingTestPerf.class); } - private static void setSystemPropertyIfNull(String propName, String propValue) + protected static void setSystemPropertyIfNull(String propName, String propValue) { if (System.getProperty(propName) == null) { @@ -223,6 +226,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll } } + protected void setUp() throws Exception { // Log4j will propagate the test name as a thread local in all log output. @@ -293,11 +297,11 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll } } - private static class PerThreadSetup + protected static class PerThreadSetup { /** * Holds the test ping client. */ - private TestPingItself _pingItselfClient; + protected TestPingItself _pingItselfClient; } } -- cgit v1.2.1