/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.ping; import junit.framework.Test; import junit.framework.TestSuite; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQSession; import org.apache.qpid.requestreply.PingPongProducer; import org.apache.qpid.junit.extensions.TimingController; import org.apache.qpid.junit.extensions.TimingControllerAware; import org.apache.qpid.junit.extensions.util.ParsedProperties; import javax.jms.JMSException; import javax.jms.Message; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than * waiting until all expected replies are received. * *

This test does not output timings for every single ping message, as when running at high volume, writing the test * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. * *

The size parameter logged for each individual ping is set to the size of the batch of messages that the * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput * (messages / time) can be calculated in order to examine the relationship between throughput and latency. * *

CRC Card
Responsibilities Collaborations
Send many ping * messages and output timings for sampled individual pings.
*/ public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware { private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); /** Holds the name of the property to get the test results logging batch size. */ public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; /** Holds the default test results logging batch size. */ public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; /** Used to hold the timing controller passed from the test runner. */ private TimingController _timingController; /** Used to generate unique correlation ids for each test run. */ private AtomicLong corellationIdGenerator = new AtomicLong(); /** * Holds test specifics by correlation id. This consists of the expected number of messages and the timing * controler. */ private Map perCorrelationIds = Collections.synchronizedMap(new HashMap()); /** Holds the batched results listener, that does logging on batch boundaries. */ private BatchedResultsListener batchedResultsListener = null; /** * Creates a new asynchronous ping performance test with the specified name. * * @param name The test name. */ public PingLatencyTestPerf(String name) { super(name); // Sets up the test parameters with defaults. ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); } /** Compile all the tests into a test suite. */ public static Test suite() { // Build a new test suite TestSuite suite = new TestSuite("Ping Latency Tests"); // Run performance tests in read committed mode. suite.addTest(new PingLatencyTestPerf("testPingLatency")); return suite; } /** * Accepts a timing controller from the test runner. * * @param timingController The timing controller to register mutliple timings with. */ public void setTimingController(TimingController timingController) { _timingController = timingController; } /** * Gets the timing controller passed in by the test runner. * * @return The timing controller passed in by the test runner. */ public TimingController getTimingController() { return _timingController; } /** * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all * replies have been received or a time out occurs before exiting this method. * * @param numPings The number of pings to send. */ public void testPingLatency(int numPings) throws Exception { _logger.debug("public void testPingLatency(int numPings): called"); // Ensure that at least one ping was requeusted. if (numPings == 0) { _logger.error("Number of pings requested was zero."); } // Get the per thread test setup to run the test through. PerThreadSetup perThreadSetup = threadSetup.get(); PingClient pingClient = perThreadSetup._pingClient; // Advance the correlation id of messages to send, to make it unique for this run. String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); _logger.debug("messageCorrelationId = " + messageCorrelationId); // Initialize the count and timing controller for the new correlation id. PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; perCorrelationId._expectedCount = numPings; perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these // messages. pingClient.setChainedMessageListener(batchedResultsListener); // Generate a sample message of the specified size. Message msg = pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); // Check that all the replies were received and log a fail if they were not. if (numReplies < numPings) { tc.completeTest(false, 0); } // Remove the chained message listener from the ping producer. pingClient.removeChainedMessageListener(); // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. perCorrelationIds.remove(messageCorrelationId); } /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ public void threadSetUp() { _logger.debug("public void threadSetUp(): called"); try { // Call the set up method in the super class. This creates a PingClient pinger. super.threadSetUp(); // Create the chained message listener, only if it has not already been created. This is set up with the // batch size property, to tell it what batch size to output results on. A synchronized block is used to // ensure that only one thread creates this. synchronized (this) { if (batchedResultsListener == null) { int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); batchedResultsListener = new BatchedResultsListener(batchSize); } } // Get the set up that the super class created. PerThreadSetup perThreadSetup = threadSetup.get(); // Register the chained message listener on the pinger to do its asynchronous test timings from. perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); } catch (Exception e) { _logger.warn("There was an exception during per thread setup.", e); } } /** * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can * be attached to the pinger, in order to receive notifications about every message received and the number * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener * outputs a test timing for the actual number of messages received in the current batch. */ private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener { /** The test results logging batch size. */ int _batchSize; private boolean _strictAMQP; /** * Creates a results listener on the specified batch size. * * @param batchSize The batch size to use. */ public BatchedResultsListener(int batchSize) { _batchSize = batchSize; _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); } /** * This callback method is called from all of the pingers that this test creates. It uses the correlation id * from the message to identify the timing controller for the test thread that was responsible for sending those * messages. * * @param message The message. * @param remainingCount The count of messages remaining to be received with a particular correlation id. * * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. */ public void onMessage(Message message, int remainingCount, long latency) throws JMSException { _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); // Check if a batch boundary has been crossed. if ((remainingCount % _batchSize) == 0) { // Extract the correlation id from the message. String correlationId = message.getJMSCorrelationID(); // Get the details for the correlation id and check that they are not null. They can become null // if a test times out. PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); if (perCorrelationId != null) { // Get the timing controller and expected count for this correlation id. TimingController tc = perCorrelationId._tc; int expected = perCorrelationId._expectedCount; // Calculate how many messages were actually received in the last batch. This will be the batch size // except where the number expected is not a multiple of the batch size and this is the first remaining // count to cross a batch size boundary, in which case it will be the number expected modulo the batch // size. int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; // Register a test result for the correlation id. try { tc.completeTest(true, receivedInBatch, latency); } catch (InterruptedException e) { // Ignore this. It means the test runner wants to stop as soon as possible. _logger.warn("Got InterruptedException.", e); } } // Else ignore, test timed out. Should log a fail here? } } } /** * Holds state specific to each correlation id, needed to output test results. This consists of the count of the * total expected number of messages, and the timing controller for the thread sending those message ids. */ private static class PerCorrelationId { public int _expectedCount; public TimingController _tc; } }