diff options
Diffstat (limited to 'qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java')
-rw-r--r-- | qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java | 322 |
1 files changed, 322 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java new file mode 100644 index 0000000000..dc78276edd --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -0,0 +1,322 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><td> Responsibilities <th> Collaborations + * <tr><td> Send many ping messages and output timings asynchronously on batches received. + * </table> + */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingAsyncTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); + } + + /** + * Compile all the tests into a test suite. + * @return The test suite to run. Should only contain testAsyncPingOk method. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + * @throws Exception pass all errors out to the test harness + */ + public void testAsyncPingOk(int numPings) throws Exception + { + // _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // get prefill count to update the expected count + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + + // Ensure that at least one ping was requeusted. + if (numPings + preFill == 0) + { + _logger.error("Number of pings requested was zero."); + fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); + // String messageCorrelationId = perThreadSetup._correlationId; + // _logger.debug("messageCorrelationId = " + messageCorrelationId); + + + // Initialize the count and timing controller for the new correlation id. + // This perCorrelationId is only used for controlling the test. + // The PingClient itself uses its own perCorrelationId see in PingPongProducer + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill); + perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); + + // Must be called before pingAndWaitForReply to setup the CorrelationID. + // This is required because pingClient.start() will start all client threads + // This means that the CorrelationID must be registered before hand. + pingClient.setupCorrelationID(perThreadSetup._correlationId, perCorrelationId._expectedCount); + + // Start the client connection if: + // 1) we are not in a SEND_ONLY test. + // 2) if we have not yet started client because messages are sitting on broker. + // This is either due to a preFill or a consume only test. + if (!testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME) && + (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))) + { + pingClient.start(); + } + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < perCorrelationId._expectedCount) + { + perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); + } + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(perThreadSetup._correlationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** The latency recoreded for the batch */ + private long _batchLatency = 0; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException + { + // Record the latency for the whole batch + _batchLatency += latency; + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + + "): called on batch boundary for message id: " + correlationId + " with thread id: " + + Thread.currentThread().getId());*/ + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + // Record the total latency for the batch. + // if batchSize=1 then this will just be the message latency + tc.completeTest(true, receivedInBatch, null, _batchSize == 1 ? latency : _batchLatency); + // Reset latency + _batchLatency = 0; + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} |