/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.ping; import junit.framework.Assert; import junit.framework.Test; import junit.framework.TestSuite; import org.apache.log4j.Logger; import org.apache.qpid.requestreply.PingPongProducer; import org.apache.qpid.junit.extensions.AsymptoticTestCase; import org.apache.qpid.junit.extensions.TestThreadAware; import org.apache.qpid.junit.extensions.util.ParsedProperties; import org.apache.qpid.junit.extensions.util.TestContextProperties; import javax.jms.*; /** * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times * simultaneously to simluate many clients/producers/connections. * *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single * full round trip ping. This test may be scaled up using a suitable JUnit test runner. * *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, * except if the connection is lost in which case an attempt to re-establish the setup is made. * *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the * temporary queue. * *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. * *

*
CRC Card
Responsibilities Collaborations *
*/ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware { private static Logger _logger = Logger.getLogger(PingTestPerf.class); /** Thread local to hold the per-thread test setup fields. */ ThreadLocal threadSetup = new ThreadLocal(); /** Holds a property reader to extract the test parameters from. */ protected ParsedProperties testParameters = TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); public PingTestPerf(String name) { super(name); _logger.debug("testParameters = " + testParameters); } /** * Compile all the tests into a test suite. * @return The test method testPingOk. */ public static Test suite() { // Build a new test suite TestSuite suite = new TestSuite("Ping Performance Tests"); // Run performance tests in read committed mode. suite.addTest(new PingTestPerf("testPingOk")); return suite; } public void testPingOk(int numPings) throws Exception { if (numPings == 0) { Assert.fail("Number of pings requested was zero."); } // Get the per thread test setup to run the test through. PerThreadSetup perThreadSetup = threadSetup.get(); if (perThreadSetup == null) { Assert.fail("Could not get per thread test setup, it was null."); } // Generate a sample message. This message is already time stamped and has its reply-to destination set. Message msg = perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // start the test long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); // Fail the test if the timeout was exceeded. if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) { Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); } } /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ public void threadSetUp() { _logger.debug("public void threadSetUp(): called"); try { PerThreadSetup perThreadSetup = new PerThreadSetup(); // This is synchronized because there is a race condition, which causes one connection to sleep if // all threads try to create connection concurrently. synchronized (this) { // Establish a client to ping a Destination and listen the reply back from same Destination perThreadSetup._pingClient = new PingClient(testParameters); perThreadSetup._pingClient.establishConnection(true, true); } // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } catch (Exception e) { _logger.warn("There was an exception during per thread setup.", e); } } /** * Called after all threads have completed their setup. */ public void postThreadSetUp() { _logger.debug("public void postThreadSetUp(): called"); PerThreadSetup perThreadSetup = threadSetup.get(); // Prefill the broker unless we are in consume only mode. int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0) { try { // Manually set the correlation ID to 1. This is not ideal but it is the // value that the main test loop will use. perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, String.valueOf(perThreadSetup._pingClient.getClientCount())); // Note with a large preFill and non-tx session the messages will be // rapidly pushed in to the mina buffers. OOM's are a real risk here. // Should perhaps consider using a TX session for the prefill. long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME); // Only delay if we are // not doing send only // and we have consumers // and a delayBeforeConsume if (!(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME)) && (testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0) && delayBeforeConsume > 0) { boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); // Only do logging if in verbose mode. if (verbose) { if (delayBeforeConsume > 60000) { long minutes = delayBeforeConsume / 60000; long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000; long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000); _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test."); } else { _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test."); } } Thread.sleep(delayBeforeConsume); if (verbose) { _logger.info("Starting Test."); } } // We can't start the client's here as the test client has not yet been configured to receieve messages. // only when the test method is executed will the correlationID map be set up and ready to consume // the messages we have sent here. } catch (Exception e) { _logger.warn("There was an exception during per thread setup.", e); } } else //Only start the consumer if we are not preFilling. { // Start the consumers, unless we have data on the broker // already this is signified by being in consume_only, we will // start the clients after setting up the correlation IDs. // We should also not start the clients if we are in Send only if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && !(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME))) { // Start the client connection try { perThreadSetup._pingClient.start(); } catch (JMSException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } } } /** * Performs test fixture clean */ public void threadTearDown() { _logger.debug("public void threadTearDown(): called"); try { // Get the per thread test fixture. PerThreadSetup perThreadSetup = threadSetup.get(); // Close the pingers so that it cleans up its connection cleanly. synchronized (this) { if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) { perThreadSetup._pingClient.close(); } } } catch (JMSException e) { _logger.warn("There was an exception during per thread tear down."); } finally { // Ensure the per thread fixture is reclaimed. threadSetup.remove(); } } protected static class PerThreadSetup { /** * Holds the test ping client. */ protected PingClient _pingClient; protected String _correlationId; } }